od: add per-container collision caching and checks (#3640)

pushes collision checks in onedrive upward from the api layer (due to the bug with ConflictBehavior) and into the restore controller.  Collision resolution in drive follows the same pattern that we use in exchange.

---

#### Does this PR need a docs update or release note?

- [x]  No

#### Type of change

- [x] 🌻 Feature

#### Issue(s)

* #3562

#### Test Plan

- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
Keepers 2023-06-30 10:13:35 -06:00 committed by GitHub
parent c218dd865b
commit f2a4fd0ba3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 262 additions and 66 deletions

View File

@ -80,7 +80,10 @@ type GetItemer interface {
// ---------------------------------------------------------------------------
type RestoreHandler interface {
DeleteItemer
DeleteItemPermissioner
GetFolderByNamer
GetItemsByCollisionKeyser
GetRootFolderer
ItemInfoAugmenter
NewItemContentUploader
@ -90,13 +93,11 @@ type RestoreHandler interface {
UpdateItemLinkSharer
}
type NewItemContentUploader interface {
// NewItemContentUpload creates an upload session which is used as a writer
// for large item content.
NewItemContentUpload(
type DeleteItemer interface {
DeleteItem(
ctx context.Context,
driveID, itemID string,
) (models.UploadSessionable, error)
) error
}
type DeleteItemPermissioner interface {
@ -106,6 +107,28 @@ type DeleteItemPermissioner interface {
) error
}
type GetItemsByCollisionKeyser interface {
// GetItemsInContainerByCollisionKey looks up all items currently in
// the container, and returns them in a map[collisionKey]itemID.
// The collision key is uniquely defined by each category of data.
// Collision key checks are used during restore to handle the on-
// collision restore configurations that cause the item restore to get
// skipped, replaced, or copied.
GetItemsInContainerByCollisionKey(
ctx context.Context,
driveID, containerID string,
) (map[string]string, error)
}
type NewItemContentUploader interface {
// NewItemContentUpload creates an upload session which is used as a writer
// for large item content.
NewItemContentUpload(
ctx context.Context,
driveID, itemID string,
) (models.UploadSessionable, error)
}
type UpdateItemPermissioner interface {
PostItemPermissionUpdate(
ctx context.Context,

View File

@ -147,11 +147,11 @@ func (h itemRestoreHandler) AugmentItemInfo(
return augmentItemInfo(dii, item, size, parentPath)
}
func (h itemRestoreHandler) NewItemContentUpload(
func (h itemRestoreHandler) DeleteItem(
ctx context.Context,
driveID, itemID string,
) (models.UploadSessionable, error) {
return h.ac.NewItemContentUpload(ctx, driveID, itemID)
) error {
return h.ac.DeleteItem(ctx, driveID, itemID)
}
func (h itemRestoreHandler) DeleteItemPermission(
@ -161,6 +161,25 @@ func (h itemRestoreHandler) DeleteItemPermission(
return h.ac.DeleteItemPermission(ctx, driveID, itemID, permissionID)
}
func (h itemRestoreHandler) GetItemsInContainerByCollisionKey(
ctx context.Context,
driveID, containerID string,
) (map[string]string, error) {
m, err := h.ac.GetItemsInContainerByCollisionKey(ctx, driveID, containerID)
if err != nil {
return nil, err
}
return m, nil
}
func (h itemRestoreHandler) NewItemContentUpload(
ctx context.Context,
driveID, itemID string,
) (models.UploadSessionable, error) {
return h.ac.NewItemContentUpload(ctx, driveID, itemID)
}
func (h itemRestoreHandler) PostItemPermissionUpdate(
ctx context.Context,
driveID, itemID string,

View File

@ -233,11 +233,20 @@ func (m GetsItemPermission) GetItemPermission(
type RestoreHandler struct {
ItemInfo details.ItemInfo
CollisionKeyMap map[string]string
CalledDeleteItem bool
CalledDeleteItemOn string
DeleteItemErr error
CalledPostItem bool
PostItemResp models.DriveItemable
PostItemErr error
UploadSessionErr error
}
func (h RestoreHandler) AugmentItemInfo(
func (h *RestoreHandler) AugmentItemInfo(
details.ItemInfo,
models.DriveItemable,
int64,
@ -246,29 +255,46 @@ func (h RestoreHandler) AugmentItemInfo(
return h.ItemInfo
}
func (h RestoreHandler) NewItemContentUpload(
func (h *RestoreHandler) GetItemsInContainerByCollisionKey(
context.Context,
string, string,
) (models.UploadSessionable, error) {
return nil, clues.New("not implemented")
) (map[string]string, error) {
return h.CollisionKeyMap, nil
}
func (h RestoreHandler) DeleteItemPermission(
func (h *RestoreHandler) DeleteItem(
_ context.Context,
_, itemID string,
) error {
h.CalledDeleteItem = true
h.CalledDeleteItemOn = itemID
return h.DeleteItemErr
}
func (h *RestoreHandler) DeleteItemPermission(
context.Context,
string, string, string,
) error {
return clues.New("not implemented")
return nil
}
func (h RestoreHandler) PostItemPermissionUpdate(
func (h *RestoreHandler) NewItemContentUpload(
context.Context,
string, string,
) (models.UploadSessionable, error) {
return models.NewUploadSession(), h.UploadSessionErr
}
func (h *RestoreHandler) PostItemPermissionUpdate(
context.Context,
string, string,
*drives.ItemItemsItemInvitePostRequestBody,
) (drives.ItemItemsItemInviteResponseable, error) {
return nil, clues.New("not implemented")
return drives.NewItemItemsItemInviteResponse(), nil
}
func (h RestoreHandler) PostItemLinkShareUpdate(
func (h *RestoreHandler) PostItemLinkShareUpdate(
ctx context.Context,
driveID, itemID string,
body *drives.ItemItemsItemCreateLinkPostRequestBody,
@ -276,25 +302,26 @@ func (h RestoreHandler) PostItemLinkShareUpdate(
return nil, clues.New("not implemented")
}
func (h RestoreHandler) PostItemInContainer(
func (h *RestoreHandler) PostItemInContainer(
context.Context,
string, string,
models.DriveItemable,
control.CollisionPolicy,
) (models.DriveItemable, error) {
h.CalledPostItem = true
return h.PostItemResp, h.PostItemErr
}
func (h RestoreHandler) GetFolderByName(
func (h *RestoreHandler) GetFolderByName(
context.Context,
string, string, string,
) (models.DriveItemable, error) {
return nil, clues.New("not implemented")
return models.NewDriveItem(), nil
}
func (h RestoreHandler) GetRootFolder(
func (h *RestoreHandler) GetRootFolder(
context.Context,
string,
) (models.DriveItemable, error) {
return nil, clues.New("not implemented")
return models.NewDriveItem(), nil
}

View File

@ -81,9 +81,10 @@ func FileRespReadCloser(pl string) io.ReadCloser {
return io.NopCloser(bytes.NewReader([]byte(pl)))
}
const DriveFileMetaData = `{
"fileName": "fnords.txt"
}`
const (
DriveItemFileName = "fnords.txt"
DriveFileMetaData = `{"fileName": "` + DriveItemFileName + `"}`
)
//nolint:lll
const DriveFilePayloadData = `{

View File

@ -36,21 +36,24 @@ const (
)
type restoreCaches struct {
Folders *folderCache
ParentDirToMeta map[string]metadata.Metadata
OldPermIDToNewID map[string]string
OldLinkShareIDToNewID map[string]string
collisionKeyToItemID map[string]string
DriveIDToRootFolderID map[string]string
Folders *folderCache
OldLinkShareIDToNewID map[string]string
OldPermIDToNewID map[string]string
ParentDirToMeta map[string]metadata.Metadata
pool sync.Pool
}
func NewRestoreCaches() *restoreCaches {
return &restoreCaches{
Folders: NewFolderCache(),
ParentDirToMeta: map[string]metadata.Metadata{},
OldPermIDToNewID: map[string]string{},
OldLinkShareIDToNewID: map[string]string{},
collisionKeyToItemID: map[string]string{},
DriveIDToRootFolderID: map[string]string{},
Folders: NewFolderCache(),
OldLinkShareIDToNewID: map[string]string{},
OldPermIDToNewID: map[string]string{},
ParentDirToMeta: map[string]metadata.Metadata{},
// Buffer pool for uploads
pool: sync.Pool{
New: func() any {
@ -221,6 +224,12 @@ func RestoreCollection(
return metrics, clues.Wrap(err, "creating folders for restore")
}
collisionKeyToItemID, err := rh.GetItemsInContainerByCollisionKey(ctx, drivePath.DriveID, restoreFolderID)
if err != nil {
return metrics, clues.Wrap(err, "generating map of item collision keys")
}
caches.collisionKeyToItemID = collisionKeyToItemID
caches.ParentDirToMeta[dc.FullPath().String()] = colMeta
items := dc.Items(ctx, errs)
@ -356,6 +365,7 @@ func restoreItem(
fibn,
restoreFolderID,
copyBuffer,
caches.collisionKeyToItemID,
itemData)
if err != nil {
if errors.Is(err, graph.ErrItemAlreadyExistsConflict) && restoreCfg.OnCollision == control.Skip {
@ -458,6 +468,7 @@ func restoreV0File(
fibn data.FetchItemByNamer,
restoreFolderID string,
copyBuffer []byte,
collisionKeyToItemID map[string]string,
itemData data.Stream,
) (details.ItemInfo, error) {
_, itemInfo, err := restoreData(
@ -469,6 +480,7 @@ func restoreV0File(
itemData,
drivePath.DriveID,
restoreFolderID,
collisionKeyToItemID,
copyBuffer)
if err != nil {
return itemInfo, clues.Wrap(err, "restoring file")
@ -501,6 +513,7 @@ func restoreV1File(
itemData,
drivePath.DriveID,
restoreFolderID,
caches.collisionKeyToItemID,
copyBuffer)
if err != nil {
return details.ItemInfo{}, err
@ -583,6 +596,7 @@ func restoreV6File(
itemData,
drivePath.DriveID,
restoreFolderID,
caches.collisionKeyToItemID,
copyBuffer)
if err != nil {
return details.ItemInfo{}, err
@ -734,6 +748,7 @@ func createRestoreFolders(
}
type itemRestorer interface {
DeleteItemer
ItemInfoAugmenter
NewItemContentUploader
PostItemInContainerer
@ -748,6 +763,7 @@ func restoreData(
name string,
itemData data.Stream,
driveID, parentFolderID string,
collisionKeyToItemID map[string]string,
copyBuffer []byte,
) (string, details.ItemInfo, error) {
ctx, end := diagnostics.Span(ctx, "gc:oneDrive:restoreItem", diagnostics.Label("item_uuid", itemData.UUID()))
@ -761,18 +777,54 @@ func restoreData(
return "", details.ItemInfo{}, clues.New("item does not implement DataStreamInfo").WithClues(ctx)
}
var (
item = newItem(name, false)
collisionKey = api.DriveItemCollisionKey(item)
collisionID string
replace bool
)
if id, ok := collisionKeyToItemID[collisionKey]; ok {
log := logger.Ctx(ctx).With("collision_key", clues.Hide(collisionKey))
log.Debug("item collision")
if restoreCfg.OnCollision == control.Skip {
log.Debug("skipping item with collision")
return "", details.ItemInfo{}, graph.ErrItemAlreadyExistsConflict
}
collisionID = id
replace = restoreCfg.OnCollision == control.Replace
}
// drive items do not support PUT requests on the drive item data, so
// when replacing a collision, first delete the existing item. It would
// be nice to be able to do a post-then-delete like we do with exchange,
// but onedrive will conflict on the filename. So until the api's built-in
// conflict replace handling bug gets fixed, we either delete-post, and
// risk failures in the middle, or we post w/ copy, then delete, then patch
// the name, which could triple our graph calls in the worst case.
if replace {
if err := ir.DeleteItem(ctx, driveID, collisionID); err != nil {
return "", details.ItemInfo{}, clues.New("deleting colliding item")
}
}
// Create Item
// the Copy collision policy is used since we've technically already handled
// the collision behavior above. At this point, copy is most likely to succeed.
// We could go with control.Skip if we wanted to ensure no duplicate, but those
// duplicates will only happen under very unlikely race conditions.
newItem, err := ir.PostItemInContainer(
ctx,
driveID,
parentFolderID,
newItem(name, false),
restoreCfg.OnCollision)
item,
control.Copy)
if err != nil {
return "", details.ItemInfo{}, err
}
// Get a drive item writer
w, uploadURL, err := driveItemWriter(ctx, ir, driveID, ptr.Val(newItem.GetId()), ss.Size())
if err != nil {
return "", details.ItemInfo{}, clues.Wrap(err, "get item upload session")

View File

@ -5,10 +5,12 @@ import (
"github.com/alcionai/clues"
"github.com/google/uuid"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/m365/graph"
odConsts "github.com/alcionai/corso/src/internal/m365/onedrive/consts"
"github.com/alcionai/corso/src/internal/m365/onedrive/mock"
@ -321,28 +323,76 @@ func (suite *RestoreUnitSuite) TestAugmentRestorePaths_DifferentRestorePath() {
}
}
func (suite *RestoreUnitSuite) TestRestoreItem_errItemAlreadyExists() {
func (suite *RestoreUnitSuite) TestRestoreItem_collisionHandling() {
const mndiID = "mndi-id"
table := []struct {
name string
collisionKeys map[string]string
onCollision control.CollisionPolicy
expectErr func(*testing.T, error)
expectSkipped assert.BoolAssertionFunc
expectMock func(*testing.T, *mock.RestoreHandler)
}{
{
name: "skip",
onCollision: control.Skip,
expectErr: func(t *testing.T, err error) {
require.NoError(t, err, clues.ToCore(err))
name: "no collision, copy",
collisionKeys: map[string]string{},
onCollision: control.Copy,
expectSkipped: assert.False,
expectMock: func(t *testing.T, rh *mock.RestoreHandler) {
assert.True(t, rh.CalledPostItem, "new item posted")
assert.False(t, rh.CalledDeleteItem, "new item deleted")
},
expectSkipped: assert.True,
},
{
name: "replace",
name: "no collision, replace",
collisionKeys: map[string]string{},
onCollision: control.Replace,
expectErr: func(t *testing.T, err error) {
require.ErrorIs(t, err, graph.ErrItemAlreadyExistsConflict, clues.ToCore(err))
},
expectSkipped: assert.False,
expectMock: func(t *testing.T, rh *mock.RestoreHandler) {
assert.True(t, rh.CalledPostItem, "new item posted")
assert.False(t, rh.CalledDeleteItem, "new item deleted")
},
},
{
name: "no collision, skip",
collisionKeys: map[string]string{},
onCollision: control.Skip,
expectSkipped: assert.False,
expectMock: func(t *testing.T, rh *mock.RestoreHandler) {
assert.True(t, rh.CalledPostItem, "new item posted")
assert.False(t, rh.CalledDeleteItem, "new item deleted")
},
},
{
name: "collision, copy",
collisionKeys: map[string]string{mock.DriveItemFileName: mndiID},
onCollision: control.Copy,
expectSkipped: assert.False,
expectMock: func(t *testing.T, rh *mock.RestoreHandler) {
assert.True(t, rh.CalledPostItem, "new item posted")
assert.False(t, rh.CalledDeleteItem, "new item deleted")
},
},
{
name: "collision, replace",
collisionKeys: map[string]string{mock.DriveItemFileName: mndiID},
onCollision: control.Replace,
expectSkipped: assert.False,
expectMock: func(t *testing.T, rh *mock.RestoreHandler) {
assert.True(t, rh.CalledPostItem, "new item posted")
assert.True(t, rh.CalledDeleteItem, "new item deleted")
assert.Equal(t, mndiID, rh.CalledDeleteItemOn, "deleted the correct item")
},
},
{
name: "collision, skip",
collisionKeys: map[string]string{mock.DriveItemFileName: mndiID},
onCollision: control.Skip,
expectSkipped: assert.True,
expectMock: func(t *testing.T, rh *mock.RestoreHandler) {
assert.False(t, rh.CalledPostItem, "new item posted")
assert.False(t, rh.CalledDeleteItem, "new item deleted")
},
},
}
for _, test := range table {
@ -352,16 +402,18 @@ func (suite *RestoreUnitSuite) TestRestoreItem_errItemAlreadyExists() {
ctx, flush := tester.NewContext(t)
defer flush()
mndi := models.NewDriveItem()
mndi.SetId(ptr.To(mndiID))
var (
rh = mock.RestoreHandler{
PostItemErr: graph.ErrItemAlreadyExistsConflict,
}
restoreCfg = control.RestoreConfig{
OnCollision: test.onCollision,
}
caches = NewRestoreCaches()
rh = &mock.RestoreHandler{PostItemResp: mndi}
restoreCfg = control.RestoreConfig{OnCollision: test.onCollision}
dpb = odConsts.DriveFolderPrefixBuilder("driveID1")
)
caches.collisionKeyToItemID = test.collisionKeys
dpp, err := dpb.ToDataLayerOneDrivePath("t", "u", false)
require.NoError(t, err)
@ -380,14 +432,17 @@ func (suite *RestoreUnitSuite) TestRestoreItem_errItemAlreadyExists() {
version.Backup,
dp,
"",
[]byte{},
NewRestoreCaches(),
make([]byte, graph.CopyBufferSize),
caches,
false,
&mock.Data{ID: uuid.NewString()},
&mock.Data{
ID: uuid.NewString(),
Reader: mock.FileRespReadCloser(mock.DriveFilePayloadData),
},
nil)
test.expectErr(t, err)
require.NoError(t, err, clues.ToCore(err))
test.expectSkipped(t, skip)
test.expectMock(t, rh)
})
}
}

View File

@ -173,11 +173,11 @@ func (h libraryRestoreHandler) AugmentItemInfo(
return augmentItemInfo(dii, item, size, parentPath)
}
func (h libraryRestoreHandler) NewItemContentUpload(
func (h libraryRestoreHandler) DeleteItem(
ctx context.Context,
driveID, itemID string,
) (models.UploadSessionable, error) {
return h.ac.NewItemContentUpload(ctx, driveID, itemID)
) error {
return h.ac.DeleteItem(ctx, driveID, itemID)
}
func (h libraryRestoreHandler) DeleteItemPermission(
@ -187,6 +187,25 @@ func (h libraryRestoreHandler) DeleteItemPermission(
return h.ac.DeleteItemPermission(ctx, driveID, itemID, permissionID)
}
func (h libraryRestoreHandler) GetItemsInContainerByCollisionKey(
ctx context.Context,
driveID, containerID string,
) (map[string]string, error) {
m, err := h.ac.GetItemsInContainerByCollisionKey(ctx, driveID, containerID)
if err != nil {
return nil, err
}
return m, nil
}
func (h libraryRestoreHandler) NewItemContentUpload(
ctx context.Context,
driveID, itemID string,
) (models.UploadSessionable, error) {
return h.ac.NewItemContentUpload(ctx, driveID, itemID)
}
func (h libraryRestoreHandler) PostItemPermissionUpdate(
ctx context.Context,
driveID, itemID string,