// DeltaPath holds the delta token and the previous storage path for a
// single container, as recorded in backup metadata.
type DeltaPath struct {
	delta string
	path  string
}

// DeltaPaths maps container IDs to their DeltaPath metadata.
type DeltaPaths map[string]DeltaPath

// AddDelta records the delta token d for container k, preserving any
// previously recorded path for that container.
func (dps DeltaPaths) AddDelta(k, d string) {
	// Indexing a missing key yields the zero DeltaPath, so no
	// explicit existence check is needed.
	dp := dps[k]
	dp.delta = d
	dps[k] = dp
}

// AddPath records the previous path p for container k, preserving any
// previously recorded delta token for that container.
func (dps DeltaPaths) AddPath(k, p string) {
	dp := dps[k]
	dp.path = p
	dps[k] = dp
}
+ found := map[path.CategoryType]map[string]struct{}{ + path.ContactsCategory: {}, + path.EmailCategory: {}, + path.EventsCategory: {}, } for _, coll := range colls { @@ -66,8 +90,10 @@ func ParseMetadataCollections( break } - m := map[string]string{} - cdps := cdp[category] + var ( + m = map[string]string{} + cdps = cdp[category] + ) err := json.NewDecoder(item.ToReader()).Decode(&m) if err != nil { @@ -76,18 +102,26 @@ func ParseMetadataCollections( switch item.UUID() { case graph.PreviousPathFileName: - if len(cdps.paths) > 0 { + if _, ok := found[category]["path"]; ok { return nil, errors.Errorf("multiple versions of %s path metadata", category) } - cdps.paths = m + for k, p := range m { + cdps.AddPath(k, p) + } + + found[category]["path"] = struct{}{} case graph.DeltaURLsFileName: - if len(cdps.deltas) > 0 { + if _, ok := found[category]["delta"]; ok { return nil, errors.Errorf("multiple versions of %s delta metadata", category) } - cdps.deltas = m + for k, d := range m { + cdps.AddDelta(k, d) + } + + found[category]["delta"] = struct{}{} } cdp[category] = cdps @@ -99,5 +133,16 @@ func ParseMetadataCollections( } } + // Remove any entries that contain a path or a delta, but not both. + // That metadata is considered incomplete, and needs to incur a + // complete backup on the next run. 
+ for _, dps := range cdp { + for k, dp := range dps { + if len(dp.delta) == 0 || len(dp.path) == 0 { + delete(dps, k) + } + } + } + return cdp, nil } diff --git a/src/internal/connector/exchange/data_collections_test.go b/src/internal/connector/exchange/data_collections_test.go index 173189abc..54d295dd6 100644 --- a/src/internal/connector/exchange/data_collections_test.go +++ b/src/internal/connector/exchange/data_collections_test.go @@ -1,7 +1,6 @@ package exchange import ( - "encoding/json" "testing" "github.com/stretchr/testify/assert" @@ -34,20 +33,17 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { } table := []struct { - name string - data []fileValues - expectDeltas map[string]string - expectPaths map[string]string - expectError assert.ErrorAssertionFunc + name string + data []fileValues + expect map[string]DeltaPath + expectError assert.ErrorAssertionFunc }{ { - name: "delta urls", + name: "delta urls only", data: []fileValues{ {graph.DeltaURLsFileName, "delta-link"}, }, - expectDeltas: map[string]string{ - "key": "delta-link", - }, + expect: map[string]DeltaPath{}, expectError: assert.NoError, }, { @@ -59,13 +55,11 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { expectError: assert.Error, }, { - name: "previous path", + name: "previous path only", data: []fileValues{ {graph.PreviousPathFileName, "prev-path"}, }, - expectPaths: map[string]string{ - "key": "prev-path", - }, + expect: map[string]DeltaPath{}, expectError: assert.NoError, }, { @@ -82,21 +76,43 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { {graph.DeltaURLsFileName, "delta-link"}, {graph.PreviousPathFileName, "prev-path"}, }, - expectDeltas: map[string]string{ - "key": "delta-link", + expect: map[string]DeltaPath{ + "key": { + delta: "delta-link", + path: "prev-path", + }, }, - expectPaths: map[string]string{ - "key": "prev-path", + expectError: assert.NoError, + }, + { + name: "delta urls and empty previous 
paths", + data: []fileValues{ + {graph.DeltaURLsFileName, "delta-link"}, + {graph.PreviousPathFileName, ""}, }, + expect: map[string]DeltaPath{}, + expectError: assert.NoError, + }, + { + name: "empty delta urls and previous paths", + data: []fileValues{ + {graph.DeltaURLsFileName, ""}, + {graph.PreviousPathFileName, "prev-path"}, + }, + expect: map[string]DeltaPath{}, expectError: assert.NoError, }, { name: "delta urls with special chars", data: []fileValues{ {graph.DeltaURLsFileName, "`!@#$%^&*()_[]{}/\"\\"}, + {graph.PreviousPathFileName, "prev-path"}, }, - expectDeltas: map[string]string{ - "key": "`!@#$%^&*()_[]{}/\"\\", + expect: map[string]DeltaPath{ + "key": { + delta: "`!@#$%^&*()_[]{}/\"\\", + path: "prev-path", + }, }, expectError: assert.NoError, }, @@ -104,9 +120,13 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { name: "delta urls with escaped chars", data: []fileValues{ {graph.DeltaURLsFileName, `\n\r\t\b\f\v\0\\`}, + {graph.PreviousPathFileName, "prev-path"}, }, - expectDeltas: map[string]string{ - "key": "\\n\\r\\t\\b\\f\\v\\0\\\\", + expect: map[string]DeltaPath{ + "key": { + delta: "\\n\\r\\t\\b\\f\\v\\0\\\\", + path: "prev-path", + }, }, expectError: assert.NoError, }, @@ -117,9 +137,13 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { // error in serializing/deserializing and produce a single newline // character from those two runes. 
{graph.DeltaURLsFileName, string([]rune{rune(92), rune(110)})}, + {graph.PreviousPathFileName, "prev-path"}, }, - expectDeltas: map[string]string{ - "key": "\\n", + expect: map[string]DeltaPath{ + "key": { + delta: "\\n", + path: "prev-path", + }, }, expectError: assert.NoError, }, @@ -129,45 +153,33 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { ctx, flush := tester.NewContext() defer flush() - colls := []data.Collection{} + entries := []graph.MetadataCollectionEntry{} for _, d := range test.data { - bs, err := json.Marshal(map[string]string{"key": d.value}) - require.NoError(t, err) - - p, err := path.Builder{}.ToServiceCategoryMetadataPath( - "t", "u", - path.ExchangeService, - path.EmailCategory, - false, - ) - require.NoError(t, err) - - item := []graph.MetadataItem{graph.NewMetadataItem(d.fileName, bs)} - coll := graph.NewMetadataCollection(p, item, func(cos *support.ConnectorOperationStatus) {}) - colls = append(colls, coll) + entries = append( + entries, + graph.NewMetadataEntry(d.fileName, map[string]string{"key": d.value})) } - cdps, err := ParseMetadataCollections(ctx, colls) + coll, err := graph.MakeMetadataCollection( + "t", "u", + path.ExchangeService, + path.EmailCategory, + entries, + func(cos *support.ConnectorOperationStatus) {}, + ) + require.NoError(t, err) + + cdps, err := ParseMetadataCollections(ctx, []data.Collection{coll}) test.expectError(t, err) emails := cdps[path.EmailCategory] - deltas, paths := emails.deltas, emails.paths - if len(test.expectDeltas) > 0 { - assert.Len(t, deltas, len(test.expectDeltas), "deltas len") - } + assert.Len(t, emails, len(test.expect)) - if len(test.expectPaths) > 0 { - assert.Len(t, paths, len(test.expectPaths), "paths len") - } - - for k, v := range test.expectDeltas { - assert.Equal(t, v, deltas[k], "deltas elements") - } - - for k, v := range test.expectPaths { - assert.Equal(t, v, paths[k], "paths elements") + for k, v := range emails { + assert.Equal(t, v.delta, 
emails[k].delta, "delta") + assert.Equal(t, v.path, emails[k].path, "path") } }) } diff --git a/src/internal/connector/exchange/service_iterators.go b/src/internal/connector/exchange/service_iterators.go index 3a9fa7801..40a5ae3fe 100644 --- a/src/internal/connector/exchange/service_iterators.go +++ b/src/internal/connector/exchange/service_iterators.go @@ -62,16 +62,30 @@ func FilterContainersAndFillCollections( currPaths = map[string]string{} // copy of previousPaths. any folder found in the resolver get // deleted from this map, leaving only the deleted maps behind - deletedPaths = common.CopyMap(dps.paths) + deletedPaths = common.CopyMap(dps) ) + getJobs, err := getFetchIDFunc(qp.Category) + if err != nil { + return support.WrapAndAppend(qp.ResourceOwner, err, errs) + } + for _, c := range resolver.Items() { if ctrlOpts.FailFast && errs != nil { return errs } + // cannot be moved out of the loop, + // else we run into state issues. + service, err := createService(qp.Credentials) + if err != nil { + errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) + continue + } + cID := *c.GetId() + // this folder exists (probably), do not delete it. delete(deletedPaths, cID) // Only create a collection if the path matches the scope. @@ -80,43 +94,32 @@ func FilterContainersAndFillCollections( continue } - var prevPath path.Path + var ( + dp = dps[cID] + prevDelta = dp.delta + prevPathStr = dp.path + prevPath path.Path + ) - if p, ok := dps.paths[cID]; ok { - var err error - if prevPath, err = pathFromPrevString(p); err != nil { + if len(prevPathStr) > 0 { + if prevPath, err = pathFromPrevString(prevPathStr); err != nil { logger.Ctx(ctx).Error(err) } } - service, err := createService(qp.Credentials) + jobs, currDelta, err := getJobs(ctx, service, qp.ResourceOwner, cID, prevDelta) if err != nil { - errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) - continue + // race conditions happen, the container might get + // deleted while this process in flight. 
+ if errors.Is(err, errContainerDeleted) { + currPath = nil + } else { + errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) + } } - fetchFunc, err := getFetchIDFunc(qp.Category) - if err != nil { - errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) - continue - } - - var deletedInFlight bool - - jobs, delta, err := fetchFunc(ctx, service, qp.ResourceOwner, cID, dps.deltas[cID]) - if err != nil && !errors.Is(err, errContainerDeleted) { - deletedInFlight = true - errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) - } - - if len(delta) > 0 { - deltaURLs[cID] = delta - } - - // Delay creating the new container so we can handle setting the current - // path correctly if the folder was deleted. - if deletedInFlight { - currPath = nil + if len(currDelta) > 0 { + deltaURLs[cID] = currDelta } edc := NewCollection( @@ -130,7 +133,7 @@ func FilterContainersAndFillCollections( ) collections[cID] = &edc - if deletedInFlight { + if edc.State() == data.DeletedState { continue } @@ -145,14 +148,14 @@ func FilterContainersAndFillCollections( // relocations and renames will have removed the dir by id earlier. What's // left in deletedPaths are only the previous paths that did not appear as // children of the root. - for fID, ps := range deletedPaths { + for fID, dp := range deletedPaths { service, err := createService(qp.Credentials) if err != nil { errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) continue } - prevPath, err := pathFromPrevString(ps) + prevPath, err := pathFromPrevString(dp.path) if err != nil { logger.Ctx(ctx).Error(err) continue