require complete previous data for increment (#1869)
## Description

If an exchange collection has only previous delta info, or only previous path info, and not both, remove that entry from the parsed metadata map, causing a full backup of the container.

## Does this PR need a docs update or release note?

- [x] ⛔ No

## Type of change

- [x] 🌻 Feature

## Issue(s)

* #1804

## Test Plan

- [x] ⚡ Unit test
This commit is contained in:
parent fbbf6aa84c
commit 2b0ccdc8de
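The rule the diff below implements is small enough to sketch on its own: an entry that has a delta token but no path, or a path but no token, gets dropped, and the dropped container falls back to a full backup. Here is a minimal, self-contained sketch of that pruning pass, with the `DeltaPath` type mirrored from the diff (the real code lives in the exchange connector package):

```go
package main

import "fmt"

// DeltaPath mirrors the struct introduced below: a container's previous
// delta token and previous path. Both are required for an incremental.
type DeltaPath struct {
	delta string
	path  string
}

func main() {
	dps := map[string]DeltaPath{
		"complete":   {delta: "delta-link", path: "prev-path"},
		"delta-only": {delta: "delta-link"},
		"path-only":  {path: "prev-path"},
	}

	// Entries missing either half are incomplete metadata; deleting
	// them is what forces the full backup on the next run. Deleting
	// map entries while ranging over the same map is safe in Go.
	for k, dp := range dps {
		if len(dp.delta) == 0 || len(dp.path) == 0 {
			delete(dps, k)
		}
	}

	fmt.Println(dps) // map[complete:{delta-link prev-path}]
}
```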
```diff
@@ -24,16 +24,31 @@ func MetadataFileNames(cat path.CategoryType) []string {
 
 type CatDeltaPaths map[path.CategoryType]DeltaPaths
 
-type DeltaPaths struct {
-	deltas map[string]string
-	paths  map[string]string
-}
-
-func makeDeltaPaths() DeltaPaths {
-	return DeltaPaths{
-		deltas: map[string]string{},
-		paths:  map[string]string{},
-	}
-}
+type DeltaPaths map[string]DeltaPath
+
+func (dps DeltaPaths) AddDelta(k, d string) {
+	dp, ok := dps[k]
+	if !ok {
+		dp = DeltaPath{}
+	}
+
+	dp.delta = d
+	dps[k] = dp
+}
+
+func (dps DeltaPaths) AddPath(k, p string) {
+	dp, ok := dps[k]
+	if !ok {
+		dp = DeltaPath{}
+	}
+
+	dp.path = p
+	dps[k] = dp
+}
+
+type DeltaPath struct {
+	delta string
+	path  string
+}
 
 // ParseMetadataCollections produces a map of structs holding delta
```
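`AddDelta` and `AddPath` are order-independent upserts: whichever metadata file is parsed first creates the entry, and the other call fills in the remaining field, so an entry can temporarily hold only one of the two values. A usage sketch with the types copied from the hunk above (it folds the diff's explicit `ok` check into the zero-value map read, which is equivalent for a struct value):

```go
package main

import "fmt"

type DeltaPath struct {
	delta string
	path  string
}

type DeltaPaths map[string]DeltaPath

// AddDelta upserts the delta token for k, keeping any recorded path.
func (dps DeltaPaths) AddDelta(k, d string) {
	dp := dps[k] // zero value if absent
	dp.delta = d
	dps[k] = dp
}

// AddPath upserts the previous path for k, keeping any recorded delta.
func (dps DeltaPaths) AddPath(k, p string) {
	dp := dps[k]
	dp.path = p
	dps[k] = dp
}

func main() {
	dps := DeltaPaths{}
	dps.AddDelta("folder-1", "delta-link")
	dps.AddPath("folder-1", "prev-path")
	dps.AddDelta("folder-2", "delta-link") // no path file: stays incomplete

	fmt.Printf("%+v\n", dps["folder-1"]) // {delta:delta-link path:prev-path}
	fmt.Printf("%+v\n", dps["folder-2"]) // {delta:delta-link path:}
}
```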
```diff
@@ -42,10 +57,19 @@ func ParseMetadataCollections(
 	ctx context.Context,
 	colls []data.Collection,
 ) (CatDeltaPaths, error) {
+	// cdp stores metadata
 	cdp := CatDeltaPaths{
-		path.ContactsCategory: makeDeltaPaths(),
-		path.EmailCategory:    makeDeltaPaths(),
-		path.EventsCategory:   makeDeltaPaths(),
+		path.ContactsCategory: {},
+		path.EmailCategory:    {},
+		path.EventsCategory:   {},
+	}
+
+	// found tracks the metadata we've loaded, to make sure we don't
+	// fetch overlapping copies.
+	found := map[path.CategoryType]map[string]struct{}{
+		path.ContactsCategory: {},
+		path.EmailCategory:    {},
+		path.EventsCategory:   {},
 	}
 
 	for _, coll := range colls {
```
```diff
@@ -66,8 +90,10 @@ func ParseMetadataCollections(
 				break
 			}
 
-			m := map[string]string{}
-			cdps := cdp[category]
+			var (
+				m    = map[string]string{}
+				cdps = cdp[category]
+			)
 
 			err := json.NewDecoder(item.ToReader()).Decode(&m)
 			if err != nil {
```
```diff
@@ -76,18 +102,26 @@ func ParseMetadataCollections(
 
 			switch item.UUID() {
 			case graph.PreviousPathFileName:
-				if len(cdps.paths) > 0 {
+				if _, ok := found[category]["path"]; ok {
 					return nil, errors.Errorf("multiple versions of %s path metadata", category)
 				}
 
-				cdps.paths = m
+				for k, p := range m {
+					cdps.AddPath(k, p)
+				}
+
+				found[category]["path"] = struct{}{}
 
 			case graph.DeltaURLsFileName:
-				if len(cdps.deltas) > 0 {
+				if _, ok := found[category]["delta"]; ok {
 					return nil, errors.Errorf("multiple versions of %s delta metadata", category)
 				}
 
-				cdps.deltas = m
+				for k, d := range m {
+					cdps.AddDelta(k, d)
+				}
+
+				found[category]["delta"] = struct{}{}
 			}
 
 			cdp[category] = cdps
```
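With deltas and paths merged into a single map, the old `len(cdps.paths) > 0` duplicate check no longer works: a non-empty map might have been populated by the other file. The `found` map stands in as an explicit seen-set. A tiny illustration of that pattern:

```go
package main

import "fmt"

func main() {
	// found records which metadata files were already consumed,
	// independent of what the merged entry map happens to contain.
	found := map[string]struct{}{}

	for _, file := range []string{"previouspath", "delta", "previouspath"} {
		if _, ok := found[file]; ok {
			fmt.Println("error: multiple versions of", file, "metadata")
			continue
		}

		found[file] = struct{}{}
		fmt.Println("loaded", file)
	}
}
```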
```diff
@@ -99,5 +133,16 @@ func ParseMetadataCollections(
 		}
 	}
 
+	// Remove any entries that contain a path or a delta, but not both.
+	// That metadata is considered incomplete, and needs to incur a
+	// complete backup on the next run.
+	for _, dps := range cdp {
+		for k, dp := range dps {
+			if len(dp.delta) == 0 || len(dp.path) == 0 {
+				delete(dps, k)
+			}
+		}
+	}
+
 	return cdp, nil
 }
```
```diff
@@ -1,7 +1,6 @@
 package exchange
 
 import (
-	"encoding/json"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
```
```diff
@@ -36,18 +35,15 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
 	table := []struct {
 		name        string
 		data        []fileValues
-		expectDeltas map[string]string
-		expectPaths  map[string]string
+		expect      map[string]DeltaPath
 		expectError assert.ErrorAssertionFunc
 	}{
 		{
-			name: "delta urls",
+			name: "delta urls only",
 			data: []fileValues{
 				{graph.DeltaURLsFileName, "delta-link"},
 			},
-			expectDeltas: map[string]string{
-				"key": "delta-link",
-			},
+			expect:      map[string]DeltaPath{},
 			expectError: assert.NoError,
 		},
 		{
```
```diff
@@ -59,13 +55,11 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
 			expectError: assert.Error,
 		},
 		{
-			name: "previous path",
+			name: "previous path only",
 			data: []fileValues{
 				{graph.PreviousPathFileName, "prev-path"},
 			},
-			expectPaths: map[string]string{
-				"key": "prev-path",
-			},
+			expect:      map[string]DeltaPath{},
 			expectError: assert.NoError,
 		},
 		{
```
```diff
@@ -82,21 +76,43 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
 				{graph.DeltaURLsFileName, "delta-link"},
 				{graph.PreviousPathFileName, "prev-path"},
 			},
-			expectDeltas: map[string]string{
-				"key": "delta-link",
-			},
-			expectPaths: map[string]string{
-				"key": "prev-path",
+			expect: map[string]DeltaPath{
+				"key": {
+					delta: "delta-link",
+					path:  "prev-path",
+				},
 			},
 			expectError: assert.NoError,
 		},
 		{
+			name: "delta urls and empty previous paths",
+			data: []fileValues{
+				{graph.DeltaURLsFileName, "delta-link"},
+				{graph.PreviousPathFileName, ""},
+			},
+			expect:      map[string]DeltaPath{},
+			expectError: assert.NoError,
+		},
+		{
+			name: "empty delta urls and previous paths",
+			data: []fileValues{
+				{graph.DeltaURLsFileName, ""},
+				{graph.PreviousPathFileName, "prev-path"},
+			},
+			expect:      map[string]DeltaPath{},
+			expectError: assert.NoError,
+		},
+		{
 			name: "delta urls with special chars",
 			data: []fileValues{
 				{graph.DeltaURLsFileName, "`!@#$%^&*()_[]{}/\"\\"},
+				{graph.PreviousPathFileName, "prev-path"},
 			},
-			expectDeltas: map[string]string{
-				"key": "`!@#$%^&*()_[]{}/\"\\",
+			expect: map[string]DeltaPath{
+				"key": {
+					delta: "`!@#$%^&*()_[]{}/\"\\",
+					path:  "prev-path",
+				},
 			},
 			expectError: assert.NoError,
 		},
```
```diff
@@ -104,9 +120,13 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
 			name: "delta urls with escaped chars",
 			data: []fileValues{
 				{graph.DeltaURLsFileName, `\n\r\t\b\f\v\0\\`},
+				{graph.PreviousPathFileName, "prev-path"},
 			},
-			expectDeltas: map[string]string{
-				"key": "\\n\\r\\t\\b\\f\\v\\0\\\\",
-			},
+			expect: map[string]DeltaPath{
+				"key": {
+					delta: "\\n\\r\\t\\b\\f\\v\\0\\\\",
+					path:  "prev-path",
+				},
+			},
 			expectError: assert.NoError,
 		},
```
```diff
@@ -117,9 +137,13 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
 				// error in serializing/deserializing and produce a single newline
 				// character from those two runes.
 				{graph.DeltaURLsFileName, string([]rune{rune(92), rune(110)})},
+				{graph.PreviousPathFileName, "prev-path"},
 			},
-			expectDeltas: map[string]string{
-				"key": "\\n",
-			},
+			expect: map[string]DeltaPath{
+				"key": {
+					delta: "\\n",
+					path:  "prev-path",
+				},
+			},
 			expectError: assert.NoError,
 		},
```
```diff
@@ -129,45 +153,33 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
 			ctx, flush := tester.NewContext()
 			defer flush()
 
-			colls := []data.Collection{}
+			entries := []graph.MetadataCollectionEntry{}
 
 			for _, d := range test.data {
-				bs, err := json.Marshal(map[string]string{"key": d.value})
-				require.NoError(t, err)
+				entries = append(
+					entries,
+					graph.NewMetadataEntry(d.fileName, map[string]string{"key": d.value}))
+			}
 
-				p, err := path.Builder{}.ToServiceCategoryMetadataPath(
+			coll, err := graph.MakeMetadataCollection(
 				"t", "u",
 				path.ExchangeService,
 				path.EmailCategory,
-				false,
+				entries,
+				func(cos *support.ConnectorOperationStatus) {},
 			)
 			require.NoError(t, err)
 
-				item := []graph.MetadataItem{graph.NewMetadataItem(d.fileName, bs)}
-				coll := graph.NewMetadataCollection(p, item, func(cos *support.ConnectorOperationStatus) {})
-				colls = append(colls, coll)
-			}
-
-			cdps, err := ParseMetadataCollections(ctx, colls)
+			cdps, err := ParseMetadataCollections(ctx, []data.Collection{coll})
 			test.expectError(t, err)
 
 			emails := cdps[path.EmailCategory]
-			deltas, paths := emails.deltas, emails.paths
 
-			if len(test.expectDeltas) > 0 {
-				assert.Len(t, deltas, len(test.expectDeltas), "deltas len")
-			}
-
-			if len(test.expectPaths) > 0 {
-				assert.Len(t, paths, len(test.expectPaths), "paths len")
-			}
+			assert.Len(t, emails, len(test.expect))
 
-			for k, v := range test.expectDeltas {
-				assert.Equal(t, v, deltas[k], "deltas elements")
-			}
-
-			for k, v := range test.expectPaths {
-				assert.Equal(t, v, paths[k], "paths elements")
+			for k, v := range test.expect {
+				assert.Equal(t, v.delta, emails[k].delta, "delta")
+				assert.Equal(t, v.path, emails[k].path, "path")
 			}
 		})
 	}
```
```diff
@@ -62,16 +62,30 @@ func FilterContainersAndFillCollections(
 		currPaths = map[string]string{}
 		// copy of previousPaths. any folder found in the resolver get
 		// deleted from this map, leaving only the deleted maps behind
-		deletedPaths = common.CopyMap(dps.paths)
+		deletedPaths = common.CopyMap(dps)
 	)
 
+	getJobs, err := getFetchIDFunc(qp.Category)
+	if err != nil {
+		return support.WrapAndAppend(qp.ResourceOwner, err, errs)
+	}
+
 	for _, c := range resolver.Items() {
 		if ctrlOpts.FailFast && errs != nil {
 			return errs
 		}
 
+		// cannot be moved out of the loop,
+		// else we run into state issues.
+		service, err := createService(qp.Credentials)
+		if err != nil {
+			errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
+			continue
+		}
+
 		cID := *c.GetId()
 
+		// this folder exists (probably), do not delete it.
+		delete(deletedPaths, cID)
+
 		// Only create a collection if the path matches the scope.
```
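The deleted-folder detection above is a copy-and-subtract pattern: start from a copy of every folder known to the previous backup, delete each ID the resolver still reports, and whatever remains must have been removed since the last run. A small standalone sketch of the pattern (`copyMap` is a stand-in for `common.CopyMap`, assumed to be a shallow map copy):

```go
package main

import "fmt"

// copyMap stands in for common.CopyMap: a shallow copy, so deletes
// on the copy do not disturb the original metadata.
func copyMap[K comparable, V any](m map[K]V) map[K]V {
	out := make(map[K]V, len(m))
	for k, v := range m {
		out[k] = v
	}
	return out
}

func main() {
	previous := map[string]string{
		"folder-1": "prev-path-1",
		"folder-2": "prev-path-2",
	}
	stillExists := []string{"folder-1"}

	// Subtract every folder the resolver still sees; the leftovers
	// are the folders deleted since the previous backup.
	deleted := copyMap(previous)
	for _, id := range stillExists {
		delete(deleted, id)
	}

	fmt.Println(deleted) // map[folder-2:prev-path-2]
}
```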
```diff
@@ -80,43 +94,32 @@ func FilterContainersAndFillCollections(
 			continue
 		}
 
-		var prevPath path.Path
+		var (
+			dp          = dps[cID]
+			prevDelta   = dp.delta
+			prevPathStr = dp.path
+			prevPath    path.Path
+		)
 
-		if p, ok := dps.paths[cID]; ok {
-			var err error
-			if prevPath, err = pathFromPrevString(p); err != nil {
+		if len(prevPathStr) > 0 {
+			if prevPath, err = pathFromPrevString(prevPathStr); err != nil {
 				logger.Ctx(ctx).Error(err)
 			}
 		}
 
-		service, err := createService(qp.Credentials)
-		if err != nil {
-			errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
-			continue
-		}
-
-		fetchFunc, err := getFetchIDFunc(qp.Category)
-		if err != nil {
-			errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
-			continue
-		}
-
-		var deletedInFlight bool
-
-		jobs, delta, err := fetchFunc(ctx, service, qp.ResourceOwner, cID, dps.deltas[cID])
-		if err != nil && !errors.Is(err, errContainerDeleted) {
-			deletedInFlight = true
-			errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
-		}
-
-		if len(delta) > 0 {
-			deltaURLs[cID] = delta
-		}
-
-		// Delay creating the new container so we can handle setting the current
-		// path correctly if the folder was deleted.
-		if deletedInFlight {
-			currPath = nil
-		}
+		jobs, currDelta, err := getJobs(ctx, service, qp.ResourceOwner, cID, prevDelta)
+		if err != nil {
+			// race conditions happen, the container might get
+			// deleted while this process in flight.
+			if errors.Is(err, errContainerDeleted) {
+				currPath = nil
+			} else {
+				errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
+			}
+		}
+
+		if len(currDelta) > 0 {
+			deltaURLs[cID] = currDelta
+		}
 
 		edc := NewCollection(
```
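With incomplete entries pruned at parse time, `dps[cID]` for an affected container returns the zero `DeltaPath`, so `prevDelta` is empty. The fetch funcs returned by `getFetchIDFunc` are not shown in this diff; the sketch below assumes the usual delta-token convention, where an empty token means a full enumeration rather than a changes-since query:

```go
package main

import "fmt"

// fetchIDs stands in for the job-fetching funcs returned by
// getFetchIDFunc (not shown in this diff). Assumption: an empty
// token triggers the full enumeration that pruning is meant to force.
func fetchIDs(containerID, prevDelta string) (jobs []string, newDelta string) {
	if prevDelta == "" {
		fmt.Printf("%s: full enumeration\n", containerID)
	} else {
		fmt.Printf("%s: incremental since %s\n", containerID, prevDelta)
	}
	return nil, "next-delta-token"
}

func main() {
	dps := map[string]struct{ delta, path string }{
		"folder-1": {delta: "delta-link", path: "prev-path"},
	}

	// A pruned (or never-seen) container yields the zero value,
	// so prevDelta is empty and the fetch falls back to a full pass.
	for _, cID := range []string{"folder-1", "folder-2"} {
		dp := dps[cID]
		fetchIDs(cID, dp.delta)
	}
}
```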
```diff
@@ -130,7 +133,7 @@ func FilterContainersAndFillCollections(
 		)
 		collections[cID] = &edc
 
-		if deletedInFlight {
+		if edc.State() == data.DeletedState {
 			continue
 		}
 
```
```diff
@@ -145,14 +148,14 @@ func FilterContainersAndFillCollections(
 	// relocations and renames will have removed the dir by id earlier. What's
 	// left in deletedPaths are only the previous paths that did not appear as
 	// children of the root.
-	for fID, ps := range deletedPaths {
+	for fID, dp := range deletedPaths {
 		service, err := createService(qp.Credentials)
 		if err != nil {
 			errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
 			continue
 		}
 
-		prevPath, err := pathFromPrevString(ps)
+		prevPath, err := pathFromPrevString(dp.path)
 		if err != nil {
 			logger.Ctx(ctx).Error(err)
 			continue
```