Integrate extensions into drive item reader (#3750)
<!-- PR description--> * Integrates corso extensions into onedrive/sharepoint library item backup flows. * Also includes integration tests for OD/SP, unit tests for collections. * Includes a small fix for `rw *backupStreamReader Close()` Remaining things which will be covered in later PRs: * extension tests with incremental backups * Observability related changes for extensions --- #### Does this PR need a docs update or release note? - [ ] ✅ Yes, it's included - [x] 🕐 Yes, but in a later PR - [ ] ⛔ No #### Type of change <!--- Please check the type of change your PR introduces: ---> - [x] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Supportability/Tests - [ ] 💻 CI/Deployment - [ ] 🧹 Tech Debt/Cleanup #### Issue(s) <!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. --> * internal #### Test Plan <!-- How will this be tested prior to merging.--> - [ ] 💪 Manual - [x] ⚡ Unit test - [x] 💚 E2E
This commit is contained in:
parent
5ea194dc87
commit
83e56ed396
@ -72,11 +72,16 @@ func (rw *backupStreamReader) Close() error {
|
||||
|
||||
rw.combined = nil
|
||||
|
||||
var outerErr error
|
||||
|
||||
for _, r := range rw.readers {
|
||||
r.Close()
|
||||
err := r.Close()
|
||||
if err != nil {
|
||||
outerErr = clues.Stack(err, clues.New("closing reader"))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return outerErr
|
||||
}
|
||||
|
||||
// restoreStreamReader is a wrapper around the io.Reader that kopia returns when
|
||||
|
||||
@ -21,6 +21,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/observe"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/extensions"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
@ -405,10 +406,10 @@ func readItemContents(
|
||||
|
||||
type driveStats struct {
|
||||
dirsRead int64
|
||||
itemsRead int64
|
||||
byteCount int64
|
||||
itemsFound int64
|
||||
dirsFound int64
|
||||
byteCount int64
|
||||
itemsRead int64
|
||||
itemsFound int64
|
||||
}
|
||||
|
||||
// populateItems iterates through items added to the collection
|
||||
@ -459,6 +460,7 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
|
||||
parentPath,
|
||||
item,
|
||||
&stats,
|
||||
oc.ctrl.ItemExtensionFactory,
|
||||
errs)
|
||||
|
||||
folderProgress <- struct{}{}
|
||||
@ -475,6 +477,7 @@ func (oc *Collection) populateDriveItem(
|
||||
parentPath *path.Builder,
|
||||
item models.DriveItemable,
|
||||
stats *driveStats,
|
||||
itemExtensionFactory []extensions.CreateItemExtensioner,
|
||||
errs *fault.Bus,
|
||||
) {
|
||||
var (
|
||||
@ -531,11 +534,30 @@ func (oc *Collection) populateDriveItem(
|
||||
// attempts to read bytes. Assumption is that kopia will check things
|
||||
// like file modtimes before attempting to read.
|
||||
itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
|
||||
itemData, err := oc.getDriveItemContent(ctx, oc.driveID, item, errs)
|
||||
rc, err := oc.getDriveItemContent(ctx, oc.driveID, item, errs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
itemData := rc
|
||||
|
||||
// Add per item extensions if available
|
||||
if len(itemExtensionFactory) > 0 {
|
||||
extRc, extData, err := extensions.AddItemExtensions(
|
||||
ctx,
|
||||
rc,
|
||||
itemInfo,
|
||||
itemExtensionFactory)
|
||||
if err != nil {
|
||||
err := clues.Wrap(err, "adding extensions").Label(fault.LabelForceNoBackupCreation)
|
||||
el.AddRecoverable(ctx, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
itemData = extRc
|
||||
itemInfo.Extension.Data = extData.Data
|
||||
}
|
||||
|
||||
// display/log the item download
|
||||
progReader, _ := observe.ItemProgress(
|
||||
ctx,
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
@ -28,6 +29,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/tester"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/extensions"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
@ -190,7 +192,12 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
|
||||
mbh.ItemInfo.OneDrive.ItemName = stubItemName
|
||||
}
|
||||
|
||||
mbh.GetResps = []*http.Response{{StatusCode: http.StatusOK, Body: test.getBody}}
|
||||
mbh.GetResps = []*http.Response{
|
||||
{
|
||||
StatusCode: http.StatusOK,
|
||||
Body: test.getBody,
|
||||
},
|
||||
}
|
||||
mbh.GetErrs = []error{test.getErr}
|
||||
mbh.GI = mock.GetsItem{Err: assert.AnError}
|
||||
|
||||
@ -771,3 +778,232 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *CollectionUnitTestSuite) TestItemExtensions() {
|
||||
type verifyExtensionOutput func(
|
||||
t *testing.T,
|
||||
info details.ItemInfo,
|
||||
payload []byte,
|
||||
)
|
||||
|
||||
var (
|
||||
t = suite.T()
|
||||
stubItemID = "itemID"
|
||||
stubItemName = "name"
|
||||
driveID = "driveID"
|
||||
collStatus = support.ControllerOperationStatus{}
|
||||
wg = sync.WaitGroup{}
|
||||
now = time.Now()
|
||||
readData = []byte("hello world!")
|
||||
pb = path.Builder{}.Append(path.Split("drive/driveID1/root:/folderPath")...)
|
||||
)
|
||||
|
||||
folderPath, err := pb.ToDataLayerOneDrivePath("a-tenant", "a-user", false)
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
table := []struct {
|
||||
name string
|
||||
factories []extensions.CreateItemExtensioner
|
||||
payload []byte
|
||||
expectReadErr require.ErrorAssertionFunc
|
||||
expectCloseErr require.ErrorAssertionFunc
|
||||
rc io.ReadCloser
|
||||
expect verifyExtensionOutput
|
||||
}{
|
||||
{
|
||||
name: "nil extensions",
|
||||
factories: nil,
|
||||
payload: readData,
|
||||
expectReadErr: require.NoError,
|
||||
expectCloseErr: require.NoError,
|
||||
rc: io.NopCloser(bytes.NewReader(readData)),
|
||||
expect: func(
|
||||
t *testing.T,
|
||||
info details.ItemInfo,
|
||||
payload []byte,
|
||||
) {
|
||||
require.Nil(t, info.Extension.Data)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no extensions",
|
||||
factories: []extensions.CreateItemExtensioner{},
|
||||
payload: readData,
|
||||
expectReadErr: require.NoError,
|
||||
expectCloseErr: require.NoError,
|
||||
rc: io.NopCloser(bytes.NewReader(readData)),
|
||||
expect: func(
|
||||
t *testing.T,
|
||||
info details.ItemInfo,
|
||||
payload []byte,
|
||||
) {
|
||||
require.Nil(t, info.Extension.Data)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "with extension",
|
||||
factories: []extensions.CreateItemExtensioner{
|
||||
&extensions.MockItemExtensionFactory{},
|
||||
},
|
||||
payload: readData,
|
||||
expectReadErr: require.NoError,
|
||||
expectCloseErr: require.NoError,
|
||||
rc: io.NopCloser(bytes.NewReader(readData)),
|
||||
expect: func(
|
||||
t *testing.T,
|
||||
info details.ItemInfo,
|
||||
payload []byte,
|
||||
) {
|
||||
verifyExtensionData(
|
||||
t,
|
||||
info.Extension,
|
||||
int64(len(payload)),
|
||||
crc32.ChecksumIEEE(payload))
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "zero length payload",
|
||||
factories: []extensions.CreateItemExtensioner{
|
||||
&extensions.MockItemExtensionFactory{},
|
||||
},
|
||||
payload: []byte{},
|
||||
expectReadErr: require.NoError,
|
||||
expectCloseErr: require.NoError,
|
||||
rc: io.NopCloser(bytes.NewReader([]byte{})),
|
||||
expect: func(
|
||||
t *testing.T,
|
||||
info details.ItemInfo,
|
||||
payload []byte,
|
||||
) {
|
||||
verifyExtensionData(
|
||||
t,
|
||||
info.Extension,
|
||||
int64(len(payload)),
|
||||
crc32.ChecksumIEEE(payload))
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "extension fails on read",
|
||||
factories: []extensions.CreateItemExtensioner{
|
||||
&extensions.MockItemExtensionFactory{
|
||||
FailOnRead: true,
|
||||
},
|
||||
},
|
||||
payload: readData,
|
||||
expectReadErr: require.Error,
|
||||
expectCloseErr: require.NoError,
|
||||
rc: io.NopCloser(bytes.NewReader(readData)),
|
||||
expect: func(
|
||||
t *testing.T,
|
||||
info details.ItemInfo,
|
||||
payload []byte,
|
||||
) {
|
||||
// The extension may have dirty data in this case, hence skipping
|
||||
// verification of extension info
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "extension fails on close",
|
||||
factories: []extensions.CreateItemExtensioner{
|
||||
&extensions.MockItemExtensionFactory{
|
||||
FailOnClose: true,
|
||||
},
|
||||
},
|
||||
payload: readData,
|
||||
expectReadErr: require.NoError,
|
||||
expectCloseErr: require.Error,
|
||||
rc: io.NopCloser(bytes.NewReader(readData)),
|
||||
expect: func(
|
||||
t *testing.T,
|
||||
info details.ItemInfo,
|
||||
payload []byte,
|
||||
) {
|
||||
// The extension may have dirty data in this case, hence skipping
|
||||
// verification of extension info
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range table {
|
||||
suite.Run(test.name, func() {
|
||||
t := suite.T()
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
wg.Add(1)
|
||||
|
||||
mbh := mock.DefaultOneDriveBH()
|
||||
mbh.GI = mock.GetsItem{Err: assert.AnError}
|
||||
mbh.GIP = mock.GetsItemPermission{Perm: models.NewPermissionCollectionResponse()}
|
||||
mbh.GetResps = []*http.Response{
|
||||
{StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(string(test.payload)))},
|
||||
}
|
||||
mbh.GetErrs = []error{
|
||||
nil,
|
||||
}
|
||||
|
||||
opts := control.Defaults()
|
||||
opts.ItemExtensionFactory = append(
|
||||
opts.ItemExtensionFactory,
|
||||
test.factories...)
|
||||
|
||||
coll, err := NewCollection(
|
||||
mbh,
|
||||
folderPath,
|
||||
nil,
|
||||
driveID,
|
||||
suite.testStatusUpdater(&wg, &collStatus),
|
||||
opts,
|
||||
CollectionScopeFolder,
|
||||
true,
|
||||
nil)
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
stubItem := odTD.NewStubDriveItem(
|
||||
stubItemID,
|
||||
stubItemName,
|
||||
int64(len(test.payload)),
|
||||
now,
|
||||
now,
|
||||
true,
|
||||
false)
|
||||
|
||||
coll.Add(stubItem)
|
||||
|
||||
collItem, ok := <-coll.Items(ctx, fault.New(true))
|
||||
assert.True(t, ok)
|
||||
|
||||
wg.Wait()
|
||||
|
||||
ei, ok := collItem.(data.StreamInfo)
|
||||
assert.True(t, ok)
|
||||
itemInfo := ei.Info()
|
||||
|
||||
_, err = io.ReadAll(collItem.ToReader())
|
||||
test.expectReadErr(t, err, clues.ToCore(err))
|
||||
|
||||
err = collItem.ToReader().Close()
|
||||
test.expectCloseErr(t, err, clues.ToCore(err))
|
||||
|
||||
// Verify extension data
|
||||
test.expect(t, itemInfo, test.payload)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func verifyExtensionData(
|
||||
t *testing.T,
|
||||
extensionData *details.ExtensionData,
|
||||
expectedBytes int64,
|
||||
expectedCrc uint32,
|
||||
) {
|
||||
require.NotNil(t, extensionData, "nil extension")
|
||||
assert.NotNil(t, extensionData.Data[extensions.KNumBytes], "key not found")
|
||||
assert.NotNil(t, extensionData.Data[extensions.KCrc32], "key not found")
|
||||
|
||||
eSize := extensionData.Data[extensions.KNumBytes].(int64)
|
||||
assert.Equal(t, expectedBytes, eSize, "incorrect num bytes")
|
||||
|
||||
c := extensionData.Data[extensions.KCrc32].(uint32)
|
||||
require.Equal(t, expectedCrc, c, "incorrect crc")
|
||||
}
|
||||
|
||||
@ -262,5 +262,7 @@ func augmentItemInfo(
|
||||
Size: size,
|
||||
}
|
||||
|
||||
dii.Extension = &details.ExtensionData{}
|
||||
|
||||
return dii
|
||||
}
|
||||
|
||||
@ -47,7 +47,10 @@ type BackupHandler struct {
|
||||
|
||||
func DefaultOneDriveBH() *BackupHandler {
|
||||
return &BackupHandler{
|
||||
ItemInfo: details.ItemInfo{OneDrive: &details.OneDriveInfo{}},
|
||||
ItemInfo: details.ItemInfo{
|
||||
OneDrive: &details.OneDriveInfo{},
|
||||
Extension: &details.ExtensionData{},
|
||||
},
|
||||
GI: GetsItem{Err: clues.New("not defined")},
|
||||
GIP: GetsItemPermission{Err: clues.New("not defined")},
|
||||
PathPrefixFn: defaultOneDrivePathPrefixer,
|
||||
@ -62,7 +65,10 @@ func DefaultOneDriveBH() *BackupHandler {
|
||||
|
||||
func DefaultSharePointBH() *BackupHandler {
|
||||
return &BackupHandler{
|
||||
ItemInfo: details.ItemInfo{SharePoint: &details.SharePointInfo{}},
|
||||
ItemInfo: details.ItemInfo{
|
||||
SharePoint: &details.SharePointInfo{},
|
||||
Extension: &details.ExtensionData{},
|
||||
},
|
||||
GI: GetsItem{Err: clues.New("not defined")},
|
||||
GIP: GetsItemPermission{Err: clues.New("not defined")},
|
||||
PathPrefixFn: defaultSharePointPathPrefixer,
|
||||
|
||||
@ -310,5 +310,7 @@ func augmentItemInfo(
|
||||
WebURL: weburl,
|
||||
}
|
||||
|
||||
dii.Extension = &details.ExtensionData{}
|
||||
|
||||
return dii
|
||||
}
|
||||
|
||||
@ -103,11 +103,11 @@ func (suite *ExchangeBackupIntgSuite) TestBackup_Run_exchange() {
|
||||
var (
|
||||
mb = evmock.NewBus()
|
||||
sel = test.selector().Selector
|
||||
ffs = control.Toggles{}
|
||||
opts = control.Defaults()
|
||||
whatSet = deeTD.CategoryFromRepoRef
|
||||
)
|
||||
|
||||
bo, bod := prepNewTestBackupOp(t, ctx, mb, sel, ffs, version.Backup)
|
||||
bo, bod := prepNewTestBackupOp(t, ctx, mb, sel, opts, version.Backup)
|
||||
defer bod.close(t, ctx)
|
||||
|
||||
sel = bod.sel
|
||||
@ -169,7 +169,7 @@ func (suite *ExchangeBackupIntgSuite) TestBackup_Run_exchange() {
|
||||
ctx,
|
||||
bod,
|
||||
incMB,
|
||||
ffs)
|
||||
opts)
|
||||
)
|
||||
|
||||
runAndCheckBackup(t, ctx, &incBO, incMB, true)
|
||||
@ -256,8 +256,10 @@ func testExchangeContinuousBackups(suite *ExchangeBackupIntgSuite, toggles contr
|
||||
containers = []string{container1, container2, container3, containerRename}
|
||||
sel = selectors.NewExchangeBackup([]string{suite.its.userID})
|
||||
whatSet = deeTD.CategoryFromRepoRef
|
||||
opts = control.Defaults()
|
||||
)
|
||||
|
||||
opts.ToggleFeatures = toggles
|
||||
ctrl, sels := ControllerWithSelector(t, ctx, acct, resource.Users, sel.Selector, nil, nil)
|
||||
sel.DiscreteOwner = sels.ID()
|
||||
sel.DiscreteOwnerName = sels.Name()
|
||||
@ -378,7 +380,7 @@ func testExchangeContinuousBackups(suite *ExchangeBackupIntgSuite, toggles contr
|
||||
}
|
||||
}
|
||||
|
||||
bo, bod := prepNewTestBackupOp(t, ctx, mb, sel.Selector, toggles, version.Backup)
|
||||
bo, bod := prepNewTestBackupOp(t, ctx, mb, sel.Selector, opts, version.Backup)
|
||||
defer bod.close(t, ctx)
|
||||
|
||||
// run the initial backup
|
||||
@ -769,7 +771,7 @@ func testExchangeContinuousBackups(suite *ExchangeBackupIntgSuite, toggles contr
|
||||
ctx, flush := tester.WithContext(t, ctx)
|
||||
defer flush()
|
||||
|
||||
incBO := newTestBackupOp(t, ctx, bod, incMB, toggles)
|
||||
incBO := newTestBackupOp(t, ctx, bod, incMB, opts)
|
||||
|
||||
suite.Run("PreTestSetup", func() {
|
||||
t := suite.T()
|
||||
|
||||
@ -34,6 +34,7 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/control/repository"
|
||||
"github.com/alcionai/corso/src/pkg/count"
|
||||
"github.com/alcionai/corso/src/pkg/extensions"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/alcionai/corso/src/pkg/selectors"
|
||||
@ -87,7 +88,7 @@ func prepNewTestBackupOp(
|
||||
ctx context.Context, //revive:disable-line:context-as-argument
|
||||
bus events.Eventer,
|
||||
sel selectors.Selector,
|
||||
featureToggles control.Toggles,
|
||||
opts control.Options,
|
||||
backupVersion int,
|
||||
) (
|
||||
operations.BackupOperation,
|
||||
@ -148,7 +149,7 @@ func prepNewTestBackupOp(
|
||||
ctx,
|
||||
bod,
|
||||
bus,
|
||||
featureToggles)
|
||||
opts)
|
||||
|
||||
bod.sss = streamstore.NewStreamer(
|
||||
bod.kw,
|
||||
@ -167,11 +168,8 @@ func newTestBackupOp(
|
||||
ctx context.Context, //revive:disable-line:context-as-argument
|
||||
bod *backupOpDependencies,
|
||||
bus events.Eventer,
|
||||
featureToggles control.Toggles,
|
||||
opts control.Options,
|
||||
) operations.BackupOperation {
|
||||
opts := control.Defaults()
|
||||
|
||||
opts.ToggleFeatures = featureToggles
|
||||
bod.ctrl.IDNameLookup = idname.NewCache(map[string]string{bod.sel.ID(): bod.sel.Name()})
|
||||
|
||||
bo, err := operations.NewBackupOperation(
|
||||
@ -629,3 +627,25 @@ func newIntegrationTesterSetup(t *testing.T) intgTesterSetup {
|
||||
|
||||
return its
|
||||
}
|
||||
|
||||
func getTestExtensionFactories() []extensions.CreateItemExtensioner {
|
||||
return []extensions.CreateItemExtensioner{
|
||||
&extensions.MockItemExtensionFactory{},
|
||||
}
|
||||
}
|
||||
|
||||
func verifyExtensionData(
|
||||
t *testing.T,
|
||||
itemInfo details.ItemInfo,
|
||||
p path.ServiceType,
|
||||
) {
|
||||
require.NotNil(t, itemInfo.Extension, "nil extension")
|
||||
assert.NotNil(t, itemInfo.Extension.Data[extensions.KNumBytes], "key not found in extension")
|
||||
actualSize := int64(itemInfo.Extension.Data[extensions.KNumBytes].(float64))
|
||||
|
||||
if p == path.SharePointService {
|
||||
assert.Equal(t, itemInfo.SharePoint.Size, actualSize, "incorrect data in extension")
|
||||
} else {
|
||||
assert.Equal(t, itemInfo.OneDrive.Size, actualSize, "incorrect data in extension")
|
||||
}
|
||||
}
|
||||
|
||||
@ -70,11 +70,12 @@ func (suite *OneDriveBackupIntgSuite) TestBackup_Run_oneDrive() {
|
||||
osel = selectors.NewOneDriveBackup([]string{userID})
|
||||
ws = deeTD.DriveIDFromRepoRef
|
||||
svc = path.OneDriveService
|
||||
opts = control.Defaults()
|
||||
)
|
||||
|
||||
osel.Include(selTD.OneDriveBackupFolderScope(osel))
|
||||
|
||||
bo, bod := prepNewTestBackupOp(t, ctx, mb, osel.Selector, control.Toggles{}, version.Backup)
|
||||
bo, bod := prepNewTestBackupOp(t, ctx, mb, osel.Selector, opts, version.Backup)
|
||||
defer bod.close(t, ctx)
|
||||
|
||||
runAndCheckBackup(t, ctx, &bo, mb, false)
|
||||
@ -163,7 +164,7 @@ func runDriveIncrementalTest(
|
||||
|
||||
var (
|
||||
acct = tconfig.NewM365Account(t)
|
||||
ffs = control.Toggles{}
|
||||
opts = control.Defaults()
|
||||
mb = evmock.NewBus()
|
||||
ws = deeTD.DriveIDFromRepoRef
|
||||
|
||||
@ -259,7 +260,7 @@ func runDriveIncrementalTest(
|
||||
containerIDs[destName] = ptr.Val(resp.GetId())
|
||||
}
|
||||
|
||||
bo, bod := prepNewTestBackupOp(t, ctx, mb, sel, ffs, version.Backup)
|
||||
bo, bod := prepNewTestBackupOp(t, ctx, mb, sel, opts, version.Backup)
|
||||
defer bod.close(t, ctx)
|
||||
|
||||
sel = bod.sel
|
||||
@ -612,7 +613,7 @@ func runDriveIncrementalTest(
|
||||
ctx,
|
||||
bod,
|
||||
incMB,
|
||||
ffs)
|
||||
opts)
|
||||
)
|
||||
|
||||
ctx, flush := tester.WithContext(t, ctx)
|
||||
@ -701,7 +702,7 @@ func (suite *OneDriveBackupIntgSuite) TestBackup_Run_oneDriveOwnerMigration() {
|
||||
|
||||
var (
|
||||
acct = tconfig.NewM365Account(t)
|
||||
ffs = control.Toggles{}
|
||||
opts = control.Defaults()
|
||||
mb = evmock.NewBus()
|
||||
|
||||
categories = map[path.CategoryType][]string{
|
||||
@ -729,7 +730,7 @@ func (suite *OneDriveBackupIntgSuite) TestBackup_Run_oneDriveOwnerMigration() {
|
||||
oldsel := selectors.NewOneDriveBackup([]string{uname})
|
||||
oldsel.Include(selTD.OneDriveBackupFolderScope(oldsel))
|
||||
|
||||
bo, bod := prepNewTestBackupOp(t, ctx, mb, oldsel.Selector, ffs, 0)
|
||||
bo, bod := prepNewTestBackupOp(t, ctx, mb, oldsel.Selector, opts, 0)
|
||||
defer bod.close(t, ctx)
|
||||
|
||||
sel := bod.sel
|
||||
@ -757,7 +758,7 @@ func (suite *OneDriveBackupIntgSuite) TestBackup_Run_oneDriveOwnerMigration() {
|
||||
var (
|
||||
incMB = evmock.NewBus()
|
||||
// the incremental backup op should have a proper user ID for the id.
|
||||
incBO = newTestBackupOp(t, ctx, bod, incMB, ffs)
|
||||
incBO = newTestBackupOp(t, ctx, bod, incMB, opts)
|
||||
)
|
||||
|
||||
require.NotEqualf(
|
||||
@ -824,3 +825,58 @@ func (suite *OneDriveBackupIntgSuite) TestBackup_Run_oneDriveOwnerMigration() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *OneDriveBackupIntgSuite) TestBackup_Run_oneDriveExtensions() {
|
||||
t := suite.T()
|
||||
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
var (
|
||||
tenID = tconfig.M365TenantID(t)
|
||||
mb = evmock.NewBus()
|
||||
userID = tconfig.SecondaryM365UserID(t)
|
||||
osel = selectors.NewOneDriveBackup([]string{userID})
|
||||
ws = deeTD.DriveIDFromRepoRef
|
||||
svc = path.OneDriveService
|
||||
opts = control.Defaults()
|
||||
)
|
||||
|
||||
opts.ItemExtensionFactory = getTestExtensionFactories()
|
||||
|
||||
osel.Include(selTD.OneDriveBackupFolderScope(osel))
|
||||
|
||||
bo, bod := prepNewTestBackupOp(t, ctx, mb, osel.Selector, opts, version.Backup)
|
||||
defer bod.close(t, ctx)
|
||||
|
||||
runAndCheckBackup(t, ctx, &bo, mb, false)
|
||||
|
||||
bID := bo.Results.BackupID
|
||||
|
||||
deets, expectDeets := deeTD.GetDeetsInBackup(
|
||||
t,
|
||||
ctx,
|
||||
bID,
|
||||
tenID,
|
||||
bod.sel.ID(),
|
||||
svc,
|
||||
ws,
|
||||
bod.kms,
|
||||
bod.sss)
|
||||
deeTD.CheckBackupDetails(
|
||||
t,
|
||||
ctx,
|
||||
bID,
|
||||
ws,
|
||||
bod.kms,
|
||||
bod.sss,
|
||||
expectDeets,
|
||||
false)
|
||||
|
||||
// Check that the extensions are in the backup
|
||||
for _, ent := range deets.Entries {
|
||||
if ent.Folder == nil {
|
||||
verifyExtensionData(t, ent.ItemInfo, path.OneDriveService)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/tester"
|
||||
"github.com/alcionai/corso/src/internal/tester/tconfig"
|
||||
"github.com/alcionai/corso/src/internal/version"
|
||||
deeTD "github.com/alcionai/corso/src/pkg/backup/details/testdata"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/alcionai/corso/src/pkg/selectors"
|
||||
@ -92,13 +93,14 @@ func (suite *SharePointBackupIntgSuite) TestBackup_Run_sharePoint() {
|
||||
defer flush()
|
||||
|
||||
var (
|
||||
mb = evmock.NewBus()
|
||||
sel = selectors.NewSharePointBackup([]string{suite.its.siteID})
|
||||
mb = evmock.NewBus()
|
||||
sel = selectors.NewSharePointBackup([]string{suite.its.siteID})
|
||||
opts = control.Defaults()
|
||||
)
|
||||
|
||||
sel.Include(selTD.SharePointBackupFolderScope(sel))
|
||||
|
||||
bo, bod := prepNewTestBackupOp(t, ctx, mb, sel.Selector, control.Toggles{}, version.Backup)
|
||||
bo, bod := prepNewTestBackupOp(t, ctx, mb, sel.Selector, opts, version.Backup)
|
||||
defer bod.close(t, ctx)
|
||||
|
||||
runAndCheckBackup(t, ctx, &bo, mb, false)
|
||||
@ -112,3 +114,66 @@ func (suite *SharePointBackupIntgSuite) TestBackup_Run_sharePoint() {
|
||||
suite.its.siteID,
|
||||
path.LibrariesCategory)
|
||||
}
|
||||
|
||||
func (suite *SharePointBackupIntgSuite) TestBackup_Run_sharePointExtensions() {
|
||||
t := suite.T()
|
||||
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
var (
|
||||
mb = evmock.NewBus()
|
||||
sel = selectors.NewSharePointBackup([]string{suite.its.siteID})
|
||||
opts = control.Defaults()
|
||||
tenID = tconfig.M365TenantID(t)
|
||||
svc = path.SharePointService
|
||||
ws = deeTD.DriveIDFromRepoRef
|
||||
)
|
||||
|
||||
opts.ItemExtensionFactory = getTestExtensionFactories()
|
||||
|
||||
sel.Include(selTD.SharePointBackupFolderScope(sel))
|
||||
|
||||
bo, bod := prepNewTestBackupOp(t, ctx, mb, sel.Selector, opts, version.Backup)
|
||||
defer bod.close(t, ctx)
|
||||
|
||||
runAndCheckBackup(t, ctx, &bo, mb, false)
|
||||
checkBackupIsInManifests(
|
||||
t,
|
||||
ctx,
|
||||
bod.kw,
|
||||
bod.sw,
|
||||
&bo,
|
||||
bod.sel,
|
||||
suite.its.siteID,
|
||||
path.LibrariesCategory)
|
||||
|
||||
bID := bo.Results.BackupID
|
||||
|
||||
deets, expectDeets := deeTD.GetDeetsInBackup(
|
||||
t,
|
||||
ctx,
|
||||
bID,
|
||||
tenID,
|
||||
bod.sel.ID(),
|
||||
svc,
|
||||
ws,
|
||||
bod.kms,
|
||||
bod.sss)
|
||||
deeTD.CheckBackupDetails(
|
||||
t,
|
||||
ctx,
|
||||
bID,
|
||||
ws,
|
||||
bod.kms,
|
||||
bod.sss,
|
||||
expectDeets,
|
||||
false)
|
||||
|
||||
// Check that the extensions are in the backup
|
||||
for _, ent := range deets.Entries {
|
||||
if ent.Folder == nil {
|
||||
verifyExtensionData(t, ent.ItemInfo, path.SharePointService)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -704,6 +704,8 @@ type ItemInfo struct {
|
||||
Exchange *ExchangeInfo `json:"exchange,omitempty"`
|
||||
SharePoint *SharePointInfo `json:"sharePoint,omitempty"`
|
||||
OneDrive *OneDriveInfo `json:"oneDrive,omitempty"`
|
||||
// Optional item extension data
|
||||
Extension *ExtensionData `json:"extension,omitempty"`
|
||||
}
|
||||
|
||||
// typedInfo should get embedded in each sesrvice type to track
|
||||
@ -1041,7 +1043,7 @@ func updateFolderWithinDrive(
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExtensionInfo describes extension data associated with an item
|
||||
type ExtensionInfo struct {
|
||||
// ExtensionData stores extension data associated with an item
|
||||
type ExtensionData struct {
|
||||
Data map[string]any `json:"data,omitempty"`
|
||||
}
|
||||
|
||||
@ -15,7 +15,7 @@ type CreateItemExtensioner interface {
|
||||
context.Context,
|
||||
io.ReadCloser,
|
||||
details.ItemInfo,
|
||||
*details.ExtensionInfo,
|
||||
*details.ExtensionData,
|
||||
) (io.ReadCloser, error)
|
||||
}
|
||||
|
||||
@ -27,7 +27,7 @@ func AddItemExtensions(
|
||||
rc io.ReadCloser,
|
||||
info details.ItemInfo,
|
||||
factories []CreateItemExtensioner,
|
||||
) (io.ReadCloser, *details.ExtensionInfo, error) {
|
||||
) (io.ReadCloser, *details.ExtensionData, error) {
|
||||
if rc == nil {
|
||||
return nil, nil, clues.New("nil readcloser")
|
||||
}
|
||||
@ -38,7 +38,7 @@ func AddItemExtensions(
|
||||
|
||||
ctx = clues.Add(ctx, "num_extensions", len(factories))
|
||||
|
||||
extInfo := &details.ExtensionInfo{
|
||||
extData := &details.ExtensionData{
|
||||
Data: make(map[string]any),
|
||||
}
|
||||
|
||||
@ -47,7 +47,7 @@ func AddItemExtensions(
|
||||
return nil, nil, clues.New("nil extension factory")
|
||||
}
|
||||
|
||||
extRc, err := factory.CreateItemExtension(ctx, rc, info, extInfo)
|
||||
extRc, err := factory.CreateItemExtension(ctx, rc, info, extData)
|
||||
if err != nil {
|
||||
return nil, nil, clues.Wrap(err, "create item extension")
|
||||
}
|
||||
@ -57,5 +57,5 @@ func AddItemExtensions(
|
||||
|
||||
logger.Ctx(ctx).Debug("added item extensions")
|
||||
|
||||
return rc, extInfo, nil
|
||||
return rc, extData, nil
|
||||
}
|
||||
|
||||
@ -4,9 +4,12 @@ package extensions
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
@ -25,7 +28,7 @@ func TestExtensionsUnitSuite(t *testing.T) {
|
||||
func (suite *ExtensionsUnitSuite) TestAddItemExtensions() {
|
||||
type outputValidationFunc func(
|
||||
extRc io.ReadCloser,
|
||||
extInfo *details.ExtensionInfo,
|
||||
extData *details.ExtensionData,
|
||||
err error,
|
||||
) bool
|
||||
|
||||
@ -52,10 +55,10 @@ func (suite *ExtensionsUnitSuite) TestAddItemExtensions() {
|
||||
rc: testRc,
|
||||
validateOutputs: func(
|
||||
extRc io.ReadCloser,
|
||||
extInfo *details.ExtensionInfo,
|
||||
extData *details.ExtensionData,
|
||||
err error,
|
||||
) bool {
|
||||
return err == nil && extRc != nil && extInfo != nil
|
||||
return err == nil && extRc != nil && extData != nil
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -67,10 +70,10 @@ func (suite *ExtensionsUnitSuite) TestAddItemExtensions() {
|
||||
rc: testRc,
|
||||
validateOutputs: func(
|
||||
extRc io.ReadCloser,
|
||||
extInfo *details.ExtensionInfo,
|
||||
extData *details.ExtensionData,
|
||||
err error,
|
||||
) bool {
|
||||
return err == nil && extRc != nil && extInfo != nil
|
||||
return err == nil && extRc != nil && extData != nil
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -79,10 +82,10 @@ func (suite *ExtensionsUnitSuite) TestAddItemExtensions() {
|
||||
rc: testRc,
|
||||
validateOutputs: func(
|
||||
extRc io.ReadCloser,
|
||||
extInfo *details.ExtensionInfo,
|
||||
extData *details.ExtensionData,
|
||||
err error,
|
||||
) bool {
|
||||
return err != nil && extRc == nil && extInfo == nil
|
||||
return err != nil && extRc == nil && extData == nil
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -95,26 +98,26 @@ func (suite *ExtensionsUnitSuite) TestAddItemExtensions() {
|
||||
rc: testRc,
|
||||
validateOutputs: func(
|
||||
extRc io.ReadCloser,
|
||||
extInfo *details.ExtensionInfo,
|
||||
extData *details.ExtensionData,
|
||||
err error,
|
||||
) bool {
|
||||
return err != nil && extRc == nil && extInfo == nil
|
||||
return err != nil && extRc == nil && extData == nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "factory call returns error",
|
||||
factories: []CreateItemExtensioner{
|
||||
&MockItemExtensionFactory{
|
||||
shouldReturnError: true,
|
||||
FailOnFactoryCreation: true,
|
||||
},
|
||||
},
|
||||
rc: testRc,
|
||||
validateOutputs: func(
|
||||
extRc io.ReadCloser,
|
||||
extInfo *details.ExtensionInfo,
|
||||
extData *details.ExtensionData,
|
||||
err error,
|
||||
) bool {
|
||||
return err != nil && extRc == nil && extInfo == nil
|
||||
return err != nil && extRc == nil && extData == nil
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -122,16 +125,16 @@ func (suite *ExtensionsUnitSuite) TestAddItemExtensions() {
|
||||
factories: []CreateItemExtensioner{
|
||||
&MockItemExtensionFactory{},
|
||||
&MockItemExtensionFactory{
|
||||
shouldReturnError: true,
|
||||
FailOnFactoryCreation: true,
|
||||
},
|
||||
},
|
||||
rc: testRc,
|
||||
validateOutputs: func(
|
||||
extRc io.ReadCloser,
|
||||
extInfo *details.ExtensionInfo,
|
||||
extData *details.ExtensionData,
|
||||
err error,
|
||||
) bool {
|
||||
return err != nil && extRc == nil && extInfo == nil
|
||||
return err != nil && extRc == nil && extData == nil
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -142,10 +145,10 @@ func (suite *ExtensionsUnitSuite) TestAddItemExtensions() {
|
||||
rc: nil,
|
||||
validateOutputs: func(
|
||||
extRc io.ReadCloser,
|
||||
extInfo *details.ExtensionInfo,
|
||||
extData *details.ExtensionData,
|
||||
err error,
|
||||
) bool {
|
||||
return err != nil && extRc == nil && extInfo == nil
|
||||
return err != nil && extRc == nil && extData == nil
|
||||
},
|
||||
},
|
||||
}
|
||||
@ -156,12 +159,79 @@ func (suite *ExtensionsUnitSuite) TestAddItemExtensions() {
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
extRc, extInfo, err := AddItemExtensions(
|
||||
extRc, extData, err := AddItemExtensions(
|
||||
ctx,
|
||||
test.rc,
|
||||
testItemInfo,
|
||||
test.factories)
|
||||
require.True(t, test.validateOutputs(extRc, extInfo, err))
|
||||
require.True(t, test.validateOutputs(extRc, extData, err))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func readFrom(rc io.ReadCloser) error {
|
||||
defer rc.Close()
|
||||
|
||||
var err error
|
||||
|
||||
p := make([]byte, 4)
|
||||
|
||||
for err == nil {
|
||||
_, err := rc.Read(p)
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (suite *ExtensionsUnitSuite) TestReadCloserWrappers() {
|
||||
data := []byte("hello world!")
|
||||
|
||||
table := []struct {
|
||||
name string
|
||||
factories []CreateItemExtensioner
|
||||
payload []byte
|
||||
check require.ErrorAssertionFunc
|
||||
rc io.ReadCloser
|
||||
}{
|
||||
{
|
||||
name: "happy path",
|
||||
factories: []CreateItemExtensioner{
|
||||
&MockItemExtensionFactory{},
|
||||
},
|
||||
payload: data,
|
||||
check: require.NoError,
|
||||
rc: io.NopCloser(bytes.NewReader(data)),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range table {
|
||||
suite.Run(test.name, func() {
|
||||
t := suite.T()
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
extRc, extData, err := AddItemExtensions(
|
||||
ctx,
|
||||
test.rc,
|
||||
details.ItemInfo{},
|
||||
test.factories)
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
err = readFrom(extRc)
|
||||
test.check(t, err, clues.ToCore(err))
|
||||
|
||||
if err == nil {
|
||||
require.Equal(suite.T(), len(test.payload), int(extData.Data[KNumBytes].(int64)))
|
||||
c := extData.Data[KCrc32].(uint32)
|
||||
require.Equal(suite.T(), c, crc32.ChecksumIEEE(test.payload))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,8 +2,10 @@ package extensions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
|
||||
@ -11,79 +13,88 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
KNumBytes = "NumBytes"
|
||||
KCrc32 = "Crc32"
|
||||
)
|
||||
|
||||
var _ io.ReadCloser = &MockExtension{}
|
||||
|
||||
type MockExtension struct {
|
||||
numBytes int
|
||||
crc32 uint32
|
||||
info details.ItemInfo
|
||||
extInfo *details.ExtensionInfo
|
||||
innerRc io.ReadCloser
|
||||
ctx context.Context
|
||||
failOnRead bool
|
||||
failOnClose bool
|
||||
NumBytes int64
|
||||
Crc32 uint32
|
||||
Info details.ItemInfo
|
||||
ExtData *details.ExtensionData
|
||||
InnerRc io.ReadCloser
|
||||
Ctx context.Context
|
||||
FailOnRead bool
|
||||
FailOnClose bool
|
||||
}
|
||||
|
||||
func (me *MockExtension) Read(p []byte) (int, error) {
|
||||
if me.failOnRead {
|
||||
if me.FailOnRead {
|
||||
return 0, clues.New("mock read error")
|
||||
}
|
||||
|
||||
n, err := me.innerRc.Read(p)
|
||||
if err != nil && err != io.EOF {
|
||||
logger.CtxErr(me.ctx, err).Error("inner read error")
|
||||
return n, err
|
||||
n, err := me.InnerRc.Read(p)
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
logger.CtxErr(me.Ctx, err).Error("inner read error")
|
||||
return n, clues.Stack(err)
|
||||
}
|
||||
|
||||
me.numBytes += n
|
||||
me.crc32 = crc32.Update(me.crc32, crc32.IEEETable, p[:n])
|
||||
atomic.AddInt64(&me.NumBytes, int64(n))
|
||||
|
||||
if err == io.EOF {
|
||||
logger.Ctx(me.ctx).Debug("mock extension reached EOF")
|
||||
me.extInfo.Data["numBytes"] = me.numBytes
|
||||
me.extInfo.Data["crc32"] = me.crc32
|
||||
me.Crc32 = crc32.Update(me.Crc32, crc32.IEEETable, p[:n])
|
||||
|
||||
if errors.Is(err, io.EOF) {
|
||||
me.ExtData.Data[KNumBytes] = me.NumBytes
|
||||
me.ExtData.Data[KCrc32] = me.Crc32
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (me *MockExtension) Close() error {
|
||||
if me.failOnClose {
|
||||
if me.FailOnClose {
|
||||
return clues.New("mock close error")
|
||||
}
|
||||
|
||||
err := me.innerRc.Close()
|
||||
err := me.InnerRc.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
return clues.Stack(err)
|
||||
}
|
||||
|
||||
me.extInfo.Data["numBytes"] = me.numBytes
|
||||
me.extInfo.Data["crc32"] = me.crc32
|
||||
logger.Ctx(me.ctx).Infow(
|
||||
me.ExtData.Data[KNumBytes] = me.NumBytes
|
||||
me.ExtData.Data[KCrc32] = me.Crc32
|
||||
logger.Ctx(me.Ctx).Infow(
|
||||
"mock extension closed",
|
||||
"numBytes", me.numBytes, "crc32", me.crc32)
|
||||
KNumBytes, me.NumBytes, KCrc32, me.Crc32)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type MockItemExtensionFactory struct {
|
||||
shouldReturnError bool
|
||||
FailOnFactoryCreation bool
|
||||
FailOnRead bool
|
||||
FailOnClose bool
|
||||
}
|
||||
|
||||
func (m *MockItemExtensionFactory) CreateItemExtension(
|
||||
ctx context.Context,
|
||||
rc io.ReadCloser,
|
||||
info details.ItemInfo,
|
||||
extInfo *details.ExtensionInfo,
|
||||
extData *details.ExtensionData,
|
||||
) (io.ReadCloser, error) {
|
||||
if m.shouldReturnError {
|
||||
if m.FailOnFactoryCreation {
|
||||
return nil, clues.New("factory error")
|
||||
}
|
||||
|
||||
return &MockExtension{
|
||||
ctx: ctx,
|
||||
innerRc: rc,
|
||||
info: info,
|
||||
extInfo: extInfo,
|
||||
Ctx: ctx,
|
||||
InnerRc: rc,
|
||||
Info: info,
|
||||
ExtData: extData,
|
||||
FailOnRead: m.FailOnRead,
|
||||
FailOnClose: m.FailOnClose,
|
||||
}, nil
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user