Create S3 retention checker (#3923)

Create a CLI utility that checks retention config
information for a small set of objects. Utility
works as follows:
* given a set of prefixes, find the current version
  of one object matching each prefix
* for each object selected above, retrieve the
  retention config
* compare retention mode and expiry with the given
  mode and an expiry calculated from the current
  time

To reduce costs, the utility uses the list API to
fetch object names/versions and match them against
the prefixes. It cancels the listing (i.e. stops
fetching new pages) once all prefixes are matched.

Also includes a flag to select non-current versions
of objects with the same prefix. However, setting
this flag may lead to more API calls because some
objects may not have non-current versions. In that
case all objects will be listed

Sample usage: check the current and non-current
versions of an object with prefix 'x' and of an
object with prefix 'q'. The current versions should
expire in >= 8h and the non-current versions should
expire in <= 7h:
```
s3checker --bucket <> --bucket-prefix <> \
  --prefix x --prefix q \
  --retention-mode GOVERNANCE --live-retention-duration 8h \
  --with-non-current --dead-retention-duration 7h
```

---

#### Does this PR need a docs update or release note?

- [ ] ✅ Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x] ⛔ No

#### Type of change

- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [x] 🤖 Supportability/Tests
- [x] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

* #3799

#### Test Plan

- [x] 💪 Manual
- [ ] ⚡ Unit test
- [ ] 💚 E2E
This commit is contained in:
ashmrtn 2023-08-02 11:23:39 -07:00 committed by GitHub
parent c1de34d438
commit a670c07735
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 582 additions and 0 deletions

View File

@ -0,0 +1,206 @@
package s3
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net/http"
"strings"
"time"
"github.com/alcionai/clues"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
// noLockCode is the error code in a minio.ErrorResponse that ObjectRetention
// translates into ErrNoRetention.
const noLockCode = "NoSuchObjectLockConfiguration"
// ErrNoRetention is the sentinel stacked onto the underlying error by
// ObjectRetention when the service responds with noLockCode. Callers can test
// for it with errors.Is.
var ErrNoRetention = clues.New("missing ObjectLock configuration")
// Options holds the settings New uses to construct a Client.
type Options struct {
// BucketName is the bucket all requests are made against. New rejects an
// empty value.
BucketName string
// Prefix is the key prefix used when listing objects; it is stripped from
// object keys before they are compared against the wanted prefixes.
Prefix string
// Endpoint is the host passed to minio.New.
Endpoint string
// DoNotUseTLS disables the "Secure" option of the minio client when true.
DoNotUseTLS bool
// DoNotVerifyTLS makes the HTTP transport skip TLS certificate verification
// when true.
DoNotVerifyTLS bool
// AccessKeyID, SecretAccessKey, and SessionToken populate the static
// credentials provider, the first provider in the credentials chain.
AccessKeyID string
SecretAccessKey string
SessionToken string
// Region is passed through to the minio client options.
Region string
}
// ObjInfo identifies a single version of an object in the bucket.
type ObjInfo struct {
// Key is the full object key, including the client's configured prefix.
Key string
// Version is the object's version ID as reported by the listing.
Version string
}
// Client wraps a minio client together with the bucket and key prefix that
// every request is scoped to. Create one with New.
type Client struct {
// client is the underlying minio client used for all API calls.
client *minio.Client
// bucketName is the bucket passed to every list/retention call.
bucketName string
// prefix is the key prefix used when listing objects.
prefix string
}
// Below init code scavenged from
// https://github.com/kopia/kopia/blob/83b88d8bbf60f4eb17b40dea440c08b594b9e5d3/repo/blob/s3/s3_storage.go

