Compare commits

...

7 Commits

Author SHA1 Message Date
ryanfkeepers
f4ec628df9 add doNotMergeItems func to collections
Adds a new func to the data.Collections iface:
DoNotMergeItems.  This propagates a flag that
tells kopia when it should not add items from
previous snapshots into the current snapshot
for the given collection.  This is needed in cases
like delta token expiration, where we are forced
to re-discover all items in the container instead
of performing an incremental lookup.
2022-12-22 13:19:11 -07:00
ryanfkeepers
3089879cec track tombstones by id, not path 2022-12-22 12:51:08 -07:00
ryanfkeepers
f0c638e8b9 comment out delta if prevpath is bad 2022-12-22 12:51:08 -07:00
ryanfkeepers
e413ca1742 fix contact folder options 2022-12-22 12:51:08 -07:00
ryanfkeepers
1ef0daace4 fix import order in graph/errors 2022-12-22 12:51:08 -07:00
ryanfkeepers
2acf01af31 minor fixes from code review 2022-12-22 12:51:08 -07:00
ryanfkeepers
591b13388d handle invalid or expired deltas
If a delta token expires or is otherwise invalid,
the backup should fall back to the same
behavior as if the collection were new.  It must
collect the full delta of information, and create
a collection with New state.  If the previous
container moved between the two deltas,
it should be marked for deletion with a tomb-
stone collection.
2022-12-22 12:51:08 -07:00
13 changed files with 420 additions and 187 deletions

View File

@ -69,6 +69,9 @@ type Collection struct {
prevPath path.Path prevPath path.Path
state data.CollectionState state data.CollectionState
// doNotMergeItems should only be true if the old delta token expired.
doNotMergeItems bool
} }
// NewExchangeDataCollection creates an ExchangeDataCollection. // NewExchangeDataCollection creates an ExchangeDataCollection.
@ -156,12 +159,14 @@ func (col Collection) PreviousPath() path.Path {
return nil return nil
} }
// TODO(ashmrtn): Fill in once GraphConnector compares old and new folder
// hierarchies.
func (col Collection) State() data.CollectionState { func (col Collection) State() data.CollectionState {
return col.state return col.state
} }
func (col Collection) DoNotMergeItems() bool {
return col.doNotMergeItems
}
// populateByOptionIdentifier is a utility function that uses col.collectionType to be able to serialize // populateByOptionIdentifier is a utility function that uses col.collectionType to be able to serialize
// all the M365IDs defined in the jobs field. data channel is closed by this function // all the M365IDs defined in the jobs field. data channel is closed by this function
func (col *Collection) populateByOptionIdentifier(ctx context.Context) { func (col *Collection) populateByOptionIdentifier(ctx context.Context) {

View File

@ -176,7 +176,9 @@ func populateExchangeContainerResolver(
return res, nil return res, nil
} }
func pathAndMatch( // Returns true if the container passes the scope comparison and should be included.
// Also returns the path representing the directory.
func includeContainer(
qp graph.QueryParams, qp graph.QueryParams,
c graph.CachedContainer, c graph.CachedContainer,
scope selectors.ExchangeScope, scope selectors.ExchangeScope,
@ -213,6 +215,6 @@ func pathAndMatch(
case path.EventsCategory: case path.EventsCategory:
return dirPath, scope.Matches(selectors.ExchangeEventCalendar, directory) return dirPath, scope.Matches(selectors.ExchangeEventCalendar, directory)
default: default:
return nil, false return dirPath, false
} }
} }

View File

