diff --git a/src/internal/connector/exchange/data_collections_test.go b/src/internal/connector/exchange/data_collections_test.go index 82d8b07c7..2fdc6b7d8 100644 --- a/src/internal/connector/exchange/data_collections_test.go +++ b/src/internal/connector/exchange/data_collections_test.go @@ -554,7 +554,7 @@ func (suite *DataCollectionsIntegrationSuite) TestEventsSerializationRegression( control.Options{}, newStatusUpdater(t, &wg)) require.NoError(t, err) - require.Equal(t, len(collections), 2) + require.Len(t, collections, 2) wg.Add(len(collections)) diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index 04030871a..859892a1c 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -122,6 +122,7 @@ func (w Wrapper) BackupCollections( service path.ServiceType, oc *OwnersCats, tags map[string]string, + buildTreeWithBase bool, ) (*BackupStats, *details.Builder, map[string]path.Path, error) { if w.c == nil { return nil, nil, nil, errNotConnected @@ -140,7 +141,15 @@ func (w Wrapper) BackupCollections( toMerge: map[string]path.Path{}, } - dirTree, err := inflateDirTree(ctx, w.c, previousSnapshots, collections, progress) + // When running an incremental backup, we need to pass the prior + // snapshot bases into inflateDirTree so that the new snapshot + // includes historical data. + var base []IncrementalBase + if buildTreeWithBase { + base = previousSnapshots + } + + dirTree, err := inflateDirTree(ctx, w.c, base, collections, progress) if err != nil { return nil, nil, nil, errors.Wrap(err, "building kopia directories") } diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index c93b209c4..4027ec9f8 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -272,6 +272,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() { path.ExchangeService, oc, customTags, + true, ) assert.NoError(t, err) @@ -353,6 +354,7 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() { path.ExchangeService, oc, nil, + true, ) require.NoError(t, err) @@ -435,6 +437,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() { path.ExchangeService, oc, nil, + true, ) require.NoError(t, err) @@ -480,6 +483,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollectionsHandlesNoCollections() path.UnknownService, &OwnersCats{}, nil, + true, ) require.NoError(t, err) @@ -641,6 +645,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() { path.ExchangeService, oc, nil, + false, ) require.NoError(t, err) require.Equal(t, stats.ErrorCount, 0) diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 19a043c3e..cd7a18be9 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -92,6 +92,10 @@ type detailsWriter interface { WriteBackupDetails(context.Context, *details.Details) (string, error) } +// --------------------------------------------------------------------------- +// Primary Controller +// --------------------------------------------------------------------------- + // Run begins a synchronous backup operation. func (op *BackupOperation) Run(ctx context.Context) (err error) { ctx, end := D.Span(ctx, "operations:backup:run") @@ -104,6 +108,8 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { tenantID = op.account.ID() startTime = time.Now() detailsStore = streamstore.New(op.kopia, tenantID, op.Selectors.PathService()) + oc = selectorToOwnersCats(op.Selectors) + uib = useIncrementalBackup(op.Selectors, op.Options) ) op.Results.BackupID = model.StableID(uuid.NewString()) @@ -132,17 +138,13 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { ctx, detailsStore, opStats.k.SnapshotID, - backupDetails.Details(), - ) + backupDetails.Details()) if err != nil { opStats.writeErr = err } }() - oc := selectorToOwnersCats(op.Selectors) - srm := shouldRetrieveMetadata(op.Selectors, op.Options) - - mans, mdColls, err := produceManifestsAndMetadata(ctx, op.kopia, op.store, oc, tenantID, srm) + mans, mdColls, err := produceManifestsAndMetadata(ctx, op.kopia, op.store, oc, tenantID, uib) if err != nil { opStats.readErr = errors.Wrap(err, "connecting to M365") return opStats.readErr @@ -168,7 +170,8 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { oc, mans, cs, - op.Results.BackupID) + op.Results.BackupID, + uib) if err != nil { opStats.writeErr = errors.Wrap(err, "backing up service data") return opStats.writeErr @@ -199,12 +202,50 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { return err } -// checker to see if conditions are correct for retrieving metadata like delta tokens -// and previous paths. -func shouldRetrieveMetadata(sel selectors.Selector, opts control.Options) bool { +// checker to see if conditions are correct for incremental backup behavior such as +// retrieving metadata like delta tokens and previous paths. +func useIncrementalBackup(sel selectors.Selector, opts control.Options) bool { return opts.EnabledFeatures.ExchangeIncrementals && sel.Service == selectors.ServiceExchange } +// --------------------------------------------------------------------------- +// Producer funcs +// --------------------------------------------------------------------------- + +// calls the producer to generate collections of data to backup +func produceBackupDataCollections( + ctx context.Context, + gc *connector.GraphConnector, + sel selectors.Selector, + metadata []data.Collection, + ctrlOpts control.Options, +) ([]data.Collection, error) { + complete, closer := observe.MessageWithCompletion("Discovering items to backup:") + defer func() { + complete <- struct{}{} + close(complete) + closer() + }() + + return gc.DataCollections(ctx, sel, metadata, ctrlOpts) +} + +// --------------------------------------------------------------------------- +// Consumer funcs +// --------------------------------------------------------------------------- + +type backuper interface { + BackupCollections( + ctx context.Context, + bases []kopia.IncrementalBase, + cs []data.Collection, + service path.ServiceType, + oc *kopia.OwnersCats, + tags map[string]string, + buildTreeWithBase bool, + ) (*kopia.BackupStats, *details.Builder, map[string]path.Path, error) +} + // calls kopia to retrieve prior backup manifests, metadata collections to supply backup heuristics. func produceManifestsAndMetadata( ctx context.Context, @@ -257,15 +298,6 @@ func produceManifestsAndMetadata( return ms, collections, err } -type restorer interface { - RestoreMultipleItems( - ctx context.Context, - snapshotID string, - paths []path.Path, - bc kopia.ByteCounter, - ) ([]data.Collection, error) -} - func collectMetadata( ctx context.Context, r restorer, @@ -327,24 +359,6 @@ func selectorToOwnersCats(sel selectors.Selector) *kopia.OwnersCats { return oc } -// calls the producer to generate collections of data to backup -func produceBackupDataCollections( - ctx context.Context, - gc *connector.GraphConnector, - sel selectors.Selector, - metadata []data.Collection, - ctrlOpts control.Options, -) ([]data.Collection, error) { - complete, closer := observe.MessageWithCompletion("Discovering items to backup:") - defer func() { - complete <- struct{}{} - close(complete) - closer() - }() - - return gc.DataCollections(ctx, sel, metadata, ctrlOpts) -} - func builderFromReason(tenant string, r kopia.Reason) (*path.Builder, error) { // This is hacky, but we want the path package to format the path the right // way (e.x. proper order for service, category, etc), but we don't care about @@ -368,17 +382,6 @@ func builderFromReason(tenant string, r kopia.Reason) (*path.Builder, error) { return p.ToBuilder().Dir(), nil } -type backuper interface { - BackupCollections( - ctx context.Context, - bases []kopia.IncrementalBase, - cs []data.Collection, - service path.ServiceType, - oc *kopia.OwnersCats, - tags map[string]string, - ) (*kopia.BackupStats, *details.Builder, map[string]path.Path, error) -} - // calls kopia to backup the collections of data func consumeBackupDataCollections( ctx context.Context, @@ -389,6 +392,7 @@ func consumeBackupDataCollections( mans []*kopia.ManifestEntry, cs []data.Collection, backupID model.StableID, + isIncremental bool, ) (*kopia.BackupStats, *details.Builder, map[string]path.Path, error) { complete, closer := observe.MessageWithCompletion("Backing up data:") defer func() { @@ -422,7 +426,7 @@ func consumeBackupDataCollections( }) } - return bu.BackupCollections(ctx, bases, cs, sel.PathService(), oc, tags) + return bu.BackupCollections(ctx, bases, cs, sel.PathService(), oc, tags, isIncremental) } func matchesReason(reasons []kopia.Reason, p path.Path) bool { diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index 26738006d..e04f1fbf6 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -289,6 +289,7 @@ type mockBackuper struct { service path.ServiceType, oc *kopia.OwnersCats, tags map[string]string, + buildTreeWithBase bool, ) } @@ -299,9 +300,10 @@ func (mbu mockBackuper) BackupCollections( service path.ServiceType, oc *kopia.OwnersCats, tags map[string]string, + buildTreeWithBase bool, ) (*kopia.BackupStats, *details.Builder, map[string]path.Path, error) { if mbu.checkFunc != nil { - mbu.checkFunc(bases, cs, service, oc, tags) + mbu.checkFunc(bases, cs, service, oc, tags, buildTreeWithBase) } return &kopia.BackupStats{}, &details.Builder{}, nil, nil @@ -440,6 +442,7 @@ func (suite *BackupOpSuite) TestBackupOperation_ConsumeBackupDataCollections_Pat service path.ServiceType, oc *kopia.OwnersCats, tags map[string]string, + buildTreeWithBase bool, ) { assert.ElementsMatch(t, test.expected, bases) }, @@ -455,6 +458,7 @@ func (suite *BackupOpSuite) TestBackupOperation_ConsumeBackupDataCollections_Pat test.inputMan, nil, model.StableID(""), + true, ) }) } diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 3c85ae475..c532111a3 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -95,6 +95,15 @@ type restoreStats struct { restoreID string } +type restorer interface { + RestoreMultipleItems( + ctx context.Context, + snapshotID string, + paths []path.Path, + bc kopia.ByteCounter, + ) ([]data.Collection, error) +} + // Run begins a synchronous restore operation. func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.Details, err error) { ctx, end := D.Span(ctx, "operations:restore:run") diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index 20e4262e9..285bff78f 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -80,6 +80,7 @@ func (ss *streamStore) WriteBackupDetails( ss.service, nil, nil, + false, ) if err != nil { return "", nil