Use generic item in Drive backup (#4357)
Use generic lazy item implementation for drive backups. This is a bit of a stop-gap implementation since the drive logic is more complex than exchange logic Refactoring the drive logic to streamline it would help reduce the number of functions called to download a single item etc. --- #### Does this PR need a docs update or release note? - [ ] ✅ Yes, it's included - [ ] 🕐 Yes, but in a later PR - [x] ⛔ No #### Type of change - [ ] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Supportability/Tests - [ ] 💻 CI/Deployment - [x] 🧹 Tech Debt/Cleanup #### Issue(s) * #4191 #### Test Plan - [ ] 💪 Manual - [x] ⚡ Unit test - [x] 💚 E2E
This commit is contained in:
parent
a91c823e42
commit
5c4419fdc0
@ -35,9 +35,6 @@ const (
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
_ data.BackupCollection = &Collection{}
|
_ data.BackupCollection = &Collection{}
|
||||||
_ data.Item = &Item{}
|
|
||||||
_ data.ItemInfo = &Item{}
|
|
||||||
_ data.ItemModTime = &Item{}
|
|
||||||
_ data.Item = &metadata.Item{}
|
_ data.Item = &metadata.Item{}
|
||||||
_ data.ItemModTime = &metadata.Item{}
|
_ data.ItemModTime = &metadata.Item{}
|
||||||
)
|
)
|
||||||
@ -244,22 +241,6 @@ func (oc Collection) DoNotMergeItems() bool {
|
|||||||
return oc.doNotMergeItems
|
return oc.doNotMergeItems
|
||||||
}
|
}
|
||||||
|
|
||||||
// Item represents a single item retrieved from OneDrive
|
|
||||||
type Item struct {
|
|
||||||
id string
|
|
||||||
data io.ReadCloser
|
|
||||||
info details.ItemInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
// Deleted implements an interface function. However, OneDrive items are marked
|
|
||||||
// as deleted by adding them to the exclude list so this can always return
|
|
||||||
// false.
|
|
||||||
func (i Item) Deleted() bool { return false }
|
|
||||||
func (i *Item) ID() string { return i.id }
|
|
||||||
func (i *Item) ToReader() io.ReadCloser { return i.data }
|
|
||||||
func (i *Item) Info() (details.ItemInfo, error) { return i.info, nil }
|
|
||||||
func (i *Item) ModTime() time.Time { return i.info.Modified() }
|
|
||||||
|
|
||||||
// getDriveItemContent fetch drive item's contents with retries
|
// getDriveItemContent fetch drive item's contents with retries
|
||||||
func (oc *Collection) getDriveItemContent(
|
func (oc *Collection) getDriveItemContent(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
@ -470,6 +451,54 @@ func (oc *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
|
|||||||
oc.reportAsCompleted(ctx, int(stats.itemsFound), int(stats.itemsRead), stats.byteCount)
|
oc.reportAsCompleted(ctx, int(stats.itemsFound), int(stats.itemsRead), stats.byteCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type lazyItemGetter struct {
|
||||||
|
info *details.ItemInfo
|
||||||
|
item models.DriveItemable
|
||||||
|
driveID string
|
||||||
|
suffix string
|
||||||
|
itemExtensionFactory []extensions.CreateItemExtensioner
|
||||||
|
contentGetter func(
|
||||||
|
ctx context.Context,
|
||||||
|
driveID string,
|
||||||
|
item models.DriveItemable,
|
||||||
|
errs *fault.Bus) (io.ReadCloser, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lig *lazyItemGetter) GetData(
|
||||||
|
ctx context.Context,
|
||||||
|
errs *fault.Bus,
|
||||||
|
) (io.ReadCloser, *details.ItemInfo, bool, error) {
|
||||||
|
rc, err := lig.contentGetter(ctx, lig.driveID, lig.item, errs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, false, clues.Stack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
extRc, extData, err := extensions.AddItemExtensions(
|
||||||
|
ctx,
|
||||||
|
rc,
|
||||||
|
*lig.info,
|
||||||
|
lig.itemExtensionFactory)
|
||||||
|
if err != nil {
|
||||||
|
err := clues.Wrap(err, "adding extensions").
|
||||||
|
WithClues(ctx).
|
||||||
|
Label(fault.LabelForceNoBackupCreation)
|
||||||
|
|
||||||
|
return nil, nil, false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
lig.info.Extension.Data = extData.Data
|
||||||
|
|
||||||
|
// display/log the item download
|
||||||
|
progReader, _ := observe.ItemProgress(
|
||||||
|
ctx,
|
||||||
|
extRc,
|
||||||
|
observe.ItemBackupMsg,
|
||||||
|
clues.Hide(ptr.Val(lig.item.GetName())+lig.suffix),
|
||||||
|
ptr.Val(lig.item.GetSize()))
|
||||||
|
|
||||||
|
return progReader, lig.info, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (oc *Collection) streamDriveItem(
|
func (oc *Collection) streamDriveItem(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
parentPath *path.Builder,
|
parentPath *path.Builder,
|
||||||
@ -530,45 +559,23 @@ func (oc *Collection) streamDriveItem(
|
|||||||
if isFile {
|
if isFile {
|
||||||
dataSuffix := metadata.DataFileSuffix
|
dataSuffix := metadata.DataFileSuffix
|
||||||
|
|
||||||
// Construct a new lazy readCloser to feed to the collection consumer.
|
// Use a LazyItem to feed to the collection consumer.
|
||||||
// This ensures that downloads won't be attempted unless that consumer
|
// This ensures that downloads won't be attempted unless that consumer
|
||||||
// attempts to read bytes. Assumption is that kopia will check things
|
// attempts to read bytes. Assumption is that kopia will check things
|
||||||
// like file modtimes before attempting to read.
|
// like file modtimes before attempting to read.
|
||||||
itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
|
oc.data <- data.NewLazyItem(
|
||||||
rc, err := oc.getDriveItemContent(ctx, oc.driveID, item, errs)
|
ctx,
|
||||||
if err != nil {
|
&lazyItemGetter{
|
||||||
return nil, err
|
info: &itemInfo,
|
||||||
}
|
item: item,
|
||||||
|
driveID: oc.driveID,
|
||||||
extRc, extData, err := extensions.AddItemExtensions(
|
itemExtensionFactory: itemExtensionFactory,
|
||||||
ctx,
|
contentGetter: oc.getDriveItemContent,
|
||||||
rc,
|
suffix: dataSuffix,
|
||||||
itemInfo,
|
},
|
||||||
itemExtensionFactory)
|
itemID+dataSuffix,
|
||||||
if err != nil {
|
itemInfo.Modified(),
|
||||||
err := clues.Wrap(err, "adding extensions").Label(fault.LabelForceNoBackupCreation)
|
errs)
|
||||||
errs.AddRecoverable(ctx, err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
itemInfo.Extension.Data = extData.Data
|
|
||||||
|
|
||||||
// display/log the item download
|
|
||||||
progReader, _ := observe.ItemProgress(
|
|
||||||
ctx,
|
|
||||||
extRc,
|
|
||||||
observe.ItemBackupMsg,
|
|
||||||
clues.Hide(itemName+dataSuffix),
|
|
||||||
itemSize)
|
|
||||||
|
|
||||||
return progReader, nil
|
|
||||||
})
|
|
||||||
|
|
||||||
oc.data <- &Item{
|
|
||||||
id: itemID + dataSuffix,
|
|
||||||
data: itemReader,
|
|
||||||
info: itemInfo,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
|
metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
|
||||||
|
|||||||
@ -981,15 +981,17 @@ func (suite *CollectionUnitSuite) TestItemExtensions() {
|
|||||||
ei, ok := collItem.(data.ItemInfo)
|
ei, ok := collItem.(data.ItemInfo)
|
||||||
assert.True(t, ok)
|
assert.True(t, ok)
|
||||||
|
|
||||||
itemInfo, err := ei.Info()
|
r := collItem.ToReader()
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
|
||||||
|
|
||||||
_, err = io.ReadAll(collItem.ToReader())
|
_, err = io.ReadAll(r)
|
||||||
test.expectReadErr(t, err, clues.ToCore(err))
|
test.expectReadErr(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
err = collItem.ToReader().Close()
|
err = r.Close()
|
||||||
test.expectCloseErr(t, err, clues.ToCore(err))
|
test.expectCloseErr(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
|
itemInfo, err := ei.Info()
|
||||||
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
// Verify extension data
|
// Verify extension data
|
||||||
test.expect(t, itemInfo, test.payload)
|
test.expect(t, itemInfo, test.payload)
|
||||||
})
|
})
|
||||||
|
|||||||
@ -17,7 +17,6 @@ import (
|
|||||||
|
|
||||||
"github.com/alcionai/corso/src/internal/common/ptr"
|
"github.com/alcionai/corso/src/internal/common/ptr"
|
||||||
"github.com/alcionai/corso/src/internal/data"
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
"github.com/alcionai/corso/src/internal/m365/collection/drive"
|
|
||||||
"github.com/alcionai/corso/src/internal/m365/collection/drive/metadata"
|
"github.com/alcionai/corso/src/internal/m365/collection/drive/metadata"
|
||||||
odStub "github.com/alcionai/corso/src/internal/m365/service/onedrive/stub"
|
odStub "github.com/alcionai/corso/src/internal/m365/service/onedrive/stub"
|
||||||
m365Stub "github.com/alcionai/corso/src/internal/m365/stub"
|
m365Stub "github.com/alcionai/corso/src/internal/m365/stub"
|
||||||
@ -731,7 +730,7 @@ func compareDriveItem(
|
|||||||
)
|
)
|
||||||
|
|
||||||
if !isMeta {
|
if !isMeta {
|
||||||
oitem := item.(*drive.Item)
|
oitem := item.(data.ItemInfo)
|
||||||
|
|
||||||
info, err := oitem.Info()
|
info, err := oitem.Info()
|
||||||
if !assert.NoError(t, err, clues.ToCore(err)) {
|
if !assert.NoError(t, err, clues.ToCore(err)) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user