From e6f5b4515d623275c744c580f0223aa072b14469 Mon Sep 17 00:00:00 2001 From: Danny Date: Fri, 29 Jul 2022 12:35:27 -0400 Subject: [PATCH] GC: `exchange.Collection` PopulateFunction refactor without TaskList (#441) exchange.PopulateFunc executed without a tasklist. --- .../exchange/exchange_data_collection.go | 42 +++++++++---------- src/internal/connector/graph_connector.go | 7 ++-- 2 files changed, 23 insertions(+), 26 deletions(-) diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index 417830791..bba1335b8 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -68,40 +68,36 @@ func (eoc *Collection) PopulateCollection(newData *Stream) { } // populateFromTaskList async call to fill DataCollection via channel implementation -func PopulateFromCollection( +func (edc *Collection) PopulateFromCollection( ctx context.Context, service graph.Service, - collections map[string]*Collection, statusChannel chan<- *support.ConnectorOperationStatus, ) { var errs error var attemptedItems, success int objectWriter := kw.NewJsonSerializationWriter() - for _, edc := range collections { - - for _, task := range edc.jobs { - response, err := service.Client().UsersById(edc.user).MessagesById(task).Get() - if err != nil { - details := support.ConnectorStackErrorTrace(err) - errs = support.WrapAndAppend(edc.user, errors.Wrapf(err, "unable to retrieve %s, %s", task, details), errs) - continue - } - err = messageToDataCollection(service.Client(), ctx, objectWriter, edc.data, response, edc.user) - success++ - if err != nil { - errs = support.WrapAndAppendf(edc.user, err, errs) - success-- - } - if errs != nil && service.ErrPolicy() { - break - } + for _, task := range edc.jobs { + response, err := service.Client().UsersById(edc.user).MessagesById(task).Get() + if err != nil { + details := support.ConnectorStackErrorTrace(err) + errs = support.WrapAndAppend(edc.user, errors.Wrapf(err, "unable to retrieve item %s; details %s", task, details), errs) + continue + } + err = messageToDataCollection(service.Client(), ctx, objectWriter, edc.data, response, edc.user) + success++ + if err != nil { + errs = support.WrapAndAppendf(edc.user, err, errs) + success-- + } + if errs != nil && service.ErrPolicy() { + break } - close(edc.data) - attemptedItems += len(edc.jobs) } + close(edc.data) + attemptedItems += len(edc.jobs) - status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, len(collections), errs) + status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, 1, errs) logger.Ctx(ctx).Debug(status.String()) statusChannel <- status } diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index 0388e17eb..42d4ece0f 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -348,9 +348,10 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m if err != nil { return nil, support.WrapAndAppend(user, err, err) } - // async call to populate - go exchange.PopulateFromCollection(ctx, service, collections, gc.statusCh) - gc.incrementAwaitingMessages() + for _, edc := range collections { + go edc.PopulateFromCollection(ctx, service, gc.statusCh) + gc.incrementAwaitingMessages() + } return collections, err }