From cfe65499870527b72133c35f384c8ede19f250f0 Mon Sep 17 00:00:00 2001 From: Vaibhav Kamra Date: Wed, 11 Jan 2023 19:50:26 -0800 Subject: [PATCH] Re-Enable Kopia-assisted incrementals for OneDrive (#2126) ## Description This addresses the deadlock in the item progress reader by deferring the reader creation to when the first read is issued for the item ## Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [x] :clock1: Yes, but in a later PR - [ ] :no_entry: No ## Type of change - [ ] :sunflower: Feature - [x] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Test - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup ## Issue(s) * #1702 ## Test Plan - [x] :muscle: Manual - [ ] :zap: Unit test - [ ] :green_heart: E2E --- src/go.mod | 1 + src/go.sum | 2 ++ src/internal/connector/onedrive/collection.go | 26 ++++++++++--------- .../connector/onedrive/collection_test.go | 15 ++++++++--- src/internal/operations/backup.go | 5 ++++ src/pkg/backup/details/details.go | 4 +-- 6 files changed, 35 insertions(+), 18 deletions(-) diff --git a/src/go.mod b/src/go.mod index 6634c617b..7a55c7c13 100644 --- a/src/go.mod +++ b/src/go.mod @@ -19,6 +19,7 @@ require ( github.com/microsoftgraph/msgraph-sdk-go-core v0.31.1 github.com/pkg/errors v0.9.1 github.com/rudderlabs/analytics-go v3.3.3+incompatible + github.com/spatialcurrent/go-lazy v0.0.0-20211115014721-47315cc003d1 github.com/spf13/cobra v1.6.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.14.0 diff --git a/src/go.sum b/src/go.sum index be4a51dde..f17de6887 100644 --- a/src/go.sum +++ b/src/go.sum @@ -357,6 +357,8 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spatialcurrent/go-lazy v0.0.0-20211115014721-47315cc003d1 h1:lQ3JvmcVO1/AMFbabvUSJ4YtJRpEAX9Qza73p5j03sw= +github.com/spatialcurrent/go-lazy v0.0.0-20211115014721-47315cc003d1/go.mod h1:4aKqcbhASNqjbrG0h9BmkzcWvPJGxbef4B+j0XfFrZo= github.com/spf13/afero v1.9.2 h1:j49Hj62F0n+DaZ1dDCvhABaPNSGNkt32oRFxI33IEMw= github.com/spf13/afero v1.9.2/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y= github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w= diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index 1f985162b..22ac8d746 100644 --- a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -9,6 +9,7 @@ import ( "time" "github.com/microsoftgraph/msgraph-sdk-go/models" + "github.com/spatialcurrent/go-lazy/pkg/lazy" "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/support" @@ -34,11 +35,10 @@ const ( ) var ( - _ data.Collection = &Collection{} - _ data.Stream = &Item{} - _ data.StreamInfo = &Item{} - // TODO(ashmrtn): Uncomment when #1702 is resolved. - //_ data.StreamModTime = &Item{} + _ data.Collection = &Collection{} + _ data.Stream = &Item{} + _ data.StreamInfo = &Item{} + _ data.StreamModTime = &Item{} ) // Collection represents a set of OneDrive objects retrieved from M365 @@ -158,10 +158,9 @@ func (od *Item) Info() details.ItemInfo { return od.info } -// TODO(ashmrtn): Uncomment when #1702 is resolved. -//func (od *Item) ModTime() time.Time { -// return od.info.Modified -//} +func (od *Item) ModTime() time.Time { + return od.info.Modified() +} // populateItems iterates through items added to the collection // and uses the collection `itemReader` to read the item @@ -253,8 +252,11 @@ func (oc *Collection) populateItems(ctx context.Context) { itemSize = itemInfo.OneDrive.Size } - progReader, closer := observe.ItemProgress(itemData, observe.ItemBackupMsg, itemName, itemSize) - go closer() + itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) { + progReader, closer := observe.ItemProgress(itemData, observe.ItemBackupMsg, itemName, itemSize) + go closer() + return progReader, nil + }) // Item read successfully, add to collection atomic.AddInt64(&itemsRead, 1) @@ -263,7 +265,7 @@ func (oc *Collection) populateItems(ctx context.Context) { oc.data <- &Item{ id: itemName, - data: progReader, + data: itemReader, info: itemInfo, } folderProgress <- struct{}{} diff --git a/src/internal/connector/onedrive/collection_test.go b/src/internal/connector/onedrive/collection_test.go index e55675618..a19021ff7 100644 --- a/src/internal/connector/onedrive/collection_test.go +++ b/src/internal/connector/onedrive/collection_test.go @@ -7,6 +7,7 @@ import ( "io" "sync" "testing" + "time" msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" "github.com/microsoftgraph/msgraph-sdk-go/models" @@ -58,6 +59,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() { testItemID = "fakeItemID" testItemName = "itemName" testItemData = []byte("testdata") + now = time.Now() ) table := []struct { @@ -72,7 +74,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() { numInstances: 1, source: OneDriveSource, itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { - return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName}}, + return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName, Modified: now}}, io.NopCloser(bytes.NewReader(testItemData)), nil }, @@ -86,7 +88,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() { numInstances: 3, source: OneDriveSource, itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { - return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName}}, + return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName, Modified: now}}, io.NopCloser(bytes.NewReader(testItemData)), nil }, @@ -100,7 +102,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() { numInstances: 1, source: SharePointSource, itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { - return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName}}, + return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName, Modified: now}}, io.NopCloser(bytes.NewReader(testItemData)), nil }, @@ -114,7 +116,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() { numInstances: 3, source: SharePointSource, itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { - return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName}}, + return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName, Modified: now}}, io.NopCloser(bytes.NewReader(testItemData)), nil }, @@ -176,6 +178,11 @@ func (suite *CollectionUnitTestSuite) TestCollection() { readItemInfo := readItem.(data.StreamInfo) assert.Equal(t, testItemName, readItem.UUID()) + + require.Implements(t, (*data.StreamModTime)(nil), readItem) + mt := readItem.(data.StreamModTime) + assert.Equal(t, now, mt.ModTime()) + readData, err := io.ReadAll(readItem.ToReader()) require.NoError(t, err) diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index d4d3056a3..f2e68aec5 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -218,6 +218,11 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { // checker to see if conditions are correct for incremental backup behavior such as // retrieving metadata like delta tokens and previous paths. func useIncrementalBackup(sel selectors.Selector, opts control.Options) bool { + // Delta-based incrementals currently only supported for Exchange + if sel.Service != selectors.ServiceExchange { + return false + } + return !opts.ToggleFeatures.DisableIncrementals } diff --git a/src/pkg/backup/details/details.go b/src/pkg/backup/details/details.go index d407ca690..f244c72c9 100644 --- a/src/pkg/backup/details/details.go +++ b/src/pkg/backup/details/details.go @@ -173,7 +173,7 @@ func (b *Builder) AddFoldersForItem(folders []folderEntry, itemInfo ItemInfo, up } // Update the folder's size and modified time - itemModified := itemInfo.modified() + itemModified := itemInfo.Modified() folder.Info.Folder.Size += itemInfo.size() @@ -381,7 +381,7 @@ func (i ItemInfo) size() int64 { return 0 } -func (i ItemInfo) modified() time.Time { +func (i ItemInfo) Modified() time.Time { switch { case i.Exchange != nil: return i.Exchange.Modified