// getCustomTransport returns the HTTP transport the S3 client should use.
//
// The transport is always a clone of http.DefaultTransport so that the default
// proxy handling, timeouts, and connection limits stay in effect. When
// opt.DoNotVerifyTLS is set, only the TLS config is replaced so that server
// certificates are not verified.
func getCustomTransport(opt *Options) *http.Transport {
	transport := http.DefaultTransport.(*http.Transport).Clone()

	if opt.DoNotVerifyTLS {
		// Skipping verification is an explicit opt-in by the caller.
		//nolint:gosec
		transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
	}

	return transport
}
// New builds a Client for the bucket described by opt.
//
// Credentials are resolved through a chain: the static values in opt first,
// then the AWS environment provider, then the IAM provider. An error is
// returned if the bucket name is empty or the minio client cannot be created.
func New(opt *Options) (*Client, error) {
	if len(opt.BucketName) == 0 {
		return nil, clues.New("empty bucket name")
	}

	staticProvider := &credentials.Static{
		Value: credentials.Value{
			AccessKeyID:     opt.AccessKeyID,
			SecretAccessKey: opt.SecretAccessKey,
			SessionToken:    opt.SessionToken,
			SignerType:      credentials.SignatureV4,
		},
	}

	iamProvider := &credentials.IAM{
		Client: &http.Client{
			Transport: http.DefaultTransport,
		},
	}

	providers := []credentials.Provider{
		staticProvider,
		&credentials.EnvAWS{},
		iamProvider,
	}

	mc, err := minio.New(opt.Endpoint, &minio.Options{
		Creds:     credentials.NewChainCredentials(providers),
		Secure:    !opt.DoNotUseTLS,
		Region:    opt.Region,
		Transport: getCustomTransport(opt),
	})
	if err != nil {
		return nil, clues.Wrap(err, "creating client")
	}

	result := &Client{
		client:     mc,
		bucketName: opt.BucketName,
		prefix:     opt.Prefix,
	}

	return result, nil
}
// ListUntilAllFound lists objects under the client's prefix and picks, for
// each entry in wantedPrefixes, one object whose key (with the client prefix
// stripped) starts with that entry.
//
// The first returned map holds the selected "not deleted" objects and the
// second the selected "deleted" (non-latest) objects, both keyed by the wanted
// prefix they matched. The second map is only populated when alsoFindDeleted
// is true, which also turns on version listing. Reading from the listing stops
// as soon as every wanted prefix has been matched in all requested maps. If
// the listing reports an error, the maps collected so far are returned along
// with the wrapped error.
func (c *Client) ListUntilAllFound(
	ctx context.Context,
	wantedPrefixes []string,
	alsoFindDeleted bool,
) (map[string]ObjInfo, map[string]ObjInfo, error) {
	var (
		live = map[string]ObjInfo{}
		dead = map[string]ObjInfo{}
	)

	// The derived context is cancelled on return so the listing is cancelled
	// when we stop reading before it is exhausted.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	listOpts := minio.ListObjectsOptions{
		Prefix:       c.prefix,
		WithVersions: alsoFindDeleted,
	}

	// done reports whether every wanted prefix has a live match and, when
	// deleted items were requested, a dead match as well.
	done := func() bool {
		if len(live) != len(wantedPrefixes) {
			return false
		}

		return !alsoFindDeleted || len(dead) == len(wantedPrefixes)
	}

	for entry := range c.client.ListObjects(ctx, c.bucketName, listOpts) {
		if entry.Err != nil {
			return live, dead, clues.Wrap(entry.Err, "searching object list")
		}

		// Retention info can't be requested for a deletion marker.
		if entry.IsDeleteMarker {
			continue
		}

		relKey := entry.Key[len(c.prefix):]
		matched := ""

		for _, want := range wantedPrefixes {
			if strings.HasPrefix(relKey, want) {
				matched = want
				break
			}
		}

		// Nothing we're looking for matches this key.
		if matched == "" {
			continue
		}

		found := ObjInfo{
			Key:     entry.Key,
			Version: entry.VersionID,
		}

		// Deletion markers were skipped above, so the latest version counts as
		// not deleted and any other version is likely deleted. IsLatest is
		// always false when versions aren't requested, hence the
		// alsoFindDeleted guard.
		//
		// Overwriting an earlier selection is fine; any matching item will do.
		if alsoFindDeleted && !entry.IsLatest {
			dead[matched] = found
		} else {
			live[matched] = found
		}

		if done() {
			break
		}
	}

	return live, dead, nil
}
// ObjectRetention fetches the retention mode and retain-until time for the
// given object version.
//
// If the service responds with the noLockCode error code, (nil, nil) is
// returned together with an error that matches ErrNoRetention. Any other
// failure is wrapped with the object's key and version; the mode and time
// returned by the underlying call are passed through unchanged.
func (c *Client) ObjectRetention(
	ctx context.Context,
	obj ObjInfo,
) (*minio.RetentionMode, *time.Time, error) {
	mode, retainUntil, err := c.client.GetObjectRetention(
		ctx,
		c.bucketName,
		obj.Key,
		obj.Version)

	// There's no sentinel error to compare against, so inspect the error code
	// carried by the response instead.
	var resp minio.ErrorResponse
	if err != nil && errors.As(err, &resp) && resp.Code == noLockCode {
		return nil, nil, clues.Stack(ErrNoRetention, err)
	}

	msg := fmt.Sprintf(
		"getting object (key) %q (versionID) %q",
		obj.Key,
		obj.Version,
	)

	// OrNil collapses the wrapped value back to a nil error on success.
	return mode, retainUntil, clues.
		Wrap(err, msg).
		With("object_key", obj.Key, "object_version", obj.Version).
		OrNil()
}

