consume prevPath on next backup (#1802)
## Description Parses the previousPaths metadata collections along with deltas, and hands paths down to exchange backup collection producers. Does not yet scrutinize previous/current path diffs. ## Type of change - [x] 🌻 Feature ## Issue(s) * #1726 ## Test Plan - [x] ⚡ Unit test - [x] 💚 E2E
This commit is contained in:
parent
3209cd1abf
commit
99b0f51980
16
src/internal/common/maps.go
Normal file
16
src/internal/common/maps.go
Normal file
@ -0,0 +1,16 @@
|
||||
package common
|
||||
|
||||
// UnionMaps produces a new map containing all the values of the other
|
||||
// maps. The last maps have the highes priority. Key collisions with
|
||||
// earlier maps will favor the last map with that key.
|
||||
func UnionMaps[K comparable, V any](ms ...map[K]V) map[K]V {
|
||||
r := map[K]V{}
|
||||
|
||||
for _, m := range ms {
|
||||
for k, v := range m {
|
||||
r[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
return r
|
||||
}
|
||||
@ -126,7 +126,7 @@ func verifyBackupInputs(sels selectors.Selector, userPNs, siteIDs []string) erro
|
||||
func (gc *GraphConnector) createExchangeCollections(
|
||||
ctx context.Context,
|
||||
scope selectors.ExchangeScope,
|
||||
deltas map[string]string,
|
||||
dps exchange.DeltaPaths,
|
||||
ctrlOpts control.Options,
|
||||
) ([]data.Collection, error) {
|
||||
var (
|
||||
@ -161,7 +161,7 @@ func (gc *GraphConnector) createExchangeCollections(
|
||||
gc.UpdateStatus,
|
||||
resolver,
|
||||
scope,
|
||||
deltas,
|
||||
dps,
|
||||
ctrlOpts)
|
||||
|
||||
if err != nil {
|
||||
@ -202,14 +202,15 @@ func (gc *GraphConnector) ExchangeDataCollection(
|
||||
errs error
|
||||
)
|
||||
|
||||
_, deltas, err := exchange.ParseMetadataCollections(ctx, metadata)
|
||||
cdps, err := exchange.ParseMetadataCollections(ctx, metadata)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, scope := range scopes {
|
||||
// Creates a map of collections based on scope
|
||||
dcs, err := gc.createExchangeCollections(ctx, scope, deltas, control.Options{})
|
||||
dps := cdps[scope.Category().PathType()]
|
||||
|
||||
dcs, err := gc.createExchangeCollections(ctx, scope, dps, control.Options{})
|
||||
if err != nil {
|
||||
user := scope.Get(selectors.ExchangeUser)
|
||||
return nil, support.WrapAndAppend(user[0], err, errs)
|
||||
|
||||
@ -286,7 +286,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailFetch()
|
||||
|
||||
for _, test := range tests {
|
||||
suite.T().Run(test.name, func(t *testing.T) {
|
||||
collections, err := gc.createExchangeCollections(ctx, test.scope, nil, control.Options{})
|
||||
collections, err := gc.createExchangeCollections(ctx, test.scope, exchange.DeltaPaths{}, control.Options{})
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, c := range collections {
|
||||
@ -338,7 +338,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestDelta() {
|
||||
for _, test := range tests {
|
||||
suite.T().Run(test.name, func(t *testing.T) {
|
||||
// get collections without providing any delta history (ie: full backup)
|
||||
collections, err := gc.createExchangeCollections(ctx, test.scope, nil, control.Options{})
|
||||
collections, err := gc.createExchangeCollections(ctx, test.scope, exchange.DeltaPaths{}, control.Options{})
|
||||
require.NoError(t, err)
|
||||
assert.Less(t, 1, len(collections), "retrieved metadata and data collections")
|
||||
|
||||
@ -352,12 +352,14 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestDelta() {
|
||||
|
||||
require.NotNil(t, metadata, "collections contains a metadata collection")
|
||||
|
||||
_, deltas, err := exchange.ParseMetadataCollections(ctx, []data.Collection{metadata})
|
||||
cdps, err := exchange.ParseMetadataCollections(ctx, []data.Collection{metadata})
|
||||
require.NoError(t, err)
|
||||
|
||||
dps := cdps[test.scope.Category().PathType()]
|
||||
|
||||
// now do another backup with the previous delta tokens,
|
||||
// which should only contain the difference.
|
||||
collections, err = gc.createExchangeCollections(ctx, test.scope, deltas, control.Options{})
|
||||
collections, err = gc.createExchangeCollections(ctx, test.scope, dps, control.Options{})
|
||||
require.NoError(t, err)
|
||||
|
||||
// TODO(keepers): this isn't a very useful test at the moment. It needs to
|
||||
@ -383,11 +385,15 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailSerializ
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
t := suite.T()
|
||||
connector := loadConnector(ctx, t, Users)
|
||||
sel := selectors.NewExchangeBackup()
|
||||
var (
|
||||
t = suite.T()
|
||||
connector = loadConnector(ctx, t, Users)
|
||||
sel = selectors.NewExchangeBackup()
|
||||
)
|
||||
|
||||
sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
|
||||
collection, err := connector.createExchangeCollections(ctx, sel.Scopes()[0], nil, control.Options{})
|
||||
|
||||
collection, err := connector.createExchangeCollections(ctx, sel.Scopes()[0], exchange.DeltaPaths{}, control.Options{})
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, edc := range collection {
|
||||
@ -396,9 +402,11 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailSerializ
|
||||
// Verify that each message can be restored
|
||||
for stream := range streamChannel {
|
||||
buf := &bytes.Buffer{}
|
||||
|
||||
read, err := buf.ReadFrom(stream.ToReader())
|
||||
assert.NoError(t, err)
|
||||
assert.NotZero(t, read)
|
||||
|
||||
message, err := support.CreateMessageFromBytes(buf.Bytes())
|
||||
assert.NotNil(t, message)
|
||||
assert.NoError(t, err)
|
||||
@ -430,7 +438,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestContactSeria
|
||||
scope := selectors.
|
||||
NewExchangeBackup().
|
||||
ContactFolders([]string{suite.user}, []string{exchange.DefaultContactFolder}, selectors.PrefixMatch())[0]
|
||||
collections, err := connector.createExchangeCollections(ctx, scope, nil, control.Options{})
|
||||
collections, err := connector.createExchangeCollections(ctx, scope, exchange.DeltaPaths{}, control.Options{})
|
||||
require.NoError(t, err)
|
||||
|
||||
return collections
|
||||
@ -497,7 +505,12 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestEventsSerial
|
||||
getCollection: func(t *testing.T) []data.Collection {
|
||||
sel := selectors.NewExchangeBackup()
|
||||
sel.Include(sel.EventCalendars([]string{suite.user}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch()))
|
||||
collections, err := connector.createExchangeCollections(ctx, sel.Scopes()[0], nil, control.Options{})
|
||||
|
||||
collections, err := connector.createExchangeCollections(
|
||||
ctx,
|
||||
sel.Scopes()[0],
|
||||
exchange.DeltaPaths{},
|
||||
control.Options{})
|
||||
require.NoError(t, err)
|
||||
|
||||
return collections
|
||||
@ -509,7 +522,12 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestEventsSerial
|
||||
getCollection: func(t *testing.T) []data.Collection {
|
||||
sel := selectors.NewExchangeBackup()
|
||||
sel.Include(sel.EventCalendars([]string{suite.user}, []string{"Birthdays"}, selectors.PrefixMatch()))
|
||||
collections, err := connector.createExchangeCollections(ctx, sel.Scopes()[0], nil, control.Options{})
|
||||
|
||||
collections, err := connector.createExchangeCollections(
|
||||
ctx,
|
||||
sel.Scopes()[0],
|
||||
exchange.DeltaPaths{},
|
||||
control.Options{})
|
||||
require.NoError(t, err)
|
||||
|
||||
return collections
|
||||
@ -538,7 +556,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestEventsSerial
|
||||
assert.NotZero(t, read)
|
||||
event, err := support.CreateEventFromBytes(buf.Bytes())
|
||||
assert.NotNil(t, event)
|
||||
assert.NoError(t, err, "experienced error parsing event bytes: "+buf.String())
|
||||
assert.NoError(t, err, "creating event from bytes: "+buf.String())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -22,45 +22,75 @@ func MetadataFileNames(cat path.CategoryType) []string {
|
||||
}
|
||||
}
|
||||
|
||||
// ParseMetadataCollections produces two maps:
|
||||
// 1- paths: folderID->filePath, used to look up previous folder pathing
|
||||
// in case of a name change or relocation.
|
||||
// 2- deltas: folderID->deltaToken, used to look up previous delta token
|
||||
// retrievals.
|
||||
type CatDeltaPaths map[path.CategoryType]DeltaPaths
|
||||
|
||||
type DeltaPaths struct {
|
||||
deltas map[string]string
|
||||
paths map[string]string
|
||||
}
|
||||
|
||||
func makeDeltaPaths() DeltaPaths {
|
||||
return DeltaPaths{
|
||||
deltas: map[string]string{},
|
||||
paths: map[string]string{},
|
||||
}
|
||||
}
|
||||
|
||||
// ParseMetadataCollections produces a map of structs holding delta
|
||||
// and path lookup maps.
|
||||
func ParseMetadataCollections(
|
||||
ctx context.Context,
|
||||
colls []data.Collection,
|
||||
) (map[string]string, map[string]string, error) {
|
||||
var (
|
||||
paths = map[string]string{}
|
||||
deltas = map[string]string{}
|
||||
)
|
||||
) (CatDeltaPaths, error) {
|
||||
cdp := CatDeltaPaths{
|
||||
path.ContactsCategory: makeDeltaPaths(),
|
||||
path.EmailCategory: makeDeltaPaths(),
|
||||
path.EventsCategory: makeDeltaPaths(),
|
||||
}
|
||||
|
||||
for _, coll := range colls {
|
||||
items := coll.Items()
|
||||
var (
|
||||
breakLoop bool
|
||||
items = coll.Items()
|
||||
category = coll.FullPath().Category()
|
||||
)
|
||||
|
||||
for {
|
||||
var breakLoop bool
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, nil, errors.Wrap(ctx.Err(), "parsing collection metadata")
|
||||
return nil, errors.Wrap(ctx.Err(), "parsing collection metadata")
|
||||
|
||||
case item, ok := <-items:
|
||||
if !ok {
|
||||
breakLoop = true
|
||||
break
|
||||
}
|
||||
|
||||
switch item.UUID() {
|
||||
// case graph.PreviousPathFileName:
|
||||
case graph.DeltaTokenFileName:
|
||||
err := json.NewDecoder(item.ToReader()).Decode(&deltas)
|
||||
m := map[string]string{}
|
||||
cdps := cdp[category]
|
||||
|
||||
err := json.NewDecoder(item.ToReader()).Decode(&m)
|
||||
if err != nil {
|
||||
return nil, nil, errors.New("parsing delta token map")
|
||||
return nil, errors.New("decoding metadata json")
|
||||
}
|
||||
|
||||
breakLoop = true
|
||||
switch item.UUID() {
|
||||
case graph.PreviousPathFileName:
|
||||
if len(cdps.paths) > 0 {
|
||||
return nil, errors.Errorf("multiple versions of %s path metadata", category)
|
||||
}
|
||||
|
||||
cdps.paths = m
|
||||
|
||||
case graph.DeltaTokenFileName:
|
||||
if len(cdps.deltas) > 0 {
|
||||
return nil, errors.Errorf("multiple versions of %s delta metadata", category)
|
||||
}
|
||||
|
||||
cdps.deltas = m
|
||||
}
|
||||
|
||||
cdp[category] = cdps
|
||||
}
|
||||
|
||||
if breakLoop {
|
||||
@ -69,5 +99,5 @@ func ParseMetadataCollections(
|
||||
}
|
||||
}
|
||||
|
||||
return paths, deltas, nil
|
||||
return cdp, nil
|
||||
}
|
||||
|
||||
@ -37,6 +37,8 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
|
||||
name string
|
||||
data []fileValues
|
||||
expectDeltas map[string]string
|
||||
expectPaths map[string]string
|
||||
expectError assert.ErrorAssertionFunc
|
||||
}{
|
||||
{
|
||||
name: "delta urls",
|
||||
@ -46,6 +48,47 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
|
||||
expectDeltas: map[string]string{
|
||||
"key": "delta-link",
|
||||
},
|
||||
expectError: assert.NoError,
|
||||
},
|
||||
{
|
||||
name: "multiple delta urls",
|
||||
data: []fileValues{
|
||||
{graph.DeltaTokenFileName, "delta-link"},
|
||||
{graph.DeltaTokenFileName, "delta-link-2"},
|
||||
},
|
||||
expectError: assert.Error,
|
||||
},
|
||||
{
|
||||
name: "previous path",
|
||||
data: []fileValues{
|
||||
{graph.PreviousPathFileName, "prev-path"},
|
||||
},
|
||||
expectPaths: map[string]string{
|
||||
"key": "prev-path",
|
||||
},
|
||||
expectError: assert.NoError,
|
||||
},
|
||||
{
|
||||
name: "multiple previous paths",
|
||||
data: []fileValues{
|
||||
{graph.PreviousPathFileName, "prev-path"},
|
||||
{graph.PreviousPathFileName, "prev-path-2"},
|
||||
},
|
||||
expectError: assert.Error,
|
||||
},
|
||||
{
|
||||
name: "delta urls and previous paths",
|
||||
data: []fileValues{
|
||||
{graph.DeltaTokenFileName, "delta-link"},
|
||||
{graph.PreviousPathFileName, "prev-path"},
|
||||
},
|
||||
expectDeltas: map[string]string{
|
||||
"key": "delta-link",
|
||||
},
|
||||
expectPaths: map[string]string{
|
||||
"key": "prev-path",
|
||||
},
|
||||
expectError: assert.NoError,
|
||||
},
|
||||
{
|
||||
name: "delta urls with special chars",
|
||||
@ -55,6 +98,7 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
|
||||
expectDeltas: map[string]string{
|
||||
"key": "`!@#$%^&*()_[]{}/\"\\",
|
||||
},
|
||||
expectError: assert.NoError,
|
||||
},
|
||||
{
|
||||
name: "delta urls with escaped chars",
|
||||
@ -64,6 +108,7 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
|
||||
expectDeltas: map[string]string{
|
||||
"key": "\\n\\r\\t\\b\\f\\v\\0\\\\",
|
||||
},
|
||||
expectError: assert.NoError,
|
||||
},
|
||||
{
|
||||
name: "delta urls with newline char runes",
|
||||
@ -76,6 +121,7 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
|
||||
expectDeltas: map[string]string{
|
||||
"key": "\\n",
|
||||
},
|
||||
expectError: assert.NoError,
|
||||
},
|
||||
}
|
||||
for _, test := range table {
|
||||
@ -102,12 +148,19 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
|
||||
colls = append(colls, coll)
|
||||
}
|
||||
|
||||
_, deltas, err := ParseMetadataCollections(ctx, colls)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, deltas, "deltas")
|
||||
cdps, err := ParseMetadataCollections(ctx, colls)
|
||||
test.expectError(t, err)
|
||||
|
||||
emails := cdps[path.EmailCategory]
|
||||
deltas, paths := emails.deltas, emails.paths
|
||||
|
||||
for k, v := range test.expectDeltas {
|
||||
assert.Equal(t, v, deltas[k], "deltas elements")
|
||||
}
|
||||
|
||||
for k, v := range test.expectPaths {
|
||||
assert.Equal(t, v, paths[k], "deltas elements")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -58,17 +58,21 @@ type Collection struct {
|
||||
|
||||
collectionType optionIdentifier
|
||||
statusUpdater support.StatusUpdater
|
||||
// FullPath is the slice representation of the action context passed down through the hierarchy.
|
||||
// The original request can be gleaned from the slice. (e.g. {<tenant ID>, <user ID>, "emails"})
|
||||
ctrl control.Options
|
||||
|
||||
// FullPath is the current hierarchical path used by this collection.
|
||||
fullPath path.Path
|
||||
|
||||
ctrl control.Options
|
||||
// PrevPath is the previous hierarchical path used by this collection.
|
||||
// It may be the same as fullPath, if the folder was not renamed or
|
||||
// moved. It will be empty on its first retrieval.
|
||||
prevPath path.Path
|
||||
}
|
||||
|
||||
// NewExchangeDataCollection creates an ExchangeDataCollection with fullPath is annotated
|
||||
func NewCollection(
|
||||
user string,
|
||||
fullPath path.Path,
|
||||
fullPath, prevPath path.Path,
|
||||
collectionType optionIdentifier,
|
||||
service graph.Servicer,
|
||||
statusUpdater support.StatusUpdater,
|
||||
@ -81,6 +85,7 @@ func NewCollection(
|
||||
service: service,
|
||||
statusUpdater: statusUpdater,
|
||||
fullPath: fullPath,
|
||||
prevPath: prevPath,
|
||||
collectionType: collectionType,
|
||||
ctrl: ctrlOpts,
|
||||
}
|
||||
|
||||
@ -14,6 +14,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/connector/support"
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/alcionai/corso/src/pkg/selectors"
|
||||
)
|
||||
@ -30,7 +31,7 @@ func FilterContainersAndFillCollections(
|
||||
statusUpdater support.StatusUpdater,
|
||||
resolver graph.ContainerResolver,
|
||||
scope selectors.ExchangeScope,
|
||||
oldDeltas map[string]string,
|
||||
dps DeltaPaths,
|
||||
ctrlOpts control.Options,
|
||||
) error {
|
||||
var (
|
||||
@ -38,7 +39,7 @@ func FilterContainersAndFillCollections(
|
||||
oi = CategoryToOptionIdentifier(qp.Category)
|
||||
// folder ID -> delta url for folder.
|
||||
deltaURLs = map[string]string{}
|
||||
prevPaths = map[string]string{}
|
||||
currPaths = map[string]string{}
|
||||
)
|
||||
|
||||
for _, c := range resolver.Items() {
|
||||
@ -46,12 +47,26 @@ func FilterContainersAndFillCollections(
|
||||
return errs
|
||||
}
|
||||
|
||||
cID := *c.GetId()
|
||||
|
||||
dirPath, ok := pathAndMatch(qp, c, scope)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
cID := *c.GetId()
|
||||
var prevPath path.Path
|
||||
|
||||
if ps, ok := dps.paths[cID]; ok {
|
||||
// see below for the issue with building paths for root
|
||||
// folders that have no displayName.
|
||||
ps = strings.TrimSuffix(ps, rootFolderAlias)
|
||||
|
||||
if pp, err := path.FromDataLayerPath(ps, false); err != nil {
|
||||
logger.Ctx(ctx).Error("parsing previous path string")
|
||||
} else {
|
||||
prevPath = pp
|
||||
}
|
||||
}
|
||||
|
||||
// Create only those that match
|
||||
service, err := createService(qp.Credentials)
|
||||
@ -63,6 +78,7 @@ func FilterContainersAndFillCollections(
|
||||
edc := NewCollection(
|
||||
qp.ResourceOwner,
|
||||
dirPath,
|
||||
prevPath,
|
||||
oi,
|
||||
service,
|
||||
statusUpdater,
|
||||
@ -76,7 +92,7 @@ func FilterContainersAndFillCollections(
|
||||
continue
|
||||
}
|
||||
|
||||
jobs, delta, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, cID, oldDeltas[cID])
|
||||
jobs, delta, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, cID, dps.deltas[cID])
|
||||
if err != nil {
|
||||
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
|
||||
}
|
||||
@ -89,11 +105,11 @@ func FilterContainersAndFillCollections(
|
||||
|
||||
// add the current path for the container ID to be used in the next backup
|
||||
// as the "previous path", for reference in case of a rename or relocation.
|
||||
prevPaths[cID] = dirPath.Folder()
|
||||
currPaths[cID] = dirPath.Folder()
|
||||
}
|
||||
|
||||
entries := []graph.MetadataCollectionEntry{
|
||||
graph.NewMetadataEntry(graph.PreviousPathFileName, prevPaths),
|
||||
graph.NewMetadataEntry(graph.PreviousPathFileName, currPaths),
|
||||
}
|
||||
|
||||
if len(deltaURLs) > 0 {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user