handle invalid or expired deltas
If a delta token expires or is otherwise invalid, the backup should fall back to the same behavior as if the collection were new. It must collect the full delta of information, and create a collection with New state. If the previous container moved between the two deltas, it should be marked for deletion with a tomb- stone collection.
This commit is contained in:
parent
741b36da98
commit
591b13388d
@ -176,7 +176,9 @@ func populateExchangeContainerResolver(
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func pathAndMatch(
|
||||
// Returns true if the container passes the scope comparison and should be included.
|
||||
// Also returns the path representing the directory.
|
||||
func includeContainer(
|
||||
qp graph.QueryParams,
|
||||
c graph.CachedContainer,
|
||||
scope selectors.ExchangeScope,
|
||||
@ -213,6 +215,6 @@ func pathAndMatch(
|
||||
case path.EventsCategory:
|
||||
return dirPath, scope.Matches(selectors.ExchangeEventCalendar, directory)
|
||||
default:
|
||||
return nil, false
|
||||
return dirPath, false
|
||||
}
|
||||
}
|
||||
|
||||
@ -7,7 +7,6 @@ import (
|
||||
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors"
|
||||
msuser "github.com/microsoftgraph/msgraph-sdk-go/users"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
@ -20,22 +19,12 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/selectors"
|
||||
)
|
||||
|
||||
const (
|
||||
errEmailFolderNotFound = "ErrorSyncFolderNotFound"
|
||||
errItemNotFound = "ErrorItemNotFound"
|
||||
)
|
||||
|
||||
var errContainerDeleted = errors.New("container deleted")
|
||||
|
||||
func hasErrorCode(err error, code string) bool {
|
||||
var oDataError *odataerrors.ODataError
|
||||
if !errors.As(err, &oDataError) {
|
||||
return false
|
||||
}
|
||||
|
||||
return oDataError.GetError() != nil &&
|
||||
oDataError.GetError().GetCode() != nil &&
|
||||
*oDataError.GetError().GetCode() == code
|
||||
// carries details about delta retrieval in aggregators
|
||||
type deltaUpdate struct {
|
||||
// the deltaLink itself
|
||||
url string
|
||||
// true if the old delta was marked as invalid
|
||||
reset bool
|
||||
}
|
||||
|
||||
// filterContainersAndFillCollections is a utility function
|
||||
@ -61,7 +50,7 @@ func filterContainersAndFillCollections(
|
||||
currPaths = map[string]string{}
|
||||
// copy of previousPaths. any folder found in the resolver get
|
||||
// deleted from this map, leaving only the deleted maps behind
|
||||
deletedPaths = map[string]DeltaPath{}
|
||||
tombstones = makeTombstones(dps)
|
||||
)
|
||||
|
||||
getJobs, err := getFetchIDFunc(qp.Category)
|
||||
@ -84,11 +73,13 @@ func filterContainersAndFillCollections(
|
||||
|
||||
cID := *c.GetId()
|
||||
|
||||
// this folder exists (probably), do not delete it.
|
||||
delete(deletedPaths, cID)
|
||||
|
||||
currPath, ok := includeContainer(qp, c, scope)
|
||||
if currPath != nil {
|
||||
// this path exists, do not delete it, even if it isn't
|
||||
// included in this backup.
|
||||
delete(tombstones, currPath.String())
|
||||
}
|
||||
// Only create a collection if the path matches the scope.
|
||||
currPath, ok := pathAndMatch(qp, c, scope)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
@ -106,19 +97,28 @@ func filterContainersAndFillCollections(
|
||||
}
|
||||
}
|
||||
|
||||
jobs, currDelta, err := getJobs(ctx, service, qp.ResourceOwner, cID, prevDelta)
|
||||
jobs, newDelta, err := getJobs(ctx, service, qp.ResourceOwner, cID, prevDelta)
|
||||
if err != nil {
|
||||
// race conditions happen, the container might get
|
||||
// deleted while this process in flight.
|
||||
if errors.Is(err, errContainerDeleted) {
|
||||
currPath = nil
|
||||
} else {
|
||||
// race conditions happen, containers might get deleted while
|
||||
// this process is in flight. If it was deleted, we skip it
|
||||
// and move on, which will create a tombstone to delete the path.
|
||||
if graph.IsErrDeletedInFlight(err) == nil {
|
||||
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
if len(currDelta) > 0 {
|
||||
deltaURLs[cID] = currDelta
|
||||
if len(newDelta.url) > 0 {
|
||||
deltaURLs[cID] = newDelta.url
|
||||
}
|
||||
|
||||
if newDelta.reset {
|
||||
// the previous delta was invalid or otherwise unusable.
|
||||
// We need to mark the collection as New, and tombstone
|
||||
// the path to clear out any data for a clean refresh.
|
||||
prevPath = nil
|
||||
tombstones[currPath.String()] = struct{}{}
|
||||
}
|
||||
|
||||
edc := NewCollection(
|
||||
@ -130,12 +130,8 @@ func filterContainersAndFillCollections(
|
||||
statusUpdater,
|
||||
ctrlOpts,
|
||||
)
|
||||
|
||||
collections[cID] = &edc
|
||||
|
||||
if edc.State() == data.DeletedState {
|
||||
continue
|
||||
}
|
||||
|
||||
edc.jobs = append(edc.jobs, jobs...)
|
||||
|
||||
// add the current path for the container ID to be used in the next backup
|
||||
@ -143,18 +139,21 @@ func filterContainersAndFillCollections(
|
||||
currPaths[cID] = currPath.String()
|
||||
}
|
||||
|
||||
// any path that wasn't present in the resolver was deleted by the user.
|
||||
// relocations and renames will have removed the dir by id earlier. What's
|
||||
// left in deletedPaths are only the previous paths that did not appear as
|
||||
// children of the root.
|
||||
for fID, dp := range deletedPaths {
|
||||
// A tombstone is a collection path that needs to be marked for deletion.
|
||||
// Tombstones can occur for a number of reasons: the delta token expired,
|
||||
// the container was deleted in flight, or the user deleted the container
|
||||
// between backup runs. If events combine to both delete and write content
|
||||
// to the same container (ex: container_1 gets deleted, then container_2
|
||||
// gets created with the same name), it is assumed that the backup consumer
|
||||
// processes deletions before creations, making the combined operation safe.
|
||||
for p := range tombstones {
|
||||
service, err := createService(qp.Credentials)
|
||||
if err != nil {
|
||||
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
|
||||
continue
|
||||
}
|
||||
|
||||
prevPath, err := pathFromPrevString(dp.path)
|
||||
prevPath, err := pathFromPrevString(p)
|
||||
if err != nil {
|
||||
logger.Ctx(ctx).Error(err)
|
||||
continue
|
||||
@ -169,7 +168,7 @@ func filterContainersAndFillCollections(
|
||||
statusUpdater,
|
||||
ctrlOpts,
|
||||
)
|
||||
collections[fID] = &edc
|
||||
collections[p] = &edc
|
||||
}
|
||||
|
||||
entries := []graph.MetadataCollectionEntry{
|
||||
@ -196,6 +195,19 @@ func filterContainersAndFillCollections(
|
||||
return errs
|
||||
}
|
||||
|
||||
// produces a set keyed by path strings from the deltapaths map.
|
||||
// Each entry in the set will, if not removed, produce a collection
|
||||
// that will delete the tombstone by path.
|
||||
func makeTombstones(dps DeltaPaths) map[string]struct{} {
|
||||
r := make(map[string]struct{}, len(dps))
|
||||
|
||||
for _, v := range dps {
|
||||
r[v.path] = struct{}{}
|
||||
}
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
func pathFromPrevString(ps string) (path.Path, error) {
|
||||
p, err := path.FromDataLayerPath(ps, false)
|
||||
if err != nil {
|
||||
@ -260,7 +272,7 @@ type FetchIDFunc func(
|
||||
ctx context.Context,
|
||||
gs graph.Servicer,
|
||||
user, containerID, oldDeltaToken string,
|
||||
) ([]string, string, error)
|
||||
) ([]string, deltaUpdate, error)
|
||||
|
||||
func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) {
|
||||
switch category {
|
||||
@ -275,12 +287,16 @@ func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// events
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// FetchEventIDsFromCalendar returns a list of all M365IDs of events of the targeted Calendar.
|
||||
func FetchEventIDsFromCalendar(
|
||||
ctx context.Context,
|
||||
gs graph.Servicer,
|
||||
user, calendarID, oldDelta string,
|
||||
) ([]string, string, error) {
|
||||
) ([]string, deltaUpdate, error) {
|
||||
var (
|
||||
errs *multierror.Error
|
||||
ids []string
|
||||
@ -288,7 +304,7 @@ func FetchEventIDsFromCalendar(
|
||||
|
||||
options, err := optionsForEventsByCalendar([]string{"id"})
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
return nil, deltaUpdate{}, err
|
||||
}
|
||||
|
||||
builder := gs.Client().
|
||||
@ -299,15 +315,11 @@ func FetchEventIDsFromCalendar(
|
||||
for {
|
||||
resp, err := builder.Get(ctx, options)
|
||||
if err != nil {
|
||||
if hasErrorCode(err, errItemNotFound) {
|
||||
// The folder was deleted between the time we populated the container
|
||||
// cache and when we tried to fetch data for it. All we can do is
|
||||
// return no jobs because we've only pulled basic info about each
|
||||
// item.
|
||||
return nil, "", errors.WithStack(errContainerDeleted)
|
||||
if err := graph.IsErrDeletedInFlight(err); err != nil {
|
||||
return nil, deltaUpdate{}, err
|
||||
}
|
||||
|
||||
return nil, "", errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
||||
return nil, deltaUpdate{}, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
||||
}
|
||||
|
||||
for _, item := range resp.GetValue() {
|
||||
@ -333,25 +345,91 @@ func FetchEventIDsFromCalendar(
|
||||
}
|
||||
|
||||
// Events don't have a delta endpoint so just return an empty string.
|
||||
return ids, "", errs.ErrorOrNil()
|
||||
return ids, deltaUpdate{}, errs.ErrorOrNil()
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// contacts
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// FetchContactIDsFromDirectory function that returns a list of all the m365IDs of the contacts
|
||||
// of the targeted directory
|
||||
func FetchContactIDsFromDirectory(
|
||||
ctx context.Context,
|
||||
gs graph.Servicer,
|
||||
user, directoryID, oldDelta string,
|
||||
) ([]string, string, error) {
|
||||
) ([]string, deltaUpdate, error) {
|
||||
var (
|
||||
errs *multierror.Error
|
||||
ids []string
|
||||
deltaURL string
|
||||
errs *multierror.Error
|
||||
ids []string
|
||||
deltaURL string
|
||||
resetDelta bool
|
||||
)
|
||||
|
||||
options, err := optionsForContactFoldersItemDelta([]string{"parentFolderId"})
|
||||
options, err := optionsForContactFoldersItemDelta([]string{"isRead"})
|
||||
if err != nil {
|
||||
return nil, deltaURL, errors.Wrap(err, "getting query options")
|
||||
return nil, deltaUpdate{}, errors.Wrap(err, "getting query options")
|
||||
}
|
||||
|
||||
getIDs := func(builder *msuser.ItemContactFoldersItemContactsDeltaRequestBuilder) error {
|
||||
for {
|
||||
resp, err := builder.Get(ctx, options)
|
||||
if err != nil {
|
||||
if err := graph.IsErrDeletedInFlight(err); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := graph.IsErrInvalidDelta(err); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
||||
}
|
||||
|
||||
for _, item := range resp.GetValue() {
|
||||
if item.GetId() == nil {
|
||||
errs = multierror.Append(
|
||||
errs,
|
||||
errors.Errorf("item with nil ID in folder %s", directoryID),
|
||||
)
|
||||
|
||||
// TODO(ashmrtn): Handle fail-fast.
|
||||
continue
|
||||
}
|
||||
|
||||
ids = append(ids, *item.GetId())
|
||||
}
|
||||
|
||||
delta := resp.GetOdataDeltaLink()
|
||||
if delta != nil && len(*delta) > 0 {
|
||||
deltaURL = *delta
|
||||
}
|
||||
|
||||
nextLink := resp.GetOdataNextLink()
|
||||
if nextLink == nil || len(*nextLink) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
builder = msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(*nextLink, gs.Adapter())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(oldDelta) > 0 {
|
||||
err := getIDs(msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, gs.Adapter()))
|
||||
// happy path
|
||||
if err == nil {
|
||||
return ids, deltaUpdate{deltaURL, false}, errs.ErrorOrNil()
|
||||
}
|
||||
// only return on error if it is NOT a delta issue.
|
||||
// otherwise we'll retry the call with the regular builder
|
||||
if graph.IsErrInvalidDelta(err) == nil {
|
||||
return nil, deltaUpdate{}, err
|
||||
}
|
||||
|
||||
resetDelta = true
|
||||
errs = nil
|
||||
}
|
||||
|
||||
builder := gs.Client().
|
||||
@ -360,70 +438,95 @@ func FetchContactIDsFromDirectory(
|
||||
Contacts().
|
||||
Delta()
|
||||
|
||||
if len(oldDelta) > 0 {
|
||||
builder = msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, gs.Adapter())
|
||||
if err := getIDs(builder); err != nil {
|
||||
return nil, deltaUpdate{}, err
|
||||
}
|
||||
|
||||
for {
|
||||
resp, err := builder.Get(ctx, options)
|
||||
if err != nil {
|
||||
if hasErrorCode(err, errItemNotFound) {
|
||||
// The folder was deleted between the time we populated the container
|
||||
// cache and when we tried to fetch data for it. All we can do is
|
||||
// return no jobs because we've only pulled basic info about each
|
||||
// item.
|
||||
return nil, "", errors.WithStack(errContainerDeleted)
|
||||
}
|
||||
|
||||
return nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
||||
}
|
||||
|
||||
for _, item := range resp.GetValue() {
|
||||
if item.GetId() == nil {
|
||||
errs = multierror.Append(
|
||||
errs,
|
||||
errors.Errorf("contact with nil ID in folder %s", directoryID),
|
||||
)
|
||||
|
||||
// TODO(ashmrtn): Handle fail-fast.
|
||||
continue
|
||||
}
|
||||
|
||||
ids = append(ids, *item.GetId())
|
||||
}
|
||||
|
||||
delta := resp.GetOdataDeltaLink()
|
||||
if delta != nil && len(*delta) > 0 {
|
||||
deltaURL = *delta
|
||||
}
|
||||
|
||||
nextLink := resp.GetOdataNextLink()
|
||||
if nextLink == nil || len(*nextLink) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
builder = msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(*nextLink, gs.Adapter())
|
||||
}
|
||||
|
||||
return ids, deltaURL, errs.ErrorOrNil()
|
||||
return ids, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// messages
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// FetchMessageIDsFromDirectory function that returns a list of all the m365IDs of the exchange.Mail
|
||||
// of the targeted directory
|
||||
func FetchMessageIDsFromDirectory(
|
||||
ctx context.Context,
|
||||
gs graph.Servicer,
|
||||
user, directoryID, oldDelta string,
|
||||
) ([]string, string, error) {
|
||||
) ([]string, deltaUpdate, error) {
|
||||
var (
|
||||
errs *multierror.Error
|
||||
ids []string
|
||||
deltaURL string
|
||||
errs *multierror.Error
|
||||
ids []string
|
||||
deltaURL string
|
||||
resetDelta bool
|
||||
)
|
||||
|
||||
options, err := optionsForFolderMessagesDelta([]string{"isRead"})
|
||||
if err != nil {
|
||||
return nil, deltaURL, errors.Wrap(err, "getting query options")
|
||||
return nil, deltaUpdate{}, errors.Wrap(err, "getting query options")
|
||||
}
|
||||
|
||||
getIDs := func(builder *msuser.ItemMailFoldersItemMessagesDeltaRequestBuilder) error {
|
||||
for {
|
||||
resp, err := builder.Get(ctx, options)
|
||||
if err != nil {
|
||||
if err := graph.IsErrDeletedInFlight(err); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := graph.IsErrInvalidDelta(err); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
||||
}
|
||||
|
||||
for _, item := range resp.GetValue() {
|
||||
if item.GetId() == nil {
|
||||
errs = multierror.Append(
|
||||
errs,
|
||||
errors.Errorf("item with nil ID in folder %s", directoryID),
|
||||
)
|
||||
|
||||
// TODO(ashmrtn): Handle fail-fast.
|
||||
continue
|
||||
}
|
||||
|
||||
ids = append(ids, *item.GetId())
|
||||
}
|
||||
|
||||
delta := resp.GetOdataDeltaLink()
|
||||
if delta != nil && len(*delta) > 0 {
|
||||
deltaURL = *delta
|
||||
}
|
||||
|
||||
nextLink := resp.GetOdataNextLink()
|
||||
if nextLink == nil || len(*nextLink) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
builder = msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(*nextLink, gs.Adapter())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(oldDelta) > 0 {
|
||||
err := getIDs(msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, gs.Adapter()))
|
||||
// happy path
|
||||
if err == nil {
|
||||
return ids, deltaUpdate{deltaURL, false}, errs.ErrorOrNil()
|
||||
}
|
||||
// only return on error if it is NOT a delta issue.
|
||||
// otherwise we'll retry the call with the regular builder
|
||||
if graph.IsErrInvalidDelta(err) == nil {
|
||||
return nil, deltaUpdate{}, err
|
||||
}
|
||||
|
||||
resetDelta = true
|
||||
errs = nil
|
||||
}
|
||||
|
||||
builder := gs.Client().
|
||||
@ -432,50 +535,9 @@ func FetchMessageIDsFromDirectory(
|
||||
Messages().
|
||||
Delta()
|
||||
|
||||
if len(oldDelta) > 0 {
|
||||
builder = msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, gs.Adapter())
|
||||
if err := getIDs(builder); err != nil {
|
||||
return nil, deltaUpdate{}, err
|
||||
}
|
||||
|
||||
for {
|
||||
resp, err := builder.Get(ctx, options)
|
||||
if err != nil {
|
||||
if hasErrorCode(err, errEmailFolderNotFound) {
|
||||
// The folder was deleted between the time we populated the container
|
||||
// cache and when we tried to fetch data for it. All we can do is
|
||||
// return no jobs because we've only pulled basic info about each
|
||||
// item.
|
||||
return nil, "", errors.WithStack(errContainerDeleted)
|
||||
}
|
||||
|
||||
return nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
||||
}
|
||||
|
||||
for _, item := range resp.GetValue() {
|
||||
if item.GetId() == nil {
|
||||
errs = multierror.Append(
|
||||
errs,
|
||||
errors.Errorf("item with nil ID in folder %s", directoryID),
|
||||
)
|
||||
|
||||
// TODO(ashmrtn): Handle fail-fast.
|
||||
continue
|
||||
}
|
||||
|
||||
ids = append(ids, *item.GetId())
|
||||
}
|
||||
|
||||
delta := resp.GetOdataDeltaLink()
|
||||
if delta != nil && len(*delta) > 0 {
|
||||
deltaURL = *delta
|
||||
}
|
||||
|
||||
nextLink := resp.GetOdataNextLink()
|
||||
if nextLink == nil || len(*nextLink) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
builder = msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(*nextLink, gs.Adapter())
|
||||
}
|
||||
|
||||
return ids, deltaURL, errs.ErrorOrNil()
|
||||
return ids, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
|
||||
}
|
||||
|
||||
126
src/internal/connector/graph/errors.go
Normal file
126
src/internal/connector/graph/errors.go
Normal file
@ -0,0 +1,126 @@
|
||||
package graph
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/common"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Error Interpretation Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const (
|
||||
errCodeItemNotFound = "ErrorItemNotFound"
|
||||
errCodeEmailFolderNotFound = "ErrorSyncFolderNotFound"
|
||||
errCodeResyncRequired = "ResyncRequired"
|
||||
errCodeSyncStateNotFound = "SyncStateNotFound"
|
||||
)
|
||||
|
||||
// The folder or item was deleted between the time we identified
|
||||
// it and when we tried to fetch data for it.
|
||||
type ErrDeletedInFlight struct {
|
||||
common.Err
|
||||
}
|
||||
|
||||
func IsErrDeletedInFlight(err error) error {
|
||||
if asDeletedInFlight(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
if hasErrorCode(err, errCodeItemNotFound) {
|
||||
return ErrDeletedInFlight{*common.EncapsulateError(err)}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func asDeletedInFlight(err error) bool {
|
||||
e := ErrDeletedInFlight{}
|
||||
return errors.As(err, &e)
|
||||
}
|
||||
|
||||
// Delta tokens can be desycned or expired. In either case, the token
|
||||
// becomes invalid, and cannot be used again.
|
||||
// https://learn.microsoft.com/en-us/graph/errors#code-property
|
||||
type ErrInvalidDelta struct {
|
||||
common.Err
|
||||
}
|
||||
|
||||
func IsErrInvalidDelta(err error) error {
|
||||
if asInvalidDelta(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
if hasErrorCode(err, errCodeSyncStateNotFound) ||
|
||||
hasErrorCode(err, errCodeResyncRequired) {
|
||||
return ErrInvalidDelta{*common.EncapsulateError(err)}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func asInvalidDelta(err error) bool {
|
||||
e := ErrInvalidDelta{}
|
||||
return errors.As(err, &e)
|
||||
}
|
||||
|
||||
// Timeout errors are identified for tracking the need to retry calls.
|
||||
// Other delay errors, like throttling, are already handled by the
|
||||
// graph client's built-in retries.
|
||||
// https://github.com/microsoftgraph/msgraph-sdk-go/issues/302
|
||||
type ErrTimeout struct {
|
||||
common.Err
|
||||
}
|
||||
|
||||
func IsErrTimeout(err error) error {
|
||||
if asTimeout(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
if isTimeoutErr(err) {
|
||||
return ErrTimeout{*common.EncapsulateError(err)}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func asTimeout(err error) bool {
|
||||
e := ErrTimeout{}
|
||||
return errors.As(err, &e)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// error parsers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func hasErrorCode(err error, code string) bool {
|
||||
if err == nil {
|
||||
fmt.Println("nil")
|
||||
return false
|
||||
}
|
||||
|
||||
var oDataError *odataerrors.ODataError
|
||||
if !errors.As(err, &oDataError) {
|
||||
return false
|
||||
}
|
||||
|
||||
return oDataError.GetError().GetCode() != nil &&
|
||||
*oDataError.GetError().GetCode() == code
|
||||
}
|
||||
|
||||
// isTimeoutErr is used to determine if the Graph error returned is
|
||||
// because of Timeout. This is used to restrict retries to just
|
||||
// timeouts as other errors are handled within a middleware in the
|
||||
// client.
|
||||
func isTimeoutErr(err error) bool {
|
||||
switch err := err.(type) {
|
||||
case *url.Error:
|
||||
return err.Timeout()
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
@ -57,10 +57,13 @@ func CreateHTTPClient() *nethttp.Client {
|
||||
return httpClient
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Logging Middleware
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// LoggingMiddleware can be used to log the http request sent by the graph client
|
||||
type LoggingMiddleware struct{}
|
||||
|
||||
// Intercept implements the RequestInterceptor interface and decodes the parameters name
|
||||
func (handler *LoggingMiddleware) Intercept(
|
||||
pipeline khttp.Pipeline,
|
||||
middlewareIndex int,
|
||||
@ -97,6 +100,10 @@ func (handler *LoggingMiddleware) Intercept(
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Other Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func StringToPathCategory(input string) path.CategoryType {
|
||||
param := strings.ToLower(input)
|
||||
|
||||
|
||||
@ -4,7 +4,6 @@ package onedrive
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/url"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -156,19 +155,6 @@ func (od *Item) Info() details.ItemInfo {
|
||||
// return od.info.Modified
|
||||
//}
|
||||
|
||||
// isTimeoutErr is used to determine if the Graph error returned is
|
||||
// because of Timeout. This is used to restrict retries to just
|
||||
// timeouts as other errors are handled within a middleware in the
|
||||
// client.
|
||||
func isTimeoutErr(err error) bool {
|
||||
switch err := err.(type) {
|
||||
case *url.Error:
|
||||
return err.Timeout()
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// populateItems iterates through items added to the collection
|
||||
// and uses the collection `itemReader` to read the item
|
||||
func (oc *Collection) populateItems(ctx context.Context) {
|
||||
@ -228,11 +214,8 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
||||
for i := 1; i <= maxRetries; i++ {
|
||||
itemInfo, itemData, err = oc.itemReader(ctx, item)
|
||||
|
||||
// We only retry if it is a timeout error. Other
|
||||
// errors like throttling are already handled within
|
||||
// the graph client via a retry middleware.
|
||||
// https://github.com/microsoftgraph/msgraph-sdk-go/issues/302
|
||||
if err == nil || !isTimeoutErr(err) {
|
||||
// retry on Timeout type errors, break otherwise.
|
||||
if err == nil || graph.IsErrTimeout(err) == nil {
|
||||
break
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user