diff --git a/src/internal/common/maps.go b/src/internal/common/maps.go index 901a3b4c6..5a8b873b2 100644 --- a/src/internal/common/maps.go +++ b/src/internal/common/maps.go @@ -14,3 +14,13 @@ func UnionMaps[K comparable, V any](ms ...map[K]V) map[K]V { return r } + +func CopyMap[K comparable, V any](m map[K]V) map[K]V { + r := map[K]V{} + + for k, v := range m { + r[k] = v + } + + return r +} diff --git a/src/internal/common/slices.go b/src/internal/common/slices.go index 2a9e02003..7600400e9 100644 --- a/src/internal/common/slices.go +++ b/src/internal/common/slices.go @@ -1,7 +1,5 @@ package common -import "strconv" - func ContainsString(super []string, sub string) bool { for _, s := range super { if s == sub { @@ -22,14 +20,3 @@ func First(vs ...string) string { return "" } - -// parseBool returns the bool value represented by the string -// or false on error -func ParseBool(v string) bool { - s, err := strconv.ParseBool(v) - if err != nil { - return false - } - - return s -} diff --git a/src/internal/common/strings.go b/src/internal/common/strings.go new file mode 100644 index 000000000..e8db07c94 --- /dev/null +++ b/src/internal/common/strings.go @@ -0,0 +1,14 @@ +package common + +import "strconv" + +// ParseBool returns the bool value represented by the string +// or false on error +func ParseBool(v string) bool { + s, err := strconv.ParseBool(v) + if err != nil { + return false + } + + return s +} diff --git a/src/internal/connector/exchange/container_resolver.go b/src/internal/connector/exchange/container_resolver.go index bc0851724..d1d8e2a35 100644 --- a/src/internal/connector/exchange/container_resolver.go +++ b/src/internal/connector/exchange/container_resolver.go @@ -86,8 +86,7 @@ func (cr *containerResolver) PathInCache(pathString string) (string, bool) { // addFolder adds a folder to the cache with the given ID. If the item is // already in the cache does nothing. The path for the item is not modified. 
func (cr *containerResolver) addFolder(cf graph.CacheFolder) error { - // Only require a non-nil non-empty parent if the path isn't already - // populated. + // Only require a non-nil non-empty parent if the path isn't already populated. if cf.Path() != nil { if err := checkIDAndName(cf.Container); err != nil { return errors.Wrap(err, "adding item to cache") diff --git a/src/internal/connector/exchange/data_collections_test.go b/src/internal/connector/exchange/data_collections_test.go index c57afb037..173189abc 100644 --- a/src/internal/connector/exchange/data_collections_test.go +++ b/src/internal/connector/exchange/data_collections_test.go @@ -155,11 +155,11 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { deltas, paths := emails.deltas, emails.paths if len(test.expectDeltas) > 0 { - assert.NotEmpty(t, deltas, "deltas") + assert.Len(t, deltas, len(test.expectDeltas), "deltas len") } if len(test.expectPaths) > 0 { - assert.NotEmpty(t, paths, "paths") + assert.Len(t, paths, len(test.expectPaths), "paths len") } for k, v := range test.expectDeltas { diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index 4795370af..62e05d52d 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -13,7 +13,7 @@ import ( "time" absser "github.com/microsoft/kiota-abstractions-go/serialization" - kw "github.com/microsoft/kiota-serialization-json-go" + kioser "github.com/microsoft/kiota-serialization-json-go" msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" "github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/pkg/errors" @@ -67,12 +67,19 @@ type Collection struct { // It may be the same as fullPath, if the folder was not renamed or // moved. It will be empty on its first retrieval. 
prevPath path.Path + + state data.CollectionState } -// NewExchangeDataCollection creates an ExchangeDataCollection with fullPath is annotated +// NewExchangeDataCollection creates an ExchangeDataCollection. +// State of the collection is set as an observation of the current +// and previous paths. If the curr path is nil, the state is assumed +// to be deleted. If the prev path is nil, it is assumed newly created. +// If both are populated, then state is either moved (if they differ), +// or notMoved (if they match). func NewCollection( user string, - fullPath, prevPath path.Path, + curr, prev path.Path, collectionType optionIdentifier, service graph.Servicer, statusUpdater support.StatusUpdater, @@ -84,15 +91,32 @@ func NewCollection( jobs: make([]string, 0), service: service, statusUpdater: statusUpdater, - fullPath: fullPath, - prevPath: prevPath, + fullPath: curr, + prevPath: prev, collectionType: collectionType, ctrl: ctrlOpts, + state: stateOf(prev, curr), } return collection } +func stateOf(prev, curr path.Path) data.CollectionState { + if curr == nil || len(curr.String()) == 0 { + return data.DeletedState + } + + if prev == nil || len(prev.String()) == 0 { + return data.NewState + } + + if curr.Folder() != prev.Folder() { + return data.MovedState + } + + return data.NotMovedState +} + // AddJob appends additional objectID to structure's jobs field func (col *Collection) AddJob(objID string) { col.jobs = append(col.jobs, objID) @@ -135,14 +159,12 @@ func (col Collection) PreviousPath() path.Path { // TODO(ashmrtn): Fill in once GraphConnector compares old and new folder // hierarchies. func (col Collection) State() data.CollectionState { - return data.NewState + return col.state } // populateByOptionIdentifier is a utility function that uses col.collectionType to be able to serialize // all the M365IDs defined in the jobs field. 
data channel is closed by this function -func (col *Collection) populateByOptionIdentifier( - ctx context.Context, -) { +func (col *Collection) populateByOptionIdentifier(ctx context.Context) { var ( errs error success int64 @@ -210,7 +232,13 @@ func (col *Collection) populateByOptionIdentifier( return } - byteCount, err := serializeFunc(ctx, col.service.Client(), kw.NewJsonSerializationWriter(), col.data, response, user) + byteCount, err := serializeFunc( + ctx, + col.service.Client(), + kioser.NewJsonSerializationWriter(), + col.data, + response, + user) if err != nil { errUpdater(user, err) return @@ -264,7 +292,7 @@ func getModTime(mt modTimer) time.Time { type GraphSerializeFunc func( ctx context.Context, client *msgraphsdk.GraphServiceClient, - objectWriter *kw.JsonSerializationWriter, + objectWriter *kioser.JsonSerializationWriter, dataChannel chan<- data.Stream, parsable absser.Parsable, user string, @@ -275,7 +303,7 @@ type GraphSerializeFunc func( func eventToDataCollection( ctx context.Context, client *msgraphsdk.GraphServiceClient, - objectWriter *kw.JsonSerializationWriter, + objectWriter *kioser.JsonSerializationWriter, dataChannel chan<- data.Stream, parsable absser.Parsable, user string, @@ -342,7 +370,7 @@ func eventToDataCollection( func contactToDataCollection( ctx context.Context, client *msgraphsdk.GraphServiceClient, - objectWriter *kw.JsonSerializationWriter, + objectWriter *kioser.JsonSerializationWriter, dataChannel chan<- data.Stream, parsable absser.Parsable, user string, @@ -359,28 +387,32 @@ func contactToDataCollection( return 0, support.SetNonRecoverableError(errors.Wrap(err, *contact.GetId())) } - byteArray, err := objectWriter.GetSerializedContent() + bs, err := objectWriter.GetSerializedContent() if err != nil { return 0, support.WrapAndAppend(*contact.GetId(), err, nil) } - if len(byteArray) > 0 { + addtl := contact.GetAdditionalData() + _, removed := addtl[graph.AddtlDataRemoved] + + if len(bs) > 0 || removed { dataChannel <- 
&Stream{ id: *contact.GetId(), - message: byteArray, - info: ContactInfo(contact, int64(len(byteArray))), + message: bs, + info: ContactInfo(contact, int64(len(bs))), modTime: getModTime(contact), + deleted: removed, } } - return len(byteArray), nil + return len(bs), nil } // messageToDataCollection is the GraphSerializeFunc for models.Messageable func messageToDataCollection( ctx context.Context, client *msgraphsdk.GraphServiceClient, - objectWriter *kw.JsonSerializationWriter, + objectWriter *kioser.JsonSerializationWriter, dataChannel chan<- data.Stream, parsable absser.Parsable, user string, @@ -389,54 +421,58 @@ func messageToDataCollection( defer objectWriter.Close() - aMessage, ok := parsable.(models.Messageable) + msg, ok := parsable.(models.Messageable) if !ok { return 0, fmt.Errorf("expected Messageable, got %T", parsable) } - if *aMessage.GetHasAttachments() { + if *msg.GetHasAttachments() { // getting all the attachments might take a couple attempts due to filesize var retriesErr error for count := 0; count < numberOfRetries; count++ { attached, err := client. UsersById(user). - MessagesById(*aMessage.GetId()). + MessagesById(*msg.GetId()). Attachments(). 
Get(ctx, nil) retriesErr = err if err == nil { - aMessage.SetAttachments(attached.GetValue()) + msg.SetAttachments(attached.GetValue()) break } } if retriesErr != nil { logger.Ctx(ctx).Debug("exceeded maximum retries") - return 0, support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(retriesErr, "attachment failed"), nil) + return 0, support.WrapAndAppend(*msg.GetId(), errors.Wrap(retriesErr, "attachment failed"), nil) } } - err = objectWriter.WriteObjectValue("", aMessage) + err = objectWriter.WriteObjectValue("", msg) if err != nil { - return 0, support.SetNonRecoverableError(errors.Wrapf(err, "%s", *aMessage.GetId())) + return 0, support.SetNonRecoverableError(errors.Wrapf(err, "%s", *msg.GetId())) } - byteArray, err := objectWriter.GetSerializedContent() + bs, err := objectWriter.GetSerializedContent() if err != nil { - err = support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(err, "serializing mail content"), nil) + err = support.WrapAndAppend(*msg.GetId(), errors.Wrap(err, "serializing mail content"), nil) return 0, support.SetNonRecoverableError(err) } + addtl := msg.GetAdditionalData() + _, removed := addtl[graph.AddtlDataRemoved] + dataChannel <- &Stream{ - id: *aMessage.GetId(), - message: byteArray, - info: MessageInfo(aMessage, int64(len(byteArray))), - modTime: getModTime(aMessage), + id: *msg.GetId(), + message: bs, + info: MessageInfo(msg, int64(len(bs))), + modTime: getModTime(msg), + deleted: removed, } - return len(byteArray), nil + return len(bs), nil } // Stream represents a single item retrieved from exchange @@ -450,6 +486,9 @@ type Stream struct { // TODO(ashmrtn): Can probably eventually be sourced from info as there's a // request to provide modtime in ItemInfo structs. modTime time.Time + + // true if the item was marked by graph as deleted. 
+ deleted bool } func (od *Stream) UUID() string { @@ -460,9 +499,8 @@ func (od *Stream) ToReader() io.ReadCloser { return io.NopCloser(bytes.NewReader(od.message)) } -// TODO(ashmrtn): Fill in once delta tokens return deleted items. func (od Stream) Deleted() bool { - return false + return od.deleted } func (od *Stream) Info() details.ItemInfo { diff --git a/src/internal/connector/exchange/exchange_data_collection_test.go b/src/internal/connector/exchange/exchange_data_collection_test.go index aa572410d..7192729e8 100644 --- a/src/internal/connector/exchange/exchange_data_collection_test.go +++ b/src/internal/connector/exchange/exchange_data_collection_test.go @@ -8,6 +8,8 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/path" ) @@ -104,3 +106,53 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeCollection_AddJob() { suite.Equal(len(shopping), len(eoc.jobs)) } + +func (suite *ExchangeDataCollectionSuite) TestNewCollection_state() { + fooP, err := path.Builder{}. + Append("foo"). + ToDataLayerExchangePathForCategory("t", "u", path.EmailCategory, false) + require.NoError(suite.T(), err) + barP, err := path.Builder{}. + Append("bar"). 
+ ToDataLayerExchangePathForCategory("t", "u", path.EmailCategory, false) + require.NoError(suite.T(), err) + + table := []struct { + name string + prev path.Path + curr path.Path + expect data.CollectionState + }{ + { + name: "new", + curr: fooP, + expect: data.NewState, + }, + { + name: "not moved", + prev: fooP, + curr: fooP, + expect: data.NotMovedState, + }, + { + name: "moved", + prev: fooP, + curr: barP, + expect: data.MovedState, + }, + { + name: "deleted", + prev: fooP, + expect: data.DeletedState, + }, + } + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + c := NewCollection( + "u", + test.curr, test.prev, + 0, nil, nil, control.Options{}) + assert.Equal(t, test.expect, c.State()) + }) + } +} diff --git a/src/internal/connector/exchange/mail_folder_cache.go b/src/internal/connector/exchange/mail_folder_cache.go index c1e993f6c..aa39df1e8 100644 --- a/src/internal/connector/exchange/mail_folder_cache.go +++ b/src/internal/connector/exchange/mail_folder_cache.go @@ -79,6 +79,9 @@ func (mc *mailFolderCache) Populate( return err } + // Even though this uses the `Delta` query, we do not store or re-use + // the delta-link tokens like with other queries. The goal is always + // to retrieve the complete history of folders. query := mc. gs. Client(). 
diff --git a/src/internal/connector/exchange/service_iterators.go b/src/internal/connector/exchange/service_iterators.go index 085f8f20b..aee60df19 100644 --- a/src/internal/connector/exchange/service_iterators.go +++ b/src/internal/connector/exchange/service_iterators.go @@ -11,6 +11,7 @@ import ( msuser "github.com/microsoftgraph/msgraph-sdk-go/users" "github.com/pkg/errors" + "github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" @@ -56,9 +57,12 @@ func FilterContainersAndFillCollections( var ( errs error oi = CategoryToOptionIdentifier(qp.Category) - // folder ID -> delta url for folder. + // folder ID -> delta url or folder path lookups deltaURLs = map[string]string{} currPaths = map[string]string{} + // copy of previousPaths. Any folder found in the resolver gets + // deleted from this map, leaving only the deleted paths behind + deletedPaths = common.CopyMap(dps.paths) ) for _, c := range resolver.Items() { @@ -68,26 +72,23 @@ func FilterContainersAndFillCollections( cID := *c.GetId() - dirPath, ok := pathAndMatch(qp, c, scope) + delete(deletedPaths, cID) + + // Only create a collection if the path matches the scope. + currPath, ok := pathAndMatch(qp, c, scope) if !ok { continue } var prevPath path.Path - if ps, ok := dps.paths[cID]; ok { - // see below for the issue with building paths for root - // folders that have no displayName. 
- ps = strings.TrimSuffix(ps, rootFolderAlias) - - if pp, err := path.FromDataLayerPath(ps, false); err != nil { - logger.Ctx(ctx).Error("parsing previous path string") - } else { - prevPath = pp + if p, ok := dps.paths[cID]; ok { + var err error + if prevPath, err = pathFromPrevString(p); err != nil { + logger.Ctx(ctx).Error(err) } } - // Create only those that match service, err := createService(qp.Credentials) if err != nil { errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) @@ -100,11 +101,11 @@ func FilterContainersAndFillCollections( continue } - var deleted bool + var deletedInFlight bool jobs, delta, err := fetchFunc(ctx, service, qp.ResourceOwner, cID, dps.deltas[cID]) if err != nil && !errors.Is(err, errContainerDeleted) { - deleted = true + deletedInFlight = true errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) } @@ -114,13 +115,13 @@ func FilterContainersAndFillCollections( // Delay creating the new container so we can handle setting the current // path correctly if the folder was deleted. - if deleted { - dirPath = nil + if deletedInFlight { + currPath = nil } edc := NewCollection( qp.ResourceOwner, - dirPath, + currPath, prevPath, oi, service, @@ -129,7 +130,7 @@ func FilterContainersAndFillCollections( ) collections[cID] = &edc - if deleted { + if deletedInFlight { continue } @@ -137,7 +138,36 @@ func FilterContainersAndFillCollections( // add the current path for the container ID to be used in the next backup // as the "previous path", for reference in case of a rename or relocation. - currPaths[cID] = dirPath.Folder() + currPaths[cID] = currPath.String() + } + + // any path that wasn't present in the resolver was deleted by the user. + // relocations and renames will have removed the dir by id earlier. What's + // left in deletedPaths are only the previous paths that did not appear as + // children of the root. 
+ for fID, ps := range deletedPaths { + service, err := createService(qp.Credentials) + if err != nil { + errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) + continue + } + + prevPath, err := pathFromPrevString(ps) + if err != nil { + logger.Ctx(ctx).Error(err) + continue + } + + edc := NewCollection( + qp.ResourceOwner, + nil, // marks the collection as deleted + prevPath, + oi, + service, + statusUpdater, + ctrlOpts, + ) + collections[fID] = &edc } entries := []graph.MetadataCollectionEntry{ @@ -164,6 +194,15 @@ func FilterContainersAndFillCollections( return errs } +func pathFromPrevString(ps string) (path.Path, error) { + p, err := path.FromDataLayerPath(ps, false) + if err != nil { + return nil, errors.Wrap(err, "parsing previous path string") + } + + return p, nil +} + func IterativeCollectContactContainers( containers map[string]graph.Container, nameContains string, @@ -319,8 +358,8 @@ func FetchContactIDsFromDirectory( Contacts(). Delta() - // TODO(rkeepers): Awaiting full integration of incremental support, else this - // will cause unexpected behavior/errors. + // TODO(rkeepers): Awaiting full integration of incremental support, else this + // will cause unexpected behavior/errors. // if len(oldDelta) > 0 { // builder = msuser.NewUsersItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, gs.Adapter()) // } @@ -393,8 +432,8 @@ func FetchMessageIDsFromDirectory( Messages(). Delta() - // TODO(rkeepers): Awaiting full integration of incremental support, else this - // will cause unexpected behavior/errors. + // TODO(rkeepers): Awaiting full integration of incremental support, else this + // will cause unexpected behavior/errors. 
// if len(oldDelta) > 0 { // builder = msuser.NewUsersItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, gs.Adapter()) // } diff --git a/src/internal/connector/graph/consts.go b/src/internal/connector/graph/consts.go new file mode 100644 index 000000000..14dac934f --- /dev/null +++ b/src/internal/connector/graph/consts.go @@ -0,0 +1,27 @@ +package graph + +// --------------------------------------------------------------------------- +// item response AdditionalData +// --------------------------------------------------------------------------- + +const ( + // AddtlDataRemoved is the key value in the AdditionalData map + // for when an item was deleted. + //nolint:lll + // https://learn.microsoft.com/en-us/graph/delta-query-overview?tabs=http#resource-representation-in-the-delta-query-response + AddtlDataRemoved = "@removed" +) + +// --------------------------------------------------------------------------- +// Metadata Files +// --------------------------------------------------------------------------- + +const ( + // DeltaURLsFileName is the name of the file containing delta token(s) for a + // given endpoint. The endpoint granularity varies by service. + DeltaURLsFileName = "delta" + + // PreviousPathFileName is the name of the file containing previous path(s) for a + // given endpoint. + PreviousPathFileName = "previouspath" +) diff --git a/src/internal/connector/graph/service.go b/src/internal/connector/graph/service.go index 4218a9fca..f861666e1 100644 --- a/src/internal/connector/graph/service.go +++ b/src/internal/connector/graph/service.go @@ -9,15 +9,6 @@ import ( "github.com/alcionai/corso/src/pkg/path" ) -const ( - // DeltaURLsFileName is the name of the file containing delta token(s) for a - // given endpoint. The endpoint granularity varies by service. - DeltaURLsFileName = "delta" - // PreviousPathFileName is the name of the file containing previous path(s) for a - // given endpoint. 
- PreviousPathFileName = "previouspath" -) - // AllMetadataFileNames produces the standard set of filenames used to store graph // metadata such as delta tokens and folderID->path references. func AllMetadataFileNames() []string { diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index a381e3b25..3f9f51fe5 100644 --- a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -129,6 +129,9 @@ type Item struct { id string data io.ReadCloser info details.ItemInfo + + // true if the item was marked by graph as deleted. + deleted bool } func (od *Item) UUID() string { @@ -141,7 +144,7 @@ func (od *Item) ToReader() io.ReadCloser { // TODO(ashmrtn): Fill in once delta tokens return deleted items. func (od Item) Deleted() bool { - return false + return od.deleted } func (od *Item) Info() details.ItemInfo { diff --git a/src/internal/connector/sharepoint/collection.go b/src/internal/connector/sharepoint/collection.go index 9b04b9d7a..c17b9c732 100644 --- a/src/internal/connector/sharepoint/collection.go +++ b/src/internal/connector/sharepoint/collection.go @@ -91,6 +91,9 @@ type Item struct { data io.ReadCloser info *details.SharePointInfo modTime time.Time + + // true if the item was marked by graph as deleted. + deleted bool } func (sd *Item) UUID() string { @@ -101,9 +104,8 @@ func (sd *Item) ToReader() io.ReadCloser { return sd.data } -// TODO(ashmrtn): Fill in once delta tokens return deleted items. func (sd Item) Deleted() bool { - return false + return sd.deleted } func (sd *Item) Info() details.ItemInfo {