From b9b58e8c300a9363e1f04a4171ad01b962bcfc24 Mon Sep 17 00:00:00 2001 From: Keepers Date: Fri, 4 Aug 2023 13:59:19 -0600 Subject: [PATCH] threadsafe maps in restore caches (#3941) replaces the restore cache maps with threadsafe maps. Onedrive restores would rarely fail due to concurrent map writes against the restore cache maps. This change should protect against failure in those conditions. --- #### Does this PR need a docs update or release note? - [x] :no_entry: No #### Type of change - [x] :bug: Bugfix #### Test Plan - [x] :zap: Unit test - [x] :green_heart: E2E --- .../m365/onedrive/item_collector_test.go | 2 +- src/internal/m365/onedrive/permission.go | 23 +++++++++-------- src/internal/m365/onedrive/permission_test.go | 8 +++++- src/internal/m365/onedrive/restore.go | 21 +++++++++------- src/internal/m365/onedrive/restore_caches.go | 25 ++++++++++--------- src/internal/m365/onedrive/restore_test.go | 22 ++++++++-------- src/internal/operations/test/onedrive_test.go | 3 ++- 7 files changed, 58 insertions(+), 46 deletions(-) diff --git a/src/internal/m365/onedrive/item_collector_test.go b/src/internal/m365/onedrive/item_collector_test.go index eae82e7e2..ea6726839 100644 --- a/src/internal/m365/onedrive/item_collector_test.go +++ b/src/internal/m365/onedrive/item_collector_test.go @@ -362,7 +362,7 @@ func (suite *OneDriveIntgSuite) TestCreateGetDeleteFolder() { } caches := NewRestoreCaches(nil) - caches.DriveIDToDriveInfo[driveID] = driveInfo{rootFolderID: ptr.Val(rootFolder.GetId())} + caches.DriveIDToDriveInfo.Store(driveID, driveInfo{rootFolderID: ptr.Val(rootFolder.GetId())}) rh := NewRestoreHandler(suite.ac) diff --git a/src/internal/m365/onedrive/permission.go b/src/internal/m365/onedrive/permission.go index 539fee1c3..900d8c989 100644 --- a/src/internal/m365/onedrive/permission.go +++ b/src/internal/m365/onedrive/permission.go @@ -7,6 +7,7 @@ import ( "github.com/alcionai/clues" "github.com/microsoftgraph/msgraph-sdk-go/drives" "github.com/microsoftgraph/msgraph-sdk-go/models" + "github.com/puzpuzpuz/xsync/v2" "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/data" @@ -18,9 +19,9 @@ import ( func getParentMetadata( parentPath path.Path, - parentDirToMeta map[string]metadata.Metadata, + parentDirToMeta *xsync.MapOf[string, metadata.Metadata], ) (metadata.Metadata, error) { - parentMeta, ok := parentDirToMeta[parentPath.String()] + parentMeta, ok := parentDirToMeta.Load(parentPath.String()) if !ok { drivePath, err := path.ToDrivePath(parentPath) if err != nil { @@ -88,7 +89,7 @@ func getCollectionMetadata( func computePreviousLinkShares( ctx context.Context, originDir path.Path, - parentMetas map[string]metadata.Metadata, + parentMetas *xsync.MapOf[string, metadata.Metadata], ) ([]metadata.LinkShare, error) { linkShares := []metadata.LinkShare{} ctx = clues.Add(ctx, "origin_dir", originDir) @@ -110,7 +111,7 @@ func computePreviousLinkShares( break } - meta, ok := parentMetas[parent.String()] + meta, ok := parentMetas.Load(parent.String()) if !ok { return nil, clues.New("no metadata found in parent").WithClues(ictx) } @@ -138,7 +139,7 @@ func computePreviousMetadata( ctx context.Context, originDir path.Path, // map parent dir -> parent's metadata - parentMetas map[string]metadata.Metadata, + parentMetas *xsync.MapOf[string, metadata.Metadata], ) (metadata.Metadata, error) { var ( parent path.Path @@ -167,7 +168,7 @@ func computePreviousMetadata( return metadata.Metadata{}, nil } - meta, ok = parentMetas[parent.String()] + meta, ok = parentMetas.Load(parent.String()) if !ok { return metadata.Metadata{}, clues.New("no metadata found for parent folder: " + parent.String()).WithClues(ictx) } @@ -191,7 +192,7 @@ func UpdatePermissions( driveID string, itemID string, permAdded, permRemoved []metadata.Permission, - oldPermIDToNewID map[string]string, + oldPermIDToNewID *xsync.MapOf[string, string], ) error { // The ordering of the operations is important here. We first // remove all the removed permissions and then add the added ones. @@ -206,7 +207,7 @@ func UpdatePermissions( // this is bad citizenship, and could end up consuming a lot of // system resources if servicers leak client connections (sockets, etc). - pid, ok := oldPermIDToNewID[p.ID] + pid, ok := oldPermIDToNewID.Load(p.ID) if !ok { return clues.New("no new permission id").WithClues(ctx) } @@ -270,7 +271,7 @@ func UpdatePermissions( return clues.Stack(err) } - oldPermIDToNewID[p.ID] = ptr.Val(newPerm.GetValue()[0].GetId()) + oldPermIDToNewID.Store(p.ID, ptr.Val(newPerm.GetValue()[0].GetId())) } return nil @@ -287,7 +288,7 @@ func UpdateLinkShares( driveID string, itemID string, lsAdded, lsRemoved []metadata.LinkShare, - oldLinkShareIDToNewID map[string]string, + oldLinkShareIDToNewID *xsync.MapOf[string, string], ) (bool, error) { // You can only delete inherited sharing links the first time you // create a sharing link which is done using @@ -366,7 +367,7 @@ func UpdateLinkShares( return alreadyDeleted, clues.Stack(err) } - oldLinkShareIDToNewID[ls.ID] = ptr.Val(newLS.GetId()) + oldLinkShareIDToNewID.Store(ls.ID, ptr.Val(newLS.GetId())) } // It is possible to have empty link shares even though we should diff --git a/src/internal/m365/onedrive/permission_test.go b/src/internal/m365/onedrive/permission_test.go index e6c4881c3..7782fccd9 100644 --- a/src/internal/m365/onedrive/permission_test.go +++ b/src/internal/m365/onedrive/permission_test.go @@ -4,6 +4,7 @@ import ( "strings" "testing" + "github.com/puzpuzpuz/xsync/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -156,7 +157,12 @@ func runComputeParentPermissionsTest( ctx, flush := tester.NewContext(t) defer flush() - m, err := computePreviousMetadata(ctx, test.item, test.parentPerms) + input := xsync.NewMapOf[metadata.Metadata]() + for k, v := range test.parentPerms { + input.Store(k, v) + } + + m, err := computePreviousMetadata(ctx, test.item, input) require.NoError(t, err, "compute permissions") assert.Equal(t, m, test.meta) diff --git a/src/internal/m365/onedrive/restore.go b/src/internal/m365/onedrive/restore.go index a8d70c943..900f37e60 100644 --- a/src/internal/m365/onedrive/restore.go +++ b/src/internal/m365/onedrive/restore.go @@ -218,7 +218,7 @@ func RestoreCollection( } caches.collisionKeyToItemID = collisionKeyToItemID - caches.ParentDirToMeta[dc.FullPath().String()] = colMeta + caches.ParentDirToMeta.Store(dc.FullPath().String(), colMeta) items := dc.Items(ctx, errs) semaphoreCh := make(chan struct{}, graph.Parallelism(path.OneDriveService).ItemUpload()) @@ -391,7 +391,7 @@ func restoreItem( } trimmedPath := strings.TrimSuffix(itemPath.String(), metadata.DirMetaFileSuffix) - caches.ParentDirToMeta[trimmedPath] = meta + caches.ParentDirToMeta.Store(trimmedPath, meta) return details.ItemInfo{}, true, nil } @@ -680,10 +680,11 @@ func createRestoreFolders( caches *restoreCaches, ) (string, error) { var ( - driveID = drivePath.DriveID - folders = restoreDir.Elements() - location = path.Builder{}.Append(driveID) - parentFolderID = caches.DriveIDToDriveInfo[drivePath.DriveID].rootFolderID + driveID = drivePath.DriveID + folders = restoreDir.Elements() + location = path.Builder{}.Append(driveID) + parentFolderMeta, _ = caches.DriveIDToDriveInfo.Load(drivePath.DriveID) + parentFolderID = parentFolderMeta.rootFolderID ) ctx = clues.Add( @@ -1121,7 +1122,7 @@ func ensureDriveExists( // the drive might already be cached by ID. it's okay // if the name has changed. the ID is a better reference // anyway. - if di, ok := caches.DriveIDToDriveInfo[driveID]; ok { + if di, ok := caches.DriveIDToDriveInfo.Load(driveID); ok { return di, nil } @@ -1137,7 +1138,7 @@ func ensureDriveExists( oldName, ok := caches.BackupDriveIDName.NameOf(driveID) if ok { // check for drives that currently have the same name - if di, ok := caches.DriveNameToDriveInfo[oldName]; ok { + if di, ok := caches.DriveNameToDriveInfo.Load(oldName); ok { return di, nil } @@ -1172,5 +1173,7 @@ func ensureDriveExists( return driveInfo{}, clues.Wrap(err, "adding drive to cache").OrNil() } - return caches.DriveIDToDriveInfo[ptr.Val(newDrive.GetId())], nil + di, _ := caches.DriveIDToDriveInfo.Load(ptr.Val(newDrive.GetId())) + + return di, nil } diff --git a/src/internal/m365/onedrive/restore_caches.go b/src/internal/m365/onedrive/restore_caches.go index 6951a8bfe..096e0bff8 100644 --- a/src/internal/m365/onedrive/restore_caches.go +++ b/src/internal/m365/onedrive/restore_caches.go @@ -6,6 +6,7 @@ import ( "github.com/alcionai/clues" "github.com/microsoftgraph/msgraph-sdk-go/models" + "github.com/puzpuzpuz/xsync/v2" "github.com/alcionai/corso/src/internal/common/idname" "github.com/alcionai/corso/src/internal/common/ptr" @@ -23,12 +24,12 @@ type driveInfo struct { type restoreCaches struct { BackupDriveIDName idname.Cacher collisionKeyToItemID map[string]api.DriveItemIDType - DriveIDToDriveInfo map[string]driveInfo - DriveNameToDriveInfo map[string]driveInfo + DriveIDToDriveInfo *xsync.MapOf[string, driveInfo] + DriveNameToDriveInfo *xsync.MapOf[string, driveInfo] Folders *folderCache - OldLinkShareIDToNewID map[string]string - OldPermIDToNewID map[string]string - ParentDirToMeta map[string]metadata.Metadata + OldLinkShareIDToNewID *xsync.MapOf[string, string] + OldPermIDToNewID *xsync.MapOf[string, string] + ParentDirToMeta *xsync.MapOf[string, metadata.Metadata] pool sync.Pool } @@ -52,8 +53,8 @@ func (rc *restoreCaches) AddDrive( di.rootFolderID = ptr.Val(root.GetId()) - rc.DriveIDToDriveInfo[di.id] = di - rc.DriveNameToDriveInfo[di.name] = di + rc.DriveIDToDriveInfo.Store(di.id, di) + rc.DriveNameToDriveInfo.Store(di.name, di) return nil } @@ -99,12 +100,12 @@ func NewRestoreCaches( return &restoreCaches{ BackupDriveIDName: backupDriveIDNames, collisionKeyToItemID: map[string]api.DriveItemIDType{}, - DriveIDToDriveInfo: map[string]driveInfo{}, - DriveNameToDriveInfo: map[string]driveInfo{}, + DriveIDToDriveInfo: xsync.NewMapOf[driveInfo](), + DriveNameToDriveInfo: xsync.NewMapOf[driveInfo](), Folders: NewFolderCache(), - OldLinkShareIDToNewID: map[string]string{}, - OldPermIDToNewID: map[string]string{}, - ParentDirToMeta: map[string]metadata.Metadata{}, + OldLinkShareIDToNewID: xsync.NewMapOf[string](), + OldPermIDToNewID: xsync.NewMapOf[string](), + ParentDirToMeta: xsync.NewMapOf[metadata.Metadata](), // Buffer pool for uploads pool: sync.Pool{ New: func() any { diff --git a/src/internal/m365/onedrive/restore_test.go b/src/internal/m365/onedrive/restore_test.go index 301a1b01e..b948a8cab 100644 --- a/src/internal/m365/onedrive/restore_test.go +++ b/src/internal/m365/onedrive/restore_test.go @@ -682,12 +682,12 @@ func (suite *RestoreUnitSuite) TestRestoreCaches_AddDrive() { test.expectErr(t, err, clues.ToCore(err)) if test.checkValues { - idResult := rc.DriveIDToDriveInfo[driveID] + idResult, _ := rc.DriveIDToDriveInfo.Load(driveID) assert.Equal(t, driveID, idResult.id, "drive id") assert.Equal(t, name, idResult.name, "drive name") assert.Equal(t, test.expectID, idResult.rootFolderID, "root folder id") - nameResult := rc.DriveNameToDriveInfo[name] + nameResult, _ := rc.DriveNameToDriveInfo.Load(name) assert.Equal(t, driveID, nameResult.id, "drive id") assert.Equal(t, name, nameResult.name, "drive name") assert.Equal(t, test.expectID, nameResult.rootFolderID, "root folder id") @@ -783,16 +783,16 @@ func (suite *RestoreUnitSuite) TestRestoreCaches_Populate() { err := rc.Populate(ctx, gdparf, "shmoo") test.expectErr(t, err, clues.ToCore(err)) - assert.Len(t, rc.DriveIDToDriveInfo, test.expectLen) - assert.Len(t, rc.DriveNameToDriveInfo, test.expectLen) + assert.Equal(t, rc.DriveIDToDriveInfo.Size(), test.expectLen) + assert.Equal(t, rc.DriveNameToDriveInfo.Size(), test.expectLen) if test.checkValues { - idResult := rc.DriveIDToDriveInfo[driveID] + idResult, _ := rc.DriveIDToDriveInfo.Load(driveID) assert.Equal(t, driveID, idResult.id, "drive id") assert.Equal(t, name, idResult.name, "drive name") assert.Equal(t, rfID, idResult.rootFolderID, "root folder id") - nameResult := rc.DriveNameToDriveInfo[name] + nameResult, _ := rc.DriveNameToDriveInfo.Load(name) assert.Equal(t, driveID, nameResult.id, "drive id") assert.Equal(t, name, nameResult.name, "drive name") assert.Equal(t, rfID, nameResult.rootFolderID, "root folder id") @@ -868,8 +868,8 @@ func (suite *RestoreUnitSuite) TestEnsureDriveExists() { id: id, name: name, } - rc.DriveIDToDriveInfo[id] = di - rc.DriveNameToDriveInfo[name] = di + rc.DriveIDToDriveInfo.Store(id, di) + rc.DriveNameToDriveInfo.Store(name, di) return rc } @@ -883,8 +883,8 @@ func (suite *RestoreUnitSuite) TestEnsureDriveExists() { id: "diff", name: name, } - rc.DriveIDToDriveInfo["diff"] = di - rc.DriveNameToDriveInfo[name] = di + rc.DriveIDToDriveInfo.Store("diff", di) + rc.DriveNameToDriveInfo.Store(name, di) return rc } @@ -1050,7 +1050,7 @@ func (suite *RestoreUnitSuite) TestEnsureDriveExists() { assert.Equal(t, test.expectName, di.name, "ensured drive has expected name") assert.Equal(t, test.expectID, di.id, "ensured drive has expected id") - nameResult := rc.DriveNameToDriveInfo[test.expectName] + nameResult, _ := rc.DriveNameToDriveInfo.Load(test.expectName) assert.Equal(t, test.expectName, nameResult.name, "found drive entry with expected name") } }) diff --git a/src/internal/operations/test/onedrive_test.go b/src/internal/operations/test/onedrive_test.go index 02ff334a7..75387a471 100644 --- a/src/internal/operations/test/onedrive_test.go +++ b/src/internal/operations/test/onedrive_test.go @@ -8,6 +8,7 @@ import ( "github.com/alcionai/clues" "github.com/microsoftgraph/msgraph-sdk-go/drives" "github.com/microsoftgraph/msgraph-sdk-go/models" + "github.com/puzpuzpuz/xsync/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -334,7 +335,7 @@ func runDriveIncrementalTest( newFileName = "new_file.txt" newFileID string - permissionIDMappings = map[string]string{} + permissionIDMappings = xsync.NewMapOf[string]() writePerm = metadata.Permission{ ID: "perm-id", Roles: []string{"write"},