distinguish added and removed jobs (#1952)

## Description

In order to track deleted items, we need to build
separate job sets when aggregating item IDs:
one for added items, and one for removed.

## Does this PR need a docs update or release note?

- [x] No

## Type of change

- [x] 🐛 Bugfix

## Issue(s)

* #1777

## Test Plan

- [x] 💪 Manual
This commit is contained in:
Keepers 2022-12-23 16:53:57 -07:00 committed by GitHub
parent 843772ec50
commit 442686e51e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 94 additions and 78 deletions

View File

@ -50,9 +50,12 @@ type Collection struct {
// M365 user
user string // M365 user
data chan data.Stream
// jobs represents items from the inventory of M365 objectIds whose information
// is desired to be sent through the data channel for eventual storage
jobs []string
// added is a list of existing item IDs that were added to a container
added []string
// removed is a list of item IDs that were deleted from, or moved out of, a container
removed []string
// service - client/adapter pair used to access M365 back store
service graph.Servicer
@ -95,7 +98,8 @@ func NewCollection(
data: make(chan data.Stream, collectionChannelBufferSize),
doNotMergeItems: doNotMergeItems,
fullPath: curr,
jobs: make([]string, 0),
added: make([]string, 0),
removed: make([]string, 0),
prevPath: prev,
service: service,
state: stateOf(prev, curr),
@ -122,15 +126,10 @@ func stateOf(prev, curr path.Path) data.CollectionState {
return data.NotMovedState
}
// AddJob appends additional objectID to structure's jobs field
func (col *Collection) AddJob(objID string) {
col.jobs = append(col.jobs, objID)
}
// Items utility function to asynchronously execute process to fill data channel with
// M365 exchange objects and returns the data channel
func (col *Collection) Items() <-chan data.Stream {
go col.populateByOptionIdentifier(context.TODO())
go col.streamItems(context.TODO())
return col.data
}
@ -139,11 +138,11 @@ func (col *Collection) Items() <-chan data.Stream {
func GetQueryAndSerializeFunc(optID optionIdentifier) (GraphRetrievalFunc, GraphSerializeFunc) {
switch optID {
case contacts:
return RetrieveContactDataForUser, contactToDataCollection
return RetrieveContactDataForUser, serializeAndStreamContact
case events:
return RetrieveEventDataForUser, eventToDataCollection
return RetrieveEventDataForUser, serializeAndStreamEvent
case messages:
return RetrieveMessageDataForUser, messageToDataCollection
return RetrieveMessageDataForUser, serializeAndStreamMessage
// Unsupported options return nil, nil
default:
return nil, nil
@ -169,9 +168,13 @@ func (col Collection) DoNotMergeItems() bool {
return col.doNotMergeItems
}
// populateByOptionIdentifier is a utility function that uses col.collectionType to be able to serialize
// all the M365IDs defined in the jobs field. data channel is closed by this function
func (col *Collection) populateByOptionIdentifier(ctx context.Context) {
// ---------------------------------------------------------------------------
// Items() channel controller
// ---------------------------------------------------------------------------
// streamItems is a utility function that uses col.collectionType to be able to serialize
// all the M365IDs defined in the added field. data channel is closed by this function
func (col *Collection) streamItems(ctx context.Context) {
var (
errs error
success int64
@ -206,15 +209,40 @@ func (col *Collection) populateByOptionIdentifier(ctx context.Context) {
errs = support.WrapAndAppend(user, err, errs)
}
for _, identifier := range col.jobs {
if col.ctrl.FailFast && errs != nil {
break
}
// delete all removed items
for _, id := range col.removed {
semaphoreCh <- struct{}{}
wg.Add(1)
go func(identifier string) {
go func(id string) {
defer wg.Done()
defer func() { <-semaphoreCh }()
col.data <- &Stream{
id: id,
modTime: time.Now().UTC(), // removed items have no modTime entry.
deleted: true,
}
atomic.AddInt64(&success, 1)
atomic.AddInt64(&totalBytes, 0)
colProgress <- struct{}{}
}(id)
}
// add any new items
for _, id := range col.added {
if col.ctrl.FailFast && errs != nil {
break
}
semaphoreCh <- struct{}{}
wg.Add(1)
go func(id string) {
defer wg.Done()
defer func() { <-semaphoreCh }()
@ -224,7 +252,7 @@ func (col *Collection) populateByOptionIdentifier(ctx context.Context) {
)
for i := 1; i <= numberOfRetries; i++ {
response, err = query(ctx, col.service, user, identifier)
response, err = query(ctx, col.service, user, id)
if err == nil {
break
}
@ -255,7 +283,7 @@ func (col *Collection) populateByOptionIdentifier(ctx context.Context) {
atomic.AddInt64(&totalBytes, int64(byteCount))
colProgress <- struct{}{}
}(identifier)
}(id)
}
wg.Wait()
@ -265,7 +293,7 @@ func (col *Collection) populateByOptionIdentifier(ctx context.Context) {
// and to send the status update through the channel.
func (col *Collection) finishPopulation(ctx context.Context, success int, totalBytes int64, errs error) {
close(col.data)
attempted := len(col.jobs)
attempted := len(col.added) + len(col.removed)
status := support.CreateStatus(ctx,
support.Backup,
1,
@ -285,7 +313,7 @@ type modTimer interface {
}
func getModTime(mt modTimer) time.Time {
res := time.Now()
res := time.Now().UTC()
if t := mt.GetLastModifiedDateTime(); t != nil {
res = *t
@ -305,9 +333,9 @@ type GraphSerializeFunc func(
user string,
) (int, error)
// eventToDataCollection is a GraphSerializeFunc used to serialize models.Eventable objects into
// serializeAndStreamEvent is a GraphSerializeFunc used to serialize models.Eventable objects into
// data.Stream objects. Returns an error if the process finishes unsuccessfully.
func eventToDataCollection(
func serializeAndStreamEvent(
ctx context.Context,
client *msgraphsdk.GraphServiceClient,
objectWriter *kioser.JsonSerializationWriter,
@ -373,8 +401,8 @@ func eventToDataCollection(
return len(byteArray), nil
}
// contactToDataCollection is a GraphSerializeFunc for models.Contactable
func contactToDataCollection(
// serializeAndStreamContact is a GraphSerializeFunc for models.Contactable
func serializeAndStreamContact(
ctx context.Context,
client *msgraphsdk.GraphServiceClient,
objectWriter *kioser.JsonSerializationWriter,
@ -399,24 +427,20 @@ func contactToDataCollection(
return 0, support.WrapAndAppend(*contact.GetId(), err, nil)
}
addtl := contact.GetAdditionalData()
_, removed := addtl[graph.AddtlDataRemoved]
if len(bs) > 0 || removed {
if len(bs) > 0 {
dataChannel <- &Stream{
id: *contact.GetId(),
message: bs,
info: ContactInfo(contact, int64(len(bs))),
modTime: getModTime(contact),
deleted: removed,
}
}
return len(bs), nil
}
// messageToDataCollection is the GraphSerializeFunc for models.Messageable
func messageToDataCollection(
// serializeAndStreamMessage is the GraphSerializeFunc for models.Messageable
func serializeAndStreamMessage(
ctx context.Context,
client *msgraphsdk.GraphServiceClient,
objectWriter *kioser.JsonSerializationWriter,
@ -468,15 +492,11 @@ func messageToDataCollection(
return 0, support.SetNonRecoverableError(err)
}
addtl := msg.GetAdditionalData()
_, removed := addtl[graph.AddtlDataRemoved]
dataChannel <- &Stream{
id: *msg.GetId(),
message: bs,
info: MessageInfo(msg, int64(len(bs))),
modTime: getModTime(msg),
deleted: removed,
}
return len(bs), nil

View File

@ -92,21 +92,6 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_NewExchange
suite.Equal(fullPath, edc.FullPath())
}
func (suite *ExchangeDataCollectionSuite) TestExchangeCollection_AddJob() {
eoc := Collection{
user: "Dexter",
fullPath: nil,
}
suite.Zero(len(eoc.jobs))
shopping := []string{"tomotoes", "potatoes", "pasta", "ice tea"}
for _, item := range shopping {
eoc.AddJob(item)
}
suite.Equal(len(shopping), len(eoc.jobs))
}
func (suite *ExchangeDataCollectionSuite) TestNewCollection_state() {
fooP, err := path.Builder{}.
Append("foo").

View File

@ -133,7 +133,7 @@ func (suite *ExchangeIteratorSuite) TestCollectionFunctions() {
errUpdater := func(id string, err error) {
errs = support.WrapAndAppend(id, err, errs)
}
// callbackFunc iterates through all models.Messageable and fills exchange.Collection.jobs[]
// callbackFunc iterates through all models.Messageable and fills exchange.Collection.added[]
// with corresponding item IDs. New collections are created for each directory
callbackFunc := test.iterativeFunction(
collections, "", errUpdater)

View File

@ -95,7 +95,7 @@ func filterContainersAndFillCollections(
}
}
jobs, newDelta, err := getJobs(ctx, service, qp.ResourceOwner, cID, prevDelta)
added, removed, newDelta, err := getJobs(ctx, service, qp.ResourceOwner, cID, prevDelta)
if err != nil {
if graph.IsErrDeletedInFlight(err) == nil {
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
@ -127,7 +127,8 @@ func filterContainersAndFillCollections(
)
collections[cID] = &edc
edc.jobs = append(edc.jobs, jobs...)
edc.added = append(edc.added, added...)
edc.removed = append(edc.removed, removed...)
// add the current path for the container ID to be used in the next backup
// as the "previous path", for reference in case of a rename or relocation.
@ -278,7 +279,7 @@ type FetchIDFunc func(
ctx context.Context,
gs graph.Servicer,
user, containerID, oldDeltaToken string,
) ([]string, deltaUpdate, error)
) ([]string, []string, deltaUpdate, error)
func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) {
switch category {
@ -302,7 +303,7 @@ func FetchEventIDsFromCalendar(
ctx context.Context,
gs graph.Servicer,
user, calendarID, oldDelta string,
) ([]string, deltaUpdate, error) {
) ([]string, []string, deltaUpdate, error) {
var (
errs *multierror.Error
ids []string
@ -310,7 +311,7 @@ func FetchEventIDsFromCalendar(
options, err := optionsForEventsByCalendar([]string{"id"})
if err != nil {
return nil, deltaUpdate{}, err
return nil, nil, deltaUpdate{}, err
}
builder := gs.Client().
@ -322,10 +323,10 @@ func FetchEventIDsFromCalendar(
resp, err := builder.Get(ctx, options)
if err != nil {
if err := graph.IsErrDeletedInFlight(err); err != nil {
return nil, deltaUpdate{}, err
return nil, nil, deltaUpdate{}, err
}
return nil, deltaUpdate{}, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
return nil, nil, deltaUpdate{}, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
}
for _, item := range resp.GetValue() {
@ -351,7 +352,7 @@ func FetchEventIDsFromCalendar(
}
// Events don't have a delta endpoint so just return an empty string.
return ids, deltaUpdate{}, errs.ErrorOrNil()
return ids, nil, deltaUpdate{}, errs.ErrorOrNil()
}
// ---------------------------------------------------------------------------
@ -364,17 +365,18 @@ func FetchContactIDsFromDirectory(
ctx context.Context,
gs graph.Servicer,
user, directoryID, oldDelta string,
) ([]string, deltaUpdate, error) {
) ([]string, []string, deltaUpdate, error) {
var (
errs *multierror.Error
ids []string
removedIDs []string
deltaURL string
resetDelta bool
)
options, err := optionsForContactFoldersItemDelta([]string{"parentFolderId"})
if err != nil {
return nil, deltaUpdate{}, errors.Wrap(err, "getting query options")
return nil, nil, deltaUpdate{}, errors.Wrap(err, "getting query options")
}
getIDs := func(builder *msuser.ItemContactFoldersItemContactsDeltaRequestBuilder) error {
@ -403,7 +405,11 @@ func FetchContactIDsFromDirectory(
continue
}
if item.GetAdditionalData()[graph.AddtlDataRemoved] == nil {
ids = append(ids, *item.GetId())
} else {
removedIDs = append(removedIDs, *item.GetId())
}
}
delta := resp.GetOdataDeltaLink()
@ -426,12 +432,12 @@ func FetchContactIDsFromDirectory(
err := getIDs(msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, gs.Adapter()))
// happy path
if err == nil {
return ids, deltaUpdate{deltaURL, false}, errs.ErrorOrNil()
return ids, removedIDs, deltaUpdate{deltaURL, false}, errs.ErrorOrNil()
}
// only return on error if it is NOT a delta issue.
// otherwise we'll retry the call with the regular builder
if graph.IsErrInvalidDelta(err) == nil {
return nil, deltaUpdate{}, err
return nil, nil, deltaUpdate{}, err
}
resetDelta = true
@ -445,10 +451,10 @@ func FetchContactIDsFromDirectory(
Delta()
if err := getIDs(builder); err != nil {
return nil, deltaUpdate{}, err
return nil, nil, deltaUpdate{}, err
}
return ids, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
return ids, removedIDs, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
}
// ---------------------------------------------------------------------------
@ -461,17 +467,18 @@ func FetchMessageIDsFromDirectory(
ctx context.Context,
gs graph.Servicer,
user, directoryID, oldDelta string,
) ([]string, deltaUpdate, error) {
) ([]string, []string, deltaUpdate, error) {
var (
errs *multierror.Error
ids []string
removedIDs []string
deltaURL string
resetDelta bool
)
options, err := optionsForFolderMessagesDelta([]string{"isRead"})
if err != nil {
return nil, deltaUpdate{}, errors.Wrap(err, "getting query options")
return nil, nil, deltaUpdate{}, errors.Wrap(err, "getting query options")
}
getIDs := func(builder *msuser.ItemMailFoldersItemMessagesDeltaRequestBuilder) error {
@ -500,7 +507,11 @@ func FetchMessageIDsFromDirectory(
continue
}
if item.GetAdditionalData()[graph.AddtlDataRemoved] == nil {
ids = append(ids, *item.GetId())
} else {
removedIDs = append(removedIDs, *item.GetId())
}
}
delta := resp.GetOdataDeltaLink()
@ -523,12 +534,12 @@ func FetchMessageIDsFromDirectory(
err := getIDs(msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, gs.Adapter()))
// happy path
if err == nil {
return ids, deltaUpdate{deltaURL, false}, errs.ErrorOrNil()
return ids, removedIDs, deltaUpdate{deltaURL, false}, errs.ErrorOrNil()
}
// only return on error if it is NOT a delta issue.
// otherwise we'll retry the call with the regular builder
if graph.IsErrInvalidDelta(err) == nil {
return nil, deltaUpdate{}, err
return nil, nil, deltaUpdate{}, err
}
resetDelta = true
@ -542,8 +553,8 @@ func FetchMessageIDsFromDirectory(
Delta()
if err := getIDs(builder); err != nil {
return nil, deltaUpdate{}, err
return nil, nil, deltaUpdate{}, err
}
return ids, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
return ids, removedIDs, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
}