GC: exchange.Collection PopulateFunction refactor without TaskList (#441)

exchange.PopulateFunc executed without a tasklist.
This commit is contained in:
Danny 2022-07-29 12:35:27 -04:00 committed by GitHub
parent 9eb6a5dd0c
commit e6f5b4515d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 26 deletions

View File

@ -68,40 +68,36 @@ func (eoc *Collection) PopulateCollection(newData *Stream) {
} }
// populateFromTaskList async call to fill DataCollection via channel implementation // populateFromTaskList async call to fill DataCollection via channel implementation
func PopulateFromCollection( func (edc *Collection) PopulateFromCollection(
ctx context.Context, ctx context.Context,
service graph.Service, service graph.Service,
collections map[string]*Collection,
statusChannel chan<- *support.ConnectorOperationStatus, statusChannel chan<- *support.ConnectorOperationStatus,
) { ) {
var errs error var errs error
var attemptedItems, success int var attemptedItems, success int
objectWriter := kw.NewJsonSerializationWriter() objectWriter := kw.NewJsonSerializationWriter()
for _, edc := range collections { for _, task := range edc.jobs {
response, err := service.Client().UsersById(edc.user).MessagesById(task).Get()
for _, task := range edc.jobs { if err != nil {
response, err := service.Client().UsersById(edc.user).MessagesById(task).Get() details := support.ConnectorStackErrorTrace(err)
if err != nil { errs = support.WrapAndAppend(edc.user, errors.Wrapf(err, "unable to retrieve item %s; details %s", task, details), errs)
details := support.ConnectorStackErrorTrace(err) continue
errs = support.WrapAndAppend(edc.user, errors.Wrapf(err, "unable to retrieve %s, %s", task, details), errs) }
continue err = messageToDataCollection(service.Client(), ctx, objectWriter, edc.data, response, edc.user)
} success++
err = messageToDataCollection(service.Client(), ctx, objectWriter, edc.data, response, edc.user) if err != nil {
success++ errs = support.WrapAndAppendf(edc.user, err, errs)
if err != nil { success--
errs = support.WrapAndAppendf(edc.user, err, errs) }
success-- if errs != nil && service.ErrPolicy() {
} break
if errs != nil && service.ErrPolicy() {
break
}
} }
close(edc.data)
attemptedItems += len(edc.jobs)
} }
close(edc.data)
attemptedItems += len(edc.jobs)
status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, len(collections), errs) status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, 1, errs)
logger.Ctx(ctx).Debug(status.String()) logger.Ctx(ctx).Debug(status.String())
statusChannel <- status statusChannel <- status
} }

View File

@ -348,9 +348,10 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m
if err != nil { if err != nil {
return nil, support.WrapAndAppend(user, err, err) return nil, support.WrapAndAppend(user, err, err)
} }
// async call to populate for _, edc := range collections {
go exchange.PopulateFromCollection(ctx, service, collections, gc.statusCh) go edc.PopulateFromCollection(ctx, service, gc.statusCh)
gc.incrementAwaitingMessages() gc.incrementAwaitingMessages()
}
return collections, err return collections, err
} }