apply channels and streaming to drive collections

Updates drive collection processing to the new pattern of
streaming pages from the API over channels.

This is the last commit in a multi-part update that was split up
for ease of review. Everything should now pass.
ryanfkeepers 2023-09-30 10:14:50 -06:00
parent 45aac829dc
commit c805c8f8e5
4 changed files with 776 additions and 494 deletions
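
For reviewers, a minimal sketch of the shape being adopted (illustrative names only, not Corso's actual types): the enumerator streams each page of delta results into a channel while a single consumer goroutine drains it, and a WaitGroup lets the caller block until the consumer has finished.

package main

import (
	"fmt"
	"sync"
)

type page struct{ items []string }

// enumerate stands in for the API-side pager: it sends each page of
// results and closes the channel when the enumeration is complete.
func enumerate(ch chan<- page) error {
	defer close(ch)

	ch <- page{items: []string{"a", "b"}}
	ch <- page{items: []string{"c"}}

	return nil
}

func main() {
	var (
		ch = make(chan page, 1)
		wg sync.WaitGroup
	)

	wg.Add(1)

	// consumer: processes pages as they stream in, instead of waiting
	// for the full slice of items to accumulate in memory.
	go func() {
		defer wg.Done()

		for pg := range ch {
			for _, item := range pg.items {
				fmt.Println("processing", item)
			}
		}
	}()

	// producer: runs in the calling goroutine, as
	// EnumerateDriveItemsDelta does in the diff below.
	if err := enumerate(ch); err != nil {
		fmt.Println("enumeration failed:", err)
	}

	wg.Wait()
}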


@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"strings"
"sync"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
@@ -272,13 +273,6 @@ func (c *Collections) Get(
excludedItemIDs = map[string]struct{}{}
oldPrevPaths = oldPrevPathsByDriveID[driveID]
prevDeltaLink = prevDriveIDToDelta[driveID]
// itemCollection is used to identify which collection a
// file belongs to. This is useful to delete a file from the
// collection it was previously in, in case it was moved to a
// different collection within the same delta query
// item ID -> item ID
itemCollection = map[string]string{}
)
delete(driveTombstones, driveID)
@@ -295,13 +289,16 @@ func (c *Collections) Get(
"previous metadata for drive",
"num_paths_entries", len(oldPrevPaths))
items, du, err := c.handler.EnumerateDriveItemsDelta(
ictx,
du, newPrevPaths, err := c.PopulateDriveCollections(
ctx,
driveID,
driveName,
oldPrevPaths,
excludedItemIDs,
prevDeltaLink,
api.DefaultDriveItemProps())
errs)
if err != nil {
return nil, false, err
return nil, false, clues.Stack(err)
}
// It's alright to have an empty folders map (i.e. no folders found) but not
@@ -313,20 +310,6 @@ func (c *Collections) Get(
driveIDToDeltaLink[driveID] = du.URL
}
newPrevPaths, err := c.UpdateCollections(
ctx,
driveID,
driveName,
items,
oldPrevPaths,
itemCollection,
excludedItemIDs,
du.Reset,
errs)
if err != nil {
return nil, false, clues.Stack(err)
}
// Avoid the edge case where there's no paths but we do have a valid delta
// token. We can accomplish this by adding an empty paths map for this
// drive. If we don't have this then the next backup won't use the delta
@@ -688,224 +671,290 @@ func (c *Collections) getCollectionPath(
return collectionPath, nil
}
// UpdateCollections initializes and adds the provided drive items to Collections
// A new collection is created for every drive folder (or package).
// PopulateDriveCollections initializes and adds the provided drive items to Collections
// A new collection is created for every drive folder.
// oldPrevPaths is the unchanged data that was loaded from the metadata file.
// This map is not modified during the call.
// currPrevPaths starts as a copy of oldPaths and is updated as changes are found in
// the returned results. Items are added to this collection throughout the call.
// newPrevPaths, ie: the items added during this call, get returned as a map.
func (c *Collections) UpdateCollections(
func (c *Collections) PopulateDriveCollections(
ctx context.Context,
driveID, driveName string,
items []models.DriveItemable,
oldPrevPaths map[string]string,
currPrevPaths map[string]string,
excluded map[string]struct{},
invalidPrevDelta bool,
excludedItemIDs map[string]struct{},
prevDeltaLink string,
errs *fault.Bus,
) (map[string]string, error) {
) (api.DeltaUpdate, map[string]string, error) {
var (
el = errs.Local()
newPrevPaths = map[string]string{}
el = errs.Local()
newPrevPaths = map[string]string{}
invalidPrevDelta = len(prevDeltaLink) == 0
ch = make(chan api.NextPage[models.DriveItemable], 1)
wg = sync.WaitGroup{}
// currPrevPaths is used to identify which collection a
// file belongs to. This is useful to delete a file from the
// collection it was previously in, in case it was moved to a
// different collection within the same delta query
// item ID -> item ID
currPrevPaths = map[string]string{}
)
if !invalidPrevDelta {
maps.Copy(newPrevPaths, oldPrevPaths)
}
for _, item := range items {
if el.Failure() != nil {
break
}
go func() {
defer wg.Done()
var (
itemID = ptr.Val(item.GetId())
itemName = ptr.Val(item.GetName())
isFolder = item.GetFolder() != nil || item.GetPackageEscaped() != nil
ictx = clues.Add(
ctx,
"item_id", itemID,
"item_name", clues.Hide(itemName),
"item_is_folder", isFolder)
)
if item.GetMalware() != nil {
addtl := graph.ItemInfo(item)
skip := fault.FileSkip(fault.SkipMalware, driveID, itemID, itemName, addtl)
if isFolder {
skip = fault.ContainerSkip(fault.SkipMalware, driveID, itemID, itemName, addtl)
for pg := range ch {
if el.Failure() != nil {
// exhaust the channel to ensure it closes
continue
}
errs.AddSkip(ctx, skip)
logger.Ctx(ctx).Infow("malware detected", "item_details", addtl)
continue
}
// Deleted file or folder.
if item.GetDeleted() != nil {
if err := c.handleDelete(
itemID,
driveID,
oldPrevPaths,
currPrevPaths,
newPrevPaths,
isFolder,
excluded,
invalidPrevDelta); err != nil {
return nil, clues.Stack(err).WithClues(ictx)
if pg.Reset {
newPrevPaths = map[string]string{}
currPrevPaths = map[string]string{}
c.CollectionMap[driveID] = map[string]*Collection{}
invalidPrevDelta = true
}
continue
}
for _, item := range pg.Items {
if el.Failure() != nil {
continue
}
collectionPath, err := c.getCollectionPath(driveID, item)
if err != nil {
el.AddRecoverable(ctx, clues.Stack(err).
WithClues(ictx).
Label(fault.LabelForceNoBackupCreation))
continue
}
// Skip items that don't match the folder selectors we were given.
if shouldSkip(ctx, collectionPath, c.handler, driveName) {
logger.Ctx(ictx).Debugw("path not selected", "skipped_path", collectionPath.String())
continue
}
switch {
case isFolder:
// Deletions are handled above so this is just moves/renames.
var prevPath path.Path
prevPathStr, ok := oldPrevPaths[itemID]
if ok {
prevPath, err = path.FromDataLayerPath(prevPathStr, false)
err := c.processItem(
ctx,
item,
driveID,
driveName,
oldPrevPaths,
currPrevPaths,
newPrevPaths,
excludedItemIDs,
invalidPrevDelta,
el)
if err != nil {
el.AddRecoverable(ctx, clues.Wrap(err, "invalid previous path").
WithClues(ictx).
With("prev_path_string", path.LoggableDir(prevPathStr)))
}
} else if item.GetRoot() != nil {
// Root doesn't move or get renamed.
prevPath = collectionPath
}
// Moved folders don't cause delta results for any subfolders nested in
// them. We need to go through and update paths to handle that. We only
// update newPaths so we don't accidentally clobber previous deletes.
updatePath(newPrevPaths, itemID, collectionPath.String())
found, err := updateCollectionPaths(driveID, itemID, c.CollectionMap, collectionPath)
if err != nil {
return nil, clues.Stack(err).WithClues(ictx)
}
if found {
continue
}
colScope := CollectionScopeFolder
if item.GetPackageEscaped() != nil {
colScope = CollectionScopePackage
}
col, err := NewCollection(
c.handler,
c.protectedResource,
collectionPath,
prevPath,
driveID,
c.statusUpdater,
c.ctrl,
colScope,
invalidPrevDelta,
nil)
if err != nil {
return nil, clues.Stack(err).WithClues(ictx)
}
col.driveName = driveName
c.CollectionMap[driveID][itemID] = col
c.NumContainers++
if item.GetRoot() != nil {
continue
}
// Add an entry to fetch permissions into this collection. This assumes
// that OneDrive always returns all folders on the path of an item
// before the item. This seems to hold true for now at least.
if col.Add(item) {
c.NumItems++
}
case item.GetFile() != nil:
// Deletions are handled above so this is just moves/renames.
if len(ptr.Val(item.GetParentReference().GetId())) == 0 {
return nil, clues.New("file without parent ID").WithClues(ictx)
}
// Get the collection for this item.
parentID := ptr.Val(item.GetParentReference().GetId())
ictx = clues.Add(ictx, "parent_id", parentID)
collection, ok := c.CollectionMap[driveID][parentID]
if !ok {
return nil, clues.New("item seen before parent folder").WithClues(ictx)
}
// This will only kick in if the file was moved multiple times
// within a single delta query. We delete the file from the previous
// collection so that it doesn't appear in two places.
prevParentContainerID, ok := currPrevPaths[itemID]
if ok {
prevColl, found := c.CollectionMap[driveID][prevParentContainerID]
if !found {
return nil, clues.New("previous collection not found").
With("prev_parent_container_id", prevParentContainerID).
WithClues(ictx)
}
if ok := prevColl.Remove(itemID); !ok {
return nil, clues.New("removing item from prev collection").
With("prev_parent_container_id", prevParentContainerID).
WithClues(ictx)
el.AddRecoverable(ctx, clues.Stack(err))
}
}
currPrevPaths[itemID] = parentID
if collection.Add(item) {
c.NumItems++
c.NumFiles++
}
// Do this after adding the file to the collection so if we fail to add
// the item to the collection for some reason and we're using best effort
// we don't just end up deleting the item in the resulting backup. The
// resulting backup will be slightly incorrect, but it will have the most
// data that we were able to preserve.
if !invalidPrevDelta {
// Always add a file to the excluded list. The file may have been
// renamed/moved/modified, so we still have to drop the
// original one and download a fresh copy.
excluded[itemID+metadata.DataFileSuffix] = struct{}{}
excluded[itemID+metadata.MetaFileSuffix] = struct{}{}
}
default:
el.AddRecoverable(ictx, clues.New("item is neither folder nor file").
WithClues(ictx).
Label(fault.LabelForceNoBackupCreation))
}
}()
wg.Add(1)
du, err := c.handler.EnumerateDriveItemsDelta(
ctx,
ch,
driveID,
prevDeltaLink)
if err != nil {
return du, nil, clues.Stack(err)
}
return newPrevPaths, el.Failure()
wg.Wait()
return du, newPrevPaths, el.Failure()
}
func (c *Collections) processItem(
ctx context.Context,
item models.DriveItemable,
driveID, driveName string,
oldPrevPaths, currPrevPaths, newPrevPaths map[string]string,
excluded map[string]struct{},
invalidPrevDelta bool,
skipper fault.AddSkipper,
) error {
var (
itemID = ptr.Val(item.GetId())
itemName = ptr.Val(item.GetName())
isFolder = item.GetFolder() != nil || item.GetPackageEscaped() != nil
ictx = clues.Add(
ctx,
"item_id", itemID,
"item_name", clues.Hide(itemName),
"item_is_folder", isFolder)
)
if item.GetMalware() != nil {
addtl := graph.ItemInfo(item)
skip := fault.FileSkip(fault.SkipMalware, driveID, itemID, itemName, addtl)
if isFolder {
skip = fault.ContainerSkip(fault.SkipMalware, driveID, itemID, itemName, addtl)
}
skipper.AddSkip(ctx, skip)
logger.Ctx(ctx).Infow("malware detected", "item_details", addtl)
return nil
}
// Deleted file or folder.
if item.GetDeleted() != nil {
err := c.handleDelete(
itemID,
driveID,
oldPrevPaths,
currPrevPaths,
newPrevPaths,
isFolder,
excluded,
invalidPrevDelta)
return clues.Stack(err).WithClues(ictx).OrNil()
}
collectionPath, err := c.getCollectionPath(driveID, item)
if err != nil {
return clues.Stack(err).
WithClues(ictx).
Label(fault.LabelForceNoBackupCreation)
}
// Skip items that don't match the folder selectors we were given.
if shouldSkip(ctx, collectionPath, c.handler, driveName) {
logger.Ctx(ictx).Debugw("path not selected", "skipped_path", collectionPath.String())
return nil
}
switch {
case isFolder:
// Deletions are handled above so this is just moves/renames.
var prevPath path.Path
prevPathStr, ok := oldPrevPaths[itemID]
if ok {
prevPath, err = path.FromDataLayerPath(prevPathStr, false)
if err != nil {
return clues.Wrap(err, "invalid previous path").
WithClues(ictx).
With("prev_path_string", path.LoggableDir(prevPathStr))
}
} else if item.GetRoot() != nil {
// Root doesn't move or get renamed.
prevPath = collectionPath
}
// Moved folders don't cause delta results for any subfolders nested in
// them. We need to go through and update paths to handle that. We only
// update newPaths so we don't accidentally clobber previous deletes.
updatePath(newPrevPaths, itemID, collectionPath.String())
found, err := updateCollectionPaths(
driveID,
itemID,
c.CollectionMap,
collectionPath)
if err != nil {
return clues.Stack(err).WithClues(ictx)
}
if found {
return nil
}
colScope := CollectionScopeFolder
if item.GetPackageEscaped() != nil {
colScope = CollectionScopePackage
}
col, err := NewCollection(
c.handler,
c.protectedResource,
collectionPath,
prevPath,
driveID,
c.statusUpdater,
c.ctrl,
colScope,
invalidPrevDelta,
nil)
if err != nil {
return clues.Stack(err).WithClues(ictx)
}
col.driveName = driveName
c.CollectionMap[driveID][itemID] = col
c.NumContainers++
if item.GetRoot() != nil {
return nil
}
// Add an entry to fetch permissions into this collection. This assumes
// that OneDrive always returns all folders on the path of an item
// before the item. This seems to hold true for now at least.
if col.Add(item) {
c.NumItems++
}
case item.GetFile() != nil:
// Deletions are handled above so this is just moves/renames.
if len(ptr.Val(item.GetParentReference().GetId())) == 0 {
return clues.New("file without parent ID").WithClues(ictx)
}
// Get the collection for this item.
parentID := ptr.Val(item.GetParentReference().GetId())
ictx = clues.Add(ictx, "parent_id", parentID)
collection, ok := c.CollectionMap[driveID][parentID]
if !ok {
return clues.New("item seen before parent folder").WithClues(ictx)
}
// This will only kick in if the file was moved multiple times
// within a single delta query. We delete the file from the previous
// collection so that it doesn't appear in two places.
prevParentContainerID, ok := currPrevPaths[itemID]
if ok {
prevColl, found := c.CollectionMap[driveID][prevParentContainerID]
if !found {
return clues.New("previous collection not found").
With("prev_parent_container_id", prevParentContainerID).
WithClues(ictx)
}
if ok := prevColl.Remove(itemID); !ok {
return clues.New("removing item from prev collection").
With("prev_parent_container_id", prevParentContainerID).
WithClues(ictx)
}
}
currPrevPaths[itemID] = parentID
if collection.Add(item) {
c.NumItems++
c.NumFiles++
}
// Do this after adding the file to the collection so if we fail to add
// the item to the collection for some reason and we're using best effort
// we don't just end up deleting the item in the resulting backup. The
// resulting backup will be slightly incorrect, but it will have the most
// data that we were able to preserve.
if !invalidPrevDelta {
// Always add a file to the excluded list. The file may have been
// renamed/moved/modified, so we still have to drop the
// original one and download a fresh copy.
excluded[itemID+metadata.DataFileSuffix] = struct{}{}
excluded[itemID+metadata.MetaFileSuffix] = struct{}{}
}
default:
return clues.New("item is neither folder nor file").
WithClues(ictx).
Label(fault.LabelForceNoBackupCreation)
}
return nil
}
type dirScopeChecker interface {
@@ -913,7 +962,12 @@ type dirScopeChecker interface {
IncludesDir(dir string) bool
}
func shouldSkip(ctx context.Context, drivePath path.Path, dsc dirScopeChecker, driveName string) bool {
func shouldSkip(
ctx context.Context,
drivePath path.Path,
dsc dirScopeChecker,
driveName string,
) bool {
return !includePath(ctx, dsc, drivePath) ||
(drivePath.Category() == path.LibrariesCategory && restrictedDirectory == driveName)
}
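
Two consumer-side rules in the PopulateDriveCollections loop above are easy to miss: once a failure is recorded, the loop keeps ranging so the producer's sends never block and the channel can close; and a page flagged Reset throws away all state accumulated so far before its own items are processed. A self-contained sketch of just those two rules, using stand-in types rather than the real api.NextPage:

package main

import (
	"errors"
	"fmt"
)

type nextPage struct {
	items []string
	reset bool
}

// consume mirrors the loop above: record the first failure but keep
// ranging so the channel can drain and close, and clear accumulated
// state whenever a page arrives with reset set.
func consume(ch <-chan nextPage) (map[string]struct{}, error) {
	var (
		seen    = map[string]struct{}{}
		failure error
	)

	for pg := range ch {
		if failure != nil {
			continue // exhaust the channel to ensure it closes
		}

		if pg.reset {
			seen = map[string]struct{}{} // invalid prev delta: start over
		}

		for _, item := range pg.items {
			if item == "" {
				failure = errors.New("empty item ID")
				break
			}

			seen[item] = struct{}{}
		}
	}

	return seen, failure
}

func main() {
	ch := make(chan nextPage, 1)

	go func() {
		defer close(ch)

		ch <- nextPage{items: []string{"a", "b"}}
		ch <- nextPage{reset: true, items: []string{"c"}}
	}()

	seen, err := consume(ch)
	fmt.Println(seen, err) // map[c:{}] <nil>
}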

File diff suppressed because it is too large


@@ -292,8 +292,8 @@ func (m GetsItem) GetItem(
// Enumerates Drive Items
// ---------------------------------------------------------------------------
type NextPage[T any] struct {
Items []T
type NextPage struct {
Items []models.DriveItemable
Reset bool
}
@@ -305,7 +305,7 @@ var _ api.NextPageResulter[models.DriveItemable] = &DriveItemsDeltaPager{}
type DriveItemsDeltaPager struct {
Idx int
Pages []NextPage[models.DriveItemable]
Pages []NextPage
DeltaUpdate api.DeltaUpdate
Err error
}


@@ -12,6 +12,7 @@ import (
"github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/internal/m365/collection/drive"
odConsts "github.com/alcionai/corso/src/internal/m365/service/onedrive/consts"
"github.com/alcionai/corso/src/internal/m365/service/onedrive/mock"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
@@ -90,16 +91,29 @@ func (suite *LibrariesBackupUnitSuite) TestUpdateCollections() {
defer flush()
var (
paths = map[string]string{}
currPaths = map[string]string{}
excluded = map[string]struct{}{}
collMap = map[string]map[string]*drive.Collection{
mbh = mock.DefaultSharePointBH(siteID)
du = api.DeltaUpdate{
URL: "notempty",
Reset: false,
}
paths = map[string]string{}
excluded = map[string]struct{}{}
collMap = map[string]map[string]*drive.Collection{
driveID: {},
}
)
mbh.DriveItemEnumeration = mock.EnumerateItemsDeltaByDrive{
DrivePagers: map[string]mock.DriveItemsDeltaPager{
driveID: mock.DriveItemsDeltaPager{
Pages: []mock.NextPage{{Items: test.items}},
DeltaUpdate: du,
},
},
}
c := drive.NewCollections(
drive.NewLibraryBackupHandler(api.Drives{}, siteID, test.scope, path.SharePointService),
mbh,
tenantID,
idname.NewProvider(siteID, siteID),
nil,
@@ -107,15 +121,13 @@ func (suite *LibrariesBackupUnitSuite) TestUpdateCollections() {
c.CollectionMap = collMap
_, err := c.UpdateCollections(
_, _, err := c.PopulateDriveCollections(
ctx,
driveID,
"General",
test.items,
paths,
currPaths,
excluded,
true,
"",
fault.New(true))
test.expect(t, err, clues.ToCore(err))
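
The test drives PopulateDriveCollections through mock.EnumerateItemsDeltaByDrive, whose per-drive pager replays canned Pages and a canned DeltaUpdate. The mock's internals sit in the diff that was suppressed above, so this is only a hedged sketch of what such a channel-feeding fake plausibly does, with stand-in types rather than the actual mock package:

package main

import "fmt"

type deltaUpdate struct {
	URL   string
	Reset bool
}

type nextPage struct {
	items []string
	reset bool
}

// fakePager replays canned pages, mirroring how the test configures
// DriveItemsDeltaPager with Pages and a DeltaUpdate.
type fakePager struct {
	pages []nextPage
	du    deltaUpdate
	err   error
}

// enumerate pushes each canned page into ch, closes the channel, and
// returns the canned delta state, standing in for the real
// EnumerateDriveItemsDelta call site shown earlier.
func (p fakePager) enumerate(ch chan<- nextPage) (deltaUpdate, error) {
	defer close(ch)

	for _, pg := range p.pages {
		ch <- pg
	}

	return p.du, p.err
}

func main() {
	pager := fakePager{
		pages: []nextPage{{items: []string{"file1", "file2"}}},
		du:    deltaUpdate{URL: "notempty"},
	}

	ch := make(chan nextPage, 1)
	done := make(chan struct{})

	var (
		du  deltaUpdate
		err error
	)

	// producer runs concurrently; close(done) signals its completion.
	go func() {
		defer close(done)

		du, err = pager.enumerate(ch)
	}()

	for pg := range ch {
		fmt.Println("page:", pg.items)
	}

	<-done
	fmt.Println(du, err) // {notempty false} <nil>
}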