diff --git a/src/internal/m365/collection/drive/collection.go b/src/internal/m365/collection/drive/collection.go index 95af07e21..0cdf79c0e 100644 --- a/src/internal/m365/collection/drive/collection.go +++ b/src/internal/m365/collection/drive/collection.go @@ -35,9 +35,6 @@ const ( var ( _ data.BackupCollection = &Collection{} - _ data.Item = &Item{} - _ data.ItemInfo = &Item{} - _ data.ItemModTime = &Item{} _ data.Item = &metadata.Item{} _ data.ItemModTime = &metadata.Item{} ) @@ -244,22 +241,6 @@ func (oc Collection) DoNotMergeItems() bool { return oc.doNotMergeItems } -// Item represents a single item retrieved from OneDrive -type Item struct { - id string - data io.ReadCloser - info details.ItemInfo -} - -// Deleted implements an interface function. However, OneDrive items are marked -// as deleted by adding them to the exclude list so this can always return -// false. -func (i Item) Deleted() bool { return false } -func (i *Item) ID() string { return i.id } -func (i *Item) ToReader() io.ReadCloser { return i.data } -func (i *Item) Info() (details.ItemInfo, error) { return i.info, nil } -func (i *Item) ModTime() time.Time { return i.info.Modified() } - // getDriveItemContent fetch drive item's contents with retries func (oc *Collection) getDriveItemContent( ctx context.Context, @@ -470,6 +451,54 @@ func (oc *Collection) streamItems(ctx context.Context, errs *fault.Bus) { oc.reportAsCompleted(ctx, int(stats.itemsFound), int(stats.itemsRead), stats.byteCount) } +type lazyItemGetter struct { + info *details.ItemInfo + item models.DriveItemable + driveID string + suffix string + itemExtensionFactory []extensions.CreateItemExtensioner + contentGetter func( + ctx context.Context, + driveID string, + item models.DriveItemable, + errs *fault.Bus) (io.ReadCloser, error) +} + +func (lig *lazyItemGetter) GetData( + ctx context.Context, + errs *fault.Bus, +) (io.ReadCloser, *details.ItemInfo, bool, error) { + rc, err := lig.contentGetter(ctx, lig.driveID, lig.item, errs) + if err != nil { + return nil, nil, false, clues.Stack(err) + } + + extRc, extData, err := extensions.AddItemExtensions( + ctx, + rc, + *lig.info, + lig.itemExtensionFactory) + if err != nil { + err := clues.Wrap(err, "adding extensions"). + WithClues(ctx). + Label(fault.LabelForceNoBackupCreation) + + return nil, nil, false, err + } + + lig.info.Extension.Data = extData.Data + + // display/log the item download + progReader, _ := observe.ItemProgress( + ctx, + extRc, + observe.ItemBackupMsg, + clues.Hide(ptr.Val(lig.item.GetName())+lig.suffix), + ptr.Val(lig.item.GetSize())) + + return progReader, lig.info, false, nil +} + func (oc *Collection) streamDriveItem( ctx context.Context, parentPath *path.Builder, @@ -530,45 +559,23 @@ func (oc *Collection) streamDriveItem( if isFile { dataSuffix := metadata.DataFileSuffix - // Construct a new lazy readCloser to feed to the collection consumer. + // Use a LazyItem to feed to the collection consumer. // This ensures that downloads won't be attempted unless that consumer // attempts to read bytes. Assumption is that kopia will check things // like file modtimes before attempting to read. - itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) { - rc, err := oc.getDriveItemContent(ctx, oc.driveID, item, errs) - if err != nil { - return nil, err - } - - extRc, extData, err := extensions.AddItemExtensions( - ctx, - rc, - itemInfo, - itemExtensionFactory) - if err != nil { - err := clues.Wrap(err, "adding extensions").Label(fault.LabelForceNoBackupCreation) - errs.AddRecoverable(ctx, err) - return nil, err - } - - itemInfo.Extension.Data = extData.Data - - // display/log the item download - progReader, _ := observe.ItemProgress( - ctx, - extRc, - observe.ItemBackupMsg, - clues.Hide(itemName+dataSuffix), - itemSize) - - return progReader, nil - }) - - oc.data <- &Item{ - id: itemID + dataSuffix, - data: itemReader, - info: itemInfo, - } + oc.data <- data.NewLazyItem( + ctx, + &lazyItemGetter{ + info: &itemInfo, + item: item, + driveID: oc.driveID, + itemExtensionFactory: itemExtensionFactory, + contentGetter: oc.getDriveItemContent, + suffix: dataSuffix, + }, + itemID+dataSuffix, + itemInfo.Modified(), + errs) } metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) { diff --git a/src/internal/m365/collection/drive/collection_test.go b/src/internal/m365/collection/drive/collection_test.go index ad2c9cc93..b99a2484e 100644 --- a/src/internal/m365/collection/drive/collection_test.go +++ b/src/internal/m365/collection/drive/collection_test.go @@ -981,15 +981,17 @@ func (suite *CollectionUnitSuite) TestItemExtensions() { ei, ok := collItem.(data.ItemInfo) assert.True(t, ok) - itemInfo, err := ei.Info() - require.NoError(t, err, clues.ToCore(err)) + r := collItem.ToReader() - _, err = io.ReadAll(collItem.ToReader()) + _, err = io.ReadAll(r) test.expectReadErr(t, err, clues.ToCore(err)) - err = collItem.ToReader().Close() + err = r.Close() test.expectCloseErr(t, err, clues.ToCore(err)) + itemInfo, err := ei.Info() + require.NoError(t, err, clues.ToCore(err)) + // Verify extension data test.expect(t, itemInfo, test.payload) }) diff --git a/src/internal/m365/helper_test.go b/src/internal/m365/helper_test.go index d22603823..d6b7c256c 100644 --- a/src/internal/m365/helper_test.go +++ b/src/internal/m365/helper_test.go @@ -17,7 +17,6 @@ import ( "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/data" - "github.com/alcionai/corso/src/internal/m365/collection/drive" "github.com/alcionai/corso/src/internal/m365/collection/drive/metadata" odStub "github.com/alcionai/corso/src/internal/m365/service/onedrive/stub" m365Stub "github.com/alcionai/corso/src/internal/m365/stub" @@ -731,7 +730,7 @@ func compareDriveItem( ) if !isMeta { - oitem := item.(*drive.Item) + oitem := item.(data.ItemInfo) info, err := oitem.Info() if !assert.NoError(t, err, clues.ToCore(err)) {