diff --git a/src/internal/connector/data_collections_test.go b/src/internal/connector/data_collections_test.go index 5fe5befae..5b26f06ea 100644 --- a/src/internal/connector/data_collections_test.go +++ b/src/internal/connector/data_collections_test.go @@ -13,6 +13,7 @@ import ( "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" ) @@ -105,13 +106,19 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection suite.T().Run(test.name, func(t *testing.T) { collection, err := connector.ExchangeDataCollection(ctx, test.getSelector(t)) require.NoError(t, err) - assert.Equal(t, len(collection), 1) - channel := collection[0].Items() - for object := range channel { - buf := &bytes.Buffer{} - _, err := buf.ReadFrom(object.ToReader()) - assert.NoError(t, err, "received a buf.Read error") + // Categories with delta endpoints will produce a collection for metadata + // as well as the actual data pulled. + assert.GreaterOrEqual(t, len(collection), 1, "expected 1 <= num collections <= 2") + assert.GreaterOrEqual(t, 2, len(collection), "expected 1 <= num collections <= 2") + + for _, col := range collection { + for object := range col.Items() { + buf := &bytes.Buffer{} + _, err := buf.ReadFrom(object.ToReader()) + assert.NoError(t, err, "received a buf.Read error") + } } + status := connector.AwaitStatus() assert.NotZero(t, status.Successful) t.Log(status.String()) @@ -280,6 +287,10 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailFetch() require.NoError(t, err) for _, c := range collections { + if c.FullPath().Service() == path.ExchangeMetadataService { + continue + } + require.NotEmpty(t, c.FullPath().Folder()) folder := c.FullPath().Folder() @@ -356,22 +367,36 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestContactSeria for _, test := range tests { suite.T().Run(test.name, func(t *testing.T) { edcs := test.getCollection(t) - require.Equal(t, len(edcs), 1) - edc := edcs[0] - assert.Equal(t, edc.FullPath().Folder(), exchange.DefaultContactFolder) - streamChannel := edc.Items() - count := 0 - for stream := range streamChannel { - buf := &bytes.Buffer{} - read, err := buf.ReadFrom(stream.ToReader()) - assert.NoError(t, err) - assert.NotZero(t, read) - contact, err := support.CreateContactFromBytes(buf.Bytes()) - assert.NotNil(t, contact) - assert.NoError(t, err, "error on converting contact bytes: "+buf.String()) - count++ + require.GreaterOrEqual(t, len(edcs), 1, "expected 1 <= num collections <= 2") + require.GreaterOrEqual(t, 2, len(edcs), "expected 1 <= num collections <= 2") + + for _, edc := range edcs { + isMetadata := edc.FullPath().Service() == path.ExchangeMetadataService + count := 0 + + for stream := range edc.Items() { + buf := &bytes.Buffer{} + read, err := buf.ReadFrom(stream.ToReader()) + assert.NoError(t, err) + assert.NotZero(t, read) + + if isMetadata { + continue + } + + contact, err := support.CreateContactFromBytes(buf.Bytes()) + assert.NotNil(t, contact) + assert.NoError(t, err, "error on converting contact bytes: "+buf.String()) + count++ + } + + if isMetadata { + continue + } + + assert.Equal(t, edc.FullPath().Folder(), exchange.DefaultContactFolder) + assert.NotZero(t, count) } - assert.NotZero(t, count) status := connector.AwaitStatus() suite.NotNil(status) diff --git a/src/internal/connector/exchange/iterators_test.go b/src/internal/connector/exchange/iterators_test.go index 795b302aa..a686a04c1 100644 --- a/src/internal/connector/exchange/iterators_test.go +++ b/src/internal/connector/exchange/iterators_test.go @@ -1,6 +1,7 @@ package exchange import ( + "encoding/json" "testing" absser "github.com/microsoft/kiota-abstractions-go/serialization" @@ -14,9 +15,97 @@ import ( "github.com/alcionai/corso/src/internal/connector/mockconnector" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" ) +type ExchangeIteratorUnitSuite struct { + suite.Suite +} + +func TestExchangeIteratorUnitSuite(t *testing.T) { + suite.Run(t, new(ExchangeIteratorUnitSuite)) +} + +func (suite *ExchangeIteratorUnitSuite) TestMakeMetadataCollection() { + tenant := "a-tenant" + user := "a-user" + + table := []struct { + name string + cat path.CategoryType + tokens map[string]string + collectionCheck assert.ValueAssertionFunc + errCheck assert.ErrorAssertionFunc + }{ + { + name: "EmptyTokens", + cat: path.EmailCategory, + tokens: nil, + collectionCheck: assert.Nil, + errCheck: assert.NoError, + }, + { + name: "Tokens", + cat: path.EmailCategory, + tokens: map[string]string{ + "hello": "world", + "hola": "mundo", + }, + collectionCheck: assert.NotNil, + errCheck: assert.NoError, + }, + { + name: "BadCategory", + cat: path.FilesCategory, + tokens: map[string]string{ + "hello": "world", + "hola": "mundo", + }, + collectionCheck: assert.Nil, + errCheck: assert.Error, + }, + } + + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + col, err := makeMetadataCollection( + tenant, + user, + test.cat, + test.tokens, + func(*support.ConnectorOperationStatus) {}, + ) + + test.errCheck(t, err) + if err != nil { + return + } + + test.collectionCheck(t, col) + if col == nil { + return + } + + itemCount := 0 + for item := range col.Items() { + gotMap := map[string]string{} + decoder := json.NewDecoder(item.ToReader()) + itemCount++ + + err := decoder.Decode(&gotMap) + if !assert.NoError(t, err) { + continue + } + + assert.Equal(t, test.tokens, gotMap) + } + + assert.Equal(t, 1, itemCount) + }) + } +} + type ExchangeIteratorSuite struct { suite.Suite } diff --git a/src/internal/connector/exchange/service_iterators.go b/src/internal/connector/exchange/service_iterators.go index 5eb22fae4..c1e0f1d26 100644 --- a/src/internal/connector/exchange/service_iterators.go +++ b/src/internal/connector/exchange/service_iterators.go @@ -1,7 +1,9 @@ package exchange import ( + "bytes" "context" + "encoding/json" "fmt" "strings" @@ -22,6 +24,8 @@ import ( const ( nextLinkKey = "@odata.nextLink" deltaLinkKey = "@odata.deltaLink" + + metadataKey = "metadata" ) // getAdditionalDataString gets a string value from the AdditionalData map. If @@ -43,6 +47,49 @@ func getAdditionalDataString( return *value } +// makeMetadataCollection creates a metadata collection that has a file +// containing all the delta tokens in tokens. Returns nil if the map does not +// have any entries. +// +// TODO(ashmrtn): Expand this/break it out into multiple functions so that we +// can also store map[container ID]->full container path in a file in the +// metadata collection. +func makeMetadataCollection( + tenant string, + user string, + cat path.CategoryType, + tokens map[string]string, + statusUpdater support.StatusUpdater, +) (data.Collection, error) { + if len(tokens) == 0 { + return nil, nil + } + + buf := &bytes.Buffer{} + encoder := json.NewEncoder(buf) + + if err := encoder.Encode(tokens); err != nil { + return nil, errors.Wrap(err, "serializing delta tokens") + } + + p, err := path.Builder{}.ToServiceCategoryMetadataPath( + tenant, + user, + path.ExchangeService, + cat, + false, + ) + if err != nil { + return nil, errors.Wrap(err, "making path") + } + + return graph.NewMetadataCollection( + p, + []graph.MetadataItem{graph.NewMetadataItem(graph.DeltaTokenFileName, buf.Bytes())}, + statusUpdater, + ), nil +} + // FilterContainersAndFillCollections is a utility function // that places the M365 object ids belonging to specific directories // into a Collection. Messages outside of those directories are omitted. @@ -59,6 +106,8 @@ func FilterContainersAndFillCollections( var ( errs error collectionType = CategoryToOptionIdentifier(qp.Category) + // folder ID -> delta token for folder. + deltaTokens = map[string]string{} ) for _, c := range resolver.Items() { @@ -103,7 +152,7 @@ func FilterContainersAndFillCollections( continue } - jobs, _, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, *c.GetId()) + jobs, token, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, *c.GetId()) if err != nil { errs = support.WrapAndAppend( qp.ResourceOwner, @@ -113,6 +162,23 @@ func FilterContainersAndFillCollections( } edc.jobs = append(edc.jobs, jobs...) + + if len(token) > 0 { + deltaTokens[*c.GetId()] = token + } + } + + col, err := makeMetadataCollection( + qp.Credentials.AzureTenantID, + qp.ResourceOwner, + qp.Category, + deltaTokens, + statusUpdater, + ) + if err != nil { + errs = support.WrapAndAppend("making metadata collection", err, errs) + } else if col != nil { + collections[metadataKey] = col } return errs diff --git a/src/internal/connector/graph/service.go b/src/internal/connector/graph/service.go index 7281dbe94..a02978e42 100644 --- a/src/internal/connector/graph/service.go +++ b/src/internal/connector/graph/service.go @@ -9,6 +9,10 @@ import ( "github.com/alcionai/corso/src/pkg/path" ) +// DeltaTokenFileName is the name of the file containing delta token(s) for a +// given endpoint. The endpoint granularity varies by service. +const DeltaTokenFileName = "delta" + type QueryParams struct { Category path.CategoryType ResourceOwner string diff --git a/src/internal/connector/graph_connector_helper_test.go b/src/internal/connector/graph_connector_helper_test.go index 9fcf43d2b..3551fec49 100644 --- a/src/internal/connector/graph_connector_helper_test.go +++ b/src/internal/connector/graph_connector_helper_test.go @@ -684,9 +684,10 @@ func checkCollections( expectedItems int, expected map[string]map[string][]byte, got []data.Collection, -) { +) int { collectionsWithItems := []data.Collection{} + skipped := 0 gotItems := 0 for _, returned := range got { @@ -699,6 +700,18 @@ func checkCollections( // because otherwise we'll deadlock waiting for GC status. Unexpected or // missing collection paths will be reported by checkHasCollections. for item := range returned.Items() { + // Skip metadata collections as they aren't directly related to items to + // backup. Don't add them to the item count either since the item count + // is for actual pull items. + // TODO(ashmrtn): Should probably eventually check some data in metadata + // collections. + if service == path.ExchangeMetadataService || + service == path.OneDriveMetadataService || + service == path.SharePointMetadataService { + skipped++ + continue + } + gotItems++ if expectedColData == nil { @@ -715,6 +728,10 @@ func checkCollections( assert.Equal(t, expectedItems, gotItems, "expected items") checkHasCollections(t, expected, collectionsWithItems) + + // Return how many metadata files were skipped so we can account for it in the + // check on GraphConnector status. + return skipped } type destAndCats struct { diff --git a/src/internal/connector/graph_connector_test.go b/src/internal/connector/graph_connector_test.go index 0fbf0db03..d7f6fd4f2 100644 --- a/src/internal/connector/graph_connector_test.go +++ b/src/internal/connector/graph_connector_test.go @@ -394,11 +394,11 @@ func runRestoreBackupTest( // Pull the data prior to waiting for the status as otherwise it will // deadlock. - checkCollections(t, totalItems, expectedData, dcs) + skipped := checkCollections(t, totalItems, expectedData, dcs) status = backupGC.AwaitStatus() - assert.Equal(t, totalItems, status.ObjectCount, "status.ObjectCount") - assert.Equal(t, totalItems, status.Successful, "status.Successful") + assert.Equal(t, totalItems+skipped, status.ObjectCount, "status.ObjectCount") + assert.Equal(t, totalItems+skipped, status.Successful, "status.Successful") } func (suite *GraphConnectorIntegrationSuite) TestRestoreAndBackup() { @@ -862,11 +862,11 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames // Pull the data prior to waiting for the status as otherwise it will // deadlock. - checkCollections(t, allItems, allExpectedData, dcs) + skipped := checkCollections(t, allItems, allExpectedData, dcs) status := backupGC.AwaitStatus() - assert.Equal(t, allItems, status.ObjectCount, "status.ObjectCount") - assert.Equal(t, allItems, status.Successful, "status.Successful") + assert.Equal(t, allItems+skipped, status.ObjectCount, "status.ObjectCount") + assert.Equal(t, allItems+skipped, status.Successful, "status.Successful") }) } } diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index efba76f98..295196852 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -329,22 +329,20 @@ func getStreamItemFunc( log.Debugw("reading item", "path", itemPath.String()) trace.Log(ctx, "kopia:getStreamItemFunc:item", itemPath.String()) + // Not all items implement StreamInfo. For example, the metadata files + // do not because they don't contain information directly backed up or + // used for restore. If progress does not contain information about a + // finished file it just returns without an error so it's safe to skip + // adding something to it. ei, ok := e.(data.StreamInfo) - if !ok { - errs = multierror.Append( - errs, errors.Errorf("item %q does not implement DataStreamInfo", itemPath)) - - log.Errorw("item does not implement DataStreamInfo; skipping", "path", itemPath) - - continue + if ok { + // Relative path given to us in the callback is missing the root + // element. Add to pending set before calling the callback to avoid race + // conditions when the item is completed. + d := &itemDetails{info: ei.Info(), repoPath: itemPath} + progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d) } - // Relative path given to us in the callback is missing the root - // element. Add to pending set before calling the callback to avoid race - // conditions when the item is completed. - d := &itemDetails{info: ei.Info(), repoPath: itemPath} - progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d) - modTime := time.Now() if smt, ok := e.(data.StreamModTime); ok { modTime = smt.ModTime() diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index a85dd27e5..b40c07bba 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -10,13 +10,18 @@ import ( "github.com/stretchr/testify/suite" "github.com/alcionai/corso/src/internal/connector/exchange" + "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/support" + "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/events" evmock "github.com/alcionai/corso/src/internal/events/mock" "github.com/alcionai/corso/src/internal/kopia" + "github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/backup" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/store" ) @@ -121,7 +126,7 @@ func prepNewBackupOp( ctx context.Context, bus events.Eventer, sel selectors.Selector, -) (BackupOperation, func()) { +) (BackupOperation, account.Account, *kopia.Wrapper, *kopia.ModelStore, func()) { //revive:enable:context-as-argument acct := tester.NewM365Account(t) @@ -173,7 +178,80 @@ func prepNewBackupOp( t.FailNow() } - return bo, closer + return bo, acct, kw, ms, closer +} + +//revive:disable:context-as-argument +func checkMetadataFilesExist( + t *testing.T, + ctx context.Context, + backupID model.StableID, + kw *kopia.Wrapper, + ms *kopia.ModelStore, + tenant string, + user string, + service path.ServiceType, + category path.CategoryType, + files []string, +) { + //revive:enable:context-as-argument + bup := &backup.Backup{} + + err := ms.Get(ctx, model.BackupSchema, backupID, bup) + if !assert.NoError(t, err) { + return + } + + paths := []path.Path{} + pathsByRef := map[string][]string{} + + for _, fName := range files { + p, err := path.Builder{}. + Append(fName). + ToServiceCategoryMetadataPath(tenant, user, service, category, true) + if !assert.NoError(t, err, "bad metadata path") { + continue + } + + dir, err := p.Dir() + if !assert.NoError(t, err, "parent path") { + continue + } + + paths = append(paths, p) + pathsByRef[dir.ShortRef()] = append(pathsByRef[dir.ShortRef()], fName) + } + + cols, err := kw.RestoreMultipleItems(ctx, bup.SnapshotID, paths, nil) + assert.NoError(t, err) + + for _, col := range cols { + itemNames := []string{} + + for item := range col.Items() { + assert.Implements(t, (*data.StreamSize)(nil), item) + + s := item.(data.StreamSize) + assert.Greaterf( + t, + s.Size(), + int64(0), + "empty metadata file: %s/%s", + col.FullPath(), + item.UUID(), + ) + + itemNames = append(itemNames, item.UUID()) + } + + assert.ElementsMatchf( + t, + pathsByRef[col.FullPath().ShortRef()], + itemNames, + "collection %s missing expected files", + col.FullPath(), + ) + } } type BackupOpIntegrationSuite struct { @@ -245,48 +323,64 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() { tests := []struct { name string - selectFunc func() *selectors.Selector + selectFunc func() *selectors.ExchangeBackup }{ { name: "Integration Exchange.Mail", - selectFunc: func() *selectors.Selector { + selectFunc: func() *selectors.ExchangeBackup { sel := selectors.NewExchangeBackup() sel.Include(sel.MailFolders([]string{m365UserID}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch())) - return &sel.Selector + return sel }, }, { name: "Integration Exchange.Contacts", - selectFunc: func() *selectors.Selector { + selectFunc: func() *selectors.ExchangeBackup { sel := selectors.NewExchangeBackup() sel.Include(sel.ContactFolders( []string{m365UserID}, []string{exchange.DefaultContactFolder}, selectors.PrefixMatch())) - return &sel.Selector + return sel }, }, { name: "Integration Exchange.Events", - selectFunc: func() *selectors.Selector { + selectFunc: func() *selectors.ExchangeBackup { sel := selectors.NewExchangeBackup() sel.Include(sel.EventCalendars([]string{m365UserID}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch())) - return &sel.Selector + return sel }, }, } for _, test := range tests { suite.T().Run(test.name, func(t *testing.T) { mb := evmock.NewBus() - bo, closer := prepNewBackupOp(t, ctx, mb, *test.selectFunc()) + sel := test.selectFunc() + bo, acct, kw, ms, closer := prepNewBackupOp(t, ctx, mb, sel.Selector) defer closer() + failed := false + require.NoError(t, bo.Run(ctx)) require.NotEmpty(t, bo.Results) require.NotEmpty(t, bo.Results.BackupID) - assert.Equalf(t, Completed, bo.Status, "backup status %s is not Completed", bo.Status) + + if !assert.Equalf( + t, + Completed, + bo.Status, + "backup status %s is not Completed", + bo.Status, + ) { + failed = true + } + + if !assert.Less(t, 0, bo.Results.ItemsWritten) { + failed = true + } + assert.Less(t, 0, bo.Results.ItemsRead) - assert.Less(t, 0, bo.Results.ItemsWritten) assert.Less(t, int64(0), bo.Results.BytesRead, "bytes read") assert.Less(t, int64(0), bo.Results.BytesUploaded, "bytes uploaded") assert.Equal(t, 1, bo.Results.ResourceOwners) @@ -297,6 +391,37 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() { assert.Equal(t, mb.CalledWith[events.BackupStart][0][events.BackupID], bo.Results.BackupID, "backupID pre-declaration") + + // Check that metadata files with delta tokens were created. Currently + // these files will only be made for contacts and email in Exchange if any + // items were backed up. Events does not support delta queries. + if failed { + return + } + + m365, err := acct.M365Config() + require.NoError(t, err) + + for _, scope := range sel.Scopes() { + cat := scope.Category().PathType() + + if cat != path.EmailCategory && cat != path.ContactsCategory { + return + } + + checkMetadataFilesExist( + t, + ctx, + bo.Results.BackupID, + kw, + ms, + m365.AzureTenantID, + m365UserID, + path.ExchangeService, + cat, + []string{graph.DeltaTokenFileName}, + ) + } }) } } @@ -314,7 +439,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_oneDrive() { sel.Include(sel.Users([]string{m365UserID})) - bo, closer := prepNewBackupOp(t, ctx, mb, sel.Selector) + bo, _, _, _, closer := prepNewBackupOp(t, ctx, mb, sel.Selector) defer closer() require.NoError(t, bo.Run(ctx)) @@ -347,7 +472,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_sharePoint() { sel.Include(sel.Sites([]string{siteID})) - bo, closer := prepNewBackupOp(t, ctx, mb, sel.Selector) + bo, _, _, _, closer := prepNewBackupOp(t, ctx, mb, sel.Selector) defer closer() require.NoError(t, bo.Run(ctx)) diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index e604ef505..67f02946d 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -198,7 +198,9 @@ func (suite *RestoreOpIntegrationSuite) SetupSuite() { require.NotEmpty(t, bo.Results.BackupID) suite.backupID = bo.Results.BackupID - suite.numItems = bo.Results.ItemsWritten + // Remove delta metadata files for contacts and email as they are not part of + // the data restored. + suite.numItems = bo.Results.ItemsWritten - 2 } func (suite *RestoreOpIntegrationSuite) TearDownSuite() {