threadsafe maps in restore caches (#3941)

Replaces the restore cache maps with threadsafe maps (`xsync.MapOf`).

OneDrive restores would occasionally fail due to concurrent map writes
against the restore cache maps. This change should protect against
failures under those conditions.
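
For context, here is a minimal, self-contained sketch of the pattern this change adopts, using `xsync/v2` (the `driveInfo` type and the key names below are illustrative stand-ins, not the project's actual values). A plain Go map panics with `fatal error: concurrent map writes` under parallel mutation; `xsync.MapOf` replaces indexed reads and writes with `Load`/`Store`, which are safe to call from many goroutines:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/puzpuzpuz/xsync/v2"
)

// driveInfo is an illustrative stand-in for the cached value type.
type driveInfo struct {
	rootFolderID string
}

func main() {
	// xsync/v2's NewMapOf names only the value type; keys are fixed
	// to string, matching the cache fields changed in this PR.
	cache := xsync.NewMapOf[driveInfo]()

	var wg sync.WaitGroup

	// Concurrent writes like these would crash a plain map[string]driveInfo.
	for i := 0; i < 8; i++ {
		wg.Add(1)

		go func(i int) {
			defer wg.Done()
			cache.Store(fmt.Sprintf("drive-%d", i), driveInfo{rootFolderID: "root"})
		}(i)
	}

	wg.Wait()

	// Load mirrors the two-value map read: the value plus a presence flag.
	if di, ok := cache.Load("drive-3"); ok {
		fmt.Println(di.rootFolderID, cache.Size())
	}
}
```

This is also why, in the diff below, the constructor calls read `xsync.NewMapOf[driveInfo]()` while the struct fields read `*xsync.MapOf[string, driveInfo]`: the string key type is implied by the v2 constructor.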

---

#### Does this PR need a docs update or release note?

- [x] No

#### Type of change

- [x] 🐛 Bugfix

#### Test Plan

- [x] Unit test
- [x] 💚 E2E
Keepers · 2023-08-04 13:59:19 -06:00 · committed by GitHub
parent 179edfc08e · commit b9b58e8c30
7 changed files with 58 additions and 46 deletions

```diff
@@ -362,7 +362,7 @@ func (suite *OneDriveIntgSuite) TestCreateGetDeleteFolder() {
 	}
 
 	caches := NewRestoreCaches(nil)
-	caches.DriveIDToDriveInfo[driveID] = driveInfo{rootFolderID: ptr.Val(rootFolder.GetId())}
+	caches.DriveIDToDriveInfo.Store(driveID, driveInfo{rootFolderID: ptr.Val(rootFolder.GetId())})
 
 	rh := NewRestoreHandler(suite.ac)
```

```diff
@@ -7,6 +7,7 @@ import (
 	"github.com/alcionai/clues"
 	"github.com/microsoftgraph/msgraph-sdk-go/drives"
 	"github.com/microsoftgraph/msgraph-sdk-go/models"
+	"github.com/puzpuzpuz/xsync/v2"
 
 	"github.com/alcionai/corso/src/internal/common/ptr"
 	"github.com/alcionai/corso/src/internal/data"
@@ -18,9 +19,9 @@ import (
 func getParentMetadata(
 	parentPath path.Path,
-	parentDirToMeta map[string]metadata.Metadata,
+	parentDirToMeta *xsync.MapOf[string, metadata.Metadata],
 ) (metadata.Metadata, error) {
-	parentMeta, ok := parentDirToMeta[parentPath.String()]
+	parentMeta, ok := parentDirToMeta.Load(parentPath.String())
 	if !ok {
 		drivePath, err := path.ToDrivePath(parentPath)
 		if err != nil {
@@ -88,7 +89,7 @@ func getCollectionMetadata(
 func computePreviousLinkShares(
 	ctx context.Context,
 	originDir path.Path,
-	parentMetas map[string]metadata.Metadata,
+	parentMetas *xsync.MapOf[string, metadata.Metadata],
 ) ([]metadata.LinkShare, error) {
 	linkShares := []metadata.LinkShare{}
 	ctx = clues.Add(ctx, "origin_dir", originDir)
@@ -110,7 +111,7 @@ func computePreviousLinkShares(
 			break
 		}
 
-		meta, ok := parentMetas[parent.String()]
+		meta, ok := parentMetas.Load(parent.String())
 		if !ok {
 			return nil, clues.New("no metadata found in parent").WithClues(ictx)
 		}
@@ -138,7 +139,7 @@ func computePreviousMetadata(
 	ctx context.Context,
 	originDir path.Path,
 	// map parent dir -> parent's metadata
-	parentMetas map[string]metadata.Metadata,
+	parentMetas *xsync.MapOf[string, metadata.Metadata],
 ) (metadata.Metadata, error) {
 	var (
 		parent path.Path
@@ -167,7 +168,7 @@
 			return metadata.Metadata{}, nil
 		}
 
-		meta, ok = parentMetas[parent.String()]
+		meta, ok = parentMetas.Load(parent.String())
 		if !ok {
 			return metadata.Metadata{}, clues.New("no metadata found for parent folder: " + parent.String()).WithClues(ictx)
 		}
@@ -191,7 +192,7 @@ func UpdatePermissions(
 	driveID string,
 	itemID string,
 	permAdded, permRemoved []metadata.Permission,
-	oldPermIDToNewID map[string]string,
+	oldPermIDToNewID *xsync.MapOf[string, string],
 ) error {
 	// The ordering of the operations is important here. We first
 	// remove all the removed permissions and then add the added ones.
@@ -206,7 +207,7 @@
 		// this is bad citizenship, and could end up consuming a lot of
 		// system resources if servicers leak client connections (sockets, etc).
 
-		pid, ok := oldPermIDToNewID[p.ID]
+		pid, ok := oldPermIDToNewID.Load(p.ID)
 		if !ok {
 			return clues.New("no new permission id").WithClues(ctx)
 		}
@@ -270,7 +271,7 @@
 			return clues.Stack(err)
 		}
 
-		oldPermIDToNewID[p.ID] = ptr.Val(newPerm.GetValue()[0].GetId())
+		oldPermIDToNewID.Store(p.ID, ptr.Val(newPerm.GetValue()[0].GetId()))
 	}
 
 	return nil
@@ -287,7 +288,7 @@ func UpdateLinkShares(
 	driveID string,
 	itemID string,
 	lsAdded, lsRemoved []metadata.LinkShare,
-	oldLinkShareIDToNewID map[string]string,
+	oldLinkShareIDToNewID *xsync.MapOf[string, string],
 ) (bool, error) {
 	// You can only delete inherited sharing links the first time you
 	// create a sharing link which is done using
@@ -366,7 +367,7 @@
 			return alreadyDeleted, clues.Stack(err)
 		}
 
-		oldLinkShareIDToNewID[ls.ID] = ptr.Val(newLS.GetId())
+		oldLinkShareIDToNewID.Store(ls.ID, ptr.Val(newLS.GetId()))
 	}
 
 	// It is possible to have empty link shares even though we should
```

```diff
@@ -4,6 +4,7 @@ import (
 	"strings"
 	"testing"
 
+	"github.com/puzpuzpuz/xsync/v2"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
@@ -156,7 +157,12 @@ func runComputeParentPermissionsTest(
 	ctx, flush := tester.NewContext(t)
 	defer flush()
 
-	m, err := computePreviousMetadata(ctx, test.item, test.parentPerms)
+	input := xsync.NewMapOf[metadata.Metadata]()
+	for k, v := range test.parentPerms {
+		input.Store(k, v)
+	}
+
+	m, err := computePreviousMetadata(ctx, test.item, input)
 	require.NoError(t, err, "compute permissions")
 	assert.Equal(t, m, test.meta)
```

```diff
@@ -218,7 +218,7 @@ func RestoreCollection(
 	}
 
 	caches.collisionKeyToItemID = collisionKeyToItemID
-	caches.ParentDirToMeta[dc.FullPath().String()] = colMeta
+	caches.ParentDirToMeta.Store(dc.FullPath().String(), colMeta)
 
 	items := dc.Items(ctx, errs)
 	semaphoreCh := make(chan struct{}, graph.Parallelism(path.OneDriveService).ItemUpload())
@@ -391,7 +391,7 @@ func restoreItem(
 		}
 
 		trimmedPath := strings.TrimSuffix(itemPath.String(), metadata.DirMetaFileSuffix)
-		caches.ParentDirToMeta[trimmedPath] = meta
+		caches.ParentDirToMeta.Store(trimmedPath, meta)
 
 		return details.ItemInfo{}, true, nil
 	}
@@ -680,10 +680,11 @@ func createRestoreFolders(
 	caches *restoreCaches,
 ) (string, error) {
 	var (
 		driveID  = drivePath.DriveID
 		folders  = restoreDir.Elements()
 		location = path.Builder{}.Append(driveID)
-		parentFolderID = caches.DriveIDToDriveInfo[drivePath.DriveID].rootFolderID
+		parentFolderMeta, _ = caches.DriveIDToDriveInfo.Load(drivePath.DriveID)
+		parentFolderID      = parentFolderMeta.rootFolderID
 	)
 
 	ctx = clues.Add(
@@ -1121,7 +1122,7 @@ func ensureDriveExists(
 	// the drive might already be cached by ID. it's okay
 	// if the name has changed. the ID is a better reference
 	// anyway.
-	if di, ok := caches.DriveIDToDriveInfo[driveID]; ok {
+	if di, ok := caches.DriveIDToDriveInfo.Load(driveID); ok {
 		return di, nil
 	}
@@ -1137,7 +1138,7 @@
 	oldName, ok := caches.BackupDriveIDName.NameOf(driveID)
 	if ok {
 		// check for drives that currently have the same name
-		if di, ok := caches.DriveNameToDriveInfo[oldName]; ok {
+		if di, ok := caches.DriveNameToDriveInfo.Load(oldName); ok {
 			return di, nil
 		}
@@ -1172,5 +1173,7 @@
 		return driveInfo{}, clues.Wrap(err, "adding drive to cache").OrNil()
 	}
 
-	return caches.DriveIDToDriveInfo[ptr.Val(newDrive.GetId())], nil
+	di, _ := caches.DriveIDToDriveInfo.Load(ptr.Val(newDrive.GetId()))
+
+	return di, nil
 }
```

```diff
@@ -6,6 +6,7 @@ import (
 	"github.com/alcionai/clues"
 	"github.com/microsoftgraph/msgraph-sdk-go/models"
+	"github.com/puzpuzpuz/xsync/v2"
 
 	"github.com/alcionai/corso/src/internal/common/idname"
 	"github.com/alcionai/corso/src/internal/common/ptr"
@@ -23,12 +24,12 @@ type driveInfo struct {
 type restoreCaches struct {
 	BackupDriveIDName     idname.Cacher
 	collisionKeyToItemID  map[string]api.DriveItemIDType
-	DriveIDToDriveInfo    map[string]driveInfo
-	DriveNameToDriveInfo  map[string]driveInfo
+	DriveIDToDriveInfo    *xsync.MapOf[string, driveInfo]
+	DriveNameToDriveInfo  *xsync.MapOf[string, driveInfo]
 	Folders               *folderCache
-	OldLinkShareIDToNewID map[string]string
-	OldPermIDToNewID      map[string]string
-	ParentDirToMeta       map[string]metadata.Metadata
+	OldLinkShareIDToNewID *xsync.MapOf[string, string]
+	OldPermIDToNewID      *xsync.MapOf[string, string]
+	ParentDirToMeta       *xsync.MapOf[string, metadata.Metadata]
 
 	pool sync.Pool
 }
@@ -52,8 +53,8 @@ func (rc *restoreCaches) AddDrive(
 	di.rootFolderID = ptr.Val(root.GetId())
 
-	rc.DriveIDToDriveInfo[di.id] = di
-	rc.DriveNameToDriveInfo[di.name] = di
+	rc.DriveIDToDriveInfo.Store(di.id, di)
+	rc.DriveNameToDriveInfo.Store(di.name, di)
 
 	return nil
 }
@@ -99,12 +100,12 @@ func NewRestoreCaches(
 	return &restoreCaches{
 		BackupDriveIDName:    backupDriveIDNames,
 		collisionKeyToItemID: map[string]api.DriveItemIDType{},
-		DriveIDToDriveInfo:   map[string]driveInfo{},
-		DriveNameToDriveInfo: map[string]driveInfo{},
+		DriveIDToDriveInfo:   xsync.NewMapOf[driveInfo](),
+		DriveNameToDriveInfo: xsync.NewMapOf[driveInfo](),
 		Folders:              NewFolderCache(),
-		OldLinkShareIDToNewID: map[string]string{},
-		OldPermIDToNewID:      map[string]string{},
-		ParentDirToMeta:       map[string]metadata.Metadata{},
+		OldLinkShareIDToNewID: xsync.NewMapOf[string](),
+		OldPermIDToNewID:      xsync.NewMapOf[string](),
+		ParentDirToMeta:       xsync.NewMapOf[metadata.Metadata](),
 		// Buffer pool for uploads
 		pool: sync.Pool{
 			New: func() any {
```

```diff
@@ -682,12 +682,12 @@ func (suite *RestoreUnitSuite) TestRestoreCaches_AddDrive() {
 			test.expectErr(t, err, clues.ToCore(err))
 
 			if test.checkValues {
-				idResult := rc.DriveIDToDriveInfo[driveID]
+				idResult, _ := rc.DriveIDToDriveInfo.Load(driveID)
 				assert.Equal(t, driveID, idResult.id, "drive id")
 				assert.Equal(t, name, idResult.name, "drive name")
 				assert.Equal(t, test.expectID, idResult.rootFolderID, "root folder id")
 
-				nameResult := rc.DriveNameToDriveInfo[name]
+				nameResult, _ := rc.DriveNameToDriveInfo.Load(name)
 				assert.Equal(t, driveID, nameResult.id, "drive id")
 				assert.Equal(t, name, nameResult.name, "drive name")
 				assert.Equal(t, test.expectID, nameResult.rootFolderID, "root folder id")
@@ -783,16 +783,16 @@ func (suite *RestoreUnitSuite) TestRestoreCaches_Populate() {
 			err := rc.Populate(ctx, gdparf, "shmoo")
 			test.expectErr(t, err, clues.ToCore(err))
 
-			assert.Len(t, rc.DriveIDToDriveInfo, test.expectLen)
-			assert.Len(t, rc.DriveNameToDriveInfo, test.expectLen)
+			assert.Equal(t, rc.DriveIDToDriveInfo.Size(), test.expectLen)
+			assert.Equal(t, rc.DriveNameToDriveInfo.Size(), test.expectLen)
 
 			if test.checkValues {
-				idResult := rc.DriveIDToDriveInfo[driveID]
+				idResult, _ := rc.DriveIDToDriveInfo.Load(driveID)
 				assert.Equal(t, driveID, idResult.id, "drive id")
 				assert.Equal(t, name, idResult.name, "drive name")
 				assert.Equal(t, rfID, idResult.rootFolderID, "root folder id")
 
-				nameResult := rc.DriveNameToDriveInfo[name]
+				nameResult, _ := rc.DriveNameToDriveInfo.Load(name)
 				assert.Equal(t, driveID, nameResult.id, "drive id")
 				assert.Equal(t, name, nameResult.name, "drive name")
 				assert.Equal(t, rfID, nameResult.rootFolderID, "root folder id")
@@ -868,8 +868,8 @@ func (suite *RestoreUnitSuite) TestEnsureDriveExists() {
 			id:   id,
 			name: name,
 		}
-		rc.DriveIDToDriveInfo[id] = di
-		rc.DriveNameToDriveInfo[name] = di
+		rc.DriveIDToDriveInfo.Store(id, di)
+		rc.DriveNameToDriveInfo.Store(name, di)
 
 		return rc
 	}
@@ -883,8 +883,8 @@
 			id:   "diff",
 			name: name,
 		}
-		rc.DriveIDToDriveInfo["diff"] = di
-		rc.DriveNameToDriveInfo[name] = di
+		rc.DriveIDToDriveInfo.Store("diff", di)
+		rc.DriveNameToDriveInfo.Store(name, di)
 
 		return rc
 	}
@@ -1050,7 +1050,7 @@ func (suite *RestoreUnitSuite) TestEnsureDriveExists() {
 				assert.Equal(t, test.expectName, di.name, "ensured drive has expected name")
 				assert.Equal(t, test.expectID, di.id, "ensured drive has expected id")
 
-				nameResult := rc.DriveNameToDriveInfo[test.expectName]
+				nameResult, _ := rc.DriveNameToDriveInfo.Load(test.expectName)
 				assert.Equal(t, test.expectName, nameResult.name, "found drive entry with expected name")
 			}
 		})
```

```diff
@@ -8,6 +8,7 @@ import (
 	"github.com/alcionai/clues"
 	"github.com/microsoftgraph/msgraph-sdk-go/drives"
 	"github.com/microsoftgraph/msgraph-sdk-go/models"
+	"github.com/puzpuzpuz/xsync/v2"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
@@ -334,7 +335,7 @@ func runDriveIncrementalTest(
 		newFileName = "new_file.txt"
 		newFileID   string
 
-		permissionIDMappings = map[string]string{}
+		permissionIDMappings = xsync.NewMapOf[string]()
 		writePerm = metadata.Permission{
 			ID:    "perm-id",
 			Roles: []string{"write"},
```