allow skipping error cases in exchange backup

Utilize the previously introduced canSkipItemFailurer interface to create
skip records for items that fail during backup.
This commit is contained in:
ryanfkeepers 2024-02-13 16:39:03 -07:00
parent dd71a5528a
commit 368a246596
6 changed files with 367 additions and 17 deletions

View File

@ -296,6 +296,7 @@ func populateCollections(
cl), cl),
qp.ProtectedResource.ID(), qp.ProtectedResource.ID(),
bh.itemHandler(), bh.itemHandler(),
bh,
addAndRem.Added, addAndRem.Added,
addAndRem.Removed, addAndRem.Removed,
// TODO: produce a feature flag that allows selective // TODO: produce a feature flag that allows selective

View File

@ -19,6 +19,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/errs/core" "github.com/alcionai/corso/src/pkg/errs/core"
"github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/fault"
@ -68,21 +69,21 @@ func getItemAndInfo(
ctx context.Context, ctx context.Context,
getter itemGetterSerializer, getter itemGetterSerializer,
userID string, userID string,
id string, itemID string,
useImmutableIDs bool, useImmutableIDs bool,
parentPath string, parentPath string,
) ([]byte, *details.ExchangeInfo, error) { ) ([]byte, *details.ExchangeInfo, error) {
item, info, err := getter.GetItem( item, info, err := getter.GetItem(
ctx, ctx,
userID, userID,
id, itemID,
fault.New(true)) // temporary way to force a failFast error fault.New(true)) // temporary way to force a failFast error
if err != nil { if err != nil {
return nil, nil, clues.WrapWC(ctx, err, "fetching item"). return nil, nil, clues.WrapWC(ctx, err, "fetching item").
Label(fault.LabelForceNoBackupCreation) Label(fault.LabelForceNoBackupCreation)
} }
itemData, err := getter.Serialize(ctx, item, userID, id) itemData, err := getter.Serialize(ctx, item, userID, itemID)
if err != nil { if err != nil {
return nil, nil, clues.WrapWC(ctx, err, "serializing item") return nil, nil, clues.WrapWC(ctx, err, "serializing item")
} }
@ -108,6 +109,7 @@ func NewCollection(
bc data.BaseCollection, bc data.BaseCollection,
user string, user string,
items itemGetterSerializer, items itemGetterSerializer,
canSkipFailChecker canSkipItemFailurer,
origAdded map[string]time.Time, origAdded map[string]time.Time,
origRemoved []string, origRemoved []string,
validModTimes bool, validModTimes bool,
@ -140,6 +142,7 @@ func NewCollection(
added: added, added: added,
removed: removed, removed: removed,
getter: items, getter: items,
skipChecker: canSkipFailChecker,
statusUpdater: statusUpdater, statusUpdater: statusUpdater,
} }
} }
@ -150,6 +153,7 @@ func NewCollection(
added: added, added: added,
removed: removed, removed: removed,
getter: items, getter: items,
skipChecker: canSkipFailChecker,
statusUpdater: statusUpdater, statusUpdater: statusUpdater,
counter: counter, counter: counter,
} }
@ -167,7 +171,8 @@ type prefetchCollection struct {
// removed is a list of item IDs that were deleted from, or moved out, of a container // removed is a list of item IDs that were deleted from, or moved out, of a container
removed map[string]struct{} removed map[string]struct{}
getter itemGetterSerializer getter itemGetterSerializer
skipChecker canSkipItemFailurer
statusUpdater support.StatusUpdater statusUpdater support.StatusUpdater
} }
@ -194,11 +199,12 @@ func (col *prefetchCollection) streamItems(
wg sync.WaitGroup wg sync.WaitGroup
progressMessage chan<- struct{} progressMessage chan<- struct{}
user = col.user user = col.user
dataCategory = col.Category().String()
) )
ctx = clues.Add( ctx = clues.Add(
ctx, ctx,
"category", col.Category().String()) "category", dataCategory)
defer func() { defer func() {
close(stream) close(stream)
@ -227,7 +233,7 @@ func (col *prefetchCollection) streamItems(
defer close(semaphoreCh) defer close(semaphoreCh)
// delete all removed items // delete all removed items
for id := range col.removed { for itemID := range col.removed {
semaphoreCh <- struct{}{} semaphoreCh <- struct{}{}
wg.Add(1) wg.Add(1)
@ -247,7 +253,7 @@ func (col *prefetchCollection) streamItems(
if progressMessage != nil { if progressMessage != nil {
progressMessage <- struct{}{} progressMessage <- struct{}{}
} }
}(id) }(itemID)
} }
var ( var (
@ -256,7 +262,7 @@ func (col *prefetchCollection) streamItems(
) )
// add any new items // add any new items
for id := range col.added { for itemID := range col.added {
if el.Failure() != nil { if el.Failure() != nil {
break break
} }
@ -277,8 +283,24 @@ func (col *prefetchCollection) streamItems(
col.Opts().ToggleFeatures.ExchangeImmutableIDs, col.Opts().ToggleFeatures.ExchangeImmutableIDs,
parentPath) parentPath)
if err != nil { if err != nil {
// pulled outside the switch due to multiple return values.
cause, canSkip := col.skipChecker.CanSkipItemFailure(
err,
user,
id,
col.BaseCollection.Opts())
// Handle known error cases // Handle known error cases
switch { switch {
case canSkip:
// this is a special case handler that allows the item to be skipped
// instead of producing an error.
errs.AddSkip(ctx, fault.FileSkip(
cause,
dataCategory,
id,
id,
nil))
case errors.Is(err, core.ErrNotFound): case errors.Is(err, core.ErrNotFound):
// Don't report errors for deleted items as there's no way for us to // Don't report errors for deleted items as there's no way for us to
// back up data that is gone. Record it as a "success", since there's // back up data that is gone. Record it as a "success", since there's
@ -349,7 +371,7 @@ func (col *prefetchCollection) streamItems(
if progressMessage != nil { if progressMessage != nil {
progressMessage <- struct{}{} progressMessage <- struct{}{}
} }
}(id) }(itemID)
} }
wg.Wait() wg.Wait()
@ -377,7 +399,8 @@ type lazyFetchCollection struct {
// removed is a list of item IDs that were deleted from, or moved out, of a container // removed is a list of item IDs that were deleted from, or moved out, of a container
removed map[string]struct{} removed map[string]struct{}
getter itemGetterSerializer getter itemGetterSerializer
skipChecker canSkipItemFailurer
statusUpdater support.StatusUpdater statusUpdater support.StatusUpdater
@ -404,8 +427,7 @@ func (col *lazyFetchCollection) streamItems(
var ( var (
success int64 success int64
progressMessage chan<- struct{} progressMessage chan<- struct{}
user = col.user
user = col.user
) )
defer func() { defer func() {
@ -459,10 +481,13 @@ func (col *lazyFetchCollection) streamItems(
&lazyItemGetter{ &lazyItemGetter{
userID: user, userID: user,
itemID: id, itemID: id,
category: col.FullPath().Category(),
getter: col.getter, getter: col.getter,
modTime: modTime, modTime: modTime,
immutableIDs: col.Opts().ToggleFeatures.ExchangeImmutableIDs, immutableIDs: col.Opts().ToggleFeatures.ExchangeImmutableIDs,
parentPath: parentPath, parentPath: parentPath,
skipChecker: col.skipChecker,
opts: col.BaseCollection.Opts(),
}, },
id, id,
modTime, modTime,
@ -481,9 +506,12 @@ type lazyItemGetter struct {
getter itemGetterSerializer getter itemGetterSerializer
userID string userID string
itemID string itemID string
category path.CategoryType
parentPath string parentPath string
modTime time.Time modTime time.Time
immutableIDs bool immutableIDs bool
skipChecker canSkipItemFailurer
opts control.Options
} }
func (lig *lazyItemGetter) GetData( func (lig *lazyItemGetter) GetData(
@ -498,6 +526,24 @@ func (lig *lazyItemGetter) GetData(
lig.immutableIDs, lig.immutableIDs,
lig.parentPath) lig.parentPath)
if err != nil { if err != nil {
cause, canSkip := lig.skipChecker.CanSkipItemFailure(
err,
lig.userID,
lig.itemID,
lig.opts)
if canSkip {
errs.AddSkip(ctx, fault.FileSkip(
cause,
lig.category.String(),
lig.itemID,
lig.itemID,
nil))
return nil, nil, false, clues.
NewWC(ctx, "error marked as skippable by handler").
Label(graph.LabelsSkippable)
}
// If an item was deleted then return an empty file so we don't fail // If an item was deleted then return an empty file so we don't fail
// the backup and return a sentinel error when asked for ItemInfo so // the backup and return a sentinel error when asked for ItemInfo so
// we don't display the item in the backup. // we don't display the item in the backup.

View File

@ -153,6 +153,7 @@ func (suite *CollectionUnitSuite) TestNewCollection_state() {
count.New()), count.New()),
"u", "u",
mock.DefaultItemGetSerialize(), mock.DefaultItemGetSerialize(),
mock.NeverCanSkipFailChecker(),
nil, nil,
nil, nil,
colType.validModTimes, colType.validModTimes,
@ -298,6 +299,7 @@ func (suite *CollectionUnitSuite) TestPrefetchCollection_Items() {
count.New()), count.New()),
"", "",
&mock.ItemGetSerialize{}, &mock.ItemGetSerialize{},
mock.NeverCanSkipFailChecker(),
test.added, test.added,
maps.Keys(test.removed), maps.Keys(test.removed),
false, false,
@ -333,6 +335,126 @@ func (suite *CollectionUnitSuite) TestPrefetchCollection_Items() {
} }
} }
// TestPrefetchCollection_Items_skipFailure checks how a collection built with
// an always-skip failure checker behaves when its getter is configured with a
// serialization error: per the table expectations, added items are counted as
// skips instead of streamed items, removed items are still streamed as
// deletions, and no failure is recorded on the fault bus.
func (suite *CollectionUnitSuite) TestPrefetchCollection_Items_skipFailure() {
var (
t = suite.T()
// start is backdated by one second; deleted items are later asserted to
// carry a mod time after it.
start = time.Now().Add(-time.Second)
// no-op status updater; status reporting is not under test here.
statusUpdater = func(*support.ControllerOperationStatus) {}
)
fullPath, err := path.Build("t", "pr", path.ExchangeService, path.EmailCategory, false, "fnords", "smarf")
require.NoError(t, err, clues.ToCore(err))
locPath, err := path.Build("t", "pr", path.ExchangeService, path.EmailCategory, false, "fnords", "smarf")
require.NoError(t, err, clues.ToCore(err))
// Each case seeds the collection with added and/or removed item IDs and
// states how many items should be streamed and how many skips recorded.
table := []struct {
name string
added map[string]time.Time
removed map[string]struct{}
expectItemCount int
expectSkippedCount int
}{
{
name: "no items",
},
{
name: "only added items",
added: map[string]time.Time{
"fisher": {},
"flannigan": {},
"fitzbog": {},
},
// every added item is expected to be skipped, so none are streamed.
expectItemCount: 0,
expectSkippedCount: 3,
},
{
name: "only removed items",
removed: map[string]struct{}{
"princess": {},
"poppy": {},
"petunia": {},
},
expectItemCount: 3,
expectSkippedCount: 0,
},
{
name: "added and removed items",
added: map[string]time.Time{
"general": {},
},
removed: map[string]struct{}{
"general": {},
"goose": {},
"grumbles": {},
},
expectItemCount: 3,
// not 1, because general is removed from the added
// map due to being in the removed map
expectSkippedCount: 0,
},
}
for _, test := range table {
suite.Run(test.name, func() {
var (
t = suite.T()
// NOTE(review): the `true` argument looks like the fail-fast flag — confirm
// against fault.New.
errs = fault.New(true)
itemCount int
)
ctx, flush := tester.NewContext(t)
defer flush()
col := NewCollection(
data.NewBaseCollection(
fullPath,
nil,
locPath.ToBuilder(),
control.DefaultOptions(),
false,
count.New()),
"",
// presumably makes every Serialize call on the mock return this error;
// verify against the mock implementation.
&mock.ItemGetSerialize{
SerializeErr: assert.AnError,
},
// checker that always reports the failure as skippable.
mock.AlwaysCanSkipFailChecker(),
test.added,
maps.Keys(test.removed),
// validModTimes; presumably selects the prefetch collection — TODO confirm
// in NewCollection.
false,
statusUpdater,
count.New())
// Drain the collection, validating each streamed item against the
// added/removed sets of the test case.
for item := range col.Items(ctx, errs) {
itemCount++
_, rok := test.removed[item.ID()]
if rok {
assert.True(t, item.Deleted(), "removals should be marked as deleted")
dimt, ok := item.(data.ItemModTime)
require.True(t, ok, "item implements data.ItemModTime")
assert.True(t, dimt.ModTime().After(start), "deleted items should set mod time to now()")
}
_, aok := test.added[item.ID()]
if !rok && aok {
assert.False(t, item.Deleted(), "additions should not be marked as deleted")
}
assert.True(t, aok || rok, "item must be either added or removed: %q", item.ID())
}
// Skipped items must not surface as a failure on the fault bus.
assert.NoError(t, errs.Failure())
assert.Equal(
t,
test.expectItemCount,
itemCount,
"should see all expected items")
assert.Len(t, errs.Skipped(), test.expectSkippedCount)
})
}
}
// This test verifies skipped error cases are handled correctly by collection enumeration // This test verifies skipped error cases are handled correctly by collection enumeration
func (suite *CollectionUnitSuite) TestCollection_SkippedErrors() { func (suite *CollectionUnitSuite) TestCollection_SkippedErrors() {
var ( var (
@ -398,6 +520,7 @@ func (suite *CollectionUnitSuite) TestCollection_SkippedErrors() {
count.New()), count.New()),
"", "",
test.itemGetter, test.itemGetter,
mock.NeverCanSkipFailChecker(),
test.added, test.added,
nil, nil,
false, false,
@ -530,6 +653,7 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
count.New()), count.New()),
"", "",
mlg, mlg,
mock.NeverCanSkipFailChecker(),
test.added, test.added,
maps.Keys(test.removed), maps.Keys(test.removed),
true, true,
@ -589,6 +713,154 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
} }
} }
// TestLazyFetchCollection_Items_skipFailure checks how a collection built
// with an always-skip failure checker behaves when item data is fetched
// lazily and the getter is configured with a serialization error. Per the
// table expectations, every added item is still streamed, reading an item's
// data yields an error labeled as skippable, and a skip is recorded only for
// the items whose data was actually read. Removed items stream as deletions,
// and no failure is recorded on the fault bus.
func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_skipFailure() {
	var (
		t             = suite.T()
		start         = time.Now().Add(-time.Second)
		statusUpdater = func(*support.ControllerOperationStatus) {}
	)

	fullPath, err := path.Build("t", "pr", path.ExchangeService, path.EmailCategory, false, "fnords", "smarf")
	require.NoError(t, err, clues.ToCore(err))

	locPath, err := path.Build("t", "pr", path.ExchangeService, path.EmailCategory, false, "fnords", "smarf")
	require.NoError(t, err, clues.ToCore(err))

	// Each case seeds the collection with added and/or removed item IDs and
	// states how many items should be streamed, how many skips recorded, and
	// which added items the test should read data from.
	table := []struct {
		name               string
		added              map[string]time.Time
		removed            map[string]struct{}
		expectItemCount    int
		expectSkippedCount int
		expectReads        []string
	}{
		{
			name: "no items",
		},
		{
			name: "only added items",
			added: map[string]time.Time{
				"fisher":    start.Add(time.Minute),
				"flannigan": start.Add(2 * time.Minute),
				"fitzbog":   start.Add(3 * time.Minute),
			},
			// All three items stream, but only the two that get read
			// produce a skip record.
			expectItemCount:    3,
			expectSkippedCount: 2,
			expectReads: []string{
				"fisher",
				"fitzbog",
			},
		},
		{
			name: "only removed items",
			removed: map[string]struct{}{
				"princess": {},
				"poppy":    {},
				"petunia":  {},
			},
			expectItemCount:    3,
			expectSkippedCount: 0,
		},
		{
			name: "added and removed items",
			added: map[string]time.Time{
				"general": {},
			},
			removed: map[string]struct{}{
				"general":  {},
				"goose":    {},
				"grumbles": {},
			},
			expectItemCount: 3,
			// not 1, because general is removed from the added
			// map due to being in the removed map
			expectSkippedCount: 0,
		},
	}

	for _, test := range table {
		suite.Run(test.name, func() {
			var (
				t         = suite.T()
				errs      = fault.New(true)
				itemCount int
			)

			ctx, flush := tester.NewContext(t)
			defer flush()

			// presumably makes every Serialize call on the mock return this
			// error; verify against the mock implementation.
			mlg := &mockLazyItemGetterSerializer{
				ItemGetSerialize: &mock.ItemGetSerialize{
					SerializeErr: assert.AnError,
				},
			}
			// NOTE(review): check looks like it compares the items the getter
			// was asked for against expectReads — confirm in the mock.
			defer mlg.check(t, test.expectReads)

			col := NewCollection(
				data.NewBaseCollection(
					fullPath,
					nil,
					locPath.ToBuilder(),
					control.DefaultOptions(),
					false,
					count.New()),
				"",
				mlg,
				// checker that always reports the failure as skippable.
				mock.AlwaysCanSkipFailChecker(),
				test.added,
				maps.Keys(test.removed),
				// validModTimes; presumably selects the lazy-fetch collection —
				// TODO confirm in NewCollection.
				true,
				statusUpdater,
				count.New())

			for item := range col.Items(ctx, errs) {
				itemCount++

				_, rok := test.removed[item.ID()]
				if rok {
					assert.True(t, item.Deleted(), "removals should be marked as deleted")

					dimt, ok := item.(data.ItemModTime)
					require.True(t, ok, "item implements data.ItemModTime")
					assert.True(t, dimt.ModTime().After(start), "deleted items should set mod time to now()")
				}

				modTime, aok := test.added[item.ID()]
				if !rok && aok {
					// Item's mod time should be what's passed into the collection
					// initializer. The checked type assertion stops the subtest
					// with a regular failure instead of panicking when the item
					// does not implement data.ItemModTime.
					dimt, ok := item.(data.ItemModTime)
					require.True(t, ok, "item implements data.ItemModTime")
					assert.Equal(t, modTime, dimt.ModTime(), "item mod time")
					assert.False(t, item.Deleted(), "additions should not be marked as deleted")

					// Check if the test wants us to read the item's data so the
					// lazy data fetch is executed.
					if slices.Contains(test.expectReads, item.ID()) {
						r := item.ToReader()

						_, err := io.ReadAll(r)
						assert.Error(t, err, clues.ToCore(err))
						assert.ErrorContains(t, err, "marked as skippable", clues.ToCore(err))
						assert.True(t, clues.HasLabel(err, graph.LabelsSkippable), clues.ToCore(err))

						// The close result is not part of what this test checks.
						r.Close()
					}
				}

				assert.True(t, aok || rok, "item must be either added or removed: %q", item.ID())
			}

			// Skipped items must not surface as a failure on the fault bus.
			assert.NoError(t, errs.Failure())
			assert.Equal(
				t,
				test.expectItemCount,
				itemCount,
				"should see all expected items")
			assert.Len(t, errs.Skipped(), test.expectSkippedCount)
		})
	}
}
func (suite *CollectionUnitSuite) TestLazyItem_NoRead_GetInfo_Errors() { func (suite *CollectionUnitSuite) TestLazyItem_NoRead_GetInfo_Errors() {
t := suite.T() t := suite.T()

View File

@ -5,6 +5,7 @@ import (
"slices" "slices"
"github.com/alcionai/clues" "github.com/alcionai/clues"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/services/m365/api" "github.com/alcionai/corso/src/pkg/services/m365/api"

View File

@ -3,11 +3,11 @@ package exchange
import ( import (
"testing" "testing"
"github.com/alcionai/clues"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/fault"
@ -54,7 +54,7 @@ func (suite *EventsBackupHandlerUnitSuite) TestHandler_CanSkipItemFailure() {
err: nil, err: nil,
opts: control.Options{ opts: control.Options{
SkipTheseEventsOnInstance503: map[string][]string{ SkipTheseEventsOnInstance503: map[string][]string{
"foo": []string{"bar", "baz"}, "foo": {"bar", "baz"},
}, },
}, },
expect: assert.False, expect: assert.False,
@ -64,7 +64,7 @@ func (suite *EventsBackupHandlerUnitSuite) TestHandler_CanSkipItemFailure() {
err: clues.New("fix me I'm wrong"), err: clues.New("fix me I'm wrong"),
opts: control.Options{ opts: control.Options{
SkipTheseEventsOnInstance503: map[string][]string{ SkipTheseEventsOnInstance503: map[string][]string{
"foo": []string{"bar", "baz"}, "foo": {"bar", "baz"},
}, },
}, },
expect: assert.False, expect: assert.False,
@ -74,7 +74,7 @@ func (suite *EventsBackupHandlerUnitSuite) TestHandler_CanSkipItemFailure() {
err: clues.New("fix me I'm wrong"), err: clues.New("fix me I'm wrong"),
opts: control.Options{ opts: control.Options{
SkipTheseEventsOnInstance503: map[string][]string{ SkipTheseEventsOnInstance503: map[string][]string{
resourceID: []string{"bar", "baz"}, resourceID: {"bar", "baz"},
}, },
}, },
expect: assert.False, expect: assert.False,
@ -84,7 +84,7 @@ func (suite *EventsBackupHandlerUnitSuite) TestHandler_CanSkipItemFailure() {
err: clues.New("fix me I'm wrong"), err: clues.New("fix me I'm wrong"),
opts: control.Options{ opts: control.Options{
SkipTheseEventsOnInstance503: map[string][]string{ SkipTheseEventsOnInstance503: map[string][]string{
resourceID: []string{"bar", itemID}, resourceID: {"bar", itemID},
}, },
}, },
expect: assert.True, expect: assert.True,

View File

@ -6,10 +6,15 @@ import (
"github.com/microsoft/kiota-abstractions-go/serialization" "github.com/microsoft/kiota-abstractions-go/serialization"
"github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/services/m365/api" "github.com/alcionai/corso/src/pkg/services/m365/api"
) )
// ---------------------------------------------------------------------------
// get and serialize item mock
// ---------------------------------------------------------------------------
type ItemGetSerialize struct { type ItemGetSerialize struct {
GetData serialization.Parsable GetData serialization.Parsable
GetCount int GetCount int
@ -44,3 +49,28 @@ func (m *ItemGetSerialize) Serialize(
func DefaultItemGetSerialize() *ItemGetSerialize { func DefaultItemGetSerialize() *ItemGetSerialize {
return &ItemGetSerialize{} return &ItemGetSerialize{}
} }
// ---------------------------------------------------------------------------
// can skip item failure mock
// ---------------------------------------------------------------------------
// canSkipFailChecker is a mock whose CanSkipItemFailure method reports a
// fixed, preconfigured answer regardless of its inputs.
type canSkipFailChecker struct {
// canSkip is returned as the boolean result of CanSkipItemFailure.
canSkip bool
}
// CanSkipItemFailure disregards every argument and answers with the fixed
// skip cause "testing" paired with the checker's preconfigured canSkip flag.
func (c canSkipFailChecker) CanSkipItemFailure(
	_ error,
	_, _ string,
	_ control.Options,
) (fault.SkipCause, bool) {
	cause := fault.SkipCause("testing")

	return cause, c.canSkip
}
// NeverCanSkipFailChecker builds a mock checker whose CanSkipItemFailure
// always reports false for the skip decision.
func NeverCanSkipFailChecker() *canSkipFailChecker {
	return &canSkipFailChecker{canSkip: false}
}
// AlwaysCanSkipFailChecker builds a mock checker whose CanSkipItemFailure
// always reports true for the skip decision.
func AlwaysCanSkipFailChecker() *canSkipFailChecker {
	checker := canSkipFailChecker{canSkip: true}

	return &checker
}