diff --git a/src/internal/connector/exchange/service_functions.go b/src/internal/connector/exchange/service_functions.go
index 99be26ed6..4b59bd9c1 100644
--- a/src/internal/connector/exchange/service_functions.go
+++ b/src/internal/connector/exchange/service_functions.go
@@ -176,7 +176,9 @@ func populateExchangeContainerResolver(
 	return res, nil
 }
 
-func pathAndMatch(
+// includeContainer returns true if the container passes the scope comparison
+// and should be included. It also returns the path representing the directory.
+func includeContainer(
 	qp graph.QueryParams,
 	c graph.CachedContainer,
 	scope selectors.ExchangeScope,
@@ -213,6 +215,6 @@ func pathAndMatch(
 	case path.EventsCategory:
 		return dirPath, scope.Matches(selectors.ExchangeEventCalendar, directory)
 	default:
-		return nil, false
+		return dirPath, false
 	}
 }
diff --git a/src/internal/connector/exchange/service_iterators.go b/src/internal/connector/exchange/service_iterators.go
index e1b45396b..85190de47 100644
--- a/src/internal/connector/exchange/service_iterators.go
+++ b/src/internal/connector/exchange/service_iterators.go
@@ -7,7 +7,6 @@ import (
 
 	multierror "github.com/hashicorp/go-multierror"
 	"github.com/microsoftgraph/msgraph-sdk-go/models"
-	"github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors"
 	msuser "github.com/microsoftgraph/msgraph-sdk-go/users"
 	"github.com/pkg/errors"
 
@@ -20,22 +19,12 @@ import (
 	"github.com/alcionai/corso/src/pkg/selectors"
 )
 
-const (
-	errEmailFolderNotFound = "ErrorSyncFolderNotFound"
-	errItemNotFound        = "ErrorItemNotFound"
-)
-
-var errContainerDeleted = errors.New("container deleted")
-
-func hasErrorCode(err error, code string) bool {
-	var oDataError *odataerrors.ODataError
-	if !errors.As(err, &oDataError) {
-		return false
-	}
-
-	return oDataError.GetError() != nil &&
-		oDataError.GetError().GetCode() != nil &&
-		*oDataError.GetError().GetCode() == code
+// deltaUpdate carries details about delta retrieval in aggregators.
+type deltaUpdate struct {
+	// the deltaLink itself
+	url string
+	// true if the old delta was marked as invalid
+	reset bool
 }
 
 // filterContainersAndFillCollections is a utility function
@@ -60,8 +49,8 @@ func filterContainersAndFillCollections(
 		deltaURLs = map[string]string{}
 		currPaths = map[string]string{}
 		// copy of previousPaths. any folder found in the resolver get
-		// deleted from this map, leaving only the deleted maps behind
-		deletedPaths = map[string]DeltaPath{}
+		// deleted from this map, leaving only the deleted folders behind
+		tombstones = makeTombstones(dps)
 	)
 
 	getJobs, err := getFetchIDFunc(qp.Category)
@@ -84,11 +73,13 @@ func filterContainersAndFillCollections(
 
 		cID := *c.GetId()
 
-		// this folder exists (probably), do not delete it.
-		delete(deletedPaths, cID)
-
+		currPath, ok := includeContainer(qp, c, scope)
+		if currPath != nil {
+			// this path exists, do not delete it, even if it isn't
+			// included in this backup.
+			delete(tombstones, currPath.String())
+		}
 		// Only create a collection if the path matches the scope.
-		currPath, ok := pathAndMatch(qp, c, scope)
 		if !ok {
 			continue
 		}
@@ -103,22 +94,36 @@ func filterContainersAndFillCollections(
 		if len(prevPathStr) > 0 {
			if prevPath, err = pathFromPrevString(prevPathStr); err != nil {
 				logger.Ctx(ctx).Error(err)
+				// if the previous path is unusable, then the delta must be, too.
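+				// clearing the delta forces a full enumeration of the container's items.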
+ prevDelta = "" } } - jobs, currDelta, err := getJobs(ctx, service, qp.ResourceOwner, cID, prevDelta) + jobs, newDelta, err := getJobs(ctx, service, qp.ResourceOwner, cID, prevDelta) if err != nil { - // race conditions happen, the container might get - // deleted while this process in flight. - if errors.Is(err, errContainerDeleted) { - currPath = nil - } else { + // race conditions happen, containers might get deleted while + // this process is in flight. If it was deleted, we remake the + // tombstone, just to be sure it gets deleted from storage. + if graph.IsErrDeletedInFlight(err) == nil { errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) + } else { + tombstones[currPath.String()] = struct{}{} } + + continue } - if len(currDelta) > 0 { - deltaURLs[cID] = currDelta + if len(newDelta.url) > 0 { + deltaURLs[cID] = newDelta.url + } + + if newDelta.reset { + // the previous delta was invalid or otherwise unusable. + // We need to mark the collection as New, and tombstone + // the path to clear out any data for a clean refresh. + prevPath = nil + tombstones[currPath.String()] = struct{}{} } edc := NewCollection( @@ -130,12 +134,8 @@ func filterContainersAndFillCollections( statusUpdater, ctrlOpts, ) + collections[cID] = &edc - - if edc.State() == data.DeletedState { - continue - } - edc.jobs = append(edc.jobs, jobs...) // add the current path for the container ID to be used in the next backup @@ -143,18 +143,21 @@ func filterContainersAndFillCollections( currPaths[cID] = currPath.String() } - // any path that wasn't present in the resolver was deleted by the user. - // relocations and renames will have removed the dir by id earlier. What's - // left in deletedPaths are only the previous paths that did not appear as - // children of the root. - for fID, dp := range deletedPaths { + // A tombstone is a collection path that needs to be marked for deletion. + // Tombstones can occur for a number of reasons: the delta token expired, + // the container was deleted in flight, or the user deleted the container + // between backup runs. If events combine to both delete and write content + // to the same container (ex: container_1 gets deleted, then container_2 + // gets created with the same name), it is assumed that the backup consumer + // processes deletions before creations, making the combined operation safe. + for p := range tombstones { service, err := createService(qp.Credentials) if err != nil { errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) continue } - prevPath, err := pathFromPrevString(dp.path) + prevPath, err := pathFromPrevString(p) if err != nil { logger.Ctx(ctx).Error(err) continue @@ -169,7 +172,7 @@ func filterContainersAndFillCollections( statusUpdater, ctrlOpts, ) - collections[fID] = &edc + collections[p] = &edc } entries := []graph.MetadataCollectionEntry{ @@ -196,6 +199,19 @@ func filterContainersAndFillCollections( return errs } +// produces a set keyed by path strings from the deltapaths map. +// Each entry in the set will, if not removed, produce a collection +// that will delete the tombstone by path. 
+func makeTombstones(dps DeltaPaths) map[string]struct{} {
+	r := make(map[string]struct{}, len(dps))
+
+	for _, v := range dps {
+		r[v.path] = struct{}{}
+	}
+
+	return r
+}
+
 func pathFromPrevString(ps string) (path.Path, error) {
 	p, err := path.FromDataLayerPath(ps, false)
 	if err != nil {
@@ -260,7 +276,7 @@ type FetchIDFunc func(
 	ctx context.Context,
 	gs graph.Servicer,
 	user, containerID, oldDeltaToken string,
-) ([]string, string, error)
+) ([]string, deltaUpdate, error)
 
 func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) {
 	switch category {
@@ -275,12 +291,16 @@ func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) {
 	}
 }
 
+// ---------------------------------------------------------------------------
+// events
+// ---------------------------------------------------------------------------
+
 // FetchEventIDsFromCalendar returns a list of all M365IDs of events of the targeted Calendar.
 func FetchEventIDsFromCalendar(
 	ctx context.Context,
 	gs graph.Servicer,
 	user, calendarID, oldDelta string,
-) ([]string, string, error) {
+) ([]string, deltaUpdate, error) {
 	var (
 		errs *multierror.Error
 		ids  []string
@@ -288,7 +308,7 @@ func FetchEventIDsFromCalendar(
 
 	options, err := optionsForEventsByCalendar([]string{"id"})
 	if err != nil {
-		return nil, "", err
+		return nil, deltaUpdate{}, err
 	}
 
 	builder := gs.Client().
@@ -299,15 +319,11 @@ func FetchEventIDsFromCalendar(
 	for {
 		resp, err := builder.Get(ctx, options)
 		if err != nil {
-			if hasErrorCode(err, errItemNotFound) {
-				// The folder was deleted between the time we populated the container
-				// cache and when we tried to fetch data for it. All we can do is
-				// return no jobs because we've only pulled basic info about each
-				// item.
-				return nil, "", errors.WithStack(errContainerDeleted)
+			if err := graph.IsErrDeletedInFlight(err); err != nil {
+				return nil, deltaUpdate{}, err
 			}
 
-			return nil, "", errors.Wrap(err, support.ConnectorStackErrorTrace(err))
+			return nil, deltaUpdate{}, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
 		}
 
 		for _, item := range resp.GetValue() {
@@ -333,25 +349,91 @@ func FetchEventIDsFromCalendar(
 	}
 
-	// Events don't have a delta endpoint so just return an empty string.
+	// Events don't have a delta endpoint, so just return an empty deltaUpdate.
- return ids, "", errs.ErrorOrNil() + return ids, deltaUpdate{}, errs.ErrorOrNil() } +// --------------------------------------------------------------------------- +// contacts +// --------------------------------------------------------------------------- + // FetchContactIDsFromDirectory function that returns a list of all the m365IDs of the contacts // of the targeted directory func FetchContactIDsFromDirectory( ctx context.Context, gs graph.Servicer, user, directoryID, oldDelta string, -) ([]string, string, error) { +) ([]string, deltaUpdate, error) { var ( - errs *multierror.Error - ids []string - deltaURL string + errs *multierror.Error + ids []string + deltaURL string + resetDelta bool ) options, err := optionsForContactFoldersItemDelta([]string{"parentFolderId"}) if err != nil { - return nil, deltaURL, errors.Wrap(err, "getting query options") + return nil, deltaUpdate{}, errors.Wrap(err, "getting query options") + } + + getIDs := func(builder *msuser.ItemContactFoldersItemContactsDeltaRequestBuilder) error { + for { + resp, err := builder.Get(ctx, options) + if err != nil { + if err := graph.IsErrDeletedInFlight(err); err != nil { + return err + } + + if err := graph.IsErrInvalidDelta(err); err != nil { + return err + } + + return errors.Wrap(err, support.ConnectorStackErrorTrace(err)) + } + + for _, item := range resp.GetValue() { + if item.GetId() == nil { + errs = multierror.Append( + errs, + errors.Errorf("item with nil ID in folder %s", directoryID), + ) + + // TODO(ashmrtn): Handle fail-fast. + continue + } + + ids = append(ids, *item.GetId()) + } + + delta := resp.GetOdataDeltaLink() + if delta != nil && len(*delta) > 0 { + deltaURL = *delta + } + + nextLink := resp.GetOdataNextLink() + if nextLink == nil || len(*nextLink) == 0 { + break + } + + builder = msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(*nextLink, gs.Adapter()) + } + + return nil + } + + if len(oldDelta) > 0 { + err := getIDs(msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, gs.Adapter())) + // happy path + if err == nil { + return ids, deltaUpdate{deltaURL, false}, errs.ErrorOrNil() + } + // only return on error if it is NOT a delta issue. + // otherwise we'll retry the call with the regular builder + if graph.IsErrInvalidDelta(err) == nil { + return nil, deltaUpdate{}, err + } + + resetDelta = true + errs = nil } builder := gs.Client(). @@ -360,70 +442,95 @@ func FetchContactIDsFromDirectory( Contacts(). Delta() - if len(oldDelta) > 0 { - builder = msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, gs.Adapter()) + if err := getIDs(builder); err != nil { + return nil, deltaUpdate{}, err } - for { - resp, err := builder.Get(ctx, options) - if err != nil { - if hasErrorCode(err, errItemNotFound) { - // The folder was deleted between the time we populated the container - // cache and when we tried to fetch data for it. All we can do is - // return no jobs because we've only pulled basic info about each - // item. - return nil, "", errors.WithStack(errContainerDeleted) - } - - return nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) - } - - for _, item := range resp.GetValue() { - if item.GetId() == nil { - errs = multierror.Append( - errs, - errors.Errorf("contact with nil ID in folder %s", directoryID), - ) - - // TODO(ashmrtn): Handle fail-fast. 
-				continue
-			}
-
-			ids = append(ids, *item.GetId())
-		}
-
-		delta := resp.GetOdataDeltaLink()
-		if delta != nil && len(*delta) > 0 {
-			deltaURL = *delta
-		}
-
-		nextLink := resp.GetOdataNextLink()
-		if nextLink == nil || len(*nextLink) == 0 {
-			break
-		}
-
-		builder = msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(*nextLink, gs.Adapter())
-	}
-
-	return ids, deltaURL, errs.ErrorOrNil()
+	return ids, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
 }
 
+// ---------------------------------------------------------------------------
+// messages
+// ---------------------------------------------------------------------------
+
 // FetchMessageIDsFromDirectory function that returns a list of all the m365IDs of the exchange.Mail
 // of the targeted directory
 func FetchMessageIDsFromDirectory(
 	ctx context.Context,
 	gs graph.Servicer,
 	user, directoryID, oldDelta string,
-) ([]string, string, error) {
+) ([]string, deltaUpdate, error) {
 	var (
-		errs     *multierror.Error
-		ids      []string
-		deltaURL string
+		errs       *multierror.Error
+		ids        []string
+		deltaURL   string
+		resetDelta bool
	)
 
 	options, err := optionsForFolderMessagesDelta([]string{"isRead"})
 	if err != nil {
-		return nil, deltaURL, errors.Wrap(err, "getting query options")
+		return nil, deltaUpdate{}, errors.Wrap(err, "getting query options")
+	}
+
+	getIDs := func(builder *msuser.ItemMailFoldersItemMessagesDeltaRequestBuilder) error {
+		for {
+			resp, err := builder.Get(ctx, options)
+			if err != nil {
+				if err := graph.IsErrDeletedInFlight(err); err != nil {
+					return err
+				}
+
+				if err := graph.IsErrInvalidDelta(err); err != nil {
+					return err
+				}
+
+				return errors.Wrap(err, support.ConnectorStackErrorTrace(err))
+			}
+
+			for _, item := range resp.GetValue() {
+				if item.GetId() == nil {
+					errs = multierror.Append(
+						errs,
+						errors.Errorf("item with nil ID in folder %s", directoryID),
+					)
+
+					// TODO(ashmrtn): Handle fail-fast.
+					continue
+				}
+
+				ids = append(ids, *item.GetId())
+			}
+
+			delta := resp.GetOdataDeltaLink()
+			if delta != nil && len(*delta) > 0 {
+				deltaURL = *delta
+			}
+
+			nextLink := resp.GetOdataNextLink()
+			if nextLink == nil || len(*nextLink) == 0 {
+				break
+			}
+
+			builder = msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(*nextLink, gs.Adapter())
+		}
+
+		return nil
+	}
+
+	if len(oldDelta) > 0 {
+		err := getIDs(msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, gs.Adapter()))
+		// happy path
+		if err == nil {
+			return ids, deltaUpdate{deltaURL, false}, errs.ErrorOrNil()
+		}
+		// only return on error if it is NOT a delta issue.
+		// otherwise we'll retry the call with the regular builder
+		if graph.IsErrInvalidDelta(err) == nil {
+			return nil, deltaUpdate{}, err
+		}
+
+		resetDelta = true
+		errs = nil
 	}
 
 	builder := gs.Client().
@@ -432,50 +539,9 @@ func FetchMessageIDsFromDirectory(
 		Messages().
 		Delta()
 
-	if len(oldDelta) > 0 {
-		builder = msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, gs.Adapter())
+	if err := getIDs(builder); err != nil {
+		return nil, deltaUpdate{}, err
 	}
 
-	for {
-		resp, err := builder.Get(ctx, options)
-		if err != nil {
-			if hasErrorCode(err, errEmailFolderNotFound) {
-				// The folder was deleted between the time we populated the container
-				// cache and when we tried to fetch data for it. All we can do is
-				// return no jobs because we've only pulled basic info about each
-				// item.
- return nil, "", errors.WithStack(errContainerDeleted) - } - - return nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) - } - - for _, item := range resp.GetValue() { - if item.GetId() == nil { - errs = multierror.Append( - errs, - errors.Errorf("item with nil ID in folder %s", directoryID), - ) - - // TODO(ashmrtn): Handle fail-fast. - continue - } - - ids = append(ids, *item.GetId()) - } - - delta := resp.GetOdataDeltaLink() - if delta != nil && len(*delta) > 0 { - deltaURL = *delta - } - - nextLink := resp.GetOdataNextLink() - if nextLink == nil || len(*nextLink) == 0 { - break - } - - builder = msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(*nextLink, gs.Adapter()) - } - - return ids, deltaURL, errs.ErrorOrNil() + return ids, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() } diff --git a/src/internal/connector/graph/errors.go b/src/internal/connector/graph/errors.go new file mode 100644 index 000000000..3c949df04 --- /dev/null +++ b/src/internal/connector/graph/errors.go @@ -0,0 +1,127 @@ +package graph + +import ( + "fmt" + "net/url" + + "github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors" + "github.com/pkg/errors" + + "github.com/alcionai/corso/src/internal/common" +) + +// --------------------------------------------------------------------------- +// Error Interpretation Helpers +// --------------------------------------------------------------------------- + +const ( + errCodeItemNotFound = "ErrorItemNotFound" + errCodeEmailFolderNotFound = "ErrorSyncFolderNotFound" + errCodeResyncRequired = "ResyncRequired" + errCodeSyncStateNotFound = "SyncStateNotFound" +) + +// The folder or item was deleted between the time we identified +// it and when we tried to fetch data for it. +type ErrDeletedInFlight struct { + common.Err +} + +func IsErrDeletedInFlight(err error) error { + if asDeletedInFlight(err) { + return err + } + + if hasErrorCode(err, errCodeItemNotFound) { + return ErrDeletedInFlight{*common.EncapsulateError(err)} + } + + return nil +} + +func asDeletedInFlight(err error) bool { + e := ErrDeletedInFlight{} + return errors.As(err, &e) +} + +// Delta tokens can be desycned or expired. In either case, the token +// becomes invalid, and cannot be used again. +// https://learn.microsoft.com/en-us/graph/errors#code-property +type ErrInvalidDelta struct { + common.Err +} + +func IsErrInvalidDelta(err error) error { + if asInvalidDelta(err) { + return err + } + + if hasErrorCode(err, errCodeSyncStateNotFound) || + hasErrorCode(err, errCodeResyncRequired) { + return ErrInvalidDelta{*common.EncapsulateError(err)} + } + + return nil +} + +func asInvalidDelta(err error) bool { + e := ErrInvalidDelta{} + return errors.As(err, &e) +} + +// Timeout errors are identified for tracking the need to retry calls. +// Other delay errors, like throttling, are already handled by the +// graph client's built-in retries. 
+// https://github.com/microsoftgraph/msgraph-sdk-go/issues/302
+type ErrTimeout struct {
+	common.Err
+}
+
+func IsErrTimeout(err error) error {
+	if asTimeout(err) {
+		return err
+	}
+
+	if isTimeoutErr(err) {
+		return ErrTimeout{*common.EncapsulateError(err)}
+	}
+
+	return nil
+}
+
+func asTimeout(err error) bool {
+	e := ErrTimeout{}
+	return errors.As(err, &e)
+}
+
+// ---------------------------------------------------------------------------
+// error parsers
+// ---------------------------------------------------------------------------
+
+func hasErrorCode(err error, code string) bool {
+	if err == nil {
+		return false
+	}
+
+	var oDataError *odataerrors.ODataError
+	if !errors.As(err, &oDataError) {
+		return false
+	}
+
+	return oDataError.GetError() != nil &&
+		oDataError.GetError().GetCode() != nil &&
+		*oDataError.GetError().GetCode() == code
+}
+
+// isTimeoutErr is used to determine if the Graph error returned is
+// because of Timeout. This is used to restrict retries to just
+// timeouts as other errors are handled within a middleware in the
+// client.
+func isTimeoutErr(err error) bool {
+	switch err := err.(type) {
+	case *url.Error:
+		return err.Timeout()
+	default:
+		return false
+	}
+}
diff --git a/src/internal/connector/graph/service_helper.go b/src/internal/connector/graph/service_helper.go
index 8513be5c0..db39fcb34 100644
--- a/src/internal/connector/graph/service_helper.go
+++ b/src/internal/connector/graph/service_helper.go
@@ -57,10 +57,13 @@ func CreateHTTPClient() *nethttp.Client {
 	return httpClient
 }
 
+// ---------------------------------------------------------------------------
+// Logging Middleware
+// ---------------------------------------------------------------------------
+
 // LoggingMiddleware can be used to log the http request sent by the graph client
 type LoggingMiddleware struct{}
 
-// Intercept implements the RequestInterceptor interface and decodes the parameters name
 func (handler *LoggingMiddleware) Intercept(
 	pipeline khttp.Pipeline,
 	middlewareIndex int,
@@ -97,6 +100,10 @@ func (handler *LoggingMiddleware) Intercept(
 	return resp, err
 }
 
+// ---------------------------------------------------------------------------
+// Other Helpers
+// ---------------------------------------------------------------------------
+
 func StringToPathCategory(input string) path.CategoryType {
 	param := strings.ToLower(input)
 
diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go
index 638876b66..c225b8571 100644
--- a/src/internal/connector/onedrive/collection.go
+++ b/src/internal/connector/onedrive/collection.go
@@ -4,7 +4,6 @@ package onedrive
 import (
 	"context"
 	"io"
-	"net/url"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -156,19 +155,6 @@ func (od *Item) Info() details.ItemInfo {
 //	return od.info.Modified
 //}
 
-// isTimeoutErr is used to determine if the Graph error returned is
-// because of Timeout. This is used to restrict retries to just
-// timeouts as other errors are handled within a middleware in the
-// client.
-func isTimeoutErr(err error) bool {
-	switch err := err.(type) {
-	case *url.Error:
-		return err.Timeout()
-	default:
-		return false
-	}
-}
-
 // populateItems iterates through items added to the collection
 // and uses the collection `itemReader` to read the item
 func (oc *Collection) populateItems(ctx context.Context) {
@@ -228,11 +214,8 @@ func (oc *Collection) populateItems(ctx context.Context) {
 
 		for i := 1; i <= maxRetries; i++ {
 			itemInfo, itemData, err = oc.itemReader(ctx, item)
-			// We only retry if it is a timeout error. Other
-			// errors like throttling are already handled within
-			// the graph client via a retry middleware.
-			// https://github.com/microsoftgraph/msgraph-sdk-go/issues/302
-			if err == nil || !isTimeoutErr(err) {
+			// retry on Timeout type errors, break otherwise.
+			if err == nil || graph.IsErrTimeout(err) == nil {
 				break
 			}