GC: Backup: SharePoint: Pages Connect Pipeline (#2220)

## Description
Finalize the backup workflow for `SharePoint.Pages`:

- Parallelize the populate/fetch functions for pages and lists.
- Fix incorrect status reporting during backup.
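The parallelized fetch loops below all share one pattern: a buffered channel acts as a semaphore that caps in-flight Graph requests at `fetchChannelSize`, a `sync.WaitGroup` joins the workers, and a mutex guards the shared result and error accumulators. A minimal, self-contained sketch of that pattern (the `fetchAll`/`fetchItem` names are illustrative stand-ins, not connector APIs):

```go
// Minimal sketch of the bounded fan-out pattern this PR applies to
// GetSitePages and loadSiteLists. fetchAll and fetchItem are illustrative
// stand-ins, not connector APIs.
package main

import (
	"fmt"
	"sync"
)

const fetchChannelSize = 5 // matches the semaphore width added by this PR

// fetchAll runs fetchItem over ids with at most fetchChannelSize calls in
// flight, collecting results and per-item errors under a mutex.
func fetchAll(ids []string, fetchItem func(string) (string, error)) ([]string, []error) {
	var (
		semaphoreCh = make(chan struct{}, fetchChannelSize)
		results     []string
		errs        []error
		wg          sync.WaitGroup
		m           sync.Mutex
	)

	defer close(semaphoreCh)

	for _, id := range ids {
		semaphoreCh <- struct{}{} // blocks while fetchChannelSize fetches are in flight

		wg.Add(1)

		go func(id string) {
			defer wg.Done()
			defer func() { <-semaphoreCh }() // release the slot

			item, err := fetchItem(id) // per-goroutine err: no shared-variable race

			m.Lock()
			defer m.Unlock()

			if err != nil {
				errs = append(errs, fmt.Errorf("%s: %w", id, err))
				return
			}

			results = append(results, item)
		}(id)
	}

	wg.Wait()

	return results, errs
}

func main() {
	pages, errs := fetchAll([]string{"page-1", "page-2", "page-3"}, func(id string) (string, error) {
		return "content of " + id, nil // stand-in for a Graph API call
	})
	fmt.Println(pages, errs)
}
```

Sending into the buffered channel blocks once `fetchChannelSize` tokens are outstanding, so at most five fetches run concurrently; each worker releases its token via the deferred receive.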

## Does this PR need a docs update or release note?
- [x] No

## Type of change

- [x] 🌻 Feature

## Issue(s)

* closes #2071
* closes #2257
* related to #2173

## Test Plan
- [x] Unit test
Commit 2643fc2c89 (parent 6c2c873cc5), authored by Danny and committed via GitHub on 2023-02-09 09:39:00 -05:00.
10 changed files with 150 additions and 77 deletions.

View File

```diff
@@ -92,7 +92,7 @@ func (gc *GraphConnector) DataCollections(
 	ctx,
 	gc.itemClient,
 	sels,
-	gc.credentials.AzureTenantID,
+	gc.credentials,
 	gc.Service,
 	gc,
 	ctrlOpts)
```

View File

```diff
@@ -249,7 +249,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestSharePointDataCollecti
 	ctx,
 	graph.HTTPClient(graph.NoTimeout()),
 	test.getSelector(),
-	connector.credentials.AzureTenantID,
+	connector.credentials,
 	connector.Service,
 	connector,
 	control.Options{})
```

View File

```diff
@@ -4,3 +4,5 @@ type Tuple struct {
 	Name string
 	ID   string
 }
+
+const fetchChannelSize = 5
```

View File

```diff
@@ -4,11 +4,13 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"sync"
 	"time"

 	"github.com/pkg/errors"

 	discover "github.com/alcionai/corso/src/internal/connector/discovery/api"
+	"github.com/alcionai/corso/src/internal/connector/graph"
 	"github.com/alcionai/corso/src/internal/connector/graph/betasdk/models"
 	"github.com/alcionai/corso/src/internal/connector/graph/betasdk/sites"
 	"github.com/alcionai/corso/src/internal/connector/support"
@@ -25,16 +27,55 @@ func GetSitePages(
 	siteID string,
 	pages []string,
 ) ([]models.SitePageable, error) {
-	col := make([]models.SitePageable, 0)
-	opts := retrieveSitePageOptions()
-
+	var (
+		col         = make([]models.SitePageable, 0)
+		semaphoreCh = make(chan struct{}, fetchChannelSize)
+		opts        = retrieveSitePageOptions()
+		err, errs   error
+		wg          sync.WaitGroup
+		m           sync.Mutex
+	)
+
+	defer close(semaphoreCh)
+
+	errUpdater := func(id string, err error) {
+		m.Lock()
+		errs = support.WrapAndAppend(id, err, errs)
+		m.Unlock()
+	}
+
+	updatePages := func(page models.SitePageable) {
+		m.Lock()
+		col = append(col, page)
+		m.Unlock()
+	}
+
 	for _, entry := range pages {
-		page, err := serv.Client().SitesById(siteID).PagesById(entry).Get(ctx, opts)
-		if err != nil {
-			return nil, support.ConnectorStackErrorTraceWrap(err, "fetching page: "+entry)
-		}
-
-		col = append(col, page)
+		semaphoreCh <- struct{}{}
+
+		wg.Add(1)
+
+		go func(pageID string) {
+			defer wg.Done()
+			defer func() { <-semaphoreCh }()
+
+			var page models.SitePageable
+
+			err = graph.RunWithRetry(func() error {
+				page, err = serv.Client().SitesById(siteID).PagesById(pageID).Get(ctx, opts)
+				return err
+			})
+			if err != nil {
+				errUpdater(pageID, errors.Wrap(err, support.ConnectorStackErrorTrace(err)+" fetching page"))
+			} else {
+				updatePages(page)
+			}
+		}(entry)
+	}
+
+	wg.Wait()
+
+	if errs != nil {
+		return nil, errs
 	}

 	return col, nil
@@ -46,10 +87,15 @@ func FetchPages(ctx context.Context, bs *discover.BetaService, siteID string) ([
 		builder    = bs.Client().SitesById(siteID).Pages()
 		opts       = fetchPageOptions()
 		pageTuples = make([]Tuple, 0)
+		resp       models.SitePageCollectionResponseable
+		err        error
 	)

 	for {
-		resp, err := builder.Get(ctx, opts)
+		err = graph.RunWithRetry(func() error {
+			resp, err = builder.Get(ctx, opts)
+			return err
+		})
 		if err != nil {
 			return nil, support.ConnectorStackErrorTraceWrap(err, "failed fetching site page")
 		}
```
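Both hunks above wrap the Graph calls in `graph.RunWithRetry`, whose body is not part of this diff. For context, a helper with that call shape might look roughly like the following sketch; the attempt count and linear backoff are assumptions, not the connector's actual implementation:

```go
// Package graphsketch holds a hypothetical stand-in for graph.RunWithRetry,
// whose body is not shown in this diff; the attempt count and linear backoff
// below are assumptions, not the connector's actual behavior.
package graphsketch

import "time"

// RunWithRetry re-invokes run until it succeeds or attempts are exhausted,
// matching the call shape used by GetSitePages and FetchPages above.
func RunWithRetry(run func() error) error {
	const maxAttempts = 3

	var err error

	for i := 0; i < maxAttempts; i++ {
		if err = run(); err == nil {
			return nil
		}

		time.Sleep(time.Duration(i+1) * time.Second) // simple linear backoff
	}

	return err
}
```

One subtlety worth noting in `GetSitePages`: the goroutines assign the `err` declared in the outer `var` block, so that variable is shared across workers, whereas `loadSiteLists` (the last file in this diff) declares `err` inside each goroutine.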

View File

```diff
@@ -28,6 +28,7 @@ type DataCategory int
 //go:generate stringer -type=DataCategory
 const (
 	collectionChannelBufferSize = 50
+	fetchChannelSize            = 5
 	Unknown DataCategory = iota
 	List
 	Drive
@@ -70,6 +71,7 @@ func NewCollection(
 	service graph.Servicer,
 	category DataCategory,
 	statusUpdater support.StatusUpdater,
+	ctrlOpts control.Options,
 ) *Collection {
 	c := &Collection{
 		fullPath: folderPath,
@@ -78,6 +80,7 @@ func NewCollection(
 		service:       service,
 		statusUpdater: statusUpdater,
 		category:      category,
+		ctrl:          ctrlOpts,
 	}

 	return c
@@ -157,7 +160,7 @@ func (sc *Collection) finishPopulation(ctx context.Context, attempts, success in
 	status := support.CreateStatus(
 		ctx,
 		support.Backup,
-		len(sc.jobs),
+		1, // 1 folder
 		support.CollectionMetrics{
 			Objects:   attempted,
 			Successes: success,
@@ -180,6 +183,9 @@ func (sc *Collection) populate(ctx context.Context) {
 		writer = kw.NewJsonSerializationWriter()
 	)

+	defer func() {
+		sc.finishPopulation(ctx, metrics.attempts, metrics.success, int64(metrics.totalBytes), errs)
+	}()
 	// TODO: Insert correct ID for CollectionProgress
 	colProgress, closer := observe.CollectionProgress(
 		ctx,
@@ -190,7 +196,6 @@ func (sc *Collection) populate(ctx context.Context) {

 	defer func() {
 		close(colProgress)
-		sc.finishPopulation(ctx, metrics.attempts, metrics.success, metrics.totalBytes, errs)
 	}()

 	// Switch retrieval function based on category
@@ -314,7 +319,7 @@ func (sc *Collection) retrievePages(
 		}
 	}

-	return numMetrics{}, nil
+	return metrics, nil
 }

 func serializeContent(writer *kw.JsonSerializationWriter, obj absser.Parsable) ([]byte, error) {
```
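Two changes in this file target the incorrect-status bug called out in the description: `finishPopulation` now reports a fixed folder count of 1 rather than `len(sc.jobs)`, and its invocation moves into its own deferred closure registered before the progress-bar defer. Since deferred functions run in LIFO order, the status is flushed after `close(colProgress)` and fires even when `populate` exits early. A minimal sketch of the ordering guarantee:

```go
// Deferred functions run last-in, first-out: finishPopulation, registered
// first, now runs after the progress channel is closed, and runs on every
// exit path from populate.
package main

import "fmt"

func main() {
	defer fmt.Println("finishPopulation (registered first, runs last)")
	defer fmt.Println("close(colProgress) (registered second, runs first)")
}
```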

View File

```diff
@@ -147,7 +147,7 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
 	for _, test := range tables {
 		t.Run(test.name, func(t *testing.T) {
-			col := NewCollection(test.getDir(t), nil, test.category, nil)
+			col := NewCollection(test.getDir(t), nil, test.category, nil, control.Defaults())
 			col.data <- test.getItem(t, test.itemName)

 			readItems := []data.Stream{}
@@ -167,24 +167,6 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
 	}
 }

-func (suite *SharePointCollectionSuite) TestCollectPages() {
-	ctx, flush := tester.NewContext()
-	defer flush()
-
-	t := suite.T()
-
-	col, err := collectPages(
-		ctx,
-		suite.creds,
-		nil,
-		account.AzureTenantID,
-		suite.siteID,
-		&MockGraphService{},
-		control.Defaults(),
-	)
-	assert.NoError(t, err)
-	assert.NotEmpty(t, col)
-}
-
 // TestRestoreListCollection verifies Graph Restore API for the List Collection
 func (suite *SharePointCollectionSuite) TestListCollection_Restore() {
 	ctx, flush := tester.NewContext()
```

View File

```diff
@@ -30,7 +30,7 @@ func DataCollections(
 	ctx context.Context,
 	itemClient *http.Client,
 	selector selectors.Selector,
-	tenantID string,
+	creds account.M365Config,
 	serv graph.Servicer,
 	su statusUpdater,
 	ctrlOpts control.Options,
@@ -61,7 +61,7 @@ func DataCollections(
 			spcs, err = collectLists(
 				ctx,
 				serv,
-				tenantID,
+				creds.AzureTenantID,
 				site,
 				su,
 				ctrlOpts)
@@ -74,7 +74,7 @@ func DataCollections(
 				ctx,
 				itemClient,
 				serv,
-				tenantID,
+				creds.AzureTenantID,
 				site,
 				scope,
 				su,
@@ -82,6 +82,17 @@ func DataCollections(
 			if err != nil {
 				return nil, nil, support.WrapAndAppend(site, err, errs)
 			}
+
+		case path.PagesCategory:
+			spcs, err = collectPages(
+				ctx,
+				creds,
+				serv,
+				site,
+				su,
+				ctrlOpts)
+			if err != nil {
+				return nil, nil, support.WrapAndAppend(site, err, errs)
+			}
 		}

 		collections = append(collections, spcs...)
@@ -118,7 +129,7 @@ func collectLists(
 			return nil, errors.Wrapf(err, "failed to create collection path for site: %s", siteID)
 		}

-		collection := NewCollection(dir, serv, List, updater.UpdateStatus)
+		collection := NewCollection(dir, serv, List, updater.UpdateStatus, ctrlOpts)
 		collection.AddJob(tuple.id)

 		spcs = append(spcs, collection)
@@ -166,12 +177,12 @@ func collectLibraries(
 }

 // collectPages constructs a sharepoint Collections struct and Get()s the associated
-// M365 IDs for the associated Pages
+// M365 IDs for the associated Pages.
 func collectPages(
 	ctx context.Context,
 	creds account.M365Config,
 	serv graph.Servicer,
-	tenantID, siteID string,
+	siteID string,
 	updater statusUpdater,
 	ctrlOpts control.Options,
 ) ([]data.BackupCollection, error) {
@@ -180,9 +191,10 @@ func collectPages(
 	spcs := make([]data.BackupCollection, 0)

 	// make the betaClient
+	// Need to receive From DataCollection Call
 	adpt, err := graph.CreateAdapter(creds.AzureTenantID, creds.AzureClientID, creds.AzureClientSecret)
 	if err != nil {
-		return nil, errors.Wrap(err, "adapter for betaservice not created")
+		return nil, errors.New("unable to create adapter w/ env credentials")
 	}

 	betaService := api.NewBetaService(adpt)
@@ -195,7 +207,7 @@ func collectPages(
 	for _, tuple := range tuples {
 		dir, err := path.Builder{}.Append(tuple.Name).
 			ToDataLayerSharePointPath(
-				tenantID,
+				creds.AzureTenantID,
 				siteID,
 				path.PagesCategory,
 				false)
@@ -203,7 +215,7 @@ func collectPages(
 			return nil, errors.Wrapf(err, "failed to create collection path for site: %s", siteID)
 		}

-		collection := NewCollection(dir, serv, Pages, updater.UpdateStatus)
+		collection := NewCollection(dir, serv, Pages, updater.UpdateStatus, ctrlOpts)
 		collection.betaService = betaService
 		collection.AddJob(tuple.ID)
```

View File

```diff
@@ -10,7 +10,6 @@ import (
 	"github.com/alcionai/corso/src/internal/connector/graph"
 	"github.com/alcionai/corso/src/internal/connector/onedrive"
-	"github.com/alcionai/corso/src/internal/connector/support"
 	"github.com/alcionai/corso/src/internal/tester"
 	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/selectors"
@@ -154,20 +153,13 @@ func (suite *SharePointPagesSuite) TestCollectPages() {
 	account, err := a.M365Config()
 	require.NoError(t, err)

-	updateFunc := func(*support.ConnectorOperationStatus) {
-		t.Log("Updater Called ")
-	}
-
-	updater := &MockUpdater{UpdateState: updateFunc}
-
 	col, err := collectPages(
 		ctx,
 		account,
 		nil,
-		account.AzureTenantID,
 		siteID,
-		updater,
-		control.Options{},
+		&MockGraphService{},
+		control.Defaults(),
 	)
 	assert.NoError(t, err)
 	assert.NotEmpty(t, col)
```

View File

```diff
@@ -8,19 +8,20 @@ func _() {
 	// An "invalid array index" compiler error signifies that the constant values have changed.
 	// Re-run the stringer command to generate them again.
 	var x [1]struct{}
-	_ = x[Unknown-1]
-	_ = x[List-2]
-	_ = x[Drive-3]
+	_ = x[Unknown-2]
+	_ = x[List-3]
+	_ = x[Drive-4]
+	_ = x[Pages-5]
 }

-const _DataCategory_name = "UnknownListDrive"
+const _DataCategory_name = "UnknownListDrivePages"

-var _DataCategory_index = [...]uint8{0, 7, 11, 16}
+var _DataCategory_index = [...]uint8{0, 7, 11, 16, 21}

 func (i DataCategory) String() string {
-	i -= 1
+	i -= 2
 	if i < 0 || i >= DataCategory(len(_DataCategory_index)-1) {
-		return "DataCategory(" + strconv.FormatInt(int64(i+1), 10) + ")"
+		return "DataCategory(" + strconv.FormatInt(int64(i+2), 10) + ")"
 	}
 	return _DataCategory_name[_DataCategory_index[i]:_DataCategory_index[i+1]]
 }
```
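The regenerated offsets follow directly from `iota`: every constant spec in the collection.go const block counts toward it, so inserting `fetchChannelSize` ahead of `Unknown` bumps each category value by one, and the new `Pages` category extends the name table. Roughly:

```go
// Sketch of the collection.go const block after this PR; comments show the
// iota positions that explain the regenerated stringer offsets.
type DataCategory int

const (
	collectionChannelBufferSize = 50 // spec 0
	fetchChannelSize            = 5  // spec 1 (new; shifts every category below)
	Unknown DataCategory = iota      // now 2, previously 1
	List                             // 3
	Drive                            // 4
	Pages                            // 5
)
```

Re-running `go generate` on the package (driven by the `//go:generate stringer -type=DataCategory` directive) reproduces this file.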

View File

```diff
@@ -3,6 +3,7 @@ package sharepoint
 import (
 	"context"
 	"fmt"
+	"sync"

 	"github.com/microsoftgraph/msgraph-sdk-go/models"
 	mssite "github.com/microsoftgraph/msgraph-sdk-go/sites"
@@ -91,33 +92,65 @@ func loadSiteLists(
 	listIDs []string,
 ) ([]models.Listable, error) {
 	var (
-		results = make([]models.Listable, 0)
-		errs    error
+		results     = make([]models.Listable, 0)
+		semaphoreCh = make(chan struct{}, fetchChannelSize)
+		errs        error
+		wg          sync.WaitGroup
+		m           sync.Mutex
 	)

-	for _, listID := range listIDs {
-		entry, err := gs.Client().SitesById(siteID).ListsById(listID).Get(ctx, nil)
-		if err != nil {
-			errs = support.WrapAndAppend(
-				listID,
-				errors.Wrap(err, support.ConnectorStackErrorTrace(err)),
-				errs,
-			)
-		}
-
-		cols, cTypes, lItems, err := fetchListContents(ctx, gs, siteID, listID)
-		if err == nil {
+	defer close(semaphoreCh)
+
+	errUpdater := func(id string, err error) {
+		m.Lock()
+		errs = support.WrapAndAppend(id, err, errs)
+		m.Unlock()
+	}
+
+	updateLists := func(list models.Listable) {
+		m.Lock()
+		results = append(results, list)
+		m.Unlock()
+	}
+
+	for _, listID := range listIDs {
+		semaphoreCh <- struct{}{}
+
+		wg.Add(1)
+
+		go func(id string) {
+			defer wg.Done()
+			defer func() { <-semaphoreCh }()
+
+			var (
+				entry models.Listable
+				err   error
+			)
+
+			err = graph.RunWithRetry(func() error {
+				entry, err = gs.Client().SitesById(siteID).ListsById(id).Get(ctx, nil)
+				return err
+			})
+			if err != nil {
+				errUpdater(id, support.ConnectorStackErrorTraceWrap(err, ""))
+				return
+			}
+
+			cols, cTypes, lItems, err := fetchListContents(ctx, gs, siteID, id)
+			if err != nil {
+				errUpdater(id, errors.Wrap(err, "unable to fetchRelationships during loadSiteLists"))
+				return
+			}
+
 			entry.SetColumns(cols)
 			entry.SetContentTypes(cTypes)
 			entry.SetItems(lItems)
-		} else {
-			errs = support.WrapAndAppend("unable to fetchRelationships during loadSiteLists", err, errs)
-			continue
-		}
-
-		results = append(results, entry)
+			updateLists(entry)
+		}(listID)
 	}

+	wg.Wait()
+
 	if errs != nil {
 		return nil, errs
 	}
```