add clues & fault to onedrive collections

commit 88b5df728b
parent c62c246ee2
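The pattern applied throughout this commit: per-item failures are recorded on a fault.Errors tracker instead of aborting the whole enumeration, and errors are decorated with structured context via clues. Below is a minimal sketch of that flow, assuming the fault and clues APIs exactly as they appear in the hunks that follow (fault.New, Tracker, Add, Err, clues.Add, clues.Wrap, WithClues); populate and processItem are hypothetical stand-ins for the real collection code, not part of this diff.

package example

import (
	"context"

	"github.com/alcionai/clues"

	"github.com/alcionai/corso/src/pkg/fault"
)

// populate mirrors the loop shape used in populateItems and
// UpdateCollections below: check the tracker, annotate the context,
// record failures, and return the aggregated error at the end.
func populate(ctx context.Context, itemIDs []string, errs *fault.Errors) error {
	et := errs.Tracker()

	for _, id := range itemIDs {
		// With fault.New(true) (fail-fast), Err() turns non-nil after
		// the first Add, so the loop stops early.
		if et.Err() != nil {
			break
		}

		// clues.Add attaches structured values to the context; WithClues
		// copies them onto the error when something fails.
		ictx := clues.Add(ctx, "item_id", id)

		if err := processItem(ictx, id); err != nil {
			et.Add(clues.Wrap(err, "processing item").WithClues(ictx))
		}
	}

	return et.Err()
}

// processItem is a hypothetical stand-in for the real per-item work.
func processItem(ctx context.Context, id string) error {
	return nil
}

Callers hand in a top-level error bus (fault.New(true) in the tests below) and read the aggregated result from et.Err(), which is also what populateItems and deserializeMetadata now report.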
@@ -157,7 +157,7 @@ func purgeOneDriveFolders(
return nil, err
}

-cfs, err := onedrive.GetAllFolders(ctx, gs, pager, prefix)
+cfs, err := onedrive.GetAllFolders(ctx, gs, pager, prefix, fault.New(true))
if err != nil {
return nil, err
}
@@ -91,7 +91,7 @@ func (gc *GraphConnector) DataCollections(
return colls, excludes, nil

case selectors.ServiceOneDrive:
-return gc.OneDriveDataCollections(ctx, sels, metadata, ctrlOpts)
+return gc.OneDriveDataCollections(ctx, sels, metadata, ctrlOpts, errs)

case selectors.ServiceSharePoint:
colls, excludes, err := sharepoint.DataCollections(
@@ -193,6 +193,7 @@ func (gc *GraphConnector) OneDriveDataCollections(
selector selectors.Selector,
metadata []data.RestoreCollection,
ctrlOpts control.Options,
+errs *fault.Errors,
) ([]data.BackupCollection, map[string]struct{}, error) {
odb, err := selector.ToOneDriveBackup()
if err != nil {
@@ -218,7 +219,7 @@ func (gc *GraphConnector) OneDriveDataCollections(
gc.Service,
gc.UpdateStatus,
ctrlOpts,
-).Get(ctx, metadata)
+).Get(ctx, metadata, errs)
if err != nil {
return nil, nil, err
}
@@ -10,10 +10,12 @@ import (
"sync/atomic"
"time"

+"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/pkg/errors"
"github.com/spatialcurrent/go-lazy/pkg/lazy"

+"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
@@ -82,7 +84,7 @@ type Collection struct {
type itemReaderFunc func(
hc *http.Client,
item models.DriveItemable,
-) (itemInfo details.ItemInfo, itemData io.ReadCloser, err error)
+) (details.ItemInfo, io.ReadCloser, error)

// itemMetaReaderFunc returns a reader for the metadata of the
// specified item
@@ -143,7 +145,7 @@ func (oc *Collection) Items(
ctx context.Context,
errs *fault.Errors, // TODO: currently unused while onedrive isn't up to date with clues/fault
) <-chan data.Stream {
-go oc.populateItems(ctx)
+go oc.populateItems(ctx, errs)
return oc.data
}

@@ -216,23 +218,22 @@ func (od *Item) ModTime() time.Time {

// populateItems iterates through items added to the collection
// and uses the collection `itemReader` to read the item
-func (oc *Collection) populateItems(ctx context.Context) {
+func (oc *Collection) populateItems(ctx context.Context, errs *fault.Errors) {
var (
-errs error
byteCount int64
itemsRead int64
dirsRead int64
itemsFound int64
dirsFound int64
wg sync.WaitGroup
-m sync.Mutex
+et = errs.Tracker()
)

// Retrieve the OneDrive folder path to set later in
// `details.OneDriveInfo`
parentPathString, err := path.GetDriveFolderPath(oc.folderPath)
if err != nil {
-oc.reportAsCompleted(ctx, 0, 0, 0, err)
+oc.reportAsCompleted(ctx, 0, 0, 0, clues.Wrap(err, "getting drive path").WithClues(ctx))
return
}

@@ -247,14 +248,8 @@ func (oc *Collection) populateItems(ctx context.Context) {
semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize)
defer close(semaphoreCh)

-errUpdater := func(id string, err error) {
-m.Lock()
-errs = support.WrapAndAppend(id, err, errs)
-m.Unlock()
-}

for _, item := range oc.driveItems {
-if oc.ctrl.FailFast && errs != nil {
+if et.Err() != nil {
break
}

@@ -262,22 +257,27 @@ func (oc *Collection) populateItems(ctx context.Context) {

wg.Add(1)

-go func(item models.DriveItemable) {
+go func(ctx context.Context, item models.DriveItemable) {
defer wg.Done()
defer func() { <-semaphoreCh }()

-// Read the item
var (
-itemID = *item.GetId()
-itemName = *item.GetName()
-itemSize = *item.GetSize()
+itemID = ptr.Val(item.GetId())
+itemName = ptr.Val(item.GetName())
+itemSize = ptr.Val(item.GetSize())
itemInfo details.ItemInfo
itemMeta io.ReadCloser
itemMetaSize int
metaSuffix string
err error
)

+ctx = clues.Add(ctx,
+"restore_item_id", itemID,
+"restore_item_name", itemName,
+"restore_item_size", itemSize,
+"restore_item_info", itemInfo)

isFile := item.GetFile() != nil

if isFile {
@@ -301,9 +301,8 @@ func (oc *Collection) populateItems(ctx context.Context) {
itemMetaSize = 2
} else {
itemMeta, itemMetaSize, err = oc.itemMetaReader(ctx, oc.service, oc.driveID, item)

if err != nil {
-errUpdater(*item.GetId(), errors.Wrap(err, "failed to get item permissions"))
+et.Add(clues.Wrap(err, "getting item permissions"))
return
}
}
@@ -351,7 +350,7 @@ func (oc *Collection) populateItems(ctx context.Context) {

// check for errors following retries
if err != nil {
-errUpdater(itemID, err)
+et.Add(clues.Stack(err).WithClues(ctx))
return nil, err
}

@@ -361,8 +360,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
itemData,
observe.ItemBackupMsg,
observe.PII(itemName+dataSuffix),
-itemSize,
-)
+itemSize)
go closer()

return progReader, nil
@@ -419,15 +417,15 @@ func (oc *Collection) populateItems(ctx context.Context) {
atomic.AddInt64(&byteCount, itemSize)

folderProgress <- struct{}{}
-}(item)
+}(ctx, item)
}

wg.Wait()

-oc.reportAsCompleted(ctx, int(itemsFound), int(itemsRead), byteCount, errs)
+oc.reportAsCompleted(ctx, int(itemsFound), int(itemsRead), byteCount, et.Err())
}

-func (oc *Collection) reportAsCompleted(ctx context.Context, itemsFound, itemsRead int, byteCount int64, errs error) {
+func (oc *Collection) reportAsCompleted(ctx context.Context, itemsFound, itemsRead int, byteCount int64, err error) {
close(oc.data)

status := support.CreateStatus(ctx, support.Backup,
@@ -437,7 +435,7 @@ func (oc *Collection) reportAsCompleted(ctx context.Context, itemsFound, itemsRe
Successes: itemsRead, // items read successfully,
TotalBytes: byteCount, // Number of bytes read in the operation,
},
-errs,
+err,
oc.folderPath.Folder(false), // Additional details
)
logger.Ctx(ctx).Debugw("done streaming items", "status", status.String())
@@ -13,11 +13,13 @@ import (
"github.com/pkg/errors"
"golang.org/x/exp/maps"

+"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/control"
+"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
)
@@ -117,32 +119,40 @@ func NewCollections(
func deserializeMetadata(
ctx context.Context,
cols []data.RestoreCollection,
+errs *fault.Errors,
) (map[string]string, map[string]map[string]string, error) {
logger.Ctx(ctx).Infow(
"deserialzing previous backup metadata",
-"num_collections",
-len(cols),
+"num_collections", len(cols))

+var (
+prevDeltas = map[string]string{}
+prevFolders = map[string]map[string]string{}
+et = errs.Tracker()
+)

-prevDeltas := map[string]string{}
-prevFolders := map[string]map[string]string{}

for _, col := range cols {
-items := col.Items(ctx, nil) // TODO: fault.Errors instead of nil
+if et.Err() != nil {
+break
+}

+items := col.Items(ctx, errs)

for breakLoop := false; !breakLoop; {
select {
case <-ctx.Done():
-return nil, nil, errors.Wrap(ctx.Err(), "deserialzing previous backup metadata")
+return nil, nil, clues.Wrap(ctx.Err(), "deserialzing previous backup metadata").WithClues(ctx)

case item, ok := <-items:
if !ok {
// End of collection items.
breakLoop = true
break
}

-var err error
+var (
+err error
+ictx = clues.Add(ctx, "item_uuid", item.UUID())
+)

switch item.UUID() {
case graph.PreviousPathFileName:
@@ -152,11 +162,9 @@ func deserializeMetadata(
err = deserializeMap(item.ToReader(), prevDeltas)

default:
-logger.Ctx(ctx).Infow(
+logger.Ctx(ictx).Infow(
"skipping unknown metadata file",
-"file_name",
-item.UUID(),
-)
+"file_name", item.UUID())

continue
}
@@ -173,20 +181,15 @@ func deserializeMetadata(
// we end up in a situation where we're sourcing items from the wrong
// base in kopia wrapper.
if errors.Is(err, errExistingMapping) {
-return nil, nil, errors.Wrapf(
-err,
-"deserializing metadata file %s",
-item.UUID(),
-)
+return nil, nil, clues.Wrap(err, "deserializing metadata file").WithClues(ictx)
}

-logger.Ctx(ctx).Errorw(
-"deserializing base backup metadata. Falling back to full backup for selected drives",
-"error",
-err,
-"file_name",
-item.UUID(),
-)
+err = clues.Stack(err).WithClues(ictx)

+et.Add(err)
+logger.Ctx(ictx).
+With("err", err).
+Errorw("deserializing base backup metadata", clues.InErr(err).Slice()...)
}
}

@@ -213,10 +216,10 @@ func deserializeMetadata(
}
}

-return prevDeltas, prevFolders, nil
+return prevDeltas, prevFolders, et.Err()
}

-var errExistingMapping = errors.New("mapping already exists for same drive ID")
+var errExistingMapping = clues.New("mapping already exists for same drive ID")

// deserializeMap takes an reader and a map of already deserialized items and
// adds the newly deserialized items to alreadyFound. Items are only added to
@@ -242,7 +245,7 @@ func deserializeMap[T any](reader io.ReadCloser, alreadyFound map[string]T) erro
}

if duplicate {
-return errors.WithStack(errExistingMapping)
+return clues.Stack(errExistingMapping)
}

maps.Copy(alreadyFound, tmp)
@@ -255,8 +258,9 @@ func deserializeMap[T any](reader io.ReadCloser, alreadyFound map[string]T) erro
func (c *Collections) Get(
ctx context.Context,
prevMetadata []data.RestoreCollection,
+errs *fault.Errors,
) ([]data.BackupCollection, map[string]struct{}, error) {
-prevDeltas, oldPathsByDriveID, err := deserializeMetadata(ctx, prevMetadata)
+prevDeltas, oldPathsByDriveID, err := deserializeMetadata(ctx, prevMetadata, errs)
if err != nil {
return nil, nil, err
}
@@ -264,7 +268,7 @@ func (c *Collections) Get(
// Enumerate drives for the specified resourceOwner
pager, err := c.drivePagerFunc(c.source, c.service, c.resourceOwner, nil)
if err != nil {
-return nil, nil, err
+return nil, nil, clues.Stack(err).WithClues(ctx).With(graph.ErrData(err)...)
}

retry := c.source == OneDriveSource
@@ -287,39 +291,34 @@ func (c *Collections) Get(
excludedItems = map[string]struct{}{}
)

// Update the collection map with items from each drive
for _, d := range drives {
-driveID := *d.GetId()
-driveName := *d.GetName()

-prevDelta := prevDeltas[driveID]
-oldPaths := oldPathsByDriveID[driveID]
+var (
+driveID = ptr.Val(d.GetId())
+driveName = ptr.Val(d.GetName())
+prevDelta = prevDeltas[driveID]
+oldPaths = oldPathsByDriveID[driveID]
+numOldDelta = 0
+)

-numOldDelta := 0
if len(prevDelta) > 0 {
numOldDelta++
}

logger.Ctx(ctx).Infow(
"previous metadata for drive",
-"num_paths_entries",
-len(oldPaths),
-"num_deltas_entries",
-numOldDelta)
+"num_paths_entries", len(oldPaths),
+"num_deltas_entries", numOldDelta)

delta, paths, excluded, err := collectItems(
ctx,
-c.itemPagerFunc(
-c.service,
-driveID,
-"",
-),
+c.itemPagerFunc(c.service, driveID, ""),
driveID,
driveName,
c.UpdateCollections,
oldPaths,
prevDelta,
-)
+errs)
if err != nil {
return nil, nil, err
}
@@ -343,7 +342,6 @@ func (c *Collections) Get(
// token because it thinks the folder paths weren't persisted.
folderPaths[driveID] = map[string]string{}
maps.Copy(folderPaths[driveID], paths)

maps.Copy(excludedItems, excluded)

logger.Ctx(ctx).Infow(
@@ -372,18 +370,15 @@ func (c *Collections) Get(
graph.NewMetadataEntry(graph.PreviousPathFileName, folderPaths),
graph.NewMetadataEntry(graph.DeltaURLsFileName, deltaURLs),
},
-c.statusUpdater,
-)
+c.statusUpdater)

if err != nil {
// Technically it's safe to continue here because the logic for starting an
// incremental backup should eventually find that the metadata files are
// empty/missing and default to a full backup.
-logger.Ctx(ctx).Warnw(
-"making metadata collection for future incremental backups",
-"error",
-err,
-)
+logger.Ctx(ctx).
+With("err", err).
+Infow("making metadata collection for future incremental backups", clues.InErr(err).Slice()...)
} else {
collections = append(collections, metadata)
}
@@ -453,8 +448,15 @@ func (c *Collections) UpdateCollections(
newPaths map[string]string,
excluded map[string]struct{},
invalidPrevDelta bool,
+errs *fault.Errors,
) error {
+et := errs.Tracker()

for _, item := range items {
+if et.Err() != nil {
+break
+}

var (
prevPath path.Path
prevCollectionPath path.Path
@@ -480,25 +482,29 @@ func (c *Collections) UpdateCollections(
continue
}

+var (
+itemID = ptr.Val(item.GetId())
+ictx = clues.Add(ctx, "update_item_id", itemID)
+)

if item.GetParentReference() == nil ||
item.GetParentReference().GetId() == nil ||
(item.GetDeleted() == nil && item.GetParentReference().GetPath() == nil) {
-err := clues.New("no parent reference").With("item_id", *item.GetId())
-if item.GetName() != nil {
-err = err.With("item_name", *item.GetName())
-}

-return err
+et.Add(clues.New("item missing parent reference").
+WithClues(ictx).
+With("item_id", itemID, "item_name", ptr.Val(item.GetName())))
+continue
}

// Create a collection for the parent of this item
-collectionID := *item.GetParentReference().GetId()
+collectionID := ptr.Val(item.GetParentReference().GetId())
+ictx = clues.Add(ictx, "collection_id", collectionID)

var collectionPathStr string
if item.GetDeleted() == nil {
-collectionPathStr = *item.GetParentReference().GetPath()
+collectionPathStr = ptr.Val(item.GetParentReference().GetPath())
} else {
-collectionPathStr, ok = oldPaths[*item.GetParentReference().GetId()]
+collectionPathStr, ok = oldPaths[ptr.Val(item.GetParentReference().GetId())]
if !ok {
// This collection was created and destroyed in
// between the current and previous invocation
@@ -510,25 +516,26 @@ func (c *Collections) UpdateCollections(
collectionPathStr,
c.tenant,
c.resourceOwner,
-c.source,
-)
+c.source)
if err != nil {
-return err
+return clues.Stack(err).WithClues(ictx)
}

// Skip items that don't match the folder selectors we were given.
-if shouldSkipDrive(ctx, collectionPath, c.matcher, driveName) {
-logger.Ctx(ctx).Infof("Skipping path %s", collectionPath.String())
+if shouldSkipDrive(ictx, collectionPath, c.matcher, driveName) {
+logger.Ctx(ictx).Infow("Skipping path", "skipped_path", collectionPath.String())
continue
}

switch {
case item.GetFolder() != nil, item.GetPackage() != nil:
-prevPathStr, ok := oldPaths[*item.GetId()]
+prevPathStr, ok := oldPaths[itemID]
if ok {
prevPath, err = path.FromDataLayerPath(prevPathStr, false)
if err != nil {
-return clues.Wrap(err, "invalid previous path").With("path_string", prevPathStr)
+et.Add(clues.Wrap(err, "invalid previous path").
+WithClues(ictx).
+With("path_string", prevPathStr))
}
}

@@ -536,7 +543,7 @@ func (c *Collections) UpdateCollections(
// Nested folders also return deleted delta results so we don't have to
// worry about doing a prefix search in the map to remove the subtree of
// the deleted folder/package.
-delete(newPaths, *item.GetId())
+delete(newPaths, itemID)

if prevPath == nil {
// It is possible that an item was created and
@@ -555,10 +562,9 @@ func (c *Collections) UpdateCollections(
c.statusUpdater,
c.source,
c.ctrl,
-invalidPrevDelta,
-)
+invalidPrevDelta)

-c.CollectionMap[*item.GetId()] = col
+c.CollectionMap[itemID] = col

break
}
@@ -568,14 +574,15 @@ func (c *Collections) UpdateCollections(
// parentRef or such.
folderPath, err := collectionPath.Append(*item.GetName(), false)
if err != nil {
-logger.Ctx(ctx).Errorw("failed building collection path", "error", err)
-return err
+logger.Ctx(ictx).Errorw("building collection path", "error", err)
+et.Add(clues.Stack(err).WithClues(ictx))
+continue
}

// Moved folders don't cause delta results for any subfolders nested in
// them. We need to go through and update paths to handle that. We only
// update newPaths so we don't accidentally clobber previous deletes.
-updatePath(newPaths, *item.GetId(), folderPath.String())
+updatePath(newPaths, itemID, folderPath.String())

found, err := updateCollectionPaths(*item.GetId(), c.CollectionMap, folderPath)
if err != nil {
@@ -598,7 +605,7 @@ func (c *Collections) UpdateCollections(
c.ctrl,
invalidPrevDelta,
)
-c.CollectionMap[*item.GetId()] = col
+c.CollectionMap[itemID] = col
c.NumContainers++
}
}
@@ -615,7 +622,7 @@ func (c *Collections) UpdateCollections(
// deleted, we want to avoid it. If it was
// renamed/moved/modified, we still have to drop the
// original one and download a fresh copy.
-excluded[*item.GetId()] = struct{}{}
+excluded[itemID] = struct{}{}
}

if item.GetDeleted() != nil {
@@ -679,11 +686,11 @@ func (c *Collections) UpdateCollections(
}

default:
-return errors.Errorf("item type not supported. item name : %s", *item.GetName())
+return clues.New("item type not supported").WithClues(ctx)
}
}

-return nil
+return et.Err()
}

func shouldSkipDrive(ctx context.Context, drivePath path.Path, m folderMatcher, driveName string) bool {
@@ -705,7 +712,7 @@ func GetCanonicalPath(p, tenant, resourceOwner string, source driveSource) (path
case SharePointSource:
result, err = pathBuilder.ToDataLayerSharePointPath(tenant, resourceOwner, path.LibrariesCategory, false)
default:
-return nil, errors.Errorf("unrecognized drive data source")
+return nil, clues.New("unrecognized data source")
}

if err != nil {
@@ -719,7 +726,7 @@ func includePath(ctx context.Context, m folderMatcher, folderPath path.Path) boo
// Check if the folder is allowed by the scope.
folderPathString, err := path.GetDriveFolderPath(folderPath)
if err != nil {
-logger.Ctx(ctx).Error(err)
+logger.Ctx(ctx).With("err", err).Error("getting drive folder path")
return true
}
@@ -20,6 +20,7 @@ import (
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/control"
+"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
)
@@ -702,7 +703,7 @@ func (suite *OneDriveCollectionsSuite) TestUpdateCollections() {
outputFolderMap,
excludes,
false,
-)
+fault.New(true))
tt.expect(t, err)
assert.Equal(t, len(tt.expectedCollectionIDs), len(c.CollectionMap), "total collections")
assert.Equal(t, tt.expectedItemCount, c.NumItems, "item count")
@@ -1058,7 +1059,7 @@ func (suite *OneDriveCollectionsSuite) TestDeserializeMetadata() {
cols = append(cols, data.NotFoundRestoreCollection{Collection: mc})
}

-deltas, paths, err := deserializeMetadata(ctx, cols)
+deltas, paths, err := deserializeMetadata(ctx, cols, fault.New(true))
test.errCheck(t, err)

assert.Equal(t, test.expectedDeltas, deltas)
@@ -1597,7 +1598,7 @@ func (suite *OneDriveCollectionsSuite) TestGet() {
assert.NoError(t, err, "creating metadata collection")

prevMetadata := []data.RestoreCollection{data.NotFoundRestoreCollection{Collection: mc}}
-cols, delList, err := c.Get(ctx, prevMetadata)
+cols, delList, err := c.Get(ctx, prevMetadata, fault.New(true))
test.errCheck(t, err)

if err != nil {
@@ -1607,9 +1608,12 @@ func (suite *OneDriveCollectionsSuite) TestGet() {
for _, baseCol := range cols {
folderPath := baseCol.FullPath().String()
if folderPath == metadataPath.String() {
-deltas, paths, err := deserializeMetadata(ctx, []data.RestoreCollection{
-data.NotFoundRestoreCollection{Collection: baseCol},
-})
+deltas, paths, err := deserializeMetadata(
+ctx,
+[]data.RestoreCollection{
+data.NotFoundRestoreCollection{Collection: baseCol},
+},
+fault.New(true))
if !assert.NoError(t, err, "deserializing metadata") {
continue
}
@@ -1804,6 +1808,7 @@ func (suite *OneDriveCollectionsSuite) TestCollectItems() {
newPaths map[string]string,
excluded map[string]struct{},
doNotMergeItems bool,
+errs *fault.Errors,
) error {
return nil
}
@@ -1816,7 +1821,7 @@ func (suite *OneDriveCollectionsSuite) TestCollectItems() {
collectorFunc,
map[string]string{},
test.prevDelta,
-)
+fault.New(true))

require.ErrorIs(suite.T(), err, test.err, "delta fetch err")
require.Equal(suite.T(), test.deltaURL, delta.URL, "delta url")
@@ -6,9 +6,9 @@ import (
"strings"
"time"

+"github.com/alcionai/clues"
msdrive "github.com/microsoftgraph/msgraph-sdk-go/drive"
"github.com/microsoftgraph/msgraph-sdk-go/models"
-"github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors"
"github.com/pkg/errors"
"golang.org/x/exp/maps"

@@ -17,10 +17,11 @@ import (
gapi "github.com/alcionai/corso/src/internal/connector/graph/api"
"github.com/alcionai/corso/src/internal/connector/onedrive/api"
"github.com/alcionai/corso/src/internal/connector/support"
+"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
)

-var errFolderNotFound = errors.New("folder not found")
+var errFolderNotFound = clues.New("folder not found")

const (
getDrivesRetries = 3
@@ -77,8 +78,6 @@ func drives(
retry bool,
) ([]models.Driveable, error) {
var (
-err error
-page gapi.PageLinker
numberOfRetries = getDrivesRetries
drives = []models.Driveable{}
)
@@ -89,30 +88,31 @@ func drives(

// Loop through all pages returned by Graph API.
for {
+var (
+err error
+page gapi.PageLinker
+)

// Retry Loop for Drive retrieval. Request can timeout
for i := 0; i <= numberOfRetries; i++ {
page, err = pager.GetPage(ctx)
if err != nil {
// Various error handling. May return an error or perform a retry.
-detailedError := support.ConnectorStackErrorTraceWrap(err, "").Error()
-if strings.Contains(detailedError, userMysiteURLNotFound) ||
-strings.Contains(detailedError, userMysiteURLNotFoundMsg) ||
-strings.Contains(detailedError, userMysiteNotFound) ||
-strings.Contains(detailedError, userMysiteNotFoundMsg) {
+errMsg := support.ConnectorStackErrorTraceWrap(err, "").Error()
+if strings.Contains(errMsg, userMysiteURLNotFound) ||
+strings.Contains(errMsg, userMysiteURLNotFoundMsg) ||
+strings.Contains(errMsg, userMysiteNotFound) ||
+strings.Contains(errMsg, userMysiteNotFoundMsg) {
logger.Ctx(ctx).Infof("resource owner does not have a drive")
return make([]models.Driveable, 0), nil // no license or drives.
}

-if strings.Contains(detailedError, contextDeadlineExceeded) && i < numberOfRetries {
+if strings.Contains(errMsg, contextDeadlineExceeded) && i < numberOfRetries {
time.Sleep(time.Duration(3*(i+1)) * time.Second)
continue
}

-return nil, errors.Wrapf(
-err,
-"failed to retrieve drives. details: %s",
-detailedError,
-)
+return nil, clues.Wrap(err, "retrieving drives").WithClues(ctx).With(graph.ErrData(err)...)
}

// No error encountered, break the retry loop so we can extract results
@@ -122,7 +122,7 @@ func drives(

tmp, err := pager.ValuesIn(page)
if err != nil {
-return nil, errors.Wrap(err, "extracting drives from response")
+return nil, clues.Wrap(err, "extracting drives from response").WithClues(ctx).With(graph.ErrData(err)...)
}

drives = append(drives, tmp...)
@@ -135,7 +135,7 @@ func drives(
pager.SetNext(nextLink)
}

-logger.Ctx(ctx).Debugf("Found %d drives", len(drives))
+logger.Ctx(ctx).Debugf("retrieved %d valid drives", len(drives))

return drives, nil
}
@@ -149,6 +149,7 @@ type itemCollector func(
newPaths map[string]string,
excluded map[string]struct{},
validPrevDelta bool,
+errs *fault.Errors,
) error

type itemPager interface {
@@ -193,6 +194,7 @@ func collectItems(
collector itemCollector,
oldPaths map[string]string,
prevDelta string,
+errs *fault.Errors,
) (DeltaUpdate, map[string]string, map[string]struct{}, error) {
var (
newDeltaURL = ""
@@ -220,19 +222,17 @@ func collectItems(
}

if err != nil {
-return DeltaUpdate{}, nil, nil, errors.Wrapf(
-err,
-"failed to query drive items. details: %s",
-support.ConnectorStackErrorTrace(err),
-)
+return DeltaUpdate{}, nil, nil, clues.Wrap(err, "getting page").WithClues(ctx).With(graph.ErrData(err)...)
}

vals, err := pager.ValuesIn(page)
if err != nil {
-return DeltaUpdate{}, nil, nil, errors.Wrap(err, "extracting items from response")
+return DeltaUpdate{}, nil, nil, clues.Wrap(err, "extracting items from response").
+WithClues(ctx).
+With(graph.ErrData(err)...)
}

-err = collector(ctx, driveID, driveName, vals, oldPaths, newPaths, excluded, invalidPrevDelta)
+err = collector(ctx, driveID, driveName, vals, oldPaths, newPaths, excluded, invalidPrevDelta, errs)
if err != nil {
return DeltaUpdate{}, nil, nil, err
}
@@ -277,25 +277,16 @@ func getFolder(
foundItem, err = builder.Get(ctx, nil)

if err != nil {
-var oDataError *odataerrors.ODataError
-if errors.As(err, &oDataError) &&
-oDataError.GetError() != nil &&
-oDataError.GetError().GetCode() != nil &&
-*oDataError.GetError().GetCode() == itemNotFoundErrorCode {
-return nil, errors.WithStack(errFolderNotFound)
+if graph.IsErrDeletedInFlight(err) {
+return nil, clues.Stack(errFolderNotFound, err).WithClues(ctx).With(graph.ErrData(err)...)
}

-return nil, errors.Wrapf(err,
-"failed to get folder %s/%s. details: %s",
-parentFolderID,
-folderName,
-support.ConnectorStackErrorTrace(err),
-)
+return nil, clues.Wrap(err, "getting folder").WithClues(ctx).With(graph.ErrData(err)...)
}

// Check if the item found is a folder, fail the call if not
if foundItem.GetFolder() == nil {
-return nil, errors.WithStack(errFolderNotFound)
+return nil, clues.Stack(errFolderNotFound).WithClues(ctx).With(graph.ErrData(err)...)
}

return foundItem, nil
@@ -311,16 +302,11 @@ func createItem(
// Graph SDK doesn't yet provide a POST method for `/children` so we set the `rawUrl` ourselves as recommended
// here: https://github.com/microsoftgraph/msgraph-sdk-go/issues/155#issuecomment-1136254310
rawURL := fmt.Sprintf(itemChildrenRawURLFmt, driveID, parentFolderID)

builder := msdrive.NewItemsRequestBuilder(rawURL, service.Adapter())

newItem, err := builder.Post(ctx, newItem, nil)
if err != nil {
-return nil, errors.Wrapf(
-err,
-"failed to create item. details: %s",
-support.ConnectorStackErrorTrace(err),
-)
+return nil, clues.Wrap(err, "creating item").WithClues(ctx).With(graph.ErrData(err)...)
}

return newItem, nil
@@ -356,65 +342,71 @@ func GetAllFolders(
gs graph.Servicer,
pager drivePager,
prefix string,
+errs *fault.Errors,
) ([]*Displayable, error) {
drives, err := drives(ctx, pager, true)
if err != nil {
return nil, errors.Wrap(err, "getting OneDrive folders")
}

-folders := map[string]*Displayable{}
+var (
+folders = map[string]*Displayable{}
+et = errs.Tracker()
+)

for _, d := range drives {
-_, _, _, err = collectItems(
-ctx,
-defaultItemPager(
-gs,
-*d.GetId(),
-"",
-),
-*d.GetId(),
-*d.GetName(),
-func(
-innerCtx context.Context,
-driveID, driveName string,
-items []models.DriveItemable,
-oldPaths map[string]string,
-newPaths map[string]string,
-excluded map[string]struct{},
-doNotMergeItems bool,
-) error {
-for _, item := range items {
-// Skip the root item.
-if item.GetRoot() != nil {
-continue
-}
+if et.Err() != nil {
+break
+}

-// Only selecting folders right now, not packages.
-if item.GetFolder() == nil {
-continue
-}
+var (
+id = ptr.Val(d.GetId())
+name = ptr.Val(d.GetName())
+)

-if item.GetId() == nil || len(*item.GetId()) == 0 {
-logger.Ctx(ctx).Warn("folder without ID")
-continue
-}

-if !strings.HasPrefix(*item.GetName(), prefix) {
-continue
-}

-// Add the item instead of the folder because the item has more
-// functionality.
-folders[*item.GetId()] = &Displayable{item}
+ictx := clues.Add(ctx, "drive_id", id, "drive_name", name) // TODO: pii
+collector := func(
+innerCtx context.Context,
+driveID, driveName string,
+items []models.DriveItemable,
+oldPaths map[string]string,
+newPaths map[string]string,
+excluded map[string]struct{},
+doNotMergeItems bool,
+errs *fault.Errors,
+) error {
+for _, item := range items {
+// Skip the root item.
+if item.GetRoot() != nil {
+continue
+}

-return nil
-},
-map[string]string{},
-"",
-)
+// Only selecting folders right now, not packages.
+if item.GetFolder() == nil {
+continue
+}

+itemID := ptr.Val(item.GetId())
+if len(itemID) == 0 {
+logger.Ctx(ctx).Info("folder missing ID")
+continue
+}

+if !strings.HasPrefix(*item.GetName(), prefix) {
+continue
+}

+// Add the item instead of the folder because the item has more
+// functionality.
+folders[itemID] = &Displayable{item}
+}

+return nil
+}

+_, _, _, err = collectItems(ictx, defaultItemPager(gs, id, ""), id, name, collector, map[string]string{}, "", errs)
if err != nil {
-return nil, errors.Wrapf(err, "getting items for drive %s", *d.GetName())
+et.Add(clues.Wrap(err, "enumerating items in drive"))
}
}

@@ -424,7 +416,7 @@ func GetAllFolders(
res = append(res, f)
}

-return res, nil
+return res, et.Err()
}

func DeleteItem(
@@ -435,7 +427,10 @@ func DeleteItem(
) error {
err := gs.Client().DrivesById(driveID).ItemsById(itemID).Delete(ctx, nil)
if err != nil {
-return errors.Wrapf(err, "deleting item with ID %s", itemID)
+return clues.Wrap(err, "deleting item").
+WithClues(ctx).
+With("item_id", itemID).
+With(graph.ErrData(err)...)
}

return nil
@@ -18,6 +18,7 @@ import (
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/control"
+"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/selectors"
)
@@ -399,7 +400,7 @@ func (suite *OneDriveSuite) TestCreateGetDeleteFolder() {
pager, err := PagerForSource(OneDriveSource, gs, suite.userID, nil)
require.NoError(t, err)

-allFolders, err := GetAllFolders(ctx, gs, pager, test.prefix)
+allFolders, err := GetAllFolders(ctx, gs, pager, test.prefix, fault.New(true))
require.NoError(t, err)

foundFolderIDs := []string{}
@@ -465,7 +466,7 @@ func (suite *OneDriveSuite) TestOneDriveNewCollections() {
service,
service.updateStatus,
control.Options{ToggleFeatures: control.Toggles{EnablePermissionsBackup: true}},
-).Get(ctx, nil)
+).Get(ctx, nil, fault.New(true))
assert.NoError(t, err)
// Don't expect excludes as this isn't an incremental backup.
assert.Empty(t, excludes)
@@ -4,17 +4,17 @@ import (
"bytes"
"context"
"encoding/json"
-"fmt"
"io"
"net/http"
"strings"

+"github.com/alcionai/clues"
msdrives "github.com/microsoftgraph/msgraph-sdk-go/drives"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/pkg/errors"

+"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/connector/graph"
-"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/connector/uploadsession"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/backup/details"
@@ -33,7 +33,12 @@ func getDriveItem(
srv graph.Servicer,
driveID, itemID string,
) (models.DriveItemable, error) {
-return srv.Client().DrivesById(driveID).ItemsById(itemID).Get(ctx, nil)
+di, err := srv.Client().DrivesById(driveID).ItemsById(itemID).Get(ctx, nil)
+if err != nil {
+return nil, clues.Wrap(err, "getting item").WithClues(ctx).With(graph.ErrData(err)...)
+}

+return di, nil
}

// sharePointItemReader will return a io.ReadCloser for the specified item
@@ -69,7 +74,7 @@ func oneDriveItemMetaReader(

metaJSON, err := json.Marshal(meta)
if err != nil {
-return nil, 0, err
+return nil, 0, clues.Wrap(err, "marshalling json").WithClues(ctx)
}

return io.NopCloser(bytes.NewReader(metaJSON)), len(metaJSON), nil
@@ -106,12 +111,12 @@ func oneDriveItemReader(
func downloadItem(hc *http.Client, item models.DriveItemable) (*http.Response, error) {
url, ok := item.GetAdditionalData()[downloadURLKey].(*string)
if !ok {
-return nil, fmt.Errorf("extracting file url: file %s", *item.GetId())
+return nil, clues.New("extracting file url").With("item_id", ptr.Val(item.GetId()))
}

req, err := http.NewRequest(http.MethodGet, *url, nil)
if err != nil {
-return nil, errors.Wrap(err, "new request")
+return nil, clues.Wrap(err, "new request").With(graph.ErrData(err)...)
}

//nolint:lll
@@ -144,7 +149,7 @@ func downloadItem(hc *http.Client, item models.DriveItemable) (*http.Response, e
return resp, graph.Err503ServiceUnavailable
}

-return resp, errors.New("non-2xx http response: " + resp.Status)
+return resp, clues.Wrap(clues.New(resp.Status), "non-2xx http response")
}

// oneDriveItemInfo will populate a details.OneDriveInfo struct
@@ -171,9 +176,9 @@ func oneDriveItemInfo(di models.DriveItemable, itemSize int64) *details.OneDrive

return &details.OneDriveInfo{
ItemType: details.OneDriveItem,
-ItemName: *di.GetName(),
-Created: *di.GetCreatedDateTime(),
-Modified: *di.GetLastModifiedDateTime(),
+ItemName: ptr.Val(di.GetName()),
+Created: ptr.Val(di.GetCreatedDateTime()),
+Modified: ptr.Val(di.GetLastModifiedDateTime()),
DriveName: parent,
Size: itemSize,
Owner: email,
@@ -187,11 +192,13 @@ func oneDriveItemMetaInfo(
ctx context.Context, service graph.Servicer,
driveID string, di models.DriveItemable,
) (Metadata, error) {
-itemID := di.GetId()

-perm, err := service.Client().DrivesById(driveID).ItemsById(*itemID).Permissions().Get(ctx, nil)
+perm, err := service.Client().
+DrivesById(driveID).
+ItemsById(ptr.Val(di.GetId())).
+Permissions().
+Get(ctx, nil)
if err != nil {
-return Metadata{}, err
+return Metadata{}, clues.Wrap(err, "getting item metadata").WithClues(ctx).With(graph.ErrData(err)...)
}

uperms := filterUserPermissions(perm.GetValue())
@@ -223,7 +230,7 @@ func filterUserPermissions(perms []models.Permissionable) []UserPermission {
}

up = append(up, UserPermission{
-ID: *p.GetId(),
+ID: ptr.Val(p.GetId()),
Roles: roles,
Email: *p.GetGrantedToV2().GetUser().GetAdditionalData()["email"].(*string),
Expiration: p.GetExpirationDateTime(),
@@ -275,9 +282,9 @@ func sharePointItemInfo(di models.DriveItemable, itemSize int64) *details.ShareP

return &details.SharePointInfo{
ItemType: details.OneDriveItem,
-ItemName: *di.GetName(),
-Created: *di.GetCreatedDateTime(),
-Modified: *di.GetLastModifiedDateTime(),
+ItemName: ptr.Val(di.GetName()),
+Created: ptr.Val(di.GetCreatedDateTime()),
+Modified: ptr.Val(di.GetLastModifiedDateTime()),
DriveName: parent,
Size: itemSize,
Owner: id,
@@ -295,20 +302,18 @@ func driveItemWriter(
itemSize int64,
) (io.Writer, error) {
session := msdrives.NewItemItemsItemCreateUploadSessionPostRequestBody()
+ctx = clues.Add(ctx, "upload_item_id", itemID)

r, err := service.Client().DrivesById(driveID).ItemsById(itemID).CreateUploadSession().Post(ctx, session, nil)
if err != nil {
-return nil, errors.Wrapf(
-err,
-"failed to create upload session for item %s. details: %s",
-itemID,
-support.ConnectorStackErrorTrace(err),
-)
+return nil, clues.Wrap(err, "creating item upload session").
+WithClues(ctx).
+With(graph.ErrData(err)...)
}

-url := *r.GetUploadUrl()
+logger.Ctx(ctx).Debug("created an upload session")

-logger.Ctx(ctx).Debugf("Created an upload session for item %s. URL: %s", itemID, url)
+url := ptr.Val(r.GetUploadUrl())

return uploadsession.NewWriter(itemID, url, itemSize), nil
}
@@ -15,6 +15,7 @@ import (
"github.com/alcionai/corso/src/internal/common"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/tester"
+"github.com/alcionai/corso/src/pkg/fault"
)

type ItemIntegrationSuite struct {
@@ -107,6 +108,7 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() {
newPaths map[string]string,
excluded map[string]struct{},
doNotMergeItems bool,
+errs *fault.Errors,
) error {
for _, item := range items {
if item.GetFile() != nil {
@@ -129,7 +131,7 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() {
itemCollector,
map[string]string{},
"",
-)
+fault.New(true))
require.NoError(suite.T(), err)

// Test Requirement 2: Need a file
@@ -87,7 +87,8 @@ func DataCollections(
site,
scope,
su,
-ctrlOpts)
+ctrlOpts,
+errs)
if err != nil {
et.Add(err)
continue
@@ -169,6 +170,7 @@ func collectLibraries(
scope selectors.SharePointScope,
updater statusUpdater,
ctrlOpts control.Options,
+errs *fault.Errors,
) ([]data.BackupCollection, map[string]struct{}, error) {
logger.Ctx(ctx).Debug("creating SharePoint Library collections")

@@ -187,7 +189,7 @@ func collectLibraries(

// TODO(ashmrtn): Pass previous backup metadata when SharePoint supports delta
// token-based incrementals.
-odcs, excludes, err := colls.Get(ctx, nil)
+odcs, excludes, err := colls.Get(ctx, nil, errs)
if err != nil {
return nil, nil, clues.Wrap(err, "getting library").WithClues(ctx).With(graph.ErrData(err)...)
}
@@ -105,7 +105,7 @@ func (suite *SharePointLibrariesSuite) TestUpdateCollections() {
&MockGraphService{},
nil,
control.Options{})
-err := c.UpdateCollections(ctx, "driveID1", "General", test.items, paths, newPaths, excluded, true)
+err := c.UpdateCollections(ctx, "driveID1", "General", test.items, paths, newPaths, excluded, true, fault.New(true))
test.expect(t, err)
assert.Equal(t, len(test.expectedCollectionIDs), len(c.CollectionMap), "collection paths")
assert.Equal(t, test.expectedItemCount, c.NumItems, "item count")