produce previousPaths metadata (#1799)

## Description

Adds an additional metadata collection: a folder
id to path string mapping. This collection is
created on backup and retrieved along with
the delta metadata on the next backup, but is
not yet parsed or utilized downstream.
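
For reference, a minimal sketch of the shape of that mapping (placeholder folder IDs and paths; the real values are written to the `previouspath` metadata file introduced below):

```go
// Sketch only: placeholder folder IDs and paths, not real M365 values.
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// container (folder) ID -> path string, one entry per backed-up folder.
	prevPaths := map[string]string{
		"folder-id-1": "Inbox",
		"folder-id-2": "Inbox/Receipts",
	}

	// Serialized the same way as the delta-token map: a single json object
	// stored as one item in the category's metadata collection.
	bs, err := json.Marshal(prevPaths)
	if err != nil {
		panic(err)
	}

	fmt.Println(string(bs)) // {"folder-id-1":"Inbox","folder-id-2":"Inbox/Receipts"}
}
```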

## Type of change

- [x] 🌻 Feature

## Issue(s)

* #1726

## Test Plan

- [x] Unit test
- [x] 💚 E2E
Authored by Keepers on 2022-12-16 11:09:25 -07:00, committed by GitHub
commit 703acbdcf7 (parent 016b924757)
11 changed files with 358 additions and 252 deletions

View File

@ -520,9 +520,15 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestEventsSerial
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
collections := test.getCollection(t)
require.Equal(t, len(collections), 1)
edc := collections[0]
assert.Equal(t, edc.FullPath().Folder(), test.expected)
require.Equal(t, len(collections), 2)
for _, edc := range collections {
if edc.FullPath().Service() != path.ExchangeMetadataService {
assert.Equal(t, test.expected, edc.FullPath().Folder())
} else {
assert.Equal(t, "", edc.FullPath().Folder())
}
streamChannel := edc.Items()
for stream := range streamChannel {
@ -534,6 +540,7 @@ func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestEventsSerial
assert.NotNil(t, event)
assert.NoError(t, err, "experienced error parsing event bytes: "+buf.String())
}
}
status := connector.AwaitStatus()
suite.NotNil(status)

View File

@ -8,8 +8,20 @@ import (
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/path"
)
// MetadataFileNames produces the category-specific set of filenames used to
// store graph metadata such as delta tokens and folderID->path references.
func MetadataFileNames(cat path.CategoryType) []string {
switch cat {
case path.EmailCategory, path.ContactsCategory:
return []string{graph.DeltaTokenFileName, graph.PreviousPathFileName}
default:
return []string{graph.PreviousPathFileName}
}
}
// ParseMetadataCollections produces two maps:
// 1- paths: folderID->filePath, used to look up previous folder pathing
// in case of a name change or relocation.
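
A rough usage sketch for the two helpers above, assuming a hypothetical caller and that the deltas map is a `map[string]string` (as the unit test in the next file indexes it):

```go
// Sketch only: a hypothetical caller combining MetadataFileNames and
// ParseMetadataCollections; the function below is illustrative.
package sketch

import (
	"context"

	"github.com/alcionai/corso/src/internal/connector/exchange"
	"github.com/alcionai/corso/src/internal/data"
	"github.com/alcionai/corso/src/pkg/path"
)

// previousDeltas returns the folderID->delta-token map from the prior
// backup's metadata collections. The folderID->path map (the first return
// of ParseMetadataCollections) is dropped here, mirroring current callers.
func previousDeltas(
	ctx context.Context,
	cat path.CategoryType,
	colls []data.Collection,
) (map[string]string, error) {
	// Which metadata files exist varies by category: email and contacts
	// carry a delta file plus previouspath; other categories only the latter.
	fileNames := exchange.MetadataFileNames(cat)
	_ = fileNames // would be handed to whatever fetches colls from storage

	_, deltas, err := exchange.ParseMetadataCollections(ctx, colls)
	if err != nil {
		return nil, err
	}

	return deltas, nil
}
```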

View File

@ -28,12 +28,65 @@ func TestDataCollectionsUnitSuite(t *testing.T) {
}
func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
t := suite.T()
ctx, flush := tester.NewContext()
type fileValues struct {
fileName string
value string
}
table := []struct {
name string
data []fileValues
expectDeltas map[string]string
}{
{
name: "delta urls",
data: []fileValues{
{graph.DeltaTokenFileName, "delta-link"},
},
expectDeltas: map[string]string{
"key": "delta-link",
},
},
{
name: "delta urls with special chars",
data: []fileValues{
{graph.DeltaTokenFileName, "`!@#$%^&*()_[]{}/\"\\"},
},
expectDeltas: map[string]string{
"key": "`!@#$%^&*()_[]{}/\"\\",
},
},
{
name: "delta urls with escaped chars",
data: []fileValues{
{graph.DeltaTokenFileName, `\n\r\t\b\f\v\0\\`},
},
expectDeltas: map[string]string{
"key": "\\n\\r\\t\\b\\f\\v\\0\\\\",
},
},
{
name: "delta urls with newline char runes",
data: []fileValues{
// rune(92) = \, rune(110) = n. Ensuring it's not possible to
// error in serializing/deserializing and produce a single newline
// character from those two runes.
{graph.DeltaTokenFileName, string([]rune{rune(92), rune(110)})},
},
expectDeltas: map[string]string{
"key": "\\n",
},
},
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
ctx, flush := tester.NewContext()
defer flush()
bs, err := json.Marshal(map[string]string{"key": "token"})
colls := []data.Collection{}
for _, d := range test.data {
bs, err := json.Marshal(map[string]string{"key": d.value})
require.NoError(t, err)
p, err := path.Builder{}.ToServiceCategoryMetadataPath(
@ -44,12 +97,17 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
)
require.NoError(t, err)
item := []graph.MetadataItem{graph.NewMetadataItem(graph.DeltaTokenFileName, bs)}
mdcoll := graph.NewMetadataCollection(p, item, func(cos *support.ConnectorOperationStatus) {})
colls := []data.Collection{mdcoll}
item := []graph.MetadataItem{graph.NewMetadataItem(d.fileName, bs)}
coll := graph.NewMetadataCollection(p, item, func(cos *support.ConnectorOperationStatus) {})
colls = append(colls, coll)
}
_, deltas, err := ParseMetadataCollections(ctx, colls)
require.NoError(t, err)
assert.NotEmpty(t, deltas, "delta urls")
assert.Equal(t, "token", deltas["key"])
assert.NotEmpty(t, deltas, "deltas")
for k, v := range test.expectDeltas {
assert.Equal(t, v, deltas[k], "deltas elements")
}
})
}
}

View File

@ -1,7 +1,6 @@
package exchange
import (
"encoding/json"
"testing"
absser "github.com/microsoft/kiota-abstractions-go/serialization"
@ -15,97 +14,9 @@ import (
"github.com/alcionai/corso/src/internal/connector/mockconnector"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
)
type ExchangeIteratorUnitSuite struct {
suite.Suite
}
func TestExchangeIteratorUnitSuite(t *testing.T) {
suite.Run(t, new(ExchangeIteratorUnitSuite))
}
func (suite *ExchangeIteratorUnitSuite) TestMakeMetadataCollection() {
tenant := "a-tenant"
user := "a-user"
table := []struct {
name string
cat path.CategoryType
tokens map[string]string
collectionCheck assert.ValueAssertionFunc
errCheck assert.ErrorAssertionFunc
}{
{
name: "EmptyTokens",
cat: path.EmailCategory,
tokens: nil,
collectionCheck: assert.Nil,
errCheck: assert.NoError,
},
{
name: "Tokens",
cat: path.EmailCategory,
tokens: map[string]string{
"hello": "world",
"hola": "mundo",
},
collectionCheck: assert.NotNil,
errCheck: assert.NoError,
},
{
name: "BadCategory",
cat: path.FilesCategory,
tokens: map[string]string{
"hello": "world",
"hola": "mundo",
},
collectionCheck: assert.Nil,
errCheck: assert.Error,
},
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
col, err := makeMetadataCollection(
tenant,
user,
test.cat,
test.tokens,
func(*support.ConnectorOperationStatus) {},
)
test.errCheck(t, err)
if err != nil {
return
}
test.collectionCheck(t, col)
if col == nil {
return
}
itemCount := 0
for item := range col.Items() {
gotMap := map[string]string{}
decoder := json.NewDecoder(item.ToReader())
itemCount++
err := decoder.Decode(&gotMap)
if !assert.NoError(t, err) {
continue
}
assert.Equal(t, test.tokens, gotMap)
}
assert.Equal(t, 1, itemCount)
})
}
}
type ExchangeIteratorSuite struct {
suite.Suite
}

View File

@ -1,9 +1,7 @@
package exchange
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
@ -20,53 +18,6 @@ import (
"github.com/alcionai/corso/src/pkg/selectors"
)
const (
metadataKey = "metadata"
)
// makeMetadataCollection creates a metadata collection that has a file
// containing all the delta tokens in tokens. Returns nil if the map does not
// have any entries.
//
// TODO(ashmrtn): Expand this/break it out into multiple functions so that we
// can also store map[container ID]->full container path in a file in the
// metadata collection.
func makeMetadataCollection(
tenant string,
user string,
cat path.CategoryType,
tokens map[string]string,
statusUpdater support.StatusUpdater,
) (data.Collection, error) {
if len(tokens) == 0 {
return nil, nil
}
buf := &bytes.Buffer{}
encoder := json.NewEncoder(buf)
if err := encoder.Encode(tokens); err != nil {
return nil, errors.Wrap(err, "serializing delta tokens")
}
p, err := path.Builder{}.ToServiceCategoryMetadataPath(
tenant,
user,
path.ExchangeService,
cat,
false,
)
if err != nil {
return nil, errors.Wrap(err, "making path")
}
return graph.NewMetadataCollection(
p,
[]graph.MetadataItem{graph.NewMetadataItem(graph.DeltaTokenFileName, buf.Bytes())},
statusUpdater,
), nil
}
// FilterContainersAndFillCollections is a utility function
// that places the M365 object ids belonging to specific directories
// into a Collection. Messages outside of those directories are omitted.
@ -84,84 +35,82 @@ func FilterContainersAndFillCollections(
) error {
var (
errs error
collectionType = CategoryToOptionIdentifier(qp.Category)
oi = CategoryToOptionIdentifier(qp.Category)
// folder ID -> delta url for folder.
deltaURLs = map[string]string{}
prevPaths = map[string]string{}
)
for _, c := range resolver.Items() {
if ctrlOpts.FailFast && errs != nil {
return errs
}
dirPath, ok := pathAndMatch(qp, c, scope)
if !ok {
continue
}
cID := *c.GetId()
// Create only those that match
service, err := createService(qp.Credentials)
if err != nil {
errs = support.WrapAndAppend(
qp.ResourceOwner+" FilterContainerAndFillCollection",
err,
errs)
if ctrlOpts.FailFast {
return errs
}
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
continue
}
edc := NewCollection(
qp.ResourceOwner,
dirPath,
collectionType,
oi,
service,
statusUpdater,
ctrlOpts,
)
collections[*c.GetId()] = &edc
collections[cID] = &edc
fetchFunc, err := getFetchIDFunc(qp.Category)
if err != nil {
errs = support.WrapAndAppend(
qp.ResourceOwner,
err,
errs)
if ctrlOpts.FailFast {
return errs
}
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
continue
}
dirID := *c.GetId()
oldDelta := oldDeltas[dirID]
jobs, delta, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, dirID, oldDelta)
jobs, delta, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, cID, oldDeltas[cID])
if err != nil {
errs = support.WrapAndAppend(
qp.ResourceOwner,
err,
errs,
)
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
}
edc.jobs = append(edc.jobs, jobs...)
if len(delta) > 0 {
deltaURLs[dirID] = delta
}
deltaURLs[cID] = delta
}
col, err := makeMetadataCollection(
// add the current path for the container ID to be used in the next backup
// as the "previous path", for reference in case of a rename or relocation.
prevPaths[cID] = dirPath.Folder()
}
entries := []graph.MetadataCollectionEntry{
graph.NewMetadataEntry(graph.PreviousPathFileName, prevPaths),
}
if len(deltaURLs) > 0 {
entries = append(entries, graph.NewMetadataEntry(graph.DeltaTokenFileName, deltaURLs))
}
if col, err := graph.MakeMetadataCollection(
qp.Credentials.AzureTenantID,
qp.ResourceOwner,
path.ExchangeService,
qp.Category,
deltaURLs,
entries,
statusUpdater,
)
if err != nil {
); err != nil {
errs = support.WrapAndAppend("making metadata collection", err, errs)
} else if col != nil {
collections[metadataKey] = col
collections["metadata"] = col
}
return errs
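
The metadata assembly above, condensed into a standalone sketch with placeholder tenant, user, and folder IDs and a no-op status updater:

```go
// Sketch only: placeholder values throughout.
package sketch

import (
	"github.com/alcionai/corso/src/internal/connector/graph"
	"github.com/alcionai/corso/src/internal/connector/support"
	"github.com/alcionai/corso/src/internal/data"
	"github.com/alcionai/corso/src/pkg/path"
)

func exampleMetadataCollection() (data.Collection, error) {
	prevPaths := map[string]string{"folder-id-1": "Inbox"}
	deltaURLs := map[string]string{"folder-id-1": "delta-link"}

	// previouspath is always written; the delta file only when at least one
	// container produced a delta token.
	entries := []graph.MetadataCollectionEntry{
		graph.NewMetadataEntry(graph.PreviousPathFileName, prevPaths),
	}
	if len(deltaURLs) > 0 {
		entries = append(entries, graph.NewMetadataEntry(graph.DeltaTokenFileName, deltaURLs))
	}

	return graph.MakeMetadataCollection(
		"a-tenant",
		"a-user",
		path.ExchangeService,
		path.EmailCategory,
		entries,
		func(*support.ConnectorOperationStatus) {},
	)
}
```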

View File

@ -3,8 +3,11 @@ package graph
import (
"bytes"
"context"
"encoding/json"
"io"
"github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/path"
@ -24,6 +27,78 @@ type MetadataCollection struct {
statusUpdater support.StatusUpdater
}
// MetadataCollectionEntry describes a file that should get added to a metadata
// collection. The Data value will be encoded into json as part of a
// transformation into a MetadataItem.
type MetadataCollectionEntry struct {
fileName string
data any
}
func NewMetadataEntry(fileName string, mData any) MetadataCollectionEntry {
return MetadataCollectionEntry{fileName, mData}
}
func (mce MetadataCollectionEntry) toMetadataItem() (MetadataItem, error) {
if len(mce.fileName) == 0 {
return MetadataItem{}, errors.New("missing metadata filename")
}
if mce.data == nil {
return MetadataItem{}, errors.New("missing metadata")
}
buf := &bytes.Buffer{}
encoder := json.NewEncoder(buf)
if err := encoder.Encode(mce.data); err != nil {
return MetadataItem{}, errors.Wrap(err, "serializing metadata")
}
return NewMetadataItem(mce.fileName, buf.Bytes()), nil
}
// MakeMetadataCollection creates a metadata collection with one file per
// provided entry, each encoded as a single json object. Returns nil if no
// entries are provided.
func MakeMetadataCollection(
tenant, resourceOwner string,
service path.ServiceType,
cat path.CategoryType,
metadata []MetadataCollectionEntry,
statusUpdater support.StatusUpdater,
) (data.Collection, error) {
if len(metadata) == 0 {
return nil, nil
}
p, err := path.Builder{}.ToServiceCategoryMetadataPath(
tenant,
resourceOwner,
service,
cat,
false,
)
if err != nil {
return nil, errors.Wrap(err, "making metadata path")
}
items := make([]MetadataItem, 0, len(metadata))
for _, md := range metadata {
item, err := md.toMetadataItem()
if err != nil {
return nil, err
}
items = append(items, item)
}
coll := NewMetadataCollection(p, items, statusUpdater)
return coll, nil
}
func NewMetadataCollection(
p path.Path,
items []MetadataItem,

View File

@ -1,14 +1,15 @@
package graph_test
package graph
import (
"encoding/json"
"io"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/pkg/path"
)
@ -34,7 +35,7 @@ func (suite *MetadataCollectionUnitSuite) TestFullPath() {
)
require.NoError(t, err)
c := graph.NewMetadataCollection(p, nil, nil)
c := NewMetadataCollection(p, nil, nil)
assert.Equal(t, p.String(), c.FullPath().String())
}
@ -58,10 +59,10 @@ func (suite *MetadataCollectionUnitSuite) TestItems() {
"Requires same number of items and data",
)
items := []graph.MetadataItem{}
items := []MetadataItem{}
for i := 0; i < len(itemNames); i++ {
items = append(items, graph.NewMetadataItem(itemNames[i], itemData[i]))
items = append(items, NewMetadataItem(itemNames[i], itemData[i]))
}
p, err := path.Builder{}.
@ -74,7 +75,7 @@ func (suite *MetadataCollectionUnitSuite) TestItems() {
)
require.NoError(t, err)
c := graph.NewMetadataCollection(
c := NewMetadataCollection(
p,
items,
func(c *support.ConnectorOperationStatus) {
@ -100,3 +101,93 @@ func (suite *MetadataCollectionUnitSuite) TestItems() {
assert.ElementsMatch(t, itemNames, gotNames)
assert.ElementsMatch(t, itemData, gotData)
}
func (suite *MetadataCollectionUnitSuite) TestMakeMetadataCollection() {
tenant := "a-tenant"
user := "a-user"
table := []struct {
name string
service path.ServiceType
cat path.CategoryType
metadata MetadataCollectionEntry
collectionCheck assert.ValueAssertionFunc
errCheck assert.ErrorAssertionFunc
}{
{
name: "EmptyTokens",
service: path.ExchangeService,
cat: path.EmailCategory,
metadata: NewMetadataEntry("", nil),
collectionCheck: assert.Nil,
errCheck: assert.Error,
},
{
name: "Tokens",
service: path.ExchangeService,
cat: path.EmailCategory,
metadata: NewMetadataEntry(
uuid.NewString(),
map[string]string{
"hello": "world",
"hola": "mundo",
}),
collectionCheck: assert.NotNil,
errCheck: assert.NoError,
},
{
name: "BadCategory",
service: path.ExchangeService,
cat: path.FilesCategory,
metadata: NewMetadataEntry(
uuid.NewString(),
map[string]string{
"hello": "world",
"hola": "mundo",
}),
collectionCheck: assert.Nil,
errCheck: assert.Error,
},
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
col, err := MakeMetadataCollection(
tenant,
user,
test.service,
test.cat,
[]MetadataCollectionEntry{test.metadata},
func(*support.ConnectorOperationStatus) {},
)
test.errCheck(t, err)
if err != nil {
return
}
test.collectionCheck(t, col)
if col == nil {
return
}
itemCount := 0
for item := range col.Items() {
assert.Equal(t, test.metadata.fileName, item.UUID())
gotMap := map[string]string{}
decoder := json.NewDecoder(item.ToReader())
itemCount++
err := decoder.Decode(&gotMap)
if !assert.NoError(t, err) {
continue
}
assert.Equal(t, test.metadata.data, gotMap)
}
assert.Equal(t, 1, itemCount)
})
}
}

View File

@ -9,14 +9,19 @@ import (
"github.com/alcionai/corso/src/pkg/path"
)
const (
// DeltaTokenFileName is the name of the file containing delta token(s) for a
// given endpoint. The endpoint granularity varies by service.
const DeltaTokenFileName = "delta"
DeltaTokenFileName = "delta"
// PreviousPathFileName is the name of the file containing previous path(s) for a
// given endpoint.
PreviousPathFileName = "previouspath"
)
// MetadataFileNames produces the standard set of filenames used to store graph
// AllMetadataFileNames produces the standard set of filenames used to store graph
// metadata such as delta tokens and folderID->path references.
func MetadataFileNames() []string {
return []string{DeltaTokenFileName}
func AllMetadataFileNames() []string {
return []string{DeltaTokenFileName, PreviousPathFileName}
}
type QueryParams struct {

View File

@ -219,7 +219,7 @@ func produceManifestsAndMetadata(
return nil, nil, err
}
colls, err := collectMetadata(ctx, kw, graph.MetadataFileNames(), oc, tid, bup.SnapshotID)
colls, err := collectMetadata(ctx, kw, graph.AllMetadataFileNames(), oc, tid, bup.SnapshotID)
if err != nil && !errors.Is(err, kopia.ErrNotFound) {
// prior metadata isn't guaranteed to exist.
// if it doesn't, we'll just have to do a

View File

@ -10,7 +10,6 @@ import (
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector/exchange"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/events"
@ -188,8 +187,7 @@ func checkMetadataFilesExist(
backupID model.StableID,
kw *kopia.Wrapper,
ms *kopia.ModelStore,
tenant string,
user string,
tenant, user string,
service path.ServiceType,
category path.CategoryType,
files []string,
@ -328,20 +326,25 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() {
selectFunc func() *selectors.ExchangeBackup
resourceOwner string
category path.CategoryType
metadataFiles []string
}{
{
name: "Integration Exchange.Mail",
name: "Mail",
selectFunc: func() *selectors.ExchangeBackup {
sel := selectors.NewExchangeBackup()
sel.Include(sel.MailFolders([]string{m365UserID}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
sel.Include(sel.MailFolders(
[]string{m365UserID},
[]string{exchange.DefaultMailFolder},
selectors.PrefixMatch()))
return sel
},
resourceOwner: m365UserID,
category: path.EmailCategory,
metadataFiles: exchange.MetadataFileNames(path.EmailCategory),
},
{
name: "Integration Exchange.Contacts",
name: "Contacts",
selectFunc: func() *selectors.ExchangeBackup {
sel := selectors.NewExchangeBackup()
sel.Include(sel.ContactFolders(
@ -353,16 +356,22 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() {
},
resourceOwner: m365UserID,
category: path.ContactsCategory,
metadataFiles: exchange.MetadataFileNames(path.ContactsCategory),
},
{
name: "Integration Exchange.Events",
name: "Calendar Events",
selectFunc: func() *selectors.ExchangeBackup {
sel := selectors.NewExchangeBackup()
sel.Include(sel.EventCalendars([]string{m365UserID}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch()))
sel.Include(sel.EventCalendars(
[]string{m365UserID},
[]string{exchange.DefaultCalendar},
selectors.PrefixMatch()))
return sel
},
resourceOwner: m365UserID,
category: path.EventsCategory,
metadataFiles: exchange.MetadataFileNames(path.EventsCategory),
},
}
for _, test := range tests {
@ -432,19 +441,9 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() {
return
}
// Check that metadata files with delta tokens were created. Currently
// these files will only be made for contacts and email in Exchange if any
// items were backed up. Events does not support delta queries.
m365, err := acct.M365Config()
require.NoError(t, err)
for _, scope := range sel.Scopes() {
cat := scope.Category().PathType()
if cat != path.EmailCategory && cat != path.ContactsCategory {
return
}
checkMetadataFilesExist(
t,
ctx,
@ -454,10 +453,9 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() {
m365.AzureTenantID,
m365UserID,
path.ExchangeService,
cat,
[]string{graph.DeltaTokenFileName},
test.category,
test.metadataFiles,
)
}
})
}
}

View File

@ -198,9 +198,9 @@ func (suite *RestoreOpIntegrationSuite) SetupSuite() {
require.NotEmpty(t, bo.Results.BackupID)
suite.backupID = bo.Results.BackupID
// Remove delta metadata files for contacts and email as they are not part of
// the data restored.
suite.numItems = bo.Results.ItemsWritten - 2
// Discount metadata files (3 paths, 2 deltas) as
// they are not part of the data restored.
suite.numItems = bo.Results.ItemsWritten - 5
}
func (suite *RestoreOpIntegrationSuite) TearDownSuite() {