consume old deltas on exchange mail backup (#1768)

## Description

When backing up Exchange data, parse the
metadata collections that hold the delta URLs from
prior runs (if any exist) and pass those URLs along
to the fetch functions for reuse.

## Type of change

- [x] 🌻 Feature

## Issue(s)

* #1725

## Test Plan

- [x]  Unit test
This commit is contained in:
Keepers 2022-12-13 12:03:33 -07:00 committed by GitHub
parent d8172de67e
commit 414d2a490f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 255 additions and 77 deletions

View File

@ -32,7 +32,7 @@ import (
func (gc *GraphConnector) DataCollections(
ctx context.Context,
sels selectors.Selector,
metadataCols []data.Collection,
metadata []data.Collection,
) ([]data.Collection, error) {
ctx, end := D.Span(ctx, "gc:dataCollections", D.Index("service", sels.Service.String()))
defer end()
@ -42,11 +42,9 @@ func (gc *GraphConnector) DataCollections(
return nil, err
}
// serialize metadata into maps here
switch sels.Service {
case selectors.ServiceExchange:
return gc.ExchangeDataCollection(ctx, sels)
return gc.ExchangeDataCollection(ctx, sels, metadata)
case selectors.ServiceOneDrive:
return gc.OneDriveDataCollections(ctx, sels)
case selectors.ServiceSharePoint:
@ -119,6 +117,7 @@ func verifyBackupInputs(sels selectors.Selector, userPNs, siteIDs []string) erro
func (gc *GraphConnector) createExchangeCollections(
ctx context.Context,
scope selectors.ExchangeScope,
deltas map[string]string,
) ([]data.Collection, error) {
var (
errs *multierror.Error
@ -152,7 +151,8 @@ func (gc *GraphConnector) createExchangeCollections(
collections,
gc.UpdateStatus,
resolver,
scope)
scope,
deltas)
if err != nil {
return nil, errors.Wrap(err, "filling collections")
@ -178,6 +178,7 @@ func (gc *GraphConnector) createExchangeCollections(
func (gc *GraphConnector) ExchangeDataCollection(
ctx context.Context,
selector selectors.Selector,
metadata []data.Collection,
) ([]data.Collection, error) {
eb, err := selector.ToExchangeBackup()
if err != nil {
@ -190,9 +191,14 @@ func (gc *GraphConnector) ExchangeDataCollection(
errs error
)
_, deltas, err := exchange.ParseMetadataCollections(ctx, metadata)
if err != nil {
return nil, err
}
for _, scope := range scopes {
// Creates a map of collections based on scope
dcs, err := gc.createExchangeCollections(ctx, scope)
dcs, err := gc.createExchangeCollections(ctx, scope, deltas)
if err != nil {
user := scope.Get(selectors.ExchangeUser)
return nil, support.WrapAndAppend(user[0], err, errs)

View File

@ -104,7 +104,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
collection, err := connector.ExchangeDataCollection(ctx, test.getSelector(t))
collection, err := connector.ExchangeDataCollection(ctx, test.getSelector(t), nil)
require.NoError(t, err)
// Categories with delta endpoints will produce a collection for metadata
// as well as the actual data pulled.
@ -283,7 +283,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailFetch()
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
collections, err := gc.createExchangeCollections(ctx, test.scope)
collections, err := gc.createExchangeCollections(ctx, test.scope, nil)
require.NoError(t, err)
for _, c := range collections {
@ -302,6 +302,77 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailFetch()
}
}
// TestDelta is a sanity check for feeding prior delta data back into a backup.
// For each category under test it runs a full collection pass, extracts the
// delta map from the produced metadata collection, and then runs a second
// pass seeded with that map.
func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestDelta() {
	ctx, flush := tester.NewContext()
	defer flush()

	userID := tester.M365UserID(suite.T())
	gc := loadConnector(ctx, suite.T(), Users)

	table := []struct {
		name  string
		scope selectors.ExchangeScope
	}{
		{
			name: "Mail",
			scope: selectors.NewExchangeBackup().MailFolders(
				[]string{userID},
				[]string{exchange.DefaultMailFolder},
				selectors.PrefixMatch(),
			)[0],
		},
		{
			name: "Contacts",
			scope: selectors.NewExchangeBackup().ContactFolders(
				[]string{userID},
				[]string{exchange.DefaultContactFolder},
				selectors.PrefixMatch(),
			)[0],
		},
	}

	for _, tc := range table {
		suite.T().Run(tc.name, func(t *testing.T) {
			// First pass: no delta history is supplied (ie: full backup).
			full, err := gc.createExchangeCollections(ctx, tc.scope, nil)
			require.NoError(t, err)
			assert.Less(t, 1, len(full), "retrieved metadata and data collections")

			// Pick out the metadata collection; if several match, the last one wins.
			var md data.Collection

			for i := range full {
				if full[i].FullPath().Service() == path.ExchangeMetadataService {
					md = full[i]
				}
			}

			require.NotNil(t, md, "collections contains a metadata collection")

			_, deltas, err := exchange.ParseMetadataCollections(ctx, []data.Collection{md})
			require.NoError(t, err)

			// Second pass: reuse the deltas from the first pass, which should
			// only contain the difference.
			incremental, err := gc.createExchangeCollections(ctx, tc.scope, deltas)
			require.NoError(t, err)

			// TODO(keepers): this isn't a very useful test at the moment. It needs to
			// investigate the items in the original and delta collections to at least
			// assert some minimum assumptions, such as "deltas should retrieve fewer items".
			// Delta usage is commented out at the moment, anyway. So this is currently
			// a sanity check that the minimum behavior won't break.
			for _, c := range incremental {
				if c.FullPath().Service() == path.ExchangeMetadataService {
					continue
				}

				ec, ok := c.(*exchange.Collection)
				require.True(t, ok, "collection is *exchange.Collection")
				assert.NotNil(t, ec)
			}
		})
	}
}
// TestMailSerializationRegression verifies that all mail data stored in the
// test account can be successfully downloaded into bytes and restored into
// M365 mail objects
@ -313,7 +384,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailSerializ
connector := loadConnector(ctx, t, Users)
sel := selectors.NewExchangeBackup()
sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
collection, err := connector.createExchangeCollections(ctx, sel.Scopes()[0])
collection, err := connector.createExchangeCollections(ctx, sel.Scopes()[0], nil)
require.NoError(t, err)
for _, edc := range collection {
@ -356,7 +427,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestContactSeria
scope := selectors.
NewExchangeBackup().
ContactFolders([]string{suite.user}, []string{exchange.DefaultContactFolder}, selectors.PrefixMatch())[0]
collections, err := connector.createExchangeCollections(ctx, scope)
collections, err := connector.createExchangeCollections(ctx, scope, nil)
require.NoError(t, err)
return collections
@ -423,7 +494,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestEventsSerial
getCollection: func(t *testing.T) []data.Collection {
sel := selectors.NewExchangeBackup()
sel.Include(sel.EventCalendars([]string{suite.user}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch()))
collections, err := connector.createExchangeCollections(ctx, sel.Scopes()[0])
collections, err := connector.createExchangeCollections(ctx, sel.Scopes()[0], nil)
require.NoError(t, err)
return collections
@ -435,7 +506,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestEventsSerial
getCollection: func(t *testing.T) []data.Collection {
sel := selectors.NewExchangeBackup()
sel.Include(sel.EventCalendars([]string{suite.user}, []string{"Birthdays"}, selectors.PrefixMatch()))
collections, err := connector.createExchangeCollections(ctx, sel.Scopes()[0])
collections, err := connector.createExchangeCollections(ctx, sel.Scopes()[0], nil)
require.NoError(t, err)
return collections
@ -468,31 +539,6 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestEventsSerial
}
}
// TestAccessOfInboxAllUsers verifies that GraphConnector can
// support `--users *` for backup operations. Selector.DiscreteScopes
// returns all of the users within one scope. Only users who have
// messages in their inbox will have a collection returned.
// The final check ensures that more than roughly 75% of the user
// collections are returned. If an error is encountered, the test fails overall.
func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestAccessOfInboxAllUsers() {
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
connector := loadConnector(ctx, t, Users)
sel := selectors.NewExchangeBackup()
sel.Include(sel.MailFolders(selectors.Any(), []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
scopes := sel.DiscreteScopes(connector.GetUsers())
for _, scope := range scopes {
users := scope.Get(selectors.ExchangeUser)
// The integer division runs first, so the threshold is 3*floor(len(users)/4),
// which can be lower than 75% of len(users) when the count is not a multiple of 4.
standard := (len(users) / 4) * 3
collections, err := connector.createExchangeCollections(ctx, scope)
require.NoError(t, err)
// Strictly more collections than the threshold must be returned.
suite.Greater(len(collections), standard)
}
}
// ---------------------------------------------------------------------------
// CreateSharePointCollection tests
// ---------------------------------------------------------------------------

View File

@ -0,0 +1,61 @@
package exchange
import (
"context"
"encoding/json"
"github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/data"
)
// ParseMetadataCollections produces two maps:
// 1- paths: folderID->filePath, used to look up previous folder pathing
// in case of a name change or relocation.
// 2- deltas: folderID->deltaToken, used to look up previous delta token
// retrievals.
//
// Each collection's items are read until either the item channel closes or
// the delta token file has been decoded. Decoding writes into the same deltas
// map for every collection, so entries from multiple collections are merged.
// The paths map is currently always returned empty, because previous-path
// parsing is not enabled yet.
//
// An error is returned if ctx is cancelled while waiting for items, or if the
// delta token file cannot be decoded as a JSON string map. On error both maps
// are nil.
func ParseMetadataCollections(
	ctx context.Context,
	colls []data.Collection,
) (map[string]string, map[string]string, error) {
	var (
		paths  = map[string]string{}
		deltas = map[string]string{}
	)

	for _, coll := range colls {
		items := coll.Items()

	itemLoop:
		for {
			select {
			case <-ctx.Done():
				return nil, nil, errors.Wrap(ctx.Err(), "parsing collection metadata")

			case item, ok := <-items:
				if !ok {
					// Channel closed: this collection has no more items.
					break itemLoop
				}

				switch item.UUID() {
				// case graph.PreviousPathFileName:
				case graph.DeltaTokenFileName:
					// Wrap rather than replace the decode error so the
					// underlying cause stays visible to the caller.
					if err := json.NewDecoder(item.ToReader()).Decode(&deltas); err != nil {
						return nil, nil, errors.Wrap(err, "parsing delta token map")
					}

					// The delta file is the only item of interest for now;
					// stop reading this collection once it has been handled.
					// NOTE(review): remaining items are left unread on the
					// channel; confirm the producer tolerates that.
					break itemLoop
				}
			}
		}
	}

	return paths, deltas, nil
}

View File

@ -0,0 +1,55 @@
package exchange
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/path"
)
// ---------------------------------------------------------------------------
// Unit tests
// ---------------------------------------------------------------------------
// DataCollectionsUnitSuite groups the unit tests in this file. It embeds
// suite.Suite and declares no fields of its own.
type DataCollectionsUnitSuite struct {
suite.Suite
}
func TestDataCollectionsUnitSuite(t *testing.T) {
suite.Run(t, new(DataCollectionsUnitSuite))
}
// TestParseMetadataCollections builds a metadata collection holding a single
// delta token file and checks that ParseMetadataCollections returns the
// key/value pair that was serialized into it.
func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
	t := suite.T()

	ctx, flush := tester.NewContext()
	defer flush()

	// Serialized delta map that the parser is expected to hand back.
	payload, err := json.Marshal(map[string]string{"key": "token"})
	require.NoError(t, err)

	mdPath, err := path.Builder{}.ToServiceCategoryMetadataPath(
		"t", "u",
		path.ExchangeService,
		path.EmailCategory,
		false,
	)
	require.NoError(t, err)

	// The status updater is a no-op; this test has no use for status reports.
	noopStatus := func(cos *support.ConnectorOperationStatus) {}
	items := []graph.MetadataItem{
		graph.NewMetadataItem(graph.DeltaTokenFileName, payload),
	}
	input := []data.Collection{
		graph.NewMetadataCollection(mdPath, items, noopStatus),
	}

	_, deltas, err := ParseMetadataCollections(ctx, input)
	require.NoError(t, err)
	assert.NotEmpty(t, deltas, "delta urls")
	assert.Equal(t, "token", deltas["key"])
}

View File

@ -113,18 +113,6 @@ func CategoryToOptionIdentifier(category path.CategoryType) optionIdentifier {
// which reduces the overall latency of complex calls
// -----------------------------------------------------------------------
// DeltaRequestBuilderGetQueryParameters holds the query parameters for delta
// requests. Delta requests for mail and contacts have the same parameters and
// config structs.
//
// Each field's `uriparametername` tag names the query parameter it is sent as;
// `%24` is the URL-encoded form of `$` (e.g. `%24count` is `$count`).
type DeltaRequestBuilderGetQueryParameters struct {
Count *bool `uriparametername:"%24count"`
Filter *string `uriparametername:"%24filter"`
Orderby []string `uriparametername:"%24orderby"`
Search *string `uriparametername:"%24search"`
Select []string `uriparametername:"%24select"`
Skip *int32 `uriparametername:"%24skip"`
Top *int32 `uriparametername:"%24top"`
}
func optionsForFolderMessagesDelta(
moreOps []string,
) (*msuser.UsersItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration, error) {

View File

@ -78,12 +78,13 @@ func FilterContainersAndFillCollections(
statusUpdater support.StatusUpdater,
resolver graph.ContainerResolver,
scope selectors.ExchangeScope,
oldDeltas map[string]string,
) error {
var (
errs error
collectionType = CategoryToOptionIdentifier(qp.Category)
// folder ID -> delta token for folder.
deltaTokens = map[string]string{}
// folder ID -> delta url for folder.
deltaURLs = map[string]string{}
)
for _, c := range resolver.Items() {
@ -128,7 +129,10 @@ func FilterContainersAndFillCollections(
continue
}
jobs, token, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, *c.GetId())
dirID := *c.GetId()
oldDelta := oldDeltas[dirID]
jobs, delta, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, dirID, oldDelta)
if err != nil {
errs = support.WrapAndAppend(
qp.ResourceOwner,
@ -139,8 +143,8 @@ func FilterContainersAndFillCollections(
edc.jobs = append(edc.jobs, jobs...)
if len(token) > 0 {
deltaTokens[*c.GetId()] = token
if len(delta) > 0 {
deltaURLs[dirID] = delta
}
}
@ -148,7 +152,7 @@ func FilterContainersAndFillCollections(
qp.Credentials.AzureTenantID,
qp.ResourceOwner,
qp.Category,
deltaTokens,
deltaURLs,
statusUpdater,
)
if err != nil {
@ -214,7 +218,7 @@ func IterativeCollectCalendarContainers(
type FetchIDFunc func(
ctx context.Context,
gs graph.Servicer,
user, containerID string,
user, containerID, oldDeltaToken string,
) ([]string, string, error)
func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) {
@ -234,7 +238,7 @@ func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) {
func FetchEventIDsFromCalendar(
ctx context.Context,
gs graph.Servicer,
user, calendarID string,
user, calendarID, oldDelta string,
) ([]string, string, error) {
var (
errs *multierror.Error
@ -288,17 +292,17 @@ func FetchEventIDsFromCalendar(
func FetchContactIDsFromDirectory(
ctx context.Context,
gs graph.Servicer,
user, directoryID string,
user, directoryID, oldDelta string,
) ([]string, string, error) {
var (
errs *multierror.Error
ids []string
deltaToken string
errs *multierror.Error
ids []string
deltaURL string
)
options, err := optionsForContactFoldersItemDelta([]string{"parentFolderId"})
if err != nil {
return nil, deltaToken, errors.Wrap(err, "getting query options")
return nil, deltaURL, errors.Wrap(err, "getting query options")
}
builder := gs.Client().
@ -307,10 +311,16 @@ func FetchContactIDsFromDirectory(
Contacts().
Delta()
// TODO(rkeepers): Awaiting full integration of incremental support, else this
// will cause unexpected behavior/errors.
// if len(oldDelta) > 0 {
// builder = msuser.NewUsersItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, gs.Adapter())
// }
for {
resp, err := builder.Get(ctx, options)
if err != nil {
return nil, deltaToken, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
return nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
}
for _, item := range resp.GetValue() {
@ -329,7 +339,7 @@ func FetchContactIDsFromDirectory(
delta := resp.GetOdataDeltaLink()
if delta != nil && len(*delta) > 0 {
deltaToken = *delta
deltaURL = *delta
}
nextLink := resp.GetOdataNextLink()
@ -340,7 +350,7 @@ func FetchContactIDsFromDirectory(
builder = msuser.NewUsersItemContactFoldersItemContactsDeltaRequestBuilder(*nextLink, gs.Adapter())
}
return ids, deltaToken, errs.ErrorOrNil()
return ids, deltaURL, errs.ErrorOrNil()
}
// FetchMessageIDsFromDirectory function that returns a list of all the m365IDs of the exchange.Mail
@ -348,17 +358,17 @@ func FetchContactIDsFromDirectory(
func FetchMessageIDsFromDirectory(
ctx context.Context,
gs graph.Servicer,
user, directoryID string,
user, directoryID, oldDelta string,
) ([]string, string, error) {
var (
errs *multierror.Error
ids []string
deltaToken string
errs *multierror.Error
ids []string
deltaURL string
)
options, err := optionsForFolderMessagesDelta([]string{"id"})
if err != nil {
return nil, deltaToken, errors.Wrap(err, "getting query options")
return nil, deltaURL, errors.Wrap(err, "getting query options")
}
builder := gs.Client().
@ -367,10 +377,16 @@ func FetchMessageIDsFromDirectory(
Messages().
Delta()
// TODO(rkeepers): Awaiting full integration of incremental support, else this
// will cause unexpected behavior/errors.
// if len(oldDelta) > 0 {
// builder = msuser.NewUsersItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, gs.Adapter())
// }
for {
resp, err := builder.Get(ctx, options)
if err != nil {
return nil, deltaToken, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
return nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
}
for _, item := range resp.GetValue() {
@ -389,7 +405,7 @@ func FetchMessageIDsFromDirectory(
delta := resp.GetOdataDeltaLink()
if delta != nil && len(*delta) > 0 {
deltaToken = *delta
deltaURL = *delta
}
nextLink := resp.GetOdataNextLink()
@ -400,5 +416,5 @@ func FetchMessageIDsFromDirectory(
builder = msuser.NewUsersItemMailFoldersItemMessagesDeltaRequestBuilder(*nextLink, gs.Adapter())
}
return ids, deltaToken, errs.ErrorOrNil()
return ids, deltaURL, errs.ErrorOrNil()
}

View File

@ -127,7 +127,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
}
}()
_, mdColls, err := produceManifestsAndMetadata(ctx, op.kopia, op.store, op.Selectors, op.account)
mans, mdColls, err := produceManifestsAndMetadata(ctx, op.kopia, op.store, op.Selectors, op.account)
if err != nil {
opStats.readErr = errors.Wrap(err, "connecting to M365")
return opStats.readErr
@ -145,7 +145,13 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
return opStats.readErr
}
opStats.k, backupDetails, err = consumeBackupDataCollections(ctx, op.kopia, op.Selectors, cs, op.Results.BackupID)
opStats.k, backupDetails, err = consumeBackupDataCollections(
ctx,
op.kopia,
op.Selectors,
mans,
cs,
op.Results.BackupID)
if err != nil {
opStats.writeErr = errors.Wrap(err, "backing up service data")
return opStats.writeErr
@ -326,6 +332,7 @@ func consumeBackupDataCollections(
ctx context.Context,
kw *kopia.Wrapper,
sel selectors.Selector,
mans []*snapshot.Manifest,
cs []data.Collection,
backupID model.StableID,
) (*kopia.BackupStats, *details.Details, error) {
@ -341,7 +348,7 @@ func consumeBackupDataCollections(
kopia.TagBackupCategory: "",
}
return kw.BackupCollections(ctx, nil, cs, sel.PathService(), tags)
return kw.BackupCollections(ctx, mans, cs, sel.PathService(), tags)
}
// writes the results metrics to the operation results.

View File

@ -19,7 +19,6 @@ func TestRepositoryModelSuite(t *testing.T) {
if err := tester.RunOnAny(
tester.CorsoCITests,
tester.CorsoRepositoryTests,
"flomp",
); err != nil {
t.Skip(err)
}