From 414d2a490fd458adbe78b953911acfa423ddae27 Mon Sep 17 00:00:00 2001 From: Keepers Date: Tue, 13 Dec 2022 12:03:33 -0700 Subject: [PATCH] consume old deltas on exchange mail backup (#1768) ## Description When backing up exchange data, parse the metadata collection of delta urls from prior runs (if any exist) and pass those tokens along to the fetch functions for re-use. ## Type of change - [x] :sunflower: Feature ## Issue(s) * #1725 ## Test Plan - [x] :zap: Unit test --- src/internal/connector/data_collections.go | 18 ++- .../connector/data_collections_test.go | 108 +++++++++++++----- .../connector/exchange/data_collections.go | 61 ++++++++++ .../exchange/data_collections_test.go | 55 +++++++++ .../connector/exchange/query_options.go | 12 -- .../connector/exchange/service_iterators.go | 64 +++++++---- src/internal/operations/backup.go | 13 ++- .../repository/repository_unexported_test.go | 1 - 8 files changed, 255 insertions(+), 77 deletions(-) create mode 100644 src/internal/connector/exchange/data_collections.go create mode 100644 src/internal/connector/exchange/data_collections_test.go diff --git a/src/internal/connector/data_collections.go b/src/internal/connector/data_collections.go index 9250d71a9..792fe1373 100644 --- a/src/internal/connector/data_collections.go +++ b/src/internal/connector/data_collections.go @@ -32,7 +32,7 @@ import ( func (gc *GraphConnector) DataCollections( ctx context.Context, sels selectors.Selector, - metadataCols []data.Collection, + metadata []data.Collection, ) ([]data.Collection, error) { ctx, end := D.Span(ctx, "gc:dataCollections", D.Index("service", sels.Service.String())) defer end() @@ -42,11 +42,9 @@ func (gc *GraphConnector) DataCollections( return nil, err } - // serialize metadata into maps here - switch sels.Service { case selectors.ServiceExchange: - return gc.ExchangeDataCollection(ctx, sels) + return gc.ExchangeDataCollection(ctx, sels, metadata) case selectors.ServiceOneDrive: return 
gc.OneDriveDataCollections(ctx, sels) case selectors.ServiceSharePoint: @@ -119,6 +117,7 @@ func verifyBackupInputs(sels selectors.Selector, userPNs, siteIDs []string) erro func (gc *GraphConnector) createExchangeCollections( ctx context.Context, scope selectors.ExchangeScope, + deltas map[string]string, ) ([]data.Collection, error) { var ( errs *multierror.Error @@ -152,7 +151,8 @@ func (gc *GraphConnector) createExchangeCollections( collections, gc.UpdateStatus, resolver, - scope) + scope, + deltas) if err != nil { return nil, errors.Wrap(err, "filling collections") @@ -178,6 +178,7 @@ func (gc *GraphConnector) createExchangeCollections( func (gc *GraphConnector) ExchangeDataCollection( ctx context.Context, selector selectors.Selector, + metadata []data.Collection, ) ([]data.Collection, error) { eb, err := selector.ToExchangeBackup() if err != nil { @@ -190,9 +191,14 @@ func (gc *GraphConnector) ExchangeDataCollection( errs error ) + _, deltas, err := exchange.ParseMetadataCollections(ctx, metadata) + if err != nil { + return nil, err + } + for _, scope := range scopes { // Creates a map of collections based on scope - dcs, err := gc.createExchangeCollections(ctx, scope) + dcs, err := gc.createExchangeCollections(ctx, scope, deltas) if err != nil { user := scope.Get(selectors.ExchangeUser) return nil, support.WrapAndAppend(user[0], err, errs) diff --git a/src/internal/connector/data_collections_test.go b/src/internal/connector/data_collections_test.go index 4278ee4fe..81175888b 100644 --- a/src/internal/connector/data_collections_test.go +++ b/src/internal/connector/data_collections_test.go @@ -104,7 +104,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection for _, test := range tests { suite.T().Run(test.name, func(t *testing.T) { - collection, err := connector.ExchangeDataCollection(ctx, test.getSelector(t)) + collection, err := connector.ExchangeDataCollection(ctx, test.getSelector(t), nil) require.NoError(t, err) // Categories 
with delta endpoints will produce a collection for metadata // as well as the actual data pulled. @@ -283,7 +283,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailFetch() for _, test := range tests { suite.T().Run(test.name, func(t *testing.T) { - collections, err := gc.createExchangeCollections(ctx, test.scope) + collections, err := gc.createExchangeCollections(ctx, test.scope, nil) require.NoError(t, err) for _, c := range collections { @@ -302,6 +302,77 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailFetch() } } +func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestDelta() { + ctx, flush := tester.NewContext() + defer flush() + + var ( + userID = tester.M365UserID(suite.T()) + gc = loadConnector(ctx, suite.T(), Users) + ) + + tests := []struct { + name string + scope selectors.ExchangeScope + }{ + { + name: "Mail", + scope: selectors.NewExchangeBackup().MailFolders( + []string{userID}, + []string{exchange.DefaultMailFolder}, + selectors.PrefixMatch(), + )[0], + }, + { + name: "Contacts", + scope: selectors.NewExchangeBackup().ContactFolders( + []string{userID}, + []string{exchange.DefaultContactFolder}, + selectors.PrefixMatch(), + )[0], + }, + } + for _, test := range tests { + suite.T().Run(test.name, func(t *testing.T) { + // get collections without providing any delta history (ie: full backup) + collections, err := gc.createExchangeCollections(ctx, test.scope, nil) + require.NoError(t, err) + assert.Less(t, 1, len(collections), "retrieved metadata and data collections") + + var metadata data.Collection + + for _, coll := range collections { + if coll.FullPath().Service() == path.ExchangeMetadataService { + metadata = coll + } + } + + require.NotNil(t, metadata, "collections contains a metadata collection") + + _, deltas, err := exchange.ParseMetadataCollections(ctx, []data.Collection{metadata}) + require.NoError(t, err) + + // now do another backup with the previous delta tokens, + // which 
should only contain the difference. + collections, err = gc.createExchangeCollections(ctx, test.scope, deltas) + require.NoError(t, err) + + // TODO(keepers): this isn't a very useful test at the moment. It needs to + // investigate the items in the original and delta collections to at least + // assert some minimum assumptions, such as "deltas should retrieve fewer items". + // Delta usage is commented out at the moment, anyway. So this is currently + // a sanity check that the minimum behavior won't break. + for _, coll := range collections { + if coll.FullPath().Service() != path.ExchangeMetadataService { + ec, ok := coll.(*exchange.Collection) + require.True(t, ok, "collection is *exchange.Collection") + assert.NotNil(t, ec) + } + } + }) + } +} + // TestMailSerializationRegression verifies that all mail data stored in the // test account can be successfully downloaded into bytes and restored into // M365 mail objects @@ -313,7 +384,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailSerializ connector := loadConnector(ctx, t, Users) sel := selectors.NewExchangeBackup() sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch())) - collection, err := connector.createExchangeCollections(ctx, sel.Scopes()[0]) + collection, err := connector.createExchangeCollections(ctx, sel.Scopes()[0], nil) require.NoError(t, err) for _, edc := range collection { @@ -356,7 +427,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestContactSeria scope := selectors. NewExchangeBackup(). 
ContactFolders([]string{suite.user}, []string{exchange.DefaultContactFolder}, selectors.PrefixMatch())[0] - collections, err := connector.createExchangeCollections(ctx, scope) + collections, err := connector.createExchangeCollections(ctx, scope, nil) require.NoError(t, err) return collections @@ -423,7 +494,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestEventsSerial getCollection: func(t *testing.T) []data.Collection { sel := selectors.NewExchangeBackup() sel.Include(sel.EventCalendars([]string{suite.user}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch())) - collections, err := connector.createExchangeCollections(ctx, sel.Scopes()[0]) + collections, err := connector.createExchangeCollections(ctx, sel.Scopes()[0], nil) require.NoError(t, err) return collections @@ -435,7 +506,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestEventsSerial getCollection: func(t *testing.T) []data.Collection { sel := selectors.NewExchangeBackup() sel.Include(sel.EventCalendars([]string{suite.user}, []string{"Birthdays"}, selectors.PrefixMatch())) - collections, err := connector.createExchangeCollections(ctx, sel.Scopes()[0]) + collections, err := connector.createExchangeCollections(ctx, sel.Scopes()[0], nil) require.NoError(t, err) return collections @@ -468,31 +539,6 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestEventsSerial } } -// TestAccessOfInboxAllUsers verifies that GraphConnector can -// support `--users *` for backup operations. Selector.DiscreteScopes -// returns all of the users within one scope. Only users who have -// messages in their inbox will have a collection returned. -// The final test insures that more than a 75% of the user collections are -// returned. 
If an error was experienced, the test will fail overall -func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestAccessOfInboxAllUsers() { - ctx, flush := tester.NewContext() - defer flush() - - t := suite.T() - connector := loadConnector(ctx, t, Users) - sel := selectors.NewExchangeBackup() - sel.Include(sel.MailFolders(selectors.Any(), []string{exchange.DefaultMailFolder}, selectors.PrefixMatch())) - scopes := sel.DiscreteScopes(connector.GetUsers()) - - for _, scope := range scopes { - users := scope.Get(selectors.ExchangeUser) - standard := (len(users) / 4) * 3 - collections, err := connector.createExchangeCollections(ctx, scope) - require.NoError(t, err) - suite.Greater(len(collections), standard) - } -} - // --------------------------------------------------------------------------- // CreateSharePointCollection tests // --------------------------------------------------------------------------- diff --git a/src/internal/connector/exchange/data_collections.go b/src/internal/connector/exchange/data_collections.go new file mode 100644 index 000000000..262cee34b --- /dev/null +++ b/src/internal/connector/exchange/data_collections.go @@ -0,0 +1,61 @@ +package exchange + +import ( + "context" + "encoding/json" + + "github.com/pkg/errors" + + "github.com/alcionai/corso/src/internal/connector/graph" + "github.com/alcionai/corso/src/internal/data" +) + +// ParseMetadataCollections produces two maps: +// 1- paths: folderID->filePath, used to look up previous folder pathing +// in case of a name change or relocation. +// 2- deltas: folderID->deltaToken, used to look up previous delta token +// retrievals. 
+func ParseMetadataCollections( + ctx context.Context, + colls []data.Collection, +) (map[string]string, map[string]string, error) { + var ( + paths = map[string]string{} + deltas = map[string]string{} + ) + + for _, coll := range colls { + items := coll.Items() + + for { + var breakLoop bool + + select { + case <-ctx.Done(): + return nil, nil, errors.Wrap(ctx.Err(), "parsing collection metadata") + case item, ok := <-items: + if !ok { + breakLoop = true + break // leaves only the select; breakLoop ends the enclosing for + } + + switch item.UUID() { + // case graph.PreviousPathFileName: + case graph.DeltaTokenFileName: + err := json.NewDecoder(item.ToReader()).Decode(&deltas) + if err != nil { + return nil, nil, errors.Wrap(err, "parsing delta token map") + } + + breakLoop = true // delta map decoded; stop reading this collection + } + } + + if breakLoop { + break + } + } + } + + return paths, deltas, nil +} diff --git a/src/internal/connector/exchange/data_collections_test.go b/src/internal/connector/exchange/data_collections_test.go new file mode 100644 index 000000000..9ddd6574a --- /dev/null +++ b/src/internal/connector/exchange/data_collections_test.go @@ -0,0 +1,55 @@ +package exchange + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/src/internal/connector/graph" + "github.com/alcionai/corso/src/internal/connector/support" + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/path" +) + +// --------------------------------------------------------------------------- +// Unit tests +// --------------------------------------------------------------------------- + +type DataCollectionsUnitSuite struct { + suite.Suite +} + +func TestDataCollectionsUnitSuite(t *testing.T) { + suite.Run(t, new(DataCollectionsUnitSuite)) +} + +func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { + t := suite.T() + ctx, flush := tester.NewContext() + + defer 
flush() + + bs, err := json.Marshal(map[string]string{"key": "token"}) + require.NoError(t, err) + + p, err := path.Builder{}.ToServiceCategoryMetadataPath( + "t", "u", + path.ExchangeService, + path.EmailCategory, + false, + ) + require.NoError(t, err) + + item := []graph.MetadataItem{graph.NewMetadataItem(graph.DeltaTokenFileName, bs)} + mdcoll := graph.NewMetadataCollection(p, item, func(cos *support.ConnectorOperationStatus) {}) + colls := []data.Collection{mdcoll} + + _, deltas, err := ParseMetadataCollections(ctx, colls) + require.NoError(t, err) + assert.NotEmpty(t, deltas, "delta urls") + assert.Equal(t, "token", deltas["key"]) +} diff --git a/src/internal/connector/exchange/query_options.go b/src/internal/connector/exchange/query_options.go index 34303f2bd..351794662 100644 --- a/src/internal/connector/exchange/query_options.go +++ b/src/internal/connector/exchange/query_options.go @@ -113,18 +113,6 @@ func CategoryToOptionIdentifier(category path.CategoryType) optionIdentifier { // which reduces the overall latency of complex calls // ----------------------------------------------------------------------- -// Delta requests for mail and contacts have the same parameters and config -// structs. 
-type DeltaRequestBuilderGetQueryParameters struct { - Count *bool `uriparametername:"%24count"` - Filter *string `uriparametername:"%24filter"` - Orderby []string `uriparametername:"%24orderby"` - Search *string `uriparametername:"%24search"` - Select []string `uriparametername:"%24select"` - Skip *int32 `uriparametername:"%24skip"` - Top *int32 `uriparametername:"%24top"` -} - func optionsForFolderMessagesDelta( moreOps []string, ) (*msuser.UsersItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration, error) { diff --git a/src/internal/connector/exchange/service_iterators.go b/src/internal/connector/exchange/service_iterators.go index 630c677db..a7364e898 100644 --- a/src/internal/connector/exchange/service_iterators.go +++ b/src/internal/connector/exchange/service_iterators.go @@ -78,12 +78,13 @@ func FilterContainersAndFillCollections( statusUpdater support.StatusUpdater, resolver graph.ContainerResolver, scope selectors.ExchangeScope, + oldDeltas map[string]string, ) error { var ( errs error collectionType = CategoryToOptionIdentifier(qp.Category) - // folder ID -> delta token for folder. - deltaTokens = map[string]string{} + // folder ID -> delta url for folder. + deltaURLs = map[string]string{} ) for _, c := range resolver.Items() { @@ -128,7 +129,10 @@ func FilterContainersAndFillCollections( continue } - jobs, token, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, *c.GetId()) + dirID := *c.GetId() + oldDelta := oldDeltas[dirID] + + jobs, delta, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, dirID, oldDelta) if err != nil { errs = support.WrapAndAppend( qp.ResourceOwner, @@ -139,8 +143,8 @@ func FilterContainersAndFillCollections( edc.jobs = append(edc.jobs, jobs...) 
- if len(token) > 0 { - deltaTokens[*c.GetId()] = token + if len(delta) > 0 { + deltaURLs[dirID] = delta } } @@ -148,7 +152,7 @@ func FilterContainersAndFillCollections( qp.Credentials.AzureTenantID, qp.ResourceOwner, qp.Category, - deltaTokens, + deltaURLs, statusUpdater, ) if err != nil { @@ -214,7 +218,7 @@ func IterativeCollectCalendarContainers( type FetchIDFunc func( ctx context.Context, gs graph.Servicer, - user, containerID string, + user, containerID, oldDeltaToken string, ) ([]string, string, error) func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) { @@ -234,7 +238,7 @@ func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) { func FetchEventIDsFromCalendar( ctx context.Context, gs graph.Servicer, - user, calendarID string, + user, calendarID, oldDelta string, ) ([]string, string, error) { var ( errs *multierror.Error @@ -288,17 +292,17 @@ func FetchEventIDsFromCalendar( func FetchContactIDsFromDirectory( ctx context.Context, gs graph.Servicer, - user, directoryID string, + user, directoryID, oldDelta string, ) ([]string, string, error) { var ( - errs *multierror.Error - ids []string - deltaToken string + errs *multierror.Error + ids []string + deltaURL string ) options, err := optionsForContactFoldersItemDelta([]string{"parentFolderId"}) if err != nil { - return nil, deltaToken, errors.Wrap(err, "getting query options") + return nil, deltaURL, errors.Wrap(err, "getting query options") } builder := gs.Client(). @@ -307,10 +311,16 @@ func FetchContactIDsFromDirectory( Contacts(). Delta() + // TODO(rkeepers): Awaiting full integration of incremental support, else this + // will cause unexpected behavior/errors. 
+ // if len(oldDelta) > 0 { + // builder = msuser.NewUsersItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, gs.Adapter()) + // } + for { resp, err := builder.Get(ctx, options) if err != nil { - return nil, deltaToken, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) + return nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) } for _, item := range resp.GetValue() { @@ -329,7 +339,7 @@ func FetchContactIDsFromDirectory( delta := resp.GetOdataDeltaLink() if delta != nil && len(*delta) > 0 { - deltaToken = *delta + deltaURL = *delta } nextLink := resp.GetOdataNextLink() @@ -340,7 +350,7 @@ func FetchContactIDsFromDirectory( builder = msuser.NewUsersItemContactFoldersItemContactsDeltaRequestBuilder(*nextLink, gs.Adapter()) } - return ids, deltaToken, errs.ErrorOrNil() + return ids, deltaURL, errs.ErrorOrNil() } // FetchMessageIDsFromDirectory function that returns a list of all the m365IDs of the exchange.Mail @@ -348,17 +358,17 @@ func FetchContactIDsFromDirectory( func FetchMessageIDsFromDirectory( ctx context.Context, gs graph.Servicer, - user, directoryID string, + user, directoryID, oldDelta string, ) ([]string, string, error) { var ( - errs *multierror.Error - ids []string - deltaToken string + errs *multierror.Error + ids []string + deltaURL string ) options, err := optionsForFolderMessagesDelta([]string{"id"}) if err != nil { - return nil, deltaToken, errors.Wrap(err, "getting query options") + return nil, deltaURL, errors.Wrap(err, "getting query options") } builder := gs.Client(). @@ -367,10 +377,16 @@ func FetchMessageIDsFromDirectory( Messages(). Delta() + // TODO(rkeepers): Awaiting full integration of incremental support, else this + // will cause unexpected behavior/errors. 
+ // if len(oldDelta) > 0 { + // builder = msuser.NewUsersItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, gs.Adapter()) + // } + for { resp, err := builder.Get(ctx, options) if err != nil { - return nil, deltaToken, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) + return nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) } for _, item := range resp.GetValue() { @@ -389,7 +405,7 @@ func FetchMessageIDsFromDirectory( delta := resp.GetOdataDeltaLink() if delta != nil && len(*delta) > 0 { - deltaToken = *delta + deltaURL = *delta } nextLink := resp.GetOdataNextLink() @@ -400,5 +416,5 @@ func FetchMessageIDsFromDirectory( builder = msuser.NewUsersItemMailFoldersItemMessagesDeltaRequestBuilder(*nextLink, gs.Adapter()) } - return ids, deltaToken, errs.ErrorOrNil() + return ids, deltaURL, errs.ErrorOrNil() } diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index e6494950c..ebedf7407 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -127,7 +127,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { } }() - _, mdColls, err := produceManifestsAndMetadata(ctx, op.kopia, op.store, op.Selectors, op.account) + mans, mdColls, err := produceManifestsAndMetadata(ctx, op.kopia, op.store, op.Selectors, op.account) if err != nil { opStats.readErr = errors.Wrap(err, "connecting to M365") return opStats.readErr @@ -145,7 +145,13 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { return opStats.readErr } - opStats.k, backupDetails, err = consumeBackupDataCollections(ctx, op.kopia, op.Selectors, cs, op.Results.BackupID) + opStats.k, backupDetails, err = consumeBackupDataCollections( + ctx, + op.kopia, + op.Selectors, + mans, + cs, + op.Results.BackupID) if err != nil { opStats.writeErr = errors.Wrap(err, "backing up service data") return opStats.writeErr @@ -326,6 +332,7 @@ func consumeBackupDataCollections( ctx context.Context, kw 
*kopia.Wrapper, sel selectors.Selector, + mans []*snapshot.Manifest, cs []data.Collection, backupID model.StableID, ) (*kopia.BackupStats, *details.Details, error) { @@ -341,7 +348,7 @@ func consumeBackupDataCollections( kopia.TagBackupCategory: "", } - return kw.BackupCollections(ctx, nil, cs, sel.PathService(), tags) + return kw.BackupCollections(ctx, mans, cs, sel.PathService(), tags) } // writes the results metrics to the operation results. diff --git a/src/pkg/repository/repository_unexported_test.go b/src/pkg/repository/repository_unexported_test.go index 4fdea85f3..409955bde 100644 --- a/src/pkg/repository/repository_unexported_test.go +++ b/src/pkg/repository/repository_unexported_test.go @@ -19,7 +19,6 @@ func TestRepositoryModelSuite(t *testing.T) { if err := tester.RunOnAny( tester.CorsoCITests, tester.CorsoRepositoryTests, - "flomp", ); err != nil { t.Skip(err) }