diff --git a/src/internal/m365/collection/groups/collection.go b/src/internal/m365/collection/groups/collection.go index a1e364b93..b8ff3b436 100644 --- a/src/internal/m365/collection/groups/collection.go +++ b/src/internal/m365/collection/groups/collection.go @@ -6,12 +6,10 @@ import ( "io" "sync" "sync/atomic" - "time" "github.com/alcionai/clues" kjson "github.com/microsoft/kiota-serialization-json-go" - "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/observe" @@ -20,12 +18,7 @@ import ( "github.com/alcionai/corso/src/pkg/logger" ) -var ( - _ data.BackupCollection = &Collection{} - _ data.Item = &Item{} - _ data.ItemInfo = &Item{} - _ data.ItemModTime = &Item{} -) +var _ data.BackupCollection = &Collection{} const ( collectionChannelBufferSize = 1000 @@ -81,57 +74,6 @@ func (col *Collection) Items(ctx context.Context, errs *fault.Bus) <-chan data.I return col.stream } -// --------------------------------------------------------------------------- -// items -// --------------------------------------------------------------------------- - -// Item represents a single item retrieved from exchange -type Item struct { - id string - message []byte - info *details.GroupsInfo - // TODO(ashmrtn): Can probably eventually be sourced from info as there's a - // request to provide modtime in ItemInfo structs. - modTime time.Time - - // true if the item was marked by graph as deleted. - deleted bool -} - -func (i *Item) ID() string { - return i.id -} - -func (i *Item) ToReader() io.ReadCloser { - return io.NopCloser(bytes.NewReader(i.message)) -} - -func (i Item) Deleted() bool { - return i.deleted -} - -func (i *Item) Info() (details.ItemInfo, error) { - return details.ItemInfo{Groups: i.info}, nil -} - -func (i *Item) ModTime() time.Time { - return i.modTime -} - -func NewItem( - identifier string, - dataBytes []byte, - detail details.GroupsInfo, - modTime time.Time, -) Item { - return Item{ - id: identifier, - message: dataBytes, - info: &detail, - modTime: modTime, - } -} - // --------------------------------------------------------------------------- // items() production // --------------------------------------------------------------------------- @@ -172,11 +114,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { defer wg.Done() defer func() { <-semaphoreCh }() - col.stream <- &Item{ - id: id, - modTime: time.Now().UTC(), // removed items have no modTime entry. - deleted: true, - } + col.stream <- data.NewDeletedItem(id) atomic.AddInt64(&streamedItems, 1) atomic.AddInt64(&totalBytes, 0) @@ -221,7 +159,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { return } - data, err := writer.GetSerializedContent() + itemData, err := writer.GetSerializedContent() if err != nil { el.AddRecoverable(ctx, clues.Wrap(err, "serializing channel message")) return @@ -229,12 +167,10 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { info.ParentPath = col.LocationPath().String() - col.stream <- &Item{ - id: ptr.Val(item.GetId()), - message: data, - info: info, - modTime: info.Modified, - } + col.stream <- data.NewPrefetchedItem( + io.NopCloser(bytes.NewReader(itemData)), + id, + details.ItemInfo{Groups: info}) atomic.AddInt64(&streamedItems, 1) atomic.AddInt64(&totalBytes, info.Size) diff --git a/src/internal/m365/collection/groups/collection_test.go b/src/internal/m365/collection/groups/collection_test.go index 118fc778b..be4c52dc7 100644 --- a/src/internal/m365/collection/groups/collection_test.go +++ b/src/internal/m365/collection/groups/collection_test.go @@ -2,6 +2,7 @@ package groups import ( "bytes" + "io" "testing" "time" @@ -14,6 +15,7 @@ import ( "github.com/alcionai/corso/src/internal/m365/collection/groups/mock" "github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" @@ -27,31 +29,37 @@ func TestCollectionUnitSuite(t *testing.T) { suite.Run(t, &CollectionUnitSuite{Suite: tester.NewUnitSuite(t)}) } -func (suite *CollectionUnitSuite) TestReader_Valid() { - m := []byte("test message") - description := "aFile" - ed := &Item{id: description, message: m} +func (suite *CollectionUnitSuite) TestPrefetchedItem_Reader() { + table := []struct { + name string + readData []byte + }{ + { + name: "HasData", + readData: []byte("test message"), + }, + { + name: "Empty", + readData: []byte{}, + }, + } - buf := &bytes.Buffer{} - _, err := buf.ReadFrom(ed.ToReader()) - assert.NoError(suite.T(), err, clues.ToCore(err)) - assert.Equal(suite.T(), buf.Bytes(), m) - assert.Equal(suite.T(), description, ed.ID()) -} + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() -func (suite *CollectionUnitSuite) TestReader_Empty() { - var ( - empty []byte - expected int64 - t = suite.T() - ) + ed := data.NewPrefetchedItem( + io.NopCloser(bytes.NewReader(test.readData)), + "itemID", + details.ItemInfo{}) - ed := &Item{message: empty} - buf := &bytes.Buffer{} - received, err := buf.ReadFrom(ed.ToReader()) - - assert.Equal(t, expected, received) - assert.NoError(t, err, clues.ToCore(err)) + buf := &bytes.Buffer{} + _, err := buf.ReadFrom(ed.ToReader()) + assert.NoError(t, err, "reading data: %v", clues.ToCore(err)) + assert.Equal(t, test.readData, buf.Bytes(), "read data") + assert.Equal(t, "itemID", ed.ID(), "item ID") + }) + } } func (suite *CollectionUnitSuite) TestNewCollection_state() {