diff --git a/src/internal/common/maps.go b/src/internal/common/maps.go new file mode 100644 index 000000000..901a3b4c6 --- /dev/null +++ b/src/internal/common/maps.go @@ -0,0 +1,16 @@ +package common + +// UnionMaps produces a new map containing all the values of the other +// maps. The last maps have the highes priority. Key collisions with +// earlier maps will favor the last map with that key. +func UnionMaps[K comparable, V any](ms ...map[K]V) map[K]V { + r := map[K]V{} + + for _, m := range ms { + for k, v := range m { + r[k] = v + } + } + + return r +} diff --git a/src/internal/connector/data_collections.go b/src/internal/connector/data_collections.go index eaeae9adf..49bf67f1b 100644 --- a/src/internal/connector/data_collections.go +++ b/src/internal/connector/data_collections.go @@ -126,7 +126,7 @@ func verifyBackupInputs(sels selectors.Selector, userPNs, siteIDs []string) erro func (gc *GraphConnector) createExchangeCollections( ctx context.Context, scope selectors.ExchangeScope, - deltas map[string]string, + dps exchange.DeltaPaths, ctrlOpts control.Options, ) ([]data.Collection, error) { var ( @@ -161,7 +161,7 @@ func (gc *GraphConnector) createExchangeCollections( gc.UpdateStatus, resolver, scope, - deltas, + dps, ctrlOpts) if err != nil { @@ -202,14 +202,15 @@ func (gc *GraphConnector) ExchangeDataCollection( errs error ) - _, deltas, err := exchange.ParseMetadataCollections(ctx, metadata) + cdps, err := exchange.ParseMetadataCollections(ctx, metadata) if err != nil { return nil, err } for _, scope := range scopes { - // Creates a map of collections based on scope - dcs, err := gc.createExchangeCollections(ctx, scope, deltas, control.Options{}) + dps := cdps[scope.Category().PathType()] + + dcs, err := gc.createExchangeCollections(ctx, scope, dps, control.Options{}) if err != nil { user := scope.Get(selectors.ExchangeUser) return nil, support.WrapAndAppend(user[0], err, errs) diff --git a/src/internal/connector/data_collections_test.go b/src/internal/connector/data_collections_test.go index f90ed374b..a95700d17 100644 --- a/src/internal/connector/data_collections_test.go +++ b/src/internal/connector/data_collections_test.go @@ -286,7 +286,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailFetch() for _, test := range tests { suite.T().Run(test.name, func(t *testing.T) { - collections, err := gc.createExchangeCollections(ctx, test.scope, nil, control.Options{}) + collections, err := gc.createExchangeCollections(ctx, test.scope, exchange.DeltaPaths{}, control.Options{}) require.NoError(t, err) for _, c := range collections { @@ -338,7 +338,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestDelta() { for _, test := range tests { suite.T().Run(test.name, func(t *testing.T) { // get collections without providing any delta history (ie: full backup) - collections, err := gc.createExchangeCollections(ctx, test.scope, nil, control.Options{}) + collections, err := gc.createExchangeCollections(ctx, test.scope, exchange.DeltaPaths{}, control.Options{}) require.NoError(t, err) assert.Less(t, 1, len(collections), "retrieved metadata and data collections") @@ -352,12 +352,14 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestDelta() { require.NotNil(t, metadata, "collections contains a metadata collection") - _, deltas, err := exchange.ParseMetadataCollections(ctx, []data.Collection{metadata}) + cdps, err := exchange.ParseMetadataCollections(ctx, []data.Collection{metadata}) require.NoError(t, err) + dps := cdps[test.scope.Category().PathType()] + // now do another backup with the previous delta tokens, // which should only contain the difference. - collections, err = gc.createExchangeCollections(ctx, test.scope, deltas, control.Options{}) + collections, err = gc.createExchangeCollections(ctx, test.scope, dps, control.Options{}) require.NoError(t, err) // TODO(keepers): this isn't a very useful test at the moment. It needs to @@ -383,11 +385,15 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailSerializ ctx, flush := tester.NewContext() defer flush() - t := suite.T() - connector := loadConnector(ctx, t, Users) - sel := selectors.NewExchangeBackup() + var ( + t = suite.T() + connector = loadConnector(ctx, t, Users) + sel = selectors.NewExchangeBackup() + ) + sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch())) - collection, err := connector.createExchangeCollections(ctx, sel.Scopes()[0], nil, control.Options{}) + + collection, err := connector.createExchangeCollections(ctx, sel.Scopes()[0], exchange.DeltaPaths{}, control.Options{}) require.NoError(t, err) for _, edc := range collection { @@ -396,9 +402,11 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailSerializ // Verify that each message can be restored for stream := range streamChannel { buf := &bytes.Buffer{} + read, err := buf.ReadFrom(stream.ToReader()) assert.NoError(t, err) assert.NotZero(t, read) + message, err := support.CreateMessageFromBytes(buf.Bytes()) assert.NotNil(t, message) assert.NoError(t, err) @@ -430,7 +438,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestContactSeria scope := selectors. NewExchangeBackup(). ContactFolders([]string{suite.user}, []string{exchange.DefaultContactFolder}, selectors.PrefixMatch())[0] - collections, err := connector.createExchangeCollections(ctx, scope, nil, control.Options{}) + collections, err := connector.createExchangeCollections(ctx, scope, exchange.DeltaPaths{}, control.Options{}) require.NoError(t, err) return collections @@ -497,7 +505,12 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestEventsSerial getCollection: func(t *testing.T) []data.Collection { sel := selectors.NewExchangeBackup() sel.Include(sel.EventCalendars([]string{suite.user}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch())) - collections, err := connector.createExchangeCollections(ctx, sel.Scopes()[0], nil, control.Options{}) + + collections, err := connector.createExchangeCollections( + ctx, + sel.Scopes()[0], + exchange.DeltaPaths{}, + control.Options{}) require.NoError(t, err) return collections @@ -509,7 +522,12 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestEventsSerial getCollection: func(t *testing.T) []data.Collection { sel := selectors.NewExchangeBackup() sel.Include(sel.EventCalendars([]string{suite.user}, []string{"Birthdays"}, selectors.PrefixMatch())) - collections, err := connector.createExchangeCollections(ctx, sel.Scopes()[0], nil, control.Options{}) + + collections, err := connector.createExchangeCollections( + ctx, + sel.Scopes()[0], + exchange.DeltaPaths{}, + control.Options{}) require.NoError(t, err) return collections @@ -538,7 +556,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestEventsSerial assert.NotZero(t, read) event, err := support.CreateEventFromBytes(buf.Bytes()) assert.NotNil(t, event) - assert.NoError(t, err, "experienced error parsing event bytes: "+buf.String()) + assert.NoError(t, err, "creating event from bytes: "+buf.String()) } } diff --git a/src/internal/connector/exchange/data_collections.go b/src/internal/connector/exchange/data_collections.go index d055679ba..fee39b6ac 100644 --- a/src/internal/connector/exchange/data_collections.go +++ b/src/internal/connector/exchange/data_collections.go @@ -22,45 +22,75 @@ func MetadataFileNames(cat path.CategoryType) []string { } } -// ParseMetadataCollections produces two maps: -// 1- paths: folderID->filePath, used to look up previous folder pathing -// in case of a name change or relocation. -// 2- deltas: folderID->deltaToken, used to look up previous delta token -// retrievals. +type CatDeltaPaths map[path.CategoryType]DeltaPaths + +type DeltaPaths struct { + deltas map[string]string + paths map[string]string +} + +func makeDeltaPaths() DeltaPaths { + return DeltaPaths{ + deltas: map[string]string{}, + paths: map[string]string{}, + } +} + +// ParseMetadataCollections produces a map of structs holding delta +// and path lookup maps. func ParseMetadataCollections( ctx context.Context, colls []data.Collection, -) (map[string]string, map[string]string, error) { - var ( - paths = map[string]string{} - deltas = map[string]string{} - ) +) (CatDeltaPaths, error) { + cdp := CatDeltaPaths{ + path.ContactsCategory: makeDeltaPaths(), + path.EmailCategory: makeDeltaPaths(), + path.EventsCategory: makeDeltaPaths(), + } for _, coll := range colls { - items := coll.Items() + var ( + breakLoop bool + items = coll.Items() + category = coll.FullPath().Category() + ) for { - var breakLoop bool - select { case <-ctx.Done(): - return nil, nil, errors.Wrap(ctx.Err(), "parsing collection metadata") + return nil, errors.Wrap(ctx.Err(), "parsing collection metadata") + case item, ok := <-items: if !ok { breakLoop = true break } + m := map[string]string{} + cdps := cdp[category] + + err := json.NewDecoder(item.ToReader()).Decode(&m) + if err != nil { + return nil, errors.New("decoding metadata json") + } + switch item.UUID() { - // case graph.PreviousPathFileName: - case graph.DeltaTokenFileName: - err := json.NewDecoder(item.ToReader()).Decode(&deltas) - if err != nil { - return nil, nil, errors.New("parsing delta token map") + case graph.PreviousPathFileName: + if len(cdps.paths) > 0 { + return nil, errors.Errorf("multiple versions of %s path metadata", category) } - breakLoop = true + cdps.paths = m + + case graph.DeltaTokenFileName: + if len(cdps.deltas) > 0 { + return nil, errors.Errorf("multiple versions of %s delta metadata", category) + } + + cdps.deltas = m } + + cdp[category] = cdps } if breakLoop { @@ -69,5 +99,5 @@ func ParseMetadataCollections( } } - return paths, deltas, nil + return cdp, nil } diff --git a/src/internal/connector/exchange/data_collections_test.go b/src/internal/connector/exchange/data_collections_test.go index 6ced819f6..361a9d41c 100644 --- a/src/internal/connector/exchange/data_collections_test.go +++ b/src/internal/connector/exchange/data_collections_test.go @@ -37,6 +37,8 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { name string data []fileValues expectDeltas map[string]string + expectPaths map[string]string + expectError assert.ErrorAssertionFunc }{ { name: "delta urls", @@ -46,6 +48,47 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { expectDeltas: map[string]string{ "key": "delta-link", }, + expectError: assert.NoError, + }, + { + name: "multiple delta urls", + data: []fileValues{ + {graph.DeltaTokenFileName, "delta-link"}, + {graph.DeltaTokenFileName, "delta-link-2"}, + }, + expectError: assert.Error, + }, + { + name: "previous path", + data: []fileValues{ + {graph.PreviousPathFileName, "prev-path"}, + }, + expectPaths: map[string]string{ + "key": "prev-path", + }, + expectError: assert.NoError, + }, + { + name: "multiple previous paths", + data: []fileValues{ + {graph.PreviousPathFileName, "prev-path"}, + {graph.PreviousPathFileName, "prev-path-2"}, + }, + expectError: assert.Error, + }, + { + name: "delta urls and previous paths", + data: []fileValues{ + {graph.DeltaTokenFileName, "delta-link"}, + {graph.PreviousPathFileName, "prev-path"}, + }, + expectDeltas: map[string]string{ + "key": "delta-link", + }, + expectPaths: map[string]string{ + "key": "prev-path", + }, + expectError: assert.NoError, }, { name: "delta urls with special chars", @@ -55,6 +98,7 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { expectDeltas: map[string]string{ "key": "`!@#$%^&*()_[]{}/\"\\", }, + expectError: assert.NoError, }, { name: "delta urls with escaped chars", @@ -64,6 +108,7 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { expectDeltas: map[string]string{ "key": "\\n\\r\\t\\b\\f\\v\\0\\\\", }, + expectError: assert.NoError, }, { name: "delta urls with newline char runes", @@ -76,6 +121,7 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { expectDeltas: map[string]string{ "key": "\\n", }, + expectError: assert.NoError, }, } for _, test := range table { @@ -102,12 +148,19 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { colls = append(colls, coll) } - _, deltas, err := ParseMetadataCollections(ctx, colls) - require.NoError(t, err) - assert.NotEmpty(t, deltas, "deltas") + cdps, err := ParseMetadataCollections(ctx, colls) + test.expectError(t, err) + + emails := cdps[path.EmailCategory] + deltas, paths := emails.deltas, emails.paths + for k, v := range test.expectDeltas { assert.Equal(t, v, deltas[k], "deltas elements") } + + for k, v := range test.expectPaths { + assert.Equal(t, v, paths[k], "deltas elements") + } }) } } diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index 862e3c255..4795370af 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -58,17 +58,21 @@ type Collection struct { collectionType optionIdentifier statusUpdater support.StatusUpdater - // FullPath is the slice representation of the action context passed down through the hierarchy. - // The original request can be gleaned from the slice. (e.g. {, , "emails"}) + ctrl control.Options + + // FullPath is the current hierarchical path used by this collection. fullPath path.Path - ctrl control.Options + // PrevPath is the previous hierarchical path used by this collection. + // It may be the same as fullPath, if the folder was not renamed or + // moved. It will be empty on its first retrieval. + prevPath path.Path } // NewExchangeDataCollection creates an ExchangeDataCollection with fullPath is annotated func NewCollection( user string, - fullPath path.Path, + fullPath, prevPath path.Path, collectionType optionIdentifier, service graph.Servicer, statusUpdater support.StatusUpdater, @@ -81,6 +85,7 @@ func NewCollection( service: service, statusUpdater: statusUpdater, fullPath: fullPath, + prevPath: prevPath, collectionType: collectionType, ctrl: ctrlOpts, } diff --git a/src/internal/connector/exchange/service_iterators.go b/src/internal/connector/exchange/service_iterators.go index 4651731f1..ded6f2b39 100644 --- a/src/internal/connector/exchange/service_iterators.go +++ b/src/internal/connector/exchange/service_iterators.go @@ -14,6 +14,7 @@ import ( "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" ) @@ -30,7 +31,7 @@ func FilterContainersAndFillCollections( statusUpdater support.StatusUpdater, resolver graph.ContainerResolver, scope selectors.ExchangeScope, - oldDeltas map[string]string, + dps DeltaPaths, ctrlOpts control.Options, ) error { var ( @@ -38,7 +39,7 @@ func FilterContainersAndFillCollections( oi = CategoryToOptionIdentifier(qp.Category) // folder ID -> delta url for folder. deltaURLs = map[string]string{} - prevPaths = map[string]string{} + currPaths = map[string]string{} ) for _, c := range resolver.Items() { @@ -46,12 +47,26 @@ func FilterContainersAndFillCollections( return errs } + cID := *c.GetId() + dirPath, ok := pathAndMatch(qp, c, scope) if !ok { continue } - cID := *c.GetId() + var prevPath path.Path + + if ps, ok := dps.paths[cID]; ok { + // see below for the issue with building paths for root + // folders that have no displayName. + ps = strings.TrimSuffix(ps, rootFolderAlias) + + if pp, err := path.FromDataLayerPath(ps, false); err != nil { + logger.Ctx(ctx).Error("parsing previous path string") + } else { + prevPath = pp + } + } // Create only those that match service, err := createService(qp.Credentials) @@ -63,6 +78,7 @@ func FilterContainersAndFillCollections( edc := NewCollection( qp.ResourceOwner, dirPath, + prevPath, oi, service, statusUpdater, @@ -76,7 +92,7 @@ func FilterContainersAndFillCollections( continue } - jobs, delta, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, cID, oldDeltas[cID]) + jobs, delta, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, cID, dps.deltas[cID]) if err != nil { errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) } @@ -89,11 +105,11 @@ func FilterContainersAndFillCollections( // add the current path for the container ID to be used in the next backup // as the "previous path", for reference in case of a rename or relocation. - prevPaths[cID] = dirPath.Folder() + currPaths[cID] = dirPath.Folder() } entries := []graph.MetadataCollectionEntry{ - graph.NewMetadataEntry(graph.PreviousPathFileName, prevPaths), + graph.NewMetadataEntry(graph.PreviousPathFileName, currPaths), } if len(deltaURLs) > 0 {