From a3dabaa58947d734e28fca6cfdab614e45dd51f7 Mon Sep 17 00:00:00 2001
From: ashmrtn <3891298+ashmrtn@users.noreply.github.com>
Date: Mon, 25 Sep 2023 10:21:26 -0700
Subject: [PATCH] Make generic version of item structs (#4350)

Generic structs implement items that have details associated with them.
Items that don't have details will need a separate struct since we use a
downcast to determine if we can get details.

---

#### Does this PR need a docs update or release note?

- [ ] :white_check_mark: Yes, it's included
- [ ] :clock1: Yes, but in a later PR
- [x] :no_entry: No

#### Type of change

- [ ] :sunflower: Feature
- [ ] :bug: Bugfix
- [ ] :world_map: Documentation
- [ ] :robot: Supportability/Tests
- [ ] :computer: CI/Deployment
- [x] :broom: Tech Debt/Cleanup

#### Issue(s)

* #4191

#### Test Plan

- [ ] :muscle: Manual
- [x] :zap: Unit test
- [x] :green_heart: E2E
---
 src/internal/data/item.go      | 194 ++++++++++++++++++
 src/internal/data/item_test.go | 346 +++++++++++++++++++++++++++++++++
 2 files changed, 540 insertions(+)
 create mode 100644 src/internal/data/item.go
 create mode 100644 src/internal/data/item_test.go

diff --git a/src/internal/data/item.go b/src/internal/data/item.go
new file mode 100644
index 000000000..2becd287e
--- /dev/null
+++ b/src/internal/data/item.go
@@ -0,0 +1,194 @@
+package data
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"time"
+
+	"github.com/alcionai/clues"
+	"github.com/spatialcurrent/go-lazy/pkg/lazy"
+
+	"github.com/alcionai/corso/src/pkg/backup/details"
+	"github.com/alcionai/corso/src/pkg/fault"
+	"github.com/alcionai/corso/src/pkg/logger"
+)
+
+// NewDeletedItem returns an Item flagged as deleted for the given ID. The item
+// carries no reader or details info; its mod time is set to the time of the
+// call.
+func NewDeletedItem(itemID string) Item {
+	return &prefetchedItem{
+		id:      itemID,
+		deleted: true,
+		// TODO(ashmrtn): This really doesn't need to be set since deleted items are
+		// never passed to the actual storage engine. Setting it for now so tests
+		// don't break.
+		modTime: time.Now(),
+	}
+}
+
+// NewPrefetchedItem returns an Item backed by data that was already retrieved.
+// The item's mod time is taken from info.Modified().
+func NewPrefetchedItem(
+	reader io.ReadCloser,
+	itemID string,
+	info details.ItemInfo,
+) Item {
+	return &prefetchedItem{
+		id:      itemID,
+		reader:  reader,
+		info:    info,
+		modTime: info.Modified(),
+	}
+}
+
+// prefetchedItem represents a single item retrieved from the remote service.
+type prefetchedItem struct {
+	id     string
+	reader io.ReadCloser
+	info   details.ItemInfo
+	// modTime is the modified time of the item. It should match the modTime in
+	// info if info is present. Here as a separate field so that deleted items
+	// don't error out by trying to source it from info.
+	modTime time.Time
+
+	// deleted flags if this item has been removed in the remote service and
+	// should be removed in storage as well.
+	deleted bool
+}
+
+func (i *prefetchedItem) ID() string {
+	return i.id
+}
+
+func (i *prefetchedItem) ToReader() io.ReadCloser {
+	return i.reader
+}
+
+func (i *prefetchedItem) Deleted() bool {
+	return i.deleted
+}
+
+func (i *prefetchedItem) Info() (details.ItemInfo, error) {
+	return i.info, nil
+}
+
+func (i *prefetchedItem) ModTime() time.Time {
+	return i.modTime
+}
+
+// ItemDataGetter fetches the data and details info for a single item on
+// demand. lazyItem treats the boolean result as a flag that the item was
+// deleted in flight.
+type ItemDataGetter interface {
+	GetData(context.Context) (io.ReadCloser, *details.ItemInfo, bool, error)
+}
+
+// NewLazyItem returns an Item that defers calling itemGetter until the reader
+// from ToReader() is first read. Errors from itemGetter are added to errs as
+// recoverable errors.
+func NewLazyItem(
+	ctx context.Context,
+	itemGetter ItemDataGetter,
+	itemID string,
+	modTime time.Time,
+	errs *fault.Bus,
+) Item {
+	return &lazyItem{
+		ctx:        ctx,
+		id:         itemID,
+		itemGetter: itemGetter,
+		modTime:    modTime,
+		errs:       errs,
+	}
+}
+
+// lazyItem represents a single item retrieved from the remote service. It
+// lazily fetches the item's data when the first call to ToReader().Read() is
+// made.
+type lazyItem struct {
+	// ctx is kept on the struct because the data fetch runs later from inside
+	// the lazy reader callback, which doesn't take a context of its own.
+	ctx        context.Context
+	id         string
+	errs       *fault.Bus
+	itemGetter ItemDataGetter
+
+	modTime time.Time
+	// info holds the details information for this item. Store a pointer in this
+	// struct so we can tell if it's been set already or not.
+	//
+	// This also helps with garbage collection because now the golang garbage
+	// collector can collect the lazyItem struct once the storage engine is done
+	// with it. The ItemInfo struct needs to stick around until the end of the
+	// backup though as backup details is written last.
+	info *details.ItemInfo
+
+	// delInFlight is set when the data fetch reports the item as deleted.
+	delInFlight bool
+}
+
+func (i *lazyItem) ID() string {
+	return i.id
+}
+
+// ToReader returns a reader that calls the item getter lazily. A fetch error
+// is stacked, added to the fault bus as recoverable, and returned to the
+// reader's caller.
+func (i *lazyItem) ToReader() io.ReadCloser {
+	return lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
+		reader, info, delInFlight, err := i.itemGetter.GetData(i.ctx)
+		if err != nil {
+			err = clues.Stack(err)
+			i.errs.AddRecoverable(i.ctx, err)
+
+			return nil, err
+		}
+
+		// If an item was deleted then return an empty file so we don't fail the
+		// backup and return a sentinel error when asked for ItemInfo so we don't
+		// display the item in the backup.
+		//
+		// The item will be deleted from storage on the next backup when either the
+		// delta token shows it's removed or we do a full backup (token expired
+		// etc.) and the item isn't enumerated in that set.
+		if delInFlight {
+			logger.Ctx(i.ctx).Info("item not found")
+
+			i.delInFlight = true
+
+			return io.NopCloser(bytes.NewReader([]byte{})), nil
+		}
+
+		i.info = info
+
+		return reader, nil
+	})
+}
+
+// Deleted always reports false. Items found to be deleted during the data
+// fetch are surfaced by Info() returning ErrNotFound instead.
+func (i *lazyItem) Deleted() bool {
+	return false
+}
+
+// Info returns the details info captured by the data fetch. It returns
+// ErrNotFound if the item was deleted in flight, and a generic error if no
+// info has been captured yet.
+func (i *lazyItem) Info() (details.ItemInfo, error) {
+	if i.delInFlight {
+		return details.ItemInfo{}, clues.Stack(ErrNotFound).WithClues(i.ctx)
+	}
+
+	if i.info == nil {
+		return details.ItemInfo{}, clues.New("requesting ItemInfo before data retrieval").
+			WithClues(i.ctx)
+	}
+
+	return *i.info, nil
+}
+
+func (i *lazyItem) ModTime() time.Time {
+	return i.modTime
+}
diff --git a/src/internal/data/item_test.go b/src/internal/data/item_test.go
new file mode 100644
index 000000000..c1b91e3df
--- /dev/null
+++ b/src/internal/data/item_test.go
@@ -0,0 +1,346 @@
+package data_test
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"testing"
+	"time"
+
+	"github.com/alcionai/clues"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+
+	"github.com/alcionai/corso/src/internal/data"
+	"github.com/alcionai/corso/src/internal/tester"
+	"github.com/alcionai/corso/src/pkg/backup/details"
+	"github.com/alcionai/corso/src/pkg/fault"
+)
+
+// errReader wraps a ReadCloser and fails with err once errAfter bytes have
+// been read. Reads that would cross errAfter are truncated so the error lands
+// exactly on that boundary. With a nil err, reads pass through untouched.
+type errReader struct {
+	io.ReadCloser
+	readCount int
+	errAfter  int
+	err       error
+}
+
+func (er *errReader) Read(p []byte) (int, error) {
+	if er.err != nil && er.readCount == er.errAfter {
+		return 0, er.err
+	}
+
+	toRead := len(p)
+	if er.err != nil && er.readCount+toRead > er.errAfter {
+		toRead = er.errAfter - er.readCount
+	}
+
+	n, err := er.ReadCloser.Read(p[:toRead])
+	er.readCount += n
+
+	return n, err
+}
+
+type ItemUnitSuite struct {
+	tester.Suite
+}
+
+func TestItemUnitSuite(t *testing.T) {
+	suite.Run(t, &ItemUnitSuite{Suite: tester.NewUnitSuite(t)})
+}
+
+func (suite *ItemUnitSuite) TestDeletedItem() {
+	var (
+		t = suite.T()
+
+		id   = "foo"
+		item = data.NewDeletedItem(id)
+	)
+
+	assert.Equal(t, id, item.ID(), "ID")
+	assert.True(t, item.Deleted(), "deleted")
+}
+
+func (suite *ItemUnitSuite) TestPrefetchedItem() {
+	var (
+		id  = "foo"
+		now = time.Now()
+
+		baseData = []byte("hello world")
+	)
+
+	table := []struct {
+		name   string
+		reader io.ReadCloser
+		info   details.ItemInfo
+
+		readErr    require.ErrorAssertionFunc
+		expectData []byte
+	}{
+		{
+			name:       "EmptyReader",
+			reader:     io.NopCloser(bytes.NewReader([]byte{})),
+			info:       details.ItemInfo{Exchange: &details.ExchangeInfo{Modified: now}},
+			readErr:    require.NoError,
+			expectData: []byte{},
+		},
+		{
+			name:       "ReaderWithData",
+			reader:     io.NopCloser(bytes.NewReader(baseData)),
+			info:       details.ItemInfo{Exchange: &details.ExchangeInfo{Modified: now}},
+			readErr:    require.NoError,
+			expectData: baseData,
+		},
+		{
+			name:       "ReaderWithData DifferentService",
+			reader:     io.NopCloser(bytes.NewReader(baseData)),
+			info:       details.ItemInfo{OneDrive: &details.OneDriveInfo{Modified: now}},
+			readErr:    require.NoError,
+			expectData: baseData,
+		},
+		{
+			name: "ReaderWithData ReadError",
+			reader: &errReader{
+				ReadCloser: io.NopCloser(bytes.NewReader(baseData)),
+				errAfter:   5,
+				err:        assert.AnError,
+			},
+			info:       details.ItemInfo{Exchange: &details.ExchangeInfo{Modified: now}},
+			readErr:    require.Error,
+			expectData: baseData[:5],
+		},
+	}
+
+	for _, test := range table {
+		suite.Run(test.name, func() {
+			t := suite.T()
+
+			item := data.NewPrefetchedItem(test.reader, id, test.info)
+
+			assert.Equal(t, id, item.ID(), "ID")
+			assert.False(t, item.Deleted(), "deleted")
+			assert.Equal(
+				t,
+				test.info.Modified(),
+				item.(data.ItemModTime).ModTime(),
+				"mod time")
+
+			readData, err := io.ReadAll(item.ToReader())
+			test.readErr(t, err, clues.ToCore(err), "read error")
+			assert.Equal(t, test.expectData, readData, "read data")
+		})
+	}
+}
+
+type mockItemDataGetter struct {
+	getCalled bool
+
+	reader      io.ReadCloser
+	info        *details.ItemInfo
+	delInFlight bool
+	err         error
+}
+
+func (mid *mockItemDataGetter) check(t *testing.T, expectCalled bool) {
+	assert.Equal(t, expectCalled, mid.getCalled, "GetData() called")
+}
+
+func (mid *mockItemDataGetter) GetData(
+	context.Context,
+) (io.ReadCloser, *details.ItemInfo, bool, error) {
+	mid.getCalled = true
+	return mid.reader, mid.info, mid.delInFlight, mid.err
+}
+
+func (suite *ItemUnitSuite) TestLazyItem() {
+	var (
+		id  = "foo"
+		now = time.Now()
+
+		baseData = []byte("hello world")
+	)
+
+	table := []struct {
+		name         string
+		mid          *mockItemDataGetter
+		readErr      assert.ErrorAssertionFunc
+		infoErr      assert.ErrorAssertionFunc
+		expectData   []byte
+		expectBusErr bool
+	}{
+		{
+			name: "EmptyReader",
+			mid: &mockItemDataGetter{
+				reader: io.NopCloser(bytes.NewReader([]byte{})),
+				info:   &details.ItemInfo{Exchange: &details.ExchangeInfo{Modified: now}},
+			},
+			readErr:    assert.NoError,
+			infoErr:    assert.NoError,
+			expectData: []byte{},
+		},
+		{
+			name: "ReaderWithData",
+			mid: &mockItemDataGetter{
+				reader: io.NopCloser(bytes.NewReader(baseData)),
+				info:   &details.ItemInfo{Exchange: &details.ExchangeInfo{Modified: now}},
+			},
+			readErr:    assert.NoError,
+			infoErr:    assert.NoError,
+			expectData: baseData,
+		},
+		{
+			name: "ReaderWithData DifferentService",
+			mid: &mockItemDataGetter{
+				reader: io.NopCloser(bytes.NewReader(baseData)),
+				info:   &details.ItemInfo{OneDrive: &details.OneDriveInfo{Modified: now}},
+			},
+			readErr:    assert.NoError,
+			infoErr:    assert.NoError,
+			expectData: baseData,
+		},
+		{
+			name: "ReaderWithData GetDataError",
+			mid: &mockItemDataGetter{
+				err: assert.AnError,
+			},
+			readErr:      assert.Error,
+			infoErr:      assert.Error,
+			expectData:   []byte{},
+			expectBusErr: true,
+		},
+		{
+			name: "ReaderWithData ReadError",
+			mid: &mockItemDataGetter{
+				reader: &errReader{
+					ReadCloser: io.NopCloser(bytes.NewReader(baseData)),
+					errAfter:   5,
+					err:        assert.AnError,
+				},
+				info: &details.ItemInfo{OneDrive: &details.OneDriveInfo{Modified: now}},
+			},
+			readErr:    assert.Error,
+			infoErr:    assert.NoError,
+			expectData: baseData[:5],
+		},
+	}
+
+	for _, test := range table {
+		suite.Run(test.name, func() {
+			t := suite.T()
+
+			ctx, flush := tester.NewContext(t)
+			defer flush()
+
+			errs := fault.New(true)
+
+			defer test.mid.check(t, true)
+
+			item := data.NewLazyItem(
+				ctx,
+				test.mid,
+				id,
+				now,
+				errs)
+
+			assert.Equal(t, id, item.ID(), "ID")
+			assert.False(t, item.Deleted(), "deleted")
+			assert.Equal(
+				t,
+				now,
+				item.(data.ItemModTime).ModTime(),
+				"mod time")
+
+			// Read data to execute lazy reader.
+			readData, err := io.ReadAll(item.ToReader())
+			test.readErr(t, err, clues.ToCore(err), "read error")
+			assert.Equal(t, test.expectData, readData, "read data")
+
+			_, err = item.(data.ItemInfo).Info()
+			test.infoErr(t, err, "Info(): %v", clues.ToCore(err))
+
+			e := errs.Errors()
+
+			if !test.expectBusErr {
+				assert.Nil(t, e.Failure, "hard failure")
+				assert.Empty(t, e.Recovered, "recovered")
+
+				return
+			}
+
+			assert.NotNil(t, e.Failure, "hard failure")
+		})
+	}
+}
+
+func (suite *ItemUnitSuite) TestLazyItem_DeletedInFlight() {
+	var (
+		id  = "foo"
+		now = time.Now()
+	)
+
+	t := suite.T()
+
+	ctx, flush := tester.NewContext(t)
+	defer flush()
+
+	errs := fault.New(true)
+
+	mid := &mockItemDataGetter{delInFlight: true}
+	defer mid.check(t, true)
+
+	item := data.NewLazyItem(ctx, mid, id, now, errs)
+
+	assert.Equal(t, id, item.ID(), "ID")
+	assert.False(t, item.Deleted(), "deleted")
+	assert.Equal(
+		t,
+		now,
+		item.(data.ItemModTime).ModTime(),
+		"mod time")
+
+	// Read data to execute lazy reader.
+	readData, err := io.ReadAll(item.ToReader())
+	require.NoError(t, err, clues.ToCore(err), "read error")
+	assert.Empty(t, readData, "read data")
+
+	_, err = item.(data.ItemInfo).Info()
+	assert.ErrorIs(t, err, data.ErrNotFound, "Info() error")
+
+	e := errs.Errors()
+
+	assert.Nil(t, e.Failure, "hard failure")
+	assert.Empty(t, e.Recovered, "recovered")
+}
+
+func (suite *ItemUnitSuite) TestLazyItem_InfoBeforeReadErrors() {
+	var (
+		id  = "foo"
+		now = time.Now()
+	)
+
+	t := suite.T()
+
+	ctx, flush := tester.NewContext(t)
+	defer flush()
+
+	errs := fault.New(true)
+
+	mid := &mockItemDataGetter{}
+	defer mid.check(t, false)
+
+	item := data.NewLazyItem(ctx, mid, id, now, errs)
+
+	assert.Equal(t, id, item.ID(), "ID")
+	assert.False(t, item.Deleted(), "deleted")
+	assert.Equal(
+		t,
+		now,
+		item.(data.ItemModTime).ModTime(),
+		"mod time")
+
+	_, err := item.(data.ItemInfo).Info()
+	assert.Error(t, err, "Info() error")
+}