Revert "add itemClient for re-usable od item downloads" (#2273)

Reverts alcionai/corso#2266. Am seeing issues on data restore that need to be debugged.
This commit is contained in:
Vaibhav Kamra 2023-01-25 23:52:56 -08:00 committed by GitHub
parent 84a21aef52
commit e9354a8429
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 75 additions and 132 deletions

View File

@ -13,7 +13,6 @@ import (
. "github.com/alcionai/corso/src/cli/print" . "github.com/alcionai/corso/src/cli/print"
"github.com/alcionai/corso/src/cli/utils" "github.com/alcionai/corso/src/cli/utils"
"github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/internal/model"
"github.com/alcionai/corso/src/pkg/backup" "github.com/alcionai/corso/src/pkg/backup"
@ -210,7 +209,7 @@ func createSharePointCmd(cmd *cobra.Command, args []string) error {
defer utils.CloseRepo(ctx, r) defer utils.CloseRepo(ctx, r)
gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), acct, connector.Sites) gc, err := connector.NewGraphConnector(ctx, acct, connector.Sites)
if err != nil { if err != nil {
return Only(ctx, errors.Wrap(err, "Failed to connect to Microsoft APIs")) return Only(ctx, errors.Wrap(err, "Failed to connect to Microsoft APIs"))
} }

View File

@ -12,7 +12,6 @@ import (
. "github.com/alcionai/corso/src/cli/print" . "github.com/alcionai/corso/src/cli/print"
"github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/internal/common"
"github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/mockconnector" "github.com/alcionai/corso/src/internal/connector/mockconnector"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/account"
@ -112,7 +111,7 @@ func getGCAndVerifyUser(ctx context.Context, userID string) (*connector.GraphCon
} }
// build a graph connector // build a graph connector
gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), acct, connector.Users) gc, err := connector.NewGraphConnector(ctx, acct, connector.Users)
if err != nil { if err != nil {
return nil, account.Account{}, errors.Wrap(err, "connecting to graph api") return nil, account.Account{}, errors.Wrap(err, "connecting to graph api")
} }

View File

@ -19,7 +19,6 @@ import (
"github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/internal/common"
"github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector"
"github.com/alcionai/corso/src/internal/connector/exchange/api" "github.com/alcionai/corso/src/internal/connector/exchange/api"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/credentials" "github.com/alcionai/corso/src/pkg/credentials"
@ -178,7 +177,7 @@ func getGC(ctx context.Context) (*connector.GraphConnector, account.M365Config,
return nil, m365Cfg, Only(ctx, errors.Wrap(err, "finding m365 account details")) return nil, m365Cfg, Only(ctx, errors.Wrap(err, "finding m365 account details"))
} }
gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), acct, connector.Users) gc, err := connector.NewGraphConnector(ctx, acct, connector.Users)
if err != nil { if err != nil {
return nil, m365Cfg, Only(ctx, errors.Wrap(err, "connecting to graph API")) return nil, m365Cfg, Only(ctx, errors.Wrap(err, "connecting to graph API"))
} }

View File

@ -255,7 +255,7 @@ func getGC(ctx context.Context) (*connector.GraphConnector, error) {
} }
// build a graph connector // build a graph connector
gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), acct, connector.Users) gc, err := connector.NewGraphConnector(ctx, acct, connector.Users)
if err != nil { if err != nil {
return nil, Only(ctx, errors.Wrap(err, "connecting to graph api")) return nil, Only(ctx, errors.Wrap(err, "connecting to graph api"))
} }

View File

