Compare commits

...

5 Commits

Author SHA1 Message Date
ryanfkeepers
8299bb30c4 fix cherry pick conflicts 2023-03-22 15:18:49 -06:00
Keepers
77c6495bda additional logging when creating backup model (#2916)
- [x] 🤖 Supportability/Tests

* #2913
2023-03-22 15:14:13 -06:00
Keepers
a46a591efc add 10 day deadline to cli context (#2852)
Ensures all CLI usage has a context deadline so that we
don't bump into the graph api 100 second deadline.

---

- [x] Yes, it's included

- [x] 🐛 Bugfix

- [x] 💪 Manual
2023-03-17 16:51:54 -07:00
Ashlie Martinez
9b1af517d2 Update logging to include retry-after header
Other headers seem to have empty values right now, but this will show us
the number of seconds after which a request should be retried.
2023-03-17 08:33:27 -07:00
Ashlie Martinez
43ccb4bccb Add option to control parallelism getting items
This option is only consumed by Exchange right now and will control the
number of items fetched in parallel per collection. Note that the total
number of parallel item fetches may be higher if kopia is concurrently
uploading multiple collections.
2023-03-17 08:33:27 -07:00
10 changed files with 90 additions and 46 deletions

View File

@@ -14,6 +14,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed ### Fixed
- Fix repo connect not working without a config file - Fix repo connect not working without a config file
- Fix item re-download on expired links silently being skipped - Fix item re-download on expired links silently being skipped
- Improved permissions backup and restore for OneDrive
- CLI calls default to a 10-day context deadline to avoid letting graph api restrict requests to a 100 second deadline.
### Known Issues
- Owner (Full control) or empty (Restricted View) roles cannot be restored for OneDrive
## [v0.5.0] (beta) - 2023-03-13 ## [v0.5.0] (beta) - 2023-03-13

View File

@@ -5,6 +5,7 @@ import (
"os" "os"
"regexp" "regexp"
"strings" "strings"
"time"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
@@ -146,8 +147,12 @@ func BuildCommandTree(cmd *cobra.Command) {
// Handle builds and executes the cli processor. // Handle builds and executes the cli processor.
func Handle() { func Handle() {
tenDays := time.Now().Add(time.Hour * 24 * 10)
//nolint:forbidigo //nolint:forbidigo
ctx := config.Seed(context.Background()) ctx, cancel := context.WithDeadline(context.Background(), tenDays)
defer cancel()
ctx = config.Seed(ctx)
ctx = print.SetRootCmd(ctx, corsoCmd) ctx = print.SetRootCmd(ctx, corsoCmd)
observe.SeedWriter(ctx, print.StderrWriter(ctx), observe.PreloadFlags()) observe.SeedWriter(ctx, print.StderrWriter(ctx), observe.PreloadFlags())

View File

@@ -197,7 +197,21 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
} }
// Limit the max number of active requests to GC // Limit the max number of active requests to GC
semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize) fetchParallelism := col.ctrl.ItemFetchParallelism
if fetchParallelism == 0 || fetchParallelism > urlPrefetchChannelBufferSize {
fetchParallelism = urlPrefetchChannelBufferSize
logger.Ctx(ctx).Infow(
"fetch parallelism value not set or out of bounds, using default",
"default_parallelism",
urlPrefetchChannelBufferSize,
"requested_paralellism",
col.ctrl.ItemFetchParallelism,
)
}
logger.Ctx(ctx).Infow("fetching data with parallelism", "fetch_parallelism", fetchParallelism)
semaphoreCh := make(chan struct{}, fetchParallelism)
defer close(semaphoreCh) defer close(semaphoreCh)
// delete all removed items // delete all removed items

View File

@@ -51,10 +51,7 @@ func filterContainersAndFillCollections(
tombstones = makeTombstones(dps) tombstones = makeTombstones(dps)
) )
logger.Ctx(ctx).Infow( logger.Ctx(ctx).Infow("filling collections", "len_deltapaths", len(dps))
"filling collections",
"metadata_count",
len(dps))
// TODO(rkeepers): this should be passed in from the caller, probably // TODO(rkeepers): this should be passed in from the caller, probably
// as an interface that satisfies the NewCollection requirements. // as an interface that satisfies the NewCollection requirements.

View File

@@ -23,6 +23,7 @@ import (
const ( const (
logGraphRequestsEnvKey = "LOG_GRAPH_REQUESTS" logGraphRequestsEnvKey = "LOG_GRAPH_REQUESTS"
log2xxGraphRequestsEnvKey = "LOG_2XX_GRAPH_REQUESTS"
numberOfRetries = 3 numberOfRetries = 3
retryAttemptHeader = "Retry-Attempt" retryAttemptHeader = "Retry-Attempt"
retryAfterHeader = "Retry-After" retryAfterHeader = "Retry-After"
@@ -32,6 +33,7 @@ const (
rateLimitHeader = "RateLimit-Limit" rateLimitHeader = "RateLimit-Limit"
rateRemainingHeader = "RateLimit-Remaining" rateRemainingHeader = "RateLimit-Remaining"
rateResetHeader = "RateLimit-Reset" rateResetHeader = "RateLimit-Reset"
defaultHTTPClientTimeout = 1 * time.Hour
) )
// AllMetadataFileNames produces the standard set of filenames used to store graph // AllMetadataFileNames produces the standard set of filenames used to store graph
@@ -195,7 +197,7 @@ func HTTPClient(opts ...option) *http.Client {
noOfRetries, minRetryDelay := clientconfig.applyMiddlewareConfig() noOfRetries, minRetryDelay := clientconfig.applyMiddlewareConfig()
middlewares := GetKiotaMiddlewares(&clientOptions, noOfRetries, minRetryDelay) middlewares := GetKiotaMiddlewares(&clientOptions, noOfRetries, minRetryDelay)
httpClient := msgraphgocore.GetDefaultClient(&clientOptions, middlewares...) httpClient := msgraphgocore.GetDefaultClient(&clientOptions, middlewares...)
httpClient.Timeout = time.Minute * 3 httpClient.Timeout = defaultHTTPClientTimeout
clientconfig.apply(httpClient) clientconfig.apply(httpClient)
@@ -320,7 +322,8 @@ func (handler *LoggingMiddleware) Intercept(
"url", req.URL, "url", req.URL,
"limit", resp.Header.Get(rateLimitHeader), "limit", resp.Header.Get(rateLimitHeader),
"remaining", resp.Header.Get(rateRemainingHeader), "remaining", resp.Header.Get(rateRemainingHeader),
"reset", resp.Header.Get(rateResetHeader)) "reset", resp.Header.Get(rateResetHeader),
"retry-after", resp.Header.Get(retryAfterHeader))
} else if resp.StatusCode == http.StatusBadRequest { } else if resp.StatusCode == http.StatusBadRequest {
respDump, _ := httputil.DumpResponse(resp, true) respDump, _ := httputil.DumpResponse(resp, true)
logger.Ctx(ctx).Infow( logger.Ctx(ctx).Infow(

View File

@@ -3,7 +3,6 @@ package graph
import ( import (
"net/http" "net/http"
"testing" "testing"
"time"
"github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@@ -54,7 +53,7 @@ func (suite *GraphUnitSuite) TestHTTPClient() {
name: "no options", name: "no options",
opts: []option{}, opts: []option{},
check: func(t *testing.T, c *http.Client) { check: func(t *testing.T, c *http.Client) {
assert.Equal(t, 3*time.Minute, c.Timeout, "default timeout") assert.Equal(t, defaultHTTPClientTimeout, c.Timeout, "default timeout")
}, },
}, },
{ {

View File

@@ -960,14 +960,16 @@ func inflateBaseTree(
return nil return nil
} }
ctx = clues.Add(ctx, "snapshot_base_id", snap.ID)
root, err := loader.SnapshotRoot(snap.Manifest) root, err := loader.SnapshotRoot(snap.Manifest)
if err != nil { if err != nil {
return errors.Wrapf(err, "getting snapshot %s root directory", snap.ID) return clues.Wrap(err, "getting snapshot root directory").WithClues(ctx)
} }
dir, ok := root.(fs.Directory) dir, ok := root.(fs.Directory)
if !ok { if !ok {
return errors.Errorf("snapshot %s root is not a directory", snap.ID) return clues.New("snapshot root is not a directory").WithClues(ctx)
} }
// For each subtree corresponding to the tuple // For each subtree corresponding to the tuple
@@ -976,27 +978,28 @@ func inflateBaseTree(
for _, subtreePath := range snap.SubtreePaths { for _, subtreePath := range snap.SubtreePaths {
// We're starting from the root directory so don't need it in the path. // We're starting from the root directory so don't need it in the path.
pathElems := encodeElements(subtreePath.PopFront().Elements()...) pathElems := encodeElements(subtreePath.PopFront().Elements()...)
ictx := clues.Add(ctx, "subtree_path", subtreePath)
ent, err := snapshotfs.GetNestedEntry(ctx, dir, pathElems) ent, err := snapshotfs.GetNestedEntry(ictx, dir, pathElems)
if err != nil { if err != nil {
if isErrEntryNotFound(err) { if isErrEntryNotFound(err) {
logger.Ctx(ctx).Infow("base snapshot missing subtree", "error", err) logger.CtxErr(ictx, err).Infow("base snapshot missing subtree")
continue continue
} }
return errors.Wrapf(err, "snapshot %s getting subtree root", snap.ID) return clues.Wrap(err, "getting subtree root").WithClues(ictx)
} }
subtreeDir, ok := ent.(fs.Directory) subtreeDir, ok := ent.(fs.Directory)
if !ok { if !ok {
return errors.Wrapf(err, "snapshot %s subtree root is not directory", snap.ID) return clues.Wrap(err, "subtree root is not directory").WithClues(ictx)
} }
// We're assuming here that the prefix for the path has not changed (i.e. // We're assuming here that the prefix for the path has not changed (i.e.
// all of tenant, service, resource owner, and category are the same in the // all of tenant, service, resource owner, and category are the same in the
// old snapshot (snap) and the snapshot we're currently trying to make. // old snapshot (snap) and the snapshot we're currently trying to make.
if err = traverseBaseDir( if err = traverseBaseDir(
ctx, ictx,
0, 0,
updatedPaths, updatedPaths,
subtreePath.Dir(), subtreePath.Dir(),
@@ -1004,7 +1007,7 @@ func inflateBaseTree(
subtreeDir, subtreeDir,
roots, roots,
); err != nil { ); err != nil {
return errors.Wrapf(err, "traversing base snapshot %s", snap.ID) return clues.Wrap(err, "traversing base snapshot").WithClues(ictx)
} }
} }
@@ -1042,9 +1045,13 @@ func inflateDirTree(
baseIDs = append(baseIDs, snap.ID) baseIDs = append(baseIDs, snap.ID)
} }
logger.Ctx(ctx).Infow( ctx = clues.Add(ctx, "len_base_snapshots", len(baseSnaps), "base_snapshot_ids", baseIDs)
"merging hierarchies from base snapshots",
"snapshot_ids", baseIDs) if len(baseIDs) > 0 {
logger.Ctx(ctx).Info("merging hierarchies from base snapshots")
} else {
logger.Ctx(ctx).Info("no base snapshots to merge")
}
for _, snap := range baseSnaps { for _, snap := range baseSnaps {
if err = inflateBaseTree(ctx, loader, snap, updatedPaths, roots); err != nil { if err = inflateBaseTree(ctx, loader, snap, updatedPaths, roots); err != nil {

View File

@@ -202,7 +202,7 @@ func (w Wrapper) makeSnapshotWithRoot(
bc = &stats.ByteCounter{} bc = &stats.ByteCounter{}
) )
snapIDs := make([]manifest.ID, 0, len(prevSnapEntries)) snapIDs := make([]manifest.ID, 0, len(prevSnapEntries)) // just for logging
prevSnaps := make([]*snapshot.Manifest, 0, len(prevSnapEntries)) prevSnaps := make([]*snapshot.Manifest, 0, len(prevSnapEntries))
for _, ent := range prevSnapEntries { for _, ent := range prevSnapEntries {
@@ -210,9 +210,17 @@ func (w Wrapper) makeSnapshotWithRoot(
snapIDs = append(snapIDs, ent.ID) snapIDs = append(snapIDs, ent.ID)
} }
logger.Ctx(ctx).Infow( ctx = clues.Add(
"using snapshots for kopia-assisted incrementals", ctx,
"snapshot_ids", snapIDs) "len_prev_base_snapshots", len(prevSnapEntries),
"assist_snap_ids", snapIDs,
"additional_tags", addlTags)
if len(snapIDs) > 0 {
logger.Ctx(ctx).Info("using snapshots for kopia-assisted incrementals")
} else {
logger.Ctx(ctx).Info("no base snapshots for kopia-assisted incrementals")
}
tags := map[string]string{} tags := map[string]string{}

View File

@@ -661,20 +661,23 @@ func (op *BackupOperation) createBackupModels(
detailsStore streamstore.Writer, detailsStore streamstore.Writer,
snapID string, snapID string,
backupID model.StableID, backupID model.StableID,
backupDetails *details.Details, deets *details.Details,
) error { ) error {
ctx = clues.Add(ctx, "snapshot_id", snapID) ctx = clues.Add(ctx, "snapshot_id", snapID, "backup_id", backupID)
if backupDetails == nil { if deets == nil {
return clues.New("no backup details to record").WithClues(ctx) return clues.New("no backup details to record").WithClues(ctx)
} }
detailsID, err := detailsStore.Write(ctx, backupDetails, op.Errors) ctx = clues.Add(ctx, "details_entry_count", len(deets.Entries))
detailsID, err := detailsStore.Write(ctx, deets, op.Errors)
if err != nil { if err != nil {
return clues.Wrap(err, "creating backupDetails model").WithClues(ctx) return clues.Wrap(err, "creating backupDetails model").WithClues(ctx)
} }
ctx = clues.Add(ctx, "details_id", detailsID) ctx = clues.Add(ctx, "details_snapshot_id", detailsID)
b := backup.New( b := backup.New(
snapID, detailsID, op.Status.String(), snapID, detailsID, op.Status.String(),
backupID, backupID,
@@ -683,6 +686,8 @@ func (op *BackupOperation) createBackupModels(
op.Results.StartAndEndTime, op.Results.StartAndEndTime,
op.Errors) op.Errors)
logger.Ctx(ctx).Info("creating new backup")
if err = op.store.Put(ctx, model.BackupSchema, b); err != nil { if err = op.store.Put(ctx, model.BackupSchema, b); err != nil {
return clues.Wrap(err, "creating backup model").WithClues(ctx) return clues.Wrap(err, "creating backup model").WithClues(ctx)
} }

View File

@@ -11,6 +11,7 @@ type Options struct {
FailFast bool `json:"failFast"` FailFast bool `json:"failFast"`
RestorePermissions bool `json:"restorePermissions"` RestorePermissions bool `json:"restorePermissions"`
SkipReduce bool `json:"skipReduce"` SkipReduce bool `json:"skipReduce"`
ItemFetchParallelism int `json:"itemFetchParallelism"`
ToggleFeatures Toggles `json:"ToggleFeatures"` ToggleFeatures Toggles `json:"ToggleFeatures"`
} }