diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7bf3e3f2a..ea57c6595 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -27,11 +27,6 @@ jobs: with: go-version: 1.18 - - name: go-cache-paths - run: | - echo "::set-output name=go-build::$(go env GOCACHE)" - echo "::set-output name=go-mod::$(go env GOMODCACHE)" - - name: Cache Go build uses: actions/cache@v3 id: mybuild diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index 2814ed328..7f277b76f 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -6,6 +6,7 @@ import ( "bytes" "context" "strings" + "sync/atomic" az "github.com/Azure/azure-sdk-for-go/sdk/azidentity" ka "github.com/microsoft/kiota-authentication-azure-go" @@ -23,7 +24,7 @@ import ( ) const ( - numberOfRetries = 3 + numberOfRetries = 4 mailCategory = "mail" ) @@ -31,12 +32,18 @@ const ( // GraphRequestAdapter from the msgraph-sdk-go. Additional fields are for // bookkeeping and interfacing with other component. 
type GraphConnector struct { - tenant string - adapter msgraphsdk.GraphRequestAdapter + graphService + tenant string + Users map[string]string //key value + status *support.ConnectorOperationStatus // contains the status of the last run status + statusCh chan *support.ConnectorOperationStatus + awaitingMessages int32 + credentials account.M365Config +} + +type graphService struct { client msgraphsdk.GraphServiceClient - Users map[string]string //key value - Streams string //Not implemented for ease of code check-in - status *support.ConnectorOperationStatus // contains the status of the last run status + adapter msgraphsdk.GraphRequestAdapter } func NewGraphConnector(acct account.Account) (*GraphConnector, error) { @@ -44,8 +51,28 @@ func NewGraphConnector(acct account.Account) (*GraphConnector, error) { if err != nil { return nil, errors.Wrap(err, "retrieving m356 account configuration") } + gc := GraphConnector{ + tenant: m365.TenantID, + Users: make(map[string]string, 0), + status: nil, + statusCh: make(chan *support.ConnectorOperationStatus), + credentials: m365, + } + aService, err := gc.createService() + if err != nil { + return nil, err + } + gc.graphService = *aService + err = gc.setTenantUsers() + if err != nil { + return nil, err + } + return &gc, nil +} + +func createAdapter(tenant, client, secret string) (*msgraphsdk.GraphRequestAdapter, error) { // Client Provider: Uses Secret for access to tenant-level data - cred, err := az.NewClientSecretCredential(m365.TenantID, m365.ClientID, m365.ClientSecret, nil) + cred, err := az.NewClientSecretCredential(tenant, client, secret, nil) if err != nil { return nil, err } @@ -54,22 +81,20 @@ func NewGraphConnector(acct account.Account) (*GraphConnector, error) { return nil, err } adapter, err := msgraphsdk.NewGraphRequestAdapter(auth) + return adapter, err +} + +// createService is the private constructor for a graphService built from the connector's stored credentials. +func (gc *GraphConnector) createService() (*graphService, error) { + adapter, err := 
createAdapter(gc.credentials.TenantID, gc.credentials.ClientID, gc.credentials.ClientSecret) if err != nil { return nil, err } - gc := GraphConnector{ - tenant: m365.TenantID, + connector := graphService{ adapter: *adapter, client: *msgraphsdk.NewGraphServiceClient(adapter), - Users: make(map[string]string, 0), - status: nil, } - // TODO: Revisit Query all users. - err = gc.setTenantUsers() - if err != nil { - return nil, err - } - return &gc, nil + return &connector, err } // setTenantUsers queries the M365 to identify the users in the @@ -83,7 +108,7 @@ func (gc *GraphConnector) setTenantUsers() error { options := &msuser.UsersRequestBuilderGetRequestConfiguration{ QueryParameters: requestParams, } - response, err := gc.client.Users().GetWithRequestConfigurationAndResponseHandler(options, nil) + response, err := gc.graphService.client.Users().GetWithRequestConfigurationAndResponseHandler(options, nil) if err != nil { return err } @@ -91,7 +116,7 @@ func (gc *GraphConnector) setTenantUsers() error { err = support.WrapAndAppend("general access", errors.New("connector failed: No access"), err) return err } - userIterator, err := msgraphgocore.NewPageIterator(response, &gc.adapter, models.CreateUserCollectionResponseFromDiscriminatorValue) + userIterator, err := msgraphgocore.NewPageIterator(response, &gc.graphService.adapter, models.CreateUserCollectionResponseFromDiscriminatorValue) if err != nil { return err } @@ -99,7 +124,7 @@ func (gc *GraphConnector) setTenantUsers() error { callbackFunc := func(userItem interface{}) bool { user, ok := userItem.(models.Userable) if !ok { - err = support.WrapAndAppend(gc.adapter.GetBaseUrl(), errors.New("user iteration failure"), err) + err = support.WrapAndAppend(gc.graphService.adapter.GetBaseUrl(), errors.New("user iteration failure"), err) return true } gc.Users[*user.GetMail()] = *user.GetId() @@ -107,7 +132,7 @@ func (gc *GraphConnector) setTenantUsers() error { } iterateError = userIterator.Iterate(callbackFunc) if 
iterateError != nil { - err = support.WrapAndAppend(gc.adapter.GetBaseUrl(), iterateError, err) + err = support.WrapAndAppend(gc.graphService.adapter.GetBaseUrl(), iterateError, err) } return err } @@ -171,17 +196,16 @@ func (gc *GraphConnector) ExchangeDataCollection(ctx context.Context, selector s } dcs, err := gc.serializeMessages(ctx, user) if err != nil { - errs = support.WrapAndAppend(user, err, errs) + return nil, support.WrapAndAppend(user, err, errs) } + if len(dcs) > 0 { - collections = append(collections, dcs...) + for _, collection := range dcs { + collections = append(collections, collection) + } } } } - - // TODO replace with completion of Issue 124: - - //TODO: Retry handler to convert return: (DataCollection, error) return collections, errs } @@ -235,7 +259,7 @@ func (gc *GraphConnector) RestoreMessages(ctx context.Context, dcs []DataCollect clone.SetSingleValueExtendedProperties(svlep) draft := false clone.SetIsDraft(&draft) - sentMessage, err := gc.client.UsersById(user).MailFoldersById(address).Messages().Post(clone) + sentMessage, err := gc.graphService.client.UsersById(user).MailFoldersById(address).Messages().Post(clone) if err != nil { errs = support.WrapAndAppend( data.UUID()+": "+support.ConnectorStackErrorTrace(err), @@ -263,13 +287,13 @@ func (gc *GraphConnector) RestoreMessages(ctx context.Context, dcs []DataCollect // serializeMessages: Temp Function as place Holder until Collections have been added // to the GraphConnector struct. 
-func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([]DataCollection, error) { +func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (map[string]*ExchangeDataCollection, error) { options := optionsForMessageSnapshot() - response, err := gc.client.UsersById(user).Messages().GetWithRequestConfigurationAndResponseHandler(options, nil) + response, err := gc.graphService.client.UsersById(user).Messages().GetWithRequestConfigurationAndResponseHandler(options, nil) if err != nil { return nil, err } - pageIterator, err := msgraphgocore.NewPageIterator(response, &gc.adapter, models.CreateMessageCollectionResponseFromDiscriminatorValue) + pageIterator, err := msgraphgocore.NewPageIterator(response, &gc.graphService.adapter, models.CreateMessageCollectionResponseFromDiscriminatorValue) if err != nil { return nil, err } @@ -277,7 +301,7 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([ callbackFunc := func(messageItem any) bool { message, ok := messageItem.(models.Messageable) if !ok { - err = support.WrapAndAppendf(gc.adapter.GetBaseUrl(), errors.New("message iteration failure"), err) + err = support.WrapAndAppendf(gc.graphService.adapter.GetBaseUrl(), errors.New("message iteration failure"), err) return true } // Saving to messages to list. 
Indexed by folder @@ -286,48 +310,89 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([ } iterateError := pageIterator.Iterate(callbackFunc) if iterateError != nil { - err = support.WrapAndAppend(gc.adapter.GetBaseUrl(), iterateError, err) + err = support.WrapAndAppend(gc.graphService.adapter.GetBaseUrl(), iterateError, err) } if err != nil { return nil, err // return error if snapshot is incomplete } - // Time to create Exchange data Holder - collections := make([]DataCollection, 0) - objectWriter := kw.NewJsonSerializationWriter() - var errs error - var attemptedItems, success int + // Create collection of ExchangeDataCollection and create data Holder + collections := make(map[string]*ExchangeDataCollection) - for aFolder, tasks := range tasklist { + for aFolder := range tasklist { // prep the items for handoff to the backup consumer edc := NewExchangeDataCollection(user, []string{gc.tenant, user, mailCategory, aFolder}) + collections[aFolder] = &edc + } + + if len(collections) == 0 { + if len(tasklist) != 0 { + // Below error message needs revising. 
Assumption is that it should always + // find both items to fetch and a DataCollection to put them in + return nil, support.WrapAndAppend( + user, errors.New("found items but no directories"), err) + } + // return empty collection when no items found + return nil, err + } + service, err := gc.createService() + if err != nil { + return nil, support.WrapAndAppend(user, err, err) + } + // async call to populate + go service.populateFromTaskList(ctx, tasklist, collections, gc.statusCh) + gc.incrementAwaitingMessages() + + return collections, err +} + +// populateFromTaskList async call to fill DataCollection via channel implementation +func (sc *graphService) populateFromTaskList( + context context.Context, + tasklist TaskList, + collections map[string]*ExchangeDataCollection, + statusChannel chan<- *support.ConnectorOperationStatus, +) { + var errs error + var attemptedItems, success int + objectWriter := kw.NewJsonSerializationWriter() + //Todo this has to return all the errors in the status + for aFolder, tasks := range tasklist { + // Get the same folder + edc := collections[aFolder] + if edc == nil { + for _, task := range tasks { + errs = support.WrapAndAppend(task, errors.New("unable to query: collection not found during populateFromTaskList"), errs) + } + continue + } + for _, task := range tasks { - response, err := gc.client.UsersById(user).MessagesById(task).Get() + response, err := sc.client.UsersById(edc.user).MessagesById(task).Get() if err != nil { details := support.ConnectorStackErrorTrace(err) - errs = support.WrapAndAppend(user, errors.Wrapf(err, "unable to retrieve %s, %s", task, details), errs) + errs = support.WrapAndAppend(edc.user, errors.Wrapf(err, "unable to retrieve %s, %s", task, details), errs) continue } - err = gc.messageToDataCollection(ctx, objectWriter, edc, response, user) + err = messageToDataCollection(&sc.client, context, objectWriter, edc.data, response, edc.user) if err != nil { - errs = support.WrapAndAppendf(user, err, errs) + 
errs = support.WrapAndAppendf(edc.user, err, errs) } } edc.FinishPopulation() attemptedItems += len(tasks) success += edc.Length() - collections = append(collections, &edc) } - status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, len(tasklist), errs) - gc.SetStatus(*status) - logger.Ctx(ctx).Debugw(gc.PrintableStatus()) - return collections, errs + status := support.CreateStatus(context, support.Backup, attemptedItems, success, len(tasklist), errs) + logger.Ctx(context).Debug(status.String()) + statusChannel <- status } -func (gc *GraphConnector) messageToDataCollection( +func messageToDataCollection( + client *msgraphsdk.GraphServiceClient, ctx context.Context, objectWriter *kw.JsonSerializationWriter, - edc ExchangeDataCollection, + dataChannel chan<- DataStream, message models.Messageable, user string, ) error { @@ -344,7 +409,7 @@ func (gc *GraphConnector) messageToDataCollection( // getting all the attachments might take a couple attempts due to filesize var retriesErr error for count := 0; count < numberOfRetries; count++ { - attached, err := gc.client. + attached, err := client. UsersById(user). MessagesById(*aMessage.GetId()). Attachments(). @@ -371,9 +436,8 @@ func (gc *GraphConnector) messageToDataCollection( return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(err, "serializing mail content"), nil) } if byteArray != nil { - edc.PopulateCollection(&ExchangeData{id: *aMessage.GetId(), message: byteArray}) + dataChannel <- &ExchangeData{id: *aMessage.GetId(), message: byteArray} } - return nil } @@ -382,6 +446,16 @@ func (gc *GraphConnector) SetStatus(cos support.ConnectorOperationStatus) { gc.status = &cos } +// AwaitStatus updates status field based on item within statusChannel. 
+func (gc *GraphConnector) AwaitStatus() *support.ConnectorOperationStatus { + if gc.awaitingMessages > 0 { + gc.status = <-gc.statusCh + atomic.AddInt32(&gc.awaitingMessages, -1) + return gc.status + } + return nil +} + // Status returns the current status of the graphConnector operaion. func (gc *GraphConnector) Status() *support.ConnectorOperationStatus { return gc.status @@ -395,6 +469,10 @@ func (gc *GraphConnector) PrintableStatus() string { return gc.status.String() } +func (gc *GraphConnector) incrementAwaitingMessages() { + atomic.AddInt32(&gc.awaitingMessages, 1) +} + // IsRecoverableError returns true iff error is a RecoverableGCEerror func IsRecoverableError(e error) bool { var recoverable support.RecoverableGCError diff --git a/src/internal/connector/graph_connector_test.go b/src/internal/connector/graph_connector_test.go index 009bd4a16..d14fc6732 100644 --- a/src/internal/connector/graph_connector_test.go +++ b/src/internal/connector/graph_connector_test.go @@ -59,11 +59,15 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_setTenantUsers() func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_ExchangeDataCollection() { sel := selectors.NewExchangeBackup() - sel.Include(sel.Users("lidiah@8qzvrj.onmicrosoft.com")) + sel.Include(sel.Users("meganb@8qzvrj.onmicrosoft.com")) collectionList, err := suite.connector.ExchangeDataCollection(context.Background(), sel.Selector) assert.NotNil(suite.T(), collectionList, "collection list") assert.Nil(suite.T(), err) - assert.NotNil(suite.T(), suite.connector.status, "connector status") + assert.True(suite.T(), suite.connector.awaitingMessages > 0) + assert.Nil(suite.T(), suite.connector.status) + status := suite.connector.AwaitStatus() + assert.NotNil(suite.T(), status, "status not blocking on async call") + exchangeData := collectionList[0] suite.Greater(len(exchangeData.FullPath()), 2) } diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 
9fa7869f2..fc97d16a2 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -90,7 +90,6 @@ func (op *BackupOperation) Run(ctx context.Context) error { stats.readErr = err return errors.Wrap(err, "retrieving service data") } - stats.gc = gc.Status() // hand the results to the consumer var details *backup.Details @@ -99,6 +98,7 @@ func (op *BackupOperation) Run(ctx context.Context) error { stats.writeErr = err return errors.Wrap(err, "backing up service data") } + stats.gc = gc.AwaitStatus() err = op.createBackupModels(ctx, stats.k.SnapshotID, details) if err != nil {