diff --git a/src/internal/connector/data_collections.go b/src/internal/connector/data_collections.go
index 4cd167ea6..4c1bd4461 100644
--- a/src/internal/connector/data_collections.go
+++ b/src/internal/connector/data_collections.go
@@ -92,7 +92,7 @@ func (gc *GraphConnector) DataCollections(
 			ctx,
 			gc.itemClient,
 			sels,
-			gc.credentials.AzureTenantID,
+			gc.credentials,
 			gc.Service,
 			gc,
 			ctrlOpts)
diff --git a/src/internal/connector/data_collections_test.go b/src/internal/connector/data_collections_test.go
index c90bee511..4484a92aa 100644
--- a/src/internal/connector/data_collections_test.go
+++ b/src/internal/connector/data_collections_test.go
@@ -249,7 +249,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestSharePointDataCollecti
 		ctx,
 		graph.HTTPClient(graph.NoTimeout()),
 		test.getSelector(),
-		connector.credentials.AzureTenantID,
+		connector.credentials,
 		connector.Service,
 		connector,
 		control.Options{})
diff --git a/src/internal/connector/sharepoint/api/api.go b/src/internal/connector/sharepoint/api/api.go
index c05eaad6b..6c9658418 100644
--- a/src/internal/connector/sharepoint/api/api.go
+++ b/src/internal/connector/sharepoint/api/api.go
@@ -4,3 +4,5 @@ type Tuple struct {
 	Name string
 	ID   string
 }
+
+const fetchChannelSize = 5
diff --git a/src/internal/connector/sharepoint/api/pages.go b/src/internal/connector/sharepoint/api/pages.go
index a62fbc40a..45b32ddd5 100644
--- a/src/internal/connector/sharepoint/api/pages.go
+++ b/src/internal/connector/sharepoint/api/pages.go
@@ -4,11 +4,13 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"sync"
 	"time"
 
 	"github.com/pkg/errors"
 
 	discover "github.com/alcionai/corso/src/internal/connector/discovery/api"
+	"github.com/alcionai/corso/src/internal/connector/graph"
 	"github.com/alcionai/corso/src/internal/connector/graph/betasdk/models"
 	"github.com/alcionai/corso/src/internal/connector/graph/betasdk/sites"
 	"github.com/alcionai/corso/src/internal/connector/support"
@@ -25,16 +27,55 @@ func GetSitePages(
 	siteID string,
 	pages []string,
 ) ([]models.SitePageable, error) {
-	col := make([]models.SitePageable, 0)
-	opts := retrieveSitePageOptions()
+	var (
+		col         = make([]models.SitePageable, 0)
+		semaphoreCh = make(chan struct{}, fetchChannelSize)
+		opts        = retrieveSitePageOptions()
+		errs        error
+		wg          sync.WaitGroup
+		m           sync.Mutex
+	)
+
+	defer close(semaphoreCh)
+
+	// errUpdater and updatePages serialize writes to errs and col, which are
+	// shared across the fetch goroutines below.
+	errUpdater := func(id string, err error) {
+		m.Lock()
+		errs = support.WrapAndAppend(id, err, errs)
+		m.Unlock()
+	}
+
+	updatePages := func(page models.SitePageable) {
+		m.Lock()
+		col = append(col, page)
+		m.Unlock()
+	}
 
 	for _, entry := range pages {
-		page, err := serv.Client().SitesById(siteID).PagesById(entry).Get(ctx, opts)
-		if err != nil {
-			return nil, support.ConnectorStackErrorTraceWrap(err, "fetching page: "+entry)
-		}
+		semaphoreCh <- struct{}{}
 
-		col = append(col, page)
+		wg.Add(1)
+
+		go func(pageID string) {
+			defer wg.Done()
+			defer func() { <-semaphoreCh }()
+
+			// page and err are declared per goroutine; a single err shared
+			// across goroutines would be written concurrently and race.
+			var (
+				page models.SitePageable
+				err  error
+			)
+
+			err = graph.RunWithRetry(func() error {
+				page, err = serv.Client().SitesById(siteID).PagesById(pageID).Get(ctx, opts)
+				return err
+			})
+			if err != nil {
+				errUpdater(pageID, errors.Wrap(err, support.ConnectorStackErrorTrace(err)+" fetching page"))
+			} else {
+				updatePages(page)
+			}
+		}(entry)
+	}
+
+	wg.Wait()
+
+	if errs != nil {
+		return nil, errs
 	}
 
 	return col, nil
@@ -46,10 +87,15 @@ func FetchPages(ctx context.Context, bs *discover.BetaService, siteID string) ([]Tuple, error) {
 	var (
 		builder    = bs.Client().SitesById(siteID).Pages()
 		opts       = fetchPageOptions()
 		pageTuples = make([]Tuple, 0)
+		resp       models.SitePageCollectionResponseable
+		err        error
 	)
 
 	for {
-		resp, err := builder.Get(ctx, opts)
+		err = graph.RunWithRetry(func() error {
+			resp, err = builder.Get(ctx, opts)
+			return err
+		})
 		if err != nil {
 			return nil, support.ConnectorStackErrorTraceWrap(err, "failed fetching site page")
 		}
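Note on the pattern above: `GetSitePages` here and `loadSiteLists` below share the same fan-out shape, where a buffered channel caps the number of in-flight Graph requests and a mutex serializes access to the shared result slice and error accumulator. A minimal standalone sketch of that shape — `fetchOne` is a hypothetical stand-in for the per-item Graph call, and stdlib `errors.Join` stands in for `support.WrapAndAppend`:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// fetchOne is a hypothetical stand-in for the per-item Graph request.
func fetchOne(id string) (string, error) {
	return "page-" + id, nil
}

func fetchAll(ids []string) ([]string, error) {
	const fetchChannelSize = 5 // cap on in-flight requests, mirroring the diff

	var (
		results     = make([]string, 0, len(ids))
		semaphoreCh = make(chan struct{}, fetchChannelSize)
		errs        error
		wg          sync.WaitGroup
		m           sync.Mutex // guards results and errs across goroutines
	)

	defer close(semaphoreCh)

	for _, id := range ids {
		semaphoreCh <- struct{}{} // blocks once fetchChannelSize fetches are in flight

		wg.Add(1)

		go func(id string) {
			defer wg.Done()
			defer func() { <-semaphoreCh }() // release the slot

			res, err := fetchOne(id)

			m.Lock()
			defer m.Unlock()

			if err != nil {
				errs = errors.Join(errs, fmt.Errorf("%s: %w", id, err))
				return
			}

			results = append(results, res)
		}(id)
	}

	wg.Wait()

	if errs != nil {
		return nil, errs
	}

	return results, nil
}

func main() {
	fmt.Println(fetchAll([]string{"1", "2", "3"}))
}
```

Keeping `err` scoped to each goroutine (as in the hunks above) is load-bearing: hoisting a single `err` into the enclosing function would have every worker writing the same variable concurrently.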
diff --git a/src/internal/connector/sharepoint/collection.go b/src/internal/connector/sharepoint/collection.go
index ca07399eb..91ebf5d65 100644
--- a/src/internal/connector/sharepoint/collection.go
+++ b/src/internal/connector/sharepoint/collection.go
@@ -28,6 +28,7 @@ type DataCategory int
 //go:generate stringer -type=DataCategory
 const (
 	collectionChannelBufferSize = 50
+	fetchChannelSize            = 5
 
 	Unknown DataCategory = iota
 	List
 	Drive
@@ -70,6 +71,7 @@ func NewCollection(
 	service graph.Servicer,
 	category DataCategory,
 	statusUpdater support.StatusUpdater,
+	ctrlOpts control.Options,
 ) *Collection {
 	c := &Collection{
 		fullPath:      folderPath,
@@ -78,6 +80,7 @@ func NewCollection(
 		service:       service,
 		statusUpdater: statusUpdater,
 		category:      category,
+		ctrl:          ctrlOpts,
 	}
 
 	return c
@@ -157,7 +160,7 @@ func (sc *Collection) finishPopulation(ctx context.Context, attempted, success int, totalBytes int64, errs error) {
 	status := support.CreateStatus(
 		ctx,
 		support.Backup,
-		len(sc.jobs),
+		1, // 1 folder
 		support.CollectionMetrics{
 			Objects:   attempted,
 			Successes: success,
@@ -180,6 +183,9 @@ func (sc *Collection) populate(ctx context.Context) {
 		writer = kw.NewJsonSerializationWriter()
 	)
 
+	defer func() {
+		sc.finishPopulation(ctx, metrics.attempts, metrics.success, int64(metrics.totalBytes), errs)
+	}()
 	// TODO: Insert correct ID for CollectionProgress
 	colProgress, closer := observe.CollectionProgress(
 		ctx,
@@ -190,7 +196,6 @@ func (sc *Collection) populate(ctx context.Context) {
 
 	defer func() {
 		close(colProgress)
-		sc.finishPopulation(ctx, metrics.attempts, metrics.success, metrics.totalBytes, errs)
 	}()
 
 	// Switch retrieval function based on category
@@ -314,7 +319,7 @@ func (sc *Collection) retrievePages(
 		}
 	}
 
-	return numMetrics{}, nil
+	return metrics, nil
 }
 
 func serializeContent(writer *kw.JsonSerializationWriter, obj absser.Parsable) ([]byte, error) {
diff --git a/src/internal/connector/sharepoint/collection_test.go b/src/internal/connector/sharepoint/collection_test.go
index 494287457..2f2a2c472 100644
--- a/src/internal/connector/sharepoint/collection_test.go
+++ b/src/internal/connector/sharepoint/collection_test.go
@@ -147,7 +147,7 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
 
 	for _, test := range tables {
 		t.Run(test.name, func(t *testing.T) {
-			col := NewCollection(test.getDir(t), nil, test.category, nil)
+			col := NewCollection(test.getDir(t), nil, test.category, nil, control.Defaults())
 			col.data <- test.getItem(t, test.itemName)
 
 			readItems := []data.Stream{}
@@ -167,24 +167,6 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
 	}
 }
 
-func (suite *SharePointCollectionSuite) TestCollectPages() {
-	ctx, flush := tester.NewContext()
-	defer flush()
-
-	t := suite.T()
-	col, err := collectPages(
-		ctx,
-		suite.creds,
-		nil,
-		account.AzureTenantID,
-		suite.siteID,
-		&MockGraphService{},
-		control.Defaults(),
-	)
-	assert.NoError(t, err)
-	assert.NotEmpty(t, col)
-}
-
 // TestRestoreListCollection verifies Graph Restore API for the List Collection
 func (suite *SharePointCollectionSuite) TestListCollection_Restore() {
 	ctx, flush := tester.NewContext()
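`graph.RunWithRetry` is not defined in this diff; its implementation lives elsewhere in the `graph` package. For readers of the patch, a bounded-retry helper along these lines would satisfy the call sites above — the attempt count, fixed delay, and absence of backoff and context cancellation here are all assumptions, not corso's actual implementation:

```go
// Assumed shape only: sketches what a helper satisfying the RunWithRetry call
// sites could look like. The real attempt budget, delay/backoff policy, and
// retriable-error filtering in corso's graph package may differ.
package graph

import "time"

const (
	maxRetries = 3               // assumption
	retryDelay = 2 * time.Second // assumption; real code likely uses backoff
)

// RunWithRetry re-invokes run until it returns nil or the attempt budget is
// exhausted, returning the last error seen.
func RunWithRetry(run func() error) error {
	var err error

	for i := 0; i < maxRetries; i++ {
		if err = run(); err == nil {
			return nil
		}

		time.Sleep(retryDelay)
	}

	return err
}
```

Because the call sites assign into variables in the enclosing scope (`resp, err = builder.Get(ctx, opts)`), whatever the final attempt produced is what the surrounding function observes.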
diff --git a/src/internal/connector/sharepoint/data_collections.go b/src/internal/connector/sharepoint/data_collections.go
index ce17c9c8d..f98344dab 100644
--- a/src/internal/connector/sharepoint/data_collections.go
+++ b/src/internal/connector/sharepoint/data_collections.go
@@ -30,7 +30,7 @@ func DataCollections(
 	ctx context.Context,
 	itemClient *http.Client,
 	selector selectors.Selector,
-	tenantID string,
+	creds account.M365Config,
 	serv graph.Servicer,
 	su statusUpdater,
 	ctrlOpts control.Options,
@@ -61,7 +61,7 @@ func DataCollections(
 			spcs, err = collectLists(
 				ctx,
 				serv,
-				tenantID,
+				creds.AzureTenantID,
 				site,
 				su,
 				ctrlOpts)
@@ -74,7 +74,7 @@ func DataCollections(
 				ctx,
 				itemClient,
 				serv,
-				tenantID,
+				creds.AzureTenantID,
 				site,
 				scope,
 				su,
@@ -82,6 +82,17 @@ func DataCollections(
 			if err != nil {
 				return nil, nil, support.WrapAndAppend(site, err, errs)
 			}
+		case path.PagesCategory:
+			spcs, err = collectPages(
+				ctx,
+				creds,
+				serv,
+				site,
+				su,
+				ctrlOpts)
+			if err != nil {
+				return nil, nil, support.WrapAndAppend(site, err, errs)
+			}
 		}
 
 		collections = append(collections, spcs...)
@@ -118,7 +129,7 @@ func collectLists(
 			return nil, errors.Wrapf(err, "failed to create collection path for site: %s", siteID)
 		}
 
-		collection := NewCollection(dir, serv, List, updater.UpdateStatus)
+		collection := NewCollection(dir, serv, List, updater.UpdateStatus, ctrlOpts)
 		collection.AddJob(tuple.id)
 
 		spcs = append(spcs, collection)
@@ -166,12 +177,12 @@ func collectLibraries(
 }
 
 // collectPages constructs a sharepoint Collections struct and Get()s the associated
-// M365 IDs for the associated Pages
+// M365 IDs for the associated Pages.
 func collectPages(
 	ctx context.Context,
 	creds account.M365Config,
 	serv graph.Servicer,
-	tenantID, siteID string,
+	siteID string,
 	updater statusUpdater,
 	ctrlOpts control.Options,
 ) ([]data.BackupCollection, error) {
@@ -180,9 +191,10 @@ func collectPages(
 	spcs := make([]data.BackupCollection, 0)
 
 	// make the betaClient
+	// Credentials are received from the DataCollections call.
 	adpt, err := graph.CreateAdapter(creds.AzureTenantID, creds.AzureClientID, creds.AzureClientSecret)
 	if err != nil {
-		return nil, errors.Wrap(err, "adapter for betaservice not created")
+		return nil, errors.Wrap(err, "creating beta adapter with m365 credentials")
 	}
 
 	betaService := api.NewBetaService(adpt)
@@ -195,7 +207,7 @@ func collectPages(
 	for _, tuple := range tuples {
 		dir, err := path.Builder{}.Append(tuple.Name).
 			ToDataLayerSharePointPath(
-				tenantID,
+				creds.AzureTenantID,
 				siteID,
 				path.PagesCategory,
 				false)
@@ -203,7 +215,7 @@ func collectPages(
 			return nil, errors.Wrapf(err, "failed to create collection path for site: %s", siteID)
 		}
 
-		collection := NewCollection(dir, serv, Pages, updater.UpdateStatus)
+		collection := NewCollection(dir, serv, Pages, updater.UpdateStatus, ctrlOpts)
 		collection.betaService = betaService
 		collection.AddJob(tuple.ID)
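A note on the error handling in `collectPages` above: wrapping the `CreateAdapter` failure (rather than constructing a fresh error) keeps the underlying Graph SDK cause inspectable by callers and visible in logs. A small self-contained illustration with `github.com/pkg/errors`, which this package already imports — the error text is invented for the example:

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	// Invented stand-in for a Graph SDK adapter failure.
	cause := fmt.Errorf("invalid client secret")

	// errors.New drops the cause entirely; callers can no longer inspect it.
	flat := errors.New("unable to create adapter w/ env credentials")
	fmt.Println(errors.Cause(flat) == cause) // false

	// errors.Wrap keeps the cause in the chain and in the message.
	wrapped := errors.Wrap(cause, "creating beta adapter with m365 credentials")
	fmt.Println(errors.Cause(wrapped) == cause) // true
	fmt.Println(wrapped)
	// creating beta adapter with m365 credentials: invalid client secret
}
```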
diff --git a/src/internal/connector/sharepoint/data_collections_test.go b/src/internal/connector/sharepoint/data_collections_test.go
index 775cda23f..623b5c2e7 100644
--- a/src/internal/connector/sharepoint/data_collections_test.go
+++ b/src/internal/connector/sharepoint/data_collections_test.go
@@ -10,7 +10,6 @@ import (
 
 	"github.com/alcionai/corso/src/internal/connector/graph"
 	"github.com/alcionai/corso/src/internal/connector/onedrive"
-	"github.com/alcionai/corso/src/internal/connector/support"
 	"github.com/alcionai/corso/src/internal/tester"
 	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/selectors"
@@ -154,20 +153,13 @@ func (suite *SharePointPagesSuite) TestCollectPages() {
 	account, err := a.M365Config()
 	require.NoError(t, err)
 
-	updateFunc := func(*support.ConnectorOperationStatus) {
-		t.Log("Updater Called ")
-	}
-
-	updater := &MockUpdater{UpdateState: updateFunc}
-
 	col, err := collectPages(
 		ctx,
 		account,
 		nil,
-		account.AzureTenantID,
 		siteID,
-		updater,
-		control.Options{},
+		&MockGraphService{},
+		control.Defaults(),
 	)
 	assert.NoError(t, err)
 	assert.NotEmpty(t, col)
diff --git a/src/internal/connector/sharepoint/datacategory_string.go b/src/internal/connector/sharepoint/datacategory_string.go
index c75c0ad92..b3281ff7f 100644
--- a/src/internal/connector/sharepoint/datacategory_string.go
+++ b/src/internal/connector/sharepoint/datacategory_string.go
@@ -8,19 +8,20 @@ func _() {
 	// An "invalid array index" compiler error signifies that the constant values have changed.
 	// Re-run the stringer command to generate them again.
 	var x [1]struct{}
-	_ = x[Unknown-1]
-	_ = x[List-2]
-	_ = x[Drive-3]
+	_ = x[Unknown-2]
+	_ = x[List-3]
+	_ = x[Drive-4]
+	_ = x[Pages-5]
 }
 
-const _DataCategory_name = "UnknownListDrive"
+const _DataCategory_name = "UnknownListDrivePages"
 
-var _DataCategory_index = [...]uint8{0, 7, 11, 16}
+var _DataCategory_index = [...]uint8{0, 7, 11, 16, 21}
 
 func (i DataCategory) String() string {
-	i -= 1
+	i -= 2
 	if i < 0 || i >= DataCategory(len(_DataCategory_index)-1) {
-		return "DataCategory(" + strconv.FormatInt(int64(i+1), 10) + ")"
+		return "DataCategory(" + strconv.FormatInt(int64(i+2), 10) + ")"
 	}
 	return _DataCategory_name[_DataCategory_index[i]:_DataCategory_index[i+1]]
 }
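The regenerated stringer output above follows mechanically from the const block change in `collection.go`: `fetchChannelSize` was added to the same block ahead of the `iota` declaration, so every `DataCategory` value shifted up by one. An abridged, runnable mirror of that block showing the new values:

```go
package main

import "fmt"

type DataCategory int

// Abridged mirror of the const block in collection.go. iota counts const
// specs from the top of the block, so the two sizing constants push Unknown
// from 1 to 2 -- which is exactly why the regenerated stringer subtracts 2.
const (
	collectionChannelBufferSize = 50
	fetchChannelSize            = 5

	Unknown DataCategory = iota // iota == 2: two const specs precede it
	List
	Drive
	Pages
)

func main() {
	fmt.Println(Unknown, List, Drive, Pages) // 2 3 4 5
}
```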
diff --git a/src/internal/connector/sharepoint/list.go b/src/internal/connector/sharepoint/list.go
index 101de9722..64183c3a9 100644
--- a/src/internal/connector/sharepoint/list.go
+++ b/src/internal/connector/sharepoint/list.go
@@ -3,6 +3,7 @@ package sharepoint
 import (
 	"context"
 	"fmt"
+	"sync"
 
 	"github.com/microsoftgraph/msgraph-sdk-go/models"
 	mssite "github.com/microsoftgraph/msgraph-sdk-go/sites"
@@ -91,33 +92,65 @@ func loadSiteLists(
 	listIDs []string,
 ) ([]models.Listable, error) {
 	var (
-		results = make([]models.Listable, 0)
-		errs    error
+		results     = make([]models.Listable, 0)
+		semaphoreCh = make(chan struct{}, fetchChannelSize)
+		errs        error
+		wg          sync.WaitGroup
+		m           sync.Mutex
 	)
 
-	for _, listID := range listIDs {
-		entry, err := gs.Client().SitesById(siteID).ListsById(listID).Get(ctx, nil)
-		if err != nil {
-			errs = support.WrapAndAppend(
-				listID,
-				errors.Wrap(err, support.ConnectorStackErrorTrace(err)),
-				errs,
-			)
-		}
+	defer close(semaphoreCh)
+
+	// errUpdater and updateLists serialize writes to errs and results,
+	// which are shared across the fetch goroutines below.
+	errUpdater := func(id string, err error) {
+		m.Lock()
+		errs = support.WrapAndAppend(id, err, errs)
+		m.Unlock()
+	}
+
+	updateLists := func(list models.Listable) {
+		m.Lock()
+		results = append(results, list)
+		m.Unlock()
+	}
+
+	for _, listID := range listIDs {
+		semaphoreCh <- struct{}{}
+
+		wg.Add(1)
+
+		go func(id string) {
+			defer wg.Done()
+			defer func() { <-semaphoreCh }()
+
+			var (
+				entry models.Listable
+				err   error
+			)
+
+			err = graph.RunWithRetry(func() error {
+				entry, err = gs.Client().SitesById(siteID).ListsById(id).Get(ctx, nil)
+				return err
+			})
+			if err != nil {
+				errUpdater(id, support.ConnectorStackErrorTraceWrap(err, "fetching list"))
+				return
+			}
+
+			cols, cTypes, lItems, err := fetchListContents(ctx, gs, siteID, id)
+			if err != nil {
+				errUpdater(id, errors.Wrap(err, "unable to fetchRelationships during loadSiteLists"))
+				return
+			}
 
-		cols, cTypes, lItems, err := fetchListContents(ctx, gs, siteID, listID)
-		if err == nil {
 			entry.SetColumns(cols)
 			entry.SetContentTypes(cTypes)
 			entry.SetItems(lItems)
-		} else {
-			errs = support.WrapAndAppend("unable to fetchRelationships during loadSiteLists", err, errs)
-			continue
-		}
-
-		results = append(results, entry)
+			updateLists(entry)
+		}(listID)
 	}
 
+	wg.Wait()
+
 	if errs != nil {
 		return nil, errs
 	}
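Since the cap is the easiest part of this pattern to get wrong, here is a quick standalone check that the buffered-channel semaphore used in `loadSiteLists` and `GetSitePages` never admits more than `fetchChannelSize` concurrent workers (the sleep is a stand-in for a Graph call):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	const fetchChannelSize = 5

	var (
		semaphoreCh    = make(chan struct{}, fetchChannelSize)
		inFlight, peak int64
		wg             sync.WaitGroup
	)

	for i := 0; i < 50; i++ {
		semaphoreCh <- struct{}{} // blocks once the cap is reached

		wg.Add(1)

		go func() {
			defer wg.Done()
			defer func() { <-semaphoreCh }() // release the slot

			// Track the highest concurrency observed.
			n := atomic.AddInt64(&inFlight, 1)
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}

			time.Sleep(10 * time.Millisecond) // simulated Graph call
			atomic.AddInt64(&inFlight, -1)
		}()
	}

	wg.Wait()
	fmt.Println("peak in-flight:", atomic.LoadInt64(&peak)) // never exceeds 5
}
```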