diff --git a/src/internal/kopia/snapshot_manager.go b/src/internal/kopia/snapshot_manager.go new file mode 100644 index 000000000..12f768fbd --- /dev/null +++ b/src/internal/kopia/snapshot_manager.go @@ -0,0 +1,242 @@ +package kopia + +import ( + "context" + "sort" + + "github.com/kopia/kopia/repo/manifest" + "github.com/kopia/kopia/snapshot" + "github.com/pkg/errors" + + "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/path" +) + +const ( + // Kopia does not do comparisons properly for empty tags right now so add some + // placeholder value to them. + defaultTagValue = "0" + + // Kopia CLI prefixes all user tags with "tag:"[1]. Maintaining this will + // ensure we don't accidentally take reserved tags and that tags can be + // displayed with kopia CLI. + // (permalinks) + // [1] https://github.com/kopia/kopia/blob/05e729a7858a6e86cb48ba29fb53cb6045efce2b/cli/command_snapshot_create.go#L169 + userTagPrefix = "tag:" +) + +type snapshotManager interface { + FindManifests( + ctx context.Context, + tags map[string]string, + ) ([]*manifest.EntryMetadata, error) + LoadSnapshots(ctx context.Context, ids []manifest.ID) ([]*snapshot.Manifest, error) +} + +type ownersCats struct { + resourceOwners map[string]struct{} + serviceCats map[string]struct{} +} + +func serviceCatTag(p path.Path) string { + return p.Service().String() + p.Category().String() +} + +func makeTagKV(k string) (string, string) { + return userTagPrefix + k, defaultTagValue +} + +// tagsFromStrings returns a map[string]string with tags for all ownersCats +// passed in. Currently uses placeholder values for each tag because there can +// be multiple instances of resource owners and categories in a single snapshot. +func tagsFromStrings(oc *ownersCats) map[string]string { + res := make(map[string]string, len(oc.serviceCats)+len(oc.resourceOwners)) + + for k := range oc.serviceCats { + tk, tv := makeTagKV(k) + res[tk] = tv + } + + for k := range oc.resourceOwners { + tk, tv := makeTagKV(k) + res[tk] = tv + } + + return res +} + +// getLastIdx searches for manifests contained in both foundMans and metas +// and returns the most recent complete manifest index. If no complete manifest +// is in both lists returns -1. +func getLastIdx( + foundMans map[manifest.ID]*snapshot.Manifest, + metas []*manifest.EntryMetadata, +) int { + // Minor optimization: the current code seems to return the entries from + // earliest timestamp to latest (this is undocumented). Sort in the same + // fashion so that we don't incur a bunch of swaps. + sort.Slice(metas, func(i, j int) bool { + return metas[i].ModTime.Before(metas[j].ModTime) + }) + + // Search newest to oldest. + for i := len(metas) - 1; i >= 0; i-- { + m := foundMans[metas[i].ID] + if m == nil || len(m.IncompleteReason) > 0 { + continue + } + + return i + } + + return -1 +} + +// manifestsSinceLastComplete searches through mans and returns the most recent +// complete manifest (if one exists) and maybe the most recent incomplete +// manifest. If the newest incomplete manifest is more recent than the newest +// complete manifest then adds it to the returned list. Otherwise no incomplete +// manifest is returned. Returns nil if there are no complete or incomplete +// manifests in mans. +func manifestsSinceLastComplete( + mans []*snapshot.Manifest, +) []*snapshot.Manifest { + var ( + res []*snapshot.Manifest + foundIncomplete = false + ) + + // Manifests should maintain the sort order of the original IDs that were used + // to fetch the data, but just in case sort oldest to newest. + mans = snapshot.SortByTime(mans, false) + + for i := len(mans) - 1; i >= 0; i-- { + m := mans[i] + + if len(m.IncompleteReason) > 0 { + if !foundIncomplete { + foundIncomplete = true + + res = append(res, m) + } + + continue + } + + // Once we find a complete snapshot we're done, even if we haven't + // found an incomplete one yet. + res = append(res, m) + + break + } + + return res +} + +// fetchPrevManifests returns the most recent, as-of-yet unfound complete and +// (maybe) incomplete manifests in metas. If the most recent incomplete manifest +// is older than the most recent complete manifest no incomplete manifest is +// returned. If only incomplete manifests exists, returns the most recent one. +// Returns no manifests if an error occurs. +func fetchPrevManifests( + ctx context.Context, + sm snapshotManager, + foundMans map[manifest.ID]*snapshot.Manifest, + tags map[string]string, +) ([]*snapshot.Manifest, error) { + metas, err := sm.FindManifests(ctx, tags) + if err != nil { + return nil, errors.Wrap(err, "fetching manifest metas by tag") + } + + if len(metas) == 0 { + return nil, nil + } + + lastCompleteIdx := getLastIdx(foundMans, metas) + + // We have a complete cached snapshot and it's the most recent. No need + // to do anything else. + if lastCompleteIdx == len(metas)-1 { + return nil, nil + } + + // TODO(ashmrtn): Remainder of the function can be simplified if we can inject + // different tags to the snapshot checkpoints than the complete snapshot. + + // Fetch all manifests newer than the oldest complete snapshot. A little + // wasteful as we may also re-fetch the most recent incomplete manifest, but + // it reduces the complexity of returning the most recent incomplete manifest + // if it is newer than the most recent complete manifest. + ids := make([]manifest.ID, 0, len(metas)-(lastCompleteIdx+1)) + for i := lastCompleteIdx + 1; i < len(metas); i++ { + ids = append(ids, metas[i].ID) + } + + mans, err := sm.LoadSnapshots(ctx, ids) + if err != nil { + return nil, errors.Wrap(err, "fetching previous manifests") + } + + return manifestsSinceLastComplete(mans), nil +} + +// fetchPrevSnapshotManifests returns a set of manifests for complete and maybe +// incomplete snapshots for the given (resource owner, service, category) +// tuples. Up to two manifests can be returned per tuple: one complete and one +// incomplete. An incomplete manifest may be returned if it is newer than the +// newest complete manifest for the tuple. Manifests are deduped such that if +// multiple tuples match the same manifest it will only be returned once. +// +// TODO(ashmrtn): Use to get previous manifests so backup can find previously +// uploaded versions of a file. +func fetchPrevSnapshotManifests( + ctx context.Context, + sm snapshotManager, + oc *ownersCats, +) []*snapshot.Manifest { + mans := map[manifest.ID]*snapshot.Manifest{} + + // For each serviceCat/resource owner pair that we will be backing up, see if + // there's a previous incomplete snapshot and/or a previous complete snapshot + // we can pass in. Can be expanded to return more than the most recent + // snapshots, but may require more memory at runtime. + for serviceCat := range oc.serviceCats { + serviceTagKey, serviceTagValue := makeTagKV(serviceCat) + + for resourceOwner := range oc.resourceOwners { + resourceOwnerTagKey, resourceOwnerTagValue := makeTagKV(resourceOwner) + + tags := map[string]string{ + serviceTagKey: serviceTagValue, + resourceOwnerTagKey: resourceOwnerTagValue, + } + + found, err := fetchPrevManifests(ctx, sm, mans, tags) + if err != nil { + logger.Ctx(ctx).Warnw( + "fetching previous snapshot manifests for service/category/resource owner", + "error", + err, + "service/category", + serviceCat, + ) + + // Snapshot can still complete fine, just not as efficient. + continue + } + + // If we found more recent snapshots then add them. + for _, m := range found { + mans[m.ID] = m + } + } + } + + res := make([]*snapshot.Manifest, 0, len(mans)) + for _, m := range mans { + res = append(res, m) + } + + return res +} diff --git a/src/internal/kopia/snapshot_manager_test.go b/src/internal/kopia/snapshot_manager_test.go new file mode 100644 index 000000000..cfd046f01 --- /dev/null +++ b/src/internal/kopia/snapshot_manager_test.go @@ -0,0 +1,540 @@ +package kopia + +import ( + "context" + "testing" + "time" + + "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/repo/manifest" + "github.com/kopia/kopia/snapshot" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/path" +) + +const ( + testCompleteMan = false + testIncompleteMan = !testCompleteMan +) + +var ( + testT1 = time.Now() + testT2 = testT1.Add(1 * time.Hour) + testT3 = testT2.Add(1 * time.Hour) + + testID1 = manifest.ID("snap1") + testID2 = manifest.ID("snap2") + testID3 = manifest.ID("snap3") + + testMail = path.ExchangeService.String() + path.EmailCategory.String() + testEvents = path.ExchangeService.String() + path.EventsCategory.String() + testUser1 = "user1" + testUser2 = "user2" + testUser3 = "user3" + + testAllUsersAllCats = &ownersCats{ + resourceOwners: map[string]struct{}{ + testUser1: {}, + testUser2: {}, + testUser3: {}, + }, + serviceCats: map[string]struct{}{ + testMail: {}, + testEvents: {}, + }, + } + testAllUsersMail = &ownersCats{ + resourceOwners: map[string]struct{}{ + testUser1: {}, + testUser2: {}, + testUser3: {}, + }, + serviceCats: map[string]struct{}{ + testMail: {}, + }, + } +) + +type manifestInfo struct { + // We don't currently use the values in the tags. + tags map[string]struct{} + metadata *manifest.EntryMetadata + man *snapshot.Manifest +} + +func newManifestInfo( + id manifest.ID, + modTime time.Time, + incomplete bool, + tags ...string, +) manifestInfo { + incompleteStr := "" + if incomplete { + incompleteStr = "checkpoint" + } + + structTags := make(map[string]struct{}, len(tags)) + + for _, t := range tags { + tk, _ := makeTagKV(t) + structTags[tk] = struct{}{} + } + + return manifestInfo{ + tags: structTags, + metadata: &manifest.EntryMetadata{ + ID: id, + ModTime: modTime, + }, + man: &snapshot.Manifest{ + ID: id, + StartTime: fs.UTCTimestamp(modTime.UnixNano()), + IncompleteReason: incompleteStr, + }, + } +} + +type mockSnapshotManager struct { + data []manifestInfo + loadCallback func(ids []manifest.ID) +} + +func matchesTags(mi manifestInfo, tags map[string]string) bool { + for k := range tags { + if _, ok := mi.tags[k]; !ok { + return false + } + } + + return true +} + +func (msm *mockSnapshotManager) FindManifests( + ctx context.Context, + tags map[string]string, +) ([]*manifest.EntryMetadata, error) { + if msm == nil { + return nil, assert.AnError + } + + res := []*manifest.EntryMetadata{} + + for _, mi := range msm.data { + if matchesTags(mi, tags) { + res = append(res, mi.metadata) + } + } + + return res, nil +} + +func (msm *mockSnapshotManager) LoadSnapshots( + ctx context.Context, + ids []manifest.ID, +) ([]*snapshot.Manifest, error) { + if msm == nil { + return nil, assert.AnError + } + + // Allow checking set of IDs passed in. + if msm.loadCallback != nil { + msm.loadCallback(ids) + } + + res := []*snapshot.Manifest{} + + for _, id := range ids { + for _, mi := range msm.data { + if mi.man.ID == id { + res = append(res, mi.man) + } + } + } + + return res, nil +} + +type SnapshotFetchUnitSuite struct { + suite.Suite +} + +func TestSnapshotFetchUnitSuite(t *testing.T) { + suite.Run(t, new(SnapshotFetchUnitSuite)) +} + +func (suite *SnapshotFetchUnitSuite) TestFetchPrevSnapshots() { + table := []struct { + name string + input *ownersCats + data []manifestInfo + // Use this to denote which manifests in data should be expected. Allows + // defining data in a table while not repeating things between data and + // expected. + expectedIdxs []int + // Expected number of times a manifest should try to be loaded from kopia. + // Used to check that caching is functioning properly. + expectedLoadCounts map[manifest.ID]int + }{ + { + name: "AllOneSnapshot", + input: testAllUsersAllCats, + data: []manifestInfo{ + newManifestInfo( + testID1, + testT1, + testCompleteMan, + testMail, + testEvents, + testUser1, + testUser2, + testUser3, + ), + }, + expectedIdxs: []int{0}, + expectedLoadCounts: map[manifest.ID]int{ + testID1: 1, + }, + }, + { + name: "SplitByCategory", + input: testAllUsersAllCats, + data: []manifestInfo{ + newManifestInfo( + testID1, + testT1, + testCompleteMan, + testMail, + testUser1, + testUser2, + testUser3, + ), + newManifestInfo( + testID2, + testT2, + testCompleteMan, + testEvents, + testUser1, + testUser2, + testUser3, + ), + }, + expectedIdxs: []int{0, 1}, + expectedLoadCounts: map[manifest.ID]int{ + testID1: 1, + testID2: 1, + }, + }, + { + name: "IncompleteNewerThanComplete", + input: testAllUsersMail, + data: []manifestInfo{ + newManifestInfo( + testID1, + testT1, + testCompleteMan, + testMail, + testUser1, + testUser2, + testUser3, + ), + newManifestInfo( + testID2, + testT2, + testIncompleteMan, + testMail, + testUser1, + testUser2, + testUser3, + ), + }, + expectedIdxs: []int{0, 1}, + expectedLoadCounts: map[manifest.ID]int{ + testID1: 1, + testID2: 3, + }, + }, + { + name: "IncompleteOlderThanComplete", + input: testAllUsersMail, + data: []manifestInfo{ + newManifestInfo( + testID1, + testT1, + testIncompleteMan, + testMail, + testUser1, + testUser2, + testUser3, + ), + newManifestInfo( + testID2, + testT2, + testCompleteMan, + testMail, + testUser1, + testUser2, + testUser3, + ), + }, + expectedIdxs: []int{1}, + expectedLoadCounts: map[manifest.ID]int{ + testID1: 1, + testID2: 1, + }, + }, + { + name: "OnlyIncomplete", + input: testAllUsersMail, + data: []manifestInfo{ + newManifestInfo( + testID1, + testT1, + testIncompleteMan, + testMail, + testUser1, + testUser2, + testUser3, + ), + }, + expectedIdxs: []int{0}, + expectedLoadCounts: map[manifest.ID]int{ + testID1: 3, + }, + }, + { + name: "NewestComplete", + input: testAllUsersMail, + data: []manifestInfo{ + newManifestInfo( + testID1, + testT1, + testCompleteMan, + testMail, + testUser1, + testUser2, + testUser3, + ), + newManifestInfo( + testID2, + testT2, + testCompleteMan, + testMail, + testUser1, + testUser2, + testUser3, + ), + }, + expectedIdxs: []int{1}, + expectedLoadCounts: map[manifest.ID]int{ + testID1: 1, + testID2: 1, + }, + }, + { + name: "NewestIncomplete", + input: testAllUsersMail, + data: []manifestInfo{ + newManifestInfo( + testID1, + testT1, + testIncompleteMan, + testMail, + testUser1, + testUser2, + testUser3, + ), + newManifestInfo( + testID2, + testT2, + testIncompleteMan, + testMail, + testUser1, + testUser2, + testUser3, + ), + }, + expectedIdxs: []int{1}, + expectedLoadCounts: map[manifest.ID]int{ + testID1: 3, + testID2: 3, + }, + }, + { + name: "SomeCachedSomeNewer", + input: testAllUsersMail, + data: []manifestInfo{ + newManifestInfo( + testID1, + testT1, + testCompleteMan, + testMail, + testUser1, + testUser2, + testUser3, + ), + newManifestInfo( + testID2, + testT2, + testCompleteMan, + testMail, + testUser3, + ), + }, + expectedIdxs: []int{0, 1}, + expectedLoadCounts: map[manifest.ID]int{ + testID1: 2, + testID2: 1, + }, + }, + { + name: "SomeCachedSomeNewerIncomplete", + input: testAllUsersMail, + data: []manifestInfo{ + newManifestInfo( + testID1, + testT1, + testCompleteMan, + testMail, + testUser1, + testUser2, + testUser3, + ), + newManifestInfo( + testID2, + testT2, + testIncompleteMan, + testMail, + testUser3, + ), + }, + expectedIdxs: []int{0, 1}, + expectedLoadCounts: map[manifest.ID]int{ + testID1: 1, + testID2: 1, + }, + }, + { + name: "NoMatches", + input: testAllUsersMail, + data: nil, + expectedIdxs: nil, + // Stop failure for nil-map comparison. + expectedLoadCounts: map[manifest.ID]int{}, + }, + } + + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + ctx, flush := tester.NewContext() + defer flush() + + msm := &mockSnapshotManager{ + data: test.data, + } + + loadCounts := map[manifest.ID]int{} + msm.loadCallback = func(ids []manifest.ID) { + for _, id := range ids { + loadCounts[id]++ + } + } + + snaps := fetchPrevSnapshotManifests(ctx, msm, test.input) + + expected := make([]*snapshot.Manifest, 0, len(test.expectedIdxs)) + for _, i := range test.expectedIdxs { + expected = append(expected, test.data[i].man) + } + + assert.ElementsMatch(t, expected, snaps) + + // Need to manually check because we don't know the order the + // user/service/category labels will be iterated over. For some tests this + // could cause more loads than the ideal case. + assert.Len(t, loadCounts, len(test.expectedLoadCounts)) + for id, count := range loadCounts { + assert.GreaterOrEqual(t, test.expectedLoadCounts[id], count) + } + }) + } +} + +// mockErrorSnapshotManager returns an error the first time LoadSnapshot and +// FindSnapshot are called. After that it passes the calls through to the +// contained snapshotManager. +type mockErrorSnapshotManager struct { + retFindErr bool + retLoadErr bool + sm snapshotManager +} + +func (msm *mockErrorSnapshotManager) FindManifests( + ctx context.Context, + tags map[string]string, +) ([]*manifest.EntryMetadata, error) { + if !msm.retFindErr { + msm.retFindErr = true + return nil, assert.AnError + } + + return msm.sm.FindManifests(ctx, tags) +} + +func (msm *mockErrorSnapshotManager) LoadSnapshots( + ctx context.Context, + ids []manifest.ID, +) ([]*snapshot.Manifest, error) { + if !msm.retLoadErr { + msm.retLoadErr = true + return nil, assert.AnError + } + + return msm.sm.LoadSnapshots(ctx, ids) +} + +func (suite *SnapshotFetchUnitSuite) TestFetchPrevSnapshotsWorksWithErrors() { + ctx, flush := tester.NewContext() + defer flush() + + t := suite.T() + + input := testAllUsersMail + mockData := []manifestInfo{ + newManifestInfo( + testID1, + testT1, + testCompleteMan, + testMail, + testUser1, + ), + newManifestInfo( + testID2, + testT2, + testCompleteMan, + testMail, + testUser2, + ), + newManifestInfo( + testID3, + testT3, + testCompleteMan, + testMail, + testUser3, + ), + } + + expected := []*snapshot.Manifest{mockData[2].man} + + msm := &mockErrorSnapshotManager{ + sm: &mockSnapshotManager{ + data: mockData, + }, + } + + snaps := fetchPrevSnapshotManifests(ctx, msm, input) + + assert.ElementsMatch(t, expected, snaps) +} diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index af1ebdcb7..3420cc4e0 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -522,32 +522,6 @@ func (w Wrapper) BackupCollections( return s, progress.deets, nil } -type ownersCats struct { - resourceOwners map[string]struct{} - serviceCats map[string]struct{} -} - -func serviceCatTag(p path.Path) string { - return p.Service().String() + p.Category().String() -} - -// tagsFromStrings returns a map[string]string with the union of both maps -// passed in. Currently uses empty values for each tag because there can be -// multiple instances of resource owners and categories in a single snapshot. -func tagsFromStrings(oc *ownersCats) map[string]string { - res := make(map[string]string, len(oc.serviceCats)+len(oc.resourceOwners)) - - for k := range oc.serviceCats { - res[k] = "" - } - - for k := range oc.resourceOwners { - res[k] = "" - } - - return res -} - func (w Wrapper) makeSnapshotWithRoot( ctx context.Context, root fs.Directory, diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index 8bf2b155f..797c81883 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -851,11 +851,18 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() { 42, ), } - expectedTags := map[string]string{ - serviceCatTag(suite.testPath1): "", - suite.testPath1.ResourceOwner(): "", - serviceCatTag(suite.testPath2): "", - suite.testPath2.ResourceOwner(): "", + + baseTagKeys := []string{ + serviceCatTag(suite.testPath1), + suite.testPath1.ResourceOwner(), + serviceCatTag(suite.testPath2), + suite.testPath2.ResourceOwner(), + } + expectedTags := map[string]string{} + + for _, k := range baseTagKeys { + tk, tv := makeTagKV(k) + expectedTags[tk] = tv } stats, deets, err := suite.w.BackupCollections(suite.ctx, collections, path.ExchangeService)