Compare commits: main...issue-1914
7 commits
| Author | SHA1 | Date |
|---|---|---|
| | f4ec628df9 | |
| | 3089879cec | |
| | f0c638e8b9 | |
| | e413ca1742 | |
| | 1ef0daace4 | |
| | 2acf01af31 | |
| | 591b13388d | |
@@ -69,6 +69,9 @@ type Collection struct {
	prevPath path.Path

	state data.CollectionState

	// doNotMergeItems should only be true if the old delta token expired.
	doNotMergeItems bool
}

// NewExchangeDataCollection creates an ExchangeDataCollection.
@@ -156,12 +159,14 @@ func (col Collection) PreviousPath() path.Path {
	return nil
}

// TODO(ashmrtn): Fill in once GraphConnector compares old and new folder
// hierarchies.
func (col Collection) State() data.CollectionState {
	return col.state
}

func (col Collection) DoNotMergeItems() bool {
	return col.doNotMergeItems
}

// populateByOptionIdentifier is a utility function that uses col.collectionType to be able to serialize
// all the M365IDs defined in the jobs field. data channel is closed by this function
func (col *Collection) populateByOptionIdentifier(ctx context.Context) {
@@ -176,7 +176,9 @@ func populateExchangeContainerResolver(
	return res, nil
}

func pathAndMatch(
// Returns true if the container passes the scope comparison and should be included.
// Also returns the path representing the directory.
func includeContainer(
	qp graph.QueryParams,
	c graph.CachedContainer,
	scope selectors.ExchangeScope,
@@ -213,6 +215,6 @@ func pathAndMatch(
	case path.EventsCategory:
		return dirPath, scope.Matches(selectors.ExchangeEventCalendar, directory)
	default:
		return nil, false
		return dirPath, false
	}
}

@@ -7,7 +7,6 @@ import (

	multierror "github.com/hashicorp/go-multierror"
	"github.com/microsoftgraph/msgraph-sdk-go/models"
	"github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors"
	msuser "github.com/microsoftgraph/msgraph-sdk-go/users"
	"github.com/pkg/errors"

@@ -20,22 +19,12 @@ import (
	"github.com/alcionai/corso/src/pkg/selectors"
)

const (
	errEmailFolderNotFound = "ErrorSyncFolderNotFound"
	errItemNotFound        = "ErrorItemNotFound"
)

var errContainerDeleted = errors.New("container deleted")

func hasErrorCode(err error, code string) bool {
	var oDataError *odataerrors.ODataError
	if !errors.As(err, &oDataError) {
		return false
	}

	return oDataError.GetError() != nil &&
		oDataError.GetError().GetCode() != nil &&
		*oDataError.GetError().GetCode() == code
// carries details about delta retrieval in aggregators
type deltaUpdate struct {
	// the deltaLink itself
	url string
	// true if the old delta was marked as invalid
	reset bool
}

// filterContainersAndFillCollections is a utility function
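Not part of the diff: a minimal, self-contained sketch of the contract that the new `deltaUpdate` type sets up between the per-category fetch functions and `filterContainersAndFillCollections`. The types and the fake fetcher below are illustrative stand-ins for the corso implementations, not copies of them.

```go
package main

import (
	"context"
	"fmt"
)

// deltaUpdate mirrors the struct added in the diff: the new delta link,
// plus a flag noting whether the previous delta had to be discarded.
type deltaUpdate struct {
	url   string
	reset bool
}

// fetchIDFunc mirrors the reworked FetchIDFunc shape: item IDs plus a
// deltaUpdate instead of a bare delta string.
type fetchIDFunc func(ctx context.Context, user, containerID, oldDelta string) ([]string, deltaUpdate, error)

// fakeFetch is a stand-in fetcher. A real implementation would page through
// the Graph delta endpoint; here we only model the token-reset behavior.
func fakeFetch(ctx context.Context, user, containerID, oldDelta string) ([]string, deltaUpdate, error) {
	if oldDelta == "expired" {
		// the old token could not be used; enumerate from scratch
		return []string{"id1", "id2"}, deltaUpdate{url: "delta-2", reset: true}, nil
	}

	return []string{"id2"}, deltaUpdate{url: "delta-2", reset: false}, nil
}

func main() {
	var (
		getJobs    fetchIDFunc = fakeFetch
		deltaURLs              = map[string]string{}
		tombstones             = map[string]string{"container-1": "prev/path"}
	)

	jobs, newDelta, err := getJobs(context.Background(), "user", "container-1", "expired")
	if err != nil {
		panic(err)
	}

	if len(newDelta.url) > 0 {
		deltaURLs["container-1"] = newDelta.url
	}

	// on reset, the caller drops the previous path and keeps the tombstone so
	// stale data is cleared before the refreshed collection is written.
	if newDelta.reset {
		fmt.Println("delta reset; keeping tombstone", tombstones["container-1"])
	}

	fmt.Println("jobs:", jobs, "delta:", deltaURLs["container-1"])
}
```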
@@ -60,8 +49,8 @@ func filterContainersAndFillCollections(
		deltaURLs = map[string]string{}
		currPaths = map[string]string{}
		// copy of previousPaths. any folder found in the resolver get
		// deleted from this map, leaving only the deleted maps behind
		deletedPaths = map[string]DeltaPath{}
		// deleted from this map, leaving only the deleted folders behind
		tombstones = makeTombstones(dps)
	)

	getJobs, err := getFetchIDFunc(qp.Category)
@@ -83,12 +72,10 @@ func filterContainersAndFillCollections(
		}

		cID := *c.GetId()
		delete(tombstones, cID)

		// this folder exists (probably), do not delete it.
		delete(deletedPaths, cID)

		currPath, ok := includeContainer(qp, c, scope)
		// Only create a collection if the path matches the scope.
		currPath, ok := pathAndMatch(qp, c, scope)
		if !ok {
			continue
		}
@@ -103,22 +90,35 @@ func filterContainersAndFillCollections(
		if len(prevPathStr) > 0 {
			if prevPath, err = pathFromPrevString(prevPathStr); err != nil {
				logger.Ctx(ctx).Error(err)
				// if the previous path is unusable, then the delta must be, too.
				prevDelta = ""
			}
		}

		jobs, currDelta, err := getJobs(ctx, service, qp.ResourceOwner, cID, prevDelta)
		jobs, newDelta, err := getJobs(ctx, service, qp.ResourceOwner, cID, prevDelta)
		if err != nil {
			// race conditions happen, the container might get
			// deleted while this process in flight.
			if errors.Is(err, errContainerDeleted) {
				currPath = nil
			} else {
			// race conditions happen, containers might get deleted while
			// this process is in flight. If it was deleted, we remake the
			// tombstone, just to be sure it gets deleted from storage.
			if graph.IsErrDeletedInFlight(err) == nil {
				errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
			}
			} else {
				tombstones[cID] = dp.path
			}

		if len(currDelta) > 0 {
			deltaURLs[cID] = currDelta
			continue
		}

		if len(newDelta.url) > 0 {
			deltaURLs[cID] = newDelta.url
		}

		if newDelta.reset {
			// the previous delta was invalid or otherwise unusable.
			// We need to mark the collection as New, and tombstone
			// the path to clear out any data for a clean refresh.
			prevPath = nil
			tombstones[cID] = dp.path
		}

		edc := NewCollection(
@@ -130,12 +130,8 @@ func filterContainersAndFillCollections(
			statusUpdater,
			ctrlOpts,
		)

		collections[cID] = &edc

		if edc.State() == data.DeletedState {
			continue
		}

		edc.jobs = append(edc.jobs, jobs...)

		// add the current path for the container ID to be used in the next backup
@@ -143,20 +139,31 @@ func filterContainersAndFillCollections(
		currPaths[cID] = currPath.String()
	}

	// any path that wasn't present in the resolver was deleted by the user.
	// relocations and renames will have removed the dir by id earlier. What's
	// left in deletedPaths are only the previous paths that did not appear as
	// children of the root.
	for fID, dp := range deletedPaths {
	// A tombstone is a collection path that needs to be marked for deletion.
	// Tombstones can occur for a number of reasons: the delta token expired,
	// the container was deleted in flight, or the user deleted the container
	// between backup runs. If events combine to both delete and write content
	// to the same container (ex: container_1 gets deleted, then container_2
	// gets created with the same name), it is assumed that the backup consumer
	// processes deletions before creations, making the combined operation safe.
	for id, p := range tombstones {
		service, err := createService(qp.Credentials)
		if err != nil {
			errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
			continue
		}

		prevPath, err := pathFromPrevString(dp.path)
		// only occurs if it was a new folder that we picked up during the container
		// resolver phase that got deleted in flight by the time we hit this stage.
		if len(p) == 0 {
			continue
		}

		prevPath, err := pathFromPrevString(p)
		if err != nil {
			logger.Ctx(ctx).Error(err)
			// technically shouldn't ever happen. But just in case, we need to catch
			// it for protection.
			logger.Ctx(ctx).Errorw("parsing tombstone path", "err", err)
			continue
		}

@@ -169,7 +176,7 @@ func filterContainersAndFillCollections(
			statusUpdater,
			ctrlOpts,
		)
		collections[fID] = &edc
		collections[id] = &edc
	}

	entries := []graph.MetadataCollectionEntry{
@@ -196,6 +203,19 @@ func filterContainersAndFillCollections(
	return errs
}

// produces a set of id:path pairs from the deltapaths map.
// Each entry in the set will, if not removed, produce a collection
// that will delete the tombstone by path.
func makeTombstones(dps DeltaPaths) map[string]string {
	r := make(map[string]string, len(dps))

	for id, v := range dps {
		r[id] = v.path
	}

	return r
}

func pathFromPrevString(ps string) (path.Path, error) {
	p, err := path.FromDataLayerPath(ps, false)
	if err != nil {
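Not from the PR itself: a self-contained sketch of the tombstone bookkeeping that `makeTombstones` enables, under the assumption that `DeltaPaths` maps container IDs to their previous delta token and path (the `DeltaPath` shape here is inferred from how the diff uses it).

```go
package main

import "fmt"

// DeltaPath pairs the previous delta token and previous path for a container;
// DeltaPaths indexes them by container ID (assumed shape, matching the diff's usage).
type DeltaPath struct {
	delta string
	path  string
}

type DeltaPaths map[string]DeltaPath

// makeTombstones mirrors the helper in the diff: every previously-known
// container starts out as a tombstone candidate.
func makeTombstones(dps DeltaPaths) map[string]string {
	r := make(map[string]string, len(dps))

	for id, v := range dps {
		r[id] = v.path
	}

	return r
}

func main() {
	dps := DeltaPaths{
		"inbox":   {delta: "d1", path: "user/mail/inbox"},
		"archive": {delta: "d2", path: "user/mail/archive"},
	}

	tombstones := makeTombstones(dps)

	// containers returned by the resolver are still alive, so they are
	// removed from the tombstone set as they are encountered.
	for _, liveID := range []string{"inbox"} {
		delete(tombstones, liveID)
	}

	// whatever remains was not seen this run and becomes a deleted collection.
	for id, prevPath := range tombstones {
		fmt.Printf("tombstone %s -> delete data under %s\n", id, prevPath)
	}
}
```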
@@ -260,7 +280,7 @@ type FetchIDFunc func(
	ctx context.Context,
	gs graph.Servicer,
	user, containerID, oldDeltaToken string,
) ([]string, string, error)
) ([]string, deltaUpdate, error)

func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) {
	switch category {
@@ -275,12 +295,16 @@ func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) {
	}
}

// ---------------------------------------------------------------------------
// events
// ---------------------------------------------------------------------------

// FetchEventIDsFromCalendar returns a list of all M365IDs of events of the targeted Calendar.
func FetchEventIDsFromCalendar(
	ctx context.Context,
	gs graph.Servicer,
	user, calendarID, oldDelta string,
) ([]string, string, error) {
) ([]string, deltaUpdate, error) {
	var (
		errs *multierror.Error
		ids  []string
@@ -288,7 +312,7 @@ func FetchEventIDsFromCalendar(

	options, err := optionsForEventsByCalendar([]string{"id"})
	if err != nil {
		return nil, "", err
		return nil, deltaUpdate{}, err
	}

	builder := gs.Client().
@@ -299,15 +323,11 @@ func FetchEventIDsFromCalendar(
	for {
		resp, err := builder.Get(ctx, options)
		if err != nil {
			if hasErrorCode(err, errItemNotFound) {
				// The folder was deleted between the time we populated the container
				// cache and when we tried to fetch data for it. All we can do is
				// return no jobs because we've only pulled basic info about each
				// item.
				return nil, "", errors.WithStack(errContainerDeleted)
			if err := graph.IsErrDeletedInFlight(err); err != nil {
				return nil, deltaUpdate{}, err
			}

			return nil, "", errors.Wrap(err, support.ConnectorStackErrorTrace(err))
			return nil, deltaUpdate{}, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
		}

		for _, item := range resp.GetValue() {
@@ -333,56 +353,52 @@ func FetchEventIDsFromCalendar(
	}

	// Events don't have a delta endpoint so just return an empty string.
	return ids, "", errs.ErrorOrNil()
	return ids, deltaUpdate{}, errs.ErrorOrNil()
}

// ---------------------------------------------------------------------------
// contacts
// ---------------------------------------------------------------------------

// FetchContactIDsFromDirectory function that returns a list of all the m365IDs of the contacts
// of the targeted directory
func FetchContactIDsFromDirectory(
	ctx context.Context,
	gs graph.Servicer,
	user, directoryID, oldDelta string,
) ([]string, string, error) {
) ([]string, deltaUpdate, error) {
	var (
		errs       *multierror.Error
		ids        []string
		deltaURL   string
		resetDelta bool
	)

	options, err := optionsForContactFoldersItemDelta([]string{"parentFolderId"})
	if err != nil {
		return nil, deltaURL, errors.Wrap(err, "getting query options")
	}

	builder := gs.Client().
		UsersById(user).
		ContactFoldersById(directoryID).
		Contacts().
		Delta()

	if len(oldDelta) > 0 {
		builder = msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, gs.Adapter())
		return nil, deltaUpdate{}, errors.Wrap(err, "getting query options")
	}

	getIDs := func(builder *msuser.ItemContactFoldersItemContactsDeltaRequestBuilder) error {
		for {
			resp, err := builder.Get(ctx, options)
			if err != nil {
				if hasErrorCode(err, errItemNotFound) {
					// The folder was deleted between the time we populated the container
					// cache and when we tried to fetch data for it. All we can do is
					// return no jobs because we've only pulled basic info about each
					// item.
					return nil, "", errors.WithStack(errContainerDeleted)
				if err := graph.IsErrDeletedInFlight(err); err != nil {
					return err
				}

				return nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
				if err := graph.IsErrInvalidDelta(err); err != nil {
					return err
				}

				return errors.Wrap(err, support.ConnectorStackErrorTrace(err))
			}

			for _, item := range resp.GetValue() {
				if item.GetId() == nil {
					errs = multierror.Append(
						errs,
						errors.Errorf("contact with nil ID in folder %s", directoryID),
						errors.Errorf("item with nil ID in folder %s", directoryID),
					)

					// TODO(ashmrtn): Handle fail-fast.
@@ -405,49 +421,74 @@ func FetchContactIDsFromDirectory(
			builder = msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(*nextLink, gs.Adapter())
		}

	return ids, deltaURL, errs.ErrorOrNil()
		return nil
	}

	if len(oldDelta) > 0 {
		err := getIDs(msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, gs.Adapter()))
		// happy path
		if err == nil {
			return ids, deltaUpdate{deltaURL, false}, errs.ErrorOrNil()
		}
		// only return on error if it is NOT a delta issue.
		// otherwise we'll retry the call with the regular builder
		if graph.IsErrInvalidDelta(err) == nil {
			return nil, deltaUpdate{}, err
		}

		resetDelta = true
		errs = nil
	}

	builder := gs.Client().
		UsersById(user).
		ContactFoldersById(directoryID).
		Contacts().
		Delta()

	if err := getIDs(builder); err != nil {
		return nil, deltaUpdate{}, err
	}

	return ids, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
}

// ---------------------------------------------------------------------------
// messages
// ---------------------------------------------------------------------------

// FetchMessageIDsFromDirectory function that returns a list of all the m365IDs of the exchange.Mail
// of the targeted directory
func FetchMessageIDsFromDirectory(
	ctx context.Context,
	gs graph.Servicer,
	user, directoryID, oldDelta string,
) ([]string, string, error) {
) ([]string, deltaUpdate, error) {
	var (
		errs       *multierror.Error
		ids        []string
		deltaURL   string
		resetDelta bool
	)

	options, err := optionsForFolderMessagesDelta([]string{"isRead"})
	if err != nil {
		return nil, deltaURL, errors.Wrap(err, "getting query options")
	}

	builder := gs.Client().
		UsersById(user).
		MailFoldersById(directoryID).
		Messages().
		Delta()

	if len(oldDelta) > 0 {
		builder = msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, gs.Adapter())
		return nil, deltaUpdate{}, errors.Wrap(err, "getting query options")
	}

	getIDs := func(builder *msuser.ItemMailFoldersItemMessagesDeltaRequestBuilder) error {
		for {
			resp, err := builder.Get(ctx, options)
			if err != nil {
				if hasErrorCode(err, errEmailFolderNotFound) {
					// The folder was deleted between the time we populated the container
					// cache and when we tried to fetch data for it. All we can do is
					// return no jobs because we've only pulled basic info about each
					// item.
					return nil, "", errors.WithStack(errContainerDeleted)
				if err := graph.IsErrDeletedInFlight(err); err != nil {
					return err
				}

				return nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
				if err := graph.IsErrInvalidDelta(err); err != nil {
					return err
				}

				return errors.Wrap(err, support.ConnectorStackErrorTrace(err))
			}

			for _, item := range resp.GetValue() {
@@ -477,5 +518,34 @@ func FetchMessageIDsFromDirectory(
			builder = msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(*nextLink, gs.Adapter())
		}

	return ids, deltaURL, errs.ErrorOrNil()
		return nil
	}

	if len(oldDelta) > 0 {
		err := getIDs(msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, gs.Adapter()))
		// happy path
		if err == nil {
			return ids, deltaUpdate{deltaURL, false}, errs.ErrorOrNil()
		}
		// only return on error if it is NOT a delta issue.
		// otherwise we'll retry the call with the regular builder
		if graph.IsErrInvalidDelta(err) == nil {
			return nil, deltaUpdate{}, err
		}

		resetDelta = true
		errs = nil
	}

	builder := gs.Client().
		UsersById(user).
		MailFoldersById(directoryID).
		Messages().
		Delta()

	if err := getIDs(builder); err != nil {
		return nil, deltaUpdate{}, err
	}

	return ids, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
}
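The contacts and messages fetchers above share the same shape: try the saved delta link first, and if the token is rejected, fall back to a full enumeration and report a reset. Below is a generic, self-contained sketch of that control flow; the error sentinel and the page-walking callback are stand-ins for `graph.IsErrInvalidDelta` and the msgraph-sdk-go request builders, not the real APIs.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidDelta stands in for graph.ErrInvalidDelta / IsErrInvalidDelta.
var errInvalidDelta = errors.New("invalid delta token")

type deltaUpdate struct {
	url   string
	reset bool
}

// fetchWithDeltaFallback captures the pattern used by the reworked fetchers:
// getIDs walks pages starting from either the old delta link or the plain
// endpoint, and the caller retries without the delta when the token is stale.
func fetchWithDeltaFallback(
	oldDelta string,
	getIDs func(startFromDelta bool) ([]string, string, error),
) ([]string, deltaUpdate, error) {
	reset := false

	if len(oldDelta) > 0 {
		ids, newURL, err := getIDs(true)
		if err == nil {
			// happy path: the old token was accepted
			return ids, deltaUpdate{url: newURL, reset: false}, nil
		}

		if !errors.Is(err, errInvalidDelta) {
			return nil, deltaUpdate{}, err
		}

		// the token was rejected; re-enumerate everything
		reset = true
	}

	ids, newURL, err := getIDs(false)
	if err != nil {
		return nil, deltaUpdate{}, err
	}

	return ids, deltaUpdate{url: newURL, reset: reset}, nil
}

func main() {
	calls := 0
	getIDs := func(startFromDelta bool) ([]string, string, error) {
		calls++
		if startFromDelta {
			return nil, "", errInvalidDelta // simulate an expired token
		}
		return []string{"m1", "m2"}, "delta-next", nil
	}

	ids, du, err := fetchWithDeltaFallback("old-token", getIDs)
	fmt.Println(ids, du, err, "calls:", calls)
}
```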
src/internal/connector/graph/errors.go (new file, 127 lines)
@@ -0,0 +1,127 @@
package graph

import (
	"fmt"
	"net/url"

	"github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors"
	"github.com/pkg/errors"

	"github.com/alcionai/corso/src/internal/common"
)

// ---------------------------------------------------------------------------
// Error Interpretation Helpers
// ---------------------------------------------------------------------------

const (
	errCodeItemNotFound        = "ErrorItemNotFound"
	errCodeEmailFolderNotFound = "ErrorSyncFolderNotFound"
	errCodeResyncRequired      = "ResyncRequired"
	errCodeSyncStateNotFound   = "SyncStateNotFound"
)

// The folder or item was deleted between the time we identified
// it and when we tried to fetch data for it.
type ErrDeletedInFlight struct {
	common.Err
}

func IsErrDeletedInFlight(err error) error {
	if asDeletedInFlight(err) {
		return err
	}

	if hasErrorCode(err, errCodeItemNotFound) {
		return ErrDeletedInFlight{*common.EncapsulateError(err)}
	}

	return nil
}

func asDeletedInFlight(err error) bool {
	e := ErrDeletedInFlight{}
	return errors.As(err, &e)
}

// Delta tokens can be desynced or expired. In either case, the token
// becomes invalid, and cannot be used again.
// https://learn.microsoft.com/en-us/graph/errors#code-property
type ErrInvalidDelta struct {
	common.Err
}

func IsErrInvalidDelta(err error) error {
	if asInvalidDelta(err) {
		return err
	}

	if hasErrorCode(err, errCodeSyncStateNotFound) ||
		hasErrorCode(err, errCodeResyncRequired) {
		return ErrInvalidDelta{*common.EncapsulateError(err)}
	}

	return nil
}

func asInvalidDelta(err error) bool {
	e := ErrInvalidDelta{}
	return errors.As(err, &e)
}

// Timeout errors are identified for tracking the need to retry calls.
// Other delay errors, like throttling, are already handled by the
// graph client's built-in retries.
// https://github.com/microsoftgraph/msgraph-sdk-go/issues/302
type ErrTimeout struct {
	common.Err
}

func IsErrTimeout(err error) error {
	if asTimeout(err) {
		return err
	}

	if isTimeoutErr(err) {
		return ErrTimeout{*common.EncapsulateError(err)}
	}

	return nil
}

func asTimeout(err error) bool {
	e := ErrTimeout{}
	return errors.As(err, &e)
}

// ---------------------------------------------------------------------------
// error parsers
// ---------------------------------------------------------------------------

func hasErrorCode(err error, code string) bool {
	if err == nil {
		fmt.Println("nil")
		return false
	}

	var oDataError *odataerrors.ODataError
	if !errors.As(err, &oDataError) {
		return false
	}

	return oDataError.GetError().GetCode() != nil &&
		*oDataError.GetError().GetCode() == code
}

// isTimeoutErr is used to determine if the Graph error returned is
// because of Timeout. This is used to restrict retries to just
// timeouts as other errors are handled within a middleware in the
// client.
func isTimeoutErr(err error) bool {
	switch err := err.(type) {
	case *url.Error:
		return err.Timeout()
	default:
		return false
	}
}
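A short usage note on the `IsErr*` helpers this new file introduces: unlike `errors.Is`, they return the original error (or nil), so callers branch with `if err := graph.IsErrDeletedInFlight(err); err != nil { ... }`. The sketch below stubs minimal local equivalents rather than importing the corso packages, just to show the calling convention; the real helpers also inspect Graph OData error codes.

```go
package main

import (
	"errors"
	"fmt"
)

// Minimal stand-ins for graph.ErrDeletedInFlight / ErrInvalidDelta and their
// Is* helpers.
type errDeletedInFlight struct{ error }
type errInvalidDelta struct{ error }

func isErrDeletedInFlight(err error) error {
	e := errDeletedInFlight{}
	if errors.As(err, &e) {
		return err
	}
	return nil
}

func isErrInvalidDelta(err error) error {
	e := errInvalidDelta{}
	if errors.As(err, &e) {
		return err
	}
	return nil
}

func handleFetchError(err error) string {
	// deleted in flight: drop the container and tombstone its old path
	if err := isErrDeletedInFlight(err); err != nil {
		return "tombstone container"
	}

	// invalid delta: rerun the query without the token and mark the reset
	if err := isErrInvalidDelta(err); err != nil {
		return "retry without delta"
	}

	return "report error: " + err.Error()
}

func main() {
	fmt.Println(handleFetchError(errDeletedInFlight{errors.New("ErrorItemNotFound")}))
	fmt.Println(handleFetchError(errInvalidDelta{errors.New("SyncStateNotFound")}))
	fmt.Println(handleFetchError(errors.New("throttled")))
}
```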
@@ -127,6 +127,10 @@ func (md MetadataCollection) State() data.CollectionState {
	return data.NewState
}

func (md MetadataCollection) DoNotMergeItems() bool {
	return false
}

func (md MetadataCollection) Items() <-chan data.Stream {
	res := make(chan data.Stream)

@@ -57,10 +57,13 @@ func CreateHTTPClient() *nethttp.Client {
	return httpClient
}

// ---------------------------------------------------------------------------
// Logging Middleware
// ---------------------------------------------------------------------------

// LoggingMiddleware can be used to log the http request sent by the graph client
type LoggingMiddleware struct{}

// Intercept implements the RequestInterceptor interface and decodes the parameters name
func (handler *LoggingMiddleware) Intercept(
	pipeline khttp.Pipeline,
	middlewareIndex int,
@@ -97,6 +100,10 @@ func (handler *LoggingMiddleware) Intercept(
	return resp, err
}

// ---------------------------------------------------------------------------
// Other Helpers
// ---------------------------------------------------------------------------

func StringToPathCategory(input string) path.CategoryType {
	param := strings.ToLower(input)

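The hunk above only shows the head of `LoggingMiddleware.Intercept`. As a rough illustration of what such a middleware typically does, here is a self-contained sketch with a locally defined `Pipeline` interface standing in for the kiota `khttp.Pipeline`; the real signature and behavior in the graph client may differ.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Pipeline is a local stand-in for the middleware pipeline: it forwards
// the request to the next handler in the chain.
type Pipeline interface {
	Next(req *http.Request, middlewareIndex int) (*http.Response, error)
}

// LoggingMiddleware logs each request before handing it to the next middleware.
type LoggingMiddleware struct{}

func (handler *LoggingMiddleware) Intercept(
	pipeline Pipeline,
	middlewareIndex int,
	req *http.Request,
) (*http.Response, error) {
	fmt.Printf("graph request: %s %s\n", req.Method, req.URL.Path)
	return pipeline.Next(req, middlewareIndex)
}

// terminalPipeline ends the chain by executing the request with a plain client.
type terminalPipeline struct{ client *http.Client }

func (t terminalPipeline) Next(req *http.Request, _ int) (*http.Response, error) {
	return t.client.Do(req)
}

func main() {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	req, _ := http.NewRequest(http.MethodGet, srv.URL+"/v1.0/me/messages", nil)

	mw := &LoggingMiddleware{}
	resp, err := mw.Intercept(terminalPipeline{client: srv.Client()}, 0, req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	fmt.Println("status:", resp.StatusCode)
}
```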
@@ -23,6 +23,7 @@ type MockExchangeDataCollection struct {
	ColState        data.CollectionState
	PrevPath        path.Path
	DeletedItems    []bool
	doNotMergeItems bool
}

var (
@@ -104,6 +105,10 @@ func (medc MockExchangeDataCollection) State() data.CollectionState {
	return medc.ColState
}

func (medc MockExchangeDataCollection) DoNotMergeItems() bool {
	return medc.doNotMergeItems
}

// Items returns a channel that has the next items in the collection. The
// channel is closed when there are no more items available.
func (medc *MockExchangeDataCollection) Items() <-chan data.Stream {

@@ -4,7 +4,6 @@ package onedrive
import (
	"context"
	"io"
	"net/url"
	"sync"
	"sync/atomic"
	"time"
@@ -58,6 +57,9 @@ type Collection struct {
	statusUpdater support.StatusUpdater
	itemReader    itemReaderFunc
	ctrl          control.Options

	// should only be true if the old delta token expired
	doNotMergeItems bool
}

// itemReadFunc returns a reader for the specified item
@@ -124,6 +126,10 @@ func (oc Collection) State() data.CollectionState {
	return data.NewState
}

func (oc Collection) DoNotMergeItems() bool {
	return oc.doNotMergeItems
}

// Item represents a single item retrieved from OneDrive
type Item struct {
	id string
@@ -156,19 +162,6 @@ func (od *Item) Info() details.ItemInfo {
// return od.info.Modified
//}

// isTimeoutErr is used to determine if the Graph error returned is
// because of Timeout. This is used to restrict retries to just
// timeouts as other errors are handled within a middleware in the
// client.
func isTimeoutErr(err error) bool {
	switch err := err.(type) {
	case *url.Error:
		return err.Timeout()
	default:
		return false
	}
}

// populateItems iterates through items added to the collection
// and uses the collection `itemReader` to read the item
func (oc *Collection) populateItems(ctx context.Context) {
@@ -228,11 +221,8 @@ func (oc *Collection) populateItems(ctx context.Context) {
		for i := 1; i <= maxRetries; i++ {
			itemInfo, itemData, err = oc.itemReader(ctx, item)

			// We only retry if it is a timeout error. Other
			// errors like throttling are already handled within
			// the graph client via a retry middleware.
			// https://github.com/microsoftgraph/msgraph-sdk-go/issues/302
			if err == nil || !isTimeoutErr(err) {
			// retry on Timeout type errors, break otherwise.
			if err == nil || graph.IsErrTimeout(err) == nil {
				break
			}

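Not in the PR: a stripped-down sketch of the retry policy the hunk above implements, with a local timeout check standing in for `graph.IsErrTimeout` and a fake reader in place of `oc.itemReader`.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

const maxRetries = 3

// timeoutErr satisfies the Timeout() bool convention used by net/url errors.
type timeoutErr struct{}

func (timeoutErr) Error() string { return "timeout" }
func (timeoutErr) Timeout() bool { return true }

// isTimeout mirrors the intent of graph.IsErrTimeout: only *url.Error
// timeouts qualify; everything else (throttling, 4xx) is not retried here.
func isTimeout(err error) bool {
	var uerr *url.Error
	return errors.As(err, &uerr) && uerr.Timeout()
}

func readWithRetry(read func() ([]byte, error)) ([]byte, error) {
	var (
		data []byte
		err  error
	)

	for i := 1; i <= maxRetries; i++ {
		data, err = read()

		// retry on timeout-type errors, break otherwise
		if err == nil || !isTimeout(err) {
			break
		}
	}

	return data, err
}

func main() {
	attempts := 0
	read := func() ([]byte, error) {
		attempts++
		if attempts < 2 {
			return nil, &url.Error{Op: "Get", URL: "https://graph.microsoft.com", Err: timeoutErr{}}
		}
		return []byte("item body"), nil
	}

	data, err := readWithRetry(read)
	fmt.Println(string(data), err, "attempts:", attempts)
}
```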
@@ -81,12 +81,14 @@ func (sc Collection) PreviousPath() path.Path {
	return nil
}

// TODO(ashmrtn): Fill in once GraphConnector compares old and new folder
// hierarchies.
func (sc Collection) State() data.CollectionState {
	return data.NewState
}

func (sc Collection) DoNotMergeItems() bool {
	return false
}

func (sc *Collection) Items() <-chan data.Stream {
	go sc.populate(context.TODO())
	return sc.data

@@ -47,6 +47,15 @@ type Collection interface {
	// backup along with all items and Collections below them in the hierarchy
	// unless said items/Collections were moved.
	State() CollectionState
	// DoNotMergeItems informs kopia that the collection is rebuilding its contents
	// from scratch, and that any items currently stored in that collection should
	// be skipped during the process of merging historical data into the new backup.
	// This flag is normally expected to be false. It should only be flagged under
	// specific circumstances. Example: if the link token used for incremental queries
	// expires or otherwise becomes unusable, thus requiring the backup producer to
	// re-discover all data in the container. This flag only affects the path of the
	// collection, and does not cascade to subfolders.
	DoNotMergeItems() bool
}

// Stream represents a single item within a Collection

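Not part of the diff: a hypothetical sketch of how a backup consumer could act on `State()` and `DoNotMergeItems()` when deciding whether to carry items forward from the previous snapshot. The `Collection` interface and state constants below are a trimmed local copy of the one documented above, not the corso `data` package.

```go
package main

import "fmt"

type CollectionState int

const (
	NewState CollectionState = iota
	NotMovedState
	MovedState
	DeletedState
)

// Collection is a trimmed stand-in for data.Collection, keeping only the
// methods this sketch consults.
type Collection interface {
	State() CollectionState
	DoNotMergeItems() bool
}

type coll struct {
	state   CollectionState
	noMerge bool
}

func (c coll) State() CollectionState { return c.state }
func (c coll) DoNotMergeItems() bool  { return c.noMerge }

// planMerge describes what the snapshot writer would do with each collection.
func planMerge(c Collection) string {
	switch {
	case c.State() == DeletedState:
		return "drop the subtree at the previous path"
	case c.DoNotMergeItems():
		// e.g. the delta token expired: the producer re-enumerates everything,
		// so previously stored items under this path must not be merged in.
		return "write items fresh; skip merging cached items"
	default:
		return "merge unchanged items from the previous snapshot"
	}
}

func main() {
	fmt.Println(planMerge(coll{state: NotMovedState, noMerge: false}))
	fmt.Println(planMerge(coll{state: NewState, noMerge: true}))
	fmt.Println(planMerge(coll{state: DeletedState}))
}
```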
@@ -30,6 +30,10 @@ func (mc mockColl) State() CollectionState {
	return NewState
}

func (mc mockColl) DoNotMergeItems() bool {
	return false
}

type CollectionSuite struct {
	suite.Suite
}

@@ -43,6 +43,10 @@ func (kdc kopiaDataCollection) State() data.CollectionState {
	return data.NewState
}

func (kdc kopiaDataCollection) DoNotMergeItems() bool {
	return false
}

type kopiaDataStream struct {
	reader io.ReadCloser
	uuid   string

@@ -177,6 +177,10 @@ func (dc *streamCollection) State() data.CollectionState {
	return data.NewState
}

func (dc *streamCollection) DoNotMergeItems() bool {
	return false
}

// Items() always returns a channel with a single data.Stream
// representing the object to be persisted
func (dc *streamCollection) Items() <-chan data.Stream {