WIP 316 kopia upload errors (#330)

* Update to kopia with required callback

* Support structs for materializing backup details

Kopia will not allow us to pass data to it that should be passed back to
us in the `FinishedFile` callback. To work around this, create a small
thread-safe support struct that handles information about files kopia is
currently processing. Entries are removed from the set when kopia is
done with them and if no error occurred, the item's info will be added
to the BackupDetails.

* Switch to best attempt for iterating through files

Default to "best-attempt" error handling: all data that didn't result in
an error is handed to kopia, and any errors encountered along the way are
collected and returned together at the end.

* Test for uploads that have an error

Add a test covering a simple error while reading a file. BackupDetails
should not contain information about the file that had the error (this
requires updates to both kopia and this code to pass). All other files
should be present in kopia and in BackupDetails.

Co-authored-by: Danny <danny@alcion.ai>
This commit is contained in:
ashmrtn 2022-08-23 08:18:29 -07:00 committed by GitHub
parent eca16e4a69
commit e8e4bf0914
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 325 additions and 38 deletions

View File

@ -2,7 +2,7 @@ module github.com/alcionai/corso
go 1.18
replace github.com/kopia/kopia => github.com/kopia/kopia v0.11.4-0.20220819163352-5ad8f1cf38a3
replace github.com/kopia/kopia => github.com/kopia/kopia v0.11.4-0.20220822194227-5c88bcf1a6e7
require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0

View File

@ -52,7 +52,7 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/aws/aws-sdk-go v1.44.80 h1:jEXGecSgPdvM5KnyDsSgFhZSm7WwaTp4h544Im4SfhI=
github.com/aws/aws-sdk-go v1.44.81 h1:C8oBZ+a+ka0qk3Q24MohQIFq0tkbO8IAu5tfpAMKVWE=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@ -225,8 +225,8 @@ github.com/klauspost/reedsolomon v1.10.0 h1:MonMtg979rxSHjwtsla5dZLhreS0Lu42AyQ2
github.com/klauspost/reedsolomon v1.10.0/go.mod h1:qHMIzMkuZUWqIh8mS/GruPdo3u0qwX2jk/LH440ON7Y=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kopia/kopia v0.11.4-0.20220819163352-5ad8f1cf38a3 h1:10M0vnKBCqQqKhbUowEXrD0minuv4Gbh1aCF2GotV60=
github.com/kopia/kopia v0.11.4-0.20220819163352-5ad8f1cf38a3/go.mod h1:YO48laHllfGEHM1PtLcY66PYLFB9XIqru4bPmtZn8i0=
github.com/kopia/kopia v0.11.4-0.20220822194227-5c88bcf1a6e7 h1:CJaI4frTo1+ayoCa/imv8F3VPQbkWyr7U3KBI5PPjaI=
github.com/kopia/kopia v0.11.4-0.20220822194227-5c88bcf1a6e7/go.mod h1:ckJEq1c7KJcK1ZgqMRy+r+VpE/Z6iUzioZb/0KSBhWw=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=

View File

@ -57,8 +57,8 @@ func (medc *MockExchangeDataCollection) Items() <-chan data.Stream {
defer close(res)
for i := 0; i < medc.messageCount; i++ {
res <- &MockExchangeData{
medc.Names[i],
io.NopCloser(bytes.NewReader(medc.Data[i])),
ID: medc.Names[i],
Reader: io.NopCloser(bytes.NewReader(medc.Data[i])),
}
}
}()
@ -68,8 +68,9 @@ func (medc *MockExchangeDataCollection) Items() <-chan data.Stream {
// ExchangeData represents a single item retrieved from exchange
type MockExchangeData struct {
ID string
Reader io.ReadCloser
ID string
Reader io.ReadCloser
ReadErr error
}
func (med *MockExchangeData) UUID() string {
@ -77,6 +78,10 @@ func (med *MockExchangeData) UUID() string {
}
// ToReader returns a ReadCloser for the item's data. When ReadErr is set, the
// returned reader fails every Read with that error instead of yielding data.
func (med *MockExchangeData) ToReader() io.ReadCloser {
	if med.ReadErr == nil {
		return med.Reader
	}

	return io.NopCloser(errReader{med.ReadErr})
}
@ -127,3 +132,11 @@ func GetMockEventBytes(subject string) []byte {
"Review + Lunch\",\"type\":\"singleInstance\",\"webLink\":\"https://outlook.office365.com/owa/?itemid=AAMkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwBGAAAAAADCNgjhM9QmQYWNcI7hCpPrBwDSEBNbUIB9RL6ePDeF3FIYAAAAAAENAADSEBNbUIB9RL6ePDeF3FIYAAAAAG76AAA%3D&exvsurl=1&path=/calendar/item\"}"
return []byte(event)
}
// errReader is an io.Reader stub whose Read never produces data and always
// reports the wrapped error.
type errReader struct {
	readErr error
}

// Read ignores its buffer and returns (0, readErr) on every call.
func (er errReader) Read(_ []byte) (int, error) {
	return 0, er.readErr
}

View File

@ -2,11 +2,13 @@ package mockconnector_test
import (
"bytes"
"io"
"io/ioutil"
"testing"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/internal/connector/mockconnector"
@ -50,3 +52,53 @@ func (suite *MockExchangeCollectionSuite) TestMockExchangeCollection_NewExchange
assert.NotNil(t, something)
}
}
// MockExchangeDataSuite groups unit tests for MockExchangeData.
type MockExchangeDataSuite struct {
	suite.Suite
}
// TestMockExchangeDataSuite runs the MockExchangeData test suite.
func TestMockExchangeDataSuite(t *testing.T) {
	suite.Run(t, &MockExchangeDataSuite{})
}
// TestMockExchangeData verifies that MockExchangeData returns its configured
// payload through ToReader and that a configured ReadErr surfaces as a read
// failure while UUID still reports the item ID.
func (suite *MockExchangeDataSuite) TestMockExchangeData() {
	id := "bar"
	payload := []byte("foo")

	table := []struct {
		name   string
		reader *mockconnector.MockExchangeData
		check  require.ErrorAssertionFunc
	}{
		{
			name: "NoError",
			reader: &mockconnector.MockExchangeData{
				ID:     id,
				Reader: io.NopCloser(bytes.NewReader(payload)),
			},
			check: require.NoError,
		},
		{
			name: "Error",
			reader: &mockconnector.MockExchangeData{
				ID:      id,
				ReadErr: assert.AnError,
			},
			check: require.Error,
		},
	}

	for _, test := range table {
		test := test

		suite.T().Run(test.name, func(t *testing.T) {
			assert.Equal(t, id, test.reader.UUID())

			buf, err := ioutil.ReadAll(test.reader.ToReader())
			test.check(t, err)
			if err != nil {
				return
			}

			assert.Equal(t, payload, buf)
		})
	}
}

View File

@ -3,6 +3,7 @@ package kopia
import (
"context"
"path"
"sync"
"github.com/hashicorp/go-multierror"
"github.com/kopia/kopia/fs"
@ -50,6 +51,58 @@ func manifestToStats(man *snapshot.Manifest) BackupStats {
}
}
// itemDetails caches the information for a single in-flight file handed to
// kopia so a BackupDetails entry can be materialized once kopia reports the
// file finished successfully.
// NOTE: field order matters — callers construct this with unkeyed literals.
type itemDetails struct {
	info    details.ItemInfo // source-specific metadata for the item
	repoRef string           // full repo path the details entry is recorded under
}
// corsoProgress wraps kopia's UploadProgress so corso can observe per-file
// completion callbacks. Files currently being processed by kopia are tracked
// in pending; items that complete without error have their cached details
// promoted into deets.
type corsoProgress struct {
	snapshotfs.UploadProgress
	// pending maps a file's path (relative to the snapshot root, per the
	// FinishedFile callback) to the details recorded when the file was
	// handed to kopia.
	pending map[string]*itemDetails
	deets   *details.Details
	// mu guards pending; callbacks may arrive from multiple goroutines.
	mu sync.RWMutex
}
// FinishedFile is the kopia UploadProgress callback invoked when kopia
// finishes processing a file. The entry for relativePath is always removed
// from the pending set so references aren't leaked; if the file completed
// without error and details were cached for it, those details are added to
// the backup details.
//
// Kopia may invoke this concurrently from multiple uploader goroutines, so
// the pending map and deets are accessed only while holding mu. (Previously
// deets.Add ran outside any lock; NOTE(review): details.Details shows no
// internal synchronization here, so serializing under mu is the safe choice
// either way.)
func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
	// Pass the call through as well so we don't break expected functionality.
	// Registered first so it runs after mu is released.
	defer cp.UploadProgress.FinishedFile(relativePath, err)

	cp.mu.Lock()
	defer cp.mu.Unlock()

	// Whether it succeeded or failed, remove the entry from our pending set
	// so we don't leak references.
	d := cp.pending[relativePath]
	delete(cp.pending, relativePath)

	if err != nil || d == nil {
		return
	}

	cp.deets.Add(d.repoRef, d.info)
}
// put registers deets as the in-flight entry for relPath. Safe for concurrent
// use.
func (cp *corsoProgress) put(relPath string, deets *itemDetails) {
	cp.mu.Lock()
	cp.pending[relPath] = deets
	cp.mu.Unlock()
}
// get returns the in-flight entry for relPath, or nil if none is registered.
// Safe for concurrent use.
func (cp *corsoProgress) get(relPath string) *itemDetails {
	cp.mu.RLock()
	d := cp.pending[relPath]
	cp.mu.RUnlock()

	return d
}
func NewWrapper(c *conn) (*Wrapper, error) {
if err := c.wrap(); err != nil {
return nil, errors.Wrap(err, "creating Wrapper")
@ -79,9 +132,13 @@ func (w *Wrapper) Close(ctx context.Context) error {
func getStreamItemFunc(
staticEnts []fs.Entry,
streamedEnts data.Collection,
snapshotDetails *details.Details,
progress *corsoProgress,
) func(context.Context, func(context.Context, fs.Entry) error) error {
return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
// Collect all errors and return them at the end so that iteration for this
// directory doesn't end early.
var errs *multierror.Error
// Return static entries in this directory first.
for _, d := range staticEnts {
if err := cb(ctx, d); err != nil {
@ -101,22 +158,37 @@ func getStreamItemFunc(
case e, ok := <-items:
if !ok {
return nil
return errs.ErrorOrNil()
}
itemPath := path.Join(append(streamedEnts.FullPath(), e.UUID())...)
ei, ok := e.(data.StreamInfo)
if !ok {
return errors.New("item does not implement DataStreamInfo")
errs = multierror.Append(
errs, errors.Errorf("item %q does not implement DataStreamInfo", itemPath))
logger.Ctx(ctx).Errorw(
"item does not implement DataStreamInfo; skipping", "path", itemPath)
continue
}
// Relative path given to us in the callback is missing the root
// element. Add to pending set before calling the callback to avoid race
// conditions when the item is completed.
p := path.Join(append(streamedEnts.FullPath()[1:], e.UUID())...)
d := &itemDetails{info: ei.Info(), repoRef: itemPath}
progress.put(p, d)
entry := virtualfs.StreamingFileFromReader(e.UUID(), e.ToReader())
if err := cb(ctx, entry); err != nil {
return errors.Wrap(err, "executing callback")
// Kopia's uploader swallows errors in most cases, so if we see
// something here it's probably a big issue and we should return.
errs = multierror.Append(errs, errors.Wrapf(err, "executing callback on %q", itemPath))
return errs.ErrorOrNil()
}
// Populate BackupDetails
ep := append(streamedEnts.FullPath(), e.UUID())
snapshotDetails.Add(path.Join(ep...), ei.Info())
}
}
}
@ -124,13 +196,13 @@ func getStreamItemFunc(
// buildKopiaDirs recursively builds a directory hierarchy from the roots up.
// Returned directories are virtualfs.StreamingDirectory.
func buildKopiaDirs(dirName string, dir *treeMap, snapshotDetails *details.Details) (fs.Directory, error) {
func buildKopiaDirs(dirName string, dir *treeMap, progress *corsoProgress) (fs.Directory, error) {
// Need to build the directory tree from the leaves up because intermediate
// directories need to have all their entries at creation time.
var childDirs []fs.Entry
for childName, childDir := range dir.childDirs {
child, err := buildKopiaDirs(childName, childDir, snapshotDetails)
child, err := buildKopiaDirs(childName, childDir, progress)
if err != nil {
return nil, err
}
@ -140,7 +212,7 @@ func buildKopiaDirs(dirName string, dir *treeMap, snapshotDetails *details.Detai
return virtualfs.NewStreamingDirectory(
dirName,
getStreamItemFunc(childDirs, dir.collection, snapshotDetails),
getStreamItemFunc(childDirs, dir.collection, progress),
), nil
}
@ -162,7 +234,7 @@ func newTreeMap() *treeMap {
func inflateDirTree(
ctx context.Context,
collections []data.Collection,
snapshotDetails *details.Details,
progress *corsoProgress,
) (fs.Directory, error) {
roots := make(map[string]*treeMap)
@ -222,7 +294,7 @@ func inflateDirTree(
var res fs.Directory
for dirName, dir := range roots {
tmp, err := buildKopiaDirs(dirName, dir, snapshotDetails)
tmp, err := buildKopiaDirs(dirName, dir, progress)
if err != nil {
return nil, err
}
@ -241,25 +313,28 @@ func (w Wrapper) BackupCollections(
return nil, nil, errNotConnected
}
snapshotDetails := &details.Details{}
progress := &corsoProgress{
pending: map[string]*itemDetails{},
deets: &details.Details{},
}
dirTree, err := inflateDirTree(ctx, collections, snapshotDetails)
dirTree, err := inflateDirTree(ctx, collections, progress)
if err != nil {
return nil, nil, errors.Wrap(err, "building kopia directories")
}
stats, err := w.makeSnapshotWithRoot(ctx, dirTree, snapshotDetails)
stats, err := w.makeSnapshotWithRoot(ctx, dirTree, progress)
if err != nil {
return nil, nil, err
}
return stats, snapshotDetails, nil
return stats, progress.deets, nil
}
func (w Wrapper) makeSnapshotWithRoot(
ctx context.Context,
root fs.Directory,
snapshotDetails *details.Details,
progress *corsoProgress,
) (*BackupStats, error) {
var man *snapshot.Manifest
@ -280,14 +355,25 @@ func (w Wrapper) makeSnapshotWithRoot(
Path: root.Name(),
}
policyTree, err := policy.TreeForSource(innerCtx, w.c, si)
trueVal := policy.OptionalBool(true)
errPolicy := &policy.Policy{
ErrorHandlingPolicy: policy.ErrorHandlingPolicy{
IgnoreFileErrors: &trueVal,
IgnoreDirectoryErrors: &trueVal,
},
}
policyTree, err := policy.TreeForSourceWithOverride(innerCtx, w.c, si, errPolicy)
if err != nil {
err = errors.Wrap(err, "get policy tree")
logger.Ctx(innerCtx).Errorw("kopia backup", err)
return err
}
// By default Uploader is best-attempt.
u := snapshotfs.NewUploader(rw)
progress.UploadProgress = u.Progress
u.Progress = progress
man, err = u.Upload(innerCtx, root, policyTree, si)
if err != nil {
err = errors.Wrap(err, "uploading data")
@ -459,7 +545,8 @@ func walkDirectory(
files = append(files, e)
default:
errs = multierror.Append(errs, errors.Errorf("unexpected item type %T", e))
logger.Ctx(ctx).Warnf("unexpected item of type %T; skipping", e)
logger.Ctx(ctx).Errorw(
"unexpected item type; skipping", "type", e)
}
return nil
@ -507,7 +594,8 @@ func restoreSubtree(
fileFullPath := path.Join(append(append([]string{}, fullPath...), f.Name())...)
errs = multierror.Append(
errs, errors.Wrapf(err, "getting reader for file %q", fileFullPath))
logger.Ctx(ctx).Warnf("skipping file %q", fileFullPath)
logger.Ctx(ctx).Errorw(
"unable to get file reader; skipping", "path", fileFullPath)
continue
}

View File

@ -12,6 +12,7 @@ import (
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/virtualfs"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
@ -88,6 +89,89 @@ func testForFiles(
// ---------------
// unit tests
// ---------------
// CorsoProgressUnitSuite groups unit tests for corsoProgress.
type CorsoProgressUnitSuite struct {
	suite.Suite
}
// TestCorsoProgressUnitSuite runs the corsoProgress test suite.
func TestCorsoProgressUnitSuite(t *testing.T) {
	suite.Run(t, &CorsoProgressUnitSuite{})
}
// TestFinishedFile verifies the FinishedFile callback: the pending entry is
// always removed, and BackupDetails only gains an entry when the file had
// cached details and completed without error.
//
// Fix: the table struct previously declared a top-level `err error` field
// that no case set and the loop never read (per-item errors live in
// testInfo.err); the dead field is removed. The itemDetails literal is also
// keyed for clarity.
func (suite *CorsoProgressUnitSuite) TestFinishedFile() {
	type testInfo struct {
		info *itemDetails
		err  error
	}

	targetFileName := "testFile"
	deets := &itemDetails{info: details.ItemInfo{}, repoRef: targetFileName}

	table := []struct {
		name        string
		cachedItems map[string]testInfo
		// expectedLen is the number of BackupDetails entries after all
		// FinishedFile callbacks have run.
		expectedLen int
	}{
		{
			name: "DetailsExist",
			cachedItems: map[string]testInfo{
				targetFileName: {
					info: deets,
					err:  nil,
				},
			},
			expectedLen: 1,
		},
		{
			name: "PendingNoDetails",
			cachedItems: map[string]testInfo{
				targetFileName: {
					info: nil,
					err:  nil,
				},
			},
			expectedLen: 0,
		},
		{
			name: "HadError",
			cachedItems: map[string]testInfo{
				targetFileName: {
					info: deets,
					err:  assert.AnError,
				},
			},
			expectedLen: 0,
		},
		{
			// Completing a file that was never added to pending is a no-op.
			name:        "NotPending",
			expectedLen: 0,
		},
	}

	for _, test := range table {
		suite.T().Run(test.name, func(t *testing.T) {
			bd := &details.Details{}
			cp := corsoProgress{
				UploadProgress: &snapshotfs.NullUploadProgress{},
				deets:          bd,
				pending:        map[string]*itemDetails{},
			}

			for k, v := range test.cachedItems {
				cp.put(k, v.info)
			}

			require.Len(t, cp.pending, len(test.cachedItems))

			for k, v := range test.cachedItems {
				cp.FinishedFile(k, v.err)
			}

			assert.Empty(t, cp.pending)
			assert.Len(t, bd.Entries, test.expectedLen)
		})
	}
}
type KopiaUnitSuite struct {
suite.Suite
}
@ -117,7 +201,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree() {
user2: 42,
}
snapshotDetails := &details.Details{}
progress := &corsoProgress{pending: map[string]*itemDetails{}}
collections := []data.Collection{
mockconnector.NewMockExchangeCollection(
@ -138,7 +222,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree() {
// - user2
// - emails
// - 42 separate files
dirTree, err := inflateDirTree(ctx, collections, snapshotDetails)
dirTree, err := inflateDirTree(ctx, collections, progress)
require.NoError(suite.T(), err)
assert.Equal(suite.T(), dirTree.Name(), tenant)
@ -169,7 +253,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree() {
totalFileCount += c
}
assert.Len(suite.T(), snapshotDetails.Entries, totalFileCount)
assert.Len(suite.T(), progress.pending, totalFileCount)
}
func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() {
@ -180,7 +264,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() {
expectedFileCount := 42
snapshotDetails := &details.Details{}
progress := &corsoProgress{pending: map[string]*itemDetails{}}
collections := []data.Collection{
mockconnector.NewMockExchangeCollection(
[]string{emails},
@ -191,7 +275,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() {
// Returned directory structure should look like:
// - emails
// - 42 separate files
dirTree, err := inflateDirTree(ctx, collections, snapshotDetails)
dirTree, err := inflateDirTree(ctx, collections, progress)
require.NoError(suite.T(), err)
assert.Equal(suite.T(), dirTree.Name(), emails)
@ -243,9 +327,9 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_MixedDirectory() {
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
snapshotDetails := &details.Details{}
progress := &corsoProgress{pending: map[string]*itemDetails{}}
dirTree, err := inflateDirTree(ctx, test.layout, snapshotDetails)
dirTree, err := inflateDirTree(ctx, test.layout, progress)
require.NoError(t, err)
assert.Equal(t, testTenant, dirTree.Name())
@ -323,8 +407,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() {
ctx := context.Background()
suite.T().Run(test.name, func(t *testing.T) {
snapshotDetails := &details.Details{}
_, err := inflateDirTree(ctx, test.layout, snapshotDetails)
_, err := inflateDirTree(ctx, test.layout, nil)
assert.Error(t, err)
})
}
@ -528,6 +611,57 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() {
testForFiles(t, expected, result)
}
// TestBackupCollections_ReaderError backs up two collections where one item's
// reader always fails. The failing item should be skipped and counted as an
// ignored error, while every other item lands in both kopia and the returned
// BackupDetails.
func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() {
	t := suite.T()

	cols := []data.Collection{
		&kopiaDataCollection{
			path: testPath,
			streams: []data.Stream{
				&mockconnector.MockExchangeData{
					ID:     testFileName,
					Reader: io.NopCloser(bytes.NewReader(testFileData)),
				},
				&mockconnector.MockExchangeData{
					ID:     testFileName2,
					Reader: io.NopCloser(bytes.NewReader(testFileData2)),
				},
			},
		},
		&kopiaDataCollection{
			path: testPath2,
			streams: []data.Stream{
				&mockconnector.MockExchangeData{
					ID:     testFileName3,
					Reader: io.NopCloser(bytes.NewReader(testFileData3)),
				},
				// This item fails on every read.
				&mockconnector.MockExchangeData{
					ID:      testFileName4,
					ReadErr: assert.AnError,
				},
				&mockconnector.MockExchangeData{
					ID:     testFileName5,
					Reader: io.NopCloser(bytes.NewReader(testFileData5)),
				},
				&mockconnector.MockExchangeData{
					ID:     testFileName6,
					Reader: io.NopCloser(bytes.NewReader(testFileData6)),
				},
			},
		},
	}

	stats, deets, err := suite.w.BackupCollections(suite.ctx, cols)
	require.NoError(t, err)

	// 6 items total; exactly 1 is dropped because of the reader error.
	assert.Equal(t, 0, stats.ErrorCount)
	assert.Equal(t, 5, stats.TotalFileCount)
	assert.Equal(t, 5, stats.TotalDirectoryCount)
	assert.Equal(t, 1, stats.IgnoredErrorCount)
	assert.False(t, stats.Incomplete)
	assert.Len(t, deets.Entries, 5)
}
type KopiaSimpleRepoIntegrationSuite struct {
suite.Suite
w *Wrapper