add clues & fault to onedrive collections (#2512)
## Does this PR need a docs update or release note?

- [ ] ⛔ No

## Type of change

- [x] 🧹 Tech Debt/Cleanup

## Issue(s)

* #1970

## Test Plan

- [x] ⚡ Unit test
- [x] 💚 E2E
parent b04fc02b9d
commit 8ee8d265b9
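The change threads corso's `*fault.Errors` through the OneDrive collection pipeline: callers pass `fault.New(failFast)`, each function derives a local tracker with `errs.Tracker()`, records recoverable failures with `et.Add(...)`, checks `et.Err()` at the top of loops to honor fail-fast, and returns `et.Err()` instead of a hand-rolled accumulator. Below is a minimal, self-contained sketch of that shape; the `Errors`/`Tracker` types here are stand-ins that only mimic the usage visible in this diff, not the real `pkg/fault` API.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for corso's pkg/fault types. The real API is richer; this only
// mirrors how the diff uses it: a shared Errors bucket, a per-call Tracker,
// Add() to record failures, and Err() to decide whether to stop early.
type Errors struct {
	failFast bool
	errs     []error
}

func New(failFast bool) *Errors { return &Errors{failFast: failFast} }

func (e *Errors) Tracker() *Tracker { return &Tracker{parent: e} }

type Tracker struct {
	parent  *Errors
	current error
}

// Add records the error; in failFast mode it also becomes the tracker's
// terminal error, which callers check between iterations.
func (t *Tracker) Add(err error) {
	t.parent.errs = append(t.parent.errs, err)
	if t.parent.failFast && t.current == nil {
		t.current = err
	}
}

func (t *Tracker) Err() error { return t.current }

func main() {
	et := New(true).Tracker()

	for _, item := range []string{"a", "bad", "c"} {
		// The loop-top check is the pattern the diff adds to every
		// collection walk: stop enumerating once a terminal error is seen.
		if et.Err() != nil {
			break
		}

		if item == "bad" { // hypothetical failure
			et.Add(errors.New("processing " + item))
		}
	}

	fmt.Println("terminal err:", et.Err())
}
```

This replaces the mutex-guarded `errUpdater` closure in `populateItems`, which the hunks below delete.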
```diff
@@ -157,7 +157,7 @@ func purgeOneDriveFolders(
 		return nil, err
 	}
 
-	cfs, err := onedrive.GetAllFolders(ctx, gs, pager, prefix)
+	cfs, err := onedrive.GetAllFolders(ctx, gs, pager, prefix, fault.New(true))
 	if err != nil {
 		return nil, err
 	}
```
```diff
@@ -97,7 +97,7 @@ func (gc *GraphConnector) DataCollections(
 			gc.Service,
 			gc.UpdateStatus,
 			ctrlOpts,
-		)
+			errs)
 		if err != nil {
 			return nil, nil, err
 		}
```
```diff
@@ -21,6 +21,7 @@ import (
 const (
 	errCodeActivityLimitReached = "activityLimitReached"
 	errCodeItemNotFound         = "ErrorItemNotFound"
+	errCodeItemNotFoundShort    = "itemNotFound"
 	errCodeEmailFolderNotFound  = "ErrorSyncFolderNotFound"
 	errCodeResyncRequired       = "ResyncRequired"
 	errCodeSyncFolderNotFound   = "ErrorSyncFolderNotFound"
@@ -53,7 +54,12 @@ func IsErrDeletedInFlight(err error) bool {
 		return true
 	}
 
-	if hasErrorCode(err, errCodeItemNotFound, errCodeSyncFolderNotFound) {
+	if hasErrorCode(
+		err,
+		errCodeItemNotFound,
+		errCodeItemNotFoundShort,
+		errCodeSyncFolderNotFound,
+	) {
 		return true
 	}
 
```
```diff
@@ -14,6 +14,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/spatialcurrent/go-lazy/pkg/lazy"
 
+	"github.com/alcionai/corso/src/internal/common/ptr"
 	"github.com/alcionai/corso/src/internal/connector/graph"
 	"github.com/alcionai/corso/src/internal/connector/support"
 	"github.com/alcionai/corso/src/internal/data"
@@ -82,7 +83,7 @@ type Collection struct {
 type itemReaderFunc func(
 	hc *http.Client,
 	item models.DriveItemable,
-) (itemInfo details.ItemInfo, itemData io.ReadCloser, err error)
+) (details.ItemInfo, io.ReadCloser, error)
 
 // itemMetaReaderFunc returns a reader for the metadata of the
 // specified item
@@ -166,7 +167,7 @@ func (oc *Collection) Items(
 	ctx context.Context,
 	errs *fault.Errors, // TODO: currently unused while onedrive isn't up to date with clues/fault
 ) <-chan data.Stream {
-	go oc.populateItems(ctx)
+	go oc.populateItems(ctx, errs)
 	return oc.data
 }
 
@@ -240,23 +241,22 @@ func (od *Item) ModTime() time.Time {
 
 // populateItems iterates through items added to the collection
 // and uses the collection `itemReader` to read the item
-func (oc *Collection) populateItems(ctx context.Context) {
+func (oc *Collection) populateItems(ctx context.Context, errs *fault.Errors) {
 	var (
-		errs       error
 		byteCount  int64
 		itemsRead  int64
 		dirsRead   int64
 		itemsFound int64
 		dirsFound  int64
 		wg         sync.WaitGroup
-		m          sync.Mutex
+		et         = errs.Tracker()
 	)
 
 	// Retrieve the OneDrive folder path to set later in
 	// `details.OneDriveInfo`
 	parentPathString, err := path.GetDriveFolderPath(oc.folderPath)
 	if err != nil {
-		oc.reportAsCompleted(ctx, 0, 0, 0, err)
+		oc.reportAsCompleted(ctx, 0, 0, 0, clues.Wrap(err, "getting drive path").WithClues(ctx))
 		return
 	}
 
@@ -271,15 +271,8 @@ func (oc *Collection) populateItems(ctx context.Context) {
 	semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize)
 	defer close(semaphoreCh)
 
-	errUpdater := func(id string, err error) {
-		m.Lock()
-		// TODO: Label(fault.LabelForceNoBackupCreation)
-		errs = support.WrapAndAppend(id, err, errs)
-		m.Unlock()
-	}
-
 	for _, item := range oc.driveItems {
-		if oc.ctrl.FailFast && errs != nil {
+		if et.Err() != nil {
 			break
 		}
 
@@ -287,22 +280,27 @@ func (oc *Collection) populateItems(ctx context.Context) {
 
 		wg.Add(1)
 
-		go func(item models.DriveItemable) {
+		go func(ctx context.Context, item models.DriveItemable) {
 			defer wg.Done()
 			defer func() { <-semaphoreCh }()
 
 			// Read the item
 			var (
-				itemID   = *item.GetId()
-				itemName = *item.GetName()
-				itemSize = *item.GetSize()
+				itemID       = ptr.Val(item.GetId())
+				itemName     = ptr.Val(item.GetName())
+				itemSize     = ptr.Val(item.GetSize())
 				itemInfo     details.ItemInfo
 				itemMeta     io.ReadCloser
 				itemMetaSize int
 				metaSuffix   string
-				err          error
 			)
 
+			ctx = clues.Add(ctx,
+				"restore_item_id", itemID,
+				"restore_item_name", itemName,
+				"restore_item_size", itemSize,
+				"restore_item_info", itemInfo)
+
 			isFile := item.GetFile() != nil
 
 			if isFile {
@@ -325,7 +323,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
 					oc.ctrl.ToggleFeatures.EnablePermissionsBackup)
 
 				if err != nil {
-					errUpdater(itemID, clues.Wrap(err, "getting item metadata"))
+					et.Add(clues.Wrap(err, "getting item metadata").Label(fault.LabelForceNoBackupCreation))
 					return
 				}
 			}
@@ -372,7 +370,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
 
 				// check for errors following retries
 				if err != nil {
-					errUpdater(itemID, err)
+					et.Add(clues.Stack(err).WithClues(ctx).Label(fault.LabelForceNoBackupCreation))
 					return nil, err
 				}
 
@@ -382,8 +380,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
 					itemData,
 					observe.ItemBackupMsg,
 					observe.PII(itemName+dataSuffix),
-					itemSize,
-				)
+					itemSize)
 				go closer()
 
 				return progReader, nil
@@ -441,18 +438,17 @@ func (oc *Collection) populateItems(ctx context.Context) {
 			atomic.AddInt64(&byteCount, itemSize)
 
 			folderProgress <- struct{}{}
-		}(item)
+		}(ctx, item)
 	}
 
 	wg.Wait()
 
-	oc.reportAsCompleted(ctx, int(itemsFound), int(itemsRead), byteCount, errs)
+	oc.reportAsCompleted(ctx, int(itemsFound), int(itemsRead), byteCount, et.Err())
 }
 
-func (oc *Collection) reportAsCompleted(ctx context.Context, itemsFound, itemsRead int, byteCount int64, errs error) {
+func (oc *Collection) reportAsCompleted(ctx context.Context, itemsFound, itemsRead int, byteCount int64, err error) {
 	close(oc.data)
 
-	// TODO: add Label(fault.LabelForceNoBackupCreation) to errs
 	status := support.CreateStatus(ctx, support.Backup,
 		1, // num folders (always 1)
 		support.CollectionMetrics{
@@ -460,7 +456,7 @@ func (oc *Collection) reportAsCompleted(ctx context.Context, itemsFound, itemsRe
 			Successes:  itemsRead, // items read successfully,
 			TotalBytes: byteCount, // Number of bytes read in the operation,
 		},
-		errs,
+		err,
 		oc.folderPath.Folder(false), // Additional details
 	)
 	logger.Ctx(ctx).Debugw("done streaming items", "status", status.String())
```
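Alongside the fault plumbing, the hunks above migrate error construction from `pkg/errors` to `clues`: identifiers are attached to the context with `clues.Add`, and errors are bound to that context with `Wrap(...).WithClues(ctx)` so the values travel with the error instead of being re-stated at every log site. A small sketch of the idiom, using only calls that appear in the diff; `readItem` and `download` are hypothetical stand-ins for the real item downloader.

```go
package main

import (
	"context"
	"fmt"

	"github.com/alcionai/clues"
)

// readItem decorates the context as the code descends into an item, then
// binds that context to any error produced there.
func readItem(ctx context.Context, itemID string) error {
	ctx = clues.Add(ctx, "restore_item_id", itemID)

	if err := download(itemID); err != nil {
		// The wrapped error carries both the message and the context
		// values, so the log site doesn't need to repeat them.
		return clues.Wrap(err, "getting item metadata").WithClues(ctx)
	}

	return nil
}

func download(itemID string) error { // hypothetical failure source
	return fmt.Errorf("404 for %s", itemID)
}

func main() {
	err := readItem(context.Background(), "item-1")
	fmt.Println(err)
}
```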
```diff
@@ -19,6 +19,7 @@ import (
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/observe"
 	"github.com/alcionai/corso/src/pkg/control"
+	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/logger"
 	"github.com/alcionai/corso/src/pkg/path"
 )
@@ -118,32 +119,40 @@ func NewCollections(
 func deserializeMetadata(
 	ctx context.Context,
 	cols []data.RestoreCollection,
+	errs *fault.Errors,
 ) (map[string]string, map[string]map[string]string, error) {
 	logger.Ctx(ctx).Infow(
 		"deserialzing previous backup metadata",
-		"num_collections",
-		len(cols),
-	)
+		"num_collections", len(cols))
 
-	prevDeltas := map[string]string{}
-	prevFolders := map[string]map[string]string{}
+	var (
+		prevDeltas  = map[string]string{}
+		prevFolders = map[string]map[string]string{}
+		et          = errs.Tracker()
+	)
 
 	for _, col := range cols {
-		items := col.Items(ctx, nil) // TODO: fault.Errors instead of nil
+		if et.Err() != nil {
+			break
+		}
+
+		items := col.Items(ctx, errs)
 
 		for breakLoop := false; !breakLoop; {
 			select {
 			case <-ctx.Done():
-				return nil, nil, errors.Wrap(ctx.Err(), "deserialzing previous backup metadata")
+				return nil, nil, clues.Wrap(ctx.Err(), "deserialzing previous backup metadata").WithClues(ctx)
 
 			case item, ok := <-items:
 				if !ok {
-					// End of collection items.
 					breakLoop = true
 					break
 				}
 
-				var err error
+				var (
+					err  error
+					ictx = clues.Add(ctx, "item_uuid", item.UUID())
+				)
 
 				switch item.UUID() {
 				case graph.PreviousPathFileName:
@@ -153,11 +162,9 @@ func deserializeMetadata(
 					err = deserializeMap(item.ToReader(), prevDeltas)
 
 				default:
-					logger.Ctx(ctx).Infow(
+					logger.Ctx(ictx).Infow(
 						"skipping unknown metadata file",
-						"file_name",
-						item.UUID(),
-					)
+						"file_name", item.UUID())
 
 					continue
 				}
@@ -174,20 +181,15 @@ func deserializeMetadata(
 				// we end up in a situation where we're sourcing items from the wrong
 				// base in kopia wrapper.
 				if errors.Is(err, errExistingMapping) {
-					return nil, nil, errors.Wrapf(
-						err,
-						"deserializing metadata file %s",
-						item.UUID(),
-					)
+					return nil, nil, clues.Wrap(err, "deserializing metadata file").WithClues(ictx)
 				}
 
-				logger.Ctx(ctx).Errorw(
-					"deserializing base backup metadata. Falling back to full backup for selected drives",
-					"error",
-					err,
-					"file_name",
-					item.UUID(),
-				)
+				err = clues.Stack(err).WithClues(ictx)
+
+				et.Add(err)
+				logger.Ctx(ictx).
+					With("err", err).
+					Errorw("deserializing base backup metadata", clues.InErr(err).Slice()...)
 			}
 		}
@@ -214,10 +216,10 @@ func deserializeMetadata(
 		}
 	}
 
-	return prevDeltas, prevFolders, nil
+	return prevDeltas, prevFolders, et.Err()
 }
 
-var errExistingMapping = errors.New("mapping already exists for same drive ID")
+var errExistingMapping = clues.New("mapping already exists for same drive ID")
 
 // deserializeMap takes an reader and a map of already deserialized items and
 // adds the newly deserialized items to alreadyFound. Items are only added to
@@ -228,8 +230,7 @@ func deserializeMap[T any](reader io.ReadCloser, alreadyFound map[string]T) erro
 
 	tmp := map[string]T{}
 
-	err := json.NewDecoder(reader).Decode(&tmp)
-	if err != nil {
+	if err := json.NewDecoder(reader).Decode(&tmp); err != nil {
 		return errors.Wrap(err, "deserializing file contents")
 	}
 
@@ -243,7 +244,7 @@ func deserializeMap[T any](reader io.ReadCloser, alreadyFound map[string]T) erro
 	}
 
 	if duplicate {
-		return errors.WithStack(errExistingMapping)
+		return clues.Stack(errExistingMapping)
 	}
 
 	maps.Copy(alreadyFound, tmp)
@@ -256,8 +257,9 @@ func deserializeMap[T any](reader io.ReadCloser, alreadyFound map[string]T) erro
 func (c *Collections) Get(
 	ctx context.Context,
 	prevMetadata []data.RestoreCollection,
+	errs *fault.Errors,
 ) ([]data.BackupCollection, map[string]struct{}, error) {
-	prevDeltas, oldPathsByDriveID, err := deserializeMetadata(ctx, prevMetadata)
+	prevDeltas, oldPathsByDriveID, err := deserializeMetadata(ctx, prevMetadata, errs)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -265,7 +267,7 @@ func (c *Collections) Get(
 	// Enumerate drives for the specified resourceOwner
 	pager, err := c.drivePagerFunc(c.source, c.service, c.resourceOwner, nil)
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, clues.Stack(err).WithClues(ctx).With(graph.ErrData(err)...)
 	}
 
 	retry := c.source == OneDriveSource
@@ -288,39 +290,33 @@ func (c *Collections) Get(
 		excludedItems = map[string]struct{}{}
 	)
 
-	// Update the collection map with items from each drive
 	for _, d := range drives {
-		driveID := *d.GetId()
-		driveName := *d.GetName()
+		var (
+			driveID     = ptr.Val(d.GetId())
+			driveName   = ptr.Val(d.GetName())
+			prevDelta   = prevDeltas[driveID]
+			oldPaths    = oldPathsByDriveID[driveID]
+			numOldDelta = 0
+		)
 
-		prevDelta := prevDeltas[driveID]
-		oldPaths := oldPathsByDriveID[driveID]
-
-		numOldDelta := 0
 		if len(prevDelta) > 0 {
 			numOldDelta++
 		}
 
 		logger.Ctx(ctx).Infow(
 			"previous metadata for drive",
-			"num_paths_entries",
-			len(oldPaths),
-			"num_deltas_entries",
-			numOldDelta)
+			"num_paths_entries", len(oldPaths),
+			"num_deltas_entries", numOldDelta)
 
 		delta, paths, excluded, err := collectItems(
 			ctx,
-			c.itemPagerFunc(
-				c.service,
-				driveID,
-				"",
-			),
+			c.itemPagerFunc(c.service, driveID, ""),
 			driveID,
 			driveName,
 			c.UpdateCollections,
 			oldPaths,
 			prevDelta,
-		)
+			errs)
 		if err != nil {
 			return nil, nil, err
 		}
@@ -418,18 +414,15 @@ func (c *Collections) Get(
 			graph.NewMetadataEntry(graph.PreviousPathFileName, folderPaths),
 			graph.NewMetadataEntry(graph.DeltaURLsFileName, deltaURLs),
 		},
-		c.statusUpdater,
-	)
+		c.statusUpdater)
 	if err != nil {
 		// Technically it's safe to continue here because the logic for starting an
 		// incremental backup should eventually find that the metadata files are
 		// empty/missing and default to a full backup.
-		logger.Ctx(ctx).Warnw(
-			"making metadata collection for future incremental backups",
-			"error",
-			err,
-		)
+		logger.Ctx(ctx).
+			With("err", err).
+			Infow("making metadata collection for future incremental backups", clues.InErr(err).Slice()...)
 	} else {
 		collections = append(collections, metadata)
 	}
@@ -500,8 +493,15 @@ func (c *Collections) UpdateCollections(
 	excluded map[string]struct{},
 	itemCollection map[string]string,
 	invalidPrevDelta bool,
+	errs *fault.Errors,
 ) error {
+	et := errs.Tracker()
+
 	for _, item := range items {
+		if et.Err() != nil {
+			break
+		}
+
 		var (
 			prevPath           path.Path
 			prevCollectionPath path.Path
@@ -527,25 +527,31 @@ func (c *Collections) UpdateCollections(
 			continue
 		}
 
+		var (
+			itemID = ptr.Val(item.GetId())
+			ictx   = clues.Add(ctx, "update_item_id", itemID)
+		)
+
 		if item.GetParentReference() == nil ||
 			item.GetParentReference().GetId() == nil ||
 			(item.GetDeleted() == nil && item.GetParentReference().GetPath() == nil) {
-			err := clues.New("no parent reference").With("item_id", *item.GetId())
-			if item.GetName() != nil {
-				err = err.With("item_name", *item.GetName())
-			}
+			et.Add(clues.New("item missing parent reference").
+				WithClues(ictx).
+				With("item_id", itemID, "item_name", ptr.Val(item.GetName())).
+				Label(fault.LabelForceNoBackupCreation))
 
-			return err
+			continue
 		}
 
 		// Create a collection for the parent of this item
-		collectionID := *item.GetParentReference().GetId()
+		collectionID := ptr.Val(item.GetParentReference().GetId())
+		ictx = clues.Add(ictx, "collection_id", collectionID)
 
 		var collectionPathStr string
 		if item.GetDeleted() == nil {
-			collectionPathStr = *item.GetParentReference().GetPath()
+			collectionPathStr = ptr.Val(item.GetParentReference().GetPath())
 		} else {
-			collectionPathStr, ok = oldPaths[*item.GetParentReference().GetId()]
+			collectionPathStr, ok = oldPaths[ptr.Val(item.GetParentReference().GetId())]
 			if !ok {
 				// This collection was created and destroyed in
 				// between the current and previous invocation
@@ -557,10 +563,9 @@ func (c *Collections) UpdateCollections(
 			collectionPathStr,
 			c.tenant,
 			c.resourceOwner,
-			c.source,
-		)
+			c.source)
 		if err != nil {
-			return err
+			return clues.Stack(err).WithClues(ictx)
 		}
 
 		var (
@@ -583,17 +588,19 @@ func (c *Collections) UpdateCollections(
 		// Skip items that don't match the folder selectors we were given.
 		if shouldSkipDrive(ctx, itemPath, c.matcher, driveName) &&
 			shouldSkipDrive(ctx, collectionPath, c.matcher, driveName) {
-			logger.Ctx(ctx).Infof("Skipping path %s", collectionPath.String())
+			logger.Ctx(ictx).Infow("Skipping path", "skipped_path", collectionPath.String())
 			continue
 		}
 
 		switch {
 		case item.GetFolder() != nil, item.GetPackage() != nil:
-			prevPathStr, ok := oldPaths[*item.GetId()]
+			prevPathStr, ok := oldPaths[itemID]
 			if ok {
 				prevPath, err = path.FromDataLayerPath(prevPathStr, false)
 				if err != nil {
-					return clues.Wrap(err, "invalid previous path").With("path_string", prevPathStr)
+					et.Add(clues.Wrap(err, "invalid previous path").
+						WithClues(ictx).
+						With("path_string", prevPathStr))
 				}
 			}
 
@@ -601,7 +608,7 @@ func (c *Collections) UpdateCollections(
 			// Nested folders also return deleted delta results so we don't have to
 			// worry about doing a prefix search in the map to remove the subtree of
 			// the deleted folder/package.
-			delete(newPaths, *item.GetId())
+			delete(newPaths, itemID)
 
 			if prevPath == nil {
 				// It is possible that an item was created and
@@ -620,10 +627,9 @@ func (c *Collections) UpdateCollections(
 					c.statusUpdater,
 					c.source,
 					c.ctrl,
-					invalidPrevDelta,
-				)
+					invalidPrevDelta)
 
-				c.CollectionMap[*item.GetId()] = col
+				c.CollectionMap[itemID] = col
 
 				break
 			}
@@ -635,7 +641,7 @@ func (c *Collections) UpdateCollections(
 
 			found, err := updateCollectionPaths(*item.GetId(), c.CollectionMap, itemPath)
 			if err != nil {
-				return err
+				return clues.Stack(err).WithClues(ctx)
 			}
 
 			if !found {
@@ -674,8 +680,8 @@ func (c *Collections) UpdateCollections(
 			// deleted, we want to avoid it. If it was
 			// renamed/moved/modified, we still have to drop the
 			// original one and download a fresh copy.
-			excluded[*item.GetId()+DataFileSuffix] = struct{}{}
-			excluded[*item.GetId()+MetaFileSuffix] = struct{}{}
+			excluded[itemID+DataFileSuffix] = struct{}{}
+			excluded[itemID+MetaFileSuffix] = struct{}{}
 		}
 
 		if item.GetDeleted() != nil {
@@ -761,11 +767,11 @@ func (c *Collections) UpdateCollections(
 		}
 
 		default:
-			return errors.Errorf("item type not supported. item name : %s", *item.GetName())
+			return clues.New("item type not supported").WithClues(ctx)
 		}
 	}
 
-	return nil
+	return et.Err()
 }
 
 func shouldSkipDrive(ctx context.Context, drivePath path.Path, m folderMatcher, driveName string) bool {
@@ -791,7 +797,7 @@ func GetCanonicalPath(p, tenant, resourceOwner string, source driveSource) (path
 	case SharePointSource:
 		result, err = pathBuilder.ToDataLayerSharePointPath(tenant, resourceOwner, path.LibrariesCategory, false)
 	default:
-		return nil, errors.Errorf("unrecognized drive data source")
+		return nil, clues.New("unrecognized data source")
 	}
 
 	if err != nil {
@@ -805,7 +811,7 @@ func includePath(ctx context.Context, m folderMatcher, folderPath path.Path) boo
 	// Check if the folder is allowed by the scope.
 	folderPathString, err := path.GetDriveFolderPath(folderPath)
 	if err != nil {
-		logger.Ctx(ctx).Error(err)
+		logger.Ctx(ctx).With("err", err).Error("getting drive folder path")
 		return true
 	}
 
```
```diff
@@ -20,6 +20,7 @@ import (
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/tester"
 	"github.com/alcionai/corso/src/pkg/control"
+	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/path"
 	"github.com/alcionai/corso/src/pkg/selectors"
 )
@@ -740,7 +741,7 @@ func (suite *OneDriveCollectionsSuite) TestUpdateCollections() {
 				excludes,
 				itemCollection,
 				false,
-			)
+				fault.New(true))
 			tt.expect(t, err)
 			assert.Equal(t, len(tt.expectedCollectionIDs), len(c.CollectionMap), "total collections")
 			assert.Equal(t, tt.expectedItemCount, c.NumItems, "item count")
@@ -971,7 +972,7 @@ func (suite *OneDriveCollectionsSuite) TestDeserializeMetadata() {
 			},
 			expectedDeltas: map[string]string{},
 			expectedPaths:  map[string]map[string]string{},
-			errCheck:       assert.NoError,
+			errCheck:       assert.Error,
 		},
 		{
 			// Unexpected files are logged and skipped. They don't cause an error to
@@ -1094,14 +1095,13 @@ func (suite *OneDriveCollectionsSuite) TestDeserializeMetadata() {
 					path.OneDriveService,
 					path.FilesCategory,
 					c(),
-					func(*support.ConnectorOperationStatus) {},
-				)
+					func(*support.ConnectorOperationStatus) {})
 				require.NoError(t, err)
 
 				cols = append(cols, data.NotFoundRestoreCollection{Collection: mc})
 			}
 
-			deltas, paths, err := deserializeMetadata(ctx, cols)
+			deltas, paths, err := deserializeMetadata(ctx, cols, fault.New(true))
 			test.errCheck(t, err)
 
 			assert.Equal(t, test.expectedDeltas, deltas)
@@ -1786,7 +1786,7 @@ func (suite *OneDriveCollectionsSuite) TestGet() {
 			assert.NoError(t, err, "creating metadata collection")
 
 			prevMetadata := []data.RestoreCollection{data.NotFoundRestoreCollection{Collection: mc}}
-			cols, delList, err := c.Get(ctx, prevMetadata)
+			cols, delList, err := c.Get(ctx, prevMetadata, fault.New(true))
 			test.errCheck(t, err)
 
 			if err != nil {
@@ -1803,9 +1803,12 @@ func (suite *OneDriveCollectionsSuite) TestGet() {
 			}
 
 			if folderPath == metadataPath.String() {
-				deltas, paths, err := deserializeMetadata(ctx, []data.RestoreCollection{
-					data.NotFoundRestoreCollection{Collection: baseCol},
-				})
+				deltas, paths, err := deserializeMetadata(
+					ctx,
+					[]data.RestoreCollection{
+						data.NotFoundRestoreCollection{Collection: baseCol},
+					},
+					fault.New(true))
 				if !assert.NoError(t, err, "deserializing metadata") {
 					continue
 				}
@@ -2016,6 +2019,7 @@ func (suite *OneDriveCollectionsSuite) TestCollectItems() {
 		excluded map[string]struct{},
 		itemCollection map[string]string,
 		doNotMergeItems bool,
+		errs *fault.Errors,
 	) error {
 		return nil
 	}
@@ -2028,7 +2032,7 @@ func (suite *OneDriveCollectionsSuite) TestCollectItems() {
 				collectorFunc,
 				map[string]string{},
 				test.prevDelta,
-			)
+				fault.New(true))
 
 			require.ErrorIs(t, err, test.err, "delta fetch err")
 			require.Equal(t, test.deltaURL, delta.URL, "delta url")
```
```diff
@@ -9,15 +9,12 @@ import (
 	"github.com/alcionai/corso/src/internal/connector/support"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/pkg/control"
+	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/logger"
 	"github.com/alcionai/corso/src/pkg/selectors"
 	"golang.org/x/exp/maps"
 )
 
-// ---------------------------------------------------------------------------
-// OneDrive
-// ---------------------------------------------------------------------------
-
 type odFolderMatcher struct {
 	scope selectors.OneDriveScope
 }
@@ -41,6 +38,7 @@ func DataCollections(
 	service graph.Servicer,
 	su support.StatusUpdater,
 	ctrlOpts control.Options,
+	errs *fault.Errors,
 ) ([]data.BackupCollection, map[string]struct{}, error) {
 	odb, err := selector.ToOneDriveBackup()
 	if err != nil {
@@ -48,6 +46,7 @@ func DataCollections(
 	}
 
 	var (
+		et          = errs.Tracker()
 		user        = selector.DiscreteOwner
 		collections = []data.BackupCollection{}
 		allExcludes = map[string]struct{}{}
@@ -55,9 +54,13 @@ func DataCollections(
 
 	// for each scope that includes oneDrive items, get all
 	for _, scope := range odb.Scopes() {
-		logger.Ctx(ctx).With("user", user).Debug("Creating OneDrive collections")
+		if et.Err() != nil {
+			break
+		}
 
-		odcs, excludes, err := NewCollections(
+		logger.Ctx(ctx).Debug("creating OneDrive collections")
+
+		nc := NewCollections(
 			itemClient,
 			tenant,
 			user,
@@ -65,10 +68,11 @@ func DataCollections(
 			odFolderMatcher{scope},
 			service,
 			su,
-			ctrlOpts,
-		).Get(ctx, metadata)
+			ctrlOpts)
+
+		odcs, excludes, err := nc.Get(ctx, metadata, errs)
 		if err != nil {
-			return nil, nil, err
+			et.Add(clues.Stack(err).Label(fault.LabelForceNoBackupCreation))
 		}
 
 		collections = append(collections, odcs...)
@@ -76,5 +80,5 @@ func DataCollections(
 		maps.Copy(allExcludes, excludes)
 	}
 
-	return collections, allExcludes, nil
+	return collections, allExcludes, et.Err()
 }
```
```diff
@@ -6,9 +6,9 @@ import (
 	"strings"
 	"time"
 
+	"github.com/alcionai/clues"
 	msdrive "github.com/microsoftgraph/msgraph-sdk-go/drive"
 	"github.com/microsoftgraph/msgraph-sdk-go/models"
-	"github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors"
 	"github.com/pkg/errors"
 	"golang.org/x/exp/maps"
 
@@ -17,10 +17,11 @@ import (
 	gapi "github.com/alcionai/corso/src/internal/connector/graph/api"
 	"github.com/alcionai/corso/src/internal/connector/onedrive/api"
 	"github.com/alcionai/corso/src/internal/connector/support"
+	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/logger"
 )
 
-var errFolderNotFound = errors.New("folder not found")
+var errFolderNotFound = clues.New("folder not found")
 
 const (
 	getDrivesRetries = 3
@@ -77,8 +78,6 @@ func drives(
 	retry bool,
 ) ([]models.Driveable, error) {
 	var (
-		err             error
-		page            gapi.PageLinker
 		numberOfRetries = getDrivesRetries
 		drives          = []models.Driveable{}
 	)
@@ -89,30 +88,31 @@ func drives(
 
 	// Loop through all pages returned by Graph API.
 	for {
+		var (
+			err  error
+			page gapi.PageLinker
+		)
+
 		// Retry Loop for Drive retrieval. Request can timeout
 		for i := 0; i <= numberOfRetries; i++ {
 			page, err = pager.GetPage(ctx)
 			if err != nil {
 				// Various error handling. May return an error or perform a retry.
-				detailedError := support.ConnectorStackErrorTraceWrap(err, "").Error()
-				if strings.Contains(detailedError, userMysiteURLNotFound) ||
-					strings.Contains(detailedError, userMysiteURLNotFoundMsg) ||
-					strings.Contains(detailedError, userMysiteNotFound) ||
-					strings.Contains(detailedError, userMysiteNotFoundMsg) {
+				errMsg := support.ConnectorStackErrorTraceWrap(err, "").Error()
+				if strings.Contains(errMsg, userMysiteURLNotFound) ||
+					strings.Contains(errMsg, userMysiteURLNotFoundMsg) ||
+					strings.Contains(errMsg, userMysiteNotFound) ||
+					strings.Contains(errMsg, userMysiteNotFoundMsg) {
 					logger.Ctx(ctx).Infof("resource owner does not have a drive")
 					return make([]models.Driveable, 0), nil // no license or drives.
 				}
 
-				if strings.Contains(detailedError, contextDeadlineExceeded) && i < numberOfRetries {
+				if strings.Contains(errMsg, contextDeadlineExceeded) && i < numberOfRetries {
 					time.Sleep(time.Duration(3*(i+1)) * time.Second)
 					continue
 				}
 
-				return nil, errors.Wrapf(
-					err,
-					"failed to retrieve drives. details: %s",
-					detailedError,
-				)
+				return nil, clues.Wrap(err, "retrieving drives").WithClues(ctx).With(graph.ErrData(err)...)
 			}
 
 			// No error encountered, break the retry loop so we can extract results
@@ -122,7 +122,7 @@ func drives(
 
 		tmp, err := pager.ValuesIn(page)
 		if err != nil {
-			return nil, errors.Wrap(err, "extracting drives from response")
+			return nil, clues.Wrap(err, "extracting drives from response").WithClues(ctx).With(graph.ErrData(err)...)
 		}
 
 		drives = append(drives, tmp...)
@@ -135,7 +135,7 @@ func drives(
 		pager.SetNext(nextLink)
 	}
 
-	logger.Ctx(ctx).Debugf("Found %d drives", len(drives))
+	logger.Ctx(ctx).Debugf("retrieved %d valid drives", len(drives))
 
 	return drives, nil
 }
@@ -150,6 +150,7 @@ type itemCollector func(
 	excluded map[string]struct{},
 	fileCollectionMap map[string]string,
 	validPrevDelta bool,
+	errs *fault.Errors,
 ) error
 
 type itemPager interface {
@@ -195,6 +196,7 @@ func collectItems(
 	collector itemCollector,
 	oldPaths map[string]string,
 	prevDelta string,
+	errs *fault.Errors,
 ) (DeltaUpdate, map[string]string, map[string]struct{}, error) {
 	var (
 		newDeltaURL = ""
@@ -229,16 +231,14 @@ func collectItems(
 		}
 
 		if err != nil {
-			return DeltaUpdate{}, nil, nil, errors.Wrapf(
-				err,
-				"failed to query drive items. details: %s",
-				support.ConnectorStackErrorTrace(err),
-			)
+			return DeltaUpdate{}, nil, nil, clues.Wrap(err, "getting page").WithClues(ctx).With(graph.ErrData(err)...)
 		}
 
 		vals, err := pager.ValuesIn(page)
 		if err != nil {
-			return DeltaUpdate{}, nil, nil, errors.Wrap(err, "extracting items from response")
+			return DeltaUpdate{}, nil, nil, clues.Wrap(err, "extracting items from response").
+				WithClues(ctx).
+				With(graph.ErrData(err)...)
 		}
 
 		err = collector(
@@ -251,7 +251,7 @@ func collectItems(
 			excluded,
 			itemCollection,
 			invalidPrevDelta,
-		)
+			errs)
 		if err != nil {
 			return DeltaUpdate{}, nil, nil, err
 		}
@@ -294,27 +294,17 @@ func getFolder(
 	)
 
 	foundItem, err = builder.Get(ctx, nil)
 
 	if err != nil {
-		var oDataError *odataerrors.ODataError
-		if errors.As(err, &oDataError) &&
-			oDataError.GetError() != nil &&
-			oDataError.GetError().GetCode() != nil &&
-			*oDataError.GetError().GetCode() == itemNotFoundErrorCode {
-			return nil, errors.WithStack(errFolderNotFound)
+		if graph.IsErrDeletedInFlight(err) {
+			return nil, clues.Stack(errFolderNotFound, err).WithClues(ctx).With(graph.ErrData(err)...)
 		}
 
-		return nil, errors.Wrapf(err,
-			"failed to get folder %s/%s. details: %s",
-			parentFolderID,
-			folderName,
-			support.ConnectorStackErrorTrace(err),
-		)
+		return nil, clues.Wrap(err, "getting folder").WithClues(ctx).With(graph.ErrData(err)...)
 	}
 
 	// Check if the item found is a folder, fail the call if not
 	if foundItem.GetFolder() == nil {
-		return nil, errors.WithStack(errFolderNotFound)
+		return nil, clues.Stack(errFolderNotFound).WithClues(ctx).With(graph.ErrData(err)...)
 	}
 
 	return foundItem, nil
@@ -330,16 +320,11 @@ func createItem(
 	// Graph SDK doesn't yet provide a POST method for `/children` so we set the `rawUrl` ourselves as recommended
 	// here: https://github.com/microsoftgraph/msgraph-sdk-go/issues/155#issuecomment-1136254310
 	rawURL := fmt.Sprintf(itemChildrenRawURLFmt, driveID, parentFolderID)
 
 	builder := msdrive.NewItemsRequestBuilder(rawURL, service.Adapter())
 
 	newItem, err := builder.Post(ctx, newItem, nil)
 	if err != nil {
-		return nil, errors.Wrapf(
-			err,
-			"failed to create item. details: %s",
-			support.ConnectorStackErrorTrace(err),
-		)
+		return nil, clues.Wrap(err, "creating item").WithClues(ctx).With(graph.ErrData(err)...)
 	}
 
 	return newItem, nil
@@ -375,66 +360,72 @@ func GetAllFolders(
 	gs graph.Servicer,
 	pager drivePager,
 	prefix string,
+	errs *fault.Errors,
 ) ([]*Displayable, error) {
 	drives, err := drives(ctx, pager, true)
 	if err != nil {
 		return nil, errors.Wrap(err, "getting OneDrive folders")
 	}
 
-	folders := map[string]*Displayable{}
+	var (
+		folders = map[string]*Displayable{}
+		et      = errs.Tracker()
+	)
 
 	for _, d := range drives {
-		_, _, _, err = collectItems(
-			ctx,
-			defaultItemPager(
-				gs,
-				*d.GetId(),
-				"",
-			),
-			*d.GetId(),
-			*d.GetName(),
-			func(
-				innerCtx context.Context,
-				driveID, driveName string,
-				items []models.DriveItemable,
-				oldPaths map[string]string,
-				newPaths map[string]string,
-				excluded map[string]struct{},
-				itemCollection map[string]string,
-				doNotMergeItems bool,
-			) error {
-				for _, item := range items {
-					// Skip the root item.
-					if item.GetRoot() != nil {
-						continue
-					}
-
-					// Only selecting folders right now, not packages.
-					if item.GetFolder() == nil {
-						continue
-					}
-
-					if item.GetId() == nil || len(*item.GetId()) == 0 {
-						logger.Ctx(ctx).Warn("folder without ID")
-						continue
-					}
-
-					if !strings.HasPrefix(*item.GetName(), prefix) {
-						continue
-					}
-
-					// Add the item instead of the folder because the item has more
-					// functionality.
-					folders[*item.GetId()] = &Displayable{item}
-				}
-
-				return nil
-			},
-			map[string]string{},
-			"",
-		)
+		if et.Err() != nil {
+			break
+		}
+
+		var (
+			id   = ptr.Val(d.GetId())
+			name = ptr.Val(d.GetName())
+		)
+
+		ictx := clues.Add(ctx, "drive_id", id, "drive_name", name) // TODO: pii
+		collector := func(
+			innerCtx context.Context,
+			driveID, driveName string,
+			items []models.DriveItemable,
+			oldPaths map[string]string,
+			newPaths map[string]string,
+			excluded map[string]struct{},
+			itemCollection map[string]string,
+			doNotMergeItems bool,
+			errs *fault.Errors,
+		) error {
+			for _, item := range items {
+				// Skip the root item.
+				if item.GetRoot() != nil {
+					continue
+				}
+
+				// Only selecting folders right now, not packages.
+				if item.GetFolder() == nil {
+					continue
+				}
+
+				itemID := ptr.Val(item.GetId())
+				if len(itemID) == 0 {
+					logger.Ctx(ctx).Info("folder missing ID")
+					continue
+				}
+
+				if !strings.HasPrefix(*item.GetName(), prefix) {
+					continue
+				}
+
+				// Add the item instead of the folder because the item has more
+				// functionality.
+				folders[itemID] = &Displayable{item}
+			}
+
+			return nil
+		}
+
+		_, _, _, err = collectItems(ictx, defaultItemPager(gs, id, ""), id, name, collector, map[string]string{}, "", errs)
 		if err != nil {
-			return nil, errors.Wrapf(err, "getting items for drive %s", *d.GetName())
+			et.Add(clues.Wrap(err, "enumerating items in drive"))
 		}
 	}
@@ -444,7 +435,7 @@ func GetAllFolders(
 		res = append(res, f)
 	}
 
-	return res, nil
+	return res, et.Err()
 }
 
 func DeleteItem(
@@ -455,7 +446,10 @@ func DeleteItem(
 ) error {
 	err := gs.Client().DrivesById(driveID).ItemsById(itemID).Delete(ctx, nil)
 	if err != nil {
-		return errors.Wrapf(err, "deleting item with ID %s", itemID)
+		return clues.Wrap(err, "deleting item").
+			WithClues(ctx).
+			With("item_id", itemID).
+			With(graph.ErrData(err)...)
 	}
 
 	return nil
```
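The `drives` pager loop above keeps its bounded retry behavior through the refactor: up to `getDrivesRetries` extra attempts, a linearly growing sleep of `3*(i+1)` seconds, and a retry only when the error text contains the context-deadline marker. A rough, self-contained sketch of that control flow; `fetchPage` and the string matching are hypothetical stand-ins for the Graph pager and its SDK error strings.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

var attempts int

// fetchPage is a hypothetical stand-in for pager.GetPage; it times out once
// and then succeeds.
func fetchPage() (string, error) {
	attempts++
	if attempts < 2 {
		return "", errors.New("context deadline exceeded")
	}

	return "page-1", nil
}

// getWithRetry mirrors the retry shape in drives(): bounded attempts with a
// linearly growing sleep, retrying only on timeout-looking errors.
func getWithRetry(retries int) (string, error) {
	var (
		page string
		err  error
	)

	for i := 0; i <= retries; i++ {
		page, err = fetchPage()
		if err != nil {
			if strings.Contains(err.Error(), "context deadline exceeded") && i < retries {
				// Same backoff as the diff: 3s, then 6s, then 9s.
				time.Sleep(time.Duration(3*(i+1)) * time.Second)
				continue
			}

			return "", fmt.Errorf("retrieving drives: %w", err)
		}

		break
	}

	return page, nil
}

func main() {
	page, err := getWithRetry(3)
	fmt.Println(page, err)
}
```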
```diff
@@ -18,6 +18,7 @@ import (
 	"github.com/alcionai/corso/src/internal/connector/support"
 	"github.com/alcionai/corso/src/internal/tester"
 	"github.com/alcionai/corso/src/pkg/control"
+	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/logger"
 	"github.com/alcionai/corso/src/pkg/selectors"
 )
@@ -404,7 +405,7 @@ func (suite *OneDriveSuite) TestCreateGetDeleteFolder() {
 			pager, err := PagerForSource(OneDriveSource, gs, suite.userID, nil)
 			require.NoError(t, err)
 
-			allFolders, err := GetAllFolders(ctx, gs, pager, test.prefix)
+			allFolders, err := GetAllFolders(ctx, gs, pager, test.prefix, fault.New(true))
 			require.NoError(t, err)
 
 			foundFolderIDs := []string{}
@@ -472,7 +473,7 @@ func (suite *OneDriveSuite) TestOneDriveNewCollections() {
 				service,
 				service.updateStatus,
 				control.Options{ToggleFeatures: control.Toggles{EnablePermissionsBackup: true}},
-			).Get(ctx, nil)
+			).Get(ctx, nil, fault.New(true))
 			assert.NoError(t, err)
 			// Don't expect excludes as this isn't an incremental backup.
 			assert.Empty(t, excludes)
```
@ -4,7 +4,6 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
@ -16,7 +15,6 @@ import (
|
|||||||
|
|
||||||
"github.com/alcionai/corso/src/internal/common/ptr"
|
"github.com/alcionai/corso/src/internal/common/ptr"
|
||||||
"github.com/alcionai/corso/src/internal/connector/graph"
|
"github.com/alcionai/corso/src/internal/connector/graph"
|
||||||
"github.com/alcionai/corso/src/internal/connector/support"
|
|
||||||
"github.com/alcionai/corso/src/internal/connector/uploadsession"
|
"github.com/alcionai/corso/src/internal/connector/uploadsession"
|
||||||
"github.com/alcionai/corso/src/internal/version"
|
"github.com/alcionai/corso/src/internal/version"
|
||||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||||
@ -35,7 +33,12 @@ func getDriveItem(
|
|||||||
srv graph.Servicer,
|
srv graph.Servicer,
|
||||||
driveID, itemID string,
|
driveID, itemID string,
|
||||||
) (models.DriveItemable, error) {
|
) (models.DriveItemable, error) {
|
||||||
return srv.Client().DrivesById(driveID).ItemsById(itemID).Get(ctx, nil)
|
di, err := srv.Client().DrivesById(driveID).ItemsById(itemID).Get(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, clues.Wrap(err, "getting item").WithClues(ctx).With(graph.ErrData(err)...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return di, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// sharePointItemReader will return a io.ReadCloser for the specified item
|
// sharePointItemReader will return a io.ReadCloser for the specified item
|
||||||
@ -130,12 +133,12 @@ func oneDriveItemReader(
|
|||||||
func downloadItem(hc *http.Client, item models.DriveItemable) (*http.Response, error) {
|
func downloadItem(hc *http.Client, item models.DriveItemable) (*http.Response, error) {
|
||||||
url, ok := item.GetAdditionalData()[downloadURLKey].(*string)
|
url, ok := item.GetAdditionalData()[downloadURLKey].(*string)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("extracting file url: file %s", *item.GetId())
|
return nil, clues.New("extracting file url").With("item_id", ptr.Val(item.GetId()))
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequest(http.MethodGet, *url, nil)
|
req, err := http.NewRequest(http.MethodGet, *url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "new request")
|
return nil, clues.Wrap(err, "new request").With(graph.ErrData(err)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
//nolint:lll
|
//nolint:lll
|
||||||
@ -168,7 +171,7 @@ func downloadItem(hc *http.Client, item models.DriveItemable) (*http.Response, e
|
|||||||
return resp, graph.Err503ServiceUnavailable
|
return resp, graph.Err503ServiceUnavailable
|
||||||
}
|
}
|
||||||
|
|
||||||
return resp, errors.New("non-2xx http response: " + resp.Status)
|
return resp, clues.Wrap(clues.New(resp.Status), "non-2xx http response")
|
||||||
}
|
}
|
||||||
|
|
||||||
// oneDriveItemInfo will populate a details.OneDriveInfo struct
|
// oneDriveItemInfo will populate a details.OneDriveInfo struct
|
||||||
@ -217,16 +220,19 @@ func oneDriveItemPermissionInfo(
 		return nil, nil
 	}
 
+	id := ptr.Val(di.GetId())
+
 	perm, err := service.
 		Client().
 		DrivesById(driveID).
-		ItemsById(*di.GetId()).
+		ItemsById(id).
 		Permissions().
 		Get(ctx, nil)
 	if err != nil {
-		msg := support.ConnectorStackErrorTrace(err)
-		err = clues.Wrap(err, "fetching item permissions: "+msg).
-			With("item_id", *di.GetId())
+		err = clues.Wrap(err, "fetching item permissions").
+			WithClues(ctx).
+			With("item_id", id).
+			With(graph.ErrData(err)...)
 
 		return nil, err
 	}
@ -260,7 +266,7 @@ func filterUserPermissions(perms []models.Permissionable) []UserPermission {
 		}
 
 		up = append(up, UserPermission{
-			ID:         *p.GetId(),
+			ID:         ptr.Val(p.GetId()),
			Roles:      roles,
			Email:      *p.GetGrantedToV2().GetUser().GetAdditionalData()["email"].(*string),
			Expiration: p.GetExpirationDateTime(),
@ -328,20 +334,18 @@ func driveItemWriter(
 	itemSize int64,
 ) (io.Writer, error) {
 	session := msdrives.NewItemItemsItemCreateUploadSessionPostRequestBody()
+	ctx = clues.Add(ctx, "upload_item_id", itemID)
 
 	r, err := service.Client().DrivesById(driveID).ItemsById(itemID).CreateUploadSession().Post(ctx, session, nil)
 	if err != nil {
-		return nil, errors.Wrapf(
-			err,
-			"failed to create upload session for item %s. details: %s",
-			itemID,
-			support.ConnectorStackErrorTrace(err),
-		)
+		return nil, clues.Wrap(err, "creating item upload session").
+			WithClues(ctx).
+			With(graph.ErrData(err)...)
 	}
 
-	url := *r.GetUploadUrl()
+	logger.Ctx(ctx).Debug("created an upload session")
 
-	logger.Ctx(ctx).Debugf("Created an upload session for item %s. URL: %s", itemID, url)
+	url := ptr.Val(r.GetUploadUrl())
 
 	return uploadsession.NewWriter(itemID, url, itemSize), nil
 }

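Note how `clues.Add` lets `driveItemWriter` drop the item ID from both the error message and the debug log: the ID is attached to the context once, and everything downstream inherits it. A minimal sketch of the idiom, where `doUpload` is a hypothetical stand-in:

```go
// Sketch: attach clues to the context once, then let errors inherit them.
func upload(ctx context.Context, driveID, itemID string) error {
	ctx = clues.Add(ctx, "drive_id", driveID, "upload_item_id", itemID)

	if err := doUpload(ctx); err != nil { // hypothetical upload call
		// the error now carries drive_id and upload_item_id without this
		// call site listing them again.
		return clues.Wrap(err, "uploading item").WithClues(ctx)
	}

	return nil
}
```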
@ -6,7 +6,6 @@ import (
 	"io"
 	"testing"
 
-	msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
 	"github.com/microsoftgraph/msgraph-sdk-go/models"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@ -15,24 +14,14 @@ import (
 	"github.com/alcionai/corso/src/internal/common"
 	"github.com/alcionai/corso/src/internal/connector/graph"
 	"github.com/alcionai/corso/src/internal/tester"
+	"github.com/alcionai/corso/src/pkg/fault"
 )
 
 type ItemIntegrationSuite struct {
 	tester.Suite
-	// site string
-	// siteDriveID string
 	user        string
 	userDriveID string
-	client      *msgraphsdk.GraphServiceClient
-	adapter     *msgraphsdk.GraphRequestAdapter
-}
-
-func (suite *ItemIntegrationSuite) Client() *msgraphsdk.GraphServiceClient {
-	return suite.client
-}
-
-func (suite *ItemIntegrationSuite) Adapter() *msgraphsdk.GraphRequestAdapter {
-	return suite.adapter
+	service     graph.Servicer
 }
 
 func TestItemIntegrationSuite(t *testing.T) {
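The suite previously implemented `Client()` and `Adapter()` itself so it could be passed wherever a `graph.Servicer` was expected; storing a `graph.Servicer` directly removes that indirection. Inferring from the two deleted methods, the interface presumably looks roughly like this (an assumption, not copied from source):

```go
// Presumed shape of graph.Servicer, inferred from the Client() and
// Adapter() methods the test suite used to implement to satisfy it.
type Servicer interface {
	Client() *msgraphsdk.GraphServiceClient
	Adapter() *msgraphsdk.GraphRequestAdapter
}
```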
@ -51,30 +40,10 @@ func (suite *ItemIntegrationSuite) SetupSuite() {
 	ctx, flush := tester.NewContext()
 	defer flush()
 
-	a := tester.NewM365Account(t)
-	m365, err := a.M365Config()
-	require.NoError(t, err)
-
-	adapter, err := graph.CreateAdapter(m365.AzureTenantID, m365.AzureClientID, m365.AzureClientSecret)
-	require.NoError(t, err)
-
-	suite.client = msgraphsdk.NewGraphServiceClient(adapter)
-	suite.adapter = adapter
-
-	// TODO: fulfill file preconditions required for testing (expected files w/in drive
-	// and guarateed drive read-write access)
-	// suite.site = tester.M365SiteID(t)
-	// spDrives, err := drives(ctx, suite, suite.site, SharePointSource)
-	// require.NoError(t, err)
-	// // Test Requirement 1: Need a drive
-	// require.Greaterf(t, len(spDrives), 0, "site %s does not have a drive", suite.site)
-
-	// // Pick the first drive
-	// suite.siteDriveID = *spDrives[0].GetId()
-
+	suite.service = loadTestService(t)
 	suite.user = tester.SecondaryM365UserID(t)
 
-	pager, err := PagerForSource(OneDriveSource, suite, suite.user, nil)
+	pager, err := PagerForSource(OneDriveSource, suite.service, suite.user, nil)
 	require.NoError(t, err)
 
 	odDrives, err := drives(ctx, pager, true)
@ -106,6 +75,7 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() {
 		excluded map[string]struct{},
 		itemCollection map[string]string,
 		doNotMergeItems bool,
+		errs *fault.Errors,
 	) error {
 		for _, item := range items {
 			if item.GetFile() != nil {
@ -119,7 +89,7 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() {
 	_, _, _, err := collectItems(
 		ctx,
 		defaultItemPager(
-			suite,
+			suite.service,
 			suite.userDriveID,
 			"",
 		),
@ -128,7 +98,7 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() {
 		itemCollector,
 		map[string]string{},
 		"",
-	)
+		fault.New(true))
 	require.NoError(suite.T(), err)
 
 	// Test Requirement 2: Need a file
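`fault.New(true)` constructs the error aggregator in fail-fast mode, which is what tests want: the first recorded failure should abort the run. A rough sketch of the intended consumption; the `Add` and `Err` method names are assumptions inferred from this PR's other call sites (e.g. `et.Add(err)` in the SharePoint changes), not verified API:

```go
// Rough sketch of fail-fast fault handling; Add and Err are assumed names.
func processAll(items []string, handle func(string) error) error {
	errs := fault.New(true) // true => fail-fast: the first fault becomes fatal

	for _, it := range items {
		if err := handle(it); err != nil {
			errs.Add(err) // record the per-item failure
		}

		if errs.Err() != nil {
			return errs.Err() // in fail-fast mode the first Add set this
		}
	}

	return nil
}
```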
@ -173,43 +143,42 @@ func (suite *ItemIntegrationSuite) TestItemWriter() {
 	}
 	for _, test := range table {
 		suite.Run(test.name, func() {
-			t := suite.T()
-
 			ctx, flush := tester.NewContext()
 			defer flush()
 
-			root, err := suite.Client().DrivesById(test.driveID).Root().Get(ctx, nil)
+			t := suite.T()
+			srv := suite.service
+
+			root, err := srv.Client().DrivesById(test.driveID).Root().Get(ctx, nil)
 			require.NoError(t, err)
 
 			// Test Requirement 2: "Test Folder" should exist
-			folder, err := getFolder(ctx, suite, test.driveID, *root.GetId(), "Test Folder")
+			folder, err := getFolder(ctx, srv, test.driveID, *root.GetId(), "Test Folder")
 			require.NoError(t, err)
 
 			newFolderName := "testfolder_" + common.FormatNow(common.SimpleTimeTesting)
 			t.Logf("Test will create folder %s", newFolderName)
 
-			newFolder, err := createItem(ctx, suite, test.driveID, *folder.GetId(), newItem(newFolderName, true))
+			newFolder, err := createItem(ctx, srv, test.driveID, *folder.GetId(), newItem(newFolderName, true))
 			require.NoError(t, err)
 
 			require.NotNil(t, newFolder.GetId())
 
 			newItemName := "testItem_" + common.FormatNow(common.SimpleTimeTesting)
 			t.Logf("Test will create item %s", newItemName)
 
-			newItem, err := createItem(ctx, suite, test.driveID, *newFolder.GetId(), newItem(newItemName, false))
+			newItem, err := createItem(ctx, srv, test.driveID, *newFolder.GetId(), newItem(newItemName, false))
 			require.NoError(t, err)
 
 			require.NotNil(t, newItem.GetId())
 
 			// HACK: Leveraging this to test getFolder behavior for a file. `getFolder()` on the
 			// newly created item should fail because it's a file not a folder
-			_, err = getFolder(ctx, suite, test.driveID, *newFolder.GetId(), newItemName)
+			_, err = getFolder(ctx, srv, test.driveID, *newFolder.GetId(), newItemName)
 			require.ErrorIs(t, err, errFolderNotFound)
 
 			// Initialize a 100KB mockDataProvider
 			td, writeSize := mockDataReader(int64(100 * 1024))
 
-			w, err := driveItemWriter(ctx, suite, test.driveID, *newItem.GetId(), writeSize)
+			w, err := driveItemWriter(ctx, srv, test.driveID, *newItem.GetId(), writeSize)
 			require.NoError(t, err)
 
 			// Using a 32 KB buffer for the copy allows us to validate the
@ -246,20 +215,21 @@ func (suite *ItemIntegrationSuite) TestDriveGetFolder() {
 	}
 	for _, test := range table {
 		suite.Run(test.name, func() {
-			t := suite.T()
-
 			ctx, flush := tester.NewContext()
 			defer flush()
 
-			root, err := suite.Client().DrivesById(test.driveID).Root().Get(ctx, nil)
+			t := suite.T()
+			srv := suite.service
+
+			root, err := srv.Client().DrivesById(test.driveID).Root().Get(ctx, nil)
 			require.NoError(t, err)
 
 			// Lookup a folder that doesn't exist
-			_, err = getFolder(ctx, suite, test.driveID, *root.GetId(), "FolderDoesNotExist")
+			_, err = getFolder(ctx, srv, test.driveID, *root.GetId(), "FolderDoesNotExist")
 			require.ErrorIs(t, err, errFolderNotFound)
 
 			// Lookup a folder that does exist
-			_, err = getFolder(ctx, suite, test.driveID, *root.GetId(), "")
+			_, err = getFolder(ctx, srv, test.driveID, *root.GetId(), "")
 			require.NoError(t, err)
 		})
 	}

@ -126,9 +126,13 @@ func createRestoreFoldersWithPermissions(
 // cannot avoid storing the nested permissions as it is possible that
 // a file in a folder can remove the nested permission that is present
 // on itself.
-func getChildPermissions(childPermissions, parentPermissions []UserPermission) ([]UserPermission, []UserPermission) {
-	addedPermissions := []UserPermission{}
-	removedPermissions := []UserPermission{}
+func getChildPermissions(
+	childPermissions, parentPermissions []UserPermission,
+) ([]UserPermission, []UserPermission) {
+	var (
+		addedPermissions   = []UserPermission{}
+		removedPermissions = []UserPermission{}
+	)
 
 	for _, cp := range childPermissions {
 		found := false

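The reformatted `getChildPermissions` computes a two-way set difference between a child's permissions and its parent's. A simplified sketch of that diffing idea, keyed only on `ID` (the real code compares more than the ID):

```go
// Simplified sketch of the added/removed permission diff; assumes the
// UserPermission type from the hunk above, and matches by ID only.
func diffPermissions(child, parent []UserPermission) (added, removed []UserPermission) {
	parentIDs := map[string]struct{}{}
	for _, pp := range parent {
		parentIDs[pp.ID] = struct{}{}
	}

	childIDs := map[string]struct{}{}
	for _, cp := range child {
		childIDs[cp.ID] = struct{}{}

		if _, ok := parentIDs[cp.ID]; !ok {
			added = append(added, cp) // present on the child only
		}
	}

	for _, pp := range parent {
		if _, ok := childIDs[pp.ID]; !ok {
			removed = append(removed, pp) // dropped by the child
		}
	}

	return added, removed
}
```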
@ -242,11 +242,11 @@ func (suite *SharePointCollectionSuite) TestRestoreLocation() {
 	service := createTestService(t, suite.creds)
 	rootFolder := "General_" + common.FormatNow(common.SimpleTimeTesting)
 	folderID, err := createRestoreFolders(ctx, service, suite.siteID, []string{rootFolder})
-	assert.NoError(t, err)
+	require.NoError(t, err)
 	t.Log("FolderID: " + folderID)
 
 	_, err = createRestoreFolders(ctx, service, suite.siteID, []string{rootFolder, "Tsao"})
-	assert.NoError(t, err)
+	require.NoError(t, err)
 
 	// CleanUp
 	siteDrive, err := service.Client().SitesById(suite.siteID).Drive().Get(ctx, nil)

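Swapping `assert.NoError` for `require.NoError` here matters because the code that follows uses the results: with testify, `assert` marks the test failed but keeps executing, while `require` stops the test immediately, so later steps never run against a bad `folderID`.

```go
require.NoError(t, err) // calls t.FailNow(); the test stops here on error
assert.NoError(t, err)  // calls t.Fail(); execution continues past the failure
```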
@ -87,7 +87,8 @@ func DataCollections(
 			site,
 			scope,
 			su,
-			ctrlOpts)
+			ctrlOpts,
+			errs)
 		if err != nil {
 			et.Add(err)
 			continue
@ -169,6 +170,7 @@ func collectLibraries(
 	scope selectors.SharePointScope,
 	updater statusUpdater,
 	ctrlOpts control.Options,
+	errs *fault.Errors,
 ) ([]data.BackupCollection, map[string]struct{}, error) {
 	logger.Ctx(ctx).Debug("creating SharePoint Library collections")
 
@ -187,7 +189,7 @@ func collectLibraries(
 
 	// TODO(ashmrtn): Pass previous backup metadata when SharePoint supports delta
 	// token-based incrementals.
-	odcs, excludes, err := colls.Get(ctx, nil)
+	odcs, excludes, err := colls.Get(ctx, nil, errs)
 	if err != nil {
 		return nil, nil, clues.Wrap(err, "getting library").WithClues(ctx).With(graph.ErrData(err)...)
 	}

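Taken together, the SharePoint signature changes show the threading pattern: a single `*fault.Errors` travels from `DataCollections` through `collectLibraries` into `colls.Get`, rather than each layer aggregating its own errors. A minimal sketch of that threading, with hypothetical layer names:

```go
// Hypothetical layers; the point is that one *fault.Errors instance flows
// through every level unchanged, so all faults land in the same bucket.
func dataCollections(ctx context.Context, errs *fault.Errors) error {
	return collectLibraries(ctx, errs)
}

func collectLibraries(ctx context.Context, errs *fault.Errors) error {
	return getCollections(ctx, errs)
}

func getCollections(ctx context.Context, errs *fault.Errors) error {
	// leaf code records recoverable per-item faults on errs, returning an
	// error value only for hard failures.
	return nil
}
```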
@ -117,7 +117,7 @@ func (suite *SharePointLibrariesSuite) TestUpdateCollections() {
 				excluded,
 				map[string]string{},
 				true,
-			)
+				fault.New(true))
 			test.expect(t, err)
 			assert.Equal(t, len(test.expectedCollectionIDs), len(c.CollectionMap), "collection paths")
 			assert.Equal(t, test.expectedItemCount, c.NumItems, "item count")

@ -247,7 +247,14 @@ func Seed(ctx context.Context, lvl, logfile string) (context.Context, *zap.Sugar
 func SeedLevel(ctx context.Context, level logLevel) (context.Context, *zap.SugaredLogger) {
 	l := ctx.Value(ctxKey)
 	if l == nil {
-		zsl := singleton(level, defaultLogLocation())
+		logfile := os.Getenv("CORSO_LOG_FILE")
+
+		if len(logfile) == 0 {
+			logfile = defaultLogLocation()
+		}
+
+		zsl := singleton(level, logfile)
+
 		return Set(ctx, zsl), zsl
 	}

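With this change, `SeedLevel` honors a `CORSO_LOG_FILE` environment override before falling back to the default location. A runnable, standalone sketch of the same override-then-fallback idiom; `"corso.log"` stands in for `defaultLogLocation()`:

```go
// Minimal sketch of the env-override fallback introduced in SeedLevel.
package main

import (
	"fmt"
	"os"
)

func logLocation() string {
	if lf := os.Getenv("CORSO_LOG_FILE"); len(lf) > 0 {
		return lf // explicit override wins
	}

	return "corso.log" // stand-in for defaultLogLocation()
}

func main() {
	fmt.Println("logging to:", logLocation())
}
```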