@ -87,7 +87,6 @@ func (gc *GraphConnector) DataCollections(
case selectors.ServiceSharePoint: case selectors.ServiceSharePoint:
colls, err := sharepoint.DataCollections( colls, err := sharepoint.DataCollections(
ctx, ctx,
gc.itemClient,
sels, sels,
gc.credentials.AzureTenantID, gc.credentials.AzureTenantID,
gc.Service, gc.Service,
@ -199,7 +198,6 @@ func (gc *GraphConnector) OneDriveDataCollections(
logger.Ctx(ctx).With("user", user).Debug("Creating OneDrive collections") logger.Ctx(ctx).With("user", user).Debug("Creating OneDrive collections")
odcs, err := onedrive.NewCollections( odcs, err := onedrive.NewCollections(
gc.itemClient,
gc.credentials.AzureTenantID, gc.credentials.AzureTenantID,
user, user,
onedrive.OneDriveSource, onedrive.OneDriveSource,

View File

@ -10,7 +10,6 @@ import (
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector/exchange" "github.com/alcionai/corso/src/internal/connector/exchange"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/sharepoint" "github.com/alcionai/corso/src/internal/connector/sharepoint"
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
@ -44,7 +43,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) SetupSuite() {
tester.MustGetEnvVars(suite.T(), tester.M365AcctCredEnvs...) tester.MustGetEnvVars(suite.T(), tester.M365AcctCredEnvs...)
suite.connector = loadConnector(ctx, suite.T(), graph.LargeItemClient(), AllResources) suite.connector = loadConnector(ctx, suite.T(), AllResources)
suite.user = tester.M365UserID(suite.T()) suite.user = tester.M365UserID(suite.T())
suite.site = tester.M365SiteID(suite.T()) suite.site = tester.M365SiteID(suite.T())
@ -63,7 +62,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection
selUsers := []string{suite.user} selUsers := []string{suite.user}
connector := loadConnector(ctx, suite.T(), graph.LargeItemClient(), Users) connector := loadConnector(ctx, suite.T(), Users)
tests := []struct { tests := []struct {
name string name string
getSelector func(t *testing.T) selectors.Selector getSelector func(t *testing.T) selectors.Selector
@ -139,7 +138,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestDataCollections_invali
owners := []string{"snuffleupagus"} owners := []string{"snuffleupagus"}
connector := loadConnector(ctx, suite.T(), graph.LargeItemClient(), Users) connector := loadConnector(ctx, suite.T(), Users)
tests := []struct { tests := []struct {
name string name string
getSelector func(t *testing.T) selectors.Selector getSelector func(t *testing.T) selectors.Selector
@ -215,7 +214,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestSharePointDataCollecti
selSites := []string{suite.site} selSites := []string{suite.site}
connector := loadConnector(ctx, suite.T(), graph.LargeItemClient(), Sites) connector := loadConnector(ctx, suite.T(), Sites)
tests := []struct { tests := []struct {
name string name string
expected int expected int
@ -244,7 +243,6 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestSharePointDataCollecti
suite.T().Run(test.name, func(t *testing.T) { suite.T().Run(test.name, func(t *testing.T) {
collections, err := sharepoint.DataCollections( collections, err := sharepoint.DataCollections(
ctx, ctx,
graph.LargeItemClient(),
test.getSelector(), test.getSelector(),
connector.credentials.AzureTenantID, connector.credentials.AzureTenantID,
connector.Service, connector.Service,
@ -300,7 +298,7 @@ func (suite *ConnectorCreateSharePointCollectionIntegrationSuite) SetupSuite() {
tester.MustGetEnvSets(suite.T(), tester.M365AcctCredEnvs) tester.MustGetEnvSets(suite.T(), tester.M365AcctCredEnvs)
suite.connector = loadConnector(ctx, suite.T(), graph.LargeItemClient(), Sites) suite.connector = loadConnector(ctx, suite.T(), Sites)
suite.user = tester.M365UserID(suite.T()) suite.user = tester.M365UserID(suite.T())
tester.LogTimeOfTest(suite.T()) tester.LogTimeOfTest(suite.T())
@ -313,7 +311,7 @@ func (suite *ConnectorCreateSharePointCollectionIntegrationSuite) TestCreateShar
var ( var (
t = suite.T() t = suite.T()
siteID = tester.M365SiteID(t) siteID = tester.M365SiteID(t)
gc = loadConnector(ctx, t, graph.LargeItemClient(), Sites) gc = loadConnector(ctx, t, Sites)
siteIDs = []string{siteID} siteIDs = []string{siteID}
) )
@ -337,7 +335,7 @@ func (suite *ConnectorCreateSharePointCollectionIntegrationSuite) TestCreateShar
var ( var (
t = suite.T() t = suite.T()
siteID = tester.M365SiteID(t) siteID = tester.M365SiteID(t)
gc = loadConnector(ctx, t, graph.LargeItemClient(), Sites) gc = loadConnector(ctx, t, Sites)
siteIDs = []string{siteID} siteIDs = []string{siteID}
) )

View File

@ -55,31 +55,6 @@ func CreateHTTPClient() *http.Client {
return httpClient return httpClient
} }
// LargeItemClient generates a client that's configured to handle
// large file downloads. This client isn't suitable for other queries
// due to loose restrictions on timeouts and such.
//
// Re-use of http clients is critical, or else we leak os resources
// and consume relatively unbound socket connections. It is important
// to centralize this client to be passed downstream where api calls
// can utilize it on a per-download basis.
//
// TODO: this should get owned by an API client layer, not the GC itself.
func LargeItemClient() *http.Client {
httpClient := CreateHTTPClient()
httpClient.Timeout = 0 // infinite timeout for pulling large files
httpClient.Transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 30 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
return httpClient
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Logging Middleware // Logging Middleware
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -110,10 +85,6 @@ func (handler *LoggingMiddleware) Intercept(
logger.Ctx(ctx).Infow("graph api throttling", "method", req.Method, "url", req.URL) logger.Ctx(ctx).Infow("graph api throttling", "method", req.Method, "url", req.URL)
} }
if resp.StatusCode != http.StatusTooManyRequests && (resp.StatusCode/100) != 2 {
logger.Ctx(ctx).Infow("graph api error", "method", req.Method, "url", req.URL)
}
if logger.DebugAPI || os.Getenv(logGraphRequestsEnvKey) != "" { if logger.DebugAPI || os.Getenv(logGraphRequestsEnvKey) != "" {
respDump, _ := httputil.DumpResponse(resp, true) respDump, _ := httputil.DumpResponse(resp, true)

View File

@ -4,7 +4,6 @@ package connector
import ( import (
"context" "context"
"net/http"
"runtime/trace" "runtime/trace"
"strings" "strings"
"sync" "sync"
@ -39,9 +38,8 @@ import (
// GraphRequestAdapter from the msgraph-sdk-go. Additional fields are for // GraphRequestAdapter from the msgraph-sdk-go. Additional fields are for
// bookkeeping and interfacing with other component. // bookkeeping and interfacing with other component.
type GraphConnector struct { type GraphConnector struct {
Service graph.Servicer Service graph.Servicer
Owners api.Client Owners api.Client
itemClient *http.Client // configured to handle large item downloads
tenant string tenant string
Users map[string]string // key<email> value<id> Users map[string]string // key<email> value<id>
@ -66,19 +64,13 @@ const (
Sites Sites
) )
func NewGraphConnector( func NewGraphConnector(ctx context.Context, acct account.Account, r resource) (*GraphConnector, error) {
ctx context.Context,
itemClient *http.Client,
acct account.Account,
r resource,
) (*GraphConnector, error) {
m365, err := acct.M365Config() m365, err := acct.M365Config()
if err != nil { if err != nil {
return nil, errors.Wrap(err, "retrieving m365 account configuration") return nil, errors.Wrap(err, "retrieving m365 account configuration")
} }
gc := GraphConnector{ gc := GraphConnector{
itemClient: itemClient,
tenant: m365.AzureTenantID, tenant: m365.AzureTenantID,
Users: make(map[string]string, 0), Users: make(map[string]string, 0),
wg: &sync.WaitGroup{}, wg: &sync.WaitGroup{},

View File

@ -9,7 +9,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/account"
@ -66,7 +65,7 @@ func (suite *DisconnectedGraphConnectorSuite) TestBadConnection() {
for _, test := range table { for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) { suite.T().Run(test.name, func(t *testing.T) {
gc, err := NewGraphConnector(ctx, graph.LargeItemClient(), test.acct(t), Users) gc, err := NewGraphConnector(ctx, test.acct(t), Users)
assert.Nil(t, gc, test.name+" failed") assert.Nil(t, gc, test.name+" failed")
assert.NotNil(t, err, test.name+"failed") assert.NotNil(t, err, test.name+"failed")
}) })

View File

@ -3,7 +3,6 @@ package connector
import ( import (
"context" "context"
"io" "io"
"net/http"
"reflect" "reflect"
"testing" "testing"
@ -977,9 +976,9 @@ func getSelectorWith(
} }
} }
func loadConnector(ctx context.Context, t *testing.T, itemClient *http.Client, r resource) *GraphConnector { func loadConnector(ctx context.Context, t *testing.T, r resource) *GraphConnector {
a := tester.NewM365Account(t) a := tester.NewM365Account(t)
connector, err := NewGraphConnector(ctx, itemClient, a, r) connector, err := NewGraphConnector(ctx, a, r)
require.NoError(t, err) require.NoError(t, err)
return connector return connector

View File

@ -156,7 +156,7 @@ func (suite *GraphConnectorIntegrationSuite) SetupSuite() {
tester.MustGetEnvSets(suite.T(), tester.M365AcctCredEnvs) tester.MustGetEnvSets(suite.T(), tester.M365AcctCredEnvs)
suite.connector = loadConnector(ctx, suite.T(), graph.LargeItemClient(), Users) suite.connector = loadConnector(ctx, suite.T(), Users)
suite.user = tester.M365UserID(suite.T()) suite.user = tester.M365UserID(suite.T())
suite.acct = tester.NewM365Account(suite.T()) suite.acct = tester.NewM365Account(suite.T())
@ -380,7 +380,7 @@ func runRestoreBackupTest(
start := time.Now() start := time.Now()
restoreGC := loadConnector(ctx, t, graph.LargeItemClient(), test.resource) restoreGC := loadConnector(ctx, t, test.resource)
restoreSel := getSelectorWith(t, test.service, resourceOwners, true) restoreSel := getSelectorWith(t, test.service, resourceOwners, true)
deets, err := restoreGC.RestoreDataCollections( deets, err := restoreGC.RestoreDataCollections(
ctx, ctx,
@ -419,7 +419,7 @@ func runRestoreBackupTest(
}) })
} }
backupGC := loadConnector(ctx, t, graph.LargeItemClient(), test.resource) backupGC := loadConnector(ctx, t, test.resource)
backupSel := backupSelectorForExpected(t, test.service, expectedDests) backupSel := backupSelectorForExpected(t, test.service, expectedDests)
t.Logf("Selective backup of %s\n", backupSel) t.Logf("Selective backup of %s\n", backupSel)
@ -870,7 +870,7 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames
dest.ContainerName, dest.ContainerName,
) )
restoreGC := loadConnector(ctx, t, graph.LargeItemClient(), test.resource) restoreGC := loadConnector(ctx, t, test.resource)
deets, err := restoreGC.RestoreDataCollections(ctx, suite.acct, restoreSel, dest, collections) deets, err := restoreGC.RestoreDataCollections(ctx, suite.acct, restoreSel, dest, collections)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, deets) require.NotNil(t, deets)
@ -888,7 +888,7 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames
// Run a backup and compare its output with what we put in. // Run a backup and compare its output with what we put in.
backupGC := loadConnector(ctx, t, graph.LargeItemClient(), test.resource) backupGC := loadConnector(ctx, t, test.resource)
backupSel := backupSelectorForExpected(t, test.service, expectedDests) backupSel := backupSelectorForExpected(t, test.service, expectedDests)
t.Log("Selective backup of", backupSel) t.Log("Selective backup of", backupSel)

View File

@ -4,13 +4,11 @@ package onedrive
import ( import (
"context" "context"
"io" "io"
"net/http"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/pkg/errors"
"github.com/spatialcurrent/go-lazy/pkg/lazy" "github.com/spatialcurrent/go-lazy/pkg/lazy"
"github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/graph"
@ -45,9 +43,6 @@ var (
// Collection represents a set of OneDrive objects retrieved from M365 // Collection represents a set of OneDrive objects retrieved from M365
type Collection struct { type Collection struct {
// configured to handle large item downloads
itemClient *http.Client
// data is used to share data streams with the collection consumer // data is used to share data streams with the collection consumer
data chan data.Stream data chan data.Stream
// folderPath indicates what level in the hierarchy this collection // folderPath indicates what level in the hierarchy this collection
@ -69,13 +64,12 @@ type Collection struct {
// itemReadFunc returns a reader for the specified item // itemReadFunc returns a reader for the specified item
type itemReaderFunc func( type itemReaderFunc func(
hc *http.Client, ctx context.Context,
item models.DriveItemable, item models.DriveItemable,
) (itemInfo details.ItemInfo, itemData io.ReadCloser, err error) ) (itemInfo details.ItemInfo, itemData io.ReadCloser, err error)
// NewCollection creates a Collection // NewCollection creates a Collection
func NewCollection( func NewCollection(
itemClient *http.Client,
folderPath path.Path, folderPath path.Path,
driveID string, driveID string,
service graph.Servicer, service graph.Servicer,
@ -84,7 +78,6 @@ func NewCollection(
ctrlOpts control.Options, ctrlOpts control.Options,
) *Collection { ) *Collection {
c := &Collection{ c := &Collection{
itemClient: itemClient,
folderPath: folderPath, folderPath: folderPath,
driveItems: map[string]models.DriveItemable{}, driveItems: map[string]models.DriveItemable{},
driveID: driveID, driveID: driveID,
@ -205,16 +198,11 @@ func (oc *Collection) populateItems(ctx context.Context) {
m.Unlock() m.Unlock()
} }
for id, item := range oc.driveItems { for _, item := range oc.driveItems {
if oc.ctrl.FailFast && errs != nil { if oc.ctrl.FailFast && errs != nil {
break break
} }
if item == nil {
errUpdater(id, errors.New("nil item"))
continue
}
semaphoreCh <- struct{}{} semaphoreCh <- struct{}{}
wg.Add(1) wg.Add(1)
@ -231,9 +219,10 @@ func (oc *Collection) populateItems(ctx context.Context) {
) )
for i := 1; i <= maxRetries; i++ { for i := 1; i <= maxRetries; i++ {
itemInfo, itemData, err = oc.itemReader(oc.itemClient, item) itemInfo, itemData, err = oc.itemReader(ctx, item)
// retry on Timeout type errors, break otherwise.
if err == nil || graph.IsErrTimeout(err) == nil { if err == nil || graph.IsErrTimeout(err) == nil {
// retry on Timeout type errors, break otherwise.
break break
} }

View File

@ -2,8 +2,9 @@ package onedrive
import ( import (
"bytes" "bytes"
"context"
"errors"
"io" "io"
"net/http"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -14,7 +15,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/backup/details"
@ -73,7 +73,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
name: "oneDrive, no duplicates", name: "oneDrive, no duplicates",
numInstances: 1, numInstances: 1,
source: OneDriveSource, source: OneDriveSource,
itemReader: func(*http.Client, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) {
return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName, Modified: now}}, return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName, Modified: now}},
io.NopCloser(bytes.NewReader(testItemData)), io.NopCloser(bytes.NewReader(testItemData)),
nil nil
@ -87,7 +87,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
name: "oneDrive, duplicates", name: "oneDrive, duplicates",
numInstances: 3, numInstances: 3,
source: OneDriveSource, source: OneDriveSource,
itemReader: func(*http.Client, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) {
return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName, Modified: now}}, return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName, Modified: now}},
io.NopCloser(bytes.NewReader(testItemData)), io.NopCloser(bytes.NewReader(testItemData)),
nil nil
@ -101,7 +101,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
name: "sharePoint, no duplicates", name: "sharePoint, no duplicates",
numInstances: 1, numInstances: 1,
source: SharePointSource, source: SharePointSource,
itemReader: func(*http.Client, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) {
return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName, Modified: now}}, return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName, Modified: now}},
io.NopCloser(bytes.NewReader(testItemData)), io.NopCloser(bytes.NewReader(testItemData)),
nil nil
@ -115,7 +115,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
name: "sharePoint, duplicates", name: "sharePoint, duplicates",
numInstances: 3, numInstances: 3,
source: SharePointSource, source: SharePointSource,
itemReader: func(*http.Client, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) {
return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName, Modified: now}}, return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName, Modified: now}},
io.NopCloser(bytes.NewReader(testItemData)), io.NopCloser(bytes.NewReader(testItemData)),
nil nil
@ -140,7 +140,6 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
require.NoError(t, err) require.NoError(t, err)
coll := NewCollection( coll := NewCollection(
graph.LargeItemClient(),
folderPath, folderPath,
"drive-id", "drive-id",
suite, suite,
@ -225,7 +224,6 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() {
require.NoError(t, err) require.NoError(t, err)
coll := NewCollection( coll := NewCollection(
graph.LargeItemClient(),
folderPath, folderPath,
"fakeDriveID", "fakeDriveID",
suite, suite,
@ -237,8 +235,10 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() {
mockItem.SetId(&testItemID) mockItem.SetId(&testItemID)
coll.Add(mockItem) coll.Add(mockItem)
coll.itemReader = func(*http.Client, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { readError := errors.New("Test error")
return details.ItemInfo{}, nil, assert.AnError
coll.itemReader = func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) {
return details.ItemInfo{}, nil, readError
} }
coll.Items() coll.Items()

View File

@ -3,7 +3,6 @@ package onedrive
import ( import (
"context" "context"
"fmt" "fmt"
"net/http"
"strings" "strings"
"github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/models"
@ -47,9 +46,6 @@ type folderMatcher interface {
// Collections is used to retrieve drive data for a // Collections is used to retrieve drive data for a
// resource owner, which can be either a user or a sharepoint site. // resource owner, which can be either a user or a sharepoint site.
type Collections struct { type Collections struct {
// configured to handle large item downloads
itemClient *http.Client
tenant string tenant string
resourceOwner string resourceOwner string
source driveSource source driveSource
@ -70,7 +66,6 @@ type Collections struct {
} }
func NewCollections( func NewCollections(
itemClient *http.Client,
tenant string, tenant string,
resourceOwner string, resourceOwner string,
source driveSource, source driveSource,
@ -80,7 +75,6 @@ func NewCollections(
ctrlOpts control.Options, ctrlOpts control.Options,
) *Collections { ) *Collections {
return &Collections{ return &Collections{
itemClient: itemClient,
tenant: tenant, tenant: tenant,
resourceOwner: resourceOwner, resourceOwner: resourceOwner,
source: source, source: source,
@ -271,13 +265,13 @@ func (c *Collections) UpdateCollections(
// TODO(ashmrtn): Compare old and new path and set collection state // TODO(ashmrtn): Compare old and new path and set collection state
// accordingly. // accordingly.
col = NewCollection( col = NewCollection(
c.itemClient,
collectionPath, collectionPath,
driveID, driveID,
c.service, c.service,
c.statusUpdater, c.statusUpdater,
c.source, c.source,
c.ctrl) c.ctrl,
)
c.CollectionMap[collectionPath.String()] = col c.CollectionMap[collectionPath.String()] = col
c.NumContainers++ c.NumContainers++

View File

@ -10,7 +10,6 @@ import (
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"golang.org/x/exp/maps" "golang.org/x/exp/maps"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/selectors"
@ -588,7 +587,6 @@ func (suite *OneDriveCollectionsSuite) TestUpdateCollections() {
outputFolderMap := map[string]string{} outputFolderMap := map[string]string{}
maps.Copy(outputFolderMap, tt.inputFolderMap) maps.Copy(outputFolderMap, tt.inputFolderMap)
c := NewCollections( c := NewCollections(
graph.LargeItemClient(),
tenant, tenant,
user, user,
OneDriveSource, OneDriveSource,

View File

@ -9,7 +9,6 @@ import (
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/internal/common"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/logger"
@ -147,7 +146,6 @@ func (suite *OneDriveSuite) TestOneDriveNewCollections() {
NewOneDriveBackup([]string{test.user}). NewOneDriveBackup([]string{test.user}).
AllData()[0] AllData()[0]
odcs, err := NewCollections( odcs, err := NewCollections(
graph.LargeItemClient(),
creds.AzureTenantID, creds.AzureTenantID,
test.user, test.user,
OneDriveSource, OneDriveSource,

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"net/http"
"strings" "strings"
msdrives "github.com/microsoftgraph/msgraph-sdk-go/drives" msdrives "github.com/microsoftgraph/msgraph-sdk-go/drives"
@ -28,7 +27,7 @@ const (
// It crafts this by querying M365 for a download URL for the item // It crafts this by querying M365 for a download URL for the item
// and using a http client to initialize a reader // and using a http client to initialize a reader
func sharePointItemReader( func sharePointItemReader(
hc *http.Client, ctx context.Context,
item models.DriveItemable, item models.DriveItemable,
) (details.ItemInfo, io.ReadCloser, error) { ) (details.ItemInfo, io.ReadCloser, error) {
url, ok := item.GetAdditionalData()[downloadURLKey].(*string) url, ok := item.GetAdditionalData()[downloadURLKey].(*string)
@ -36,7 +35,7 @@ func sharePointItemReader(
return details.ItemInfo{}, nil, fmt.Errorf("failed to get url for %s", *item.GetName()) return details.ItemInfo{}, nil, fmt.Errorf("failed to get url for %s", *item.GetName())
} }
resp, err := hc.Get(*url) rc, err := driveItemReader(ctx, *url)
if err != nil { if err != nil {
return details.ItemInfo{}, nil, err return details.ItemInfo{}, nil, err
} }
@ -45,14 +44,14 @@ func sharePointItemReader(
SharePoint: sharePointItemInfo(item, *item.GetSize()), SharePoint: sharePointItemInfo(item, *item.GetSize()),
} }
return dii, resp.Body, nil return dii, rc, nil
} }
// oneDriveItemReader will return a io.ReadCloser for the specified item // oneDriveItemReader will return a io.ReadCloser for the specified item
// It crafts this by querying M365 for a download URL for the item // It crafts this by querying M365 for a download URL for the item
// and using a http client to initialize a reader // and using a http client to initialize a reader
func oneDriveItemReader( func oneDriveItemReader(
hc *http.Client, ctx context.Context,
item models.DriveItemable, item models.DriveItemable,
) (details.ItemInfo, io.ReadCloser, error) { ) (details.ItemInfo, io.ReadCloser, error) {
url, ok := item.GetAdditionalData()[downloadURLKey].(*string) url, ok := item.GetAdditionalData()[downloadURLKey].(*string)
@ -60,7 +59,7 @@ func oneDriveItemReader(
return details.ItemInfo{}, nil, fmt.Errorf("failed to get url for %s", *item.GetName()) return details.ItemInfo{}, nil, fmt.Errorf("failed to get url for %s", *item.GetName())
} }
resp, err := hc.Get(*url) rc, err := driveItemReader(ctx, *url)
if err != nil { if err != nil {
return details.ItemInfo{}, nil, err return details.ItemInfo{}, nil, err
} }
@ -69,7 +68,25 @@ func oneDriveItemReader(
OneDrive: oneDriveItemInfo(item, *item.GetSize()), OneDrive: oneDriveItemInfo(item, *item.GetSize()),
} }
return dii, resp.Body, nil return dii, rc, nil
}
// driveItemReader will return a io.ReadCloser for the specified item
// It crafts this by querying M365 for a download URL for the item
// and using a http client to initialize a reader
func driveItemReader(
ctx context.Context,
url string,
) (io.ReadCloser, error) {
httpClient := graph.CreateHTTPClient()
httpClient.Timeout = 0 // infinite timeout for pulling large files
resp, err := httpClient.Get(url)
if err != nil {
return nil, errors.Wrapf(err, "failed to download file from %s", url)
}
return resp.Body, nil
} }
// oneDriveItemInfo will populate a details.OneDriveInfo struct // oneDriveItemInfo will populate a details.OneDriveInfo struct
@ -80,7 +97,7 @@ func oneDriveItemReader(
func oneDriveItemInfo(di models.DriveItemable, itemSize int64) *details.OneDriveInfo { func oneDriveItemInfo(di models.DriveItemable, itemSize int64) *details.OneDriveInfo {
var email, parent string var email, parent string
if di.GetCreatedBy() != nil && di.GetCreatedBy().GetUser() != nil { if di.GetCreatedBy().GetUser() != nil {
// User is sometimes not available when created via some // User is sometimes not available when created via some
// external applications (like backup/restore solutions) // external applications (like backup/restore solutions)
ed, ok := di.GetCreatedBy().GetUser().GetAdditionalData()["email"] ed, ok := di.GetCreatedBy().GetUser().GetAdditionalData()["email"]
@ -89,9 +106,11 @@ func oneDriveItemInfo(di models.DriveItemable, itemSize int64) *details.OneDrive
} }
} }
if di.GetParentReference() != nil && di.GetParentReference().GetName() != nil { if di.GetParentReference() != nil {
// EndPoint is not always populated from external apps if di.GetParentReference().GetName() != nil {
parent = *di.GetParentReference().GetName() // EndPoint is not always populated from external apps
parent = *di.GetParentReference().GetName()
}
} }
return &details.OneDriveInfo{ return &details.OneDriveInfo{

View File

@ -126,7 +126,7 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() {
// Read data for the file // Read data for the file
itemInfo, itemData, err := oneDriveItemReader(graph.LargeItemClient(), driveItem) itemInfo, itemData, err := oneDriveItemReader(ctx, driveItem)
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
require.NotNil(suite.T(), itemInfo.OneDrive) require.NotNil(suite.T(), itemInfo.OneDrive)
require.NotEmpty(suite.T(), itemInfo.OneDrive.ItemName) require.NotEmpty(suite.T(), itemInfo.OneDrive.ItemName)

View File

@ -2,7 +2,6 @@ package sharepoint
import ( import (
"context" "context"
"net/http"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -25,7 +24,6 @@ type statusUpdater interface {
// for the specified user // for the specified user
func DataCollections( func DataCollections(
ctx context.Context, ctx context.Context,
itemClient *http.Client,
selector selectors.Selector, selector selectors.Selector,
tenantID string, tenantID string,
serv graph.Servicer, serv graph.Servicer,
@ -68,7 +66,6 @@ func DataCollections(
case path.LibrariesCategory: case path.LibrariesCategory:
spcs, err = collectLibraries( spcs, err = collectLibraries(
ctx, ctx,
itemClient,
serv, serv,
tenantID, tenantID,
site, site,
@ -127,7 +124,6 @@ func collectLists(
// all the drives associated with the site. // all the drives associated with the site.
func collectLibraries( func collectLibraries(
ctx context.Context, ctx context.Context,
itemClient *http.Client,
serv graph.Servicer, serv graph.Servicer,
tenantID, siteID string, tenantID, siteID string,
scope selectors.SharePointScope, scope selectors.SharePointScope,
@ -142,7 +138,6 @@ func collectLibraries(
logger.Ctx(ctx).With("site", siteID).Debug("Creating SharePoint Library collections") logger.Ctx(ctx).With("site", siteID).Debug("Creating SharePoint Library collections")
colls := onedrive.NewCollections( colls := onedrive.NewCollections(
itemClient,
tenantID, tenantID,
siteID, siteID,
onedrive.SharePointSource, onedrive.SharePointSource,

View File

@ -7,7 +7,6 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/onedrive" "github.com/alcionai/corso/src/internal/connector/onedrive"
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
@ -92,7 +91,6 @@ func (suite *SharePointLibrariesSuite) TestUpdateCollections() {
newPaths := map[string]string{} newPaths := map[string]string{}
excluded := map[string]struct{}{} excluded := map[string]struct{}{}
c := onedrive.NewCollections( c := onedrive.NewCollections(
graph.LargeItemClient(),
tenant, tenant,
site, site,
onedrive.SharePointSource, onedrive.SharePointSource,

View File

@ -655,7 +655,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchangeIncrementals() {
m365, err := acct.M365Config() m365, err := acct.M365Config()
require.NoError(t, err) require.NoError(t, err)
gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), acct, connector.Users) gc, err := connector.NewGraphConnector(ctx, acct, connector.Users)
require.NoError(t, err) require.NoError(t, err)
ac, err := api.NewClient(m365) ac, err := api.NewClient(m365)

View File

@ -7,7 +7,6 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/internal/observe"
@ -108,7 +107,7 @@ func connectToM365(
resource = connector.Sites resource = connector.Sites
} }
gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), acct, resource) gc, err := connector.NewGraphConnector(ctx, acct, resource)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -8,7 +8,6 @@ import (
"github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector"
"github.com/alcionai/corso/src/internal/connector/discovery" "github.com/alcionai/corso/src/internal/connector/discovery"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/account"
) )
@ -21,7 +20,7 @@ type User struct {
// Users returns a list of users in the specified M365 tenant // Users returns a list of users in the specified M365 tenant
// TODO: Implement paging support // TODO: Implement paging support
func Users(ctx context.Context, m365Account account.Account) ([]*User, error) { func Users(ctx context.Context, m365Account account.Account) ([]*User, error) {
gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), m365Account, connector.Users) gc, err := connector.NewGraphConnector(ctx, m365Account, connector.Users)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "could not initialize M365 graph connection") return nil, errors.Wrap(err, "could not initialize M365 graph connection")
} }
@ -77,7 +76,7 @@ func UserPNs(ctx context.Context, m365Account account.Account) ([]string, error)
// SiteURLs returns a list of SharePoint site WebURLs in the specified M365 tenant // SiteURLs returns a list of SharePoint site WebURLs in the specified M365 tenant
func SiteURLs(ctx context.Context, m365Account account.Account) ([]string, error) { func SiteURLs(ctx context.Context, m365Account account.Account) ([]string, error) {
gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), m365Account, connector.Sites) gc, err := connector.NewGraphConnector(ctx, m365Account, connector.Sites)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "could not initialize M365 graph connection") return nil, errors.Wrap(err, "could not initialize M365 graph connection")
} }
@ -87,7 +86,7 @@ func SiteURLs(ctx context.Context, m365Account account.Account) ([]string, error
// SiteURLs returns a list of SharePoint sites IDs in the specified M365 tenant // SiteURLs returns a list of SharePoint sites IDs in the specified M365 tenant
func SiteIDs(ctx context.Context, m365Account account.Account) ([]string, error) { func SiteIDs(ctx context.Context, m365Account account.Account) ([]string, error) {
gc, err := connector.NewGraphConnector(ctx, graph.LargeItemClient(), m365Account, connector.Sites) gc, err := connector.NewGraphConnector(ctx, m365Account, connector.Sites)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "could not initialize M365 graph connection") return nil, errors.Wrap(err, "could not initialize M365 graph connection")
} }