Compare commits
7 Commits
main
...
issue-1914
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f4ec628df9 | ||
|
|
3089879cec | ||
|
|
f0c638e8b9 | ||
|
|
e413ca1742 | ||
|
|
1ef0daace4 | ||
|
|
2acf01af31 | ||
|
|
591b13388d |
@ -69,6 +69,9 @@ type Collection struct {
|
|||||||
prevPath path.Path
|
prevPath path.Path
|
||||||
|
|
||||||
state data.CollectionState
|
state data.CollectionState
|
||||||
|
|
||||||
|
// doNotMergeItems should only be true if the old delta token expired.
|
||||||
|
doNotMergeItems bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewExchangeDataCollection creates an ExchangeDataCollection.
|
// NewExchangeDataCollection creates an ExchangeDataCollection.
|
||||||
@ -156,12 +159,14 @@ func (col Collection) PreviousPath() path.Path {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(ashmrtn): Fill in once GraphConnector compares old and new folder
|
|
||||||
// hierarchies.
|
|
||||||
func (col Collection) State() data.CollectionState {
|
func (col Collection) State() data.CollectionState {
|
||||||
return col.state
|
return col.state
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (col Collection) DoNotMergeItems() bool {
|
||||||
|
return col.doNotMergeItems
|
||||||
|
}
|
||||||
|
|
||||||
// populateByOptionIdentifier is a utility function that uses col.collectionType to be able to serialize
|
// populateByOptionIdentifier is a utility function that uses col.collectionType to be able to serialize
|
||||||
// all the M365IDs defined in the jobs field. data channel is closed by this function
|
// all the M365IDs defined in the jobs field. data channel is closed by this function
|
||||||
func (col *Collection) populateByOptionIdentifier(ctx context.Context) {
|
func (col *Collection) populateByOptionIdentifier(ctx context.Context) {
|
||||||
|
|||||||
@ -176,7 +176,9 @@ func populateExchangeContainerResolver(
|
|||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func pathAndMatch(
|
// Returns true if the container passes the scope comparison and should be included.
|
||||||
|
// Also returns the path representing the directory.
|
||||||
|
func includeContainer(
|
||||||
qp graph.QueryParams,
|
qp graph.QueryParams,
|
||||||
c graph.CachedContainer,
|
c graph.CachedContainer,
|
||||||
scope selectors.ExchangeScope,
|
scope selectors.ExchangeScope,
|
||||||
@ -213,6 +215,6 @@ func pathAndMatch(
|
|||||||
case path.EventsCategory:
|
case path.EventsCategory:
|
||||||
return dirPath, scope.Matches(selectors.ExchangeEventCalendar, directory)
|
return dirPath, scope.Matches(selectors.ExchangeEventCalendar, directory)
|
||||||
default:
|
default:
|
||||||
return nil, false
|
return dirPath, false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -7,7 +7,6 @@ import (
|
|||||||
|
|
||||||
multierror "github.com/hashicorp/go-multierror"
|
multierror "github.com/hashicorp/go-multierror"
|
||||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||||
"github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors"
|
|
||||||
msuser "github.com/microsoftgraph/msgraph-sdk-go/users"
|
msuser "github.com/microsoftgraph/msgraph-sdk-go/users"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
@ -20,22 +19,12 @@ import (
|
|||||||
"github.com/alcionai/corso/src/pkg/selectors"
|
"github.com/alcionai/corso/src/pkg/selectors"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
// carries details about delta retrieval in aggregators
|
||||||
errEmailFolderNotFound = "ErrorSyncFolderNotFound"
|
type deltaUpdate struct {
|
||||||
errItemNotFound = "ErrorItemNotFound"
|
// the deltaLink itself
|
||||||
)
|
url string
|
||||||
|
// true if the old delta was marked as invalid
|
||||||
var errContainerDeleted = errors.New("container deleted")
|
reset bool
|
||||||
|
|
||||||
func hasErrorCode(err error, code string) bool {
|
|
||||||
var oDataError *odataerrors.ODataError
|
|
||||||
if !errors.As(err, &oDataError) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return oDataError.GetError() != nil &&
|
|
||||||
oDataError.GetError().GetCode() != nil &&
|
|
||||||
*oDataError.GetError().GetCode() == code
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// filterContainersAndFillCollections is a utility function
|
// filterContainersAndFillCollections is a utility function
|
||||||
@ -60,8 +49,8 @@ func filterContainersAndFillCollections(
|
|||||||
deltaURLs = map[string]string{}
|
deltaURLs = map[string]string{}
|
||||||
currPaths = map[string]string{}
|
currPaths = map[string]string{}
|
||||||
// copy of previousPaths. any folder found in the resolver get
|
// copy of previousPaths. any folder found in the resolver get
|
||||||
// deleted from this map, leaving only the deleted maps behind
|
// deleted from this map, leaving only the deleted folders behind
|
||||||
deletedPaths = map[string]DeltaPath{}
|
tombstones = makeTombstones(dps)
|
||||||
)
|
)
|
||||||
|
|
||||||
getJobs, err := getFetchIDFunc(qp.Category)
|
getJobs, err := getFetchIDFunc(qp.Category)
|
||||||
@ -83,12 +72,10 @@ func filterContainersAndFillCollections(
|
|||||||
}
|
}
|
||||||
|
|
||||||
cID := *c.GetId()
|
cID := *c.GetId()
|
||||||
|
delete(tombstones, cID)
|
||||||
|
|
||||||
// this folder exists (probably), do not delete it.
|
currPath, ok := includeContainer(qp, c, scope)
|
||||||
delete(deletedPaths, cID)
|
|
||||||
|
|
||||||
// Only create a collection if the path matches the scope.
|
// Only create a collection if the path matches the scope.
|
||||||
currPath, ok := pathAndMatch(qp, c, scope)
|
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -103,22 +90,35 @@ func filterContainersAndFillCollections(
|
|||||||
if len(prevPathStr) > 0 {
|
if len(prevPathStr) > 0 {
|
||||||
if prevPath, err = pathFromPrevString(prevPathStr); err != nil {
|
if prevPath, err = pathFromPrevString(prevPathStr); err != nil {
|
||||||
logger.Ctx(ctx).Error(err)
|
logger.Ctx(ctx).Error(err)
|
||||||
|
// if the previous path is unusable, then the delta must be, too.
|
||||||
|
prevDelta = ""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
jobs, currDelta, err := getJobs(ctx, service, qp.ResourceOwner, cID, prevDelta)
|
jobs, newDelta, err := getJobs(ctx, service, qp.ResourceOwner, cID, prevDelta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// race conditions happen, the container might get
|
// race conditions happen, containers might get deleted while
|
||||||
// deleted while this process in flight.
|
// this process is in flight. If it was deleted, we remake the
|
||||||
if errors.Is(err, errContainerDeleted) {
|
// tombstone, just to be sure it gets deleted from storage.
|
||||||
currPath = nil
|
if graph.IsErrDeletedInFlight(err) == nil {
|
||||||
} else {
|
|
||||||
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
|
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
|
||||||
|
} else {
|
||||||
|
tombstones[cID] = dp.path
|
||||||
}
|
}
|
||||||
|
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(currDelta) > 0 {
|
if len(newDelta.url) > 0 {
|
||||||
deltaURLs[cID] = currDelta
|
deltaURLs[cID] = newDelta.url
|
||||||
|
}
|
||||||
|
|
||||||
|
if newDelta.reset {
|
||||||
|
// the previous delta was invalid or otherwise unusable.
|
||||||
|
// We need to mark the collection as New, and tombstone
|
||||||
|
// the path to clear out any data for a clean refresh.
|
||||||
|
prevPath = nil
|
||||||
|
tombstones[cID] = dp.path
|
||||||
}
|
}
|
||||||
|
|
||||||
edc := NewCollection(
|
edc := NewCollection(
|
||||||
@ -130,12 +130,8 @@ func filterContainersAndFillCollections(
|
|||||||
statusUpdater,
|
statusUpdater,
|
||||||
ctrlOpts,
|
ctrlOpts,
|
||||||
)
|
)
|
||||||
|
|
||||||
collections[cID] = &edc
|
collections[cID] = &edc
|
||||||
|
|
||||||
if edc.State() == data.DeletedState {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
edc.jobs = append(edc.jobs, jobs...)
|
edc.jobs = append(edc.jobs, jobs...)
|
||||||
|
|
||||||
// add the current path for the container ID to be used in the next backup
|
// add the current path for the container ID to be used in the next backup
|
||||||
@ -143,20 +139,31 @@ func filterContainersAndFillCollections(
|
|||||||
currPaths[cID] = currPath.String()
|
currPaths[cID] = currPath.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
// any path that wasn't present in the resolver was deleted by the user.
|
// A tombstone is a collection path that needs to be marked for deletion.
|
||||||
// relocations and renames will have removed the dir by id earlier. What's
|
// Tombstones can occur for a number of reasons: the delta token expired,
|
||||||
// left in deletedPaths are only the previous paths that did not appear as
|
// the container was deleted in flight, or the user deleted the container
|
||||||
// children of the root.
|
// between backup runs. If events combine to both delete and write content
|
||||||
for fID, dp := range deletedPaths {
|
// to the same container (ex: container_1 gets deleted, then container_2
|
||||||
|
// gets created with the same name), it is assumed that the backup consumer
|
||||||
|
// processes deletions before creations, making the combined operation safe.
|
||||||
|
for id, p := range tombstones {
|
||||||
service, err := createService(qp.Credentials)
|
service, err := createService(qp.Credentials)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
|
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
prevPath, err := pathFromPrevString(dp.path)
|
// only occurs if it was a new folder that we picked up during the container
|
||||||
|
// resolver phase that got deleted in flight by the time we hit this stage.
|
||||||
|
if len(p) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
prevPath, err := pathFromPrevString(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Ctx(ctx).Error(err)
|
// technically shouldn't ever happen. But just in case, we need to catch
|
||||||
|
// it for protection.
|
||||||
|
logger.Ctx(ctx).Errorw("parsing tombstone path", "err", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -169,7 +176,7 @@ func filterContainersAndFillCollections(
|
|||||||
statusUpdater,
|
statusUpdater,
|
||||||
ctrlOpts,
|
ctrlOpts,
|
||||||
)
|
)
|
||||||
collections[fID] = &edc
|
collections[id] = &edc
|
||||||
}
|
}
|
||||||
|
|
||||||
entries := []graph.MetadataCollectionEntry{
|
entries := []graph.MetadataCollectionEntry{
|
||||||
@ -196,6 +203,19 @@ func filterContainersAndFillCollections(
|
|||||||
return errs
|
return errs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// produces a set of id:path pairs from the deltapaths map.
|
||||||
|
// Each entry in the set will, if not removed, produce a collection
|
||||||
|
// that will delete the tombstone by path.
|
||||||
|
func makeTombstones(dps DeltaPaths) map[string]string {
|
||||||
|
r := make(map[string]string, len(dps))
|
||||||
|
|
||||||
|
for id, v := range dps {
|
||||||
|
r[id] = v.path
|
||||||
|
}
|
||||||
|
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
func pathFromPrevString(ps string) (path.Path, error) {
|
func pathFromPrevString(ps string) (path.Path, error) {
|
||||||
p, err := path.FromDataLayerPath(ps, false)
|
p, err := path.FromDataLayerPath(ps, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -260,7 +280,7 @@ type FetchIDFunc func(
|
|||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
gs graph.Servicer,
|
gs graph.Servicer,
|
||||||
user, containerID, oldDeltaToken string,
|
user, containerID, oldDeltaToken string,
|
||||||
) ([]string, string, error)
|
) ([]string, deltaUpdate, error)
|
||||||
|
|
||||||
func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) {
|
func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) {
|
||||||
switch category {
|
switch category {
|
||||||
@ -275,12 +295,16 @@ func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// events
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
// FetchEventIDsFromCalendar returns a list of all M365IDs of events of the targeted Calendar.
|
// FetchEventIDsFromCalendar returns a list of all M365IDs of events of the targeted Calendar.
|
||||||
func FetchEventIDsFromCalendar(
|
func FetchEventIDsFromCalendar(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
gs graph.Servicer,
|
gs graph.Servicer,
|
||||||
user, calendarID, oldDelta string,
|
user, calendarID, oldDelta string,
|
||||||
) ([]string, string, error) {
|
) ([]string, deltaUpdate, error) {
|
||||||
var (
|
var (
|
||||||
errs *multierror.Error
|
errs *multierror.Error
|
||||||
ids []string
|
ids []string
|
||||||
@ -288,7 +312,7 @@ func FetchEventIDsFromCalendar(
|
|||||||
|
|
||||||
options, err := optionsForEventsByCalendar([]string{"id"})
|
options, err := optionsForEventsByCalendar([]string{"id"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", err
|
return nil, deltaUpdate{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
builder := gs.Client().
|
builder := gs.Client().
|
||||||
@ -299,15 +323,11 @@ func FetchEventIDsFromCalendar(
|
|||||||
for {
|
for {
|
||||||
resp, err := builder.Get(ctx, options)
|
resp, err := builder.Get(ctx, options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if hasErrorCode(err, errItemNotFound) {
|
if err := graph.IsErrDeletedInFlight(err); err != nil {
|
||||||
// The folder was deleted between the time we populated the container
|
return nil, deltaUpdate{}, err
|
||||||
// cache and when we tried to fetch data for it. All we can do is
|
|
||||||
// return no jobs because we've only pulled basic info about each
|
|
||||||
// item.
|
|
||||||
return nil, "", errors.WithStack(errContainerDeleted)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, "", errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
return nil, deltaUpdate{}, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, item := range resp.GetValue() {
|
for _, item := range resp.GetValue() {
|
||||||
@ -333,25 +353,91 @@ func FetchEventIDsFromCalendar(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Events don't have a delta endpoint so just return an empty string.
|
// Events don't have a delta endpoint so just return an empty string.
|
||||||
return ids, "", errs.ErrorOrNil()
|
return ids, deltaUpdate{}, errs.ErrorOrNil()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// contacts
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
// FetchContactIDsFromDirectory function that returns a list of all the m365IDs of the contacts
|
// FetchContactIDsFromDirectory function that returns a list of all the m365IDs of the contacts
|
||||||
// of the targeted directory
|
// of the targeted directory
|
||||||
func FetchContactIDsFromDirectory(
|
func FetchContactIDsFromDirectory(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
gs graph.Servicer,
|
gs graph.Servicer,
|
||||||
user, directoryID, oldDelta string,
|
user, directoryID, oldDelta string,
|
||||||
) ([]string, string, error) {
|
) ([]string, deltaUpdate, error) {
|
||||||
var (
|
var (
|
||||||
errs *multierror.Error
|
errs *multierror.Error
|
||||||
ids []string
|
ids []string
|
||||||
deltaURL string
|
deltaURL string
|
||||||
|
resetDelta bool
|
||||||
)
|
)
|
||||||
|
|
||||||
options, err := optionsForContactFoldersItemDelta([]string{"parentFolderId"})
|
options, err := optionsForContactFoldersItemDelta([]string{"parentFolderId"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, deltaURL, errors.Wrap(err, "getting query options")
|
return nil, deltaUpdate{}, errors.Wrap(err, "getting query options")
|
||||||
|
}
|
||||||
|
|
||||||
|
getIDs := func(builder *msuser.ItemContactFoldersItemContactsDeltaRequestBuilder) error {
|
||||||
|
for {
|
||||||
|
resp, err := builder.Get(ctx, options)
|
||||||
|
if err != nil {
|
||||||
|
if err := graph.IsErrDeletedInFlight(err); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := graph.IsErrInvalidDelta(err); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, item := range resp.GetValue() {
|
||||||
|
if item.GetId() == nil {
|
||||||
|
errs = multierror.Append(
|
||||||
|
errs,
|
||||||
|
errors.Errorf("item with nil ID in folder %s", directoryID),
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO(ashmrtn): Handle fail-fast.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
ids = append(ids, *item.GetId())
|
||||||
|
}
|
||||||
|
|
||||||
|
delta := resp.GetOdataDeltaLink()
|
||||||
|
if delta != nil && len(*delta) > 0 {
|
||||||
|
deltaURL = *delta
|
||||||
|
}
|
||||||
|
|
||||||
|
nextLink := resp.GetOdataNextLink()
|
||||||
|
if nextLink == nil || len(*nextLink) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
builder = msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(*nextLink, gs.Adapter())
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(oldDelta) > 0 {
|
||||||
|
err := getIDs(msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, gs.Adapter()))
|
||||||
|
// happy path
|
||||||
|
if err == nil {
|
||||||
|
return ids, deltaUpdate{deltaURL, false}, errs.ErrorOrNil()
|
||||||
|
}
|
||||||
|
// only return on error if it is NOT a delta issue.
|
||||||
|
// otherwise we'll retry the call with the regular builder
|
||||||
|
if graph.IsErrInvalidDelta(err) == nil {
|
||||||
|
return nil, deltaUpdate{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resetDelta = true
|
||||||
|
errs = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
builder := gs.Client().
|
builder := gs.Client().
|
||||||
@ -360,70 +446,95 @@ func FetchContactIDsFromDirectory(
|
|||||||
Contacts().
|
Contacts().
|
||||||
Delta()
|
Delta()
|
||||||
|
|
||||||
if len(oldDelta) > 0 {
|
if err := getIDs(builder); err != nil {
|
||||||
builder = msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, gs.Adapter())
|
return nil, deltaUpdate{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
return ids, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
|
||||||
resp, err := builder.Get(ctx, options)
|
|
||||||
if err != nil {
|
|
||||||
if hasErrorCode(err, errItemNotFound) {
|
|
||||||
// The folder was deleted between the time we populated the container
|
|
||||||
// cache and when we tried to fetch data for it. All we can do is
|
|
||||||
// return no jobs because we've only pulled basic info about each
|
|
||||||
// item.
|
|
||||||
return nil, "", errors.WithStack(errContainerDeleted)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, item := range resp.GetValue() {
|
|
||||||
if item.GetId() == nil {
|
|
||||||
errs = multierror.Append(
|
|
||||||
errs,
|
|
||||||
errors.Errorf("contact with nil ID in folder %s", directoryID),
|
|
||||||
)
|
|
||||||
|
|
||||||
// TODO(ashmrtn): Handle fail-fast.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
ids = append(ids, *item.GetId())
|
|
||||||
}
|
|
||||||
|
|
||||||
delta := resp.GetOdataDeltaLink()
|
|
||||||
if delta != nil && len(*delta) > 0 {
|
|
||||||
deltaURL = *delta
|
|
||||||
}
|
|
||||||
|
|
||||||
nextLink := resp.GetOdataNextLink()
|
|
||||||
if nextLink == nil || len(*nextLink) == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
builder = msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(*nextLink, gs.Adapter())
|
|
||||||
}
|
|
||||||
|
|
||||||
return ids, deltaURL, errs.ErrorOrNil()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// messages
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
// FetchMessageIDsFromDirectory function that returns a list of all the m365IDs of the exchange.Mail
|
// FetchMessageIDsFromDirectory function that returns a list of all the m365IDs of the exchange.Mail
|
||||||
// of the targeted directory
|
// of the targeted directory
|
||||||
func FetchMessageIDsFromDirectory(
|
func FetchMessageIDsFromDirectory(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
gs graph.Servicer,
|
gs graph.Servicer,
|
||||||
user, directoryID, oldDelta string,
|
user, directoryID, oldDelta string,
|
||||||
) ([]string, string, error) {
|
) ([]string, deltaUpdate, error) {
|
||||||
var (
|
var (
|
||||||
errs *multierror.Error
|
errs *multierror.Error
|
||||||
ids []string
|
ids []string
|
||||||
deltaURL string
|
deltaURL string
|
||||||
|
resetDelta bool
|
||||||
)
|
)
|
||||||
|
|
||||||
options, err := optionsForFolderMessagesDelta([]string{"isRead"})
|
options, err := optionsForFolderMessagesDelta([]string{"isRead"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, deltaURL, errors.Wrap(err, "getting query options")
|
return nil, deltaUpdate{}, errors.Wrap(err, "getting query options")
|
||||||
|
}
|
||||||
|
|
||||||
|
getIDs := func(builder *msuser.ItemMailFoldersItemMessagesDeltaRequestBuilder) error {
|
||||||
|
for {
|
||||||
|
resp, err := builder.Get(ctx, options)
|
||||||
|
if err != nil {
|
||||||
|
if err := graph.IsErrDeletedInFlight(err); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := graph.IsErrInvalidDelta(err); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, item := range resp.GetValue() {
|
||||||
|
if item.GetId() == nil {
|
||||||
|
errs = multierror.Append(
|
||||||
|
errs,
|
||||||
|
errors.Errorf("item with nil ID in folder %s", directoryID),
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO(ashmrtn): Handle fail-fast.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
ids = append(ids, *item.GetId())
|
||||||
|
}
|
||||||
|
|
||||||
|
delta := resp.GetOdataDeltaLink()
|
||||||
|
if delta != nil && len(*delta) > 0 {
|
||||||
|
deltaURL = *delta
|
||||||
|
}
|
||||||
|
|
||||||
|
nextLink := resp.GetOdataNextLink()
|
||||||
|
if nextLink == nil || len(*nextLink) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
builder = msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(*nextLink, gs.Adapter())
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(oldDelta) > 0 {
|
||||||
|
err := getIDs(msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, gs.Adapter()))
|
||||||
|
// happy path
|
||||||
|
if err == nil {
|
||||||
|
return ids, deltaUpdate{deltaURL, false}, errs.ErrorOrNil()
|
||||||
|
}
|
||||||
|
// only return on error if it is NOT a delta issue.
|
||||||
|
// otherwise we'll retry the call with the regular builder
|
||||||
|
if graph.IsErrInvalidDelta(err) == nil {
|
||||||
|
return nil, deltaUpdate{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resetDelta = true
|
||||||
|
errs = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
builder := gs.Client().
|
builder := gs.Client().
|
||||||
@ -432,50 +543,9 @@ func FetchMessageIDsFromDirectory(
|
|||||||
Messages().
|
Messages().
|
||||||
Delta()
|
Delta()
|
||||||
|
|
||||||
if len(oldDelta) > 0 {
|
if err := getIDs(builder); err != nil {
|
||||||
builder = msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, gs.Adapter())
|
return nil, deltaUpdate{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
return ids, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
|
||||||
resp, err := builder.Get(ctx, options)
|
|
||||||
if err != nil {
|
|
||||||
if hasErrorCode(err, errEmailFolderNotFound) {
|
|
||||||
// The folder was deleted between the time we populated the container
|
|
||||||
// cache and when we tried to fetch data for it. All we can do is
|
|
||||||
// return no jobs because we've only pulled basic info about each
|
|
||||||
// item.
|
|
||||||
return nil, "", errors.WithStack(errContainerDeleted)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, item := range resp.GetValue() {
|
|
||||||
if item.GetId() == nil {
|
|
||||||
errs = multierror.Append(
|
|
||||||
errs,
|
|
||||||
errors.Errorf("item with nil ID in folder %s", directoryID),
|
|
||||||
)
|
|
||||||
|
|
||||||
// TODO(ashmrtn): Handle fail-fast.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
ids = append(ids, *item.GetId())
|
|
||||||
}
|
|
||||||
|
|
||||||
delta := resp.GetOdataDeltaLink()
|
|
||||||
if delta != nil && len(*delta) > 0 {
|
|
||||||
deltaURL = *delta
|
|
||||||
}
|
|
||||||
|
|
||||||
nextLink := resp.GetOdataNextLink()
|
|
||||||
if nextLink == nil || len(*nextLink) == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
builder = msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(*nextLink, gs.Adapter())
|
|
||||||
}
|
|
||||||
|
|
||||||
return ids, deltaURL, errs.ErrorOrNil()
|
|
||||||
}
|
}
|
||||||
|
|||||||
127
src/internal/connector/graph/errors.go
Normal file
127
src/internal/connector/graph/errors.go
Normal file
@ -0,0 +1,127 @@
|
|||||||
|
package graph
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/url"
|
||||||
|
|
||||||
|
"github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/src/internal/common"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Error Interpretation Helpers
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
const (
|
||||||
|
errCodeItemNotFound = "ErrorItemNotFound"
|
||||||
|
errCodeEmailFolderNotFound = "ErrorSyncFolderNotFound"
|
||||||
|
errCodeResyncRequired = "ResyncRequired"
|
||||||
|
errCodeSyncStateNotFound = "SyncStateNotFound"
|
||||||
|
)
|
||||||
|
|
||||||
|
// The folder or item was deleted between the time we identified
|
||||||
|
// it and when we tried to fetch data for it.
|
||||||
|
type ErrDeletedInFlight struct {
|
||||||
|
common.Err
|
||||||
|
}
|
||||||
|
|
||||||
|
func IsErrDeletedInFlight(err error) error {
|
||||||
|
if asDeletedInFlight(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if hasErrorCode(err, errCodeItemNotFound) {
|
||||||
|
return ErrDeletedInFlight{*common.EncapsulateError(err)}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func asDeletedInFlight(err error) bool {
|
||||||
|
e := ErrDeletedInFlight{}
|
||||||
|
return errors.As(err, &e)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delta tokens can be desycned or expired. In either case, the token
|
||||||
|
// becomes invalid, and cannot be used again.
|
||||||
|
// https://learn.microsoft.com/en-us/graph/errors#code-property
|
||||||
|
type ErrInvalidDelta struct {
|
||||||
|
common.Err
|
||||||
|
}
|
||||||
|
|
||||||
|
func IsErrInvalidDelta(err error) error {
|
||||||
|
if asInvalidDelta(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if hasErrorCode(err, errCodeSyncStateNotFound) ||
|
||||||
|
hasErrorCode(err, errCodeResyncRequired) {
|
||||||
|
return ErrInvalidDelta{*common.EncapsulateError(err)}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func asInvalidDelta(err error) bool {
|
||||||
|
e := ErrInvalidDelta{}
|
||||||
|
return errors.As(err, &e)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Timeout errors are identified for tracking the need to retry calls.
|
||||||
|
// Other delay errors, like throttling, are already handled by the
|
||||||
|
// graph client's built-in retries.
|
||||||
|
// https://github.com/microsoftgraph/msgraph-sdk-go/issues/302
|
||||||
|
type ErrTimeout struct {
|
||||||
|
common.Err
|
||||||
|
}
|
||||||
|
|
||||||
|
func IsErrTimeout(err error) error {
|
||||||
|
if asTimeout(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if isTimeoutErr(err) {
|
||||||
|
return ErrTimeout{*common.EncapsulateError(err)}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func asTimeout(err error) bool {
|
||||||
|
e := ErrTimeout{}
|
||||||
|
return errors.As(err, &e)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// error parsers
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
func hasErrorCode(err error, code string) bool {
|
||||||
|
if err == nil {
|
||||||
|
fmt.Println("nil")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
var oDataError *odataerrors.ODataError
|
||||||
|
if !errors.As(err, &oDataError) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return oDataError.GetError().GetCode() != nil &&
|
||||||
|
*oDataError.GetError().GetCode() == code
|
||||||
|
}
|
||||||
|
|
||||||
|
// isTimeoutErr is used to determine if the Graph error returned is
|
||||||
|
// because of Timeout. This is used to restrict retries to just
|
||||||
|
// timeouts as other errors are handled within a middleware in the
|
||||||
|
// client.
|
||||||
|
func isTimeoutErr(err error) bool {
|
||||||
|
switch err := err.(type) {
|
||||||
|
case *url.Error:
|
||||||
|
return err.Timeout()
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -127,6 +127,10 @@ func (md MetadataCollection) State() data.CollectionState {
|
|||||||
return data.NewState
|
return data.NewState
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (md MetadataCollection) DoNotMergeItems() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (md MetadataCollection) Items() <-chan data.Stream {
|
func (md MetadataCollection) Items() <-chan data.Stream {
|
||||||
res := make(chan data.Stream)
|
res := make(chan data.Stream)
|
||||||
|
|
||||||
|
|||||||
@ -57,10 +57,13 @@ func CreateHTTPClient() *nethttp.Client {
|
|||||||
return httpClient
|
return httpClient
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Logging Middleware
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
// LoggingMiddleware can be used to log the http request sent by the graph client
|
// LoggingMiddleware can be used to log the http request sent by the graph client
|
||||||
type LoggingMiddleware struct{}
|
type LoggingMiddleware struct{}
|
||||||
|
|
||||||
// Intercept implements the RequestInterceptor interface and decodes the parameters name
|
|
||||||
func (handler *LoggingMiddleware) Intercept(
|
func (handler *LoggingMiddleware) Intercept(
|
||||||
pipeline khttp.Pipeline,
|
pipeline khttp.Pipeline,
|
||||||
middlewareIndex int,
|
middlewareIndex int,
|
||||||
@ -97,6 +100,10 @@ func (handler *LoggingMiddleware) Intercept(
|
|||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Other Helpers
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
func StringToPathCategory(input string) path.CategoryType {
|
func StringToPathCategory(input string) path.CategoryType {
|
||||||
param := strings.ToLower(input)
|
param := strings.ToLower(input)
|
||||||
|
|
||||||
|
|||||||
@ -15,14 +15,15 @@ import (
|
|||||||
|
|
||||||
// MockExchangeDataCollection represents a mock exchange mailbox
|
// MockExchangeDataCollection represents a mock exchange mailbox
|
||||||
type MockExchangeDataCollection struct {
|
type MockExchangeDataCollection struct {
|
||||||
fullPath path.Path
|
fullPath path.Path
|
||||||
messageCount int
|
messageCount int
|
||||||
Data [][]byte
|
Data [][]byte
|
||||||
Names []string
|
Names []string
|
||||||
ModTimes []time.Time
|
ModTimes []time.Time
|
||||||
ColState data.CollectionState
|
ColState data.CollectionState
|
||||||
PrevPath path.Path
|
PrevPath path.Path
|
||||||
DeletedItems []bool
|
DeletedItems []bool
|
||||||
|
doNotMergeItems bool
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -104,6 +105,10 @@ func (medc MockExchangeDataCollection) State() data.CollectionState {
|
|||||||
return medc.ColState
|
return medc.ColState
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (medc MockExchangeDataCollection) DoNotMergeItems() bool {
|
||||||
|
return medc.doNotMergeItems
|
||||||
|
}
|
||||||
|
|
||||||
// Items returns a channel that has the next items in the collection. The
|
// Items returns a channel that has the next items in the collection. The
|
||||||
// channel is closed when there are no more items available.
|
// channel is closed when there are no more items available.
|
||||||
func (medc *MockExchangeDataCollection) Items() <-chan data.Stream {
|
func (medc *MockExchangeDataCollection) Items() <-chan data.Stream {
|
||||||
|
|||||||
@ -4,7 +4,6 @@ package onedrive
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"net/url"
|
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@ -58,6 +57,9 @@ type Collection struct {
|
|||||||
statusUpdater support.StatusUpdater
|
statusUpdater support.StatusUpdater
|
||||||
itemReader itemReaderFunc
|
itemReader itemReaderFunc
|
||||||
ctrl control.Options
|
ctrl control.Options
|
||||||
|
|
||||||
|
// should only be true if the old delta token expired
|
||||||
|
doNotMergeItems bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// itemReadFunc returns a reader for the specified item
|
// itemReadFunc returns a reader for the specified item
|
||||||
@ -124,6 +126,10 @@ func (oc Collection) State() data.CollectionState {
|
|||||||
return data.NewState
|
return data.NewState
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (oc Collection) DoNotMergeItems() bool {
|
||||||
|
return oc.doNotMergeItems
|
||||||
|
}
|
||||||
|
|
||||||
// Item represents a single item retrieved from OneDrive
|
// Item represents a single item retrieved from OneDrive
|
||||||
type Item struct {
|
type Item struct {
|
||||||
id string
|
id string
|
||||||
@ -156,19 +162,6 @@ func (od *Item) Info() details.ItemInfo {
|
|||||||
// return od.info.Modified
|
// return od.info.Modified
|
||||||
//}
|
//}
|
||||||
|
|
||||||
// isTimeoutErr is used to determine if the Graph error returned is
|
|
||||||
// because of Timeout. This is used to restrict retries to just
|
|
||||||
// timeouts as other errors are handled within a middleware in the
|
|
||||||
// client.
|
|
||||||
func isTimeoutErr(err error) bool {
|
|
||||||
switch err := err.(type) {
|
|
||||||
case *url.Error:
|
|
||||||
return err.Timeout()
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// populateItems iterates through items added to the collection
|
// populateItems iterates through items added to the collection
|
||||||
// and uses the collection `itemReader` to read the item
|
// and uses the collection `itemReader` to read the item
|
||||||
func (oc *Collection) populateItems(ctx context.Context) {
|
func (oc *Collection) populateItems(ctx context.Context) {
|
||||||
@ -228,11 +221,8 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
|||||||
for i := 1; i <= maxRetries; i++ {
|
for i := 1; i <= maxRetries; i++ {
|
||||||
itemInfo, itemData, err = oc.itemReader(ctx, item)
|
itemInfo, itemData, err = oc.itemReader(ctx, item)
|
||||||
|
|
||||||
// We only retry if it is a timeout error. Other
|
// retry on Timeout type errors, break otherwise.
|
||||||
// errors like throttling are already handled within
|
if err == nil || graph.IsErrTimeout(err) == nil {
|
||||||
// the graph client via a retry middleware.
|
|
||||||
// https://github.com/microsoftgraph/msgraph-sdk-go/issues/302
|
|
||||||
if err == nil || !isTimeoutErr(err) {
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -81,12 +81,14 @@ func (sc Collection) PreviousPath() path.Path {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(ashmrtn): Fill in once GraphConnector compares old and new folder
|
|
||||||
// hierarchies.
|
|
||||||
func (sc Collection) State() data.CollectionState {
|
func (sc Collection) State() data.CollectionState {
|
||||||
return data.NewState
|
return data.NewState
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sc Collection) DoNotMergeItems() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (sc *Collection) Items() <-chan data.Stream {
|
func (sc *Collection) Items() <-chan data.Stream {
|
||||||
go sc.populate(context.TODO())
|
go sc.populate(context.TODO())
|
||||||
return sc.data
|
return sc.data
|
||||||
|
|||||||
@ -47,6 +47,15 @@ type Collection interface {
|
|||||||
// backup along with all items and Collections below them in the hierarchy
|
// backup along with all items and Collections below them in the hierarchy
|
||||||
// unless said items/Collections were moved.
|
// unless said items/Collections were moved.
|
||||||
State() CollectionState
|
State() CollectionState
|
||||||
|
// DoNotMergeItems informs kopia that the collection is rebuilding its contents
|
||||||
|
// from scratch, and that any items currently stored in that collection should
|
||||||
|
// be skipped during the process of merging historical data into the new backup.
|
||||||
|
// This flag is normally expected to be false. It should only be flagged under
|
||||||
|
// specific circumstances. Example: if the link token used for incremental queries
|
||||||
|
// expires or otherwise becomes unusable, thus requiring the backup producer to
|
||||||
|
// re-discover all data in the container. This flag only affects the path of the
|
||||||
|
// collection, and does not cascade to subfolders.
|
||||||
|
DoNotMergeItems() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stream represents a single item within a Collection
|
// Stream represents a single item within a Collection
|
||||||
|
|||||||
@ -30,6 +30,10 @@ func (mc mockColl) State() CollectionState {
|
|||||||
return NewState
|
return NewState
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mc mockColl) DoNotMergeItems() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
type CollectionSuite struct {
|
type CollectionSuite struct {
|
||||||
suite.Suite
|
suite.Suite
|
||||||
}
|
}
|
||||||
|
|||||||
@ -43,6 +43,10 @@ func (kdc kopiaDataCollection) State() data.CollectionState {
|
|||||||
return data.NewState
|
return data.NewState
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (kdc kopiaDataCollection) DoNotMergeItems() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
type kopiaDataStream struct {
|
type kopiaDataStream struct {
|
||||||
reader io.ReadCloser
|
reader io.ReadCloser
|
||||||
uuid string
|
uuid string
|
||||||
|
|||||||
@ -177,6 +177,10 @@ func (dc *streamCollection) State() data.CollectionState {
|
|||||||
return data.NewState
|
return data.NewState
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (dc *streamCollection) DoNotMergeItems() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// Items() always returns a channel with a single data.Stream
|
// Items() always returns a channel with a single data.Stream
|
||||||
// representing the object to be persisted
|
// representing the object to be persisted
|
||||||
func (dc *streamCollection) Items() <-chan data.Stream {
|
func (dc *streamCollection) Items() <-chan data.Stream {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user