Groups generic item (#4353)

Use generic item structs for Groups

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No

#### Type of change

- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [x] 🧹 Tech Debt/Cleanup

#### Issue(s)

* #4191

#### Test Plan

- [ ] 💪 Manual
- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
ashmrtn 2023-09-25 17:27:43 -07:00 committed by GitHub
parent 365078a861
commit 6690fd3ea2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 37 additions and 93 deletions

View File

@@ -6,12 +6,10 @@ import (
"io" "io"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"github.com/alcionai/clues" "github.com/alcionai/clues"
kjson "github.com/microsoft/kiota-serialization-json-go" kjson "github.com/microsoft/kiota-serialization-json-go"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/internal/observe"
@@ -20,12 +18,7 @@ import (
"github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/logger"
) )
var ( var _ data.BackupCollection = &Collection{}
_ data.BackupCollection = &Collection{}
_ data.Item = &Item{}
_ data.ItemInfo = &Item{}
_ data.ItemModTime = &Item{}
)
const ( const (
collectionChannelBufferSize = 1000 collectionChannelBufferSize = 1000
@@ -81,57 +74,6 @@ func (col *Collection) Items(ctx context.Context, errs *fault.Bus) <-chan data.I
return col.stream return col.stream
} }
// ---------------------------------------------------------------------------
// items
// ---------------------------------------------------------------------------
// Item represents a single item retrieved from exchange
type Item struct {
id string
message []byte
info *details.GroupsInfo
// TODO(ashmrtn): Can probably eventually be sourced from info as there's a
// request to provide modtime in ItemInfo structs.
modTime time.Time
// true if the item was marked by graph as deleted.
deleted bool
}
func (i *Item) ID() string {
return i.id
}
func (i *Item) ToReader() io.ReadCloser {
return io.NopCloser(bytes.NewReader(i.message))
}
func (i Item) Deleted() bool {
return i.deleted
}
func (i *Item) Info() (details.ItemInfo, error) {
return details.ItemInfo{Groups: i.info}, nil
}
func (i *Item) ModTime() time.Time {
return i.modTime
}
func NewItem(
identifier string,
dataBytes []byte,
detail details.GroupsInfo,
modTime time.Time,
) Item {
return Item{
id: identifier,
message: dataBytes,
info: &detail,
modTime: modTime,
}
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// items() production // items() production
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -172,11 +114,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
defer wg.Done() defer wg.Done()
defer func() { <-semaphoreCh }() defer func() { <-semaphoreCh }()
col.stream <- &Item{ col.stream <- data.NewDeletedItem(id)
id: id,
modTime: time.Now().UTC(), // removed items have no modTime entry.
deleted: true,
}
atomic.AddInt64(&streamedItems, 1) atomic.AddInt64(&streamedItems, 1)
atomic.AddInt64(&totalBytes, 0) atomic.AddInt64(&totalBytes, 0)
@@ -221,7 +159,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
return return
} }
data, err := writer.GetSerializedContent() itemData, err := writer.GetSerializedContent()
if err != nil { if err != nil {
el.AddRecoverable(ctx, clues.Wrap(err, "serializing channel message")) el.AddRecoverable(ctx, clues.Wrap(err, "serializing channel message"))
return return
@@ -229,12 +167,10 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
info.ParentPath = col.LocationPath().String() info.ParentPath = col.LocationPath().String()
col.stream <- &Item{ col.stream <- data.NewPrefetchedItem(
id: ptr.Val(item.GetId()), io.NopCloser(bytes.NewReader(itemData)),
message: data, id,
info: info, details.ItemInfo{Groups: info})
modTime: info.Modified,
}
atomic.AddInt64(&streamedItems, 1) atomic.AddInt64(&streamedItems, 1)
atomic.AddInt64(&totalBytes, info.Size) atomic.AddInt64(&totalBytes, info.Size)

View File

@@ -2,6 +2,7 @@ package groups
import ( import (
"bytes" "bytes"
"io"
"testing" "testing"
"time" "time"
@@ -14,6 +15,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/collection/groups/mock" "github.com/alcionai/corso/src/internal/m365/collection/groups/mock"
"github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
@@ -27,31 +29,37 @@ func TestCollectionUnitSuite(t *testing.T) {
suite.Run(t, &CollectionUnitSuite{Suite: tester.NewUnitSuite(t)}) suite.Run(t, &CollectionUnitSuite{Suite: tester.NewUnitSuite(t)})
} }
func (suite *CollectionUnitSuite) TestReader_Valid() { func (suite *CollectionUnitSuite) TestPrefetchedItem_Reader() {
m := []byte("test message") table := []struct {
description := "aFile" name string
ed := &Item{id: description, message: m} readData []byte
}{
{
name: "HasData",
readData: []byte("test message"),
},
{
name: "Empty",
readData: []byte{},
},
}
buf := &bytes.Buffer{} for _, test := range table {
_, err := buf.ReadFrom(ed.ToReader()) suite.Run(test.name, func() {
assert.NoError(suite.T(), err, clues.ToCore(err)) t := suite.T()
assert.Equal(suite.T(), buf.Bytes(), m)
assert.Equal(suite.T(), description, ed.ID())
}
func (suite *CollectionUnitSuite) TestReader_Empty() { ed := data.NewPrefetchedItem(
var ( io.NopCloser(bytes.NewReader(test.readData)),
empty []byte "itemID",
expected int64 details.ItemInfo{})
t = suite.T()
)
ed := &Item{message: empty} buf := &bytes.Buffer{}
buf := &bytes.Buffer{} _, err := buf.ReadFrom(ed.ToReader())
received, err := buf.ReadFrom(ed.ToReader()) assert.NoError(t, err, "reading data: %v", clues.ToCore(err))
assert.Equal(t, test.readData, buf.Bytes(), "read data")
assert.Equal(t, expected, received) assert.Equal(t, "itemID", ed.ID(), "item ID")
assert.NoError(t, err, clues.ToCore(err)) })
}
} }
func (suite *CollectionUnitSuite) TestNewCollection_state() { func (suite *CollectionUnitSuite) TestNewCollection_state() {