View File

@ -0,0 +1,376 @@
package main
import (
"context"
"errors"
"fmt"
"os"
"time"
"github.com/alcionai/clues"
"github.com/minio/minio-go/v7"
"github.com/spf13/cobra"
"golang.org/x/exp/maps"
"github.com/alcionai/corso/src/cli/config"
"github.com/alcionai/corso/src/cmd/s3checker/pkg/s3"
"github.com/alcionai/corso/src/internal/common/crash"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/storage"
)
// defaultS3Endpoint is the endpoint used when the loaded S3 config does not
// specify one. Matches other definitions of this const.
const defaultS3Endpoint = "s3.amazonaws.com"
// flags holds the values parsed from the command line for the check command.
type flags struct {
// bucket is the value of --bucket: the name of the bucket to check.
bucket string
// bucketPrefix is the value of --bucket-prefix: a prefix added to all object
// lookups.
bucketPrefix string
// prefixes collects every --prefix value: the object prefixes to check.
prefixes []string
// withDeleted is the value of --with-non-current: whether non-current object
// versions are checked as well.
withDeleted bool
// liveRetentionDuration is the value of --live-retention-duration: the
// minimum time from now that live objects should remain locked.
liveRetentionDuration time.Duration
// deadRetentionDuration is the value of --dead-retention-duration: the
// maximum time from now that dead objects should remain locked.
deadRetentionDuration time.Duration
// retentionMode is the value of --retention-mode: the retention mode expected
// on live objects.
retentionMode string
}
// checkerCommand builds the cobra command that runs the retention check.
//
// It registers all command-line flags, binds them to a flags value, and marks
// bucket, prefix, retention-mode, and live-retention-duration as required. An
// error is returned only if marking a flag as required fails.
func checkerCommand() (*cobra.Command, error) {
	// f is captured by the RunE closure, so the values handed to
	// handleCheckerCommand are the ones populated by flag parsing, not the
	// zero values present at construction time.
	f := flags{}
	cmd := &cobra.Command{
		Use:   "check",
		Short: "Check S3 objects' retention properties",
		RunE: func(cmd *cobra.Command, args []string) error {
			return handleCheckerCommand(cmd, args, f)
		},
	}

	fs := cmd.Flags()

	// AWS/global config.
	fs.StringVar(
		&f.bucket,
		"bucket",
		"",
		"Name of bucket to check")
	fs.StringVar(
		&f.bucketPrefix,
		"bucket-prefix",
		"",
		"Prefix to add to all object lookups")
	fs.StringSliceVar(
		&f.prefixes,
		"prefix",
		nil,
		"Set of object prefixes to check. Pass multiple times for multiple prefixes")

	// Live object config.
	fs.StringVar(
		&f.retentionMode,
		"retention-mode",
		"",
		"Retention mode to check for on live objects")
	fs.DurationVar(
		&f.liveRetentionDuration,
		"live-retention-duration",
		0,
		"Minimum amount of time from now that live objects should be locked for")

	// Dead object config.
	fs.BoolVar(
		&f.withDeleted,
		"with-non-current",
		false,
		"Whether to check non-current objects")
	fs.DurationVar(
		&f.deadRetentionDuration,
		"dead-retention-duration",
		0,
		"Maximum amount of time from now that dead objects should be locked for. "+
			"Can be negative if dead object locks should have expired already")

	required := []string{
		"bucket",
		"prefix",
		"retention-mode",
		"live-retention-duration",
	}

	for _, req := range required {
		if err := cmd.MarkFlagRequired(req); err != nil {
			return nil, clues.Wrap(err, "setting flag "+req+" as required")
		}
	}

	return cmd, nil
}
// main builds the checker command, seeds a debug-level text logger into the
// context, and executes the command. The process exits with status 1 if the
// command cannot be built or if execution returns an error.
func main() {
	rootCmd, buildErr := checkerCommand()
	if buildErr != nil {
		fmt.Printf("%v\n", buildErr)
		os.Exit(1)
	}

	ctx, _ := logger.CtxOrSeed(
		context.Background(),
		logger.Settings{
			Level:  logger.LLDebug,
			Format: logger.LFText,
		})

	defer func() {
		// recover must be called directly from this deferred function.
		panicErr := crash.Recovery(ctx, recover(), "s3Checker")
		if panicErr != nil {
			logger.CtxErr(ctx, panicErr).Error("panic in s3 checker")
		}

		logger.Flush(ctx)
	}()

	if runErr := rootCmd.ExecuteContext(ctx); runErr != nil {
		// os.Exit skips deferred calls, so flush explicitly first.
		logger.Flush(ctx)
		os.Exit(1)
	}
}
// validateFlags checks flag values that cobra's required-flag handling can't:
// the live retention duration must be positive and the retention mode must be
// either GOVERNANCE or COMPLIANCE. The duration is checked first.
func validateFlags(f flags) error {
	if f.liveRetentionDuration <= 0 {
		return clues.New("live object retention duration must be > 0")
	}

	switch f.retentionMode {
	case "GOVERNANCE", "COMPLIANCE":
		return nil
	default:
		return clues.New("invalid retention mode")
	}
}
// reportMissingPrefixes prints a line for every entry of wanted that has no
// key in got and returns an error stacking one entry per missing prefix.
// objDescriptor (e.g. "live" or "dead") is only used to label the output. A
// nil error means every wanted prefix was present.
func reportMissingPrefixes(
	objDescriptor string,
	wanted []string,
	got map[string]s3.ObjInfo,
) error {
	var missing error

	for _, prefix := range wanted {
		if _, found := got[prefix]; found {
			continue
		}

		fmt.Printf("missing %s object for prefix %q\n", objDescriptor, prefix)

		missing = clues.Stack(
			missing,
			clues.New("missing "+objDescriptor+" object prefix \""+prefix+"\""))
	}

	return missing
}
// handleCheckerCommand runs the retention check for the parsed flags.
//
// It validates the flags, loads the storage config (overriding bucket and
// prefix with the flag values), builds an S3 client, selects one object per
// wanted prefix, and checks the retention of each selected object. Per-object
// failures are printed to stdout; the returned error only summarizes which
// categories (missing live objects, live objects, dead objects) had problems.
// Returns nil if no prefixes were given or if every check passed.
func handleCheckerCommand(cmd *cobra.Command, args []string, f flags) error {
// Nothing to check.
if len(f.prefixes) == 0 {
return nil
}
if err := validateFlags(f); err != nil {
return clues.Stack(err)
}
// Set only after flag validation has passed, so failures from here on are
// not reported as usage errors.
cmd.SilenceUsage = true
fmt.Printf("Checking objects with prefix(es) %v\n", f.prefixes)
if err := config.InitFunc(cmd, args); err != nil {
return clues.Wrap(err, "setting viper")
}
ctx := cmd.Context()
// Scavenged from src/internal/kopia/s3/s3.go.
// The bucket and prefix given on the command line are passed as overrides
// when loading the repo details.
overrides := map[string]string{
storage.Bucket: f.bucket,
storage.Prefix: f.bucketPrefix,
}
repoDetails, err := config.GetConfigRepoDetails(ctx, false, false, overrides)
if err != nil {
return clues.Wrap(err, "getting storage config")
}
cfg, err := repoDetails.Storage.S3Config()
if err != nil {
return clues.Wrap(err, "getting S3 config")
}
// Fall back to the default endpoint when the config doesn't name one.
endpoint := defaultS3Endpoint
if len(cfg.Endpoint) > 0 {
endpoint = cfg.Endpoint
}
opts := &s3.Options{
BucketName: cfg.Bucket,
Endpoint: endpoint,
Prefix: cfg.Prefix,
DoNotUseTLS: cfg.DoNotUseTLS,
DoNotVerifyTLS: cfg.DoNotVerifyTLS,
AccessKeyID: cfg.AccessKey,
SecretAccessKey: cfg.SecretKey,
SessionToken: cfg.SessionToken,
}
client, err := s3.New(opts)
if err != nil {
return clues.Wrap(err, "initializing S3 client")
}
// Select one live (and, if requested, one dead) object per wanted prefix.
live, dead, err := client.ListUntilAllFound(ctx, f.prefixes, f.withDeleted)
if err != nil {
return clues.Wrap(err, "getting objects to check")
}
// Reset error so we can return something at the end.
err = nil
// A missing live object is a failure, but checking continues for the
// objects that were found.
if err2 := reportMissingPrefixes("live", f.prefixes, live); err2 != nil {
err = clues.Stack(err, clues.New("some live objects missing"))
}
if f.withDeleted {
// Only print here because it's possible there aren't dead objects for the
// given prefix.
//nolint:errcheck
reportMissingPrefixes("dead", f.prefixes, dead)
}
// Both bounds are computed from the same instant: live objects must stay
// locked until at least lowerBound, dead objects until at most upperBound.
now := time.Now()
retentionMode := minio.RetentionMode(f.retentionMode)
lowerBound := now.Add(f.liveRetentionDuration)
upperBound := now.Add(f.deadRetentionDuration)
liveErrs := checkObjsWithRetention(
ctx,
client,
maps.Values(live),
hasAtLeastRetention(retentionMode, lowerBound))
deadErrs := checkObjsWithRetention(
ctx,
client,
maps.Values(dead),
hasAtMostRetention(upperBound))
// Print each failure and fold a short category marker into the returned
// error. The loop variable err below shadows the outer err only inside the
// loop body.
if len(liveErrs) > 0 {
fmt.Printf("%d error(s) checking live object retention\n", len(liveErrs))
for i, err := range liveErrs {
fmt.Printf("\t%d: %s\n", i, err.Error())
}
err = clues.Stack(err, clues.New("live objects"))
} else {
fmt.Println("no errors for live objects")
}
if len(deadErrs) > 0 {
fmt.Printf("%d error(s) checking dead object retention\n", len(deadErrs))
for i, err := range deadErrs {
fmt.Printf("\t%d: %s\n", i, err.Error())
}
err = clues.Stack(err, clues.New("dead objects"))
} else {
fmt.Println("no errors for dead objects")
}
return err
}
// hasAtLeastRetention builds a check for live objects. The returned function
// fails if either argument is nil, if the mode differs from wantedMode, or if
// the expiry falls before lowerBound; an expiry at or after lowerBound passes.
func hasAtLeastRetention(
	wantedMode minio.RetentionMode,
	lowerBound time.Time,
) func(*minio.RetentionMode, *time.Time) error {
	return func(gotMode *minio.RetentionMode, gotExpiry *time.Time) error {
		// Cases are evaluated in order, so the nil checks guard the
		// dereferences in the later cases.
		switch {
		case gotMode == nil:
			return clues.New("nil retention mode")
		case gotExpiry == nil:
			return clues.New("nil retention expiry")
		case *gotMode != wantedMode:
			return clues.New("unexpected retention mode " + string(*gotMode))
		case gotExpiry.Before(lowerBound):
			return clues.New("unexpected retention expiry " + gotExpiry.String())
		}

		return nil
	}
}
// hasAtMostRetention builds a check for dead objects. The returned function
// passes when no retention is set at all (both arguments nil) or when the
// expiry is at or before upperBound. It fails if only one argument is nil or
// if the expiry falls after upperBound. The retention mode's value is not
// inspected.
func hasAtMostRetention(
	upperBound time.Time,
) func(*minio.RetentionMode, *time.Time) error {
	return func(gotMode *minio.RetentionMode, gotExpiry *time.Time) error {
		switch {
		case gotMode == nil && gotExpiry == nil:
			// Expired retention is handed to us as (nil, nil) to keep checks
			// simple.
			return nil
		case gotMode == nil:
			return clues.New("nil retention mode")
		case gotExpiry == nil:
			return clues.New("nil retention expiry")
		case gotExpiry.After(upperBound):
			return clues.New("unexpected retention expiry " + gotExpiry.String())
		}

		return nil
	}
}
// checkObjsWithRetention looks up the retention of every object in objs and
// runs checkFunc on the result. It returns one error for each object whose
// retention could not be fetched or whose retention failed checkFunc; a nil
// slice means every object passed.
func checkObjsWithRetention(
	ctx context.Context,
	client *s3.Client,
	objs []s3.ObjInfo,
	checkFunc func(*minio.RetentionMode, *time.Time) error,
) []error {
	var failures []error

	for _, target := range objs {
		mode, expiry, fetchErr := client.ObjectRetention(ctx, target)

		switch {
		case errors.Is(fetchErr, s3.ErrNoRetention):
			// Expired locks surface as errors. Hand them to the check as
			// (nil, nil) so checks are a bit easier to write.
			mode, expiry = nil, nil
		case fetchErr != nil:
			failures = append(failures, clues.Stack(fetchErr))
			continue
		}

		checkErr := checkFunc(mode, expiry)
		if checkErr == nil {
			continue
		}

		msg := fmt.Sprintf(
			"checking object (key) %q (versionID) %q",
			target.Key,
			target.Version)

		failures = append(failures, clues.Wrap(checkErr, msg))
	}

	return failures
}