diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index 79fe67410..e965919a4 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -50,9 +50,12 @@ type Collection struct { // M365 user user string // M365 user data chan data.Stream - // jobs represents items from the inventory of M365 objectIds whose information - // is desired to be sent through the data channel for eventual storage - jobs []string + + // added is a list of existing item IDs that were added to a container + added []string + // removed is a list of item IDs that were deleted from, or moved out, of a container + removed []string + // service - client/adapter pair used to access M365 back store service graph.Servicer @@ -95,7 +98,8 @@ func NewCollection( data: make(chan data.Stream, collectionChannelBufferSize), doNotMergeItems: doNotMergeItems, fullPath: curr, - jobs: make([]string, 0), + added: make([]string, 0), + removed: make([]string, 0), prevPath: prev, service: service, state: stateOf(prev, curr), @@ -122,15 +126,10 @@ func stateOf(prev, curr path.Path) data.CollectionState { return data.NotMovedState } -// AddJob appends additional objectID to structure's jobs field -func (col *Collection) AddJob(objID string) { - col.jobs = append(col.jobs, objID) -} - // Items utility function to asynchronously execute process to fill data channel with // M365 exchange objects and returns the data channel func (col *Collection) Items() <-chan data.Stream { - go col.populateByOptionIdentifier(context.TODO()) + go col.streamItems(context.TODO()) return col.data } @@ -139,11 +138,11 @@ func (col *Collection) Items() <-chan data.Stream { func GetQueryAndSerializeFunc(optID optionIdentifier) (GraphRetrievalFunc, GraphSerializeFunc) { switch optID { case contacts: - return RetrieveContactDataForUser, contactToDataCollection + return RetrieveContactDataForUser, serializeAndStreamContact case events: - return RetrieveEventDataForUser, eventToDataCollection + return RetrieveEventDataForUser, serializeAndStreamEvent case messages: - return RetrieveMessageDataForUser, messageToDataCollection + return RetrieveMessageDataForUser, serializeAndStreamMessage // Unsupported options returns nil, nil default: return nil, nil @@ -169,9 +168,13 @@ func (col Collection) DoNotMergeItems() bool { return col.doNotMergeItems } -// populateByOptionIdentifier is a utility function that uses col.collectionType to be able to serialize -// all the M365IDs defined in the jobs field. data channel is closed by this function -func (col *Collection) populateByOptionIdentifier(ctx context.Context) { +// --------------------------------------------------------------------------- +// Items() channel controller +// --------------------------------------------------------------------------- + +// streamItems is a utility function that uses col.collectionType to be able to serialize +// all the M365IDs defined in the added field. data channel is closed by this function +func (col *Collection) streamItems(ctx context.Context) { var ( errs error success int64 @@ -206,15 +209,40 @@ func (col *Collection) populateByOptionIdentifier(ctx context.Context) { errs = support.WrapAndAppend(user, err, errs) } - for _, identifier := range col.jobs { - if col.ctrl.FailFast && errs != nil { - break - } + // delete all removed items + for _, id := range col.removed { semaphoreCh <- struct{}{} wg.Add(1) - go func(identifier string) { + go func(id string) { + defer wg.Done() + defer func() { <-semaphoreCh }() + + col.data <- &Stream{ + id: id, + modTime: time.Now().UTC(), // removed items have no modTime entry. + deleted: true, + } + + atomic.AddInt64(&success, 1) + atomic.AddInt64(&totalBytes, 0) + + colProgress <- struct{}{} + }(id) + } + + // add any new items + for _, id := range col.added { + if col.ctrl.FailFast && errs != nil { + break + } + + semaphoreCh <- struct{}{} + + wg.Add(1) + + go func(id string) { defer wg.Done() defer func() { <-semaphoreCh }() @@ -224,7 +252,7 @@ func (col *Collection) populateByOptionIdentifier(ctx context.Context) { ) for i := 1; i <= numberOfRetries; i++ { - response, err = query(ctx, col.service, user, identifier) + response, err = query(ctx, col.service, user, id) if err == nil { break } @@ -255,7 +283,7 @@ func (col *Collection) populateByOptionIdentifier(ctx context.Context) { atomic.AddInt64(&totalBytes, int64(byteCount)) colProgress <- struct{}{} - }(identifier) + }(id) } wg.Wait() @@ -265,7 +293,7 @@ func (col *Collection) populateByOptionIdentifier(ctx context.Context) { // and to send the status update through the channel. func (col *Collection) finishPopulation(ctx context.Context, success int, totalBytes int64, errs error) { close(col.data) - attempted := len(col.jobs) + attempted := len(col.added) + len(col.removed) status := support.CreateStatus(ctx, support.Backup, 1, @@ -285,7 +313,7 @@ type modTimer interface { } func getModTime(mt modTimer) time.Time { - res := time.Now() + res := time.Now().UTC() if t := mt.GetLastModifiedDateTime(); t != nil { res = *t @@ -305,9 +333,9 @@ type GraphSerializeFunc func( user string, ) (int, error) -// eventToDataCollection is a GraphSerializeFunc used to serialize models.Eventable objects into +// serializeAndStreamEvent is a GraphSerializeFunc used to serialize models.Eventable objects into // data.Stream objects. Returns an error the process finishes unsuccessfully. -func eventToDataCollection( +func serializeAndStreamEvent( ctx context.Context, client *msgraphsdk.GraphServiceClient, objectWriter *kioser.JsonSerializationWriter, @@ -373,8 +401,8 @@ func eventToDataCollection( return len(byteArray), nil } -// contactToDataCollection is a GraphSerializeFunc for models.Contactable -func contactToDataCollection( +// serializeAndStreamContact is a GraphSerializeFunc for models.Contactable +func serializeAndStreamContact( ctx context.Context, client *msgraphsdk.GraphServiceClient, objectWriter *kioser.JsonSerializationWriter, @@ -399,24 +427,20 @@ func contactToDataCollection( return 0, support.WrapAndAppend(*contact.GetId(), err, nil) } - addtl := contact.GetAdditionalData() - _, removed := addtl[graph.AddtlDataRemoved] - - if len(bs) > 0 || removed { + if len(bs) > 0 { dataChannel <- &Stream{ id: *contact.GetId(), message: bs, info: ContactInfo(contact, int64(len(bs))), modTime: getModTime(contact), - deleted: removed, } } return len(bs), nil } -// messageToDataCollection is the GraphSerializeFunc for models.Messageable -func messageToDataCollection( +// serializeAndStreamMessage is the GraphSerializeFunc for models.Messageable +func serializeAndStreamMessage( ctx context.Context, client *msgraphsdk.GraphServiceClient, objectWriter *kioser.JsonSerializationWriter, @@ -468,15 +492,11 @@ func messageToDataCollection( return 0, support.SetNonRecoverableError(err) } - addtl := msg.GetAdditionalData() - _, removed := addtl[graph.AddtlDataRemoved] - dataChannel <- &Stream{ id: *msg.GetId(), message: bs, info: MessageInfo(msg, int64(len(bs))), modTime: getModTime(msg), - deleted: removed, } return len(bs), nil diff --git a/src/internal/connector/exchange/exchange_data_collection_test.go b/src/internal/connector/exchange/exchange_data_collection_test.go index 9547e39dd..62b1c48f7 100644 --- a/src/internal/connector/exchange/exchange_data_collection_test.go +++ b/src/internal/connector/exchange/exchange_data_collection_test.go @@ -92,21 +92,6 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_NewExchange suite.Equal(fullPath, edc.FullPath()) } -func (suite *ExchangeDataCollectionSuite) TestExchangeCollection_AddJob() { - eoc := Collection{ - user: "Dexter", - fullPath: nil, - } - suite.Zero(len(eoc.jobs)) - - shopping := []string{"tomotoes", "potatoes", "pasta", "ice tea"} - for _, item := range shopping { - eoc.AddJob(item) - } - - suite.Equal(len(shopping), len(eoc.jobs)) -} - func (suite *ExchangeDataCollectionSuite) TestNewCollection_state() { fooP, err := path.Builder{}. Append("foo"). diff --git a/src/internal/connector/exchange/iterators_test.go b/src/internal/connector/exchange/iterators_test.go index 913115ed5..5603e92e8 100644 --- a/src/internal/connector/exchange/iterators_test.go +++ b/src/internal/connector/exchange/iterators_test.go @@ -133,7 +133,7 @@ func (suite *ExchangeIteratorSuite) TestCollectionFunctions() { errUpdater := func(id string, err error) { errs = support.WrapAndAppend(id, err, errs) } - // callbackFunc iterates through all models.Messageable and fills exchange.Collection.jobs[] + // callbackFunc iterates through all models.Messageable and fills exchange.Collection.added[] // with corresponding item IDs. New collections are created for each directory callbackFunc := test.iterativeFunction( collections, "", errUpdater) diff --git a/src/internal/connector/exchange/service_iterators.go b/src/internal/connector/exchange/service_iterators.go index 5610c42d4..ae416c997 100644 --- a/src/internal/connector/exchange/service_iterators.go +++ b/src/internal/connector/exchange/service_iterators.go @@ -95,7 +95,7 @@ func filterContainersAndFillCollections( } } - jobs, newDelta, err := getJobs(ctx, service, qp.ResourceOwner, cID, prevDelta) + added, removed, newDelta, err := getJobs(ctx, service, qp.ResourceOwner, cID, prevDelta) if err != nil { if graph.IsErrDeletedInFlight(err) == nil { errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) @@ -127,7 +127,8 @@ func filterContainersAndFillCollections( ) collections[cID] = &edc - edc.jobs = append(edc.jobs, jobs...) + edc.added = append(edc.added, added...) + edc.removed = append(edc.removed, removed...) // add the current path for the container ID to be used in the next backup // as the "previous path", for reference in case of a rename or relocation. @@ -278,7 +279,7 @@ type FetchIDFunc func( ctx context.Context, gs graph.Servicer, user, containerID, oldDeltaToken string, -) ([]string, deltaUpdate, error) +) ([]string, []string, deltaUpdate, error) func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) { switch category { @@ -302,7 +303,7 @@ func FetchEventIDsFromCalendar( ctx context.Context, gs graph.Servicer, user, calendarID, oldDelta string, -) ([]string, deltaUpdate, error) { +) ([]string, []string, deltaUpdate, error) { var ( errs *multierror.Error ids []string @@ -310,7 +311,7 @@ func FetchEventIDsFromCalendar( options, err := optionsForEventsByCalendar([]string{"id"}) if err != nil { - return nil, deltaUpdate{}, err + return nil, nil, deltaUpdate{}, err } builder := gs.Client(). @@ -322,10 +323,10 @@ func FetchEventIDsFromCalendar( resp, err := builder.Get(ctx, options) if err != nil { if err := graph.IsErrDeletedInFlight(err); err != nil { - return nil, deltaUpdate{}, err + return nil, nil, deltaUpdate{}, err } - return nil, deltaUpdate{}, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) + return nil, nil, deltaUpdate{}, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) } for _, item := range resp.GetValue() { @@ -351,7 +352,7 @@ func FetchEventIDsFromCalendar( } // Events don't have a delta endpoint so just return an empty string. - return ids, deltaUpdate{}, errs.ErrorOrNil() + return ids, nil, deltaUpdate{}, errs.ErrorOrNil() } // --------------------------------------------------------------------------- @@ -364,17 +365,18 @@ func FetchContactIDsFromDirectory( ctx context.Context, gs graph.Servicer, user, directoryID, oldDelta string, -) ([]string, deltaUpdate, error) { +) ([]string, []string, deltaUpdate, error) { var ( errs *multierror.Error ids []string + removedIDs []string deltaURL string resetDelta bool ) options, err := optionsForContactFoldersItemDelta([]string{"parentFolderId"}) if err != nil { - return nil, deltaUpdate{}, errors.Wrap(err, "getting query options") + return nil, nil, deltaUpdate{}, errors.Wrap(err, "getting query options") } getIDs := func(builder *msuser.ItemContactFoldersItemContactsDeltaRequestBuilder) error { @@ -403,7 +405,11 @@ func FetchContactIDsFromDirectory( continue } - ids = append(ids, *item.GetId()) + if item.GetAdditionalData()[graph.AddtlDataRemoved] == nil { + ids = append(ids, *item.GetId()) + } else { + removedIDs = append(removedIDs, *item.GetId()) + } } delta := resp.GetOdataDeltaLink() @@ -426,12 +432,12 @@ func FetchContactIDsFromDirectory( err := getIDs(msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, gs.Adapter())) // happy path if err == nil { - return ids, deltaUpdate{deltaURL, false}, errs.ErrorOrNil() + return ids, removedIDs, deltaUpdate{deltaURL, false}, errs.ErrorOrNil() } // only return on error if it is NOT a delta issue. // otherwise we'll retry the call with the regular builder if graph.IsErrInvalidDelta(err) == nil { - return nil, deltaUpdate{}, err + return nil, nil, deltaUpdate{}, err } resetDelta = true @@ -445,10 +451,10 @@ func FetchContactIDsFromDirectory( Delta() if err := getIDs(builder); err != nil { - return nil, deltaUpdate{}, err + return nil, nil, deltaUpdate{}, err } - return ids, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() + return ids, removedIDs, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() } // --------------------------------------------------------------------------- @@ -461,17 +467,18 @@ func FetchMessageIDsFromDirectory( ctx context.Context, gs graph.Servicer, user, directoryID, oldDelta string, -) ([]string, deltaUpdate, error) { +) ([]string, []string, deltaUpdate, error) { var ( errs *multierror.Error ids []string + removedIDs []string deltaURL string resetDelta bool ) options, err := optionsForFolderMessagesDelta([]string{"isRead"}) if err != nil { - return nil, deltaUpdate{}, errors.Wrap(err, "getting query options") + return nil, nil, deltaUpdate{}, errors.Wrap(err, "getting query options") } getIDs := func(builder *msuser.ItemMailFoldersItemMessagesDeltaRequestBuilder) error { @@ -500,7 +507,11 @@ func FetchMessageIDsFromDirectory( continue } - ids = append(ids, *item.GetId()) + if item.GetAdditionalData()[graph.AddtlDataRemoved] == nil { + ids = append(ids, *item.GetId()) + } else { + removedIDs = append(removedIDs, *item.GetId()) + } } delta := resp.GetOdataDeltaLink() @@ -523,12 +534,12 @@ func FetchMessageIDsFromDirectory( err := getIDs(msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, gs.Adapter())) // happy path if err == nil { - return ids, deltaUpdate{deltaURL, false}, errs.ErrorOrNil() + return ids, removedIDs, deltaUpdate{deltaURL, false}, errs.ErrorOrNil() } // only return on error if it is NOT a delta issue. // otherwise we'll retry the call with the regular builder if graph.IsErrInvalidDelta(err) == nil { - return nil, deltaUpdate{}, err + return nil, nil, deltaUpdate{}, err } resetDelta = true @@ -542,8 +553,8 @@ func FetchMessageIDsFromDirectory( Delta() if err := getIDs(builder); err != nil { - return nil, deltaUpdate{}, err + return nil, nil, deltaUpdate{}, err } - return ids, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() + return ids, removedIDs, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() }