@ -7,7 +7,6 @@ import (
multierror "github.com/hashicorp/go-multierror" multierror "github.com/hashicorp/go-multierror"
"github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors"
msuser "github.com/microsoftgraph/msgraph-sdk-go/users" msuser "github.com/microsoftgraph/msgraph-sdk-go/users"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -20,22 +19,12 @@ import (
"github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/selectors"
) )
const ( // carries details about delta retrieval in aggregators
errEmailFolderNotFound = "ErrorSyncFolderNotFound" type deltaUpdate struct {
errItemNotFound = "ErrorItemNotFound" // the deltaLink itself
) url string
// true if the old delta was marked as invalid
var errContainerDeleted = errors.New("container deleted") reset bool
func hasErrorCode(err error, code string) bool {
var oDataError *odataerrors.ODataError
if !errors.As(err, &oDataError) {
return false
}
return oDataError.GetError() != nil &&
oDataError.GetError().GetCode() != nil &&
*oDataError.GetError().GetCode() == code
} }
// filterContainersAndFillCollections is a utility function // filterContainersAndFillCollections is a utility function
@ -60,8 +49,8 @@ func filterContainersAndFillCollections(
deltaURLs = map[string]string{} deltaURLs = map[string]string{}
currPaths = map[string]string{} currPaths = map[string]string{}
// copy of previousPaths. any folder found in the resolver get // copy of previousPaths. any folder found in the resolver get
// deleted from this map, leaving only the deleted maps behind // deleted from this map, leaving only the deleted folders behind
deletedPaths = map[string]DeltaPath{} tombstones = makeTombstones(dps)
) )
getJobs, err := getFetchIDFunc(qp.Category) getJobs, err := getFetchIDFunc(qp.Category)
@ -83,12 +72,10 @@ func filterContainersAndFillCollections(
} }
cID := *c.GetId() cID := *c.GetId()
delete(tombstones, cID)
// this folder exists (probably), do not delete it. currPath, ok := includeContainer(qp, c, scope)
delete(deletedPaths, cID)
// Only create a collection if the path matches the scope. // Only create a collection if the path matches the scope.
currPath, ok := pathAndMatch(qp, c, scope)
if !ok { if !ok {
continue continue
} }
@ -103,22 +90,35 @@ func filterContainersAndFillCollections(
if len(prevPathStr) > 0 { if len(prevPathStr) > 0 {
if prevPath, err = pathFromPrevString(prevPathStr); err != nil { if prevPath, err = pathFromPrevString(prevPathStr); err != nil {
logger.Ctx(ctx).Error(err) logger.Ctx(ctx).Error(err)
// if the previous path is unusable, then the delta must be, too.
prevDelta = ""
} }
} }
jobs, currDelta, err := getJobs(ctx, service, qp.ResourceOwner, cID, prevDelta) jobs, newDelta, err := getJobs(ctx, service, qp.ResourceOwner, cID, prevDelta)
if err != nil { if err != nil {
// race conditions happen, the container might get // race conditions happen, containers might get deleted while
// deleted while this process in flight. // this process is in flight. If it was deleted, we remake the
if errors.Is(err, errContainerDeleted) { // tombstone, just to be sure it gets deleted from storage.
currPath = nil if graph.IsErrDeletedInFlight(err) == nil {
} else {
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
} else {
tombstones[cID] = dp.path
} }
continue
} }
if len(currDelta) > 0 { if len(newDelta.url) > 0 {
deltaURLs[cID] = currDelta deltaURLs[cID] = newDelta.url
}
if newDelta.reset {
// the previous delta was invalid or otherwise unusable.
// We need to mark the collection as New, and tombstone
// the path to clear out any data for a clean refresh.
prevPath = nil
tombstones[cID] = dp.path
} }
edc := NewCollection( edc := NewCollection(
@ -130,12 +130,8 @@ func filterContainersAndFillCollections(
statusUpdater, statusUpdater,
ctrlOpts, ctrlOpts,
) )
collections[cID] = &edc collections[cID] = &edc
if edc.State() == data.DeletedState {
continue
}
edc.jobs = append(edc.jobs, jobs...) edc.jobs = append(edc.jobs, jobs...)
// add the current path for the container ID to be used in the next backup // add the current path for the container ID to be used in the next backup
@ -143,20 +139,31 @@ func filterContainersAndFillCollections(
currPaths[cID] = currPath.String() currPaths[cID] = currPath.String()
} }
// any path that wasn't present in the resolver was deleted by the user. // A tombstone is a collection path that needs to be marked for deletion.
// relocations and renames will have removed the dir by id earlier. What's // Tombstones can occur for a number of reasons: the delta token expired,
// left in deletedPaths are only the previous paths that did not appear as // the container was deleted in flight, or the user deleted the container
// children of the root. // between backup runs. If events combine to both delete and write content
for fID, dp := range deletedPaths { // to the same container (ex: container_1 gets deleted, then container_2
// gets created with the same name), it is assumed that the backup consumer
// processes deletions before creations, making the combined operation safe.
for id, p := range tombstones {
service, err := createService(qp.Credentials) service, err := createService(qp.Credentials)
if err != nil { if err != nil {
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
continue continue
} }
prevPath, err := pathFromPrevString(dp.path) // only occurs if it was a new folder that we picked up during the container
// resolver phase that got deleted in flight by the time we hit this stage.
if len(p) == 0 {
continue
}
prevPath, err := pathFromPrevString(p)
if err != nil { if err != nil {
logger.Ctx(ctx).Error(err) // technically shouldn't ever happen. But just in case, we need to catch
// it for protection.
logger.Ctx(ctx).Errorw("parsing tombstone path", "err", err)
continue continue
} }
@ -169,7 +176,7 @@ func filterContainersAndFillCollections(
statusUpdater, statusUpdater,
ctrlOpts, ctrlOpts,
) )
collections[fID] = &edc collections[id] = &edc
} }
entries := []graph.MetadataCollectionEntry{ entries := []graph.MetadataCollectionEntry{
@ -196,6 +203,19 @@ func filterContainersAndFillCollections(
return errs return errs
} }
// produces a set of id:path pairs from the deltapaths map.
// Each entry in the set will, if not removed, produce a collection
// that will delete the tombstone by path.
func makeTombstones(dps DeltaPaths) map[string]string {
r := make(map[string]string, len(dps))
for id, v := range dps {
r[id] = v.path
}
return r
}
func pathFromPrevString(ps string) (path.Path, error) { func pathFromPrevString(ps string) (path.Path, error) {
p, err := path.FromDataLayerPath(ps, false) p, err := path.FromDataLayerPath(ps, false)
if err != nil { if err != nil {
@ -260,7 +280,7 @@ type FetchIDFunc func(
ctx context.Context, ctx context.Context,
gs graph.Servicer, gs graph.Servicer,
user, containerID, oldDeltaToken string, user, containerID, oldDeltaToken string,
) ([]string, string, error) ) ([]string, deltaUpdate, error)
func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) { func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) {
switch category { switch category {
@ -275,12 +295,16 @@ func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) {
} }
} }
// ---------------------------------------------------------------------------
// events
// ---------------------------------------------------------------------------
// FetchEventIDsFromCalendar returns a list of all M365IDs of events of the targeted Calendar. // FetchEventIDsFromCalendar returns a list of all M365IDs of events of the targeted Calendar.
func FetchEventIDsFromCalendar( func FetchEventIDsFromCalendar(
ctx context.Context, ctx context.Context,
gs graph.Servicer, gs graph.Servicer,
user, calendarID, oldDelta string, user, calendarID, oldDelta string,
) ([]string, string, error) { ) ([]string, deltaUpdate, error) {
var ( var (
errs *multierror.Error errs *multierror.Error
ids []string ids []string
@ -288,7 +312,7 @@ func FetchEventIDsFromCalendar(
options, err := optionsForEventsByCalendar([]string{"id"}) options, err := optionsForEventsByCalendar([]string{"id"})
if err != nil { if err != nil {
return nil, "", err return nil, deltaUpdate{}, err
} }
builder := gs.Client(). builder := gs.Client().
@ -299,15 +323,11 @@ func FetchEventIDsFromCalendar(
for { for {
resp, err := builder.Get(ctx, options) resp, err := builder.Get(ctx, options)
if err != nil { if err != nil {
if hasErrorCode(err, errItemNotFound) { if err := graph.IsErrDeletedInFlight(err); err != nil {
// The folder was deleted between the time we populated the container return nil, deltaUpdate{}, err
// cache and when we tried to fetch data for it. All we can do is
// return no jobs because we've only pulled basic info about each
// item.
return nil, "", errors.WithStack(errContainerDeleted)
} }
return nil, "", errors.Wrap(err, support.ConnectorStackErrorTrace(err)) return nil, deltaUpdate{}, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
} }
for _, item := range resp.GetValue() { for _, item := range resp.GetValue() {
@ -333,25 +353,91 @@ func FetchEventIDsFromCalendar(
} }
// Events don't have a delta endpoint so just return an empty string. // Events don't have a delta endpoint so just return an empty string.
return ids, "", errs.ErrorOrNil() return ids, deltaUpdate{}, errs.ErrorOrNil()
} }
// ---------------------------------------------------------------------------
// contacts
// ---------------------------------------------------------------------------
// FetchContactIDsFromDirectory function that returns a list of all the m365IDs of the contacts // FetchContactIDsFromDirectory function that returns a list of all the m365IDs of the contacts
// of the targeted directory // of the targeted directory
func FetchContactIDsFromDirectory( func FetchContactIDsFromDirectory(
ctx context.Context, ctx context.Context,
gs graph.Servicer, gs graph.Servicer,
user, directoryID, oldDelta string, user, directoryID, oldDelta string,
) ([]string, string, error) { ) ([]string, deltaUpdate, error) {
var ( var (
errs *multierror.Error errs *multierror.Error
ids []string ids []string
deltaURL string deltaURL string
resetDelta bool
) )
options, err := optionsForContactFoldersItemDelta([]string{"parentFolderId"}) options, err := optionsForContactFoldersItemDelta([]string{"parentFolderId"})
if err != nil { if err != nil {
return nil, deltaURL, errors.Wrap(err, "getting query options") return nil, deltaUpdate{}, errors.Wrap(err, "getting query options")
}
getIDs := func(builder *msuser.ItemContactFoldersItemContactsDeltaRequestBuilder) error {
for {
resp, err := builder.Get(ctx, options)
if err != nil {
if err := graph.IsErrDeletedInFlight(err); err != nil {
return err
}
if err := graph.IsErrInvalidDelta(err); err != nil {
return err
}
return errors.Wrap(err, support.ConnectorStackErrorTrace(err))
}
for _, item := range resp.GetValue() {
if item.GetId() == nil {
errs = multierror.Append(
errs,
errors.Errorf("item with nil ID in folder %s", directoryID),
)
// TODO(ashmrtn): Handle fail-fast.
continue
}
ids = append(ids, *item.GetId())
}
delta := resp.GetOdataDeltaLink()
if delta != nil && len(*delta) > 0 {
deltaURL = *delta
}
nextLink := resp.GetOdataNextLink()
if nextLink == nil || len(*nextLink) == 0 {
break
}
builder = msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(*nextLink, gs.Adapter())
}
return nil
}
if len(oldDelta) > 0 {
err := getIDs(msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, gs.Adapter()))
// happy path
if err == nil {
return ids, deltaUpdate{deltaURL, false}, errs.ErrorOrNil()
}
// only return on error if it is NOT a delta issue.
// otherwise we'll retry the call with the regular builder
if graph.IsErrInvalidDelta(err) == nil {
return nil, deltaUpdate{}, err
}
resetDelta = true
errs = nil
} }
builder := gs.Client(). builder := gs.Client().
@ -360,70 +446,95 @@ func FetchContactIDsFromDirectory(
Contacts(). Contacts().
Delta() Delta()
if len(oldDelta) > 0 { if err := getIDs(builder); err != nil {
builder = msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, gs.Adapter()) return nil, deltaUpdate{}, err
} }
for { return ids, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
resp, err := builder.Get(ctx, options)
if err != nil {
if hasErrorCode(err, errItemNotFound) {
// The folder was deleted between the time we populated the container
// cache and when we tried to fetch data for it. All we can do is
// return no jobs because we've only pulled basic info about each
// item.
return nil, "", errors.WithStack(errContainerDeleted)
}
return nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
}
for _, item := range resp.GetValue() {
if item.GetId() == nil {
errs = multierror.Append(
errs,
errors.Errorf("contact with nil ID in folder %s", directoryID),
)
// TODO(ashmrtn): Handle fail-fast.
continue
}
ids = append(ids, *item.GetId())
}
delta := resp.GetOdataDeltaLink()
if delta != nil && len(*delta) > 0 {
deltaURL = *delta
}
nextLink := resp.GetOdataNextLink()
if nextLink == nil || len(*nextLink) == 0 {
break
}
builder = msuser.NewItemContactFoldersItemContactsDeltaRequestBuilder(*nextLink, gs.Adapter())
}
return ids, deltaURL, errs.ErrorOrNil()
} }
// ---------------------------------------------------------------------------
// messages
// ---------------------------------------------------------------------------
// FetchMessageIDsFromDirectory function that returns a list of all the m365IDs of the exchange.Mail // FetchMessageIDsFromDirectory function that returns a list of all the m365IDs of the exchange.Mail
// of the targeted directory // of the targeted directory
func FetchMessageIDsFromDirectory( func FetchMessageIDsFromDirectory(
ctx context.Context, ctx context.Context,
gs graph.Servicer, gs graph.Servicer,
user, directoryID, oldDelta string, user, directoryID, oldDelta string,
) ([]string, string, error) { ) ([]string, deltaUpdate, error) {
var ( var (
errs *multierror.Error errs *multierror.Error
ids []string ids []string
deltaURL string deltaURL string
resetDelta bool
) )
options, err := optionsForFolderMessagesDelta([]string{"isRead"}) options, err := optionsForFolderMessagesDelta([]string{"isRead"})
if err != nil { if err != nil {
return nil, deltaURL, errors.Wrap(err, "getting query options") return nil, deltaUpdate{}, errors.Wrap(err, "getting query options")
}
getIDs := func(builder *msuser.ItemMailFoldersItemMessagesDeltaRequestBuilder) error {
for {
resp, err := builder.Get(ctx, options)
if err != nil {
if err := graph.IsErrDeletedInFlight(err); err != nil {
return err
}
if err := graph.IsErrInvalidDelta(err); err != nil {
return err
}
return errors.Wrap(err, support.ConnectorStackErrorTrace(err))
}
for _, item := range resp.GetValue() {
if item.GetId() == nil {
errs = multierror.Append(
errs,
errors.Errorf("item with nil ID in folder %s", directoryID),
)
// TODO(ashmrtn): Handle fail-fast.
continue
}
ids = append(ids, *item.GetId())
}
delta := resp.GetOdataDeltaLink()
if delta != nil && len(*delta) > 0 {
deltaURL = *delta
}
nextLink := resp.GetOdataNextLink()
if nextLink == nil || len(*nextLink) == 0 {
break
}
builder = msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(*nextLink, gs.Adapter())
}
return nil
}
if len(oldDelta) > 0 {
err := getIDs(msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, gs.Adapter()))
// happy path
if err == nil {
return ids, deltaUpdate{deltaURL, false}, errs.ErrorOrNil()
}
// only return on error if it is NOT a delta issue.
// otherwise we'll retry the call with the regular builder
if graph.IsErrInvalidDelta(err) == nil {
return nil, deltaUpdate{}, err
}
resetDelta = true
errs = nil
} }
builder := gs.Client(). builder := gs.Client().
@ -432,50 +543,9 @@ func FetchMessageIDsFromDirectory(
Messages(). Messages().
Delta() Delta()
if len(oldDelta) > 0 { if err := getIDs(builder); err != nil {
builder = msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, gs.Adapter()) return nil, deltaUpdate{}, err
} }
for { return ids, deltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
resp, err := builder.Get(ctx, options)
if err != nil {
if hasErrorCode(err, errEmailFolderNotFound) {
// The folder was deleted between the time we populated the container
// cache and when we tried to fetch data for it. All we can do is
// return no jobs because we've only pulled basic info about each
// item.
return nil, "", errors.WithStack(errContainerDeleted)
}
return nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
}
for _, item := range resp.GetValue() {
if item.GetId() == nil {
errs = multierror.Append(
errs,
errors.Errorf("item with nil ID in folder %s", directoryID),
)
// TODO(ashmrtn): Handle fail-fast.
continue
}
ids = append(ids, *item.GetId())
}
delta := resp.GetOdataDeltaLink()
if delta != nil && len(*delta) > 0 {
deltaURL = *delta
}
nextLink := resp.GetOdataNextLink()
if nextLink == nil || len(*nextLink) == 0 {
break
}
builder = msuser.NewItemMailFoldersItemMessagesDeltaRequestBuilder(*nextLink, gs.Adapter())
}
return ids, deltaURL, errs.ErrorOrNil()
} }

View File

@ -0,0 +1,127 @@
package graph
import (
"fmt"
"net/url"
"github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors"
"github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/common"
)
// ---------------------------------------------------------------------------
// Error Interpretation Helpers
// ---------------------------------------------------------------------------
const (
errCodeItemNotFound = "ErrorItemNotFound"
errCodeEmailFolderNotFound = "ErrorSyncFolderNotFound"
errCodeResyncRequired = "ResyncRequired"
errCodeSyncStateNotFound = "SyncStateNotFound"
)
// The folder or item was deleted between the time we identified
// it and when we tried to fetch data for it.
type ErrDeletedInFlight struct {
common.Err
}
func IsErrDeletedInFlight(err error) error {
if asDeletedInFlight(err) {
return err
}
if hasErrorCode(err, errCodeItemNotFound) {
return ErrDeletedInFlight{*common.EncapsulateError(err)}
}
return nil
}
func asDeletedInFlight(err error) bool {
e := ErrDeletedInFlight{}
return errors.As(err, &e)
}
// Delta tokens can be desycned or expired. In either case, the token
// becomes invalid, and cannot be used again.
// https://learn.microsoft.com/en-us/graph/errors#code-property
type ErrInvalidDelta struct {
common.Err
}
func IsErrInvalidDelta(err error) error {
if asInvalidDelta(err) {
return err
}
if hasErrorCode(err, errCodeSyncStateNotFound) ||
hasErrorCode(err, errCodeResyncRequired) {
return ErrInvalidDelta{*common.EncapsulateError(err)}
}
return nil
}
func asInvalidDelta(err error) bool {
e := ErrInvalidDelta{}
return errors.As(err, &e)
}
// Timeout errors are identified for tracking the need to retry calls.
// Other delay errors, like throttling, are already handled by the
// graph client's built-in retries.
// https://github.com/microsoftgraph/msgraph-sdk-go/issues/302
type ErrTimeout struct {
common.Err
}
func IsErrTimeout(err error) error {
if asTimeout(err) {
return err
}
if isTimeoutErr(err) {
return ErrTimeout{*common.EncapsulateError(err)}
}
return nil
}
func asTimeout(err error) bool {
e := ErrTimeout{}
return errors.As(err, &e)
}
// ---------------------------------------------------------------------------
// error parsers
// ---------------------------------------------------------------------------
func hasErrorCode(err error, code string) bool {
if err == nil {
fmt.Println("nil")
return false
}
var oDataError *odataerrors.ODataError
if !errors.As(err, &oDataError) {
return false
}
return oDataError.GetError().GetCode() != nil &&
*oDataError.GetError().GetCode() == code
}
// isTimeoutErr is used to determine if the Graph error returned is
// because of Timeout. This is used to restrict retries to just
// timeouts as other errors are handled within a middleware in the
// client.
func isTimeoutErr(err error) bool {
switch err := err.(type) {
case *url.Error:
return err.Timeout()
default:
return false
}
}

View File

@ -127,6 +127,10 @@ func (md MetadataCollection) State() data.CollectionState {
return data.NewState return data.NewState
} }
func (md MetadataCollection) DoNotMergeItems() bool {
return false
}
func (md MetadataCollection) Items() <-chan data.Stream { func (md MetadataCollection) Items() <-chan data.Stream {
res := make(chan data.Stream) res := make(chan data.Stream)

View File

@ -57,10 +57,13 @@ func CreateHTTPClient() *nethttp.Client {
return httpClient return httpClient
} }
// ---------------------------------------------------------------------------
// Logging Middleware
// ---------------------------------------------------------------------------
// LoggingMiddleware can be used to log the http request sent by the graph client // LoggingMiddleware can be used to log the http request sent by the graph client
type LoggingMiddleware struct{} type LoggingMiddleware struct{}
// Intercept implements the RequestInterceptor interface and decodes the parameters name
func (handler *LoggingMiddleware) Intercept( func (handler *LoggingMiddleware) Intercept(
pipeline khttp.Pipeline, pipeline khttp.Pipeline,
middlewareIndex int, middlewareIndex int,
@ -97,6 +100,10 @@ func (handler *LoggingMiddleware) Intercept(
return resp, err return resp, err
} }
// ---------------------------------------------------------------------------
// Other Helpers
// ---------------------------------------------------------------------------
func StringToPathCategory(input string) path.CategoryType { func StringToPathCategory(input string) path.CategoryType {
param := strings.ToLower(input) param := strings.ToLower(input)

View File

@ -15,14 +15,15 @@ import (
// MockExchangeDataCollection represents a mock exchange mailbox // MockExchangeDataCollection represents a mock exchange mailbox
type MockExchangeDataCollection struct { type MockExchangeDataCollection struct {
fullPath path.Path fullPath path.Path
messageCount int messageCount int
Data [][]byte Data [][]byte
Names []string Names []string
ModTimes []time.Time ModTimes []time.Time
ColState data.CollectionState ColState data.CollectionState
PrevPath path.Path PrevPath path.Path
DeletedItems []bool DeletedItems []bool
doNotMergeItems bool
} }
var ( var (
@ -104,6 +105,10 @@ func (medc MockExchangeDataCollection) State() data.CollectionState {
return medc.ColState return medc.ColState
} }
func (medc MockExchangeDataCollection) DoNotMergeItems() bool {
return medc.doNotMergeItems
}
// Items returns a channel that has the next items in the collection. The // Items returns a channel that has the next items in the collection. The
// channel is closed when there are no more items available. // channel is closed when there are no more items available.
func (medc *MockExchangeDataCollection) Items() <-chan data.Stream { func (medc *MockExchangeDataCollection) Items() <-chan data.Stream {

View File

@ -4,7 +4,6 @@ package onedrive
import ( import (
"context" "context"
"io" "io"
"net/url"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -58,6 +57,9 @@ type Collection struct {
statusUpdater support.StatusUpdater statusUpdater support.StatusUpdater
itemReader itemReaderFunc itemReader itemReaderFunc
ctrl control.Options ctrl control.Options
// should only be true if the old delta token expired
doNotMergeItems bool
} }
// itemReadFunc returns a reader for the specified item // itemReadFunc returns a reader for the specified item
@ -124,6 +126,10 @@ func (oc Collection) State() data.CollectionState {
return data.NewState return data.NewState
} }
func (oc Collection) DoNotMergeItems() bool {
return oc.doNotMergeItems
}
// Item represents a single item retrieved from OneDrive // Item represents a single item retrieved from OneDrive
type Item struct { type Item struct {
id string id string
@ -156,19 +162,6 @@ func (od *Item) Info() details.ItemInfo {
// return od.info.Modified // return od.info.Modified
//} //}
// isTimeoutErr is used to determine if the Graph error returned is
// because of Timeout. This is used to restrict retries to just
// timeouts as other errors are handled within a middleware in the
// client.
func isTimeoutErr(err error) bool {
switch err := err.(type) {
case *url.Error:
return err.Timeout()
default:
return false
}
}
// populateItems iterates through items added to the collection // populateItems iterates through items added to the collection
// and uses the collection `itemReader` to read the item // and uses the collection `itemReader` to read the item
func (oc *Collection) populateItems(ctx context.Context) { func (oc *Collection) populateItems(ctx context.Context) {
@ -228,11 +221,8 @@ func (oc *Collection) populateItems(ctx context.Context) {
for i := 1; i <= maxRetries; i++ { for i := 1; i <= maxRetries; i++ {
itemInfo, itemData, err = oc.itemReader(ctx, item) itemInfo, itemData, err = oc.itemReader(ctx, item)
// We only retry if it is a timeout error. Other // retry on Timeout type errors, break otherwise.
// errors like throttling are already handled within if err == nil || graph.IsErrTimeout(err) == nil {
// the graph client via a retry middleware.
// https://github.com/microsoftgraph/msgraph-sdk-go/issues/302
if err == nil || !isTimeoutErr(err) {
break break
} }

View File

@ -81,12 +81,14 @@ func (sc Collection) PreviousPath() path.Path {
return nil return nil
} }
// TODO(ashmrtn): Fill in once GraphConnector compares old and new folder
// hierarchies.
func (sc Collection) State() data.CollectionState { func (sc Collection) State() data.CollectionState {
return data.NewState return data.NewState
} }
func (sc Collection) DoNotMergeItems() bool {
return false
}
func (sc *Collection) Items() <-chan data.Stream { func (sc *Collection) Items() <-chan data.Stream {
go sc.populate(context.TODO()) go sc.populate(context.TODO())
return sc.data return sc.data

View File

@ -47,6 +47,15 @@ type Collection interface {
// backup along with all items and Collections below them in the hierarchy // backup along with all items and Collections below them in the hierarchy
// unless said items/Collections were moved. // unless said items/Collections were moved.
State() CollectionState State() CollectionState
// DoNotMergeItems informs kopia that the collection is rebuilding its contents
// from scratch, and that any items currently stored in that collection should
// be skipped during the process of merging historical data into the new backup.
// This flag is normally expected to be false. It should only be flagged under
// specific circumstances. Example: if the link token used for incremental queries
// expires or otherwise becomes unusable, thus requiring the backup producer to
// re-discover all data in the container. This flag only affects the path of the
// collection, and does not cascade to subfolders.
DoNotMergeItems() bool
} }
// Stream represents a single item within a Collection // Stream represents a single item within a Collection

View File

@ -30,6 +30,10 @@ func (mc mockColl) State() CollectionState {
return NewState return NewState
} }
func (mc mockColl) DoNotMergeItems() bool {
return false
}
type CollectionSuite struct { type CollectionSuite struct {
suite.Suite suite.Suite
} }

View File

@ -43,6 +43,10 @@ func (kdc kopiaDataCollection) State() data.CollectionState {
return data.NewState return data.NewState
} }
func (kdc kopiaDataCollection) DoNotMergeItems() bool {
return false
}
type kopiaDataStream struct { type kopiaDataStream struct {
reader io.ReadCloser reader io.ReadCloser
uuid string uuid string

View File

@ -177,6 +177,10 @@ func (dc *streamCollection) State() data.CollectionState {
return data.NewState return data.NewState
} }
func (dc *streamCollection) DoNotMergeItems() bool {
return false
}
// Items() always returns a channel with a single data.Stream // Items() always returns a channel with a single data.Stream
// representing the object to be persisted // representing the object to be persisted
func (dc *streamCollection) Items() <-chan data.Stream { func (dc *streamCollection) Items() <-chan data.Stream {