diff --git a/src/cli/backup/sharepoint.go b/src/cli/backup/sharepoint.go index 4eb62dca0..fb4f7e766 100644 --- a/src/cli/backup/sharepoint.go +++ b/src/cli/backup/sharepoint.go @@ -13,6 +13,7 @@ import ( . "github.com/alcionai/corso/src/cli/print" "github.com/alcionai/corso/src/cli/utils" "github.com/alcionai/corso/src/internal/connector" + "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/pkg/backup" @@ -209,7 +210,7 @@ func createSharePointCmd(cmd *cobra.Command, args []string) error { defer utils.CloseRepo(ctx, r) - gc, err := connector.NewGraphConnector(ctx, acct, connector.Sites) + gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), acct, connector.Sites) if err != nil { return Only(ctx, errors.Wrap(err, "Failed to connect to Microsoft APIs")) } diff --git a/src/cmd/factory/impl/common.go b/src/cmd/factory/impl/common.go index 369d80d20..585118442 100644 --- a/src/cmd/factory/impl/common.go +++ b/src/cmd/factory/impl/common.go @@ -12,6 +12,7 @@ import ( . "github.com/alcionai/corso/src/cli/print" "github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/internal/connector" + "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/mockconnector" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/pkg/account" @@ -111,7 +112,7 @@ func getGCAndVerifyUser(ctx context.Context, userID string) (*connector.GraphCon } // build a graph connector - gc, err := connector.NewGraphConnector(ctx, acct, connector.Users) + gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), acct, connector.Users) if err != nil { return nil, account.Account{}, errors.Wrap(err, "connecting to graph api") } diff --git a/src/cmd/getM365/getItem.go b/src/cmd/getM365/getItem.go index 25961a236..24ce81d9a 100644 --- a/src/cmd/getM365/getItem.go +++ b/src/cmd/getM365/getItem.go @@ -19,6 +19,7 @@ import ( "github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector/exchange/api" + "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/credentials" @@ -177,7 +178,7 @@ func getGC(ctx context.Context) (*connector.GraphConnector, account.M365Config, return nil, m365Cfg, Only(ctx, errors.Wrap(err, "finding m365 account details")) } - gc, err := connector.NewGraphConnector(ctx, acct, connector.Users) + gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), acct, connector.Users) if err != nil { return nil, m365Cfg, Only(ctx, errors.Wrap(err, "connecting to graph API")) } diff --git a/src/cmd/purge/purge.go b/src/cmd/purge/purge.go index a59f7c2b8..32100772d 100644 --- a/src/cmd/purge/purge.go +++ b/src/cmd/purge/purge.go @@ -255,7 +255,7 @@ func getGC(ctx context.Context) (*connector.GraphConnector, error) { } // build a graph connector - gc, err := connector.NewGraphConnector(ctx, acct, connector.Users) + gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), acct, connector.Users) if err != nil { return nil, Only(ctx, errors.Wrap(err, "connecting to graph api")) } diff --git a/src/internal/connector/data_collections.go b/src/internal/connector/data_collections.go index 7eaf1c517..0b6d20b27 100644 --- a/src/internal/connector/data_collections.go +++ b/src/internal/connector/data_collections.go @@ -87,6 +87,7 @@ func (gc *GraphConnector) DataCollections( case selectors.ServiceSharePoint: colls, err := sharepoint.DataCollections( ctx, + gc.itemClient, sels, gc.credentials.AzureTenantID, gc.Service, @@ -198,6 +199,7 @@ func (gc *GraphConnector) OneDriveDataCollections( logger.Ctx(ctx).With("user", user).Debug("Creating OneDrive collections") odcs, err := onedrive.NewCollections( + gc.itemClient, gc.credentials.AzureTenantID, user, onedrive.OneDriveSource, diff --git a/src/internal/connector/data_collections_test.go b/src/internal/connector/data_collections_test.go index e16aa4b51..57332ce1a 100644 --- a/src/internal/connector/data_collections_test.go +++ b/src/internal/connector/data_collections_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/alcionai/corso/src/internal/connector/exchange" + "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/sharepoint" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/control" @@ -43,7 +44,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) SetupSuite() { tester.MustGetEnvVars(suite.T(), tester.M365AcctCredEnvs...) - suite.connector = loadConnector(ctx, suite.T(), AllResources) + suite.connector = loadConnector(ctx, suite.T(), graph.LargeItemClient(), AllResources) suite.user = tester.M365UserID(suite.T()) suite.site = tester.M365SiteID(suite.T()) @@ -62,7 +63,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection selUsers := []string{suite.user} - connector := loadConnector(ctx, suite.T(), Users) + connector := loadConnector(ctx, suite.T(), graph.LargeItemClient(), Users) tests := []struct { name string getSelector func(t *testing.T) selectors.Selector @@ -138,7 +139,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestDataCollections_invali owners := []string{"snuffleupagus"} - connector := loadConnector(ctx, suite.T(), Users) + connector := loadConnector(ctx, suite.T(), graph.LargeItemClient(), Users) tests := []struct { name string getSelector func(t *testing.T) selectors.Selector @@ -214,7 +215,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestSharePointDataCollecti selSites := []string{suite.site} - connector := loadConnector(ctx, suite.T(), Sites) + connector := loadConnector(ctx, suite.T(), graph.LargeItemClient(), Sites) tests := []struct { name string expected int @@ -243,6 +244,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestSharePointDataCollecti suite.T().Run(test.name, func(t *testing.T) { collections, err := sharepoint.DataCollections( ctx, + graph.LargeItemClient(), test.getSelector(), connector.credentials.AzureTenantID, connector.Service, @@ -298,7 +300,7 @@ func (suite *ConnectorCreateSharePointCollectionIntegrationSuite) SetupSuite() { tester.MustGetEnvSets(suite.T(), tester.M365AcctCredEnvs) - suite.connector = loadConnector(ctx, suite.T(), Sites) + suite.connector = loadConnector(ctx, suite.T(), graph.LargeItemClient(), Sites) suite.user = tester.M365UserID(suite.T()) tester.LogTimeOfTest(suite.T()) @@ -311,7 +313,7 @@ func (suite *ConnectorCreateSharePointCollectionIntegrationSuite) TestCreateShar var ( t = suite.T() siteID = tester.M365SiteID(t) - gc = loadConnector(ctx, t, Sites) + gc = loadConnector(ctx, t, graph.LargeItemClient(), Sites) siteIDs = []string{siteID} ) @@ -335,7 +337,7 @@ func (suite *ConnectorCreateSharePointCollectionIntegrationSuite) TestCreateShar var ( t = suite.T() siteID = tester.M365SiteID(t) - gc = loadConnector(ctx, t, Sites) + gc = loadConnector(ctx, t, graph.LargeItemClient(), Sites) siteIDs = []string{siteID} ) diff --git a/src/internal/connector/graph/service_helper.go b/src/internal/connector/graph/service_helper.go index b374933f8..900919406 100644 --- a/src/internal/connector/graph/service_helper.go +++ b/src/internal/connector/graph/service_helper.go @@ -55,6 +55,23 @@ func CreateHTTPClient() *http.Client { return httpClient } +// LargeItemClient generates a client that's configured to handle +// large file downloads. This client isn't suitable for other queries +// due to loose restrictions on timeouts and such. +// +// Re-use of http clients is critical, or else we leak os resources +// and consume relatively unbound socket connections. It is important +// to centralize this client to be passed downstream where api calls +// can utilize it on a per-download basis. +// +// TODO: this should get owned by an API client layer, not the GC itself. +func LargeItemClient() *http.Client { + httpClient := CreateHTTPClient() + httpClient.Timeout = 0 // infinite timeout for pulling large files + + return httpClient +} + // --------------------------------------------------------------------------- // Logging Middleware // --------------------------------------------------------------------------- @@ -85,6 +102,10 @@ func (handler *LoggingMiddleware) Intercept( logger.Ctx(ctx).Infow("graph api throttling", "method", req.Method, "url", req.URL) } + if resp.StatusCode != http.StatusTooManyRequests && (resp.StatusCode/100) != 2 { + logger.Ctx(ctx).Infow("graph api error", "method", req.Method, "url", req.URL) + } + if logger.DebugAPI || os.Getenv(logGraphRequestsEnvKey) != "" { respDump, _ := httputil.DumpResponse(resp, true) diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index 68f03d048..3dbc0e60c 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -4,6 +4,7 @@ package connector import ( "context" + "net/http" "runtime/trace" "strings" "sync" @@ -38,8 +39,9 @@ import ( // GraphRequestAdapter from the msgraph-sdk-go. Additional fields are for // bookkeeping and interfacing with other component. type GraphConnector struct { - Service graph.Servicer - Owners api.Client + Service graph.Servicer + Owners api.Client + itemClient *http.Client // configured to handle large item downloads tenant string Users map[string]string // key value @@ -64,13 +66,19 @@ const ( Sites ) -func NewGraphConnector(ctx context.Context, acct account.Account, r resource) (*GraphConnector, error) { +func NewGraphConnector( + ctx context.Context, + itemClient *http.Client, + acct account.Account, + r resource, +) (*GraphConnector, error) { m365, err := acct.M365Config() if err != nil { return nil, errors.Wrap(err, "retrieving m365 account configuration") } gc := GraphConnector{ + itemClient: itemClient, tenant: m365.AzureTenantID, Users: make(map[string]string, 0), wg: &sync.WaitGroup{}, diff --git a/src/internal/connector/graph_connector_disconnected_test.go b/src/internal/connector/graph_connector_disconnected_test.go index f7f583ebd..711e55ff8 100644 --- a/src/internal/connector/graph_connector_disconnected_test.go +++ b/src/internal/connector/graph_connector_disconnected_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/account" @@ -65,7 +66,7 @@ func (suite *DisconnectedGraphConnectorSuite) TestBadConnection() { for _, test := range table { suite.T().Run(test.name, func(t *testing.T) { - gc, err := NewGraphConnector(ctx, test.acct(t), Users) + gc, err := NewGraphConnector(ctx, graph.LargeItemClient(), test.acct(t), Users) assert.Nil(t, gc, test.name+" failed") assert.NotNil(t, err, test.name+"failed") }) diff --git a/src/internal/connector/graph_connector_helper_test.go b/src/internal/connector/graph_connector_helper_test.go index c614df05d..ce16a5a4d 100644 --- a/src/internal/connector/graph_connector_helper_test.go +++ b/src/internal/connector/graph_connector_helper_test.go @@ -3,6 +3,7 @@ package connector import ( "context" "io" + "net/http" "reflect" "testing" @@ -976,9 +977,9 @@ func getSelectorWith( } } -func loadConnector(ctx context.Context, t *testing.T, r resource) *GraphConnector { +func loadConnector(ctx context.Context, t *testing.T, itemClient *http.Client, r resource) *GraphConnector { a := tester.NewM365Account(t) - connector, err := NewGraphConnector(ctx, a, r) + connector, err := NewGraphConnector(ctx, itemClient, a, r) require.NoError(t, err) return connector diff --git a/src/internal/connector/graph_connector_test.go b/src/internal/connector/graph_connector_test.go index 71eff095a..85ad3f45f 100644 --- a/src/internal/connector/graph_connector_test.go +++ b/src/internal/connector/graph_connector_test.go @@ -156,7 +156,7 @@ func (suite *GraphConnectorIntegrationSuite) SetupSuite() { tester.MustGetEnvSets(suite.T(), tester.M365AcctCredEnvs) - suite.connector = loadConnector(ctx, suite.T(), Users) + suite.connector = loadConnector(ctx, suite.T(), graph.LargeItemClient(), Users) suite.user = tester.M365UserID(suite.T()) suite.acct = tester.NewM365Account(suite.T()) @@ -380,7 +380,7 @@ func runRestoreBackupTest( start := time.Now() - restoreGC := loadConnector(ctx, t, test.resource) + restoreGC := loadConnector(ctx, t, graph.LargeItemClient(), test.resource) restoreSel := getSelectorWith(t, test.service, resourceOwners, true) deets, err := restoreGC.RestoreDataCollections( ctx, @@ -419,7 +419,7 @@ func runRestoreBackupTest( }) } - backupGC := loadConnector(ctx, t, test.resource) + backupGC := loadConnector(ctx, t, graph.LargeItemClient(), test.resource) backupSel := backupSelectorForExpected(t, test.service, expectedDests) t.Logf("Selective backup of %s\n", backupSel) @@ -870,7 +870,7 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames dest.ContainerName, ) - restoreGC := loadConnector(ctx, t, test.resource) + restoreGC := loadConnector(ctx, t, graph.LargeItemClient(), test.resource) deets, err := restoreGC.RestoreDataCollections(ctx, suite.acct, restoreSel, dest, collections) require.NoError(t, err) require.NotNil(t, deets) @@ -888,7 +888,7 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames // Run a backup and compare its output with what we put in. - backupGC := loadConnector(ctx, t, test.resource) + backupGC := loadConnector(ctx, t, graph.LargeItemClient(), test.resource) backupSel := backupSelectorForExpected(t, test.service, expectedDests) t.Log("Selective backup of", backupSel) diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index 4ea9ea9eb..ac0aa9fb3 100644 --- a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -4,11 +4,13 @@ package onedrive import ( "context" "io" + "net/http" "sync" "sync/atomic" "time" "github.com/microsoftgraph/msgraph-sdk-go/models" + "github.com/pkg/errors" "github.com/spatialcurrent/go-lazy/pkg/lazy" "github.com/alcionai/corso/src/internal/connector/graph" @@ -43,6 +45,9 @@ var ( // Collection represents a set of OneDrive objects retrieved from M365 type Collection struct { + // configured to handle large item downloads + itemClient *http.Client + // data is used to share data streams with the collection consumer data chan data.Stream // folderPath indicates what level in the hierarchy this collection @@ -64,12 +69,13 @@ type Collection struct { // itemReadFunc returns a reader for the specified item type itemReaderFunc func( - ctx context.Context, + hc *http.Client, item models.DriveItemable, ) (itemInfo details.ItemInfo, itemData io.ReadCloser, err error) // NewCollection creates a Collection func NewCollection( + itemClient *http.Client, folderPath path.Path, driveID string, service graph.Servicer, @@ -78,6 +84,7 @@ func NewCollection( ctrlOpts control.Options, ) *Collection { c := &Collection{ + itemClient: itemClient, folderPath: folderPath, driveItems: map[string]models.DriveItemable{}, driveID: driveID, @@ -198,11 +205,16 @@ func (oc *Collection) populateItems(ctx context.Context) { m.Unlock() } - for _, item := range oc.driveItems { + for id, item := range oc.driveItems { if oc.ctrl.FailFast && errs != nil { break } + if item == nil { + errUpdater(id, errors.New("nil item")) + continue + } + semaphoreCh <- struct{}{} wg.Add(1) @@ -219,10 +231,9 @@ func (oc *Collection) populateItems(ctx context.Context) { ) for i := 1; i <= maxRetries; i++ { - itemInfo, itemData, err = oc.itemReader(ctx, item) - - // retry on Timeout type errors, break otherwise. + itemInfo, itemData, err = oc.itemReader(oc.itemClient, item) if err == nil || graph.IsErrTimeout(err) == nil { + // retry on Timeout type errors, break otherwise. break } diff --git a/src/internal/connector/onedrive/collection_test.go b/src/internal/connector/onedrive/collection_test.go index a19021ff7..a36db58c9 100644 --- a/src/internal/connector/onedrive/collection_test.go +++ b/src/internal/connector/onedrive/collection_test.go @@ -2,9 +2,8 @@ package onedrive import ( "bytes" - "context" - "errors" "io" + "net/http" "sync" "testing" "time" @@ -15,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/pkg/backup/details" @@ -73,7 +73,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() { name: "oneDrive, no duplicates", numInstances: 1, source: OneDriveSource, - itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { + itemReader: func(*http.Client, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName, Modified: now}}, io.NopCloser(bytes.NewReader(testItemData)), nil @@ -87,7 +87,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() { name: "oneDrive, duplicates", numInstances: 3, source: OneDriveSource, - itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { + itemReader: func(*http.Client, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName, Modified: now}}, io.NopCloser(bytes.NewReader(testItemData)), nil @@ -101,7 +101,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() { name: "sharePoint, no duplicates", numInstances: 1, source: SharePointSource, - itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { + itemReader: func(*http.Client, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName, Modified: now}}, io.NopCloser(bytes.NewReader(testItemData)), nil @@ -115,7 +115,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() { name: "sharePoint, duplicates", numInstances: 3, source: SharePointSource, - itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { + itemReader: func(*http.Client, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName, Modified: now}}, io.NopCloser(bytes.NewReader(testItemData)), nil @@ -140,6 +140,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() { require.NoError(t, err) coll := NewCollection( + graph.LargeItemClient(), folderPath, "drive-id", suite, @@ -224,6 +225,7 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() { require.NoError(t, err) coll := NewCollection( + graph.LargeItemClient(), folderPath, "fakeDriveID", suite, @@ -235,10 +237,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() { mockItem.SetId(&testItemID) coll.Add(mockItem) - readError := errors.New("Test error") - - coll.itemReader = func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { - return details.ItemInfo{}, nil, readError + coll.itemReader = func(*http.Client, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { + return details.ItemInfo{}, nil, assert.AnError } coll.Items() diff --git a/src/internal/connector/onedrive/collections.go b/src/internal/connector/onedrive/collections.go index 8f18aa780..f446aa246 100644 --- a/src/internal/connector/onedrive/collections.go +++ b/src/internal/connector/onedrive/collections.go @@ -3,6 +3,7 @@ package onedrive import ( "context" "fmt" + "net/http" "strings" "github.com/microsoftgraph/msgraph-sdk-go/models" @@ -46,6 +47,9 @@ type folderMatcher interface { // Collections is used to retrieve drive data for a // resource owner, which can be either a user or a sharepoint site. type Collections struct { + // configured to handle large item downloads + itemClient *http.Client + tenant string resourceOwner string source driveSource @@ -66,6 +70,7 @@ type Collections struct { } func NewCollections( + itemClient *http.Client, tenant string, resourceOwner string, source driveSource, @@ -75,6 +80,7 @@ func NewCollections( ctrlOpts control.Options, ) *Collections { return &Collections{ + itemClient: itemClient, tenant: tenant, resourceOwner: resourceOwner, source: source, @@ -265,13 +271,13 @@ func (c *Collections) UpdateCollections( // TODO(ashmrtn): Compare old and new path and set collection state // accordingly. col = NewCollection( + c.itemClient, collectionPath, driveID, c.service, c.statusUpdater, c.source, - c.ctrl, - ) + c.ctrl) c.CollectionMap[collectionPath.String()] = col c.NumContainers++ diff --git a/src/internal/connector/onedrive/collections_test.go b/src/internal/connector/onedrive/collections_test.go index 485b9ed86..21ca061a9 100644 --- a/src/internal/connector/onedrive/collections_test.go +++ b/src/internal/connector/onedrive/collections_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/suite" "golang.org/x/exp/maps" + "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/selectors" @@ -587,6 +588,7 @@ func (suite *OneDriveCollectionsSuite) TestUpdateCollections() { outputFolderMap := map[string]string{} maps.Copy(outputFolderMap, tt.inputFolderMap) c := NewCollections( + graph.LargeItemClient(), tenant, user, OneDriveSource, diff --git a/src/internal/connector/onedrive/drive_test.go b/src/internal/connector/onedrive/drive_test.go index 7f6901621..755b7293b 100644 --- a/src/internal/connector/onedrive/drive_test.go +++ b/src/internal/connector/onedrive/drive_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/alcionai/corso/src/internal/common" + "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/logger" @@ -146,6 +147,7 @@ func (suite *OneDriveSuite) TestOneDriveNewCollections() { NewOneDriveBackup([]string{test.user}). AllData()[0] odcs, err := NewCollections( + graph.LargeItemClient(), creds.AzureTenantID, test.user, OneDriveSource, diff --git a/src/internal/connector/onedrive/item.go b/src/internal/connector/onedrive/item.go index 73391033b..3e4e9e516 100644 --- a/src/internal/connector/onedrive/item.go +++ b/src/internal/connector/onedrive/item.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "net/http" "strings" msdrives "github.com/microsoftgraph/msgraph-sdk-go/drives" @@ -13,6 +14,7 @@ import ( "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/connector/uploadsession" + "github.com/alcionai/corso/src/internal/version" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/logger" ) @@ -27,7 +29,7 @@ const ( // It crafts this by querying M365 for a download URL for the item // and using a http client to initialize a reader func sharePointItemReader( - ctx context.Context, + hc *http.Client, item models.DriveItemable, ) (details.ItemInfo, io.ReadCloser, error) { url, ok := item.GetAdditionalData()[downloadURLKey].(*string) @@ -35,7 +37,7 @@ func sharePointItemReader( return details.ItemInfo{}, nil, fmt.Errorf("failed to get url for %s", *item.GetName()) } - rc, err := driveItemReader(ctx, *url) + resp, err := hc.Get(*url) if err != nil { return details.ItemInfo{}, nil, err } @@ -44,14 +46,14 @@ func sharePointItemReader( SharePoint: sharePointItemInfo(item, *item.GetSize()), } - return dii, rc, nil + return dii, resp.Body, nil } // oneDriveItemReader will return a io.ReadCloser for the specified item // It crafts this by querying M365 for a download URL for the item // and using a http client to initialize a reader func oneDriveItemReader( - ctx context.Context, + hc *http.Client, item models.DriveItemable, ) (details.ItemInfo, io.ReadCloser, error) { url, ok := item.GetAdditionalData()[downloadURLKey].(*string) @@ -59,7 +61,17 @@ func oneDriveItemReader( return details.ItemInfo{}, nil, fmt.Errorf("failed to get url for %s", *item.GetName()) } - rc, err := driveItemReader(ctx, *url) + req, err := http.NewRequest(http.MethodGet, *url, nil) + if err != nil { + return details.ItemInfo{}, nil, err + } + + // Decorate the traffic + //nolint:lll + // See https://learn.microsoft.com/en-us/sharepoint/dev/general-development/how-to-avoid-getting-throttled-or-blocked-in-sharepoint-online#how-to-decorate-your-http-traffic + req.Header.Set("User-Agent", "ISV|Alcion|Corso/"+version.Version) + + resp, err := hc.Do(req) if err != nil { return details.ItemInfo{}, nil, err } @@ -68,25 +80,7 @@ func oneDriveItemReader( OneDrive: oneDriveItemInfo(item, *item.GetSize()), } - return dii, rc, nil -} - -// driveItemReader will return a io.ReadCloser for the specified item -// It crafts this by querying M365 for a download URL for the item -// and using a http client to initialize a reader -func driveItemReader( - ctx context.Context, - url string, -) (io.ReadCloser, error) { - httpClient := graph.CreateHTTPClient() - httpClient.Timeout = 0 // infinite timeout for pulling large files - - resp, err := httpClient.Get(url) - if err != nil { - return nil, errors.Wrapf(err, "failed to download file from %s", url) - } - - return resp.Body, nil + return dii, resp.Body, nil } // oneDriveItemInfo will populate a details.OneDriveInfo struct @@ -97,7 +91,7 @@ func driveItemReader( func oneDriveItemInfo(di models.DriveItemable, itemSize int64) *details.OneDriveInfo { var email, parent string - if di.GetCreatedBy().GetUser() != nil { + if di.GetCreatedBy() != nil && di.GetCreatedBy().GetUser() != nil { // User is sometimes not available when created via some // external applications (like backup/restore solutions) ed, ok := di.GetCreatedBy().GetUser().GetAdditionalData()["email"] @@ -106,11 +100,9 @@ func oneDriveItemInfo(di models.DriveItemable, itemSize int64) *details.OneDrive } } - if di.GetParentReference() != nil { - if di.GetParentReference().GetName() != nil { - // EndPoint is not always populated from external apps - parent = *di.GetParentReference().GetName() - } + if di.GetParentReference() != nil && di.GetParentReference().GetName() != nil { + // EndPoint is not always populated from external apps + parent = *di.GetParentReference().GetName() } return &details.OneDriveInfo{ diff --git a/src/internal/connector/onedrive/item_test.go b/src/internal/connector/onedrive/item_test.go index b3f45ca56..2b28f0910 100644 --- a/src/internal/connector/onedrive/item_test.go +++ b/src/internal/connector/onedrive/item_test.go @@ -126,7 +126,7 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() { // Read data for the file - itemInfo, itemData, err := oneDriveItemReader(ctx, driveItem) + itemInfo, itemData, err := oneDriveItemReader(graph.LargeItemClient(), driveItem) require.NoError(suite.T(), err) require.NotNil(suite.T(), itemInfo.OneDrive) require.NotEmpty(suite.T(), itemInfo.OneDrive.ItemName) diff --git a/src/internal/connector/sharepoint/data_collections.go b/src/internal/connector/sharepoint/data_collections.go index d7c6547a0..5950daede 100644 --- a/src/internal/connector/sharepoint/data_collections.go +++ b/src/internal/connector/sharepoint/data_collections.go @@ -2,6 +2,7 @@ package sharepoint import ( "context" + "net/http" "github.com/pkg/errors" @@ -24,6 +25,7 @@ type statusUpdater interface { // for the specified user func DataCollections( ctx context.Context, + itemClient *http.Client, selector selectors.Selector, tenantID string, serv graph.Servicer, @@ -66,6 +68,7 @@ func DataCollections( case path.LibrariesCategory: spcs, err = collectLibraries( ctx, + itemClient, serv, tenantID, site, @@ -124,6 +127,7 @@ func collectLists( // all the drives associated with the site. func collectLibraries( ctx context.Context, + itemClient *http.Client, serv graph.Servicer, tenantID, siteID string, scope selectors.SharePointScope, @@ -138,6 +142,7 @@ func collectLibraries( logger.Ctx(ctx).With("site", siteID).Debug("Creating SharePoint Library collections") colls := onedrive.NewCollections( + itemClient, tenantID, siteID, onedrive.SharePointSource, diff --git a/src/internal/connector/sharepoint/data_collections_test.go b/src/internal/connector/sharepoint/data_collections_test.go index 423336253..4daca0877 100644 --- a/src/internal/connector/sharepoint/data_collections_test.go +++ b/src/internal/connector/sharepoint/data_collections_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/onedrive" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/control" @@ -91,6 +92,7 @@ func (suite *SharePointLibrariesSuite) TestUpdateCollections() { newPaths := map[string]string{} excluded := map[string]struct{}{} c := onedrive.NewCollections( + graph.LargeItemClient(), tenant, site, onedrive.SharePointSource, diff --git a/src/internal/operations/backup_integration_test.go b/src/internal/operations/backup_integration_test.go index 3ee5c0230..83748baa2 100644 --- a/src/internal/operations/backup_integration_test.go +++ b/src/internal/operations/backup_integration_test.go @@ -655,7 +655,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchangeIncrementals() { m365, err := acct.M365Config() require.NoError(t, err) - gc, err := connector.NewGraphConnector(ctx, acct, connector.Users) + gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), acct, connector.Users) require.NoError(t, err) ac, err := api.NewClient(m365) diff --git a/src/internal/operations/operation.go b/src/internal/operations/operation.go index 30770bdf5..e5382b022 100644 --- a/src/internal/operations/operation.go +++ b/src/internal/operations/operation.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" "github.com/alcionai/corso/src/internal/connector" + "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/observe" @@ -107,7 +108,7 @@ func connectToM365( resource = connector.Sites } - gc, err := connector.NewGraphConnector(ctx, acct, resource) + gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), acct, resource) if err != nil { return nil, err } diff --git a/src/pkg/services/m365/m365.go b/src/pkg/services/m365/m365.go index ba9cc2a59..e0dd75af9 100644 --- a/src/pkg/services/m365/m365.go +++ b/src/pkg/services/m365/m365.go @@ -8,6 +8,7 @@ import ( "github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector/discovery" + "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/pkg/account" ) @@ -20,7 +21,7 @@ type User struct { // Users returns a list of users in the specified M365 tenant // TODO: Implement paging support func Users(ctx context.Context, m365Account account.Account) ([]*User, error) { - gc, err := connector.NewGraphConnector(ctx, m365Account, connector.Users) + gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), m365Account, connector.Users) if err != nil { return nil, errors.Wrap(err, "could not initialize M365 graph connection") } @@ -76,7 +77,7 @@ func UserPNs(ctx context.Context, m365Account account.Account) ([]string, error) // SiteURLs returns a list of SharePoint site WebURLs in the specified M365 tenant func SiteURLs(ctx context.Context, m365Account account.Account) ([]string, error) { - gc, err := connector.NewGraphConnector(ctx, m365Account, connector.Sites) + gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), m365Account, connector.Sites) if err != nil { return nil, errors.Wrap(err, "could not initialize M365 graph connection") } @@ -86,7 +87,7 @@ func SiteURLs(ctx context.Context, m365Account account.Account) ([]string, error // SiteURLs returns a list of SharePoint sites IDs in the specified M365 tenant func SiteIDs(ctx context.Context, m365Account account.Account) ([]string, error) { - gc, err := connector.NewGraphConnector(ctx, m365Account, connector.Sites) + gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), m365Account, connector.Sites) if err != nil { return nil, errors.Wrap(err, "could not initialize M365 graph connection") }