Persist Exchange delta tokens (#1722)

## Description

Begin persisting Exchange delta tokens for data categories that support delta queries. Tokens are stored in a single JSON-encoded file as a `map[M365 container ID]token`, where both the container ID and token are of type `string`. The file is stored in the same kopia snapshot as the rest of the backup data, at the path `tenant-id/{service}Metadata/user/category/delta`. No information about the delta token file is recorded in backup details.
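
For a concrete picture of what gets written, the file body is just the JSON encoding of that map. A minimal sketch (the container ID and token values below are made up, and the snippet mirrors rather than calls the helper added in this PR):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// folder ID -> delta token returned by the most recent Graph delta query.
	// Both values are placeholders.
	tokens := map[string]string{
		"AAMkAGI2TGuLAAA=": "spvQ0eU6HkWQ6zL8Kb71FA",
	}

	// These bytes are what lands in the single "delta" file, e.g. something
	// like tenant-id/exchangeMetadata/user/email/delta for the email category.
	b, err := json.Marshal(tokens)
	if err != nil {
		panic(err)
	}

	fmt.Println(string(b)) // {"AAMkAGI2TGuLAAA=":"spvQ0eU6HkWQ6zL8Kb71FA"}
}
```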

## Type of change

- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🐹 Trivial/Minor

## Issue(s)

* closes #1685 

## Test Plan

- [ ] 💪 Manual
- [x] Unit test
- [ ] 💚 E2E
Authored by ashmrtn on 2022-12-08 10:01:00 -08:00, committed via GitHub.
commit e15d86e82f (parent 4c976298d4)
9 changed files with 383 additions and 57 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
)
@ -105,13 +106,19 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection
suite.T().Run(test.name, func(t *testing.T) {
collection, err := connector.ExchangeDataCollection(ctx, test.getSelector(t))
require.NoError(t, err)
assert.Equal(t, len(collection), 1)
channel := collection[0].Items()
for object := range channel {
buf := &bytes.Buffer{}
_, err := buf.ReadFrom(object.ToReader())
assert.NoError(t, err, "received a buf.Read error")
// Categories with delta endpoints will produce a collection for metadata
// as well as the actual data pulled.
assert.GreaterOrEqual(t, len(collection), 1, "expected 1 <= num collections <= 2")
assert.GreaterOrEqual(t, 2, len(collection), "expected 1 <= num collections <= 2")
for _, col := range collection {
for object := range col.Items() {
buf := &bytes.Buffer{}
_, err := buf.ReadFrom(object.ToReader())
assert.NoError(t, err, "received a buf.Read error")
}
}
status := connector.AwaitStatus()
assert.NotZero(t, status.Successful)
t.Log(status.String())
@ -280,6 +287,10 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailFetch()
require.NoError(t, err)
for _, c := range collections {
if c.FullPath().Service() == path.ExchangeMetadataService {
continue
}
require.NotEmpty(t, c.FullPath().Folder())
folder := c.FullPath().Folder()
@ -356,22 +367,36 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestContactSeria
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
edcs := test.getCollection(t)
require.Equal(t, len(edcs), 1)
edc := edcs[0]
assert.Equal(t, edc.FullPath().Folder(), exchange.DefaultContactFolder)
streamChannel := edc.Items()
count := 0
for stream := range streamChannel {
buf := &bytes.Buffer{}
read, err := buf.ReadFrom(stream.ToReader())
assert.NoError(t, err)
assert.NotZero(t, read)
contact, err := support.CreateContactFromBytes(buf.Bytes())
assert.NotNil(t, contact)
assert.NoError(t, err, "error on converting contact bytes: "+buf.String())
count++
require.GreaterOrEqual(t, len(edcs), 1, "expected 1 <= num collections <= 2")
require.GreaterOrEqual(t, 2, len(edcs), "expected 1 <= num collections <= 2")
for _, edc := range edcs {
isMetadata := edc.FullPath().Service() == path.ExchangeMetadataService
count := 0
for stream := range edc.Items() {
buf := &bytes.Buffer{}
read, err := buf.ReadFrom(stream.ToReader())
assert.NoError(t, err)
assert.NotZero(t, read)
if isMetadata {
continue
}
contact, err := support.CreateContactFromBytes(buf.Bytes())
assert.NotNil(t, contact)
assert.NoError(t, err, "error on converting contact bytes: "+buf.String())
count++
}
if isMetadata {
continue
}
assert.Equal(t, edc.FullPath().Folder(), exchange.DefaultContactFolder)
assert.NotZero(t, count)
}
assert.NotZero(t, count)
status := connector.AwaitStatus()
suite.NotNil(status)

View File

@ -1,6 +1,7 @@
package exchange
import (
"encoding/json"
"testing"
absser "github.com/microsoft/kiota-abstractions-go/serialization"
@ -14,9 +15,97 @@ import (
"github.com/alcionai/corso/src/internal/connector/mockconnector"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
)
type ExchangeIteratorUnitSuite struct {
suite.Suite
}
func TestExchangeIteratorUnitSuite(t *testing.T) {
suite.Run(t, new(ExchangeIteratorUnitSuite))
}
func (suite *ExchangeIteratorUnitSuite) TestMakeMetadataCollection() {
tenant := "a-tenant"
user := "a-user"
table := []struct {
name string
cat path.CategoryType
tokens map[string]string
collectionCheck assert.ValueAssertionFunc
errCheck assert.ErrorAssertionFunc
}{
{
name: "EmptyTokens",
cat: path.EmailCategory,
tokens: nil,
collectionCheck: assert.Nil,
errCheck: assert.NoError,
},
{
name: "Tokens",
cat: path.EmailCategory,
tokens: map[string]string{
"hello": "world",
"hola": "mundo",
},
collectionCheck: assert.NotNil,
errCheck: assert.NoError,
},
{
name: "BadCategory",
cat: path.FilesCategory,
tokens: map[string]string{
"hello": "world",
"hola": "mundo",
},
collectionCheck: assert.Nil,
errCheck: assert.Error,
},
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
col, err := makeMetadataCollection(
tenant,
user,
test.cat,
test.tokens,
func(*support.ConnectorOperationStatus) {},
)
test.errCheck(t, err)
if err != nil {
return
}
test.collectionCheck(t, col)
if col == nil {
return
}
itemCount := 0
for item := range col.Items() {
gotMap := map[string]string{}
decoder := json.NewDecoder(item.ToReader())
itemCount++
err := decoder.Decode(&gotMap)
if !assert.NoError(t, err) {
continue
}
assert.Equal(t, test.tokens, gotMap)
}
assert.Equal(t, 1, itemCount)
})
}
}
type ExchangeIteratorSuite struct {
suite.Suite
}

View File

@ -1,7 +1,9 @@
package exchange
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
@ -22,6 +24,8 @@ import (
const (
nextLinkKey = "@odata.nextLink"
deltaLinkKey = "@odata.deltaLink"
metadataKey = "metadata"
)
// getAdditionalDataString gets a string value from the AdditionalData map. If
@ -43,6 +47,49 @@ func getAdditionalDataString(
return *value
}
// makeMetadataCollection creates a metadata collection that has a file
// containing all the delta tokens in tokens. Returns nil if the map does not
// have any entries.
//
// TODO(ashmrtn): Expand this/break it out into multiple functions so that we
// can also store map[container ID]->full container path in a file in the
// metadata collection.
func makeMetadataCollection(
tenant string,
user string,
cat path.CategoryType,
tokens map[string]string,
statusUpdater support.StatusUpdater,
) (data.Collection, error) {
if len(tokens) == 0 {
return nil, nil
}
buf := &bytes.Buffer{}
encoder := json.NewEncoder(buf)
if err := encoder.Encode(tokens); err != nil {
return nil, errors.Wrap(err, "serializing delta tokens")
}
p, err := path.Builder{}.ToServiceCategoryMetadataPath(
tenant,
user,
path.ExchangeService,
cat,
false,
)
if err != nil {
return nil, errors.Wrap(err, "making path")
}
return graph.NewMetadataCollection(
p,
[]graph.MetadataItem{graph.NewMetadataItem(graph.DeltaTokenFileName, buf.Bytes())},
statusUpdater,
), nil
}
// FilterContainersAndFillCollections is a utility function
// that places the M365 object ids belonging to specific directories
// into a Collection. Messages outside of those directories are omitted.
@ -59,6 +106,8 @@ func FilterContainersAndFillCollections(
var (
errs error
collectionType = CategoryToOptionIdentifier(qp.Category)
// folder ID -> delta token for folder.
deltaTokens = map[string]string{}
)
for _, c := range resolver.Items() {
@ -103,7 +152,7 @@ func FilterContainersAndFillCollections(
continue
}
jobs, _, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, *c.GetId())
jobs, token, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, *c.GetId())
if err != nil {
errs = support.WrapAndAppend(
qp.ResourceOwner,
@ -113,6 +162,23 @@ func FilterContainersAndFillCollections(
}
edc.jobs = append(edc.jobs, jobs...)
if len(token) > 0 {
deltaTokens[*c.GetId()] = token
}
}
col, err := makeMetadataCollection(
qp.Credentials.AzureTenantID,
qp.ResourceOwner,
qp.Category,
deltaTokens,
statusUpdater,
)
if err != nil {
errs = support.WrapAndAppend("making metadata collection", err, errs)
} else if col != nil {
collections[metadataKey] = col
}
return errs
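
The read side of these tokens is not implemented in this PR. For orientation only, recovering the map from the persisted file is a plain JSON decode of the "delta" item; a hypothetical sketch, with the interface and function names here being illustrative rather than repository code:

```go
package sketch

import (
	"encoding/json"
	"io"
)

// stream is the minimal item surface assumed for this sketch; the repository's
// data.Stream interface is wider.
type stream interface {
	UUID() string
	ToReader() io.ReadCloser
}

// loadDeltaTokens is NOT part of this change. It only illustrates that a later
// incremental backup could recover the folder ID -> delta token map by
// decoding the metadata item named "delta".
func loadDeltaTokens(items <-chan stream, deltaFileName string) (map[string]string, error) {
	for item := range items {
		if item.UUID() != deltaFileName {
			continue
		}

		r := item.ToReader()
		defer r.Close()

		tokens := map[string]string{}
		if err := json.NewDecoder(r).Decode(&tokens); err != nil {
			return nil, err
		}

		return tokens, nil
	}

	// No metadata item found; treat as a full (non-incremental) pull.
	return nil, nil
}
```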

View File

@ -9,6 +9,10 @@ import (
"github.com/alcionai/corso/src/pkg/path"
)
// DeltaTokenFileName is the name of the file containing delta token(s) for a
// given endpoint. The endpoint granularity varies by service.
const DeltaTokenFileName = "delta"
type QueryParams struct {
Category path.CategoryType
ResourceOwner string

View File

@ -684,9 +684,10 @@ func checkCollections(
expectedItems int,
expected map[string]map[string][]byte,
got []data.Collection,
) {
) int {
collectionsWithItems := []data.Collection{}
skipped := 0
gotItems := 0
for _, returned := range got {
@ -699,6 +700,18 @@ func checkCollections(
// because otherwise we'll deadlock waiting for GC status. Unexpected or
// missing collection paths will be reported by checkHasCollections.
for item := range returned.Items() {
// Skip metadata collections as they aren't directly related to items to
// backup. Don't add them to the item count either since the item count
// is for actual pull items.
// TODO(ashmrtn): Should probably eventually check some data in metadata
// collections.
if service == path.ExchangeMetadataService ||
service == path.OneDriveMetadataService ||
service == path.SharePointMetadataService {
skipped++
continue
}
gotItems++
if expectedColData == nil {
@ -715,6 +728,10 @@ func checkCollections(
assert.Equal(t, expectedItems, gotItems, "expected items")
checkHasCollections(t, expected, collectionsWithItems)
// Return how many metadata files were skipped so we can account for it in the
// check on GraphConnector status.
return skipped
}
type destAndCats struct {

View File

@ -394,11 +394,11 @@ func runRestoreBackupTest(
// Pull the data prior to waiting for the status as otherwise it will
// deadlock.
checkCollections(t, totalItems, expectedData, dcs)
skipped := checkCollections(t, totalItems, expectedData, dcs)
status = backupGC.AwaitStatus()
assert.Equal(t, totalItems, status.ObjectCount, "status.ObjectCount")
assert.Equal(t, totalItems, status.Successful, "status.Successful")
assert.Equal(t, totalItems+skipped, status.ObjectCount, "status.ObjectCount")
assert.Equal(t, totalItems+skipped, status.Successful, "status.Successful")
}
func (suite *GraphConnectorIntegrationSuite) TestRestoreAndBackup() {
@ -862,11 +862,11 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames
// Pull the data prior to waiting for the status as otherwise it will
// deadlock.
checkCollections(t, allItems, allExpectedData, dcs)
skipped := checkCollections(t, allItems, allExpectedData, dcs)
status := backupGC.AwaitStatus()
assert.Equal(t, allItems, status.ObjectCount, "status.ObjectCount")
assert.Equal(t, allItems, status.Successful, "status.Successful")
assert.Equal(t, allItems+skipped, status.ObjectCount, "status.ObjectCount")
assert.Equal(t, allItems+skipped, status.Successful, "status.Successful")
})
}
}

View File

@ -329,22 +329,20 @@ func getStreamItemFunc(
log.Debugw("reading item", "path", itemPath.String())
trace.Log(ctx, "kopia:getStreamItemFunc:item", itemPath.String())
// Not all items implement StreamInfo. For example, the metadata files
// do not because they don't contain information directly backed up or
// used for restore. If progress does not contain information about a
// finished file it just returns without an error so it's safe to skip
// adding something to it.
ei, ok := e.(data.StreamInfo)
if !ok {
errs = multierror.Append(
errs, errors.Errorf("item %q does not implement DataStreamInfo", itemPath))
log.Errorw("item does not implement DataStreamInfo; skipping", "path", itemPath)
continue
if ok {
// Relative path given to us in the callback is missing the root
// element. Add to pending set before calling the callback to avoid race
// conditions when the item is completed.
d := &itemDetails{info: ei.Info(), repoPath: itemPath}
progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
}
// Relative path given to us in the callback is missing the root
// element. Add to pending set before calling the callback to avoid race
// conditions when the item is completed.
d := &itemDetails{info: ei.Info(), repoPath: itemPath}
progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
modTime := time.Now()
if smt, ok := e.(data.StreamModTime); ok {
modTime = smt.ModTime()
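
To make the intent of that ok-check concrete: metadata items only need enough of an interface for kopia to persist their bytes, and they deliberately omit Info() so they never contribute entries to backup details. A rough sketch of such an item, with shapes assumed from what the diff exercises (UUID/ToReader, plus Size for the non-empty-file assertions in the tests), not the repository's actual graph.MetadataItem:

```go
package sketch

import (
	"bytes"
	"io"
)

// metadataItem carries a serialized blob (such as the delta token map) into
// the kopia snapshot. Names and shapes here are assumptions for illustration.
type metadataItem struct {
	name string
	data []byte
}

// UUID returns the file name used inside the collection ("delta" for the
// delta token file).
func (m metadataItem) UUID() string { return m.name }

// ToReader exposes the raw bytes for kopia to persist.
func (m metadataItem) ToReader() io.ReadCloser {
	return io.NopCloser(bytes.NewReader(m.data))
}

// Size supports size checks without implying backup-details info.
func (m metadataItem) Size() int64 { return int64(len(m.data)) }

// Note: no Info() method, so the data.StreamInfo assertion in
// getStreamItemFunc fails and the item is skipped for backup details.
```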

View File

@ -10,13 +10,18 @@ import (
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector/exchange"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/events"
evmock "github.com/alcionai/corso/src/internal/events/mock"
"github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/internal/model"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/backup"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/store"
)
@ -121,7 +126,7 @@ func prepNewBackupOp(
ctx context.Context,
bus events.Eventer,
sel selectors.Selector,
) (BackupOperation, func()) {
) (BackupOperation, account.Account, *kopia.Wrapper, *kopia.ModelStore, func()) {
//revive:enable:context-as-argument
acct := tester.NewM365Account(t)
@ -173,7 +178,80 @@ func prepNewBackupOp(
t.FailNow()
}
return bo, closer
return bo, acct, kw, ms, closer
}
//revive:disable:context-as-argument
func checkMetadataFilesExist(
t *testing.T,
ctx context.Context,
backupID model.StableID,
kw *kopia.Wrapper,
ms *kopia.ModelStore,
tenant string,
user string,
service path.ServiceType,
category path.CategoryType,
files []string,
) {
//revive:enable:context-as-argument
bup := &backup.Backup{}
err := ms.Get(ctx, model.BackupSchema, backupID, bup)
if !assert.NoError(t, err) {
return
}
paths := []path.Path{}
pathsByRef := map[string][]string{}
for _, fName := range files {
p, err := path.Builder{}.
Append(fName).
ToServiceCategoryMetadataPath(tenant, user, service, category, true)
if !assert.NoError(t, err, "bad metadata path") {
continue
}
dir, err := p.Dir()
if !assert.NoError(t, err, "parent path") {
continue
}
paths = append(paths, p)
pathsByRef[dir.ShortRef()] = append(pathsByRef[dir.ShortRef()], fName)
}
cols, err := kw.RestoreMultipleItems(ctx, bup.SnapshotID, paths, nil)
assert.NoError(t, err)
for _, col := range cols {
itemNames := []string{}
for item := range col.Items() {
assert.Implements(t, (*data.StreamSize)(nil), item)
s := item.(data.StreamSize)
assert.Greaterf(
t,
s.Size(),
int64(0),
"empty metadata file: %s/%s",
col.FullPath(),
item.UUID(),
)
itemNames = append(itemNames, item.UUID())
}
assert.ElementsMatchf(
t,
pathsByRef[col.FullPath().ShortRef()],
itemNames,
"collection %s missing expected files",
col.FullPath(),
)
}
}
type BackupOpIntegrationSuite struct {
@ -245,48 +323,64 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() {
tests := []struct {
name string
selectFunc func() *selectors.Selector
selectFunc func() *selectors.ExchangeBackup
}{
{
name: "Integration Exchange.Mail",
selectFunc: func() *selectors.Selector {
selectFunc: func() *selectors.ExchangeBackup {
sel := selectors.NewExchangeBackup()
sel.Include(sel.MailFolders([]string{m365UserID}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
return &sel.Selector
return sel
},
},
{
name: "Integration Exchange.Contacts",
selectFunc: func() *selectors.Selector {
selectFunc: func() *selectors.ExchangeBackup {
sel := selectors.NewExchangeBackup()
sel.Include(sel.ContactFolders(
[]string{m365UserID},
[]string{exchange.DefaultContactFolder},
selectors.PrefixMatch()))
return &sel.Selector
return sel
},
},
{
name: "Integration Exchange.Events",
selectFunc: func() *selectors.Selector {
selectFunc: func() *selectors.ExchangeBackup {
sel := selectors.NewExchangeBackup()
sel.Include(sel.EventCalendars([]string{m365UserID}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch()))
return &sel.Selector
return sel
},
},
}
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
mb := evmock.NewBus()
bo, closer := prepNewBackupOp(t, ctx, mb, *test.selectFunc())
sel := test.selectFunc()
bo, acct, kw, ms, closer := prepNewBackupOp(t, ctx, mb, sel.Selector)
defer closer()
failed := false
require.NoError(t, bo.Run(ctx))
require.NotEmpty(t, bo.Results)
require.NotEmpty(t, bo.Results.BackupID)
assert.Equalf(t, Completed, bo.Status, "backup status %s is not Completed", bo.Status)
if !assert.Equalf(
t,
Completed,
bo.Status,
"backup status %s is not Completed",
bo.Status,
) {
failed = true
}
if !assert.Less(t, 0, bo.Results.ItemsWritten) {
failed = true
}
assert.Less(t, 0, bo.Results.ItemsRead)
assert.Less(t, 0, bo.Results.ItemsWritten)
assert.Less(t, int64(0), bo.Results.BytesRead, "bytes read")
assert.Less(t, int64(0), bo.Results.BytesUploaded, "bytes uploaded")
assert.Equal(t, 1, bo.Results.ResourceOwners)
@ -297,6 +391,37 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() {
assert.Equal(t,
mb.CalledWith[events.BackupStart][0][events.BackupID],
bo.Results.BackupID, "backupID pre-declaration")
// Check that metadata files with delta tokens were created. Currently
// these files will only be made for contacts and email in Exchange if any
// items were backed up. Events does not support delta queries.
if failed {
return
}
m365, err := acct.M365Config()
require.NoError(t, err)
for _, scope := range sel.Scopes() {
cat := scope.Category().PathType()
if cat != path.EmailCategory && cat != path.ContactsCategory {
return
}
checkMetadataFilesExist(
t,
ctx,
bo.Results.BackupID,
kw,
ms,
m365.AzureTenantID,
m365UserID,
path.ExchangeService,
cat,
[]string{graph.DeltaTokenFileName},
)
}
})
}
}
@ -314,7 +439,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_oneDrive() {
sel.Include(sel.Users([]string{m365UserID}))
bo, closer := prepNewBackupOp(t, ctx, mb, sel.Selector)
bo, _, _, _, closer := prepNewBackupOp(t, ctx, mb, sel.Selector)
defer closer()
require.NoError(t, bo.Run(ctx))
@ -347,7 +472,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_sharePoint() {
sel.Include(sel.Sites([]string{siteID}))
bo, closer := prepNewBackupOp(t, ctx, mb, sel.Selector)
bo, _, _, _, closer := prepNewBackupOp(t, ctx, mb, sel.Selector)
defer closer()
require.NoError(t, bo.Run(ctx))

View File

@ -198,7 +198,9 @@ func (suite *RestoreOpIntegrationSuite) SetupSuite() {
require.NotEmpty(t, bo.Results.BackupID)
suite.backupID = bo.Results.BackupID
suite.numItems = bo.Results.ItemsWritten
// Remove delta metadata files for contacts and email as they are not part of
// the data restored.
suite.numItems = bo.Results.ItemsWritten - 2
}
func (suite *RestoreOpIntegrationSuite) TearDownSuite() {