Compare commits

...

10 Commits

Author SHA1 Message Date
ryanfkeepers
3462edfccd fix url cache tests 2023-10-11 10:59:40 -06:00
ryanfkeepers
96b9f752ba fix up tests, apply latest design 2023-10-11 10:57:12 -06:00
ryanfkeepers
c805c8f8e5 apply channels and streaming to drive collections
Updates drive collection processing with the new pattern of
streaming pages from the api using channels.

This is the last in a multipart update that has been separated
for ease of review.  Everything should now pass.
2023-10-11 10:56:55 -06:00
ryanfkeepers
45aac829dc rename/recount url cache delta queries, update test 2023-10-11 10:11:18 -06:00
ryanfkeepers
3c44837418 update branch to latest design 2023-10-10 17:12:54 -06:00
ryanfkeepers
2b7d76faef apply channels and streaming to the url cache
Updates the url cache item enumeration with stream processing
using channels, plus other changes to accommodate the
new pattern, such as mock and interface updates.

This is the second of a multipart update that has been separated
for ease of review.  CI is not expected to pass until the final
PR.
2023-10-10 16:22:50 -06:00
ryanfkeepers
a81f526cc6 don't set np chan to nil 2023-10-10 16:11:07 -06:00
ryanfkeepers
41130a2d5f fixing up npe possibilities in pager channel handling 2023-10-10 16:11:07 -06:00
ryanfkeepers
eb0219de6e replace channel creation with pager interface
Instead of having callers pass in channels, channels are created
when starting an api enumeration, and a pager is returned to the
caller, allowing it to retrieve one page at a time.
2023-10-10 16:11:06 -06:00
ryanfkeepers
4a295be9ed api pagers use channels for page streaming
Replaces the batch processing of api pagers with stream
processing using channels.  Each page is passed to the
channel for callers to consume iteratively.  For delta pagers,
the reset notification is also streamed through the same
channel.  Channel consumers are expected to react to the
reset as needed (a minimal sketch of this pattern follows
the commit list).

For quality of life, most pagers now utilize a batching
wrapper instead of transitioning to streaming support.
Only locations that already expect stream handling will
get moved to that support in the following PRs.

This is the first of a multipart update that has been separated
for ease of review.  CI is not expected to pass until the final
PR.
2023-10-10 16:08:33 -06:00
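
To make the streaming design above concrete, here is a minimal, self-contained Go sketch of a channel-backed pager in the shape these commits describe: a producer goroutine pushes pages (and reset notifications) through a single channel, the caller pulls them one at a time via NextPage, and the terminal delta state comes from Results once enumeration completes. All names below (streamPager, nextPage, deltaUpdate, enumerate) are illustrative stand-ins rather than the repository's actual types; the real interface, per the diffs below, is api.NextPageResulter[models.DriveItemable].

package main

import "fmt"

// deltaUpdate mirrors the shape of api.DeltaUpdate in the diffs below:
// the next delta link, plus a flag noting the enumeration was reset.
type deltaUpdate struct {
	URL   string
	Reset bool
}

// nextPage carries one page of items plus a reset marker, so resets are
// delivered in order through the same channel as the pages themselves.
type nextPage[T any] struct {
	items []T
	reset bool
}

// streamPager is a stand-in for the pager handed back to callers: a
// producer goroutine fills the channel; consumers drain it page by page.
type streamPager[T any] struct {
	pages chan nextPage[T]
	du    deltaUpdate
	err   error
}

// NextPage returns the next page, whether a reset occurred, and whether
// enumeration is complete.
func (p *streamPager[T]) NextPage() (items []T, reset, done bool) {
	np, ok := <-p.pages
	if !ok {
		return nil, false, true
	}

	return np.items, np.reset, false
}

// Results reports the delta update and any terminal error; it is only
// meaningful after NextPage has reported done.
func (p *streamPager[T]) Results() (deltaUpdate, error) {
	return p.du, p.err
}

// enumerate creates the channel internally and returns a pager, matching
// the "return a pager instead of accepting channels" design.
func enumerate(pages ...nextPage[string]) *streamPager[string] {
	p := &streamPager[string]{pages: make(chan nextPage[string])}

	go func() {
		// The deferred close runs after du is set, so Results is safe
		// to read once NextPage reports done.
		defer close(p.pages)

		for _, pg := range pages {
			p.pages <- pg
		}

		p.du = deltaUpdate{URL: "delta-link"}
	}()

	return p
}

func main() {
	pager := enumerate(
		nextPage[string]{items: []string{"a", "b"}},
		nextPage[string]{reset: true}, // consumers must drop accumulated state
		nextPage[string]{items: []string{"a", "c"}},
	)

	for page, reset, done := pager.NextPage(); !done; page, reset, done = pager.NextPage() {
		fmt.Println("reset:", reset, "items:", page)
	}

	if du, err := pager.Results(); err == nil {
		fmt.Println("next delta:", du.URL)
	}
}

The consumption loop at the bottom is the same idiom the new PopulateDriveCollections and refreshCache use in the diffs below.
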
20 changed files with 1412 additions and 792 deletions

View File

@@ -272,13 +272,6 @@ func (c *Collections) Get(
excludedItemIDs = map[string]struct{}{} excludedItemIDs = map[string]struct{}{}
oldPrevPaths = oldPrevPathsByDriveID[driveID] oldPrevPaths = oldPrevPathsByDriveID[driveID]
prevDeltaLink = prevDriveIDToDelta[driveID] prevDeltaLink = prevDriveIDToDelta[driveID]
// itemCollection is used to identify which collection a
// file belongs to. This is useful to delete a file from the
// collection it was previously in, in case it was moved to a
// different collection within the same delta query
// item ID -> item ID
itemCollection = map[string]string{}
) )
delete(driveTombstones, driveID) delete(driveTombstones, driveID)
@@ -295,13 +288,16 @@ func (c *Collections) Get(
"previous metadata for drive", "previous metadata for drive",
"num_paths_entries", len(oldPrevPaths)) "num_paths_entries", len(oldPrevPaths))
items, du, err := c.handler.EnumerateDriveItemsDelta( du, newPrevPaths, err := c.PopulateDriveCollections(
ictx, ctx,
driveID, driveID,
driveName,
oldPrevPaths,
excludedItemIDs,
prevDeltaLink, prevDeltaLink,
api.DefaultDriveItemProps()) errs)
if err != nil { if err != nil {
return nil, false, err return nil, false, clues.Stack(err)
} }
// It's alright to have an empty folders map (i.e. no folders found) but not // It's alright to have an empty folders map (i.e. no folders found) but not
@@ -313,20 +309,6 @@ func (c *Collections) Get(
driveIDToDeltaLink[driveID] = du.URL driveIDToDeltaLink[driveID] = du.URL
} }
newPrevPaths, err := c.UpdateCollections(
ctx,
driveID,
driveName,
items,
oldPrevPaths,
itemCollection,
excludedItemIDs,
du.Reset,
errs)
if err != nil {
return nil, false, clues.Stack(err)
}
// Avoid the edge case where there's no paths but we do have a valid delta // Avoid the edge case where there's no paths but we do have a valid delta
// token. We can accomplish this by adding an empty paths map for this // token. We can accomplish this by adding an empty paths map for this
// drive. If we don't have this then the next backup won't use the delta // drive. If we don't have this then the next backup won't use the delta
@@ -688,224 +670,284 @@ func (c *Collections) getCollectionPath(
return collectionPath, nil return collectionPath, nil
} }
// UpdateCollections initializes and adds the provided drive items to Collections // PopulateDriveCollections initializes and adds the provided drive items to Collections
// A new collection is created for every drive folder (or package). // A new collection is created for every drive folder.
// oldPrevPaths is the unchanged data that was loaded from the metadata file. // oldPrevPaths is the unchanged data that was loaded from the metadata file.
// This map is not modified during the call. // This map is not modified during the call.
// currPrevPaths starts as a copy of oldPaths and is updated as changes are found in // currPrevPaths starts as a copy of oldPaths and is updated as changes are found in
// the returned results. Items are added to this collection throughout the call. // the returned results. Items are added to this collection throughout the call.
// newPrevPaths, ie: the items added during this call, get returned as a map. // newPrevPaths, ie: the items added during this call, get returned as a map.
func (c *Collections) UpdateCollections( func (c *Collections) PopulateDriveCollections(
ctx context.Context, ctx context.Context,
driveID, driveName string, driveID, driveName string,
items []models.DriveItemable,
oldPrevPaths map[string]string, oldPrevPaths map[string]string,
currPrevPaths map[string]string, excludedItemIDs map[string]struct{},
excluded map[string]struct{}, prevDeltaLink string,
invalidPrevDelta bool,
errs *fault.Bus, errs *fault.Bus,
) (map[string]string, error) { ) (api.DeltaUpdate, map[string]string, error) {
var ( var (
el = errs.Local() el = errs.Local()
newPrevPaths = map[string]string{} newPrevPaths = map[string]string{}
invalidPrevDelta = len(prevDeltaLink) == 0
// currPrevPaths is used to identify which collection a
// file belongs to. This is useful to delete a file from the
// collection it was previously in, in case it was moved to a
// different collection within the same delta query
// item ID -> item ID
currPrevPaths = map[string]string{}
) )
if !invalidPrevDelta { if !invalidPrevDelta {
maps.Copy(newPrevPaths, oldPrevPaths) maps.Copy(newPrevPaths, oldPrevPaths)
} }
for _, item := range items { pager := c.handler.EnumerateDriveItemsDelta(
ctx,
driveID,
prevDeltaLink,
api.CallConfig{
Select: api.DefaultDriveItemProps(),
})
page, reset, done := pager.NextPage()
for ; !done; page, reset, done = pager.NextPage() {
if el.Failure() != nil { if el.Failure() != nil {
break break
} }
var ( if reset {
itemID = ptr.Val(item.GetId()) newPrevPaths = map[string]string{}
itemName = ptr.Val(item.GetName()) currPrevPaths = map[string]string{}
isFolder = item.GetFolder() != nil || item.GetPackageEscaped() != nil c.CollectionMap[driveID] = map[string]*Collection{}
ictx = clues.Add( invalidPrevDelta = true
ctx,
"item_id", itemID,
"item_name", clues.Hide(itemName),
"item_is_folder", isFolder)
)
if item.GetMalware() != nil {
addtl := graph.ItemInfo(item)
skip := fault.FileSkip(fault.SkipMalware, driveID, itemID, itemName, addtl)
if isFolder {
skip = fault.ContainerSkip(fault.SkipMalware, driveID, itemID, itemName, addtl)
}
errs.AddSkip(ctx, skip)
logger.Ctx(ctx).Infow("malware detected", "item_details", addtl)
continue
} }
// Deleted file or folder. for _, item := range page {
if item.GetDeleted() != nil { if el.Failure() != nil {
if err := c.handleDelete( break
itemID, }
err := c.processItem(
ctx,
item,
driveID, driveID,
driveName,
oldPrevPaths, oldPrevPaths,
currPrevPaths, currPrevPaths,
newPrevPaths, newPrevPaths,
isFolder, excludedItemIDs,
excluded,
invalidPrevDelta); err != nil {
return nil, clues.Stack(err).WithClues(ictx)
}
continue
}
collectionPath, err := c.getCollectionPath(driveID, item)
if err != nil {
el.AddRecoverable(ctx, clues.Stack(err).
WithClues(ictx).
Label(fault.LabelForceNoBackupCreation))
continue
}
// Skip items that don't match the folder selectors we were given.
if shouldSkip(ctx, collectionPath, c.handler, driveName) {
logger.Ctx(ictx).Debugw("path not selected", "skipped_path", collectionPath.String())
continue
}
switch {
case isFolder:
// Deletions are handled above so this is just moves/renames.
var prevPath path.Path
prevPathStr, ok := oldPrevPaths[itemID]
if ok {
prevPath, err = path.FromDataLayerPath(prevPathStr, false)
if err != nil {
el.AddRecoverable(ctx, clues.Wrap(err, "invalid previous path").
WithClues(ictx).
With("prev_path_string", path.LoggableDir(prevPathStr)))
}
} else if item.GetRoot() != nil {
// Root doesn't move or get renamed.
prevPath = collectionPath
}
// Moved folders don't cause delta results for any subfolders nested in
// them. We need to go through and update paths to handle that. We only
// update newPaths so we don't accidentally clobber previous deletes.
updatePath(newPrevPaths, itemID, collectionPath.String())
found, err := updateCollectionPaths(driveID, itemID, c.CollectionMap, collectionPath)
if err != nil {
return nil, clues.Stack(err).WithClues(ictx)
}
if found {
continue
}
colScope := CollectionScopeFolder
if item.GetPackageEscaped() != nil {
colScope = CollectionScopePackage
}
col, err := NewCollection(
c.handler,
c.protectedResource,
collectionPath,
prevPath,
driveID,
c.statusUpdater,
c.ctrl,
colScope,
invalidPrevDelta, invalidPrevDelta,
nil) el)
if err != nil { if err != nil {
return nil, clues.Stack(err).WithClues(ictx) el.AddRecoverable(ctx, clues.Stack(err))
} }
col.driveName = driveName
c.CollectionMap[driveID][itemID] = col
c.NumContainers++
if item.GetRoot() != nil {
continue
}
// Add an entry to fetch permissions into this collection. This assumes
// that OneDrive always returns all folders on the path of an item
// before the item. This seems to hold true for now at least.
if col.Add(item) {
c.NumItems++
}
case item.GetFile() != nil:
// Deletions are handled above so this is just moves/renames.
if len(ptr.Val(item.GetParentReference().GetId())) == 0 {
return nil, clues.New("file without parent ID").WithClues(ictx)
}
// Get the collection for this item.
parentID := ptr.Val(item.GetParentReference().GetId())
ictx = clues.Add(ictx, "parent_id", parentID)
collection, ok := c.CollectionMap[driveID][parentID]
if !ok {
return nil, clues.New("item seen before parent folder").WithClues(ictx)
}
// This will only kick in if the file was moved multiple times
// within a single delta query. We delete the file from the previous
// collection so that it doesn't appear in two places.
prevParentContainerID, ok := currPrevPaths[itemID]
if ok {
prevColl, found := c.CollectionMap[driveID][prevParentContainerID]
if !found {
return nil, clues.New("previous collection not found").
With("prev_parent_container_id", prevParentContainerID).
WithClues(ictx)
}
if ok := prevColl.Remove(itemID); !ok {
return nil, clues.New("removing item from prev collection").
With("prev_parent_container_id", prevParentContainerID).
WithClues(ictx)
}
}
currPrevPaths[itemID] = parentID
if collection.Add(item) {
c.NumItems++
c.NumFiles++
}
// Do this after adding the file to the collection so if we fail to add
// the item to the collection for some reason and we're using best effort
// we don't just end up deleting the item in the resulting backup. The
// resulting backup will be slightly incorrect, but it will have the most
// data that we were able to preserve.
if !invalidPrevDelta {
// Always add a file to the excluded list. The file may have been
// renamed/moved/modified, so we still have to drop the
// original one and download a fresh copy.
excluded[itemID+metadata.DataFileSuffix] = struct{}{}
excluded[itemID+metadata.MetaFileSuffix] = struct{}{}
}
default:
el.AddRecoverable(ictx, clues.New("item is neither folder nor file").
WithClues(ictx).
Label(fault.LabelForceNoBackupCreation))
} }
} }
return newPrevPaths, el.Failure() du, err := pager.Results()
if err != nil {
return du, nil, clues.Stack(err)
}
return du, newPrevPaths, el.Failure()
}
func (c *Collections) processItem(
ctx context.Context,
item models.DriveItemable,
driveID, driveName string,
oldPrevPaths, currPrevPaths, newPrevPaths map[string]string,
excluded map[string]struct{},
invalidPrevDelta bool,
skipper fault.AddSkipper,
) error {
var (
itemID = ptr.Val(item.GetId())
itemName = ptr.Val(item.GetName())
isFolder = item.GetFolder() != nil || item.GetPackageEscaped() != nil
ictx = clues.Add(
ctx,
"item_id", itemID,
"item_name", clues.Hide(itemName),
"item_is_folder", isFolder)
)
if item.GetMalware() != nil {
addtl := graph.ItemInfo(item)
skip := fault.FileSkip(fault.SkipMalware, driveID, itemID, itemName, addtl)
if isFolder {
skip = fault.ContainerSkip(fault.SkipMalware, driveID, itemID, itemName, addtl)
}
skipper.AddSkip(ctx, skip)
logger.Ctx(ctx).Infow("malware detected", "item_details", addtl)
return nil
}
// Deleted file or folder.
if item.GetDeleted() != nil {
err := c.handleDelete(
itemID,
driveID,
oldPrevPaths,
currPrevPaths,
newPrevPaths,
isFolder,
excluded,
invalidPrevDelta)
return clues.Stack(err).WithClues(ictx).OrNil()
}
collectionPath, err := c.getCollectionPath(driveID, item)
if err != nil {
return clues.Stack(err).
WithClues(ictx).
Label(fault.LabelForceNoBackupCreation)
}
// Skip items that don't match the folder selectors we were given.
if shouldSkip(ctx, collectionPath, c.handler, driveName) {
logger.Ctx(ictx).Debugw("path not selected", "skipped_path", collectionPath.String())
return nil
}
switch {
case isFolder:
// Deletions are handled above so this is just moves/renames.
var prevPath path.Path
prevPathStr, ok := oldPrevPaths[itemID]
if ok {
prevPath, err = path.FromDataLayerPath(prevPathStr, false)
if err != nil {
return clues.Wrap(err, "invalid previous path").
WithClues(ictx).
With("prev_path_string", path.LoggableDir(prevPathStr))
}
} else if item.GetRoot() != nil {
// Root doesn't move or get renamed.
prevPath = collectionPath
}
// Moved folders don't cause delta results for any subfolders nested in
// them. We need to go through and update paths to handle that. We only
// update newPaths so we don't accidentally clobber previous deletes.
updatePath(newPrevPaths, itemID, collectionPath.String())
found, err := updateCollectionPaths(
driveID,
itemID,
c.CollectionMap,
collectionPath)
if err != nil {
return clues.Stack(err).WithClues(ictx)
}
if found {
return nil
}
colScope := CollectionScopeFolder
if item.GetPackageEscaped() != nil {
colScope = CollectionScopePackage
}
col, err := NewCollection(
c.handler,
c.protectedResource,
collectionPath,
prevPath,
driveID,
c.statusUpdater,
c.ctrl,
colScope,
invalidPrevDelta,
nil)
if err != nil {
return clues.Stack(err).WithClues(ictx)
}
col.driveName = driveName
c.CollectionMap[driveID][itemID] = col
c.NumContainers++
if item.GetRoot() != nil {
return nil
}
// Add an entry to fetch permissions into this collection. This assumes
// that OneDrive always returns all folders on the path of an item
// before the item. This seems to hold true for now at least.
if col.Add(item) {
c.NumItems++
}
case item.GetFile() != nil:
// Deletions are handled above so this is just moves/renames.
if len(ptr.Val(item.GetParentReference().GetId())) == 0 {
return clues.New("file without parent ID").WithClues(ictx)
}
// Get the collection for this item.
parentID := ptr.Val(item.GetParentReference().GetId())
ictx = clues.Add(ictx, "parent_id", parentID)
collection, ok := c.CollectionMap[driveID][parentID]
if !ok {
return clues.New("item seen before parent folder").WithClues(ictx)
}
// This will only kick in if the file was moved multiple times
// within a single delta query. We delete the file from the previous
// collection so that it doesn't appear in two places.
prevParentContainerID, ok := currPrevPaths[itemID]
if ok {
prevColl, found := c.CollectionMap[driveID][prevParentContainerID]
if !found {
return clues.New("previous collection not found").
With("prev_parent_container_id", prevParentContainerID).
WithClues(ictx)
}
if ok := prevColl.Remove(itemID); !ok {
return clues.New("removing item from prev collection").
With("prev_parent_container_id", prevParentContainerID).
WithClues(ictx)
}
}
currPrevPaths[itemID] = parentID
if collection.Add(item) {
c.NumItems++
c.NumFiles++
}
// Do this after adding the file to the collection so if we fail to add
// the item to the collection for some reason and we're using best effort
// we don't just end up deleting the item in the resulting backup. The
// resulting backup will be slightly incorrect, but it will have the most
// data that we were able to preserve.
if !invalidPrevDelta {
// Always add a file to the excluded list. The file may have been
// renamed/moved/modified, so we still have to drop the
// original one and download a fresh copy.
excluded[itemID+metadata.DataFileSuffix] = struct{}{}
excluded[itemID+metadata.MetaFileSuffix] = struct{}{}
}
default:
return clues.New("item is neither folder nor file").
WithClues(ictx).
Label(fault.LabelForceNoBackupCreation)
}
return nil
} }
type dirScopeChecker interface { type dirScopeChecker interface {
@@ -913,7 +955,12 @@ type dirScopeChecker interface {
IncludesDir(dir string) bool IncludesDir(dir string) bool
} }
func shouldSkip(ctx context.Context, drivePath path.Path, dsc dirScopeChecker, driveName string) bool { func shouldSkip(
ctx context.Context,
drivePath path.Path,
dsc dirScopeChecker,
driveName string,
) bool {
return !includePath(ctx, dsc, drivePath) || return !includePath(ctx, dsc, drivePath) ||
(drivePath.Category() == path.LibrariesCategory && restrictedDirectory == driveName) (drivePath.Category() == path.LibrariesCategory && restrictedDirectory == driveName)
} }

View File

@@ -119,7 +119,7 @@ func getDelList(files ...string) map[string]struct{} {
return delList return delList
} }
func (suite *OneDriveCollectionsUnitSuite) TestUpdateCollections() { func (suite *OneDriveCollectionsUnitSuite) TestPopulateDriveCollections() {
anyFolder := (&selectors.OneDriveBackup{}).Folders(selectors.Any())[0] anyFolder := (&selectors.OneDriveBackup{}).Folders(selectors.Any())[0]
const ( const (
@@ -690,8 +690,10 @@ func (suite *OneDriveCollectionsUnitSuite) TestUpdateCollections() {
expectedItemCount: 0, expectedItemCount: 0,
expectedFileCount: 0, expectedFileCount: 0,
expectedContainerCount: 1, expectedContainerCount: 1,
expectedPrevPaths: nil, expectedPrevPaths: map[string]string{
expectedExcludes: map[string]struct{}{}, "root": expectedPath(""),
},
expectedExcludes: map[string]struct{}{},
}, },
{ {
name: "1 root file, 1 folder, 1 package, 1 good file, 1 malware", name: "1 root file, 1 folder, 1 package, 1 good file, 1 malware",
@@ -732,15 +734,31 @@ func (suite *OneDriveCollectionsUnitSuite) TestUpdateCollections() {
defer flush() defer flush()
var ( var (
excludes = map[string]struct{}{} mbh = mock.DefaultOneDriveBH(user)
currPrevPaths = map[string]string{} du = api.DeltaUpdate{
errs = fault.New(true) URL: "notempty",
Reset: false,
}
excludes = map[string]struct{}{}
errs = fault.New(true)
) )
maps.Copy(currPrevPaths, test.inputFolderMap) mbh.DriveItemEnumeration = mock.EnumerateItemsDeltaByDrive{
DrivePagers: map[string]*mock.DriveItemsDeltaPager{
driveID: {
Pages: []mock.NextPage{{Items: test.items}},
DeltaUpdate: du,
},
},
}
sel := selectors.NewOneDriveBackup([]string{user})
sel.Include([]selectors.OneDriveScope{test.scope})
mbh.Sel = sel.Selector
c := NewCollections( c := NewCollections(
&itemBackupHandler{api.Drives{}, user, test.scope}, mbh,
tenant, tenant,
idname.NewProvider(user, user), idname.NewProvider(user, user),
nil, nil,
@@ -748,18 +766,19 @@ func (suite *OneDriveCollectionsUnitSuite) TestUpdateCollections() {
c.CollectionMap[driveID] = map[string]*Collection{} c.CollectionMap[driveID] = map[string]*Collection{}
newPrevPaths, err := c.UpdateCollections( _, newPrevPaths, err := c.PopulateDriveCollections(
ctx, ctx,
driveID, driveID,
"General", "General",
test.items,
test.inputFolderMap, test.inputFolderMap,
currPrevPaths,
excludes, excludes,
false, "smarf",
errs) errs)
test.expect(t, err, clues.ToCore(err)) test.expect(t, err, clues.ToCore(err))
assert.Equal(t, len(test.expectedCollectionIDs), len(c.CollectionMap[driveID]), "total collections") assert.ElementsMatch(
t,
maps.Keys(test.expectedCollectionIDs),
maps.Keys(c.CollectionMap[driveID]))
assert.Equal(t, test.expectedItemCount, c.NumItems, "item count") assert.Equal(t, test.expectedItemCount, c.NumItems, "item count")
assert.Equal(t, test.expectedFileCount, c.NumFiles, "file count") assert.Equal(t, test.expectedFileCount, c.NumFiles, "file count")
assert.Equal(t, test.expectedContainerCount, c.NumContainers, "container count") assert.Equal(t, test.expectedContainerCount, c.NumContainers, "container count")
@@ -1166,7 +1185,6 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
tenant = "a-tenant" tenant = "a-tenant"
user = "a-user" user = "a-user"
empty = "" empty = ""
next = "next"
delta = "delta1" delta = "delta1"
delta2 = "delta2" delta2 = "delta2"
) )
@@ -1208,7 +1226,7 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
table := []struct { table := []struct {
name string name string
drives []models.Driveable drives []models.Driveable
items map[string][]apiMock.PagerResult[models.DriveItemable] enumerator mock.EnumerateItemsDeltaByDrive
canUsePreviousBackup bool canUsePreviousBackup bool
errCheck assert.ErrorAssertionFunc errCheck assert.ErrorAssertionFunc
prevFolderPaths map[string]map[string]string prevFolderPaths map[string]map[string]string
@@ -1227,14 +1245,16 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
{ {
name: "OneDrive_OneItemPage_DelFileOnly_NoFolders_NoErrors", name: "OneDrive_OneItemPage_DelFileOnly_NoFolders_NoErrors",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{{
driveRootItem("root"), // will be present, not needed Items: []models.DriveItemable{
delItem("file", driveBasePath1, "root", true, false, false), driveRootItem("root"), // will be present, not needed
}, delItem("file", driveBasePath1, "root", true, false, false),
DeltaLink: &delta, },
}},
DeltaUpdate: api.DeltaUpdate{URL: delta},
}, },
}, },
}, },
@@ -1259,14 +1279,16 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
{ {
name: "OneDrive_OneItemPage_NoFolderDeltas_NoErrors", name: "OneDrive_OneItemPage_NoFolderDeltas_NoErrors",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{{
driveRootItem("root"), Items: []models.DriveItemable{
driveItem("file", "file", driveBasePath1, "root", true, false, false), driveRootItem("root"),
}, driveItem("file", "file", driveBasePath1, "root", true, false, false),
DeltaLink: &delta, },
}},
DeltaUpdate: api.DeltaUpdate{URL: delta},
}, },
}, },
}, },
@@ -1291,16 +1313,17 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
{ {
name: "OneDrive_OneItemPage_NoErrors", name: "OneDrive_OneItemPage_NoErrors",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{{
driveRootItem("root"), Items: []models.DriveItemable{
driveItem("folder", "folder", driveBasePath1, "root", false, true, false), driveRootItem("root"),
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
}, driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false),
DeltaLink: &delta, },
ResetDelta: true, }},
DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
}, },
}, },
}, },
@@ -1329,17 +1352,18 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
{ {
name: "OneDrive_OneItemPage_NoErrors_FileRenamedMultiple", name: "OneDrive_OneItemPage_NoErrors_FileRenamedMultiple",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{{
driveRootItem("root"), Items: []models.DriveItemable{
driveItem("folder", "folder", driveBasePath1, "root", false, true, false), driveRootItem("root"),
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file", "file2", driveBasePath1+"/folder", "folder", true, false, false), driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false),
}, driveItem("file", "file2", driveBasePath1+"/folder", "folder", true, false, false),
DeltaLink: &delta, },
ResetDelta: true, }},
DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
}, },
}, },
}, },
@@ -1368,16 +1392,16 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
{ {
name: "OneDrive_OneItemPage_NoErrors_FileMovedMultiple", name: "OneDrive_OneItemPage_NoErrors_FileMovedMultiple",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{{Items: []models.DriveItemable{
driveRootItem("root"), driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false), driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false),
driveItem("file", "file2", driveBasePath1, "root", true, false, false), driveItem("file", "file2", driveBasePath1, "root", true, false, false),
}, }}},
DeltaLink: &delta, DeltaUpdate: api.DeltaUpdate{URL: delta},
}, },
}, },
}, },
@@ -1408,16 +1432,15 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
{ {
name: "OneDrive_OneItemPage_EmptyDelta_NoErrors", name: "OneDrive_OneItemPage_EmptyDelta_NoErrors",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{{Items: []models.DriveItemable{
driveRootItem("root"), driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false), driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false),
}, }}},
DeltaLink: &empty, // probably will never happen with graph DeltaUpdate: api.DeltaUpdate{URL: empty, Reset: true},
ResetDelta: true,
}, },
}, },
}, },
@@ -1446,25 +1469,146 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
{ {
name: "OneDrive_TwoItemPages_NoErrors", name: "OneDrive_TwoItemPages_NoErrors",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{
driveRootItem("root"), {
driveItem("folder", "folder", driveBasePath1, "root", false, true, false), Items: []models.DriveItemable{
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false),
},
},
{
Items: []models.DriveItemable{
driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file2", "file2", driveBasePath1+"/folder", "folder", true, false, false),
},
},
}, },
NextLink: &next, DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
ResetDelta: true,
}, },
{ },
Values: []models.DriveItemable{ },
driveRootItem("root"), canUsePreviousBackup: true,
driveItem("folder", "folder", driveBasePath1, "root", false, true, false), errCheck: assert.NoError,
driveItem("file2", "file2", driveBasePath1+"/folder", "folder", true, false, false), prevFolderPaths: map[string]map[string]string{
driveID1: {},
},
expectedCollections: map[string]map[data.CollectionState][]string{
rootFolderPath1: {data.NewState: {}},
folderPath1: {data.NewState: {"folder", "file", "file2"}},
},
expectedDeltaURLs: map[string]string{
driveID1: delta,
},
expectedFolderPaths: map[string]map[string]string{
driveID1: {
"root": rootFolderPath1,
"folder": folderPath1,
},
},
expectedDelList: pmMock.NewPrefixMap(map[string]map[string]struct{}{}),
doNotMergeItems: map[string]bool{
rootFolderPath1: true,
folderPath1: true,
},
},
{
name: "OneDrive_TwoItemPages_WithReset",
drives: []models.Driveable{drive1},
enumerator: mock.EnumerateItemsDeltaByDrive{
DrivePagers: map[string]*mock.DriveItemsDeltaPager{
driveID1: {
Pages: []mock.NextPage{
{
Items: []models.DriveItemable{
driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false),
},
},
{
Items: []models.DriveItemable{},
Reset: true,
},
{
Items: []models.DriveItemable{
driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false),
},
},
{
Items: []models.DriveItemable{
driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file2", "file2", driveBasePath1+"/folder", "folder", true, false, false),
},
},
}, },
DeltaLink: &delta, DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
ResetDelta: true, },
},
},
canUsePreviousBackup: true,
errCheck: assert.NoError,
prevFolderPaths: map[string]map[string]string{
driveID1: {},
},
expectedCollections: map[string]map[data.CollectionState][]string{
rootFolderPath1: {data.NewState: {}},
folderPath1: {data.NewState: {"folder", "file", "file2"}},
},
expectedDeltaURLs: map[string]string{
driveID1: delta,
},
expectedFolderPaths: map[string]map[string]string{
driveID1: {
"root": rootFolderPath1,
"folder": folderPath1,
},
},
expectedDelList: pmMock.NewPrefixMap(map[string]map[string]struct{}{}),
doNotMergeItems: map[string]bool{
rootFolderPath1: true,
folderPath1: true,
},
},
{
name: "OneDrive_TwoItemPages_WithResetCombinedWithItems",
drives: []models.Driveable{drive1},
enumerator: mock.EnumerateItemsDeltaByDrive{
DrivePagers: map[string]*mock.DriveItemsDeltaPager{
driveID1: {
Pages: []mock.NextPage{
{
Items: []models.DriveItemable{
driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false),
},
},
{
Items: []models.DriveItemable{
driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false),
},
Reset: true,
},
{
Items: []models.DriveItemable{
driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file2", "file2", driveBasePath1+"/folder", "folder", true, false, false),
},
},
},
DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
}, },
}, },
}, },
@@ -1498,27 +1642,23 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
drive1, drive1,
drive2, drive2,
}, },
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{{Items: []models.DriveItemable{
driveRootItem("root"), driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false), driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false),
}, }}},
DeltaLink: &delta, DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
ResetDelta: true,
}, },
}, driveID2: {
driveID2: { Pages: []mock.NextPage{{Items: []models.DriveItemable{
{
Values: []models.DriveItemable{
driveRootItem("root2"), driveRootItem("root2"),
driveItem("folder2", "folder", driveBasePath2, "root2", false, true, false), driveItem("folder2", "folder", driveBasePath2, "root2", false, true, false),
driveItem("file2", "file", driveBasePath2+"/folder", "folder2", true, false, false), driveItem("file2", "file", driveBasePath2+"/folder", "folder2", true, false, false),
}, }}},
DeltaLink: &delta2, DeltaUpdate: api.DeltaUpdate{URL: delta2, Reset: true},
ResetDelta: true,
}, },
}, },
}, },
@@ -1562,27 +1702,23 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
drive1, drive1,
drive2, drive2,
}, },
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{{Items: []models.DriveItemable{
driveRootItem("root"), driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false), driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false),
}, }}},
DeltaLink: &delta, DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
ResetDelta: true,
}, },
}, driveID2: {
driveID2: { Pages: []mock.NextPage{{Items: []models.DriveItemable{
{
Values: []models.DriveItemable{
driveRootItem("root"), driveRootItem("root"),
driveItem("folder", "folder", driveBasePath2, "root", false, true, false), driveItem("folder", "folder", driveBasePath2, "root", false, true, false),
driveItem("file2", "file", driveBasePath2+"/folder", "folder", true, false, false), driveItem("file2", "file", driveBasePath2+"/folder", "folder", true, false, false),
}, }}},
DeltaLink: &delta2, DeltaUpdate: api.DeltaUpdate{URL: delta2, Reset: true},
ResetDelta: true,
}, },
}, },
}, },
@@ -1623,10 +1759,12 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
{ {
name: "OneDrive_OneItemPage_Errors", name: "OneDrive_OneItemPage_Errors",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Err: assert.AnError, Pages: []mock.NextPage{{Items: []models.DriveItemable{}}},
DeltaUpdate: api.DeltaUpdate{},
Err: assert.AnError,
}, },
}, },
}, },
@@ -1641,24 +1779,25 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
expectedDelList: nil, expectedDelList: nil,
}, },
{ {
name: "OneDrive_TwoItemPage_NoDeltaError", name: "OneDrive_OneItemPage_InvalidPrevDelta_DeleteNonExistentFolder",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{
driveRootItem("root"), {
driveItem("file", "file", driveBasePath1, "root", true, false, false), Items: []models.DriveItemable{},
Reset: true,
},
{
Items: []models.DriveItemable{
driveRootItem("root"),
driveItem("folder2", "folder2", driveBasePath1, "root", false, true, false),
driveItem("file", "file", driveBasePath1+"/folder2", "folder2", true, false, false),
},
},
}, },
NextLink: &next, DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
},
{
Values: []models.DriveItemable{
driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file2", "file", driveBasePath1+"/folder", "folder", true, false, false),
},
DeltaLink: &delta,
}, },
}, },
}, },
@@ -1666,40 +1805,51 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
errCheck: assert.NoError, errCheck: assert.NoError,
prevFolderPaths: map[string]map[string]string{ prevFolderPaths: map[string]map[string]string{
driveID1: { driveID1: {
"root": rootFolderPath1, "root": rootFolderPath1,
"folder": folderPath1,
}, },
}, },
expectedCollections: map[string]map[data.CollectionState][]string{ expectedCollections: map[string]map[data.CollectionState][]string{
rootFolderPath1: {data.NotMovedState: {"file"}}, rootFolderPath1: {data.NewState: {}},
expectedPath1("/folder"): {data.NewState: {"folder", "file2"}}, expectedPath1("/folder"): {data.DeletedState: {}},
expectedPath1("/folder2"): {data.NewState: {"folder2", "file"}},
}, },
expectedDeltaURLs: map[string]string{ expectedDeltaURLs: map[string]string{
driveID1: delta, driveID1: delta,
}, },
expectedFolderPaths: map[string]map[string]string{ expectedFolderPaths: map[string]map[string]string{
driveID1: { driveID1: {
"root": rootFolderPath1, "root": rootFolderPath1,
"folder": folderPath1, "folder2": expectedPath1("/folder2"),
}, },
}, },
expectedDelList: pmMock.NewPrefixMap(map[string]map[string]struct{}{ expectedDelList: pmMock.NewPrefixMap(map[string]map[string]struct{}{}),
rootFolderPath1: getDelList("file", "file2"), doNotMergeItems: map[string]bool{
}), rootFolderPath1: true,
doNotMergeItems: map[string]bool{}, folderPath1: true,
expectedPath1("/folder2"): true,
},
}, },
{ {
name: "OneDrive_OneItemPage_InvalidPrevDelta_DeleteNonExistentFolder", name: "OneDrive_OneItemPage_InvalidPrevDeltaCombinedWithItems_DeleteNonExistentFolder",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{
driveRootItem("root"), {
driveItem("folder2", "folder2", driveBasePath1, "root", false, true, false), Items: []models.DriveItemable{},
driveItem("file", "file", driveBasePath1+"/folder2", "folder2", true, false, false), Reset: true,
},
{
Items: []models.DriveItemable{
driveRootItem("root"),
driveItem("folder2", "folder2", driveBasePath1, "root", false, true, false),
driveItem("file", "file", driveBasePath1+"/folder2", "folder2", true, false, false),
},
},
}, },
DeltaLink: &delta, DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
ResetDelta: true,
}, },
}, },
}, },
@@ -1735,16 +1885,34 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
{ {
name: "OneDrive_OneItemPage_InvalidPrevDelta_AnotherFolderAtDeletedLocation", name: "OneDrive_OneItemPage_InvalidPrevDelta_AnotherFolderAtDeletedLocation",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{
driveRootItem("root"), {
driveItem("folder2", "folder", driveBasePath1, "root", false, true, false), // on the first page, if this is the total data, we'd expect both folder and folder2
driveItem("file", "file", driveBasePath1+"/folder", "folder2", true, false, false), // since new previousPaths merge with the old previousPaths.
Items: []models.DriveItemable{
driveRootItem("root"),
driveItem("folder2", "folder", driveBasePath1, "root", false, true, false),
driveItem("file", "file", driveBasePath1+"/folder", "folder2", true, false, false),
},
},
{
Items: []models.DriveItemable{},
Reset: true,
},
{
// but after a delta reset, we treat this as the total end set of folders, which means
// we don't expect folder to exist any longer.
Items: []models.DriveItemable{
driveRootItem("root"),
driveItem("folder2", "folder", driveBasePath1, "root", false, true, false),
driveItem("file", "file", driveBasePath1+"/folder", "folder2", true, false, false),
},
},
}, },
DeltaLink: &delta, DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
ResetDelta: true,
}, },
}, },
}, },
@@ -1783,26 +1951,28 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
{ {
name: "OneDrive Two Item Pages with Malware", name: "OneDrive Two Item Pages with Malware",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{
driveRootItem("root"), {
driveItem("folder", "folder", driveBasePath1, "root", false, true, false), Items: []models.DriveItemable{
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), driveRootItem("root"),
malwareItem("malware", "malware", driveBasePath1+"/folder", "folder", true, false, false), driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false),
malwareItem("malware", "malware", driveBasePath1+"/folder", "folder", true, false, false),
},
},
{
Items: []models.DriveItemable{
driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file2", "file2", driveBasePath1+"/folder", "folder", true, false, false),
malwareItem("malware2", "malware2", driveBasePath1+"/folder", "folder", true, false, false),
},
},
}, },
NextLink: &next, DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
},
{
Values: []models.DriveItemable{
driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file2", "file2", driveBasePath1+"/folder", "folder", true, false, false),
malwareItem("malware2", "malware2", driveBasePath1+"/folder", "folder", true, false, false),
},
DeltaLink: &delta,
ResetDelta: true,
}, },
}, },
}, },
@@ -1832,28 +2002,35 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
expectedSkippedCount: 2, expectedSkippedCount: 2,
}, },
{ {
name: "One Drive Deleted Folder In New Results", name: "One Drive Deleted Folder In New Results With Invalid Delta",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{
driveRootItem("root"), {
driveItem("folder", "folder", driveBasePath1, "root", false, true, false), Items: []models.DriveItemable{
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), driveRootItem("root"),
driveItem("folder2", "folder2", driveBasePath1, "root", false, true, false), driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file2", "file2", driveBasePath1+"/folder2", "folder2", true, false, false), driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false),
driveItem("folder2", "folder2", driveBasePath1, "root", false, true, false),
driveItem("file2", "file2", driveBasePath1+"/folder2", "folder2", true, false, false),
},
},
{
Reset: true,
},
{
Items: []models.DriveItemable{
driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false),
delItem("folder2", driveBasePath1, "root", false, true, false),
delItem("file2", driveBasePath1, "root", true, false, false),
},
},
}, },
NextLink: &next, DeltaUpdate: api.DeltaUpdate{URL: delta2, Reset: true},
},
{
Values: []models.DriveItemable{
driveRootItem("root"),
delItem("folder2", driveBasePath1, "root", false, true, false),
delItem("file2", driveBasePath1, "root", true, false, false),
},
DeltaLink: &delta2,
ResetDelta: true,
}, },
}, },
}, },
@@ -1888,17 +2065,19 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
}, },
}, },
{ {
name: "One Drive Random Folder Delete", name: "One Drive Folder Delete After Invalid Delta",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{{
driveRootItem("root"), Items: []models.DriveItemable{
delItem("folder", driveBasePath1, "root", false, true, false), driveRootItem("root"),
}, delItem("folder", driveBasePath1, "root", false, true, false),
DeltaLink: &delta, },
ResetDelta: true, Reset: true,
}},
DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
}, },
}, },
}, },
@@ -1929,17 +2108,21 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
}, },
}, },
{ {
name: "One Drive Random Item Delete", name: "One Drive Item Delete After Invalid Delta",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{
driveRootItem("root"), {
delItem("file", driveBasePath1, "root", true, false, false), Items: []models.DriveItemable{
driveRootItem("root"),
delItem("file", driveBasePath1, "root", true, false, false),
},
Reset: true,
},
}, },
DeltaLink: &delta, DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
ResetDelta: true,
}, },
}, },
}, },
@@ -1969,24 +2152,26 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
{ {
name: "One Drive Folder Made And Deleted", name: "One Drive Folder Made And Deleted",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{
driveRootItem("root"), {
driveItem("folder", "folder", driveBasePath1, "root", false, true, false), Items: []models.DriveItemable{
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false),
},
},
{
Items: []models.DriveItemable{
driveRootItem("root"),
delItem("folder", driveBasePath1, "root", false, true, false),
delItem("file", driveBasePath1, "root", true, false, false),
},
},
}, },
NextLink: &next, DeltaUpdate: api.DeltaUpdate{URL: delta2, Reset: true},
},
{
Values: []models.DriveItemable{
driveRootItem("root"),
delItem("folder", driveBasePath1, "root", false, true, false),
delItem("file", driveBasePath1, "root", true, false, false),
},
DeltaLink: &delta2,
ResetDelta: true,
}, },
}, },
}, },
@@ -2014,23 +2199,25 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
{ {
name: "One Drive Item Made And Deleted", name: "One Drive Item Made And Deleted",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{
driveRootItem("root"), {
driveItem("folder", "folder", driveBasePath1, "root", false, true, false), Items: []models.DriveItemable{
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false), driveRootItem("root"),
driveItem("folder", "folder", driveBasePath1, "root", false, true, false),
driveItem("file", "file", driveBasePath1+"/folder", "folder", true, false, false),
},
},
{
Items: []models.DriveItemable{
driveRootItem("root"),
delItem("file", driveBasePath1, "root", true, false, false),
},
},
}, },
NextLink: &next, DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
},
{
Values: []models.DriveItemable{
driveRootItem("root"),
delItem("file", driveBasePath1, "root", true, false, false),
},
DeltaLink: &delta,
ResetDelta: true,
}, },
}, },
}, },
@@ -2061,15 +2248,14 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
{ {
name: "One Drive Random Folder Delete", name: "One Drive Random Folder Delete",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{{Items: []models.DriveItemable{
driveRootItem("root"), driveRootItem("root"),
delItem("folder", driveBasePath1, "root", false, true, false), delItem("folder", driveBasePath1, "root", false, true, false),
}, }}},
DeltaLink: &delta, DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
ResetDelta: true,
}, },
}, },
}, },
@@ -2097,15 +2283,14 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
{ {
name: "One Drive Random Item Delete", name: "One Drive Random Item Delete",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{{Items: []models.DriveItemable{
driveRootItem("root"), driveRootItem("root"),
delItem("file", driveBasePath1, "root", true, false, false), delItem("file", driveBasePath1, "root", true, false, false),
}, }}},
DeltaLink: &delta, DeltaUpdate: api.DeltaUpdate{URL: delta, Reset: true},
ResetDelta: true,
}, },
}, },
}, },
@@ -2133,13 +2318,13 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
{ {
name: "TwoPriorDrives_OneTombstoned", name: "TwoPriorDrives_OneTombstoned",
drives: []models.Driveable{drive1}, drives: []models.Driveable{drive1},
items: map[string][]apiMock.PagerResult[models.DriveItemable]{ enumerator: mock.EnumerateItemsDeltaByDrive{
driveID1: { DrivePagers: map[string]*mock.DriveItemsDeltaPager{
{ driveID1: {
Values: []models.DriveItemable{ Pages: []mock.NextPage{{Items: []models.DriveItemable{
driveRootItem("root"), // will be present driveRootItem("root"), // will be present
}, }}},
DeltaLink: &delta, DeltaUpdate: api.DeltaUpdate{URL: delta},
}, },
}, },
}, },
@@ -2176,18 +2361,9 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
}, },
} }
itemPagers := map[string]api.DeltaPager[models.DriveItemable]{}
for driveID := range test.items {
itemPagers[driveID] = &apiMock.DeltaPager[models.DriveItemable]{
ToReturn: test.items[driveID],
}
}
mbh := mock.DefaultOneDriveBH("a-user") mbh := mock.DefaultOneDriveBH("a-user")
mbh.DrivePagerV = mockDrivePager mbh.DrivePagerV = mockDrivePager
mbh.ItemPagerV = itemPagers mbh.DriveItemEnumeration = test.enumerator
mbh.DriveItemEnumeration = mock.PagerResultToEDID(test.items)
c := NewCollections( c := NewCollections(
mbh, mbh,
@@ -2262,10 +2438,10 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
collectionCount++ collectionCount++
// TODO(ashmrtn): We should really be getting items in the collection // TODO: We should really be getting items in the collection
// via the Items() channel, but we don't have a way to mock out the // via the Items() channel. The lack of that makes this check a bit more
// actual item fetch yet (mostly wiring issues). The lack of that makes // brittle since internal details can change. The wiring to support
// this check a bit more brittle since internal details can change. // mocked GetItems is available. We just haven't plugged it in yet.
col, ok := baseCol.(*Collection) col, ok := baseCol.(*Collection)
require.True(t, ok, "getting onedrive.Collection handle") require.True(t, ok, "getting onedrive.Collection handle")

View File

@@ -86,12 +86,8 @@ type EnumerateDriveItemsDeltaer interface {
EnumerateDriveItemsDelta( EnumerateDriveItemsDelta(
ctx context.Context, ctx context.Context,
driveID, prevDeltaLink string, driveID, prevDeltaLink string,
selectProps []string, cc api.CallConfig,
) ( ) api.NextPageResulter[models.DriveItemable]
[]models.DriveItemable,
api.DeltaUpdate,
error,
)
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
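
A side effect of this signature change, which collapses the three return values into a single pager, is that test doubles no longer need goroutines or channel plumbing: a fake can simply replay canned pages, which is essentially what the mock.DriveItemsDeltaPager in the tests above does. A hedged sketch with simplified stand-in types (the real mock pairs Pages with a DeltaUpdate and an optional Err):

package main

import "fmt"

// fakePage and fakePager are simplified stand-ins for mock.NextPage and
// mock.DriveItemsDeltaPager: canned pages replayed in order, then a
// fixed delta link and error returned from Results.
type fakePage struct {
	items []string
	reset bool
}

type fakePager struct {
	pages []fakePage
	next  int
	delta string
	err   error
}

// NextPage replays the next canned page; no channels required.
func (p *fakePager) NextPage() ([]string, bool, bool) {
	if p.next >= len(p.pages) {
		return nil, false, true // done
	}

	pg := p.pages[p.next]
	p.next++

	return pg.items, pg.reset, false
}

func (p *fakePager) Results() (string, error) {
	return p.delta, p.err
}

func main() {
	pager := &fakePager{
		pages: []fakePage{{items: []string{"root", "file"}}},
		delta: "delta-link",
	}

	for items, reset, done := pager.NextPage(); !done; items, reset, done = pager.NextPage() {
		fmt.Println("reset:", reset, "items:", items)
	}

	du, err := pager.Results()
	fmt.Println(du, err)
}
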

View File

@@ -150,8 +150,10 @@ func (suite *ItemCollectorUnitSuite) TestDrives() {
Err: assert.AnError, Err: assert.AnError,
}, },
}, },
expectedErr: assert.Error, expectedErr: assert.Error,
expectedResults: nil, // even though we error, the func will return both the
// error and the prior results
expectedResults: resultDrives,
}, },
{ {
name: "MySiteURLNotFound", name: "MySiteURLNotFound",

View File

@@ -137,9 +137,9 @@ func (h itemBackupHandler) IncludesDir(dir string) bool {
func (h itemBackupHandler) EnumerateDriveItemsDelta( func (h itemBackupHandler) EnumerateDriveItemsDelta(
ctx context.Context, ctx context.Context,
driveID, prevDeltaLink string, driveID, prevDeltaLink string,
selectProps []string, cc api.CallConfig,
) ([]models.DriveItemable, api.DeltaUpdate, error) { ) api.NextPageResulter[models.DriveItemable] {
return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, selectProps) return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, cc)
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@@ -140,9 +140,9 @@ func (h libraryBackupHandler) IncludesDir(dir string) bool {
func (h libraryBackupHandler) EnumerateDriveItemsDelta( func (h libraryBackupHandler) EnumerateDriveItemsDelta(
ctx context.Context, ctx context.Context,
driveID, prevDeltaLink string, driveID, prevDeltaLink string,
selectProps []string, cc api.CallConfig,
) ([]models.DriveItemable, api.DeltaUpdate, error) { ) api.NextPageResulter[models.DriveItemable] {
return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, selectProps) return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, cc)
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@@ -44,10 +44,10 @@ type urlCache struct {
// cacheMu protects idToProps and lastRefreshTime // cacheMu protects idToProps and lastRefreshTime
cacheMu sync.RWMutex cacheMu sync.RWMutex
// refreshMu serializes cache refresh attempts by potential writers // refreshMu serializes cache refresh attempts by potential writers
refreshMu sync.Mutex refreshMu sync.Mutex
deltaQueryCount int refreshCount int
edid EnumerateDriveItemsDeltaer enumerator EnumerateDriveItemsDeltaer
errs *fault.Bus errs *fault.Bus
} }
@@ -56,10 +56,10 @@ func newURLCache(
func newURLCache( func newURLCache(
driveID, prevDelta string, driveID, prevDelta string,
refreshInterval time.Duration, refreshInterval time.Duration,
edid EnumerateDriveItemsDeltaer, enumerator EnumerateDriveItemsDeltaer,
errs *fault.Bus, errs *fault.Bus,
) (*urlCache, error) { ) (*urlCache, error) {
err := validateCacheParams(driveID, refreshInterval, edid) err := validateCacheParams(driveID, refreshInterval, enumerator)
if err != nil { if err != nil {
return nil, clues.Wrap(err, "cache params") return nil, clues.Wrap(err, "cache params")
} }
@@ -68,7 +68,7 @@ func newURLCache(
     idToProps:       make(map[string]itemProps),
     lastRefreshTime: time.Time{},
     driveID:         driveID,
-    edid:            edid,
+    enumerator:      enumerator,
     prevDelta:       prevDelta,
     refreshInterval: refreshInterval,
     errs:            errs,
@@ -80,7 +80,7 @@ func newURLCache(
 func validateCacheParams(
   driveID string,
   refreshInterval time.Duration,
-  edid EnumerateDriveItemsDeltaer,
+  enumerator EnumerateDriveItemsDeltaer,
 ) error {
   if len(driveID) == 0 {
     return clues.New("drive id is empty")
@@ -90,8 +90,8 @@ func validateCacheParams(
     return clues.New("invalid refresh interval")
   }

-  if edid == nil {
-    return clues.New("nil item enumerator")
+  if enumerator == nil {
+    return clues.New("missing item enumerator")
   }

   return nil
@@ -154,22 +154,31 @@ func (uc *urlCache) refreshCache(
   uc.cacheMu.Lock()
   defer uc.cacheMu.Unlock()

-  // Issue a delta query to graph
   logger.Ctx(ctx).Info("refreshing url cache")

-  items, du, err := uc.edid.EnumerateDriveItemsDelta(
+  uc.refreshCount++
+
+  pager := uc.enumerator.EnumerateDriveItemsDelta(
     ctx,
     uc.driveID,
     uc.prevDelta,
-    api.URLCacheDriveItemProps())
-  if err != nil {
-    uc.idToProps = make(map[string]itemProps)
-    return clues.Stack(err)
+    api.CallConfig{
+      Select: api.URLCacheDriveItemProps(),
+    })
+
+  page, reset, done := pager.NextPage()
+  for ; !done; page, reset, done = pager.NextPage() {
+    err := uc.updateCache(
+      ctx,
+      page,
+      reset,
+      uc.errs)
+    if err != nil {
+      return clues.Wrap(err, "updating cache")
+    }
   }

-  uc.deltaQueryCount++
-
-  if err := uc.updateCache(ctx, items, uc.errs); err != nil {
+  du, err := pager.Results()
+  if err != nil {
     return clues.Stack(err)
   }
@@ -205,10 +214,15 @@ func (uc *urlCache) readCache(
 func (uc *urlCache) updateCache(
   ctx context.Context,
   items []models.DriveItemable,
+  reset bool,
   errs *fault.Bus,
 ) error {
   el := errs.Local()

+  if reset {
+    uc.idToProps = map[string]itemProps{}
+  }
+
   for _, item := range items {
     if el.Failure() != nil {
       break
View File

@@ -2,6 +2,7 @@ package drive

 import (
   "errors"
+  "fmt"
   "io"
   "math/rand"
   "net/http"
@@ -11,6 +12,7 @@ import (
   "github.com/alcionai/clues"
   "github.com/microsoftgraph/msgraph-sdk-go/models"
+  "github.com/stretchr/testify/assert"
   "github.com/stretchr/testify/require"
   "github.com/stretchr/testify/suite"
@@ -91,17 +93,23 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
     api.NewDriveItem(newFolderName, true),
     control.Copy)
   require.NoError(t, err, clues.ToCore(err))
   require.NotNil(t, newFolder.GetId())

   nfid := ptr.Val(newFolder.GetId())

   // Get the previous delta to feed into url cache
-  _, du, err := ac.EnumerateDriveItemsDelta(
+  pager := ac.EnumerateDriveItemsDelta(
     ctx,
     suite.driveID,
     "",
-    api.URLCacheDriveItemProps())
+    api.CallConfig{
+      Select: api.URLCacheDriveItemProps(),
+    })
+
+  // normally we'd page through all the pager.NextPage
+  // enumerations first. But Results should make sure
+  // that we don't need to drain lower-level communication first.
+  du, err := pager.Results()
   require.NoError(t, err, clues.ToCore(err))
   require.NotEmpty(t, du.URL)
@@ -175,7 +183,7 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
   wg.Wait()

   // Validate that exactly 1 delta query was made by url cache
-  require.Equal(t, 1, uc.deltaQueryCount)
+  require.Equal(t, 1, uc.refreshCount)
 }

 // ---------------------------------------------------------------------------
@@ -196,16 +204,18 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
   table := []struct {
     name              string
-    pagerItems        map[string][]models.DriveItemable
-    pagerErr          map[string]error
+    pages             []mock.NextPage
+    pagerErr          error
     expectedItemProps map[string]itemProps
-    expectedErr       require.ErrorAssertionFunc
-    cacheAssert       func(*urlCache, time.Time)
+    expectErr         assert.ErrorAssertionFunc
+    expect            func(*testing.T, *urlCache, time.Time)
   }{
     {
       name: "single item in cache",
-      pagerItems: map[string][]models.DriveItemable{
-        driveID: {fileItem("1", "file1", "root", "root", "https://dummy1.com", false)},
+      pages: []mock.NextPage{
+        {Items: []models.DriveItemable{
+          fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
+        }},
       },
       expectedItemProps: map[string]itemProps{
         "1": {
@@ -213,22 +223,122 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
           isDeleted: false,
         },
       },
-      expectedErr: require.NoError,
-      cacheAssert: func(uc *urlCache, startTime time.Time) {
-        require.Greater(suite.T(), uc.lastRefreshTime, startTime)
-        require.Equal(suite.T(), 1, uc.deltaQueryCount)
-        require.Equal(suite.T(), 1, len(uc.idToProps))
+      expectErr: assert.NoError,
+      expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+        assert.Greater(t, uc.lastRefreshTime, startTime)
+        assert.Equal(t, 1, uc.refreshCount)
+        assert.Equal(t, 1, len(uc.idToProps))
       },
     },
     {
       name: "multiple items in cache",
-      pagerItems: map[string][]models.DriveItemable{
-        driveID: {
+      pages: []mock.NextPage{
+        {Items: []models.DriveItemable{
           fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
           fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
           fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
           fileItem("4", "file4", "root", "root", "https://dummy4.com", false),
           fileItem("5", "file5", "root", "root", "https://dummy5.com", false),
-        },
+        }},
       },
+      expectedItemProps: map[string]itemProps{
+        "1": {
+          downloadURL: "https://dummy1.com",
+          isDeleted:   false,
+        },
+        "2": {
+          downloadURL: "https://dummy2.com",
+          isDeleted:   false,
+        },
+        "3": {
+          downloadURL: "https://dummy3.com",
+          isDeleted:   false,
+        },
+        "4": {
+          downloadURL: "https://dummy4.com",
+          isDeleted:   false,
+        },
+        "5": {
+          downloadURL: "https://dummy5.com",
+          isDeleted:   false,
+        },
+      },
+      expectErr: assert.NoError,
+      expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+        assert.Greater(t, uc.lastRefreshTime, startTime)
+        assert.Equal(t, 1, uc.refreshCount)
+        assert.Equal(t, 5, len(uc.idToProps))
+      },
+    },
+    {
+      name: "multiple pages",
+      pages: []mock.NextPage{
+        {Items: []models.DriveItemable{
+          fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
+          fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
+          fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
+        }},
+        {Items: []models.DriveItemable{
+          fileItem("4", "file4", "root", "root", "https://dummy4.com", false),
+          fileItem("5", "file5", "root", "root", "https://dummy5.com", false),
+        }},
+      },
+      expectedItemProps: map[string]itemProps{
+        "1": {
+          downloadURL: "https://dummy1.com",
+          isDeleted:   false,
+        },
+        "2": {
+          downloadURL: "https://dummy2.com",
+          isDeleted:   false,
+        },
+        "3": {
+          downloadURL: "https://dummy3.com",
+          isDeleted:   false,
+        },
+        "4": {
+          downloadURL: "https://dummy4.com",
+          isDeleted:   false,
+        },
+        "5": {
+          downloadURL: "https://dummy5.com",
+          isDeleted:   false,
+        },
+      },
+      expectErr: assert.NoError,
+      expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+        assert.Greater(t, uc.lastRefreshTime, startTime)
+        assert.Equal(t, 1, uc.refreshCount)
+        assert.Equal(t, 5, len(uc.idToProps))
+      },
+    },
+    {
+      name: "multiple pages with resets",
+      pages: []mock.NextPage{
+        {
+          Items: []models.DriveItemable{
+            fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
+            fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
+            fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
+          },
+        },
+        {
+          Items: []models.DriveItemable{},
+          Reset: true,
+        },
+        {
+          Items: []models.DriveItemable{
+            fileItem("0", "file1", "root", "root", "https://dummy0.com", false),
+            fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
+            fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
+            fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
+          },
+        },
+        {
+          Items: []models.DriveItemable{
+            fileItem("4", "file4", "root", "root", "https://dummy4.com", false),
+            fileItem("5", "file5", "root", "root", "https://dummy5.com", false),
+          },
+        },
+      },
       expectedItemProps: map[string]itemProps{
@@ -253,23 +363,77 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
           isDeleted: false,
         },
       },
-      expectedErr: require.NoError,
-      cacheAssert: func(uc *urlCache, startTime time.Time) {
-        require.Greater(suite.T(), uc.lastRefreshTime, startTime)
-        require.Equal(suite.T(), 1, uc.deltaQueryCount)
-        require.Equal(suite.T(), 5, len(uc.idToProps))
+      expectErr: assert.NoError,
+      expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+        assert.Greater(t, uc.lastRefreshTime, startTime)
+        assert.Equal(t, 1, uc.refreshCount)
+        assert.Equal(t, 6, len(uc.idToProps))
+      },
+    },
+    {
+      name: "multiple pages with resets and combo reset+items in page",
+      pages: []mock.NextPage{
+        {
+          Items: []models.DriveItemable{
+            fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
+            fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
+            fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
+          },
+        },
+        {
+          Items: []models.DriveItemable{
+            fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
+            fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
+            fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
+          },
+          Reset: true,
+        },
+        {
+          Items: []models.DriveItemable{
+            fileItem("4", "file4", "root", "root", "https://dummy4.com", false),
+            fileItem("5", "file5", "root", "root", "https://dummy5.com", false),
+          },
+        },
+      },
+      expectedItemProps: map[string]itemProps{
+        "1": {
+          downloadURL: "https://dummy1.com",
+          isDeleted:   false,
+        },
+        "2": {
+          downloadURL: "https://dummy2.com",
+          isDeleted:   false,
+        },
+        "3": {
+          downloadURL: "https://dummy3.com",
+          isDeleted:   false,
+        },
+        "4": {
+          downloadURL: "https://dummy4.com",
+          isDeleted:   false,
+        },
+        "5": {
+          downloadURL: "https://dummy5.com",
+          isDeleted:   false,
+        },
+      },
+      expectErr: assert.NoError,
+      expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+        assert.Greater(t, uc.lastRefreshTime, startTime)
+        assert.Equal(t, 1, uc.refreshCount)
+        assert.Equal(t, 5, len(uc.idToProps))
       },
     },
     {
       name: "duplicate items with potentially new urls",
-      pagerItems: map[string][]models.DriveItemable{
-        driveID: {
+      pages: []mock.NextPage{
+        {Items: []models.DriveItemable{
           fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
           fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
           fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
           fileItem("1", "file1", "root", "root", "https://test1.com", false),
           fileItem("2", "file2", "root", "root", "https://test2.com", false),
-        },
+        }},
       },
       expectedItemProps: map[string]itemProps{
         "1": {
@@ -285,21 +449,21 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
           isDeleted: false,
         },
       },
-      expectedErr: require.NoError,
-      cacheAssert: func(uc *urlCache, startTime time.Time) {
-        require.Greater(suite.T(), uc.lastRefreshTime, startTime)
-        require.Equal(suite.T(), 1, uc.deltaQueryCount)
-        require.Equal(suite.T(), 3, len(uc.idToProps))
+      expectErr: assert.NoError,
+      expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+        assert.Greater(t, uc.lastRefreshTime, startTime)
+        assert.Equal(t, 1, uc.refreshCount)
+        assert.Equal(t, 3, len(uc.idToProps))
       },
     },
     {
       name: "deleted items",
-      pagerItems: map[string][]models.DriveItemable{
-        driveID: {
+      pages: []mock.NextPage{
+        {Items: []models.DriveItemable{
           fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
           fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
           fileItem("1", "file1", "root", "root", "https://dummy1.com", true),
-        },
+        }},
       },
       expectedItemProps: map[string]itemProps{
         "1": {
@@ -311,111 +475,120 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
          isDeleted: false,
         },
       },
-      expectedErr: require.NoError,
-      cacheAssert: func(uc *urlCache, startTime time.Time) {
-        require.Greater(suite.T(), uc.lastRefreshTime, startTime)
-        require.Equal(suite.T(), 1, uc.deltaQueryCount)
-        require.Equal(suite.T(), 2, len(uc.idToProps))
+      expectErr: assert.NoError,
+      expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+        assert.Greater(t, uc.lastRefreshTime, startTime)
+        assert.Equal(t, 1, uc.refreshCount)
+        assert.Equal(t, 2, len(uc.idToProps))
       },
     },
     {
       name: "item not found in cache",
-      pagerItems: map[string][]models.DriveItemable{
-        driveID: {fileItem("1", "file1", "root", "root", "https://dummy1.com", false)},
+      pages: []mock.NextPage{
+        {Items: []models.DriveItemable{
+          fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
+        }},
       },
       expectedItemProps: map[string]itemProps{
         "2": {},
       },
-      expectedErr: require.Error,
-      cacheAssert: func(uc *urlCache, startTime time.Time) {
-        require.Greater(suite.T(), uc.lastRefreshTime, startTime)
-        require.Equal(suite.T(), 1, uc.deltaQueryCount)
-        require.Equal(suite.T(), 1, len(uc.idToProps))
+      expectErr: assert.Error,
+      expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+        assert.Greater(t, uc.lastRefreshTime, startTime)
+        assert.Equal(t, 1, uc.refreshCount)
+        assert.Equal(t, 1, len(uc.idToProps))
       },
     },
     {
       name: "delta query error",
-      pagerItems: map[string][]models.DriveItemable{},
-      pagerErr: map[string]error{
-        driveID: errors.New("delta query error"),
+      pages: []mock.NextPage{
+        {Items: []models.DriveItemable{}},
       },
+      pagerErr: errors.New("delta query error"),
       expectedItemProps: map[string]itemProps{
         "1": {},
         "2": {},
       },
-      expectedErr: require.Error,
-      cacheAssert: func(uc *urlCache, _ time.Time) {
-        require.Equal(suite.T(), time.Time{}, uc.lastRefreshTime)
-        require.Equal(suite.T(), 0, uc.deltaQueryCount)
-        require.Equal(suite.T(), 0, len(uc.idToProps))
+      expectErr: assert.Error,
+      expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+        assert.Equal(t, time.Time{}, uc.lastRefreshTime)
+        assert.NotZero(t, uc.refreshCount)
+        assert.Equal(t, 0, len(uc.idToProps))
       },
     },
     {
       name: "folder item",
-      pagerItems: map[string][]models.DriveItemable{
-        driveID: {
+      pages: []mock.NextPage{
+        {Items: []models.DriveItemable{
           fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
           driveItem("2", "folder2", "root", "root", false, true, false),
-        },
+        }},
       },
       expectedItemProps: map[string]itemProps{
         "2": {},
       },
-      expectedErr: require.Error,
-      cacheAssert: func(uc *urlCache, startTime time.Time) {
-        require.Greater(suite.T(), uc.lastRefreshTime, startTime)
-        require.Equal(suite.T(), 1, uc.deltaQueryCount)
-        require.Equal(suite.T(), 1, len(uc.idToProps))
+      expectErr: assert.Error,
+      expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
+        assert.Greater(t, uc.lastRefreshTime, startTime)
+        assert.Equal(t, 1, uc.refreshCount)
+        assert.Equal(t, 1, len(uc.idToProps))
       },
     },
   }
   for _, test := range table {
     suite.Run(test.name, func() {
-      t := suite.T()
-      ctx, flush := tester.NewContext(t)
-      defer flush()
+      for _, numConcurrentReqs := range []int{1, 2, 32} {
+        crTestName := fmt.Sprintf("%d_concurrent_reqs", numConcurrentReqs)

-      medi := mock.EnumeratesDriveItemsDelta{
-        Items:       test.pagerItems,
-        Err:         test.pagerErr,
-        DeltaUpdate: map[string]api.DeltaUpdate{driveID: {URL: deltaString}},
-      }
+        suite.Run(crTestName, func() {
+          t := suite.T()

-      cache, err := newURLCache(
-        driveID,
-        "",
-        1*time.Hour,
-        &medi,
-        fault.New(true))
-      require.NoError(suite.T(), err, clues.ToCore(err))
+          ctx, flush := tester.NewContext(t)
+          defer flush()

-      numConcurrentReq := 100
-      var wg sync.WaitGroup
-      wg.Add(numConcurrentReq)
+          medi := mock.EnumerateItemsDeltaByDrive{
+            DrivePagers: map[string]*mock.DriveItemsDeltaPager{
+              driveID: {
+                Pages:       test.pages,
+                Err:         test.pagerErr,
+                DeltaUpdate: api.DeltaUpdate{URL: deltaString},
+              },
+            },
+          }

-      startTime := time.Now()
+          cache, err := newURLCache(
+            driveID,
+            "",
+            1*time.Hour,
+            &medi,
+            fault.New(true))
+          require.NoError(t, err, clues.ToCore(err))

-      for i := 0; i < numConcurrentReq; i++ {
-        go func() {
-          defer wg.Done()
-          for id, expected := range test.expectedItemProps {
-            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
-            props, err := cache.getItemProperties(ctx, id)
-            test.expectedErr(suite.T(), err, clues.ToCore(err))
-            require.Equal(suite.T(), expected, props)
+          var wg sync.WaitGroup
+          wg.Add(numConcurrentReqs)
+
+          startTime := time.Now()
+
+          for i := 0; i < numConcurrentReqs; i++ {
+            go func(ti int) {
+              defer wg.Done()
+              for id, expected := range test.expectedItemProps {
+                time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
+                props, err := cache.getItemProperties(ctx, id)
+                test.expectErr(t, err, clues.ToCore(err))
+                assert.Equal(t, expected, props)
+              }
+            }(i)
           }
-        }()
+
+          wg.Wait()
+          test.expect(t, cache, startTime)
+        })
       }
-      wg.Wait()
-      test.cacheAssert(cache, startTime)
     })
   }
 }
@@ -432,7 +605,7 @@ func (suite *URLCacheUnitSuite) TestNeedsRefresh() {
     driveID,
     "",
     refreshInterval,
-    &mock.EnumeratesDriveItemsDelta{},
+    &mock.EnumerateItemsDeltaByDrive{},
     fault.New(true))
   require.NoError(t, err, clues.ToCore(err))
@@ -456,44 +629,44 @@ func (suite *URLCacheUnitSuite) TestNewURLCache() {
   table := []struct {
     name       string
     driveID    string
     refreshInt time.Duration
     itemPager  EnumerateDriveItemsDeltaer
     errors     *fault.Bus
-    expectedErr require.ErrorAssertionFunc
+    expectErr  require.ErrorAssertionFunc
   }{
     {
       name:       "invalid driveID",
       driveID:    "",
       refreshInt: 1 * time.Hour,
-      itemPager:  &mock.EnumeratesDriveItemsDelta{},
+      itemPager:  &mock.EnumerateItemsDeltaByDrive{},
       errors:     fault.New(true),
-      expectedErr: require.Error,
+      expectErr:  require.Error,
     },
     {
       name:       "invalid refresh interval",
       driveID:    "drive1",
       refreshInt: 100 * time.Millisecond,
-      itemPager:  &mock.EnumeratesDriveItemsDelta{},
+      itemPager:  &mock.EnumerateItemsDeltaByDrive{},
       errors:     fault.New(true),
-      expectedErr: require.Error,
+      expectErr:  require.Error,
     },
     {
       name:       "invalid item enumerator",
       driveID:    "drive1",
       refreshInt: 1 * time.Hour,
       itemPager:  nil,
       errors:     fault.New(true),
-      expectedErr: require.Error,
+      expectErr:  require.Error,
     },
     {
       name:       "valid",
       driveID:    "drive1",
       refreshInt: 1 * time.Hour,
-      itemPager:  &mock.EnumeratesDriveItemsDelta{},
+      itemPager:  &mock.EnumerateItemsDeltaByDrive{},
       errors:     fault.New(true),
-      expectedErr: require.NoError,
+      expectErr:  require.NoError,
     },
   }
@@ -507,7 +680,7 @@ func (suite *URLCacheUnitSuite) TestNewURLCache() {
         test.itemPager,
         test.errors)

-      test.expectedErr(t, err, clues.ToCore(err))
+      test.expectErr(t, err, clues.ToCore(err))
     })
   }
 }

View File

@@ -9,23 +9,25 @@ import (
   "github.com/microsoftgraph/msgraph-sdk-go/models"

   "github.com/alcionai/corso/src/internal/common/idname"
-  "github.com/alcionai/corso/src/internal/common/ptr"
   odConsts "github.com/alcionai/corso/src/internal/m365/service/onedrive/consts"
   "github.com/alcionai/corso/src/pkg/backup/details"
   "github.com/alcionai/corso/src/pkg/control"
   "github.com/alcionai/corso/src/pkg/path"
+  "github.com/alcionai/corso/src/pkg/selectors"
   "github.com/alcionai/corso/src/pkg/services/m365/api"
-  apiMock "github.com/alcionai/corso/src/pkg/services/m365/api/mock"
 )

 // ---------------------------------------------------------------------------
 // Backup Handler
 // ---------------------------------------------------------------------------

-type BackupHandler struct {
+type BackupHandler[T any] struct {
   ItemInfo details.ItemInfo

+  // FIXME: this is a hacky solution. Better to use an interface
+  // and plug in the selector scope there.
+  Sel selectors.Selector
+
-  DriveItemEnumeration EnumeratesDriveItemsDelta
+  DriveItemEnumeration EnumerateItemsDeltaByDrive

   GI  GetsItem
   GIP GetsItemPermission
@@ -54,13 +56,17 @@ type BackupHandler struct {
   GetErrs []error
 }

-func DefaultOneDriveBH(resourceOwner string) *BackupHandler {
-  return &BackupHandler{
+func DefaultOneDriveBH(resourceOwner string) *BackupHandler[models.DriveItemable] {
+  sel := selectors.NewOneDriveBackup([]string{resourceOwner})
+  sel.Include(sel.AllData())
+
+  return &BackupHandler[models.DriveItemable]{
     ItemInfo: details.ItemInfo{
       OneDrive:  &details.OneDriveInfo{},
       Extension: &details.ExtensionData{},
     },
-    DriveItemEnumeration: EnumeratesDriveItemsDelta{},
+    Sel:                  sel.Selector,
+    DriveItemEnumeration: EnumerateItemsDeltaByDrive{},
     GI:           GetsItem{Err: clues.New("not defined")},
     GIP:          GetsItemPermission{Err: clues.New("not defined")},
     PathPrefixFn: defaultOneDrivePathPrefixer,
@@ -75,12 +81,16 @@ func DefaultSharePointBH(resourceOwner string) *BackupHandler {
   }
 }

-func DefaultSharePointBH(resourceOwner string) *BackupHandler {
-  return &BackupHandler{
+func DefaultSharePointBH(resourceOwner string) *BackupHandler[models.DriveItemable] {
+  sel := selectors.NewOneDriveBackup([]string{resourceOwner})
+  sel.Include(sel.AllData())
+
+  return &BackupHandler[models.DriveItemable]{
     ItemInfo: details.ItemInfo{
       SharePoint: &details.SharePointInfo{},
       Extension:  &details.ExtensionData{},
     },
+    Sel: sel.Selector,
     GI:           GetsItem{Err: clues.New("not defined")},
     GIP:          GetsItemPermission{Err: clues.New("not defined")},
     PathPrefixFn: defaultSharePointPathPrefixer,
@@ -95,7 +105,7 @@ func DefaultSharePointBH(resourceOwner string) *BackupHandler {
   }
 }

-func (h BackupHandler) PathPrefix(tID, driveID string) (path.Path, error) {
+func (h BackupHandler[T]) PathPrefix(tID, driveID string) (path.Path, error) {
   pp, err := h.PathPrefixFn(tID, h.ProtectedResource.ID(), driveID)
   if err != nil {
     return nil, err
@@ -104,7 +114,7 @@ func (h BackupHandler) PathPrefix(tID, driveID string) (path.Path, error) {
   return pp, h.PathPrefixErr
 }

-func (h BackupHandler) MetadataPathPrefix(tID string) (path.Path, error) {
+func (h BackupHandler[T]) MetadataPathPrefix(tID string) (path.Path, error) {
   pp, err := h.MetadataPathPrefixFn(tID, h.ProtectedResource.ID())
   if err != nil {
     return nil, err
@@ -113,7 +123,7 @@ func (h BackupHandler) MetadataPathPrefix(tID string) (path.Path, error) {
   return pp, h.PathPrefixErr
 }

-func (h BackupHandler) CanonicalPath(pb *path.Builder, tID string) (path.Path, error) {
+func (h BackupHandler[T]) CanonicalPath(pb *path.Builder, tID string) (path.Path, error) {
   cp, err := h.CanonPathFn(pb, tID, h.ProtectedResource.ID())
   if err != nil {
     return nil, err
@@ -122,23 +132,23 @@ func (h BackupHandler) CanonicalPath(pb *path.Builder, tID string) (path.Path, e
   return cp, h.CanonPathErr
 }

-func (h BackupHandler) ServiceCat() (path.ServiceType, path.CategoryType) {
+func (h BackupHandler[T]) ServiceCat() (path.ServiceType, path.CategoryType) {
   return h.Service, h.Category
 }

-func (h BackupHandler) NewDrivePager(string, []string) api.Pager[models.Driveable] {
+func (h BackupHandler[T]) NewDrivePager(string, []string) api.Pager[models.Driveable] {
   return h.DrivePagerV
 }

-func (h BackupHandler) FormatDisplayPath(_ string, pb *path.Builder) string {
+func (h BackupHandler[T]) FormatDisplayPath(_ string, pb *path.Builder) string {
   return "/" + pb.String()
 }

-func (h BackupHandler) NewLocationIDer(driveID string, elems ...string) details.LocationIDer {
+func (h BackupHandler[T]) NewLocationIDer(driveID string, elems ...string) details.LocationIDer {
   return h.LocationIDFn(driveID, elems...)
 }

-func (h BackupHandler) AugmentItemInfo(
+func (h BackupHandler[T]) AugmentItemInfo(
   details.ItemInfo,
   idname.Provider,
   models.DriveItemable,
@@ -148,7 +158,7 @@ func (h BackupHandler) AugmentItemInfo(
   return h.ItemInfo
 }

-func (h *BackupHandler) Get(context.Context, string, map[string]string) (*http.Response, error) {
+func (h *BackupHandler[T]) Get(context.Context, string, map[string]string) (*http.Response, error) {
   c := h.getCall
   h.getCall++
@@ -160,23 +170,23 @@ func (h *BackupHandler) Get(context.Context, string, map[string]string) (*http.R
   return h.GetResps[c], h.GetErrs[c]
 }

-func (h BackupHandler) EnumerateDriveItemsDelta(
+func (h BackupHandler[T]) EnumerateDriveItemsDelta(
   ctx context.Context,
   driveID, prevDeltaLink string,
-  selectProps []string,
-) ([]models.DriveItemable, api.DeltaUpdate, error) {
+  cc api.CallConfig,
+) api.NextPageResulter[models.DriveItemable] {
   return h.DriveItemEnumeration.EnumerateDriveItemsDelta(
     ctx,
     driveID,
     prevDeltaLink,
-    selectProps)
+    cc)
 }

-func (h BackupHandler) GetItem(ctx context.Context, _, _ string) (models.DriveItemable, error) {
+func (h BackupHandler[T]) GetItem(ctx context.Context, _, _ string) (models.DriveItemable, error) {
   return h.GI.GetItem(ctx, "", "")
 }

-func (h BackupHandler) GetItemPermission(
+func (h BackupHandler[T]) GetItemPermission(
   ctx context.Context,
   _, _ string,
 ) (models.PermissionCollectionResponseable, error) {
@@ -250,12 +260,16 @@ var defaultSharePointLocationIDer = func(driveID string, elems ...string) detail
   return details.NewSharePointLocationIDer(driveID, elems...)
 }

-func (h BackupHandler) IsAllPass() bool {
-  return true
+func (h BackupHandler[T]) IsAllPass() bool {
+  scope := h.Sel.Includes[0]
+  return selectors.IsAnyTarget(selectors.SharePointScope(scope), selectors.SharePointLibraryFolder) ||
+    selectors.IsAnyTarget(selectors.OneDriveScope(scope), selectors.OneDriveFolder)
 }

-func (h BackupHandler) IncludesDir(string) bool {
-  return true
+func (h BackupHandler[T]) IncludesDir(dir string) bool {
+  scope := h.Sel.Includes[0]
+  return selectors.SharePointScope(scope).Matches(selectors.SharePointLibraryFolder, dir) ||
+    selectors.OneDriveScope(scope).Matches(selectors.OneDriveFolder, dir)
 }

 // ---------------------------------------------------------------------------
@@ -278,60 +292,46 @@ func (m GetsItem) GetItem(
 // Enumerates Drive Items
 // ---------------------------------------------------------------------------

-type EnumeratesDriveItemsDelta struct {
-  Items       map[string][]models.DriveItemable
-  DeltaUpdate map[string]api.DeltaUpdate
-  Err         map[string]error
+type NextPage struct {
+  Items []models.DriveItemable
+  Reset bool
 }

-func (edi EnumeratesDriveItemsDelta) EnumerateDriveItemsDelta(
+type EnumerateItemsDeltaByDrive struct {
+  DrivePagers map[string]*DriveItemsDeltaPager
+}
+
+var _ api.NextPageResulter[models.DriveItemable] = &DriveItemsDeltaPager{}
+
+type DriveItemsDeltaPager struct {
+  Idx         int
+  Pages       []NextPage
+  DeltaUpdate api.DeltaUpdate
+  Err         error
+}
+
+func (edibd EnumerateItemsDeltaByDrive) EnumerateDriveItemsDelta(
   _ context.Context,
   driveID, _ string,
-  _ []string,
-) (
-  []models.DriveItemable,
-  api.DeltaUpdate,
-  error,
-) {
-  return edi.Items[driveID], edi.DeltaUpdate[driveID], edi.Err[driveID]
-}
-
-func PagerResultToEDID(
-  m map[string][]apiMock.PagerResult[models.DriveItemable],
-) EnumeratesDriveItemsDelta {
-  edi := EnumeratesDriveItemsDelta{
-    Items:       map[string][]models.DriveItemable{},
-    DeltaUpdate: map[string]api.DeltaUpdate{},
-    Err:         map[string]error{},
-  }
-
-  for driveID, results := range m {
-    var (
-      err         error
-      items       = []models.DriveItemable{}
-      deltaUpdate api.DeltaUpdate
-    )
-
-    for _, pr := range results {
-      items = append(items, pr.Values...)
-
-      if pr.DeltaLink != nil {
-        deltaUpdate = api.DeltaUpdate{URL: ptr.Val(pr.DeltaLink)}
-      }
-
-      if pr.Err != nil {
-        err = pr.Err
-      }
-
-      deltaUpdate.Reset = deltaUpdate.Reset || pr.ResetDelta
-    }
-
-    edi.Items[driveID] = items
-    edi.Err[driveID] = err
-    edi.DeltaUpdate[driveID] = deltaUpdate
-  }
-
-  return edi
+  _ api.CallConfig,
+) api.NextPageResulter[models.DriveItemable] {
+  didp := edibd.DrivePagers[driveID]
+  return didp
+}
+
+func (edi *DriveItemsDeltaPager) NextPage() ([]models.DriveItemable, bool, bool) {
+  if edi.Idx >= len(edi.Pages) {
+    return nil, false, true
+  }
+
+  np := edi.Pages[edi.Idx]
+  edi.Idx = edi.Idx + 1
+
+  return np.Items, np.Reset, false
+}
+
+func (edi *DriveItemsDeltaPager) Results() (api.DeltaUpdate, error) {
+  return edi.DeltaUpdate, edi.Err
 }

 // ---------------------------------------------------------------------------
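Taken together, the mock now mirrors the production shape: one pre-scripted DriveItemsDeltaPager per drive ID, replayed page by page. A rough sketch of the wiring a test would use (the drive ID, item slices, and delta URL are placeholders; the test files later in this diff follow the same pattern):

    medi := mock.EnumerateItemsDeltaByDrive{
        DrivePagers: map[string]*mock.DriveItemsDeltaPager{
            "drive-id": {
                Pages: []mock.NextPage{
                    {Items: firstPageItems},
                    {Reset: true},            // simulates an expired delta token
                    {Items: secondPageItems}, // pages replayed after the reset
                },
                DeltaUpdate: api.DeltaUpdate{URL: "delta-url"},
            },
        },
    }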

View File

@@ -12,6 +12,7 @@ import (
   "github.com/alcionai/corso/src/internal/common/idname"
   "github.com/alcionai/corso/src/internal/m365/collection/drive"
   odConsts "github.com/alcionai/corso/src/internal/m365/service/onedrive/consts"
+  "github.com/alcionai/corso/src/internal/m365/service/onedrive/mock"
   "github.com/alcionai/corso/src/internal/tester"
   "github.com/alcionai/corso/src/pkg/control"
   "github.com/alcionai/corso/src/pkg/fault"
@@ -90,16 +91,29 @@ func (suite *LibrariesBackupUnitSuite) TestUpdateCollections() {
       defer flush()

       var (
-        paths     = map[string]string{}
-        currPaths = map[string]string{}
-        excluded  = map[string]struct{}{}
-        collMap   = map[string]map[string]*drive.Collection{
+        mbh = mock.DefaultSharePointBH(siteID)
+        du  = api.DeltaUpdate{
+          URL:   "notempty",
+          Reset: false,
+        }
+        paths    = map[string]string{}
+        excluded = map[string]struct{}{}
+        collMap  = map[string]map[string]*drive.Collection{
           driveID: {},
         }
       )

+      mbh.DriveItemEnumeration = mock.EnumerateItemsDeltaByDrive{
+        DrivePagers: map[string]*mock.DriveItemsDeltaPager{
+          driveID: {
+            Pages:       []mock.NextPage{{Items: test.items}},
+            DeltaUpdate: du,
+          },
+        },
+      }
+
       c := drive.NewCollections(
-        drive.NewLibraryBackupHandler(api.Drives{}, siteID, test.scope, path.SharePointService),
+        mbh,
         tenantID,
         idname.NewProvider(siteID, siteID),
         nil,
@@ -107,15 +121,13 @@ func (suite *LibrariesBackupUnitSuite) TestUpdateCollections() {
       c.CollectionMap = collMap

-      _, err := c.UpdateCollections(
+      _, _, err := c.PopulateDriveCollections(
         ctx,
         driveID,
         "General",
-        test.items,
         paths,
-        currPaths,
         excluded,
-        true,
+        "",
         fault.New(true))
       test.expect(t, err, clues.ToCore(err))

View File

@@ -1,9 +1,17 @@
 package fault

 import (
+  "context"
+
   "github.com/alcionai/corso/src/cli/print"
 )

+// AddSkipper presents an interface that allows callers to
+// write additional skipped items to the complying struct.
+type AddSkipper interface {
+  AddSkip(ctx context.Context, s *Skipped)
+}
+
 // skipCause identifies the well-known conditions to Skip an item. It is
 // important that skip cause enumerations do not overlap with general error
 // handling. Skips must be well known, well documented, and consistent.

View File

@@ -227,7 +227,7 @@ func (c Channels) GetChannelMessageReplies(
   ctx context.Context,
   teamID, channelID, messageID string,
 ) ([]models.ChatMessageable, error) {
-  return enumerateItems[models.ChatMessageable](
+  return batchEnumerateItems[models.ChatMessageable](
     ctx,
     c.NewChannelMessageRepliesPager(teamID, channelID, messageID))
 }
@@ -284,5 +284,5 @@ func (c Channels) GetChannels(
   ctx context.Context,
   teamID string,
 ) ([]models.Channelable, error) {
-  return enumerateItems[models.Channelable](ctx, c.NewChannelPager(teamID))
+  return batchEnumerateItems[models.Channelable](ctx, c.NewChannelPager(teamID))
 }

View File

@@ -136,6 +136,7 @@ func (c Client) Post(
 type CallConfig struct {
   Expand []string
+  Select []string
 }

 // ---------------------------------------------------------------------------
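With Select joining Expand, call sites thread property projections through the options struct instead of a variadic selectProps parameter. The drive enumeration calls elsewhere in this diff now take the form:

    pager := ac.Drives().EnumerateDriveItemsDelta(
        ctx,
        driveID,
        prevDeltaLink,
        api.CallConfig{
            Select: api.DefaultDriveItemProps(),
        })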

View File

@@ -84,7 +84,7 @@ func (c Contacts) EnumerateContainers(
     pgr = c.NewContactFoldersPager(userID, baseContainerID, immutableIDs)
   )

-  containers, err := enumerateItems(ctx, pgr)
+  containers, err := batchEnumerateItems(ctx, pgr)
   if err != nil {
     return graph.Stack(ctx, err)
   }
@@ -165,7 +165,7 @@ func (c Contacts) GetItemsInContainerByCollisionKey(
   ctx = clues.Add(ctx, "container_id", containerID)

   pager := c.NewContactsPager(userID, containerID, false, contactCollisionKeyProps()...)

-  items, err := enumerateItems(ctx, pager)
+  items, err := batchEnumerateItems(ctx, pager)
   if err != nil {
     return nil, graph.Wrap(ctx, err, "enumerating contacts")
   }
@@ -186,7 +186,7 @@ func (c Contacts) GetItemIDsInContainer(
   ctx = clues.Add(ctx, "container_id", containerID)

   pager := c.NewContactsPager(userID, containerID, false, idAnd()...)

-  items, err := enumerateItems(ctx, pager)
+  items, err := batchEnumerateItems(ctx, pager)
   if err != nil {
     return nil, graph.Wrap(ctx, err, "enumerating contacts")
   }

View File

@@ -77,7 +77,7 @@ func (c Drives) GetItemsInContainerByCollisionKey(
   ctx = clues.Add(ctx, "container_id", containerID)

   pager := c.NewDriveItemPager(driveID, containerID, idAnd("name")...)

-  items, err := enumerateItems(ctx, pager)
+  items, err := batchEnumerateItems(ctx, pager)
   if err != nil {
     return nil, graph.Wrap(ctx, err, "enumerating drive items")
   }
@@ -101,7 +101,7 @@ func (c Drives) GetItemIDsInContainer(
   ctx = clues.Add(ctx, "container_id", containerID)

   pager := c.NewDriveItemPager(driveID, containerID, idAnd("file", "folder")...)

-  items, err := enumerateItems(ctx, pager)
+  items, err := batchEnumerateItems(ctx, pager)
   if err != nil {
     return nil, graph.Wrap(ctx, err, "enumerating contacts")
   }
@@ -133,9 +133,9 @@ type DriveItemDeltaPageCtrl struct {
 func (c Drives) newDriveItemDeltaPager(
   driveID, prevDeltaLink string,
-  selectProps ...string,
+  cc CallConfig,
 ) *DriveItemDeltaPageCtrl {
   preferHeaderItems := []string{
     "deltashowremovedasdeleted",
@@ -147,8 +147,8 @@ func (c Drives) newDriveItemDeltaPager(
     QueryParameters: &drives.ItemItemsItemDeltaRequestBuilderGetQueryParameters{},
   }

-  if len(selectProps) > 0 {
-    options.QueryParameters.Select = selectProps
+  if len(cc.Select) > 0 {
+    options.QueryParameters.Select = cc.Select
   }

   builder := c.Stable.
@@ -197,26 +197,33 @@ func (p *DriveItemDeltaPageCtrl) ValidModTimes() bool {
   return true
 }

-// EnumerateDriveItems will enumerate all items in the specified drive and hand them to the
-// provided `collector` method
+// EnumerateDriveItems will enumerate all items in the specified drive and stream them page
+// by page, along with the delta update and any errors, to the provided channel.
 func (c Drives) EnumerateDriveItemsDelta(
   ctx context.Context,
   driveID string,
   prevDeltaLink string,
-  selectProps []string,
-) (
-  []models.DriveItemable,
-  DeltaUpdate,
-  error,
-) {
-  pager := c.newDriveItemDeltaPager(driveID, prevDeltaLink, selectProps...)
-
-  items, du, err := deltaEnumerateItems[models.DriveItemable](ctx, pager, prevDeltaLink)
-  if err != nil {
-    return nil, du, clues.Stack(err)
-  }
-
-  return items, du, nil
+  cc CallConfig,
+) NextPageResulter[models.DriveItemable] {
+  deltaPager := c.newDriveItemDeltaPager(
+    driveID,
+    prevDeltaLink,
+    cc)
+
+  npr := &nextPageResults[models.DriveItemable]{
+    pages: make(chan nextPage[models.DriveItemable]),
+  }
+
+  // asynchronously enumerate pages on the caller's behalf.
+  // they only need to consume the pager and call Results at
+  // the end.
+  go deltaEnumerateItems[models.DriveItemable](
+    ctx,
+    deltaPager,
+    npr,
+    prevDeltaLink)
+
+  return npr
 }

 // ---------------------------------------------------------------------------
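EnumerateDriveItemsDelta now returns immediately; the Graph calls happen in the spawned goroutine and block on the channel until pages are consumed. A minimal sketch of the consumer side, mirroring the integration test later in this diff (consume is a hypothetical per-page handler; reset handling is the caller's responsibility):

    pager := ac.Drives().EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, cc)

    page, reset, done := pager.NextPage()
    for ; !done; page, reset, done = pager.NextPage() {
        if reset {
            // discard anything aggregated so far
        }
        consume(page) // page is a []models.DriveItemable
    }

    // blocks until the producer goroutine finishes, then reports
    // the new delta link and any enumeration error.
    du, err := pager.Results()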
@@ -354,11 +361,17 @@ func GetAllDrives(
   ctx context.Context,
   pager Pager[models.Driveable],
 ) ([]models.Driveable, error) {
-  ds, err := enumerateItems(ctx, pager)
-  if err != nil && (clues.HasLabel(err, graph.LabelsMysiteNotFound) ||
-    clues.HasLabel(err, graph.LabelsNoSharePointLicense)) {
+  ds, err := batchEnumerateItems(ctx, pager)
+
+  // no license or drives available.
+  // return a non-error and let the caller assume an empty result set.
+  // TODO: is this the best way to handle this?
+  // what about returning a ResourceNotFound error as is standard elsewhere?
+  if err != nil &&
+    (clues.HasLabel(err, graph.LabelsMysiteNotFound) || clues.HasLabel(err, graph.LabelsNoSharePointLicense)) {
     logger.CtxErr(ctx, err).Infof("resource owner does not have a drive")
+    // no license or drives.
     return make([]models.Driveable, 0), nil
   }

   return ds, graph.Stack(ctx, err).OrNil()

View File

@@ -4,6 +4,7 @@ import (
   "testing"

   "github.com/alcionai/clues"
+  "github.com/microsoftgraph/msgraph-sdk-go/models"
   "github.com/stretchr/testify/assert"
   "github.com/stretchr/testify/require"
   "github.com/stretchr/testify/suite"
@@ -185,11 +186,29 @@ func (suite *DrivePagerIntgSuite) TestEnumerateDriveItems() {
     ctx, flush := tester.NewContext(t)
     defer flush()

-    items, du, err := suite.its.
+    items := []models.DriveItemable{}
+    pager := suite.its.
       ac.
       Drives().
-      EnumerateDriveItemsDelta(ctx, suite.its.user.driveID, "", api.DefaultDriveItemProps())
+      EnumerateDriveItemsDelta(
+        ctx,
+        suite.its.user.driveID,
+        "",
+        api.CallConfig{
+          Select: api.DefaultDriveItemProps(),
+        })
+
+    page, reset, done := pager.NextPage()
+    for ; !done; page, reset, done = pager.NextPage() {
+      items = append(items, page...)
+      assert.False(t, reset, "should not reset")
+    }
+
+    du, err := pager.Results()
     require.NoError(t, err, clues.ToCore(err))
-    require.NotEmpty(t, items, "no items found in user's drive")
+    require.NotEmpty(t, items, "should find items in user's drive")
     assert.NotEmpty(t, du.URL, "should have a delta link")
 }

View File

@@ -85,7 +85,7 @@ func (c Events) EnumerateContainers(
     pgr = c.NewEventCalendarsPager(userID, immutableIDs)
   )

-  containers, err := enumerateItems(ctx, pgr)
+  containers, err := batchEnumerateItems(ctx, pgr)
   if err != nil {
     return graph.Stack(ctx, err)
   }
@@ -169,7 +169,7 @@ func (c Events) GetItemsInContainerByCollisionKey(
   ctx = clues.Add(ctx, "container_id", containerID)

   pager := c.NewEventsPager(userID, containerID, false, eventCollisionKeyProps()...)

-  items, err := enumerateItems(ctx, pager)
+  items, err := batchEnumerateItems(ctx, pager)
   if err != nil {
     return nil, graph.Wrap(ctx, err, "enumerating events")
   }
@@ -190,7 +190,7 @@ func (c Events) GetItemIDsInContainer(
   ctx = clues.Add(ctx, "container_id", containerID)

   pager := c.NewEventsPager(userID, containerID, false, idAnd()...)

-  items, err := enumerateItems(ctx, pager)
+  items, err := batchEnumerateItems(ctx, pager)
   if err != nil {
     return nil, graph.Wrap(ctx, err, "enumerating events")
   }

View File

@@ -27,6 +27,85 @@ type DeltaUpdate struct {
   Reset bool
 }

+type NextPager[T any] interface {
+  // reset should only be true on the iteration where the delta pager's Reset()
+  // is called. Callers can use it to reset any data aggregation they
+  // currently use. After that loop it can be false again, though the
+  // DeltaUpdate will eventually contain the expected reset value.
+  // Items may or may not be >0 when reset == true. In that case, the
+  // items should always represent the next page of data following a reset.
+  // Callers should always handle the reset first, and follow up with
+  // item population.
+  NextPage() (items []T, reset, done bool)
+}
+
+type nextPage[T any] struct {
+  items []T
+  reset bool
+}
+
+type NextPageResulter[T any] interface {
+  NextPager[T]
+  Results() (DeltaUpdate, error)
+}
+
+var _ NextPageResulter[any] = &nextPageResults[any]{}
+
+type nextPageResults[T any] struct {
+  pages chan nextPage[T]
+  du    DeltaUpdate
+  err   error
+}
+
+func (npr *nextPageResults[T]) writeNextPage(
+  ctx context.Context,
+  items []T,
+  reset bool,
+) error {
+  if npr.pages == nil {
+    return clues.New("pager already closed")
+  }
+
+  select {
+  case <-ctx.Done():
+    return clues.Wrap(context.Canceled, "writing next page")
+  case npr.pages <- nextPage[T]{
+    items: items,
+    reset: reset,
+  }:
+    return nil
+  }
+}
+
+func (npr *nextPageResults[T]) NextPage() ([]T, bool, bool) {
+  if npr.pages == nil {
+    return nil, false, true
+  }
+
+  np, ok := <-npr.pages
+
+  return np.items, np.reset, !ok
+}
+
+func (npr *nextPageResults[T]) Results() (DeltaUpdate, error) {
+  if npr.pages != nil {
+    //nolint:revive
+    for range npr.pages {
+      // if the pager hasn't closed yet, drain out the pages iterator
+      // to avoid leaking routines, and to ensure we get results.
+    }
+  }
+
+  return npr.du, npr.err
+}
+
+func (npr *nextPageResults[T]) close() {
+  if npr.pages != nil {
+    close(npr.pages)
+  }
+}
+
 // ---------------------------------------------------------------------------
 // common interfaces
 // ---------------------------------------------------------------------------
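Because pages is an unbuffered channel, the producer blocks on every writeNextPage until a reader arrives. Results drains whatever remains before returning, which both lets the producer reach its deferred close (no leaked goroutine) and guarantees du and err are populated before they are read. That is the property the url cache test above leans on: a caller may skip paging entirely.

    npr := ac.Drives().EnumerateDriveItemsDelta(ctx, driveID, "", cc)

    // no NextPage calls at all: Results alone drains the stream,
    // waits for the producer to finish, then reports the outcome.
    du, err := npr.Results()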
@@ -100,7 +179,10 @@ type Pager[T any] interface {
 func enumerateItems[T any](
   ctx context.Context,
   pager Pager[T],
-) ([]T, error) {
+  npr *nextPageResults[T],
+) {
+  defer npr.close()
+
   var (
     result = make([]T, 0)
     // stubbed initial value to ensure we enter the loop.
@@ -111,18 +193,44 @@ func enumerateItems[T any](
     // get the next page of data, check for standard errors
     page, err := pager.GetPage(ctx)
     if err != nil {
-      return nil, graph.Stack(ctx, err)
+      npr.err = graph.Stack(ctx, err)
+      return
+    }
+
+    if err := npr.writeNextPage(ctx, page.GetValue(), false); err != nil {
+      npr.err = clues.Stack(err)
+      return
     }

-    result = append(result, page.GetValue()...)
     nextLink = NextLink(page)
     pager.SetNextLink(nextLink)
   }

   logger.Ctx(ctx).Infow("completed delta item enumeration", "result_count", len(result))
+}

-  return result, nil
+func batchEnumerateItems[T any](
+  ctx context.Context,
+  pager Pager[T],
+) ([]T, error) {
+  var (
+    npr = nextPageResults[T]{
+      pages: make(chan nextPage[T]),
+    }
+    items = []T{}
+  )
+
+  go enumerateItems[T](ctx, pager, &npr)
+
+  page, _, done := npr.NextPage()
+  for ; !done; page, _, done = npr.NextPage() {
+    items = append(items, page...)
+  }
+
+  _, err := npr.Results()
+
+  return items, clues.Stack(err).OrNil()
 }

 // ---------------------------------------------------------------------------
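batchEnumerateItems is the compatibility shim for batch-style call sites: it runs the streaming enumerator in a goroutine and folds the pages back into a single slice, so the channels, contacts, drives, events, and mail callers keep their existing signatures while sharing the streaming core. batchDeltaEnumerateItems below applies the same wrapper to delta pagers, adding the reset handling that discards previously accumulated results.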
@@ -136,11 +244,17 @@ type DeltaPager[T any] interface {
   ValidModTimer
 }

+// enumerates pages of items, streaming each page to the provided channel.
+// the DeltaUpdate, reset notifications, and any errors are also fed to the
+// same channel.
 func deltaEnumerateItems[T any](
   ctx context.Context,
   pager DeltaPager[T],
+  npr *nextPageResults[T],
   prevDeltaLink string,
-) ([]T, DeltaUpdate, error) {
+) {
+  defer npr.close()
+
   var (
     result = make([]T, 0)
     // stubbed initial value to ensure we enter the loop.
@@ -160,28 +274,47 @@ func deltaEnumerateItems[T any](
     page, err := pager.GetPage(graph.ConsumeNTokens(ctx, consume))
     if graph.IsErrDeltaNotSupported(err) {
       logger.Ctx(ctx).Infow("delta queries not supported")
-      return nil, DeltaUpdate{}, clues.Stack(graph.ErrDeltaNotSupported, err)
+
+      pager.Reset(ctx)
+
+      if err := npr.writeNextPage(ctx, nil, true); err != nil {
+        npr.err = clues.Stack(err)
+        return
+      }
+
+      npr.err = clues.Stack(err)
+
+      return
     }

     if graph.IsErrInvalidDelta(err) {
       logger.Ctx(ctx).Infow("invalid previous delta", "delta_link", prevDeltaLink)

       invalidPrevDelta = true
-      // Reset limiter consumption since we don't have a valid delta token.
+
+      // Set limiter consumption rate to non-delta.
       consume = graph.DeltaNoTokenLC
-      result = make([]T, 0)

       // Reset tells the pager to try again after ditching its delta history.
       pager.Reset(ctx)

+      if err := npr.writeNextPage(ctx, nil, true); err != nil {
+        npr.err = clues.Stack(err)
+        return
+      }
+
       continue
     }

     if err != nil {
-      return nil, DeltaUpdate{}, graph.Wrap(ctx, err, "retrieving page")
+      npr.err = clues.Stack(err)
+      return
     }

-    result = append(result, page.GetValue()...)
+    if err := npr.writeNextPage(ctx, page.GetValue(), false); err != nil {
+      npr.err = clues.Stack(err)
+      return
+    }

     nl, deltaLink := NextAndDeltaLink(page)
     if len(deltaLink) > 0 {
@@ -194,12 +327,38 @@ func deltaEnumerateItems[T any](
   logger.Ctx(ctx).Debugw("completed delta item enumeration", "result_count", len(result))

-  du := DeltaUpdate{
+  npr.du = DeltaUpdate{
     URL:   newDeltaLink,
     Reset: invalidPrevDelta,
   }
+}

-  return result, du, nil
+func batchDeltaEnumerateItems[T any](
+  ctx context.Context,
+  pager DeltaPager[T],
+  prevDeltaLink string,
+) ([]T, DeltaUpdate, error) {
+  var (
+    npr = nextPageResults[T]{
+      pages: make(chan nextPage[T]),
+    }
+    results = []T{}
+  )
+
+  go deltaEnumerateItems[T](ctx, pager, &npr, prevDeltaLink)
+
+  page, reset, done := npr.NextPage()
+  for ; !done; page, reset, done = npr.NextPage() {
+    if reset {
+      results = []T{}
+    }
+
+    results = append(results, page...)
+  }
+
+  du, err := npr.Results()
+
+  return results, du, clues.Stack(err).OrNil()
 }

 // ---------------------------------------------------------------------------
@@ -217,7 +376,7 @@ func getAddedAndRemovedItemIDs[T any](
   aarh addedAndRemovedHandler[T],
 ) (map[string]time.Time, bool, []string, DeltaUpdate, error) {
   if canMakeDeltaQueries {
-    ts, du, err := deltaEnumerateItems[T](ctx, deltaPager, prevDeltaLink)
+    ts, du, err := batchDeltaEnumerateItems[T](ctx, deltaPager, prevDeltaLink)
     if err != nil && !graph.IsErrInvalidDelta(err) && !graph.IsErrDeltaNotSupported(err) {
       return nil, false, nil, DeltaUpdate{}, graph.Stack(ctx, err)
     }
@@ -230,7 +389,7 @@ func getAddedAndRemovedItemIDs[T any](
   du := DeltaUpdate{Reset: true}

-  ts, err := enumerateItems(ctx, pager)
+  ts, err := batchEnumerateItems(ctx, pager)
   if err != nil {
     return nil, false, nil, DeltaUpdate{}, graph.Stack(ctx, err)
   }

View File

@@ -245,7 +245,7 @@ func (suite *PagerUnitSuite) TestEnumerateItems() {
           pageErr: assert.AnError,
         }
       },
-      expect:    nil,
+      expect:    []any{},
       expectErr: require.Error,
     },
   }
@@ -257,7 +257,7 @@ func (suite *PagerUnitSuite) TestEnumerateItems() {
       ctx, flush := tester.NewContext(t)
       defer flush()

-      result, err := enumerateItems(ctx, test.getPager(t, ctx))
+      result, err := batchEnumerateItems(ctx, test.getPager(t, ctx))
       test.expectErr(t, err, clues.ToCore(err))
       require.EqualValues(t, test.expect, result)

View File

@@ -81,7 +81,7 @@ func (c Mail) EnumerateContainers(
     pgr = c.NewMailFoldersPager(userID, immutableIDs)
   )

-  containers, err := enumerateItems(ctx, pgr)
+  containers, err := batchEnumerateItems(ctx, pgr)
   if err != nil {
     return graph.Stack(ctx, err)
   }
@@ -162,7 +162,7 @@ func (c Mail) GetItemsInContainerByCollisionKey(
   ctx = clues.Add(ctx, "container_id", containerID)

   pager := c.NewMailPager(userID, containerID, false, mailCollisionKeyProps()...)

-  items, err := enumerateItems(ctx, pager)
+  items, err := batchEnumerateItems(ctx, pager)
   if err != nil {
     return nil, graph.Wrap(ctx, err, "enumerating mails")
   }
@@ -183,7 +183,7 @@ func (c Mail) GetItemIDsInContainer(
   ctx = clues.Add(ctx, "container_id", containerID)

   pager := c.NewMailPager(userID, containerID, false, idAnd()...)

-  items, err := enumerateItems(ctx, pager)
+  items, err := batchEnumerateItems(ctx, pager)
   if err != nil {
     return nil, graph.Wrap(ctx, err, "enumerating mails")
   }