Create in-memory MetadataCollection type (#1719)
## Description Adds an in-memory collection type that can be used to pass metadata files from GraphConnector to KopiaWrapper. Meant for only small amounts of data as everything must be buffered in-memory in the collection ## Type of change <!--- Please check the type of change your PR introduces: ---> - [x] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Test - [ ] 💻 CI/Deployment - [ ] 🐹 Trivial/Minor ## Issue(s) * #1685 ## Test Plan <!-- How will this be tested prior to merging.--> - [ ] 💪 Manual - [x] ⚡ Unit test - [ ] 💚 E2E
This commit is contained in:
parent
cdc92b3929
commit
14a3c2e189
106
src/internal/connector/graph/metadata_collection.go
Normal file
106
src/internal/connector/graph/metadata_collection.go
Normal file
@ -0,0 +1,106 @@
|
|||||||
|
package graph
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/src/internal/connector/support"
|
||||||
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
|
"github.com/alcionai/corso/src/pkg/path"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
_ data.Collection = &MetadataCollection{}
|
||||||
|
_ data.Stream = &MetadataItem{}
|
||||||
|
)
|
||||||
|
|
||||||
|
// MetadataCollection in a simple collection that assumes all items to be
|
||||||
|
// returned are already resident in-memory and known when the collection is
|
||||||
|
// created. This collection has no logic for lazily fetching item data.
|
||||||
|
type MetadataCollection struct {
|
||||||
|
fullPath path.Path
|
||||||
|
items []MetadataItem
|
||||||
|
statusUpdater support.StatusUpdater
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMetadataCollection(
|
||||||
|
p path.Path,
|
||||||
|
items []MetadataItem,
|
||||||
|
statusUpdater support.StatusUpdater,
|
||||||
|
) *MetadataCollection {
|
||||||
|
return &MetadataCollection{
|
||||||
|
fullPath: p,
|
||||||
|
items: items,
|
||||||
|
statusUpdater: statusUpdater,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (md MetadataCollection) FullPath() path.Path {
|
||||||
|
return md.fullPath
|
||||||
|
}
|
||||||
|
|
||||||
|
func (md MetadataCollection) Items() <-chan data.Stream {
|
||||||
|
res := make(chan data.Stream)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
totalBytes := int64(0)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
// Need to report after the collection is created because otherwise
|
||||||
|
// statusUpdater may not have accounted for the fact that this collection
|
||||||
|
// will be running.
|
||||||
|
status := support.CreateStatus(
|
||||||
|
context.TODO(),
|
||||||
|
support.Backup,
|
||||||
|
1,
|
||||||
|
support.CollectionMetrics{
|
||||||
|
Objects: len(md.items),
|
||||||
|
Successes: len(md.items),
|
||||||
|
TotalBytes: totalBytes,
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
md.fullPath.Folder(),
|
||||||
|
)
|
||||||
|
|
||||||
|
md.statusUpdater(status)
|
||||||
|
}()
|
||||||
|
defer close(res)
|
||||||
|
|
||||||
|
for _, item := range md.items {
|
||||||
|
totalBytes += int64(len(item.data))
|
||||||
|
res <- item
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
// MetadataItem is an in-memory data.Stream implementation. MetadataItem does
|
||||||
|
// not implement additional interfaces like data.StreamInfo, so it should only
|
||||||
|
// be used for items with a small amount of content that don't need to be added
|
||||||
|
// to backup details.
|
||||||
|
//
|
||||||
|
// Currently the expected use-case for this struct are storing metadata for a
|
||||||
|
// backup like delta tokens or a mapping of container IDs to container paths.
|
||||||
|
type MetadataItem struct {
|
||||||
|
// uuid is an ID that can be used to refer to the item.
|
||||||
|
uuid string
|
||||||
|
// data is a buffer of data that the item refers to.
|
||||||
|
data []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMetadataItem(uuid string, itemData []byte) MetadataItem {
|
||||||
|
return MetadataItem{
|
||||||
|
uuid: uuid,
|
||||||
|
data: itemData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mi MetadataItem) UUID() string {
|
||||||
|
return mi.uuid
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mi MetadataItem) ToReader() io.ReadCloser {
|
||||||
|
return io.NopCloser(bytes.NewReader(mi.data))
|
||||||
|
}
|
||||||
102
src/internal/connector/graph/metadata_collection_test.go
Normal file
102
src/internal/connector/graph/metadata_collection_test.go
Normal file
@ -0,0 +1,102 @@
|
|||||||
|
package graph_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/src/internal/connector/graph"
|
||||||
|
"github.com/alcionai/corso/src/internal/connector/support"
|
||||||
|
"github.com/alcionai/corso/src/pkg/path"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MetadataCollectionUnitSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetadataCollectionUnitSuite(t *testing.T) {
|
||||||
|
suite.Run(t, new(MetadataCollectionUnitSuite))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *MetadataCollectionUnitSuite) TestFullPath() {
|
||||||
|
t := suite.T()
|
||||||
|
|
||||||
|
p, err := path.Builder{}.
|
||||||
|
Append("foo").
|
||||||
|
ToDataLayerExchangePathForCategory(
|
||||||
|
"a-tenant",
|
||||||
|
"a-user",
|
||||||
|
path.EmailCategory,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
c := graph.NewMetadataCollection(p, nil, nil)
|
||||||
|
|
||||||
|
assert.Equal(t, p.String(), c.FullPath().String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *MetadataCollectionUnitSuite) TestItems() {
|
||||||
|
t := suite.T()
|
||||||
|
|
||||||
|
itemNames := []string{
|
||||||
|
"a",
|
||||||
|
"aa",
|
||||||
|
}
|
||||||
|
itemData := [][]byte{
|
||||||
|
[]byte("a"),
|
||||||
|
[]byte("aa"),
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(
|
||||||
|
t,
|
||||||
|
len(itemNames),
|
||||||
|
len(itemData),
|
||||||
|
"Requires same number of items and data",
|
||||||
|
)
|
||||||
|
|
||||||
|
items := []graph.MetadataItem{}
|
||||||
|
|
||||||
|
for i := 0; i < len(itemNames); i++ {
|
||||||
|
items = append(items, graph.NewMetadataItem(itemNames[i], itemData[i]))
|
||||||
|
}
|
||||||
|
|
||||||
|
p, err := path.Builder{}.
|
||||||
|
Append("foo").
|
||||||
|
ToDataLayerExchangePathForCategory(
|
||||||
|
"a-tenant",
|
||||||
|
"a-user",
|
||||||
|
path.EmailCategory,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
c := graph.NewMetadataCollection(
|
||||||
|
p,
|
||||||
|
items,
|
||||||
|
func(c *support.ConnectorOperationStatus) {
|
||||||
|
assert.Equal(t, len(itemNames), c.ObjectCount)
|
||||||
|
assert.Equal(t, len(itemNames), c.Successful)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
gotData := [][]byte{}
|
||||||
|
gotNames := []string{}
|
||||||
|
|
||||||
|
for s := range c.Items() {
|
||||||
|
gotNames = append(gotNames, s.UUID())
|
||||||
|
|
||||||
|
buf, err := io.ReadAll(s.ToReader())
|
||||||
|
if !assert.NoError(t, err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
gotData = append(gotData, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.ElementsMatch(t, itemNames, gotNames)
|
||||||
|
assert.ElementsMatch(t, itemData, gotData)
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user