GC: SharePoint: Backup: Abstract / Serialize (#2187)
## Description Changes address updates to `sharePoint.Collection.Populate()`. - SharePoint Collections support `Lists` and `Pages`. Drives are supported in OneDrive at this time. - List's serialize function is abstracted to support `Pages`. Collection needs to support List and Pages support. Additionally, needs to use a similar interface as in `exchange` to ensure retry and async collection population <!-- Insert PR description--> ## Does this PR need a docs update or release note? - [x] ✅ Yes, it's included ## Type of change <!--- Please check the type of change your PR introduces: ---> - [x] 🌻 Feature ## Issue(s) <!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. --> * related to #2071 <issue> ## Test Plan Test can be completed locally. Per Issue #2086, the Beta library interferes with overall caching. `TestSharePointPageCollection_Populate()` inspects changes <!-- How will this be tested prior to merging.--> - [x] ⚡ Unit test
This commit is contained in:
parent
adbe85b47f
commit
3d244c9fea
@ -19,7 +19,7 @@ import (
|
||||
|
||||
// GetSitePages retrieves a collection of Pages related to the give Site.
|
||||
// Returns error if error experienced during the call
|
||||
func GetSitePage(
|
||||
func GetSitePages(
|
||||
ctx context.Context,
|
||||
serv *discover.BetaService,
|
||||
siteID string,
|
||||
|
||||
@ -61,7 +61,7 @@ func (suite *SharePointPageSuite) TestFetchPages() {
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *SharePointPageSuite) TestGetSitePage() {
|
||||
func (suite *SharePointPageSuite) TestGetSitePages() {
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
@ -71,7 +71,7 @@ func (suite *SharePointPageSuite) TestGetSitePage() {
|
||||
require.NotNil(t, tuples)
|
||||
|
||||
jobs := []string{tuples[0].ID}
|
||||
pages, err := api.GetSitePage(ctx, suite.service, suite.siteID, jobs)
|
||||
pages, err := api.GetSitePages(ctx, suite.service, suite.siteID, jobs)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, pages)
|
||||
}
|
||||
|
||||
@ -3,18 +3,22 @@ package sharepoint
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
absser "github.com/microsoft/kiota-abstractions-go/serialization"
|
||||
kw "github.com/microsoft/kiota-serialization-json-go"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/connector/discovery/api"
|
||||
"github.com/alcionai/corso/src/internal/connector/graph"
|
||||
sapi "github.com/alcionai/corso/src/internal/connector/sharepoint/api"
|
||||
"github.com/alcionai/corso/src/internal/connector/support"
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/internal/observe"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
@ -27,6 +31,7 @@ const (
|
||||
Unknown DataCategory = iota
|
||||
List
|
||||
Drive
|
||||
Pages
|
||||
)
|
||||
|
||||
var (
|
||||
@ -36,6 +41,12 @@ var (
|
||||
_ data.StreamModTime = &Item{}
|
||||
)
|
||||
|
||||
type numMetrics struct {
|
||||
attempts int
|
||||
success int
|
||||
totalBytes int64
|
||||
}
|
||||
|
||||
// Collection is the SharePoint.List implementation of data.Collection. SharePoint.Libraries collections are supported
|
||||
// by the oneDrive.Collection as the calls are identical for populating the Collection
|
||||
type Collection struct {
|
||||
@ -46,7 +57,9 @@ type Collection struct {
|
||||
// jobs contain the SharePoint.Site.ListIDs for the associated list(s).
|
||||
jobs []string
|
||||
// M365 IDs of the items of this collection
|
||||
category DataCategory
|
||||
service graph.Servicer
|
||||
ctrl control.Options
|
||||
betaService *api.BetaService
|
||||
statusUpdater support.StatusUpdater
|
||||
}
|
||||
@ -55,6 +68,7 @@ type Collection struct {
|
||||
func NewCollection(
|
||||
folderPath path.Path,
|
||||
service graph.Servicer,
|
||||
category DataCategory,
|
||||
statusUpdater support.StatusUpdater,
|
||||
) *Collection {
|
||||
c := &Collection{
|
||||
@ -63,6 +77,7 @@ func NewCollection(
|
||||
data: make(chan data.Stream, collectionChannelBufferSize),
|
||||
service: service,
|
||||
statusUpdater: statusUpdater,
|
||||
category: category,
|
||||
}
|
||||
|
||||
return c
|
||||
@ -160,10 +175,9 @@ func (sc *Collection) finishPopulation(ctx context.Context, attempts, success in
|
||||
// populate utility function to retrieve data from back store for a given collection
|
||||
func (sc *Collection) populate(ctx context.Context) {
|
||||
var (
|
||||
objects, success int
|
||||
totalBytes, arrayLength int64
|
||||
errs error
|
||||
writer = kw.NewJsonSerializationWriter()
|
||||
metrics numMetrics
|
||||
errs error
|
||||
writer = kw.NewJsonSerializationWriter()
|
||||
)
|
||||
|
||||
// TODO: Insert correct ID for CollectionProgress
|
||||
@ -176,25 +190,50 @@ func (sc *Collection) populate(ctx context.Context) {
|
||||
|
||||
defer func() {
|
||||
close(colProgress)
|
||||
sc.finishPopulation(ctx, objects, success, totalBytes, errs)
|
||||
sc.finishPopulation(ctx, metrics.attempts, metrics.success, metrics.totalBytes, errs)
|
||||
}()
|
||||
|
||||
// Retrieve list data from M365
|
||||
// Switch retrieval function based on category
|
||||
switch sc.category {
|
||||
case List:
|
||||
metrics, errs = sc.retrieveLists(ctx, writer, colProgress)
|
||||
case Pages:
|
||||
metrics, errs = sc.retrievePages(ctx, writer, colProgress)
|
||||
}
|
||||
}
|
||||
|
||||
// retrieveLists utility function for collection that downloads and serializes
|
||||
// models.Listable objects based on M365 IDs from the jobs field.
|
||||
func (sc *Collection) retrieveLists(
|
||||
ctx context.Context,
|
||||
wtr *kw.JsonSerializationWriter,
|
||||
progress chan<- struct{},
|
||||
) (numMetrics, error) {
|
||||
var (
|
||||
errs error
|
||||
metrics numMetrics
|
||||
)
|
||||
|
||||
lists, err := loadSiteLists(ctx, sc.service, sc.fullPath.ResourceOwner(), sc.jobs)
|
||||
if err != nil {
|
||||
errs = support.WrapAndAppend(sc.fullPath.ResourceOwner(), err, errs)
|
||||
return metrics, errors.Wrap(err, sc.fullPath.ResourceOwner())
|
||||
}
|
||||
|
||||
objects += len(lists)
|
||||
// Write Data and Send
|
||||
metrics.attempts += len(lists)
|
||||
// For each models.Listable, object is serialized and the metrics are collected.
|
||||
// The progress is objected via the passed in channel.
|
||||
for _, lst := range lists {
|
||||
byteArray, err := serializeListContent(writer, lst)
|
||||
byteArray, err := serializeContent(wtr, lst)
|
||||
if err != nil {
|
||||
errs = support.WrapAndAppend(*lst.GetId(), err, errs)
|
||||
if sc.ctrl.FailFast {
|
||||
return metrics, errs
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
arrayLength = int64(len(byteArray))
|
||||
arrayLength := int64(len(byteArray))
|
||||
|
||||
if arrayLength > 0 {
|
||||
t := time.Now()
|
||||
@ -202,9 +241,9 @@ func (sc *Collection) populate(ctx context.Context) {
|
||||
t = *t1
|
||||
}
|
||||
|
||||
totalBytes += arrayLength
|
||||
metrics.totalBytes += arrayLength
|
||||
|
||||
success++
|
||||
metrics.success++
|
||||
sc.data <- &Item{
|
||||
id: *lst.GetId(),
|
||||
data: io.NopCloser(bytes.NewReader(byteArray)),
|
||||
@ -212,15 +251,76 @@ func (sc *Collection) populate(ctx context.Context) {
|
||||
modTime: t,
|
||||
}
|
||||
|
||||
colProgress <- struct{}{}
|
||||
progress <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
func serializeListContent(writer *kw.JsonSerializationWriter, lst models.Listable) ([]byte, error) {
|
||||
func (sc *Collection) retrievePages(
|
||||
ctx context.Context,
|
||||
wtr *kw.JsonSerializationWriter,
|
||||
progress chan<- struct{},
|
||||
) (numMetrics, error) {
|
||||
var (
|
||||
errs error
|
||||
metrics numMetrics
|
||||
)
|
||||
|
||||
betaService := sc.betaService
|
||||
if betaService == nil {
|
||||
return metrics, fmt.Errorf("beta service not found in collection")
|
||||
}
|
||||
|
||||
pages, err := sapi.GetSitePages(ctx, betaService, sc.fullPath.ResourceOwner(), sc.jobs)
|
||||
if err != nil {
|
||||
return metrics, errors.Wrap(err, sc.fullPath.ResourceOwner())
|
||||
}
|
||||
|
||||
metrics.attempts = len(pages)
|
||||
// For each models.Pageable, object is serialize and the metrics are collected and returned.
|
||||
// Pageable objects are not supported in v1.0 of msgraph at this time.
|
||||
// TODO: Verify Parsable interface supported with modified-Pageable
|
||||
for _, pg := range pages {
|
||||
byteArray, err := serializeContent(wtr, pg)
|
||||
if err != nil {
|
||||
errs = support.WrapAndAppend(*pg.GetId(), err, errs)
|
||||
if sc.ctrl.FailFast {
|
||||
return metrics, errs
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
arrayLength := int64(len(byteArray))
|
||||
|
||||
if arrayLength > 0 {
|
||||
t := time.Now()
|
||||
if t1 := pg.GetLastModifiedDateTime(); t1 != nil {
|
||||
t = *t1
|
||||
}
|
||||
|
||||
metrics.totalBytes += arrayLength
|
||||
metrics.success++
|
||||
sc.data <- &Item{
|
||||
id: *pg.GetId(),
|
||||
data: io.NopCloser(bytes.NewReader(byteArray)),
|
||||
info: sharePointPageInfo(pg, arrayLength),
|
||||
modTime: t,
|
||||
}
|
||||
|
||||
progress <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
return numMetrics{}, nil
|
||||
}
|
||||
|
||||
func serializeContent(writer *kw.JsonSerializationWriter, obj absser.Parsable) ([]byte, error) {
|
||||
defer writer.Close()
|
||||
|
||||
err := writer.WriteObjectValue("", lst)
|
||||
err := writer.WriteObjectValue("", obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -14,6 +14,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/common"
|
||||
"github.com/alcionai/corso/src/internal/connector/mockconnector"
|
||||
"github.com/alcionai/corso/src/internal/connector/onedrive"
|
||||
"github.com/alcionai/corso/src/internal/connector/sharepoint/api"
|
||||
"github.com/alcionai/corso/src/internal/connector/support"
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/internal/tester"
|
||||
@ -50,7 +51,7 @@ func TestSharePointCollectionSuite(t *testing.T) {
|
||||
suite.Run(t, new(SharePointCollectionSuite))
|
||||
}
|
||||
|
||||
func (suite *SharePointCollectionSuite) TestSharePointDataReader_Valid() {
|
||||
func (suite *SharePointCollectionSuite) TestCollection_Item_Read() {
|
||||
t := suite.T()
|
||||
m := []byte("test message")
|
||||
name := "aFile"
|
||||
@ -65,50 +66,105 @@ func (suite *SharePointCollectionSuite) TestSharePointDataReader_Valid() {
|
||||
assert.Equal(t, readData, m)
|
||||
}
|
||||
|
||||
// TestSharePointListCollection tests basic functionality to create
|
||||
// TestListCollection tests basic functionality to create
|
||||
// SharePoint collection and to use the data stream channel.
|
||||
func (suite *SharePointCollectionSuite) TestSharePointListCollection() {
|
||||
func (suite *SharePointCollectionSuite) TestCollection_Items() {
|
||||
t := suite.T()
|
||||
tenant := "some"
|
||||
user := "user"
|
||||
dirRoot := "directory"
|
||||
tables := []struct {
|
||||
name, itemName string
|
||||
category DataCategory
|
||||
getDir func(t *testing.T) path.Path
|
||||
getItem func(t *testing.T, itemName string) *Item
|
||||
}{
|
||||
{
|
||||
name: "List",
|
||||
itemName: "MockListing",
|
||||
category: List,
|
||||
getDir: func(t *testing.T) path.Path {
|
||||
dir, err := path.Builder{}.Append(dirRoot).
|
||||
ToDataLayerSharePointPath(
|
||||
tenant,
|
||||
user,
|
||||
path.ListsCategory,
|
||||
false)
|
||||
require.NoError(t, err)
|
||||
|
||||
ow := kioser.NewJsonSerializationWriter()
|
||||
listing := mockconnector.GetMockListDefault("Mock List")
|
||||
testName := "MockListing"
|
||||
listing.SetDisplayName(&testName)
|
||||
return dir
|
||||
},
|
||||
getItem: func(t *testing.T, name string) *Item {
|
||||
ow := kioser.NewJsonSerializationWriter()
|
||||
listing := mockconnector.GetMockListDefault(name)
|
||||
listing.SetDisplayName(&name)
|
||||
|
||||
err := ow.WriteObjectValue("", listing)
|
||||
require.NoError(t, err)
|
||||
err := ow.WriteObjectValue("", listing)
|
||||
require.NoError(t, err)
|
||||
|
||||
byteArray, err := ow.GetSerializedContent()
|
||||
require.NoError(t, err)
|
||||
byteArray, err := ow.GetSerializedContent()
|
||||
require.NoError(t, err)
|
||||
|
||||
dir, err := path.Builder{}.Append("directory").
|
||||
ToDataLayerSharePointPath(
|
||||
"some",
|
||||
"user",
|
||||
path.ListsCategory,
|
||||
false)
|
||||
require.NoError(t, err)
|
||||
data := &Item{
|
||||
id: name,
|
||||
data: io.NopCloser(bytes.NewReader(byteArray)),
|
||||
info: sharePointListInfo(listing, int64(len(byteArray))),
|
||||
}
|
||||
|
||||
col := NewCollection(dir, nil, nil)
|
||||
col.data <- &Item{
|
||||
id: testName,
|
||||
data: io.NopCloser(bytes.NewReader(byteArray)),
|
||||
info: sharePointListInfo(listing, int64(len(byteArray))),
|
||||
return data
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Pages",
|
||||
itemName: "MockPages",
|
||||
category: Pages,
|
||||
getDir: func(t *testing.T) path.Path {
|
||||
dir, err := path.Builder{}.Append(dirRoot).
|
||||
ToDataLayerSharePointPath(
|
||||
tenant,
|
||||
user,
|
||||
path.PagesCategory,
|
||||
false)
|
||||
require.NoError(t, err)
|
||||
|
||||
return dir
|
||||
},
|
||||
getItem: func(t *testing.T, itemName string) *Item {
|
||||
byteArray := mockconnector.GetMockPage(itemName)
|
||||
page, err := support.CreatePageFromBytes(byteArray)
|
||||
require.NoError(t, err)
|
||||
|
||||
data := &Item{
|
||||
id: itemName,
|
||||
data: io.NopCloser(bytes.NewReader(byteArray)),
|
||||
info: api.PageInfo(page, int64(len(byteArray))),
|
||||
}
|
||||
|
||||
return data
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
readItems := []data.Stream{}
|
||||
for _, test := range tables {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
col := NewCollection(test.getDir(t), nil, test.category, nil)
|
||||
col.data <- test.getItem(t, test.itemName)
|
||||
|
||||
for item := range col.Items() {
|
||||
readItems = append(readItems, item)
|
||||
readItems := []data.Stream{}
|
||||
|
||||
for item := range col.Items() {
|
||||
readItems = append(readItems, item)
|
||||
}
|
||||
|
||||
require.Equal(t, len(readItems), 1)
|
||||
item := readItems[0]
|
||||
shareInfo, ok := item.(data.StreamInfo)
|
||||
require.True(t, ok)
|
||||
require.NotNil(t, shareInfo.Info())
|
||||
require.NotNil(t, shareInfo.Info().SharePoint)
|
||||
assert.Equal(t, test.itemName, shareInfo.Info().SharePoint.ItemName)
|
||||
})
|
||||
}
|
||||
|
||||
require.Equal(t, len(readItems), 1)
|
||||
item := readItems[0]
|
||||
shareInfo, ok := item.(data.StreamInfo)
|
||||
require.True(t, ok)
|
||||
require.NotNil(t, shareInfo.Info())
|
||||
require.NotNil(t, shareInfo.Info().SharePoint)
|
||||
assert.Equal(t, testName, shareInfo.Info().SharePoint.ItemName)
|
||||
}
|
||||
|
||||
func (suite *SharePointCollectionSuite) TestCollectPages() {
|
||||
@ -122,7 +178,6 @@ func (suite *SharePointCollectionSuite) TestCollectPages() {
|
||||
nil,
|
||||
account.AzureTenantID,
|
||||
suite.siteID,
|
||||
nil,
|
||||
&MockGraphService{},
|
||||
control.Defaults(),
|
||||
)
|
||||
@ -131,7 +186,7 @@ func (suite *SharePointCollectionSuite) TestCollectPages() {
|
||||
}
|
||||
|
||||
// TestRestoreListCollection verifies Graph Restore API for the List Collection
|
||||
func (suite *SharePointCollectionSuite) TestRestoreListCollection() {
|
||||
func (suite *SharePointCollectionSuite) TestListCollection_Restore() {
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
|
||||
@ -118,7 +118,7 @@ func collectLists(
|
||||
return nil, errors.Wrapf(err, "failed to create collection path for site: %s", siteID)
|
||||
}
|
||||
|
||||
collection := NewCollection(dir, serv, updater.UpdateStatus)
|
||||
collection := NewCollection(dir, serv, List, updater.UpdateStatus)
|
||||
collection.AddJob(tuple.id)
|
||||
|
||||
spcs = append(spcs, collection)
|
||||
@ -172,7 +172,6 @@ func collectPages(
|
||||
creds account.M365Config,
|
||||
serv graph.Servicer,
|
||||
tenantID, siteID string,
|
||||
scope selectors.SharePointScope,
|
||||
updater statusUpdater,
|
||||
ctrlOpts control.Options,
|
||||
) ([]data.BackupCollection, error) {
|
||||
@ -204,7 +203,7 @@ func collectPages(
|
||||
return nil, errors.Wrapf(err, "failed to create collection path for site: %s", siteID)
|
||||
}
|
||||
|
||||
collection := NewCollection(dir, serv, updater.UpdateStatus)
|
||||
collection := NewCollection(dir, serv, Pages, updater.UpdateStatus)
|
||||
collection.betaService = betaService
|
||||
collection.AddJob(tuple.ID)
|
||||
|
||||
|
||||
@ -5,10 +5,12 @@ import (
|
||||
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/connector/graph"
|
||||
"github.com/alcionai/corso/src/internal/connector/onedrive"
|
||||
"github.com/alcionai/corso/src/internal/connector/support"
|
||||
"github.com/alcionai/corso/src/internal/tester"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/selectors"
|
||||
@ -128,3 +130,45 @@ func driveItem(name string, path string, isFile bool) models.DriveItemable {
|
||||
|
||||
return item
|
||||
}
|
||||
|
||||
type SharePointPagesSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
func TestSharePointPagesSuite(t *testing.T) {
|
||||
tester.RunOnAny(
|
||||
t,
|
||||
tester.CorsoCITests,
|
||||
tester.CorsoGraphConnectorTests,
|
||||
tester.CorsoGraphConnectorSharePointTests)
|
||||
suite.Run(t, new(SharePointPagesSuite))
|
||||
}
|
||||
|
||||
func (suite *SharePointPagesSuite) TestCollectPages() {
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
t := suite.T()
|
||||
siteID := tester.M365SiteID(t)
|
||||
a := tester.NewM365Account(t)
|
||||
account, err := a.M365Config()
|
||||
require.NoError(t, err)
|
||||
|
||||
updateFunc := func(*support.ConnectorOperationStatus) {
|
||||
t.Log("Updater Called ")
|
||||
}
|
||||
|
||||
updater := &MockUpdater{UpdateState: updateFunc}
|
||||
|
||||
col, err := collectPages(
|
||||
ctx,
|
||||
account,
|
||||
nil,
|
||||
account.AzureTenantID,
|
||||
siteID,
|
||||
updater,
|
||||
control.Options{},
|
||||
)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, col)
|
||||
}
|
||||
|
||||
@ -17,6 +17,16 @@ import (
|
||||
// ---------------------------------------------------------------------------
|
||||
type MockGraphService struct{}
|
||||
|
||||
type MockUpdater struct {
|
||||
UpdateState func(*support.ConnectorOperationStatus)
|
||||
}
|
||||
|
||||
func (mu *MockUpdater) UpdateStatus(input *support.ConnectorOperationStatus) {
|
||||
if mu.UpdateState != nil {
|
||||
mu.UpdateState(input)
|
||||
}
|
||||
}
|
||||
|
||||
//------------------------------------------------------------
|
||||
// Interface Functions: @See graph.Service
|
||||
//------------------------------------------------------------
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user