Re-Enable Kopia-assisted incrementals for OneDrive (#2126)

## Description

This re-enables Kopia-assisted incremental backups for OneDrive by addressing the deadlock in the item progress reader: the reader is now created lazily, when the first read is issued for the item.
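
For reference, here is a minimal sketch of the pattern (assuming go-lazy's `NewLazyReadCloser` behaves as it is used in the diff below): the underlying reader, `observe.ItemProgress` in the real code, is only constructed once the first read actually arrives, so the progress reader is no longer set up before the item is consumed.

```go
// Sketch only: the progress-reader construction is stubbed out with a plain
// string reader; observe.ItemProgress takes its place in collection.go.
package main

import (
	"fmt"
	"io"
	"strings"

	"github.com/spatialcurrent/go-lazy/pkg/lazy"
)

func main() {
	constructed := false

	// Nothing is built when the lazy reader is created...
	r := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
		constructed = true
		return io.NopCloser(strings.NewReader("testdata")), nil
	})
	fmt.Println("constructed before read:", constructed) // false

	// ...only when the first Read is issued for the item.
	data, _ := io.ReadAll(r)
	fmt.Println("constructed after read:", constructed, string(data)) // true testdata

	_ = r.Close()
}
```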

## Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [x] 🕐 Yes, but in a later PR
- [ ]  No 

## Type of change

<!--- Please check the type of change your PR introduces: --->
- [ ] 🌻 Feature
- [x] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

## Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* #1702 

## Test Plan

<!-- How will this be tested prior to merging.-->
- [x] 💪 Manual
- [ ]  Unit test
- [ ] 💚 E2E
Vaibhav Kamra 2023-01-11 19:50:26 -08:00 committed by GitHub
parent 4b1641e978
commit cfe6549987
6 changed files with 35 additions and 18 deletions

View File

@@ -19,6 +19,7 @@ require (
github.com/microsoftgraph/msgraph-sdk-go-core v0.31.1
github.com/pkg/errors v0.9.1
github.com/rudderlabs/analytics-go v3.3.3+incompatible
github.com/spatialcurrent/go-lazy v0.0.0-20211115014721-47315cc003d1
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.14.0

View File

@@ -357,6 +357,8 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spatialcurrent/go-lazy v0.0.0-20211115014721-47315cc003d1 h1:lQ3JvmcVO1/AMFbabvUSJ4YtJRpEAX9Qza73p5j03sw=
github.com/spatialcurrent/go-lazy v0.0.0-20211115014721-47315cc003d1/go.mod h1:4aKqcbhASNqjbrG0h9BmkzcWvPJGxbef4B+j0XfFrZo=
github.com/spf13/afero v1.9.2 h1:j49Hj62F0n+DaZ1dDCvhABaPNSGNkt32oRFxI33IEMw=
github.com/spf13/afero v1.9.2/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y=
github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w=

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/spatialcurrent/go-lazy/pkg/lazy"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support"
@@ -37,8 +38,7 @@ var (
_ data.Collection = &Collection{}
_ data.Stream = &Item{}
_ data.StreamInfo = &Item{}
// TODO(ashmrtn): Uncomment when #1702 is resolved.
//_ data.StreamModTime = &Item{}
_ data.StreamModTime = &Item{}
)
// Collection represents a set of OneDrive objects retrieved from M365
@@ -158,10 +158,9 @@ func (od *Item) Info() details.ItemInfo {
return od.info
}
// TODO(ashmrtn): Uncomment when #1702 is resolved.
//func (od *Item) ModTime() time.Time {
// return od.info.Modified
//}
func (od *Item) ModTime() time.Time {
return od.info.Modified()
}
// populateItems iterates through items added to the collection
// and uses the collection `itemReader` to read the item
@@ -253,8 +252,11 @@ func (oc *Collection) populateItems(ctx context.Context) {
itemSize = itemInfo.OneDrive.Size
}
itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
progReader, closer := observe.ItemProgress(itemData, observe.ItemBackupMsg, itemName, itemSize)
go closer()
return progReader, nil
})
// Item read successfully, add to collection
atomic.AddInt64(&itemsRead, 1)
@@ -263,7 +265,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
oc.data <- &Item{
id: itemName,
data: progReader,
data: itemReader,
info: itemInfo,
}
folderProgress <- struct{}{}

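With the `data.StreamModTime` assertion restored in the `var` block above, downstream code can detect mod-time support with a type assertion instead of assuming every stream provides it. A hedged sketch of that caller-side pattern (interface and package names are taken from this diff; the import path is assumed from the connector imports shown above):

```go
package example

import (
	"time"

	"github.com/alcionai/corso/src/internal/data"
)

// itemModTime is a hypothetical helper, not part of this PR: it falls back
// to a zero time when a stream does not implement StreamModTime.
func itemModTime(s data.Stream) (time.Time, bool) {
	if mt, ok := s.(data.StreamModTime); ok {
		// OneDrive Items now satisfy this via info.Modified(), which is what
		// presumably feeds the kopia layer's unchanged-item detection.
		return mt.ModTime(), true
	}

	return time.Time{}, false
}
```
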
View File

@@ -7,6 +7,7 @@ import (
"io"
"sync"
"testing"
"time"
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
"github.com/microsoftgraph/msgraph-sdk-go/models"
@@ -58,6 +59,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
testItemID = "fakeItemID"
testItemName = "itemName"
testItemData = []byte("testdata")
now = time.Now()
)
table := []struct {
@@ -72,7 +74,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
numInstances: 1,
source: OneDriveSource,
itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) {
return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName}},
return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName, Modified: now}},
io.NopCloser(bytes.NewReader(testItemData)),
nil
},
@@ -86,7 +88,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
numInstances: 3,
source: OneDriveSource,
itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) {
return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName}},
return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName, Modified: now}},
io.NopCloser(bytes.NewReader(testItemData)),
nil
},
@@ -100,7 +102,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
numInstances: 1,
source: SharePointSource,
itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) {
return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName}},
return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName, Modified: now}},
io.NopCloser(bytes.NewReader(testItemData)),
nil
},
@@ -114,7 +116,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
numInstances: 3,
source: SharePointSource,
itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) {
return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName}},
return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName, Modified: now}},
io.NopCloser(bytes.NewReader(testItemData)),
nil
},
@@ -176,6 +178,11 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
readItemInfo := readItem.(data.StreamInfo)
assert.Equal(t, testItemName, readItem.UUID())
require.Implements(t, (*data.StreamModTime)(nil), readItem)
mt := readItem.(data.StreamModTime)
assert.Equal(t, now, mt.ModTime())
readData, err := io.ReadAll(readItem.ToReader())
require.NoError(t, err)

View File

@@ -218,6 +218,11 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
// checker to see if conditions are correct for incremental backup behavior such as
// retrieving metadata like delta tokens and previous paths.
func useIncrementalBackup(sel selectors.Selector, opts control.Options) bool {
// Delta-based incrementals currently only supported for Exchange
if sel.Service != selectors.ServiceExchange {
return false
}
return !opts.ToggleFeatures.DisableIncrementals
}

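To spell out the gate: delta-token incrementals remain Exchange-only for now, while OneDrive relies on the Kopia-assisted path this PR re-enables. A standalone restatement of the logic with plain parameters (the real function takes `selectors.Selector` and `control.Options`):

```go
package example

// useDeltaIncrementals mirrors useIncrementalBackup above with illustrative
// boolean parameters instead of the real selector/options types.
func useDeltaIncrementals(isExchange, disableIncrementals bool) bool {
	// Delta-based incrementals are currently only supported for Exchange;
	// every other service returns false regardless of the feature toggle.
	if !isExchange {
		return false
	}

	return !disableIncrementals
}
```
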
View File

@@ -173,7 +173,7 @@ func (b *Builder) AddFoldersForItem(folders []folderEntry, itemInfo ItemInfo, up
}
// Update the folder's size and modified time
itemModified := itemInfo.modified()
itemModified := itemInfo.Modified()
folder.Info.Folder.Size += itemInfo.size()
@@ -381,7 +381,7 @@ func (i ItemInfo) size() int64 {
return 0
}
func (i ItemInfo) modified() time.Time {
func (i ItemInfo) Modified() time.Time {
switch {
case i.Exchange != nil:
return i.Exchange.Modified
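
Only the Exchange case of the exported method is visible above; based on the test changes (both `OneDriveInfo` and `SharePointInfo` now set `Modified`), the full switch presumably reads along these lines:

```go
// Inferred sketch within the details package; only the Exchange case appears
// in the hunk above, and the zero-time fallback is an assumption.
func (i ItemInfo) Modified() time.Time {
	switch {
	case i.Exchange != nil:
		return i.Exchange.Modified
	case i.OneDrive != nil:
		return i.OneDrive.Modified
	case i.SharePoint != nil:
		return i.SharePoint.Modified
	}

	return time.Time{}
}
```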