diff --git a/src/internal/data/item.go b/src/internal/data/item.go index 2becd287e..81d4befe5 100644 --- a/src/internal/data/item.go +++ b/src/internal/data/item.go @@ -21,7 +21,7 @@ func NewDeletedItem(itemID string) Item { // TODO(ashmrtn): This really doesn't need to be set since deleted items are // never passed to the actual storage engine. Setting it for now so tests // don't break. - modTime: time.Now(), + modTime: time.Now().UTC(), } } diff --git a/src/internal/m365/collection/exchange/collection.go b/src/internal/m365/collection/exchange/collection.go index 486d2d487..14d3cf301 100644 --- a/src/internal/m365/collection/exchange/collection.go +++ b/src/internal/m365/collection/exchange/collection.go @@ -12,7 +12,6 @@ import ( "time" "github.com/alcionai/clues" - "github.com/spatialcurrent/go-lazy/pkg/lazy" "golang.org/x/exp/maps" "github.com/alcionai/corso/src/internal/data" @@ -27,9 +26,7 @@ import ( var ( _ data.BackupCollection = &prefetchCollection{} - _ data.Item = &Item{} - _ data.ItemInfo = &Item{} - _ data.ItemModTime = &Item{} + _ data.BackupCollection = &lazyFetchCollection{} ) const ( @@ -230,11 +227,7 @@ func (col *prefetchCollection) streamItems( defer wg.Done() defer func() { <-semaphoreCh }() - stream <- &Item{ - id: id, - modTime: time.Now().UTC(), // removed items have no modTime entry. - deleted: true, - } + stream <- data.NewDeletedItem(id) atomic.AddInt64(&success, 1) @@ -285,12 +278,10 @@ func (col *prefetchCollection) streamItems( return } - stream <- &Item{ - id: id, - message: itemData, - info: info, - modTime: info.Modified, - } + stream <- data.NewPrefetchedItem( + io.NopCloser(bytes.NewReader(itemData)), + id, + details.ItemInfo{Exchange: info}) atomic.AddInt64(&success, 1) atomic.AddInt64(&totalBytes, info.Size) @@ -377,11 +368,7 @@ func (col *lazyFetchCollection) streamItems( // delete all removed items for id := range col.removed { - stream <- &Item{ - id: id, - modTime: time.Now().UTC(), // removed items have no modTime entry. - deleted: true, - } + stream <- data.NewDeletedItem(id) atomic.AddInt64(&success, 1) @@ -405,16 +392,19 @@ func (col *lazyFetchCollection) streamItems( "service", path.ExchangeService.String(), "category", col.Category().String()) - stream <- &lazyItem{ - ctx: ictx, - userID: user, - id: id, - getter: col.getter, - modTime: modTime, - immutableIDs: col.Opts().ToggleFeatures.ExchangeImmutableIDs, - parentPath: parentPath, - errs: errs, - } + stream <- data.NewLazyItem( + ictx, + &lazyItemGetter{ + userID: user, + itemID: id, + getter: col.getter, + modTime: modTime, + immutableIDs: col.Opts().ToggleFeatures.ExchangeImmutableIDs, + parentPath: parentPath, + }, + id, + modTime, + errs) atomic.AddInt64(&success, 1) @@ -424,137 +414,48 @@ func (col *lazyFetchCollection) streamItems( } } -// Item represents a single item retrieved from exchange -type Item struct { - id string - // TODO: We may need this to be a "oneOf" of `message`, `contact`, etc. - // going forward. Using []byte for now but I assume we'll have - // some structured type in here (serialization to []byte can be done in `Read`) - message []byte - info *details.ExchangeInfo // temporary change to bring populate function into directory - // TODO(ashmrtn): Can probably eventually be sourced from info as there's a - // request to provide modtime in ItemInfo structs. - modTime time.Time - - // true if the item was marked by graph as deleted. - deleted bool -} - -func (i *Item) ID() string { - return i.id -} - -func (i *Item) ToReader() io.ReadCloser { - return io.NopCloser(bytes.NewReader(i.message)) -} - -func (i Item) Deleted() bool { - return i.deleted -} - -func (i *Item) Info() (details.ItemInfo, error) { - return details.ItemInfo{Exchange: i.info}, nil -} - -func (i *Item) ModTime() time.Time { - return i.modTime -} - -func NewItem( - identifier string, - dataBytes []byte, - detail details.ExchangeInfo, - modTime time.Time, -) Item { - return Item{ - id: identifier, - message: dataBytes, - info: &detail, - modTime: modTime, - } -} - -// lazyItem represents a single item retrieved from exchange that lazily fetches -// the item's data when the first call to ToReader().Read() is made. -type lazyItem struct { - ctx context.Context - userID string - id string - parentPath string - getter itemGetterSerializer - errs *fault.Bus - - modTime time.Time - // info holds the Exchnage-specific details information for this item. Store - // a pointer in this struct so the golang garbage collector can collect the - // Item struct once kopia is done with it. The ExchangeInfo struct needs to - // stick around until the end of the backup though as backup details is - // written last. - info *details.ExchangeInfo - +type lazyItemGetter struct { + getter itemGetterSerializer + userID string + itemID string + parentPath string + modTime time.Time immutableIDs bool - - delInFlight bool } -func (i lazyItem) ID() string { - return i.id -} - -func (i *lazyItem) ToReader() io.ReadCloser { - return lazy.NewLazyReadCloser(func() (io.ReadCloser, error) { - itemData, info, err := getItemAndInfo( - i.ctx, - i.getter, - i.userID, - i.ID(), - i.immutableIDs, - i.parentPath) - if err != nil { - // If an item was deleted then return an empty file so we don't fail - // the backup and return a sentinel error when asked for ItemInfo so - // we don't display the item in the backup. - // - // The item will be deleted from kopia on the next backup when the - // delta token shows it's removed. - if graph.IsErrDeletedInFlight(err) { - logger.CtxErr(i.ctx, err).Info("item not found") - - i.delInFlight = true - - return io.NopCloser(bytes.NewReader([]byte{})), nil - } - - err = clues.Stack(err) - i.errs.AddRecoverable(i.ctx, err) - - return nil, err +func (lig *lazyItemGetter) GetData( + ctx context.Context, +) (io.ReadCloser, *details.ItemInfo, bool, error) { + itemData, info, err := getItemAndInfo( + ctx, + lig.getter, + lig.userID, + lig.itemID, + lig.immutableIDs, + lig.parentPath) + if err != nil { + // If an item was deleted then return an empty file so we don't fail + // the backup and return a sentinel error when asked for ItemInfo so + // we don't display the item in the backup. + // + // The item will be deleted from kopia on the next backup when the + // delta token shows it's removed. + if graph.IsErrDeletedInFlight(err) { + logger.CtxErr(ctx, err).Info("item not found") + return nil, nil, true, nil } - i.info = info - // Update the mod time to what we already told kopia about. This is required - // for proper details merging. - i.info.Modified = i.modTime + err = clues.Stack(err) - return io.NopCloser(bytes.NewReader(itemData)), nil - }) -} - -func (i lazyItem) Deleted() bool { - return false -} - -func (i lazyItem) Info() (details.ItemInfo, error) { - if i.delInFlight { - return details.ItemInfo{}, clues.Stack(data.ErrNotFound).WithClues(i.ctx) - } else if i.info == nil { - return details.ItemInfo{}, clues.New("requesting ItemInfo before data retrieval"). - WithClues(i.ctx) + return nil, nil, false, err } - return details.ItemInfo{Exchange: i.info}, nil -} + // Update the mod time to what we already told kopia about. This is required + // for proper details merging. + info.Modified = lig.modTime -func (i lazyItem) ModTime() time.Time { - return i.modTime + return io.NopCloser(bytes.NewReader(itemData)), + &details.ItemInfo{Exchange: info}, + false, + nil } diff --git a/src/internal/m365/collection/exchange/collection_test.go b/src/internal/m365/collection/exchange/collection_test.go index 39a7e2fc5..5e1665faa 100644 --- a/src/internal/m365/collection/exchange/collection_test.go +++ b/src/internal/m365/collection/exchange/collection_test.go @@ -36,31 +36,37 @@ func TestCollectionUnitSuite(t *testing.T) { suite.Run(t, &CollectionUnitSuite{Suite: tester.NewUnitSuite(t)}) } -func (suite *CollectionUnitSuite) TestReader_Valid() { - m := []byte("test message") - description := "aFile" - ed := &Item{id: description, message: m} +func (suite *CollectionUnitSuite) TestPrefetchedItem_Reader() { + table := []struct { + name string + readData []byte + }{ + { + name: "HasData", + readData: []byte("test message"), + }, + { + name: "Empty", + readData: []byte{}, + }, + } - buf := &bytes.Buffer{} - _, err := buf.ReadFrom(ed.ToReader()) - assert.NoError(suite.T(), err, clues.ToCore(err)) - assert.Equal(suite.T(), buf.Bytes(), m) - assert.Equal(suite.T(), description, ed.ID()) -} + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() -func (suite *CollectionUnitSuite) TestReader_Empty() { - var ( - empty []byte - expected int64 - t = suite.T() - ) + ed := data.NewPrefetchedItem( + io.NopCloser(bytes.NewReader(test.readData)), + "itemID", + details.ItemInfo{}) - ed := &Item{message: empty} - buf := &bytes.Buffer{} - received, err := buf.ReadFrom(ed.ToReader()) - - assert.Equal(t, expected, received) - assert.NoError(t, err, clues.ToCore(err)) + buf := &bytes.Buffer{} + _, err := buf.ReadFrom(ed.ToReader()) + assert.NoError(t, err, "reading data: %v", clues.ToCore(err)) + assert.Equal(t, test.readData, buf.Bytes(), "read data") + assert.Equal(t, "itemID", ed.ID(), "item ID") + }) + } } func (suite *CollectionUnitSuite) TestNewCollection_state() { @@ -480,9 +486,14 @@ func (suite *CollectionUnitSuite) TestLazyItem_NoRead_GetInfo_Errors() { ctx, flush := tester.NewContext(t) defer flush() - li := lazyItem{ctx: ctx} + li := data.NewLazyItem( + ctx, + nil, + "itemID", + time.Now(), + fault.New(true)) - _, err := li.Info() + _, err := li.(data.ItemInfo).Info() assert.Error(suite.T(), err, "Info without reading data should error") } @@ -558,30 +569,37 @@ func (suite *CollectionUnitSuite) TestLazyItem() { SerializeErr: test.serializeErr, } - li := &lazyItem{ - ctx: ctx, - userID: "userID", - id: "itemID", - parentPath: parentPath, - getter: getter, - errs: fault.New(true), - modTime: test.modTime, - immutableIDs: false, - } + li := data.NewLazyItem( + ctx, + &lazyItemGetter{ + userID: "userID", + itemID: "itemID", + getter: getter, + modTime: test.modTime, + immutableIDs: false, + parentPath: parentPath, + }, + "itemID", + test.modTime, + fault.New(true)) assert.False(t, li.Deleted(), "item shouldn't be marked deleted") - assert.Equal(t, test.modTime, li.ModTime(), "item mod time") + assert.Equal( + t, + test.modTime, + li.(data.ItemModTime).ModTime(), + "item mod time") - data, err := io.ReadAll(li.ToReader()) + readData, err := io.ReadAll(li.ToReader()) if test.expectReadErrType == nil { assert.NoError(t, err, "reading item data: %v", clues.ToCore(err)) } else { assert.ErrorIs(t, err, test.expectReadErrType, "read error") } - test.dataCheck(t, data, "read item data") + test.dataCheck(t, readData, "read item data") - info, err := li.Info() + info, err := li.(data.ItemInfo).Info() // Didn't expect an error getting info, it should be valid. if !test.expectInfoErr {