From 03c7667679b301b3faeaa884944b899dcdb06b60 Mon Sep 17 00:00:00 2001
From: Keepers
Date: Fri, 13 Oct 2023 21:22:12 -0600
Subject: [PATCH] iterate pager results with channels (#4414)

Corrects the prior PR's full-scale batching of driveItems by adding a
channel to the api layer's enumerateItems and driveEnumerateItems funcs.
To avoid changes in most parts of the code, this also adds batch wrappers
that hide these changes from existing pager usage.

DriveItem collection processing now ranges over a channel of pager
results, allowing it to process pages as a stream rather than batching
all items.

---

#### Does this PR need a docs update or release note?

- [x] :no_entry: No

#### Type of change

- [x] :broom: Tech Debt/Cleanup

#### Test Plan

- [x] :zap: Unit test
- [x] :green_heart: E2E

---
 .../m365/collection/drive/collections.go      | 481 ++++++-----
 .../m365/collection/drive/collections_test.go | 801 +++++++++++-------
 .../m365/collection/drive/handlers.go         |   8 +-
 .../collection/drive/item_collector_test.go   |   6 +-
 .../m365/collection/drive/site_handler.go     |   6 +-
 .../m365/collection/drive/url_cache.go        |  49 +-
 .../m365/collection/drive/url_cache_test.go   | 440 +++++++---
 .../collection/drive/user_drive_handler.go    |   6 +-
 .../m365/service/onedrive/mock/handlers.go    | 144 ++--
 .../m365/service/sharepoint/backup_test.go    |  30 +-
 src/pkg/fault/skipped.go                      |   8 +
 src/pkg/services/m365/api/channels_pager.go   |  10 +-
 src/pkg/services/m365/api/client.go           |   2 +-
 src/pkg/services/m365/api/contacts_pager.go   |   6 +-
 src/pkg/services/m365/api/drive_pager.go      |  57 +-
 src/pkg/services/m365/api/drive_pager_test.go |  24 +-
 src/pkg/services/m365/api/events_pager.go     |   6 +-
 src/pkg/services/m365/api/item_pager.go       | 185 +++-
 src/pkg/services/m365/api/item_pager_test.go  |   4 +-
 src/pkg/services/m365/api/mail_pager.go       |   6 +-
 20 files changed, 1470 insertions(+), 809 deletions(-)

diff --git a/src/internal/m365/collection/drive/collections.go b/src/internal/m365/collection/drive/collections.go
index 7ea00abaf..c213035e9 100644
--- a/src/internal/m365/collection/drive/collections.go
+++ b/src/internal/m365/collection/drive/collections.go
@@ -272,13 +272,6 @@ func (c *Collections) Get(
 		excludedItemIDs = map[string]struct{}{}
 		oldPrevPaths    = oldPrevPathsByDriveID[driveID]
 		prevDeltaLink   = prevDriveIDToDelta[driveID]
-
-		// itemCollection is used to identify which collection a
-		// file belongs to. This is useful to delete a file from the
-		// collection it was previously in, in case it was moved to a
-		// different collection within the same delta query
-		// item ID -> item ID
-		itemCollection = map[string]string{}
 	)
 
 	delete(driveTombstones, driveID)
@@ -295,13 +288,16 @@ func (c *Collections) Get(
 			"previous metadata for drive",
 			"num_paths_entries", len(oldPrevPaths))
 
-		items, du, err := c.handler.EnumerateDriveItemsDelta(
-			ictx,
+		du, newPrevPaths, err := c.PopulateDriveCollections(
+			ctx,
 			driveID,
+			driveName,
+			oldPrevPaths,
+			excludedItemIDs,
 			prevDeltaLink,
-			api.DefaultDriveItemProps())
+			errs)
 		if err != nil {
-			return nil, false, err
+			return nil, false, clues.Stack(err)
 		}
 
 		// It's alright to have an empty folders map (i.e. no folders found) but not
@@ -313,20 +309,6 @@
 			driveIDToDeltaLink[driveID] = du.URL
 		}
 
-		newPrevPaths, err := c.UpdateCollections(
-			ctx,
-			driveID,
-			driveName,
-			items,
-			oldPrevPaths,
-			itemCollection,
-			excludedItemIDs,
-			du.Reset,
-			errs)
-		if err != nil {
-			return nil, false, clues.Stack(err)
-		}
-
 		// Avoid the edge case where there's no paths but we do have a valid delta
 		// token.
We can accomplish this by adding an empty paths map for this // drive. If we don't have this then the next backup won't use the delta @@ -688,226 +670,282 @@ func (c *Collections) getCollectionPath( return collectionPath, nil } -// UpdateCollections initializes and adds the provided drive items to Collections -// A new collection is created for every drive folder (or package). -// oldPrevPaths is the unchanged data that was loaded from the metadata file. -// This map is not modified during the call. -// currPrevPaths starts as a copy of oldPaths and is updated as changes are found in -// the returned results. Items are added to this collection throughout the call. -// newPrevPaths, ie: the items added during this call, get returned as a map. -func (c *Collections) UpdateCollections( +// PopulateDriveCollections initializes and adds the provided drive items to Collections +// A new collection is created for every drive folder. +// Along with populating the collection items and updating the excluded item IDs, this func +// returns the current DeltaUpdate and PreviousPaths for metadata records. +func (c *Collections) PopulateDriveCollections( ctx context.Context, driveID, driveName string, - items []models.DriveItemable, oldPrevPaths map[string]string, - currPrevPaths map[string]string, - excluded map[string]struct{}, - invalidPrevDelta bool, + excludedItemIDs map[string]struct{}, + prevDeltaLink string, errs *fault.Bus, -) (map[string]string, error) { +) (api.DeltaUpdate, map[string]string, error) { var ( - el = errs.Local() - newPrevPaths = map[string]string{} + el = errs.Local() + newPrevPaths = map[string]string{} + invalidPrevDelta = len(prevDeltaLink) == 0 + + // currPrevPaths is used to identify which collection a + // file belongs to. This is useful to delete a file from the + // collection it was previously in, in case it was moved to a + // different collection within the same delta query + // item ID -> item ID + currPrevPaths = map[string]string{} ) if !invalidPrevDelta { maps.Copy(newPrevPaths, oldPrevPaths) } - for _, item := range items { + pager := c.handler.EnumerateDriveItemsDelta( + ctx, + driveID, + prevDeltaLink, + api.CallConfig{ + Select: api.DefaultDriveItemProps(), + }) + + for page, reset, done := pager.NextPage(); !done; page, reset, done = pager.NextPage() { if el.Failure() != nil { break } - var ( - itemID = ptr.Val(item.GetId()) - itemName = ptr.Val(item.GetName()) - isFolder = item.GetFolder() != nil || item.GetPackageEscaped() != nil - ictx = clues.Add( - ctx, - "item_id", itemID, - "item_name", clues.Hide(itemName), - "item_is_folder", isFolder) - ) - - if item.GetMalware() != nil { - addtl := graph.ItemInfo(item) - skip := fault.FileSkip(fault.SkipMalware, driveID, itemID, itemName, addtl) - - if isFolder { - skip = fault.ContainerSkip(fault.SkipMalware, driveID, itemID, itemName, addtl) - } - - errs.AddSkip(ctx, skip) - logger.Ctx(ctx).Infow("malware detected", "item_details", addtl) - - continue + if reset { + newPrevPaths = map[string]string{} + currPrevPaths = map[string]string{} + c.CollectionMap[driveID] = map[string]*Collection{} + invalidPrevDelta = true } - // Deleted file or folder. 
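The hunk above is the heart of the change: `PopulateDriveCollections` now consumes a streaming pager rather than a pre-batched `[]models.DriveItemable`. A minimal sketch of the consumption contract, using stand-in types assumed from this diff (not the actual api package implementation):

```go
package pagersketch

// DeltaUpdate mirrors api.DeltaUpdate as used in this diff: the delta link
// to persist for the next incremental backup, plus whether the previous
// delta token was invalidated.
type DeltaUpdate struct {
	URL   string
	Reset bool
}

// NextPageResulter is the assumed shape of the pager returned by
// EnumerateDriveItemsDelta after this change.
type NextPageResulter[T any] interface {
	// NextPage yields the next page of items, whether the enumeration was
	// reset mid-stream, and whether the stream is exhausted.
	NextPage() (page []T, reset, done bool)
	// Results reports the terminal DeltaUpdate and any enumeration error.
	Results() (DeltaUpdate, error)
}

// drain shows the loop PopulateDriveCollections runs: consume pages until
// done, clear accumulated state on reset, then read the terminal result.
func drain[T any](
	p NextPageResulter[T],
	onReset func(),
	onPage func([]T) error,
) (DeltaUpdate, error) {
	for page, reset, done := p.NextPage(); !done; page, reset, done = p.NextPage() {
		if reset {
			// a reset means graph re-enumerates from scratch, so any state
			// built from earlier pages must be discarded.
			onReset()
		}

		if err := onPage(page); err != nil {
			return DeltaUpdate{}, err
		}
	}

	return p.Results()
}
```

The reset branch is what lets a mid-enumeration delta invalidation discard partially built previous-path and collection state instead of failing the whole backup.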
- if item.GetDeleted() != nil { - if err := c.handleDelete( - itemID, + for _, item := range page { + if el.Failure() != nil { + break + } + + err := c.processItem( + ctx, + item, driveID, + driveName, oldPrevPaths, currPrevPaths, newPrevPaths, - isFolder, - excluded, - invalidPrevDelta); err != nil { - return nil, clues.Stack(err).WithClues(ictx) - } - - continue - } - - collectionPath, err := c.getCollectionPath(driveID, item) - if err != nil { - el.AddRecoverable(ctx, clues.Stack(err). - WithClues(ictx). - Label(fault.LabelForceNoBackupCreation)) - - continue - } - - // Skip items that don't match the folder selectors we were given. - if shouldSkip(ctx, collectionPath, c.handler, driveName) { - logger.Ctx(ictx).Debugw("path not selected", "skipped_path", collectionPath.String()) - continue - } - - switch { - case isFolder: - // Deletions are handled above so this is just moves/renames. - var prevPath path.Path - - prevPathStr, ok := oldPrevPaths[itemID] - if ok { - prevPath, err = path.FromDataLayerPath(prevPathStr, false) - if err != nil { - el.AddRecoverable(ctx, clues.Wrap(err, "invalid previous path"). - WithClues(ictx). - With("prev_path_string", path.LoggableDir(prevPathStr))) - } - } else if item.GetRoot() != nil { - // Root doesn't move or get renamed. - prevPath = collectionPath - } - - // Moved folders don't cause delta results for any subfolders nested in - // them. We need to go through and update paths to handle that. We only - // update newPaths so we don't accidentally clobber previous deletes. - updatePath(newPrevPaths, itemID, collectionPath.String()) - - found, err := updateCollectionPaths(driveID, itemID, c.CollectionMap, collectionPath) - if err != nil { - return nil, clues.Stack(err).WithClues(ictx) - } - - if found { - continue - } - - colScope := CollectionScopeFolder - if item.GetPackageEscaped() != nil { - colScope = CollectionScopePackage - } - - ictx = clues.Add(ictx, "collection_scope", colScope) - - col, err := NewCollection( - c.handler, - c.protectedResource, - collectionPath, - prevPath, - driveID, - c.statusUpdater, - c.ctrl, - colScope, + excludedItemIDs, invalidPrevDelta, - nil) + el) if err != nil { - return nil, clues.Stack(err).WithClues(ictx) + el.AddRecoverable(ctx, clues.Stack(err)) } - - col.driveName = driveName - - c.CollectionMap[driveID][itemID] = col - c.NumContainers++ - - if item.GetRoot() != nil { - continue - } - - // Add an entry to fetch permissions into this collection. This assumes - // that OneDrive always returns all folders on the path of an item - // before the item. This seems to hold true for now at least. - if col.Add(item) { - c.NumItems++ - } - - case item.GetFile() != nil: - // Deletions are handled above so this is just moves/renames. - if len(ptr.Val(item.GetParentReference().GetId())) == 0 { - return nil, clues.New("file without parent ID").WithClues(ictx) - } - - // Get the collection for this item. - parentID := ptr.Val(item.GetParentReference().GetId()) - ictx = clues.Add(ictx, "parent_id", parentID) - - collection, ok := c.CollectionMap[driveID][parentID] - if !ok { - return nil, clues.New("item seen before parent folder").WithClues(ictx) - } - - // This will only kick in if the file was moved multiple times - // within a single delta query. We delete the file from the previous - // collection so that it doesn't appear in two places. 
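The previous-collection cleanup described in the comment above is easiest to see in isolation. A sketch under assumed stand-in types (`Collection` here is hypothetical, standing in for the package's drive Collection; `currParents` plays the role of currPrevPaths in the diff):

```go
package movesketch

import "errors"

// Collection stands in for the drive Collection; only Remove matters here.
type Collection struct {
	items map[string]struct{}
}

// Remove reports whether the item was present and has been dropped.
func (c *Collection) Remove(id string) bool {
	if _, ok := c.items[id]; !ok {
		return false
	}

	delete(c.items, id)

	return true
}

// trackFileMove keeps one copy of a moved file per enumeration: if the same
// item ID reappears within a single delta query, drop it from the collection
// it was previously filed under before it gets added to the new one.
func trackFileMove(
	currParents map[string]string, // item ID -> last seen parent ID
	collections map[string]*Collection, // parent ID -> collection
	itemID, parentID string,
) error {
	if prevParent, ok := currParents[itemID]; ok {
		prev, found := collections[prevParent]
		if !found {
			return errors.New("previous collection not found")
		}

		if removed := prev.Remove(itemID); !removed {
			return errors.New("removing item from prev collection")
		}
	}

	currParents[itemID] = parentID

	return nil
}
```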
- prevParentContainerID, ok := currPrevPaths[itemID] - if ok { - prevColl, found := c.CollectionMap[driveID][prevParentContainerID] - if !found { - return nil, clues.New("previous collection not found"). - With("prev_parent_container_id", prevParentContainerID). - WithClues(ictx) - } - - if ok := prevColl.Remove(itemID); !ok { - return nil, clues.New("removing item from prev collection"). - With("prev_parent_container_id", prevParentContainerID). - WithClues(ictx) - } - } - - currPrevPaths[itemID] = parentID - - if collection.Add(item) { - c.NumItems++ - c.NumFiles++ - } - - // Do this after adding the file to the collection so if we fail to add - // the item to the collection for some reason and we're using best effort - // we don't just end up deleting the item in the resulting backup. The - // resulting backup will be slightly incorrect, but it will have the most - // data that we were able to preserve. - if !invalidPrevDelta { - // Always add a file to the excluded list. The file may have been - // renamed/moved/modified, so we still have to drop the - // original one and download a fresh copy. - excluded[itemID+metadata.DataFileSuffix] = struct{}{} - excluded[itemID+metadata.MetaFileSuffix] = struct{}{} - } - - default: - el.AddRecoverable(ictx, clues.New("item is neither folder nor file"). - WithClues(ictx). - Label(fault.LabelForceNoBackupCreation)) } } - return newPrevPaths, el.Failure() + du, err := pager.Results() + if err != nil { + return du, nil, clues.Stack(err) + } + + return du, newPrevPaths, el.Failure() +} + +func (c *Collections) processItem( + ctx context.Context, + item models.DriveItemable, + driveID, driveName string, + oldPrevPaths, currPrevPaths, newPrevPaths map[string]string, + excluded map[string]struct{}, + invalidPrevDelta bool, + skipper fault.AddSkipper, +) error { + var ( + itemID = ptr.Val(item.GetId()) + itemName = ptr.Val(item.GetName()) + isFolder = item.GetFolder() != nil || item.GetPackageEscaped() != nil + ictx = clues.Add( + ctx, + "item_id", itemID, + "item_name", clues.Hide(itemName), + "item_is_folder", isFolder) + ) + + if item.GetMalware() != nil { + addtl := graph.ItemInfo(item) + skip := fault.FileSkip(fault.SkipMalware, driveID, itemID, itemName, addtl) + + if isFolder { + skip = fault.ContainerSkip(fault.SkipMalware, driveID, itemID, itemName, addtl) + } + + skipper.AddSkip(ctx, skip) + logger.Ctx(ctx).Infow("malware detected", "item_details", addtl) + + return nil + } + + // Deleted file or folder. + if item.GetDeleted() != nil { + err := c.handleDelete( + itemID, + driveID, + oldPrevPaths, + currPrevPaths, + newPrevPaths, + isFolder, + excluded, + invalidPrevDelta) + + return clues.Stack(err).WithClues(ictx).OrNil() + } + + collectionPath, err := c.getCollectionPath(driveID, item) + if err != nil { + return clues.Stack(err). + WithClues(ictx). + Label(fault.LabelForceNoBackupCreation) + } + + // Skip items that don't match the folder selectors we were given. + if shouldSkip(ctx, collectionPath, c.handler, driveName) { + logger.Ctx(ictx).Debugw("path not selected", "skipped_path", collectionPath.String()) + return nil + } + + switch { + case isFolder: + // Deletions are handled above so this is just moves/renames. + var prevPath path.Path + + prevPathStr, ok := oldPrevPaths[itemID] + if ok { + prevPath, err = path.FromDataLayerPath(prevPathStr, false) + if err != nil { + return clues.Wrap(err, "invalid previous path"). + WithClues(ictx). 
+ With("prev_path_string", path.LoggableDir(prevPathStr)) + } + } else if item.GetRoot() != nil { + // Root doesn't move or get renamed. + prevPath = collectionPath + } + + // Moved folders don't cause delta results for any subfolders nested in + // them. We need to go through and update paths to handle that. We only + // update newPaths so we don't accidentally clobber previous deletes. + updatePath(newPrevPaths, itemID, collectionPath.String()) + + found, err := updateCollectionPaths( + driveID, + itemID, + c.CollectionMap, + collectionPath) + if err != nil { + return clues.Stack(err).WithClues(ictx) + } + + if found { + return nil + } + + colScope := CollectionScopeFolder + if item.GetPackageEscaped() != nil { + colScope = CollectionScopePackage + } + + ictx = clues.Add(ictx, "collection_scope", colScope) + + col, err := NewCollection( + c.handler, + c.protectedResource, + collectionPath, + prevPath, + driveID, + c.statusUpdater, + c.ctrl, + colScope, + invalidPrevDelta, + nil) + if err != nil { + return clues.Stack(err).WithClues(ictx) + } + + col.driveName = driveName + + c.CollectionMap[driveID][itemID] = col + c.NumContainers++ + + if item.GetRoot() != nil { + return nil + } + + // Add an entry to fetch permissions into this collection. This assumes + // that OneDrive always returns all folders on the path of an item + // before the item. This seems to hold true for now at least. + if col.Add(item) { + c.NumItems++ + } + + case item.GetFile() != nil: + // Deletions are handled above so this is just moves/renames. + if len(ptr.Val(item.GetParentReference().GetId())) == 0 { + return clues.New("file without parent ID").WithClues(ictx) + } + + // Get the collection for this item. + parentID := ptr.Val(item.GetParentReference().GetId()) + ictx = clues.Add(ictx, "parent_id", parentID) + + collection, ok := c.CollectionMap[driveID][parentID] + if !ok { + return clues.New("item seen before parent folder").WithClues(ictx) + } + + // This will only kick in if the file was moved multiple times + // within a single delta query. We delete the file from the previous + // collection so that it doesn't appear in two places. + prevParentContainerID, ok := currPrevPaths[itemID] + if ok { + prevColl, found := c.CollectionMap[driveID][prevParentContainerID] + if !found { + return clues.New("previous collection not found"). + With("prev_parent_container_id", prevParentContainerID). + WithClues(ictx) + } + + if ok := prevColl.Remove(itemID); !ok { + return clues.New("removing item from prev collection"). + With("prev_parent_container_id", prevParentContainerID). + WithClues(ictx) + } + } + + currPrevPaths[itemID] = parentID + + if collection.Add(item) { + c.NumItems++ + c.NumFiles++ + } + + // Do this after adding the file to the collection so if we fail to add + // the item to the collection for some reason and we're using best effort + // we don't just end up deleting the item in the resulting backup. The + // resulting backup will be slightly incorrect, but it will have the most + // data that we were able to preserve. + if !invalidPrevDelta { + // Always add a file to the excluded list. The file may have been + // renamed/moved/modified, so we still have to drop the + // original one and download a fresh copy. + excluded[itemID+metadata.DataFileSuffix] = struct{}{} + excluded[itemID+metadata.MetaFileSuffix] = struct{}{} + } + + default: + return clues.New("item is neither folder nor file"). + WithClues(ictx). 
+ Label(fault.LabelForceNoBackupCreation) + } + + return nil } type dirScopeChecker interface { @@ -915,7 +953,12 @@ type dirScopeChecker interface { IncludesDir(dir string) bool } -func shouldSkip(ctx context.Context, drivePath path.Path, dsc dirScopeChecker, driveName string) bool { +func shouldSkip( + ctx context.Context, + drivePath path.Path, + dsc dirScopeChecker, + driveName string, +) bool { return !includePath(ctx, dsc, drivePath) || (drivePath.Category() == path.LibrariesCategory && restrictedDirectory == driveName) } diff --git a/src/internal/m365/collection/drive/collections_test.go b/src/internal/m365/collection/drive/collections_test.go index 9a5fd5e81..1236ed201 100644 --- a/src/internal/m365/collection/drive/collections_test.go +++ b/src/internal/m365/collection/drive/collections_test.go @@ -119,7 +119,7 @@ func getDelList(files ...string) map[string]struct{} { return delList } -func (suite *OneDriveCollectionsUnitSuite) TestUpdateCollections() { +func (suite *OneDriveCollectionsUnitSuite) TestPopulateDriveCollections() { anyFolder := (&selectors.OneDriveBackup{}).Folders(selectors.Any())[0] const ( @@ -690,8 +690,10 @@ func (suite *OneDriveCollectionsUnitSuite) TestUpdateCollections() { expectedItemCount: 0, expectedFileCount: 0, expectedContainerCount: 1, - expectedPrevPaths: nil, - expectedExcludes: map[string]struct{}{}, + expectedPrevPaths: map[string]string{ + "root": expectedPath(""), + }, + expectedExcludes: map[string]struct{}{}, }, { name: "1 root file, 1 folder, 1 package, 1 good file, 1 malware", @@ -732,21 +734,31 @@ func (suite *OneDriveCollectionsUnitSuite) TestUpdateCollections() { defer flush() var ( - excludes = map[string]struct{}{} - currPrevPaths = map[string]string{} - errs = fault.New(true) + mbh = mock.DefaultOneDriveBH(user) + du = api.DeltaUpdate{ + URL: "notempty", + Reset: false, + } + excludes = map[string]struct{}{} + errs = fault.New(true) ) - maps.Copy(currPrevPaths, test.inputFolderMap) + mbh.DriveItemEnumeration = mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID: { + Pages: []mock.NextPage{{Items: test.items}}, + DeltaUpdate: du, + }, + }, + } + + sel := selectors.NewOneDriveBackup([]string{user}) + sel.Include([]selectors.OneDriveScope{test.scope}) + + mbh.Sel = sel.Selector c := NewCollections( - &userDriveBackupHandler{ - baseUserDriveHandler: baseUserDriveHandler{ - ac: api.Drives{}, - }, - userID: user, - scope: test.scope, - }, + mbh, tenant, idname.NewProvider(user, user), nil, @@ -754,18 +766,19 @@ func (suite *OneDriveCollectionsUnitSuite) TestUpdateCollections() { c.CollectionMap[driveID] = map[string]*Collection{} - newPrevPaths, err := c.UpdateCollections( + _, newPrevPaths, err := c.PopulateDriveCollections( ctx, driveID, "General", - test.items, test.inputFolderMap, - currPrevPaths, excludes, - false, + "smarf", errs) test.expect(t, err, clues.ToCore(err)) - assert.Equal(t, len(test.expectedCollectionIDs), len(c.CollectionMap[driveID]), "total collections") + assert.ElementsMatch( + t, + maps.Keys(test.expectedCollectionIDs), + maps.Keys(c.CollectionMap[driveID])) assert.Equal(t, test.expectedItemCount, c.NumItems, "item count") assert.Equal(t, test.expectedFileCount, c.NumFiles, "file count") assert.Equal(t, test.expectedContainerCount, c.NumContainers, "container count") @@ -1172,7 +1185,6 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { tenant = "a-tenant" user = "a-user" empty = "" - next = "next" delta = "delta1" delta2 = "delta2" ) @@ -1214,7 +1226,7 @@ func (suite 
*OneDriveCollectionsUnitSuite) TestGet() { table := []struct { name string drives []models.Driveable - items map[string][]apiMock.PagerResult[models.DriveItemable] + enumerator mock.EnumerateItemsDeltaByDrive canUsePreviousBackup bool errCheck assert.ErrorAssertionFunc prevFolderPaths map[string]map[string]string @@ -1233,14 +1245,16 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { { name: "OneDrive_OneItemPage_DelFileOnly_NoFolders_NoErrors", drives: []models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ - driveRootItem("root"), // will be present, not needed - delItem("file", driveBasePath1, "root", true, false, false), - }, - DeltaLink: &delta, + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{{ + Items: []models.DriveItemable{ + driveRootItem("root"), // will be present, not needed + delItem("file", driveBasePath1, "root", true, false, false), + }, + }}, + DeltaUpdate: api.DeltaUpdate{URL: delta}, }, }, }, @@ -1265,14 +1279,16 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { { name: "OneDrive_OneItemPage_NoFolderDeltas_NoErrors", drives: []models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ - driveRootItem("root"), - driveItem("file", "file", driveBasePath1, "root", true, false, false), - }, - DeltaLink: &delta, + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{{ + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("file", "file", driveBasePath1, "root", true, false, false), + }, + }}, + DeltaUpdate: api.DeltaUpdate{URL: delta}, }, }, }, @@ -1297,16 +1313,17 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { { name: "OneDrive_OneItemPage_NoErrors", drives: []models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ - driveRootItem("root"), - driveItem("folder", "folder", driveBasePath1, "root", false, true, false), - driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), - }, - DeltaLink: &delta, - ResetDelta: true, + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{{ + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder", "folder", driveBasePath1, "root", false, true, false), + driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), + }, + }}, + DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true}, }, }, }, @@ -1335,17 +1352,18 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { { name: "OneDrive_OneItemPage_NoErrors_FileRenamedMultiple", drives: []models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ - driveRootItem("root"), - driveItem("folder", "folder", driveBasePath1, "root", false, true, false), - driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), - driveItem("file", "file2", driveBasePath1+"/folder", "folder", true, false, false), - }, - DeltaLink: &delta, - ResetDelta: true, + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: 
[]mock.NextPage{{ + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder", "folder", driveBasePath1, "root", false, true, false), + driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), + driveItem("file", "file2", driveBasePath1+"/folder", "folder", true, false, false), + }, + }}, + DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true}, }, }, }, @@ -1374,16 +1392,16 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { { name: "OneDrive_OneItemPage_NoErrors_FileMovedMultiple", drives: []models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{{Items: []models.DriveItemable{ driveRootItem("root"), driveItem("folder", "folder", driveBasePath1, "root", false, true, false), driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), driveItem("file", "file2", driveBasePath1, "root", true, false, false), - }, - DeltaLink: &delta, + }}}, + DeltaUpdate: api.DeltaUpdate{URL: delta}, }, }, }, @@ -1414,16 +1432,15 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { { name: "OneDrive_OneItemPage_EmptyDelta_NoErrors", drives: []models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{{Items: []models.DriveItemable{ driveRootItem("root"), driveItem("folder", "folder", driveBasePath1, "root", false, true, false), driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), - }, - DeltaLink: &empty, // probably will never happen with graph - ResetDelta: true, + }}}, + DeltaUpdate: api.DeltaUpdate{URL: empty, Reset: true}, }, }, }, @@ -1452,25 +1469,146 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { { name: "OneDrive_TwoItemPages_NoErrors", drives: []models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ - driveRootItem("root"), - driveItem("folder", "folder", driveBasePath1, "root", false, true, false), - driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{ + { + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder", "folder", driveBasePath1, "root", false, true, false), + driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), + }, + }, + { + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder", "folder", driveBasePath1, "root", false, true, false), + driveItem("file2", "file2", driveBasePath1+"/folder", "folder", true, false, false), + }, + }, }, - NextLink: &next, - ResetDelta: true, + DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true}, }, - { - Values: []models.DriveItemable{ - driveRootItem("root"), - driveItem("folder", "folder", driveBasePath1, "root", false, true, false), - driveItem("file2", "file2", driveBasePath1+"/folder", "folder", true, false, false), + }, + }, + canUsePreviousBackup: true, + errCheck: assert.NoError, + prevFolderPaths: map[string]map[string]string{ + driveID1: {}, + }, + 
expectedCollections: map[string]map[data.CollectionState][]string{ + rootFolderPath1: {data.NewState: {}}, + folderPath1: {data.NewState: {"folder", "file", "file2"}}, + }, + expectedDeltaURLs: map[string]string{ + driveID1: delta, + }, + expectedFolderPaths: map[string]map[string]string{ + driveID1: { + "root": rootFolderPath1, + "folder": folderPath1, + }, + }, + expectedDelList: pmMock.NewPrefixMap(map[string]map[string]struct{}{}), + doNotMergeItems: map[string]bool{ + rootFolderPath1: true, + folderPath1: true, + }, + }, + { + name: "OneDrive_TwoItemPages_WithReset", + drives: []models.Driveable{drive1}, + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{ + { + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder", "folder", driveBasePath1, "root", false, true, false), + driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), + driveItem("file3", "file3", driveBasePath1+"/folder", "folder", true, false, false), + }, + }, + { + Items: []models.DriveItemable{}, + Reset: true, + }, + { + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder", "folder", driveBasePath1, "root", false, true, false), + driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), + }, + }, + { + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder", "folder", driveBasePath1, "root", false, true, false), + driveItem("file2", "file2", driveBasePath1+"/folder", "folder", true, false, false), + }, + }, }, - DeltaLink: &delta, - ResetDelta: true, + DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true}, + }, + }, + }, + canUsePreviousBackup: true, + errCheck: assert.NoError, + prevFolderPaths: map[string]map[string]string{ + driveID1: {}, + }, + expectedCollections: map[string]map[data.CollectionState][]string{ + rootFolderPath1: {data.NewState: {}}, + folderPath1: {data.NewState: {"folder", "file", "file2"}}, + }, + expectedDeltaURLs: map[string]string{ + driveID1: delta, + }, + expectedFolderPaths: map[string]map[string]string{ + driveID1: { + "root": rootFolderPath1, + "folder": folderPath1, + }, + }, + expectedDelList: pmMock.NewPrefixMap(map[string]map[string]struct{}{}), + doNotMergeItems: map[string]bool{ + rootFolderPath1: true, + folderPath1: true, + }, + }, + { + name: "OneDrive_TwoItemPages_WithResetCombinedWithItems", + drives: []models.Driveable{drive1}, + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{ + { + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder", "folder", driveBasePath1, "root", false, true, false), + driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), + }, + }, + { + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder", "folder", driveBasePath1, "root", false, true, false), + driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), + }, + Reset: true, + }, + { + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder", "folder", driveBasePath1, "root", false, true, false), + driveItem("file2", "file2", driveBasePath1+"/folder", "folder", true, false, false), + }, + }, + }, + DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true}, }, }, }, @@ -1504,27 +1642,23 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { drive1, drive2, }, - items: 
map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{{Items: []models.DriveItemable{ driveRootItem("root"), driveItem("folder", "folder", driveBasePath1, "root", false, true, false), driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), - }, - DeltaLink: &delta, - ResetDelta: true, + }}}, + DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true}, }, - }, - driveID2: { - { - Values: []models.DriveItemable{ + driveID2: { + Pages: []mock.NextPage{{Items: []models.DriveItemable{ driveRootItem("root2"), driveItem("folder2", "folder", driveBasePath2, "root2", false, true, false), driveItem("file2", "file", driveBasePath2+"/folder", "folder2", true, false, false), - }, - DeltaLink: &delta2, - ResetDelta: true, + }}}, + DeltaUpdate: api.DeltaUpdate{URL: delta2, Reset: true}, }, }, }, @@ -1568,27 +1702,23 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { drive1, drive2, }, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{{Items: []models.DriveItemable{ driveRootItem("root"), driveItem("folder", "folder", driveBasePath1, "root", false, true, false), driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), - }, - DeltaLink: &delta, - ResetDelta: true, + }}}, + DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true}, }, - }, - driveID2: { - { - Values: []models.DriveItemable{ + driveID2: { + Pages: []mock.NextPage{{Items: []models.DriveItemable{ driveRootItem("root"), driveItem("folder", "folder", driveBasePath2, "root", false, true, false), driveItem("file2", "file", driveBasePath2+"/folder", "folder", true, false, false), - }, - DeltaLink: &delta2, - ResetDelta: true, + }}}, + DeltaUpdate: api.DeltaUpdate{URL: delta2, Reset: true}, }, }, }, @@ -1629,10 +1759,12 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { { name: "OneDrive_OneItemPage_Errors", drives: []models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Err: assert.AnError, + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{{Items: []models.DriveItemable{}}}, + DeltaUpdate: api.DeltaUpdate{}, + Err: assert.AnError, }, }, }, @@ -1647,24 +1779,25 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { expectedDelList: nil, }, { - name: "OneDrive_TwoItemPage_NoDeltaError", + name: "OneDrive_OneItemPage_InvalidPrevDelta_DeleteNonExistentFolder", drives: []models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ - driveRootItem("root"), - driveItem("file", "file", driveBasePath1, "root", true, false, false), + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{ + { + Items: []models.DriveItemable{}, + Reset: true, + }, + { + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder2", "folder2", driveBasePath1, "root", false, true, false), + driveItem("file", "file", driveBasePath1+"/folder2", "folder2", true, false, false), + }, + }, }, - NextLink: 
&next, - }, - { - Values: []models.DriveItemable{ - driveRootItem("root"), - driveItem("folder", "folder", driveBasePath1, "root", false, true, false), - driveItem("file2", "file", driveBasePath1+"/folder", "folder", true, false, false), - }, - DeltaLink: &delta, + DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true}, }, }, }, @@ -1672,40 +1805,51 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { errCheck: assert.NoError, prevFolderPaths: map[string]map[string]string{ driveID1: { - "root": rootFolderPath1, + "root": rootFolderPath1, + "folder": folderPath1, }, }, expectedCollections: map[string]map[data.CollectionState][]string{ - rootFolderPath1: {data.NotMovedState: {"file"}}, - expectedPath1("/folder"): {data.NewState: {"folder", "file2"}}, + rootFolderPath1: {data.NewState: {}}, + expectedPath1("/folder"): {data.DeletedState: {}}, + expectedPath1("/folder2"): {data.NewState: {"folder2", "file"}}, }, expectedDeltaURLs: map[string]string{ driveID1: delta, }, expectedFolderPaths: map[string]map[string]string{ driveID1: { - "root": rootFolderPath1, - "folder": folderPath1, + "root": rootFolderPath1, + "folder2": expectedPath1("/folder2"), }, }, - expectedDelList: pmMock.NewPrefixMap(map[string]map[string]struct{}{ - rootFolderPath1: getDelList("file", "file2"), - }), - doNotMergeItems: map[string]bool{}, + expectedDelList: pmMock.NewPrefixMap(map[string]map[string]struct{}{}), + doNotMergeItems: map[string]bool{ + rootFolderPath1: true, + folderPath1: true, + expectedPath1("/folder2"): true, + }, }, { - name: "OneDrive_OneItemPage_InvalidPrevDelta_DeleteNonExistentFolder", + name: "OneDrive_OneItemPage_InvalidPrevDeltaCombinedWithItems_DeleteNonExistentFolder", drives: []models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ - driveRootItem("root"), - driveItem("folder2", "folder2", driveBasePath1, "root", false, true, false), - driveItem("file", "file", driveBasePath1+"/folder2", "folder2", true, false, false), + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{ + { + Items: []models.DriveItemable{}, + Reset: true, + }, + { + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder2", "folder2", driveBasePath1, "root", false, true, false), + driveItem("file", "file", driveBasePath1+"/folder2", "folder2", true, false, false), + }, + }, }, - DeltaLink: &delta, - ResetDelta: true, + DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true}, }, }, }, @@ -1741,16 +1885,89 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { { name: "OneDrive_OneItemPage_InvalidPrevDelta_AnotherFolderAtDeletedLocation", drives: []models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ - driveRootItem("root"), - driveItem("folder2", "folder", driveBasePath1, "root", false, true, false), - driveItem("file", "file", driveBasePath1+"/folder", "folder2", true, false, false), + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{ + { + // on the first page, if this is the total data, we'd expect both folder and folder2 + // since new previousPaths merge with the old previousPaths. 
+ Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder2", "folder", driveBasePath1, "root", false, true, false), + driveItem("file", "file", driveBasePath1+"/folder", "folder2", true, false, false), + }, + }, + { + Items: []models.DriveItemable{}, + Reset: true, + }, + { + // but after a delta reset, we treat this as the total end set of folders, which means + // we don't expect folder to exist any longer. + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder2", "folder", driveBasePath1, "root", false, true, false), + driveItem("file", "file", driveBasePath1+"/folder", "folder2", true, false, false), + }, + }, }, - DeltaLink: &delta, - ResetDelta: true, + DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true}, + }, + }, + }, + canUsePreviousBackup: true, + errCheck: assert.NoError, + prevFolderPaths: map[string]map[string]string{ + driveID1: { + "root": rootFolderPath1, + "folder": folderPath1, + }, + }, + expectedCollections: map[string]map[data.CollectionState][]string{ + rootFolderPath1: {data.NewState: {}}, + expectedPath1("/folder"): { + // Old folder path should be marked as deleted since it should compare + // by ID. + data.DeletedState: {}, + data.NewState: {"folder2", "file"}, + }, + }, + expectedDeltaURLs: map[string]string{ + driveID1: delta, + }, + expectedFolderPaths: map[string]map[string]string{ + driveID1: { + "root": rootFolderPath1, + "folder2": expectedPath1("/folder"), + }, + }, + expectedDelList: pmMock.NewPrefixMap(map[string]map[string]struct{}{}), + doNotMergeItems: map[string]bool{ + rootFolderPath1: true, + folderPath1: true, + }, + }, + { + name: "OneDrive_OneItemPage_InvalidPrevDelta_AnotherFolderAtDeletedLocation", + drives: []models.Driveable{drive1}, + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{ + { + Items: []models.DriveItemable{}, + Reset: true, + }, + { + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder2", "folder", driveBasePath1, "root", false, true, false), + driveItem("file", "file", driveBasePath1+"/folder", "folder2", true, false, false), + }, + }, + }, + DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true}, }, }, }, @@ -1789,26 +2006,28 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { { name: "OneDrive Two Item Pages with Malware", drives: []models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ - driveRootItem("root"), - driveItem("folder", "folder", driveBasePath1, "root", false, true, false), - driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), - malwareItem("malware", "malware", driveBasePath1+"/folder", "folder", true, false, false), + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{ + { + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder", "folder", driveBasePath1, "root", false, true, false), + driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), + malwareItem("malware", "malware", driveBasePath1+"/folder", "folder", true, false, false), + }, + }, + { + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder", "folder", driveBasePath1, "root", false, true, false), + driveItem("file2", "file2", driveBasePath1+"/folder", "folder", true, false, false), + malwareItem("malware2", "malware2", 
driveBasePath1+"/folder", "folder", true, false, false), + }, + }, }, - NextLink: &next, - }, - { - Values: []models.DriveItemable{ - driveRootItem("root"), - driveItem("folder", "folder", driveBasePath1, "root", false, true, false), - driveItem("file2", "file2", driveBasePath1+"/folder", "folder", true, false, false), - malwareItem("malware2", "malware2", driveBasePath1+"/folder", "folder", true, false, false), - }, - DeltaLink: &delta, - ResetDelta: true, + DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true}, }, }, }, @@ -1838,28 +2057,35 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { expectedSkippedCount: 2, }, { - name: "One Drive Deleted Folder In New Results", + name: "One Drive Deleted Folder In New Results With Invalid Delta", drives: []models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ - driveRootItem("root"), - driveItem("folder", "folder", driveBasePath1, "root", false, true, false), - driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), - driveItem("folder2", "folder2", driveBasePath1, "root", false, true, false), - driveItem("file2", "file2", driveBasePath1+"/folder2", "folder2", true, false, false), + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{ + { + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder", "folder", driveBasePath1, "root", false, true, false), + driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), + driveItem("folder2", "folder2", driveBasePath1, "root", false, true, false), + driveItem("file2", "file2", driveBasePath1+"/folder2", "folder2", true, false, false), + }, + }, + { + Reset: true, + }, + { + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder", "folder", driveBasePath1, "root", false, true, false), + driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), + delItem("folder2", driveBasePath1, "root", false, true, false), + delItem("file2", driveBasePath1, "root", true, false, false), + }, + }, }, - NextLink: &next, - }, - { - Values: []models.DriveItemable{ - driveRootItem("root"), - delItem("folder2", driveBasePath1, "root", false, true, false), - delItem("file2", driveBasePath1, "root", true, false, false), - }, - DeltaLink: &delta2, - ResetDelta: true, + DeltaUpdate: api.DeltaUpdate{URL: delta2, Reset: true}, }, }, }, @@ -1894,17 +2120,19 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { }, }, { - name: "One Drive Random Folder Delete", + name: "One Drive Folder Delete After Invalid Delta", drives: []models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ - driveRootItem("root"), - delItem("folder", driveBasePath1, "root", false, true, false), - }, - DeltaLink: &delta, - ResetDelta: true, + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{{ + Items: []models.DriveItemable{ + driveRootItem("root"), + delItem("folder", driveBasePath1, "root", false, true, false), + }, + Reset: true, + }}, + DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true}, }, }, }, @@ -1935,17 +2163,21 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { }, }, { - name: "One Drive Random Item Delete", + name: "One Drive Item Delete After Invalid Delta", drives: 
[]models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ - driveRootItem("root"), - delItem("file", driveBasePath1, "root", true, false, false), + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{ + { + Items: []models.DriveItemable{ + driveRootItem("root"), + delItem("file", driveBasePath1, "root", true, false, false), + }, + Reset: true, + }, }, - DeltaLink: &delta, - ResetDelta: true, + DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true}, }, }, }, @@ -1975,24 +2207,26 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { { name: "One Drive Folder Made And Deleted", drives: []models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ - driveRootItem("root"), - driveItem("folder", "folder", driveBasePath1, "root", false, true, false), - driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{ + { + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder", "folder", driveBasePath1, "root", false, true, false), + driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), + }, + }, + { + Items: []models.DriveItemable{ + driveRootItem("root"), + delItem("folder", driveBasePath1, "root", false, true, false), + delItem("file", driveBasePath1, "root", true, false, false), + }, + }, }, - NextLink: &next, - }, - { - Values: []models.DriveItemable{ - driveRootItem("root"), - delItem("folder", driveBasePath1, "root", false, true, false), - delItem("file", driveBasePath1, "root", true, false, false), - }, - DeltaLink: &delta2, - ResetDelta: true, + DeltaUpdate: api.DeltaUpdate{URL: delta2, Reset: true}, }, }, }, @@ -2020,23 +2254,25 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { { name: "One Drive Item Made And Deleted", drives: []models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ - driveRootItem("root"), - driveItem("folder", "folder", driveBasePath1, "root", false, true, false), - driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), + enumerator: mock.EnumerateItemsDeltaByDrive{ + DrivePagers: map[string]*mock.DriveItemsDeltaPager{ + driveID1: { + Pages: []mock.NextPage{ + { + Items: []models.DriveItemable{ + driveRootItem("root"), + driveItem("folder", "folder", driveBasePath1, "root", false, true, false), + driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), + }, + }, + { + Items: []models.DriveItemable{ + driveRootItem("root"), + delItem("file", driveBasePath1, "root", true, false, false), + }, + }, }, - NextLink: &next, - }, - { - Values: []models.DriveItemable{ - driveRootItem("root"), - delItem("file", driveBasePath1, "root", true, false, false), - }, - DeltaLink: &delta, - ResetDelta: true, + DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true}, }, }, }, @@ -2067,15 +2303,14 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { { name: "One Drive Random Folder Delete", drives: []models.Driveable{drive1}, - items: map[string][]apiMock.PagerResult[models.DriveItemable]{ - driveID1: { - { - Values: []models.DriveItemable{ + enumerator: 
mock.EnumerateItemsDeltaByDrive{
+				DrivePagers: map[string]*mock.DriveItemsDeltaPager{
+					driveID1: {
+						Pages: []mock.NextPage{{Items: []models.DriveItemable{
 							driveRootItem("root"),
 							delItem("folder", driveBasePath1, "root", false, true, false),
-						},
-						DeltaLink:  &delta,
-						ResetDelta: true,
+						}}},
+						DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
 					},
 				},
 			},
@@ -2103,15 +2338,14 @@
 		{
 			name:   "One Drive Random Item Delete",
 			drives: []models.Driveable{drive1},
-			items: map[string][]apiMock.PagerResult[models.DriveItemable]{
-				driveID1: {
-					{
-						Values: []models.DriveItemable{
+			enumerator: mock.EnumerateItemsDeltaByDrive{
+				DrivePagers: map[string]*mock.DriveItemsDeltaPager{
+					driveID1: {
+						Pages: []mock.NextPage{{Items: []models.DriveItemable{
 							driveRootItem("root"),
 							delItem("file", driveBasePath1, "root", true, false, false),
-						},
-						DeltaLink:  &delta,
-						ResetDelta: true,
+						}}},
+						DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
 					},
 				},
 			},
@@ -2139,13 +2373,13 @@
 		{
 			name:   "TwoPriorDrives_OneTombstoned",
 			drives: []models.Driveable{drive1},
-			items: map[string][]apiMock.PagerResult[models.DriveItemable]{
-				driveID1: {
-					{
-						Values: []models.DriveItemable{
+			enumerator: mock.EnumerateItemsDeltaByDrive{
+				DrivePagers: map[string]*mock.DriveItemsDeltaPager{
+					driveID1: {
+						Pages: []mock.NextPage{{Items: []models.DriveItemable{
 							driveRootItem("root"), // will be present
-						},
-						DeltaLink: &delta,
+						}}},
+						DeltaUpdate: api.DeltaUpdate{URL: delta},
 					},
 				},
 			},
@@ -2182,18 +2416,9 @@
 			}
 
-			itemPagers := map[string]api.DeltaPager[models.DriveItemable]{}
-
-			for driveID := range test.items {
-				itemPagers[driveID] = &apiMock.DeltaPager[models.DriveItemable]{
-					ToReturn: test.items[driveID],
-				}
-			}
-
 			mbh := mock.DefaultOneDriveBH("a-user")
 			mbh.DrivePagerV = mockDrivePager
-			mbh.ItemPagerV = itemPagers
-			mbh.DriveItemEnumeration = mock.PagerResultToEDID(test.items)
+			mbh.DriveItemEnumeration = test.enumerator
 
 			c := NewCollections(
 				mbh,
@@ -2268,10 +2493,10 @@
 
 			collectionCount++
 
-			// TODO(ashmrtn): We should really be getting items in the collection
-			// via the Items() channel, but we don't have a way to mock out the
-			// actual item fetch yet (mostly wiring issues). The lack of that makes
-			// this check a bit more bittle since internal details can change.
+			// TODO: We should really be getting items in the collection
+			// via the Items() channel. The lack of that makes this check a bit
+			// more brittle since internal details can change. The wiring to
+			// support mocked GetItems is available; we just haven't plugged it
+			// in yet.
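The mock enumerator these tests switch to can be pictured as a slice-backed pager that replays canned pages. The field names (`Pages`, `DeltaUpdate`, `Err`) follow the mock package as it appears in this diff; the method bodies are illustrative assumptions, not the mock's actual code:

```go
package mocksketch

// NextPage is one canned page of results; Reset mimics a graph-issued
// delta reset arriving mid-enumeration.
type NextPage[T any] struct {
	Items []T
	Reset bool
}

// DeltaUpdate matches the api.DeltaUpdate usage in this diff.
type DeltaUpdate struct {
	URL   string
	Reset bool
}

// SlicePager replays Pages in order, then reports DeltaUpdate and Err from
// Results, mimicking mock.DriveItemsDeltaPager.
type SlicePager[T any] struct {
	Pages       []NextPage[T]
	DeltaUpdate DeltaUpdate
	Err         error

	next int
}

// NextPage returns the next canned page, flagging done once all pages have
// been handed out.
func (p *SlicePager[T]) NextPage() ([]T, bool, bool) {
	if p.next >= len(p.Pages) {
		return nil, false, true
	}

	page := p.Pages[p.next]
	p.next++

	return page.Items, page.Reset, false
}

// Results reports the canned terminal state.
func (p *SlicePager[T]) Results() (DeltaUpdate, error) {
	return p.DeltaUpdate, p.Err
}
```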
col, ok := baseCol.(*Collection) require.True(t, ok, "getting onedrive.Collection handle") diff --git a/src/internal/m365/collection/drive/handlers.go b/src/internal/m365/collection/drive/handlers.go index 9c803b93f..0b5211050 100644 --- a/src/internal/m365/collection/drive/handlers.go +++ b/src/internal/m365/collection/drive/handlers.go @@ -86,12 +86,8 @@ type EnumerateDriveItemsDeltaer interface { EnumerateDriveItemsDelta( ctx context.Context, driveID, prevDeltaLink string, - selectProps []string, - ) ( - []models.DriveItemable, - api.DeltaUpdate, - error, - ) + cc api.CallConfig, + ) api.NextPageResulter[models.DriveItemable] } // --------------------------------------------------------------------------- diff --git a/src/internal/m365/collection/drive/item_collector_test.go b/src/internal/m365/collection/drive/item_collector_test.go index 30bdc4661..0e5e44ace 100644 --- a/src/internal/m365/collection/drive/item_collector_test.go +++ b/src/internal/m365/collection/drive/item_collector_test.go @@ -150,8 +150,10 @@ func (suite *ItemCollectorUnitSuite) TestDrives() { Err: assert.AnError, }, }, - expectedErr: assert.Error, - expectedResults: nil, + expectedErr: assert.Error, + // even though we error, the func will return both the + // error and the prior results + expectedResults: resultDrives, }, { name: "MySiteURLNotFound", diff --git a/src/internal/m365/collection/drive/site_handler.go b/src/internal/m365/collection/drive/site_handler.go index 532f32e0f..126f480c3 100644 --- a/src/internal/m365/collection/drive/site_handler.go +++ b/src/internal/m365/collection/drive/site_handler.go @@ -175,9 +175,9 @@ func (h siteBackupHandler) IncludesDir(dir string) bool { func (h siteBackupHandler) EnumerateDriveItemsDelta( ctx context.Context, driveID, prevDeltaLink string, - selectProps []string, -) ([]models.DriveItemable, api.DeltaUpdate, error) { - return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, selectProps) + cc api.CallConfig, +) api.NextPageResulter[models.DriveItemable] { + return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, cc) } // --------------------------------------------------------------------------- diff --git a/src/internal/m365/collection/drive/url_cache.go b/src/internal/m365/collection/drive/url_cache.go index 391382dbe..654979626 100644 --- a/src/internal/m365/collection/drive/url_cache.go +++ b/src/internal/m365/collection/drive/url_cache.go @@ -44,10 +44,10 @@ type urlCache struct { // cacheMu protects idToProps and lastRefreshTime cacheMu sync.RWMutex // refreshMu serializes cache refresh attempts by potential writers - refreshMu sync.Mutex - deltaQueryCount int + refreshMu sync.Mutex + refreshCount int - edid EnumerateDriveItemsDeltaer + enumerator EnumerateDriveItemsDeltaer errs *fault.Bus } @@ -56,10 +56,10 @@ type urlCache struct { func newURLCache( driveID, prevDelta string, refreshInterval time.Duration, - edid EnumerateDriveItemsDeltaer, + enumerator EnumerateDriveItemsDeltaer, errs *fault.Bus, ) (*urlCache, error) { - err := validateCacheParams(driveID, refreshInterval, edid) + err := validateCacheParams(driveID, refreshInterval, enumerator) if err != nil { return nil, clues.Wrap(err, "cache params") } @@ -68,7 +68,7 @@ func newURLCache( idToProps: make(map[string]itemProps), lastRefreshTime: time.Time{}, driveID: driveID, - edid: edid, + enumerator: enumerator, prevDelta: prevDelta, refreshInterval: refreshInterval, errs: errs, @@ -80,7 +80,7 @@ func newURLCache( func validateCacheParams( driveID string, refreshInterval 
time.Duration, - edid EnumerateDriveItemsDeltaer, + enumerator EnumerateDriveItemsDeltaer, ) error { if len(driveID) == 0 { return clues.New("drive id is empty") @@ -90,8 +90,8 @@ func validateCacheParams( return clues.New("invalid refresh interval") } - if edid == nil { - return clues.New("nil item enumerator") + if enumerator == nil { + return clues.New("missing item enumerator") } return nil @@ -154,22 +154,30 @@ func (uc *urlCache) refreshCache( uc.cacheMu.Lock() defer uc.cacheMu.Unlock() - // Issue a delta query to graph logger.Ctx(ctx).Info("refreshing url cache") + uc.refreshCount++ - items, du, err := uc.edid.EnumerateDriveItemsDelta( + pager := uc.enumerator.EnumerateDriveItemsDelta( ctx, uc.driveID, uc.prevDelta, - api.URLCacheDriveItemProps()) - if err != nil { - uc.idToProps = make(map[string]itemProps) - return clues.Stack(err) + api.CallConfig{ + Select: api.URLCacheDriveItemProps(), + }) + + for page, reset, done := pager.NextPage(); !done; page, reset, done = pager.NextPage() { + err := uc.updateCache( + ctx, + page, + reset, + uc.errs) + if err != nil { + return clues.Wrap(err, "updating cache") + } } - uc.deltaQueryCount++ - - if err := uc.updateCache(ctx, items, uc.errs); err != nil { + du, err := pager.Results() + if err != nil { return clues.Stack(err) } @@ -205,10 +213,15 @@ func (uc *urlCache) readCache( func (uc *urlCache) updateCache( ctx context.Context, items []models.DriveItemable, + reset bool, errs *fault.Bus, ) error { el := errs.Local() + if reset { + uc.idToProps = map[string]itemProps{} + } + for _, item := range items { if el.Failure() != nil { break diff --git a/src/internal/m365/collection/drive/url_cache_test.go b/src/internal/m365/collection/drive/url_cache_test.go index 4fa0043fb..da6e5c732 100644 --- a/src/internal/m365/collection/drive/url_cache_test.go +++ b/src/internal/m365/collection/drive/url_cache_test.go @@ -2,6 +2,7 @@ package drive import ( "errors" + "fmt" "io" "math/rand" "net/http" @@ -11,6 +12,7 @@ import ( "github.com/alcionai/clues" "github.com/microsoftgraph/msgraph-sdk-go/models" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -91,17 +93,23 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() { api.NewDriveItem(newFolderName, true), control.Copy) require.NoError(t, err, clues.ToCore(err)) - require.NotNil(t, newFolder.GetId()) nfid := ptr.Val(newFolder.GetId()) // Get the previous delta to feed into url cache - _, du, err := ac.EnumerateDriveItemsDelta( + pager := ac.EnumerateDriveItemsDelta( ctx, suite.driveID, "", - api.URLCacheDriveItemProps()) + api.CallConfig{ + Select: api.URLCacheDriveItemProps(), + }) + + // normally we'd page through all the pager.NextPage + // enumerations first. But Results should make sure + // that we don't need to drain lower-level communication first. 
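One plausible pager shape that satisfies the comment above, where `Results` drains the lower-level channel itself so callers may skip the `NextPage` loop entirely. This is an assumption for illustration (reset signaling omitted for brevity), not necessarily the api package's actual design:

```go
package chansketch

// DeltaUpdate matches the api.DeltaUpdate usage in this diff.
type DeltaUpdate struct {
	URL   string
	Reset bool
}

type result struct {
	du  DeltaUpdate
	err error
}

// chanPager assumes a producer goroutine that writes pages, closes pages
// when enumeration ends, then sends exactly one terminal result on final
// (a buffered channel, so the send never blocks).
type chanPager[T any] struct {
	pages chan []T
	final chan result
}

// NextPage reports done once the producer closes the page channel.
func (p *chanPager[T]) NextPage() (page []T, reset, done bool) {
	page, ok := <-p.pages
	return page, false, !ok
}

// Results discards any unread pages before reading the terminal result, so
// it is safe to call without first exhausting NextPage, as the test above
// relies on.
func (p *chanPager[T]) Results() (DeltaUpdate, error) {
	for range p.pages {
		// draining unblocks the producer so it can publish its result.
	}

	r := <-p.final
	return r.du, r.err
}
```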
+	du, err := pager.Results()
 	require.NoError(t, err, clues.ToCore(err))
 	require.NotEmpty(t, du.URL)
 
@@ -175,7 +183,7 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
 	wg.Wait()
 
 	// Validate that exactly 1 delta query was made by url cache
-	require.Equal(t, 1, uc.deltaQueryCount)
+	require.Equal(t, 1, uc.refreshCount)
 }
 
 // ---------------------------------------------------------------------------
@@ -196,16 +204,18 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
 
 	table := []struct {
 		name              string
-		pagerItems        map[string][]models.DriveItemable
-		pagerErr          map[string]error
+		pages             []mock.NextPage
+		pagerErr          error
 		expectedItemProps map[string]itemProps
-		expectedErr       require.ErrorAssertionFunc
-		cacheAssert       func(*urlCache, time.Time)
+		expectErr         assert.ErrorAssertionFunc
+		expect            func(*testing.T, *urlCache, time.Time)
 	}{
 		{
 			name: "single item in cache",
-			pagerItems: map[string][]models.DriveItemable{
-				driveID: {fileItem("1", "file1", "root", "root", "https://dummy1.com", false)},
+			pages: []mock.NextPage{
+				{Items: []models.DriveItemable{
+					fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
+				}},
 			},
 			expectedItemProps: map[string]itemProps{
 				"1": {
@@ -213,22 +223,123 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
 					downloadURL: "https://dummy1.com",
 					isDeleted:   false,
 				},
 			},
-			expectedErr: require.NoError,
-			cacheAssert: func(uc *urlCache, startTime time.Time) {
-				require.Greater(suite.T(), uc.lastRefreshTime, startTime)
-				require.Equal(suite.T(), 1, uc.deltaQueryCount)
-				require.Equal(suite.T(), 1, len(uc.idToProps))
+			expectErr: assert.NoError,
+			expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+				assert.Greater(t, uc.lastRefreshTime, startTime)
+				assert.Equal(t, 1, uc.refreshCount)
+				assert.Equal(t, 1, len(uc.idToProps))
 			},
 		},
 		{
 			name: "multiple items in cache",
-			pagerItems: map[string][]models.DriveItemable{
-				driveID: {
+			pages: []mock.NextPage{
+				{Items: []models.DriveItemable{
 					fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
 					fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
 					fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
 					fileItem("4", "file4", "root", "root", "https://dummy4.com", false),
 					fileItem("5", "file5", "root", "root", "https://dummy5.com", false),
+				}},
+			},
+			expectedItemProps: map[string]itemProps{
+				"1": {
+					downloadURL: "https://dummy1.com",
+					isDeleted:   false,
+				},
+				"2": {
+					downloadURL: "https://dummy2.com",
+					isDeleted:   false,
+				},
+				"3": {
+					downloadURL: "https://dummy3.com",
+					isDeleted:   false,
+				},
+				"4": {
+					downloadURL: "https://dummy4.com",
+					isDeleted:   false,
+				},
+				"5": {
+					downloadURL: "https://dummy5.com",
+					isDeleted:   false,
+				},
+			},
+			expectErr: assert.NoError,
+			expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+				assert.Greater(t, uc.lastRefreshTime, startTime)
+				assert.Equal(t, 1, uc.refreshCount)
+				assert.Equal(t, 5, len(uc.idToProps))
+			},
+		},
+		{
+			name: "multiple pages",
+			pages: []mock.NextPage{
+				{Items: []models.DriveItemable{
+					fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
+					fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
+					fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
+				}},
+				{Items: []models.DriveItemable{
+					fileItem("4", "file4", "root", "root", "https://dummy4.com", false),
+					fileItem("5", "file5", "root", "root", "https://dummy5.com", false),
+				}},
+			},
+			expectedItemProps: map[string]itemProps{
+				"1": {
+					downloadURL: "https://dummy1.com",
+					isDeleted:   false,
+				},
+				"2": {
+					downloadURL: "https://dummy2.com",
+					isDeleted:   false,
+				},
+				"3": {
+					downloadURL: "https://dummy3.com",
+					isDeleted:   false,
+				},
+				"4": {
+					downloadURL: "https://dummy4.com",
+					isDeleted:   false,
+				},
+				"5": {
+					downloadURL: "https://dummy5.com",
+					isDeleted:   false,
+				},
+			},
+			expectErr: assert.NoError,
+			expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+				assert.Greater(t, uc.lastRefreshTime, startTime)
+				assert.Equal(t, 1, uc.refreshCount)
+				assert.Equal(t, 5, len(uc.idToProps))
+			},
+		},
+		{
+			name: "multiple pages with resets",
+			pages: []mock.NextPage{
+				{
+					Items: []models.DriveItemable{
+						fileItem("-1", "file-1", "root", "root", "https://dummy-1.com", false),
+						fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
+						fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
+						fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
+					},
+				},
+				{
+					Items: []models.DriveItemable{},
+					Reset: true,
+				},
+				{
+					Items: []models.DriveItemable{
+						fileItem("0", "file1", "root", "root", "https://dummy0.com", false),
+						fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
+						fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
+						fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
+					},
+				},
+				{
+					Items: []models.DriveItemable{
+						fileItem("4", "file4", "root", "root", "https://dummy4.com", false),
+						fileItem("5", "file5", "root", "root", "https://dummy5.com", false),
+					},
+				},
 			},
 			expectedItemProps: map[string]itemProps{
@@ -253,23 +364,77 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
 					isDeleted:   false,
 				},
 			},
-			expectedErr: require.NoError,
-			cacheAssert: func(uc *urlCache, startTime time.Time) {
-				require.Greater(suite.T(), uc.lastRefreshTime, startTime)
-				require.Equal(suite.T(), 1, uc.deltaQueryCount)
-				require.Equal(suite.T(), 5, len(uc.idToProps))
+			expectErr: assert.NoError,
+			expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+				assert.Greater(t, uc.lastRefreshTime, startTime)
+				assert.Equal(t, 1, uc.refreshCount)
+				assert.Equal(t, 6, len(uc.idToProps))
+			},
+		},
+		{
+			name: "multiple pages with resets and combo reset+items in page",
+			pages: []mock.NextPage{
+				{
+					Items: []models.DriveItemable{
+						fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
+						fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
+						fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
+					},
+				},
+				{
+					Items: []models.DriveItemable{
+						fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
+						fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
+						fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
+					},
+					Reset: true,
+				},
+				{
+					Items: []models.DriveItemable{
+						fileItem("4", "file4", "root", "root", "https://dummy4.com", false),
+						fileItem("5", "file5", "root", "root", "https://dummy5.com", false),
+					},
+				},
+			},
+			expectedItemProps: map[string]itemProps{
+				"1": {
+					downloadURL: "https://dummy1.com",
+					isDeleted:   false,
+				},
+				"2": {
+					downloadURL: "https://dummy2.com",
+					isDeleted:   false,
+				},
+				"3": {
+					downloadURL: "https://dummy3.com",
+					isDeleted:   false,
+				},
+				"4": {
+					downloadURL: "https://dummy4.com",
+					isDeleted:   false,
+				},
+				"5": {
+					downloadURL: "https://dummy5.com",
+					isDeleted:   false,
+				},
+			},
+			expectErr: assert.NoError,
+			expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+				assert.Greater(t, uc.lastRefreshTime, startTime)
+				assert.Equal(t, 1, uc.refreshCount)
+				assert.Equal(t, 5, len(uc.idToProps))
 			},
 		},
 		{
 			name: "duplicate items with potentially new urls",
-			pagerItems: map[string][]models.DriveItemable{
-				driveID: {
+			pages: []mock.NextPage{
+				{Items: []models.DriveItemable{
 					fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
 					fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
 					fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
 					fileItem("1", "file1", "root", "root", "https://test1.com", false),
 					fileItem("2", "file2", "root", "root", "https://test2.com", false),
-				},
+				}},
 			},
 			expectedItemProps: map[string]itemProps{
 				"1": {
@@ -285,21 +450,21 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
 					isDeleted:   false,
 				},
 			},
-			expectedErr: require.NoError,
-			cacheAssert: func(uc *urlCache, startTime time.Time) {
-				require.Greater(suite.T(), uc.lastRefreshTime, startTime)
-				require.Equal(suite.T(), 1, uc.deltaQueryCount)
-				require.Equal(suite.T(), 3, len(uc.idToProps))
+			expectErr: assert.NoError,
+			expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+				assert.Greater(t, uc.lastRefreshTime, startTime)
+				assert.Equal(t, 1, uc.refreshCount)
+				assert.Equal(t, 3, len(uc.idToProps))
 			},
 		},
 		{
 			name: "deleted items",
-			pagerItems: map[string][]models.DriveItemable{
-				driveID: {
+			pages: []mock.NextPage{
+				{Items: []models.DriveItemable{
 					fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
 					fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
 					fileItem("1", "file1", "root", "root", "https://dummy1.com", true),
-				},
+				}},
 			},
 			expectedItemProps: map[string]itemProps{
 				"1": {
@@ -311,111 +476,120 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
 					isDeleted:   false,
 				},
 			},
-			expectedErr: require.NoError,
-			cacheAssert: func(uc *urlCache, startTime time.Time) {
-				require.Greater(suite.T(), uc.lastRefreshTime, startTime)
-				require.Equal(suite.T(), 1, uc.deltaQueryCount)
-				require.Equal(suite.T(), 2, len(uc.idToProps))
+			expectErr: assert.NoError,
+			expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+				assert.Greater(t, uc.lastRefreshTime, startTime)
+				assert.Equal(t, 1, uc.refreshCount)
+				assert.Equal(t, 2, len(uc.idToProps))
 			},
 		},
 		{
 			name: "item not found in cache",
-			pagerItems: map[string][]models.DriveItemable{
-				driveID: {fileItem("1", "file1", "root", "root", "https://dummy1.com", false)},
+			pages: []mock.NextPage{
+				{Items: []models.DriveItemable{
+					fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
+				}},
 			},
 			expectedItemProps: map[string]itemProps{
 				"2": {},
 			},
-			expectedErr: require.Error,
-			cacheAssert: func(uc *urlCache, startTime time.Time) {
-				require.Greater(suite.T(), uc.lastRefreshTime, startTime)
-				require.Equal(suite.T(), 1, uc.deltaQueryCount)
-				require.Equal(suite.T(), 1, len(uc.idToProps))
+			expectErr: assert.Error,
+			expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+				assert.Greater(t, uc.lastRefreshTime, startTime)
+				assert.Equal(t, 1, uc.refreshCount)
+				assert.Equal(t, 1, len(uc.idToProps))
 			},
 		},
 		{
-			name:       "delta query error",
-			pagerItems: map[string][]models.DriveItemable{},
-			pagerErr: map[string]error{
-				driveID: errors.New("delta query error"),
+			name: "delta query error",
+			pages: []mock.NextPage{
+				{Items: []models.DriveItemable{}},
 			},
+			pagerErr: errors.New("delta query error"),
 			expectedItemProps: map[string]itemProps{
 				"1": {},
 				"2": {},
 			},
-			expectedErr: require.Error,
-			cacheAssert: func(uc *urlCache, _ time.Time) {
-				require.Equal(suite.T(), time.Time{}, uc.lastRefreshTime)
-				require.Equal(suite.T(), 0, uc.deltaQueryCount)
-				require.Equal(suite.T(), 0, len(uc.idToProps))
+			expectErr: assert.Error,
+			expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+				assert.Equal(t, time.Time{}, uc.lastRefreshTime)
+				assert.NotZero(t, uc.refreshCount)
+				assert.Equal(t, 0, len(uc.idToProps))
 			},
 		},
 		{
 			name: "folder item",
-			pagerItems: map[string][]models.DriveItemable{
-				driveID: {
+			pages: []mock.NextPage{
+				{Items: []models.DriveItemable{
 					fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
 					driveItem("2", "folder2", "root", "root", false, true, false),
-				},
+				}},
 			},
 			expectedItemProps: map[string]itemProps{
 				"2": {},
 			},
-			expectedErr: require.Error,
-			cacheAssert: func(uc *urlCache, startTime time.Time) {
-				require.Greater(suite.T(), uc.lastRefreshTime, startTime)
-				require.Equal(suite.T(), 1, uc.deltaQueryCount)
-				require.Equal(suite.T(), 1, len(uc.idToProps))
+			expectErr: assert.Error,
+			expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+				assert.Greater(t, uc.lastRefreshTime, startTime)
+				assert.Equal(t, 1, uc.refreshCount)
+				assert.Equal(t, 1, len(uc.idToProps))
 			},
 		},
 	}
 
 	for _, test := range table {
 		suite.Run(test.name, func() {
-			t := suite.T()
 
-			ctx, flush := tester.NewContext(t)
-			defer flush()
+			for _, numConcurrentReqs := range []int{1, 2, 32} {
+				crTestName := fmt.Sprintf("%d_concurrent_reqs", numConcurrentReqs)
+				suite.Run(crTestName, func() {
+					t := suite.T()
 
-			medi := mock.EnumeratesDriveItemsDelta{
-				Items:       test.pagerItems,
-				Err:         test.pagerErr,
-				DeltaUpdate: map[string]api.DeltaUpdate{driveID: {URL: deltaString}},
-			}
+					ctx, flush := tester.NewContext(t)
+					defer flush()
 
-			cache, err := newURLCache(
-				driveID,
-				"",
-				1*time.Hour,
-				&medi,
-				fault.New(true))
-
-			require.NoError(suite.T(), err, clues.ToCore(err))
-
-			numConcurrentReq := 100
-			var wg sync.WaitGroup
-			wg.Add(numConcurrentReq)
-
-			startTime := time.Now()
-
-			for i := 0; i < numConcurrentReq; i++ {
-				go func() {
-					defer wg.Done()
-
-					for id, expected := range test.expectedItemProps {
-						time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
-
-						props, err := cache.getItemProperties(ctx, id)
-
-						test.expectedErr(suite.T(), err, clues.ToCore(err))
-						require.Equal(suite.T(), expected, props)
+					medi := mock.EnumerateItemsDeltaByDrive{
+						DrivePagers: map[string]*mock.DriveItemsDeltaPager{
+							driveID: {
+								Pages:       test.pages,
+								Err:         test.pagerErr,
+								DeltaUpdate: api.DeltaUpdate{URL: deltaString},
+							},
+						},
 					}
-				}()
+
+					cache, err := newURLCache(
+						driveID,
+						"",
+						1*time.Hour,
+						&medi,
+						fault.New(true))
+					require.NoError(t, err, clues.ToCore(err))
+
+					var wg sync.WaitGroup
+					wg.Add(numConcurrentReqs)
+
+					startTime := time.Now()
+
+					for i := 0; i < numConcurrentReqs; i++ {
+						go func(ti int) {
+							defer wg.Done()
+
+							for id, expected := range test.expectedItemProps {
+								time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
+
+								props, err := cache.getItemProperties(ctx, id)
+								test.expectErr(t, err, clues.ToCore(err))
+								assert.Equal(t, expected, props)
+							}
+						}(i)
+					}
+
+					wg.Wait()
+
+					test.expect(t, cache, startTime)
+				})
 			}
-
-			wg.Wait()
-
-			test.cacheAssert(cache, startTime)
 		})
 	}
 }
@@ -432,7 +606,7 @@ func (suite *URLCacheUnitSuite) TestNeedsRefresh() {
 		driveID,
 		"",
 		refreshInterval,
-		&mock.EnumeratesDriveItemsDelta{},
+		&mock.EnumerateItemsDeltaByDrive{},
 		fault.New(true))
 	require.NoError(t, err, clues.ToCore(err))
 
@@ -456,44 +630,44 @@ func (suite *URLCacheUnitSuite) TestNewURLCache() {
 	table := []struct {
-		name        string
-		driveID     string
-		refreshInt  time.Duration
-		itemPager   EnumerateDriveItemsDeltaer
-		errors      *fault.Bus
-		expectedErr require.ErrorAssertionFunc
+		name       string
+		driveID    string
+		refreshInt time.Duration
+		itemPager  EnumerateDriveItemsDeltaer
+		errors     *fault.Bus
+		expectErr  require.ErrorAssertionFunc
 	}{
 		{
-			name:        "invalid driveID",
-			driveID:     "",
-			refreshInt:  1 * time.Hour,
-			itemPager:   &mock.EnumeratesDriveItemsDelta{},
-			errors:      fault.New(true),
-			expectedErr: require.Error,
+			name:       "invalid driveID",
+			driveID:    "",
+			refreshInt: 1 * time.Hour,
+			itemPager:  &mock.EnumerateItemsDeltaByDrive{},
+			errors:     fault.New(true),
+			expectErr:  require.Error,
 		},
 		{
-			name:        "invalid refresh interval",
-			driveID:     "drive1",
-			refreshInt:  100 * time.Millisecond,
-			itemPager:   &mock.EnumeratesDriveItemsDelta{},
-			errors:      fault.New(true),
-			expectedErr: require.Error,
+			name:       "invalid refresh interval",
+			driveID:    "drive1",
+			refreshInt: 100 * time.Millisecond,
+			itemPager:  &mock.EnumerateItemsDeltaByDrive{},
+			errors:     fault.New(true),
+			expectErr:  require.Error,
 		},
 		{
-			name:        "invalid item enumerator",
-			driveID:     "drive1",
-			refreshInt:  1 * time.Hour,
-			itemPager:   nil,
-			errors:      fault.New(true),
-			expectedErr: require.Error,
+			name:       "invalid item enumerator",
+			driveID:    "drive1",
+			refreshInt: 1 * time.Hour,
+			itemPager:  nil,
+			errors:     fault.New(true),
+			expectErr:  require.Error,
 		},
 		{
-			name:        "valid",
-			driveID:     "drive1",
-			refreshInt:  1 * time.Hour,
-			itemPager:   &mock.EnumeratesDriveItemsDelta{},
-			errors:      fault.New(true),
-			expectedErr: require.NoError,
+			name:       "valid",
+			driveID:    "drive1",
+			refreshInt: 1 * time.Hour,
+			itemPager:  &mock.EnumerateItemsDeltaByDrive{},
+			errors:     fault.New(true),
+			expectErr:  require.NoError,
 		},
 	}
 
@@ -507,7 +681,7 @@ func (suite *URLCacheUnitSuite) TestNewURLCache() {
 				test.itemPager,
 				test.errors)
 
-			test.expectedErr(t, err, clues.ToCore(err))
+			test.expectErr(t, err, clues.ToCore(err))
 		})
 	}
 }
diff --git a/src/internal/m365/collection/drive/user_drive_handler.go b/src/internal/m365/collection/drive/user_drive_handler.go
index c8ee56882..3d19cd23c 100644
--- a/src/internal/m365/collection/drive/user_drive_handler.go
+++ b/src/internal/m365/collection/drive/user_drive_handler.go
@@ -175,9 +175,9 @@ func (h userDriveBackupHandler) IncludesDir(dir string) bool {
 func (h userDriveBackupHandler) EnumerateDriveItemsDelta(
 	ctx context.Context,
 	driveID, prevDeltaLink string,
-	selectProps []string,
-) ([]models.DriveItemable, api.DeltaUpdate, error) {
-	return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, selectProps)
+	cc api.CallConfig,
+) api.NextPageResulter[models.DriveItemable] {
+	return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, cc)
 }
 
 // ---------------------------------------------------------------------------
diff --git a/src/internal/m365/service/onedrive/mock/handlers.go b/src/internal/m365/service/onedrive/mock/handlers.go
index 568644d98..d9b944478 100644
--- a/src/internal/m365/service/onedrive/mock/handlers.go
+++ b/src/internal/m365/service/onedrive/mock/handlers.go
@@ -9,23 +9,25 @@ import (
 	"github.com/microsoftgraph/msgraph-sdk-go/models"
 
 	"github.com/alcionai/corso/src/internal/common/idname"
-	"github.com/alcionai/corso/src/internal/common/ptr"
 	odConsts "github.com/alcionai/corso/src/internal/m365/service/onedrive/consts"
 	"github.com/alcionai/corso/src/pkg/backup/details"
 	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/path"
+	"github.com/alcionai/corso/src/pkg/selectors"
 	"github.com/alcionai/corso/src/pkg/services/m365/api"
-	apiMock "github.com/alcionai/corso/src/pkg/services/m365/api/mock"
 )
 
 // ---------------------------------------------------------------------------
 // Backup Handler
 // ---------------------------------------------------------------------------
 
-type BackupHandler struct {
+type BackupHandler[T any] struct {
 	ItemInfo details.ItemInfo
 
+	// FIXME: this is a hacky solution. Better to use an interface
+	// and plug in the selector scope there.
+	Sel selectors.Selector
+
-	DriveItemEnumeration EnumeratesDriveItemsDelta
+	DriveItemEnumeration EnumerateItemsDeltaByDrive
 
 	GI  GetsItem
 	GIP GetsItemPermission
@@ -54,13 +56,17 @@ type BackupHandler struct {
 	GetErrs  []error
 }
 
-func DefaultOneDriveBH(resourceOwner string) *BackupHandler {
-	return &BackupHandler{
+func DefaultOneDriveBH(resourceOwner string) *BackupHandler[models.DriveItemable] {
+	sel := selectors.NewOneDriveBackup([]string{resourceOwner})
+	sel.Include(sel.AllData())
+
+	return &BackupHandler[models.DriveItemable]{
 		ItemInfo: details.ItemInfo{
 			OneDrive:  &details.OneDriveInfo{},
 			Extension: &details.ExtensionData{},
 		},
-		DriveItemEnumeration: EnumeratesDriveItemsDelta{},
+		Sel:                  sel.Selector,
+		DriveItemEnumeration: EnumerateItemsDeltaByDrive{},
 		GI:                   GetsItem{Err: clues.New("not defined")},
 		GIP:                  GetsItemPermission{Err: clues.New("not defined")},
 		PathPrefixFn:         defaultOneDrivePathPrefixer,
@@ -75,12 +81,16 @@ func DefaultOneDriveBH(resourceOwner string) *BackupHandler {
 	}
 }
 
-func DefaultSharePointBH(resourceOwner string) *BackupHandler {
-	return &BackupHandler{
+func DefaultSharePointBH(resourceOwner string) *BackupHandler[models.DriveItemable] {
+	sel := selectors.NewOneDriveBackup([]string{resourceOwner})
+	sel.Include(sel.AllData())
+
+	return &BackupHandler[models.DriveItemable]{
 		ItemInfo: details.ItemInfo{
 			SharePoint: &details.SharePointInfo{},
 			Extension:  &details.ExtensionData{},
 		},
+		Sel: sel.Selector,
 		GI:  GetsItem{Err: clues.New("not defined")},
 		GIP: GetsItemPermission{Err: clues.New("not defined")},
 		PathPrefixFn: defaultSharePointPathPrefixer,
@@ -95,7 +105,7 @@ func DefaultSharePointBH(resourceOwner string) *BackupHandler {
 	}
 }
 
-func (h BackupHandler) PathPrefix(tID, driveID string) (path.Path, error) {
+func (h BackupHandler[T]) PathPrefix(tID, driveID string) (path.Path, error) {
 	pp, err := h.PathPrefixFn(tID, h.ProtectedResource.ID(), driveID)
 	if err != nil {
 		return nil, err
@@ -104,7 +114,7 @@ func (h BackupHandler) PathPrefix(tID, driveID string) (path.Path, error) {
 	return pp, h.PathPrefixErr
 }
 
-func (h BackupHandler) MetadataPathPrefix(tID string) (path.Path, error) {
+func (h BackupHandler[T]) MetadataPathPrefix(tID string) (path.Path, error) {
 	pp, err := h.MetadataPathPrefixFn(tID, h.ProtectedResource.ID())
 	if err != nil {
 		return nil, err
@@ -113,7 +123,7 @@ func (h BackupHandler) MetadataPathPrefix(tID string) (path.Path, error) {
 	return pp, h.PathPrefixErr
 }
 
-func (h BackupHandler) CanonicalPath(pb *path.Builder, tID string) (path.Path, error) {
+func (h BackupHandler[T]) CanonicalPath(pb *path.Builder, tID string) (path.Path, error) {
 	cp, err := h.CanonPathFn(pb, tID, h.ProtectedResource.ID())
 	if err != nil {
 		return nil, err
@@ -122,23 +132,23 @@ func (h BackupHandler) CanonicalPath(pb *path.Builder, tID string) (path.Path, e
 	return cp, h.CanonPathErr
 }
 
-func (h BackupHandler) ServiceCat() (path.ServiceType, path.CategoryType) {
+func (h BackupHandler[T]) ServiceCat() (path.ServiceType, path.CategoryType) {
 	return h.Service, h.Category
 }
 
-func (h BackupHandler) NewDrivePager(string, []string) api.Pager[models.Driveable] {
+func (h BackupHandler[T]) NewDrivePager(string, []string) api.Pager[models.Driveable] {
 	return h.DrivePagerV
 }
 
-func (h BackupHandler) FormatDisplayPath(_ string, pb *path.Builder) string {
+func (h BackupHandler[T]) FormatDisplayPath(_ string, pb *path.Builder) string {
 	return "/" + pb.String()
 }
 
-func (h BackupHandler) NewLocationIDer(driveID string, elems ...string) details.LocationIDer {
+func (h BackupHandler[T]) NewLocationIDer(driveID string, elems ...string) details.LocationIDer {
 	return h.LocationIDFn(driveID, elems...)
 }
 
-func (h BackupHandler) AugmentItemInfo(
+func (h BackupHandler[T]) AugmentItemInfo(
 	details.ItemInfo,
 	idname.Provider,
 	models.DriveItemable,
@@ -148,7 +158,7 @@ func (h BackupHandler) AugmentItemInfo(
 	return h.ItemInfo
 }
 
-func (h *BackupHandler) Get(context.Context, string, map[string]string) (*http.Response, error) {
+func (h *BackupHandler[T]) Get(context.Context, string, map[string]string) (*http.Response, error) {
 	c := h.getCall
 	h.getCall++
 
@@ -160,23 +170,23 @@ func (h *BackupHandler) Get(context.Context, string, map[string]string) (*http.R
 	return h.GetResps[c], h.GetErrs[c]
 }
 
-func (h BackupHandler) EnumerateDriveItemsDelta(
+func (h BackupHandler[T]) EnumerateDriveItemsDelta(
 	ctx context.Context,
 	driveID, prevDeltaLink string,
-	selectProps []string,
-) ([]models.DriveItemable, api.DeltaUpdate, error) {
+	cc api.CallConfig,
+) api.NextPageResulter[models.DriveItemable] {
 	return h.DriveItemEnumeration.EnumerateDriveItemsDelta(
 		ctx,
 		driveID,
 		prevDeltaLink,
-		selectProps)
+		cc)
 }
 
-func (h BackupHandler) GetItem(ctx context.Context, _, _ string) (models.DriveItemable, error) {
+func (h BackupHandler[T]) GetItem(ctx context.Context, _, _ string) (models.DriveItemable, error) {
 	return h.GI.GetItem(ctx, "", "")
 }
 
-func (h BackupHandler) GetItemPermission(
+func (h BackupHandler[T]) GetItemPermission(
 	ctx context.Context,
 	_, _ string,
 ) (models.PermissionCollectionResponseable, error) {
@@ -250,12 +260,16 @@ var defaultSharePointLocationIDer = func(driveID string, elems ...string) detail
 	return details.NewSharePointLocationIDer(driveID, elems...)
 }
 
-func (h BackupHandler) IsAllPass() bool {
-	return true
+func (h BackupHandler[T]) IsAllPass() bool {
+	scope := h.Sel.Includes[0]
+	return selectors.IsAnyTarget(selectors.SharePointScope(scope), selectors.SharePointLibraryFolder) ||
+		selectors.IsAnyTarget(selectors.OneDriveScope(scope), selectors.OneDriveFolder)
 }
 
-func (h BackupHandler) IncludesDir(string) bool {
-	return true
+func (h BackupHandler[T]) IncludesDir(dir string) bool {
+	scope := h.Sel.Includes[0]
+	return selectors.SharePointScope(scope).Matches(selectors.SharePointLibraryFolder, dir) ||
+		selectors.OneDriveScope(scope).Matches(selectors.OneDriveFolder, dir)
 }
 
 // ---------------------------------------------------------------------------
@@ -278,60 +292,46 @@ func (m GetsItem) GetItem(
 // Enumerates Drive Items
 // ---------------------------------------------------------------------------
 
-type EnumeratesDriveItemsDelta struct {
-	Items       map[string][]models.DriveItemable
-	DeltaUpdate map[string]api.DeltaUpdate
-	Err         map[string]error
+type NextPage struct {
+	Items []models.DriveItemable
+	Reset bool
 }
 
-func (edi EnumeratesDriveItemsDelta) EnumerateDriveItemsDelta(
+type EnumerateItemsDeltaByDrive struct {
+	DrivePagers map[string]*DriveItemsDeltaPager
+}
+
+var _ api.NextPageResulter[models.DriveItemable] = &DriveItemsDeltaPager{}
+
+type DriveItemsDeltaPager struct {
+	Idx         int
+	Pages       []NextPage
+	DeltaUpdate api.DeltaUpdate
+	Err         error
+}
+
+func (edibd EnumerateItemsDeltaByDrive) EnumerateDriveItemsDelta(
 	_ context.Context,
 	driveID, _ string,
-	_ []string,
-) (
-	[]models.DriveItemable,
-	api.DeltaUpdate,
-	error,
-) {
-	return edi.Items[driveID], edi.DeltaUpdate[driveID], edi.Err[driveID]
+	_ api.CallConfig,
+) api.NextPageResulter[models.DriveItemable] {
+	didp := edibd.DrivePagers[driveID]
+	return didp
 }
 
-func PagerResultToEDID(
-	m map[string][]apiMock.PagerResult[models.DriveItemable],
-) EnumeratesDriveItemsDelta {
-	edi := EnumeratesDriveItemsDelta{
-		Items:       map[string][]models.DriveItemable{},
-		DeltaUpdate: map[string]api.DeltaUpdate{},
-		Err:         map[string]error{},
+func (edi *DriveItemsDeltaPager) NextPage() ([]models.DriveItemable, bool, bool) {
+	if edi.Idx >= len(edi.Pages) {
+		return nil, false, true
 	}
 
-	for driveID, results := range m {
-		var (
-			err         error
-			items       = []models.DriveItemable{}
-			deltaUpdate api.DeltaUpdate
-		)
+	np := edi.Pages[edi.Idx]
+	edi.Idx++
 
-		for _, pr := range results {
-			items = append(items, pr.Values...)
+	return np.Items, np.Reset, false
+}
 
-			if pr.DeltaLink != nil {
-				deltaUpdate = api.DeltaUpdate{URL: ptr.Val(pr.DeltaLink)}
-			}
-
-			if pr.Err != nil {
-				err = pr.Err
-			}
-
-			deltaUpdate.Reset = deltaUpdate.Reset || pr.ResetDelta
-		}
-
-		edi.Items[driveID] = items
-		edi.Err[driveID] = err
-		edi.DeltaUpdate[driveID] = deltaUpdate
-	}
-
-	return edi
+func (edi *DriveItemsDeltaPager) Results() (api.DeltaUpdate, error) {
+	return edi.DeltaUpdate, edi.Err
 }
 
 // ---------------------------------------------------------------------------
diff --git a/src/internal/m365/service/sharepoint/backup_test.go b/src/internal/m365/service/sharepoint/backup_test.go
index f1a68bbab..e9ae4c10f 100644
--- a/src/internal/m365/service/sharepoint/backup_test.go
+++ b/src/internal/m365/service/sharepoint/backup_test.go
@@ -12,6 +12,7 @@ import (
 	"github.com/alcionai/corso/src/internal/common/idname"
 	"github.com/alcionai/corso/src/internal/m365/collection/drive"
 	odConsts "github.com/alcionai/corso/src/internal/m365/service/onedrive/consts"
+	"github.com/alcionai/corso/src/internal/m365/service/onedrive/mock"
 	"github.com/alcionai/corso/src/internal/tester"
 	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/fault"
@@ -90,16 +91,29 @@ func (suite *LibrariesBackupUnitSuite) TestUpdateCollections() {
 			defer flush()
 
 			var (
-				paths     = map[string]string{}
-				currPaths = map[string]string{}
-				excluded  = map[string]struct{}{}
-				collMap   = map[string]map[string]*drive.Collection{
+				mbh = mock.DefaultSharePointBH(siteID)
+				du  = api.DeltaUpdate{
+					URL:   "notempty",
+					Reset: false,
+				}
+				paths    = map[string]string{}
+				excluded = map[string]struct{}{}
+				collMap  = map[string]map[string]*drive.Collection{
 					driveID: {},
 				}
 			)
 
+			mbh.DriveItemEnumeration = mock.EnumerateItemsDeltaByDrive{
+				DrivePagers: map[string]*mock.DriveItemsDeltaPager{
+					driveID: {
+						Pages:       []mock.NextPage{{Items: test.items}},
+						DeltaUpdate: du,
+					},
+				},
+			}
+
 			c := drive.NewCollections(
-				drive.NewSiteBackupHandler(api.Drives{}, siteID, test.scope, path.SharePointService),
+				mbh,
 				tenantID,
 				idname.NewProvider(siteID, siteID),
 				nil,
@@ -107,15 +121,13 @@ func (suite *LibrariesBackupUnitSuite) TestUpdateCollections() {
 
 			c.CollectionMap = collMap
 
-			_, err := c.UpdateCollections(
+			_, _, err := c.PopulateDriveCollections(
 				ctx,
 				driveID,
 				"General",
-				test.items,
 				paths,
-				currPaths,
 				excluded,
-				true,
+				"",
 				fault.New(true))
 
 			test.expect(t, err, clues.ToCore(err))
diff --git a/src/pkg/fault/skipped.go b/src/pkg/fault/skipped.go
index 126313c37..6aead57b1 100644
--- a/src/pkg/fault/skipped.go
+++ b/src/pkg/fault/skipped.go
@@ -1,9 +1,17 @@
 package fault
 
 import (
+	"context"
+
 	"github.com/alcionai/corso/src/cli/print"
 )
 
+// AddSkipper presents an interface that allows callers to
+// record additional skipped items on the implementing struct.
+type AddSkipper interface {
+	AddSkip(ctx context.Context, s *Skipped)
+}
+
 // skipCause identifies the well-known conditions to Skip an item. It is
 // important that skip cause enumerations do not overlap with general error
 // handling. Skips must be well known, well documented, and consistent.
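For context on the new AddSkipper interface: it narrows what helpers need from the fault package, so code that only records skips no longer depends on the concrete *fault.Bus. A minimal sketch of the intended call pattern follows; the mockfault package and skipRecorder type here are hypothetical illustrations, not part of this patch:

package mockfault

import (
	"context"

	"github.com/alcionai/corso/src/pkg/fault"
)

// skipRecorder is a hypothetical in-memory stand-in for *fault.Bus in
// helpers that only need skip reporting.
type skipRecorder struct {
	skips []*fault.Skipped
}

// AddSkip collects the skipped item instead of routing it through the
// full fault bus.
func (r *skipRecorder) AddSkip(_ context.Context, s *fault.Skipped) {
	r.skips = append(r.skips, s)
}

// compile-time assertion that the recorder satisfies fault.AddSkipper.
var _ fault.AddSkipper = &skipRecorder{}

Any test double with a matching AddSkip method can be swapped in wherever skip reporting is the only fault-bus behavior exercised.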
diff --git a/src/pkg/services/m365/api/channels_pager.go b/src/pkg/services/m365/api/channels_pager.go
index 80efefce0..2f069a5bf 100644
--- a/src/pkg/services/m365/api/channels_pager.go
+++ b/src/pkg/services/m365/api/channels_pager.go
@@ -57,8 +57,8 @@ func (c Channels) NewChannelMessagePager(
 		Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize)),
 	}
 
-	if len(cc.Props) > 0 {
-		options.QueryParameters.Select = cc.Props
+	if len(cc.Select) > 0 {
+		options.QueryParameters.Select = cc.Select
 	}
 
 	if len(cc.Expand) > 0 {
@@ -83,7 +83,7 @@ func (c Channels) GetChannelMessages(
 ) ([]models.ChatMessageable, error) {
 	ctx = clues.Add(ctx, "channel_id", channelID)
 	pager := c.NewChannelMessagePager(teamID, channelID, cc)
-	items, err := enumerateItems[models.ChatMessageable](ctx, pager)
+	items, err := batchEnumerateItems[models.ChatMessageable](ctx, pager)
 
 	return items, graph.Stack(ctx, err).OrNil()
 }
@@ -265,7 +265,7 @@ func (c Channels) GetChannelMessageReplies(
 	ctx context.Context,
 	teamID, channelID, messageID string,
 ) ([]models.ChatMessageable, error) {
-	return enumerateItems[models.ChatMessageable](
+	return batchEnumerateItems[models.ChatMessageable](
 		ctx,
 		c.NewChannelMessageRepliesPager(teamID, channelID, messageID))
 }
@@ -322,5 +322,5 @@ func (c Channels) GetChannels(
 	ctx context.Context,
 	teamID string,
 ) ([]models.Channelable, error) {
-	return enumerateItems[models.Channelable](ctx, c.NewChannelPager(teamID))
+	return batchEnumerateItems[models.Channelable](ctx, c.NewChannelPager(teamID))
 }
diff --git a/src/pkg/services/m365/api/client.go b/src/pkg/services/m365/api/client.go
index 3a234b045..23bf34dfd 100644
--- a/src/pkg/services/m365/api/client.go
+++ b/src/pkg/services/m365/api/client.go
@@ -136,7 +136,7 @@ func (c Client) Post(
 
 type CallConfig struct {
 	Expand []string
-	Props  []string
+	Select []string
 }
 
 // ---------------------------------------------------------------------------
diff --git a/src/pkg/services/m365/api/contacts_pager.go b/src/pkg/services/m365/api/contacts_pager.go
index 5d43a5172..068e7406c 100644
--- a/src/pkg/services/m365/api/contacts_pager.go
+++ b/src/pkg/services/m365/api/contacts_pager.go
@@ -84,7 +84,7 @@ func (c Contacts) EnumerateContainers(
 		pgr = c.NewContactFoldersPager(userID, baseContainerID, immutableIDs)
 	)
 
-	containers, err := enumerateItems(ctx, pgr)
+	containers, err := batchEnumerateItems(ctx, pgr)
 	if err != nil {
 		return graph.Stack(ctx, err)
 	}
@@ -165,7 +165,7 @@ func (c Contacts) GetItemsInContainerByCollisionKey(
 	ctx = clues.Add(ctx, "container_id", containerID)
 	pager := c.NewContactsPager(userID, containerID, false, contactCollisionKeyProps()...)
 
-	items, err := enumerateItems(ctx, pager)
+	items, err := batchEnumerateItems(ctx, pager)
 	if err != nil {
 		return nil, graph.Wrap(ctx, err, "enumerating contacts")
 	}
@@ -186,7 +186,7 @@ func (c Contacts) GetItemIDsInContainer(
 	ctx = clues.Add(ctx, "container_id", containerID)
 	pager := c.NewContactsPager(userID, containerID, false, idAnd()...)
 
-	items, err := enumerateItems(ctx, pager)
+	items, err := batchEnumerateItems(ctx, pager)
 	if err != nil {
 		return nil, graph.Wrap(ctx, err, "enumerating contacts")
 	}
diff --git a/src/pkg/services/m365/api/drive_pager.go b/src/pkg/services/m365/api/drive_pager.go
index 2a9527712..708fa3439 100644
--- a/src/pkg/services/m365/api/drive_pager.go
+++ b/src/pkg/services/m365/api/drive_pager.go
@@ -77,7 +77,7 @@ func (c Drives) GetItemsInContainerByCollisionKey(
 	ctx = clues.Add(ctx, "container_id", containerID)
 	pager := c.NewDriveItemPager(driveID, containerID, idAnd("name")...)
 
-	items, err := enumerateItems(ctx, pager)
+	items, err := batchEnumerateItems(ctx, pager)
 	if err != nil {
 		return nil, graph.Wrap(ctx, err, "enumerating drive items")
 	}
@@ -101,7 +101,7 @@ func (c Drives) GetItemIDsInContainer(
 	ctx = clues.Add(ctx, "container_id", containerID)
 	pager := c.NewDriveItemPager(driveID, containerID, idAnd("file", "folder")...)
 
-	items, err := enumerateItems(ctx, pager)
+	items, err := batchEnumerateItems(ctx, pager)
 	if err != nil {
 		return nil, graph.Wrap(ctx, err, "enumerating drive items")
 	}
@@ -133,7 +133,7 @@ type DriveItemDeltaPageCtrl struct {
 
 func (c Drives) newDriveItemDeltaPager(
 	driveID, prevDeltaLink string,
-	selectProps ...string,
+	cc CallConfig,
 ) *DriveItemDeltaPageCtrl {
 	preferHeaderItems := []string{
 		"deltashowremovedasdeleted",
@@ -147,8 +147,8 @@ func (c Drives) newDriveItemDeltaPager(
 		QueryParameters: &drives.ItemItemsItemDeltaRequestBuilderGetQueryParameters{},
 	}
 
-	if len(selectProps) > 0 {
-		options.QueryParameters.Select = selectProps
+	if len(cc.Select) > 0 {
+		options.QueryParameters.Select = cc.Select
 	}
 
 	builder := c.Stable.
@@ -197,26 +197,33 @@ func (p *DriveItemDeltaPageCtrl) ValidModTimes() bool {
 	return true
 }
 
-// EnumerateDriveItems will enumerate all items in the specified drive and hand them to the
-// provided `collector` method
+// EnumerateDriveItemsDelta enumerates all items in the specified drive and streams them
+// page by page, along with the delta update and any errors, through the returned pager.
 func (c Drives) EnumerateDriveItemsDelta(
 	ctx context.Context,
 	driveID string,
 	prevDeltaLink string,
-	selectProps []string,
-) (
-	[]models.DriveItemable,
-	DeltaUpdate,
-	error,
-) {
-	pager := c.newDriveItemDeltaPager(driveID, prevDeltaLink, selectProps...)
+	cc CallConfig,
+) NextPageResulter[models.DriveItemable] {
+	deltaPager := c.newDriveItemDeltaPager(
+		driveID,
+		prevDeltaLink,
+		cc)
 
-	items, du, err := deltaEnumerateItems[models.DriveItemable](ctx, pager, prevDeltaLink)
-	if err != nil {
-		return nil, du, clues.Stack(err)
+	npr := &nextPageResults[models.DriveItemable]{
+		pages: make(chan nextPage[models.DriveItemable]),
 	}
 
-	return items, du, nil
+	// asynchronously enumerate pages on the caller's behalf.
+	// they only need to consume the pager and call Results at
+	// the end.
+	go deltaEnumerateItems[models.DriveItemable](
+		ctx,
+		deltaPager,
+		npr,
+		prevDeltaLink)
+
+	return npr
 }
 
 // ---------------------------------------------------------------------------
@@ -354,11 +361,17 @@ func GetAllDrives(
 	ctx context.Context,
 	pager Pager[models.Driveable],
 ) ([]models.Driveable, error) {
-	ds, err := enumerateItems(ctx, pager)
-	if err != nil && (clues.HasLabel(err, graph.LabelsMysiteNotFound) ||
-		clues.HasLabel(err, graph.LabelsNoSharePointLicense)) {
+	ds, err := batchEnumerateItems(ctx, pager)
+
+	// no license or drives available.
+	// return a non-error and let the caller assume an empty result set.
+	// TODO: is this the best way to handle this?
+	// what about returning a ResourceNotFound error as is standard elsewhere?
+	if err != nil &&
+		(clues.HasLabel(err, graph.LabelsMysiteNotFound) || clues.HasLabel(err, graph.LabelsNoSharePointLicense)) {
 		logger.CtxErr(ctx, err).Infof("resource owner does not have a drive")
-		return make([]models.Driveable, 0), nil // no license or drives.
+
+		return make([]models.Driveable, 0), nil
 	}
 
 	return ds, graph.Stack(ctx, err).OrNil()
diff --git a/src/pkg/services/m365/api/drive_pager_test.go b/src/pkg/services/m365/api/drive_pager_test.go
index fad440ee0..619f16932 100644
--- a/src/pkg/services/m365/api/drive_pager_test.go
+++ b/src/pkg/services/m365/api/drive_pager_test.go
@@ -4,6 +4,7 @@ import (
 	"testing"
 
 	"github.com/alcionai/clues"
+	"github.com/microsoftgraph/msgraph-sdk-go/models"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
@@ -185,11 +186,28 @@ func (suite *DrivePagerIntgSuite) TestEnumerateDriveItems() {
 			ctx, flush := tester.NewContext(t)
 			defer flush()
 
-			items, du, err := suite.its.
+			items := []models.DriveItemable{}
+
+			pager := suite.its.
 				ac.
 				Drives().
-				EnumerateDriveItemsDelta(ctx, suite.its.user.driveID, "", api.DefaultDriveItemProps())
+				EnumerateDriveItemsDelta(
+					ctx,
+					suite.its.user.driveID,
+					"",
+					api.CallConfig{
+						Select: api.DefaultDriveItemProps(),
+					})
+
+			for page, reset, done := pager.NextPage(); !done; page, reset, done = pager.NextPage() {
+				items = append(items, page...)
+
+				assert.False(t, reset, "should not reset")
+			}
+
+			du, err := pager.Results()
 			require.NoError(t, err, clues.ToCore(err))
-			require.NotEmpty(t, items, "no items found in user's drive")
+			require.NotEmpty(t, items, "should find items in user's drive")
 			assert.NotEmpty(t, du.URL, "should have a delta link")
 		})
diff --git a/src/pkg/services/m365/api/events_pager.go b/src/pkg/services/m365/api/events_pager.go
index 38c985770..05474367f 100644
--- a/src/pkg/services/m365/api/events_pager.go
+++ b/src/pkg/services/m365/api/events_pager.go
@@ -85,7 +85,7 @@ func (c Events) EnumerateContainers(
 		pgr = c.NewEventCalendarsPager(userID, immutableIDs)
 	)
 
-	containers, err := enumerateItems(ctx, pgr)
+	containers, err := batchEnumerateItems(ctx, pgr)
 	if err != nil {
 		return graph.Stack(ctx, err)
 	}
@@ -169,7 +169,7 @@ func (c Events) GetItemsInContainerByCollisionKey(
 	ctx = clues.Add(ctx, "container_id", containerID)
 	pager := c.NewEventsPager(userID, containerID, false, eventCollisionKeyProps()...)
 
-	items, err := enumerateItems(ctx, pager)
+	items, err := batchEnumerateItems(ctx, pager)
 	if err != nil {
 		return nil, graph.Wrap(ctx, err, "enumerating events")
 	}
@@ -190,7 +190,7 @@ func (c Events) GetItemIDsInContainer(
 	ctx = clues.Add(ctx, "container_id", containerID)
 	pager := c.NewEventsPager(userID, containerID, false, idAnd()...)
 
-	items, err := enumerateItems(ctx, pager)
+	items, err := batchEnumerateItems(ctx, pager)
 	if err != nil {
 		return nil, graph.Wrap(ctx, err, "enumerating events")
 	}
diff --git a/src/pkg/services/m365/api/item_pager.go b/src/pkg/services/m365/api/item_pager.go
index 4e524a58f..ef15836a4 100644
--- a/src/pkg/services/m365/api/item_pager.go
+++ b/src/pkg/services/m365/api/item_pager.go
@@ -28,6 +28,85 @@ type DeltaUpdate struct {
 	Reset bool
 }
 
+type NextPager[T any] interface {
+	// NextPage returns the next page of items. reset should only be true on
+	// the iteration where the delta pager's Reset() is called. Callers can
+	// use it to reset any data aggregation they currently hold. After that
+	// iteration it returns to false, though the DeltaUpdate will eventually
+	// contain the expected reset value. Items may or may not be present when
+	// reset == true; when they are, they always represent the next page of
+	// data following the reset. Callers should always handle the reset first,
+	// and follow up with item population.
+	NextPage() (items []T, reset, done bool)
+}
+
+type nextPage[T any] struct {
+	items []T
+	reset bool
+}
+
+type NextPageResulter[T any] interface {
+	NextPager[T]
+
+	Results() (DeltaUpdate, error)
+}
+
+var _ NextPageResulter[any] = &nextPageResults[any]{}
+
+type nextPageResults[T any] struct {
+	pages chan nextPage[T]
+	du    DeltaUpdate
+	err   error
+}
+
+func (npr *nextPageResults[T]) writeNextPage(
+	ctx context.Context,
+	items []T,
+	reset bool,
+) error {
+	if npr.pages == nil {
+		return clues.New("pager already closed")
+	}
+
+	select {
+	case <-ctx.Done():
+		return clues.Wrap(context.Canceled, "writing next page")
+	case npr.pages <- nextPage[T]{
+		items: items,
+		reset: reset,
+	}:
+		return nil
+	}
+}
+
+func (npr *nextPageResults[T]) NextPage() ([]T, bool, bool) {
+	if npr.pages == nil {
+		return nil, false, true
+	}
+
+	np, ok := <-npr.pages
+
+	return np.items, np.reset, !ok
+}
+
+func (npr *nextPageResults[T]) Results() (DeltaUpdate, error) {
+	if npr.pages != nil {
+		//nolint:revive
+		for range npr.pages {
+			// if the pager hasn't closed yet, drain out the pages iterator
+			// to avoid leaking routines, and to ensure we get results.
+		}
+	}
+
+	return npr.du, npr.err
+}
+
+func (npr *nextPageResults[T]) close() {
+	if npr.pages != nil {
+		close(npr.pages)
+	}
+}
+
 // ---------------------------------------------------------------------------
 // common interfaces
 // ---------------------------------------------------------------------------
@@ -101,7 +180,10 @@ type Pager[T any] interface {
 func enumerateItems[T any](
 	ctx context.Context,
 	pager Pager[T],
-) ([]T, error) {
+	npr *nextPageResults[T],
+) {
+	defer npr.close()
+
 	var (
 		result = make([]T, 0)
 		// stubbed initial value to ensure we enter the loop.
@@ -112,18 +194,43 @@ func enumerateItems[T any](
 		// get the next page of data, check for standard errors
 		page, err := pager.GetPage(ctx)
 		if err != nil {
-			return nil, graph.Stack(ctx, err)
+			npr.err = graph.Stack(ctx, err)
+			return
+		}
+
+		if err := npr.writeNextPage(ctx, page.GetValue(), false); err != nil {
+			npr.err = clues.Stack(err)
+			return
 		}
 
-		result = append(result, page.GetValue()...)
 		nextLink = NextLink(page)
 
 		pager.SetNextLink(nextLink)
 	}
 
 	logger.Ctx(ctx).Infow("completed delta item enumeration", "result_count", len(result))
+}
 
-	return result, nil
+func batchEnumerateItems[T any](
+	ctx context.Context,
+	pager Pager[T],
+) ([]T, error) {
+	var (
+		npr = nextPageResults[T]{
+			pages: make(chan nextPage[T]),
+		}
+		items = []T{}
+	)
+
+	go enumerateItems[T](ctx, pager, &npr)
+
+	for page, _, done := npr.NextPage(); !done; page, _, done = npr.NextPage() {
+		items = append(items, page...)
+	}
+
+	_, err := npr.Results()
+
+	return items, clues.Stack(err).OrNil()
 }
 
 // ---------------------------------------------------------------------------
@@ -137,11 +244,17 @@ type DeltaPager[T any] interface {
 	ValidModTimer
 }
 
+// deltaEnumerateItems enumerates pages of items, streaming each page to the
+// provided results struct. Reset notifications arrive as flagged pages, and
+// the DeltaUpdate and any errors are recorded on the same struct once
+// enumeration completes.
 func deltaEnumerateItems[T any](
 	ctx context.Context,
 	pager DeltaPager[T],
+	npr *nextPageResults[T],
 	prevDeltaLink string,
-) ([]T, DeltaUpdate, error) {
+) {
+	defer npr.close()
+
 	var (
 		result = make([]T, 0)
 		// stubbed initial value to ensure we enter the loop.
@@ -161,28 +274,47 @@ func deltaEnumerateItems[T any](
 		page, err := pager.GetPage(graph.ConsumeNTokens(ctx, consume))
 		if graph.IsErrDeltaNotSupported(err) {
 			logger.Ctx(ctx).Infow("delta queries not supported")
-			return nil, DeltaUpdate{}, clues.Stack(graph.ErrDeltaNotSupported, err)
+
+			pager.Reset(ctx)
+
+			if err := npr.writeNextPage(ctx, nil, true); err != nil {
+				npr.err = clues.Stack(err)
+				return
+			}
+
+			npr.err = clues.Stack(err)
+
+			return
 		}
 
 		if graph.IsErrInvalidDelta(err) {
 			logger.Ctx(ctx).Infow("invalid previous delta", "delta_link", prevDeltaLink)
 
 			invalidPrevDelta = true
-			// Reset limiter consumption since we don't have a valid delta token.
+
+			// Set limiter consumption rate to non-delta.
 			consume = graph.DeltaNoTokenLC
-			result = make([]T, 0)
 
 			// Reset tells the pager to try again after ditching its delta history.
 			pager.Reset(ctx)
 
+			if err := npr.writeNextPage(ctx, nil, true); err != nil {
+				npr.err = clues.Stack(err)
+				return
+			}
+
 			continue
 		}
 
 		if err != nil {
-			return nil, DeltaUpdate{}, graph.Wrap(ctx, err, "retrieving page")
+			npr.err = clues.Stack(err)
+			return
 		}
 
-		result = append(result, page.GetValue()...)
+		if err := npr.writeNextPage(ctx, page.GetValue(), false); err != nil {
+			npr.err = clues.Stack(err)
+			return
+		}
 
 		nl, deltaLink := NextAndDeltaLink(page)
 		if len(deltaLink) > 0 {
@@ -195,12 +327,37 @@ func deltaEnumerateItems[T any](
 	logger.Ctx(ctx).Debugw("completed delta item enumeration", "result_count", len(result))
 
-	du := DeltaUpdate{
+	npr.du = DeltaUpdate{
 		URL:   newDeltaLink,
 		Reset: invalidPrevDelta,
 	}
+}
 
-	return result, du, nil
+func batchDeltaEnumerateItems[T any](
+	ctx context.Context,
+	pager DeltaPager[T],
+	prevDeltaLink string,
+) ([]T, DeltaUpdate, error) {
+	var (
+		npr = nextPageResults[T]{
+			pages: make(chan nextPage[T]),
+		}
+		results = []T{}
+	)
+
+	go deltaEnumerateItems[T](ctx, pager, &npr, prevDeltaLink)
+
+	for page, reset, done := npr.NextPage(); !done; page, reset, done = npr.NextPage() {
+		if reset {
+			results = []T{}
+		}
+
+		results = append(results, page...)
+	}
+
+	du, err := npr.Results()
+
+	return results, du, clues.Stack(err).OrNil()
 }
 
 // ---------------------------------------------------------------------------
@@ -226,7 +383,7 @@ func getAddedAndRemovedItemIDs[T any](
 	filters ...func(T) bool,
 ) (map[string]time.Time, bool, []string, DeltaUpdate, error) {
 	if canMakeDeltaQueries {
-		ts, du, err := deltaEnumerateItems[T](ctx, deltaPager, prevDeltaLink)
+		ts, du, err := batchDeltaEnumerateItems[T](ctx, deltaPager, prevDeltaLink)
 		if err != nil && !graph.IsErrInvalidDelta(err) && !graph.IsErrDeltaNotSupported(err) {
 			return nil, false, nil, DeltaUpdate{}, graph.Stack(ctx, err)
 		}
@@ -239,7 +396,7 @@ func getAddedAndRemovedItemIDs[T any](
 
 	du := DeltaUpdate{Reset: true}
 
-	ts, err := enumerateItems(ctx, pager)
+	ts, err := batchEnumerateItems(ctx, pager)
 	if err != nil {
 		return nil, false, nil, DeltaUpdate{}, graph.Stack(ctx, err)
 	}
diff --git a/src/pkg/services/m365/api/item_pager_test.go b/src/pkg/services/m365/api/item_pager_test.go
index 01398a343..f3f260f68 100644
--- a/src/pkg/services/m365/api/item_pager_test.go
+++ b/src/pkg/services/m365/api/item_pager_test.go
@@ -272,7 +272,7 @@ func (suite *PagerUnitSuite) TestEnumerateItems() {
 					pageErr: assert.AnError,
 				}
 			},
-			expect:    nil,
+			expect:    []testItem{},
 			expectErr: require.Error,
 		},
 	}
@@ -284,7 +284,7 @@ func (suite *PagerUnitSuite) TestEnumerateItems() {
 			ctx, flush := tester.NewContext(t)
 			defer flush()
 
-			result, err := enumerateItems(ctx, test.getPager(t, ctx))
+			result, err := batchEnumerateItems(ctx, test.getPager(t, ctx))
 			test.expectErr(t, err, clues.ToCore(err))
 
 			require.EqualValues(t, test.expect, result)
diff --git a/src/pkg/services/m365/api/mail_pager.go b/src/pkg/services/m365/api/mail_pager.go
index 1aa6c8414..7882de7cd 100644
--- a/src/pkg/services/m365/api/mail_pager.go
+++ b/src/pkg/services/m365/api/mail_pager.go
@@ -81,7 +81,7 @@ func (c Mail) EnumerateContainers(
 		pgr = c.NewMailFoldersPager(userID, immutableIDs)
 	)
 
-	containers, err := enumerateItems(ctx, pgr)
+	containers, err := batchEnumerateItems(ctx, pgr)
 	if err != nil {
 		return graph.Stack(ctx, err)
 	}
@@ -162,7 +162,7 @@ func (c Mail) GetItemsInContainerByCollisionKey(
 	ctx = clues.Add(ctx, "container_id", containerID)
 	pager := c.NewMailPager(userID, containerID, false, mailCollisionKeyProps()...)
 
-	items, err := enumerateItems(ctx, pager)
+	items, err := batchEnumerateItems(ctx, pager)
 	if err != nil {
 		return nil, graph.Wrap(ctx, err, "enumerating mails")
 	}
@@ -183,7 +183,7 @@ func (c Mail) GetItemIDsInContainer(
 	ctx = clues.Add(ctx, "container_id", containerID)
 	pager := c.NewMailPager(userID, containerID, false, idAnd()...)
 
-	items, err := enumerateItems(ctx, pager)
+	items, err := batchEnumerateItems(ctx, pager)
 	if err != nil {
 		return nil, graph.Wrap(ctx, err, "enumerating mails")
 	}
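Taken together, the item_pager.go changes replace batch-returning enumerators with a channel-backed pager: a producer goroutine streams pages while Results drains any remainder and reports the terminal state. A minimal, self-contained sketch of that contract, using simplified stand-in types rather than the real nextPageResults and DeltaUpdate:

package main

import "fmt"

// page mirrors nextPage[T]: one batch of items plus a reset flag.
type page[T any] struct {
	items []T
	reset bool
}

// streamPager is a simplified stand-in for nextPageResults[T]: a
// producer goroutine writes pages to the channel, NextPage reads them,
// and Results drains any remainder so the producer always exits.
type streamPager[T any] struct {
	pages chan page[T]
	err   error
}

func (p *streamPager[T]) NextPage() ([]T, bool, bool) {
	np, ok := <-p.pages
	return np.items, np.reset, !ok
}

func (p *streamPager[T]) Results() error {
	// drain remaining pages to avoid leaking the producer goroutine
	// and to guarantee the terminal error has been recorded.
	for range p.pages {
	}
	return p.err
}

func main() {
	p := &streamPager[int]{pages: make(chan page[int])}

	// producer: emit two pages, then close, as enumerateItems does.
	go func() {
		defer close(p.pages)
		p.pages <- page[int]{items: []int{1, 2}}
		p.pages <- page[int]{items: []int{3}}
	}()

	var all []int
	for items, reset, done := p.NextPage(); !done; items, reset, done = p.NextPage() {
		if reset {
			// a reset invalidates anything aggregated so far.
			all = nil
		}
		all = append(all, items...)
	}

	fmt.Println(all, p.Results()) // [1 2 3] <nil>
}

The drain inside Results mirrors nextPageResults.Results above: even if a consumer stops paging early, the producer goroutine still exits and the terminal error is surfaced.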