produce coll state and item deletion (#1815)
## Description Fulfills the data collection interfaces to provide item deleted flags and collection state enums. A note on the additional data consumption: I'm making an assumption that the @removed property will appear in that map. This is awaiting testing for verification, which I'll get into as a follow-up step. ## Does this PR need a docs update or release note? - [x] ⛔ No ## Type of change - [x] 🌻 Feature ## Issue(s) * #1727 ## Test Plan - [x] ❓ In a follow-up PR
This commit is contained in:
parent
0de2b40024
commit
c8ad2e03ce
@ -14,3 +14,13 @@ func UnionMaps[K comparable, V any](ms ...map[K]V) map[K]V {
|
|||||||
|
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func CopyMap[K comparable, V any](m map[K]V) map[K]V {
|
||||||
|
r := map[K]V{}
|
||||||
|
|
||||||
|
for k, v := range m {
|
||||||
|
r[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|||||||
@ -1,7 +1,5 @@
|
|||||||
package common
|
package common
|
||||||
|
|
||||||
import "strconv"
|
|
||||||
|
|
||||||
func ContainsString(super []string, sub string) bool {
|
func ContainsString(super []string, sub string) bool {
|
||||||
for _, s := range super {
|
for _, s := range super {
|
||||||
if s == sub {
|
if s == sub {
|
||||||
@ -22,14 +20,3 @@ func First(vs ...string) string {
|
|||||||
|
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
// parseBool returns the bool value represented by the string
|
|
||||||
// or false on error
|
|
||||||
func ParseBool(v string) bool {
|
|
||||||
s, err := strconv.ParseBool(v)
|
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|||||||
14
src/internal/common/strings.go
Normal file
14
src/internal/common/strings.go
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
package common
|
||||||
|
|
||||||
|
import "strconv"
|
||||||
|
|
||||||
|
// parseBool returns the bool value represented by the string
|
||||||
|
// or false on error
|
||||||
|
func ParseBool(v string) bool {
|
||||||
|
s, err := strconv.ParseBool(v)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return s
|
||||||
|
}
|
||||||
@ -86,8 +86,7 @@ func (cr *containerResolver) PathInCache(pathString string) (string, bool) {
|
|||||||
// addFolder adds a folder to the cache with the given ID. If the item is
|
// addFolder adds a folder to the cache with the given ID. If the item is
|
||||||
// already in the cache does nothing. The path for the item is not modified.
|
// already in the cache does nothing. The path for the item is not modified.
|
||||||
func (cr *containerResolver) addFolder(cf graph.CacheFolder) error {
|
func (cr *containerResolver) addFolder(cf graph.CacheFolder) error {
|
||||||
// Only require a non-nil non-empty parent if the path isn't already
|
// Only require a non-nil non-empty parent if the path isn't already populated.
|
||||||
// populated.
|
|
||||||
if cf.Path() != nil {
|
if cf.Path() != nil {
|
||||||
if err := checkIDAndName(cf.Container); err != nil {
|
if err := checkIDAndName(cf.Container); err != nil {
|
||||||
return errors.Wrap(err, "adding item to cache")
|
return errors.Wrap(err, "adding item to cache")
|
||||||
|
|||||||
@ -155,11 +155,11 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
|
|||||||
deltas, paths := emails.deltas, emails.paths
|
deltas, paths := emails.deltas, emails.paths
|
||||||
|
|
||||||
if len(test.expectDeltas) > 0 {
|
if len(test.expectDeltas) > 0 {
|
||||||
assert.NotEmpty(t, deltas, "deltas")
|
assert.Len(t, deltas, len(test.expectDeltas), "deltas len")
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(test.expectPaths) > 0 {
|
if len(test.expectPaths) > 0 {
|
||||||
assert.NotEmpty(t, paths, "paths")
|
assert.Len(t, paths, len(test.expectPaths), "paths len")
|
||||||
}
|
}
|
||||||
|
|
||||||
for k, v := range test.expectDeltas {
|
for k, v := range test.expectDeltas {
|
||||||
|
|||||||
@ -13,7 +13,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
absser "github.com/microsoft/kiota-abstractions-go/serialization"
|
absser "github.com/microsoft/kiota-abstractions-go/serialization"
|
||||||
kw "github.com/microsoft/kiota-serialization-json-go"
|
kioser "github.com/microsoft/kiota-serialization-json-go"
|
||||||
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
|
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
|
||||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
@ -67,12 +67,19 @@ type Collection struct {
|
|||||||
// It may be the same as fullPath, if the folder was not renamed or
|
// It may be the same as fullPath, if the folder was not renamed or
|
||||||
// moved. It will be empty on its first retrieval.
|
// moved. It will be empty on its first retrieval.
|
||||||
prevPath path.Path
|
prevPath path.Path
|
||||||
|
|
||||||
|
state data.CollectionState
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewExchangeDataCollection creates an ExchangeDataCollection with fullPath is annotated
|
// NewExchangeDataCollection creates an ExchangeDataCollection.
|
||||||
|
// State of the collection is set as an observation of the current
|
||||||
|
// and previous paths. If the curr path is nil, the state is assumed
|
||||||
|
// to be deleted. If the prev path is nil, it is assumed newly created.
|
||||||
|
// If both are populated, then state is either moved (if they differ),
|
||||||
|
// or notMoved (if they match).
|
||||||
func NewCollection(
|
func NewCollection(
|
||||||
user string,
|
user string,
|
||||||
fullPath, prevPath path.Path,
|
curr, prev path.Path,
|
||||||
collectionType optionIdentifier,
|
collectionType optionIdentifier,
|
||||||
service graph.Servicer,
|
service graph.Servicer,
|
||||||
statusUpdater support.StatusUpdater,
|
statusUpdater support.StatusUpdater,
|
||||||
@ -84,15 +91,32 @@ func NewCollection(
|
|||||||
jobs: make([]string, 0),
|
jobs: make([]string, 0),
|
||||||
service: service,
|
service: service,
|
||||||
statusUpdater: statusUpdater,
|
statusUpdater: statusUpdater,
|
||||||
fullPath: fullPath,
|
fullPath: curr,
|
||||||
prevPath: prevPath,
|
prevPath: prev,
|
||||||
collectionType: collectionType,
|
collectionType: collectionType,
|
||||||
ctrl: ctrlOpts,
|
ctrl: ctrlOpts,
|
||||||
|
state: stateOf(prev, curr),
|
||||||
}
|
}
|
||||||
|
|
||||||
return collection
|
return collection
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func stateOf(prev, curr path.Path) data.CollectionState {
|
||||||
|
if curr == nil || len(curr.String()) == 0 {
|
||||||
|
return data.DeletedState
|
||||||
|
}
|
||||||
|
|
||||||
|
if prev == nil || len(prev.String()) == 0 {
|
||||||
|
return data.NewState
|
||||||
|
}
|
||||||
|
|
||||||
|
if curr.Folder() != prev.Folder() {
|
||||||
|
return data.MovedState
|
||||||
|
}
|
||||||
|
|
||||||
|
return data.NotMovedState
|
||||||
|
}
|
||||||
|
|
||||||
// AddJob appends additional objectID to structure's jobs field
|
// AddJob appends additional objectID to structure's jobs field
|
||||||
func (col *Collection) AddJob(objID string) {
|
func (col *Collection) AddJob(objID string) {
|
||||||
col.jobs = append(col.jobs, objID)
|
col.jobs = append(col.jobs, objID)
|
||||||
@ -135,14 +159,12 @@ func (col Collection) PreviousPath() path.Path {
|
|||||||
// TODO(ashmrtn): Fill in once GraphConnector compares old and new folder
|
// TODO(ashmrtn): Fill in once GraphConnector compares old and new folder
|
||||||
// hierarchies.
|
// hierarchies.
|
||||||
func (col Collection) State() data.CollectionState {
|
func (col Collection) State() data.CollectionState {
|
||||||
return data.NewState
|
return col.state
|
||||||
}
|
}
|
||||||
|
|
||||||
// populateByOptionIdentifier is a utility function that uses col.collectionType to be able to serialize
|
// populateByOptionIdentifier is a utility function that uses col.collectionType to be able to serialize
|
||||||
// all the M365IDs defined in the jobs field. data channel is closed by this function
|
// all the M365IDs defined in the jobs field. data channel is closed by this function
|
||||||
func (col *Collection) populateByOptionIdentifier(
|
func (col *Collection) populateByOptionIdentifier(ctx context.Context) {
|
||||||
ctx context.Context,
|
|
||||||
) {
|
|
||||||
var (
|
var (
|
||||||
errs error
|
errs error
|
||||||
success int64
|
success int64
|
||||||
@ -210,7 +232,13 @@ func (col *Collection) populateByOptionIdentifier(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
byteCount, err := serializeFunc(ctx, col.service.Client(), kw.NewJsonSerializationWriter(), col.data, response, user)
|
byteCount, err := serializeFunc(
|
||||||
|
ctx,
|
||||||
|
col.service.Client(),
|
||||||
|
kioser.NewJsonSerializationWriter(),
|
||||||
|
col.data,
|
||||||
|
response,
|
||||||
|
user)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errUpdater(user, err)
|
errUpdater(user, err)
|
||||||
return
|
return
|
||||||
@ -264,7 +292,7 @@ func getModTime(mt modTimer) time.Time {
|
|||||||
type GraphSerializeFunc func(
|
type GraphSerializeFunc func(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
client *msgraphsdk.GraphServiceClient,
|
client *msgraphsdk.GraphServiceClient,
|
||||||
objectWriter *kw.JsonSerializationWriter,
|
objectWriter *kioser.JsonSerializationWriter,
|
||||||
dataChannel chan<- data.Stream,
|
dataChannel chan<- data.Stream,
|
||||||
parsable absser.Parsable,
|
parsable absser.Parsable,
|
||||||
user string,
|
user string,
|
||||||
@ -275,7 +303,7 @@ type GraphSerializeFunc func(
|
|||||||
func eventToDataCollection(
|
func eventToDataCollection(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
client *msgraphsdk.GraphServiceClient,
|
client *msgraphsdk.GraphServiceClient,
|
||||||
objectWriter *kw.JsonSerializationWriter,
|
objectWriter *kioser.JsonSerializationWriter,
|
||||||
dataChannel chan<- data.Stream,
|
dataChannel chan<- data.Stream,
|
||||||
parsable absser.Parsable,
|
parsable absser.Parsable,
|
||||||
user string,
|
user string,
|
||||||
@ -342,7 +370,7 @@ func eventToDataCollection(
|
|||||||
func contactToDataCollection(
|
func contactToDataCollection(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
client *msgraphsdk.GraphServiceClient,
|
client *msgraphsdk.GraphServiceClient,
|
||||||
objectWriter *kw.JsonSerializationWriter,
|
objectWriter *kioser.JsonSerializationWriter,
|
||||||
dataChannel chan<- data.Stream,
|
dataChannel chan<- data.Stream,
|
||||||
parsable absser.Parsable,
|
parsable absser.Parsable,
|
||||||
user string,
|
user string,
|
||||||
@ -359,28 +387,32 @@ func contactToDataCollection(
|
|||||||
return 0, support.SetNonRecoverableError(errors.Wrap(err, *contact.GetId()))
|
return 0, support.SetNonRecoverableError(errors.Wrap(err, *contact.GetId()))
|
||||||
}
|
}
|
||||||
|
|
||||||
byteArray, err := objectWriter.GetSerializedContent()
|
bs, err := objectWriter.GetSerializedContent()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, support.WrapAndAppend(*contact.GetId(), err, nil)
|
return 0, support.WrapAndAppend(*contact.GetId(), err, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(byteArray) > 0 {
|
addtl := contact.GetAdditionalData()
|
||||||
|
_, removed := addtl[graph.AddtlDataRemoved]
|
||||||
|
|
||||||
|
if len(bs) > 0 || removed {
|
||||||
dataChannel <- &Stream{
|
dataChannel <- &Stream{
|
||||||
id: *contact.GetId(),
|
id: *contact.GetId(),
|
||||||
message: byteArray,
|
message: bs,
|
||||||
info: ContactInfo(contact, int64(len(byteArray))),
|
info: ContactInfo(contact, int64(len(bs))),
|
||||||
modTime: getModTime(contact),
|
modTime: getModTime(contact),
|
||||||
|
deleted: removed,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return len(byteArray), nil
|
return len(bs), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// messageToDataCollection is the GraphSerializeFunc for models.Messageable
|
// messageToDataCollection is the GraphSerializeFunc for models.Messageable
|
||||||
func messageToDataCollection(
|
func messageToDataCollection(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
client *msgraphsdk.GraphServiceClient,
|
client *msgraphsdk.GraphServiceClient,
|
||||||
objectWriter *kw.JsonSerializationWriter,
|
objectWriter *kioser.JsonSerializationWriter,
|
||||||
dataChannel chan<- data.Stream,
|
dataChannel chan<- data.Stream,
|
||||||
parsable absser.Parsable,
|
parsable absser.Parsable,
|
||||||
user string,
|
user string,
|
||||||
@ -389,54 +421,58 @@ func messageToDataCollection(
|
|||||||
|
|
||||||
defer objectWriter.Close()
|
defer objectWriter.Close()
|
||||||
|
|
||||||
aMessage, ok := parsable.(models.Messageable)
|
msg, ok := parsable.(models.Messageable)
|
||||||
if !ok {
|
if !ok {
|
||||||
return 0, fmt.Errorf("expected Messageable, got %T", parsable)
|
return 0, fmt.Errorf("expected Messageable, got %T", parsable)
|
||||||
}
|
}
|
||||||
|
|
||||||
if *aMessage.GetHasAttachments() {
|
if *msg.GetHasAttachments() {
|
||||||
// getting all the attachments might take a couple attempts due to filesize
|
// getting all the attachments might take a couple attempts due to filesize
|
||||||
var retriesErr error
|
var retriesErr error
|
||||||
|
|
||||||
for count := 0; count < numberOfRetries; count++ {
|
for count := 0; count < numberOfRetries; count++ {
|
||||||
attached, err := client.
|
attached, err := client.
|
||||||
UsersById(user).
|
UsersById(user).
|
||||||
MessagesById(*aMessage.GetId()).
|
MessagesById(*msg.GetId()).
|
||||||
Attachments().
|
Attachments().
|
||||||
Get(ctx, nil)
|
Get(ctx, nil)
|
||||||
retriesErr = err
|
retriesErr = err
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
aMessage.SetAttachments(attached.GetValue())
|
msg.SetAttachments(attached.GetValue())
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if retriesErr != nil {
|
if retriesErr != nil {
|
||||||
logger.Ctx(ctx).Debug("exceeded maximum retries")
|
logger.Ctx(ctx).Debug("exceeded maximum retries")
|
||||||
return 0, support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(retriesErr, "attachment failed"), nil)
|
return 0, support.WrapAndAppend(*msg.GetId(), errors.Wrap(retriesErr, "attachment failed"), nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = objectWriter.WriteObjectValue("", aMessage)
|
err = objectWriter.WriteObjectValue("", msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, support.SetNonRecoverableError(errors.Wrapf(err, "%s", *aMessage.GetId()))
|
return 0, support.SetNonRecoverableError(errors.Wrapf(err, "%s", *msg.GetId()))
|
||||||
}
|
}
|
||||||
|
|
||||||
byteArray, err := objectWriter.GetSerializedContent()
|
bs, err := objectWriter.GetSerializedContent()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(err, "serializing mail content"), nil)
|
err = support.WrapAndAppend(*msg.GetId(), errors.Wrap(err, "serializing mail content"), nil)
|
||||||
return 0, support.SetNonRecoverableError(err)
|
return 0, support.SetNonRecoverableError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
addtl := msg.GetAdditionalData()
|
||||||
|
_, removed := addtl[graph.AddtlDataRemoved]
|
||||||
|
|
||||||
dataChannel <- &Stream{
|
dataChannel <- &Stream{
|
||||||
id: *aMessage.GetId(),
|
id: *msg.GetId(),
|
||||||
message: byteArray,
|
message: bs,
|
||||||
info: MessageInfo(aMessage, int64(len(byteArray))),
|
info: MessageInfo(msg, int64(len(bs))),
|
||||||
modTime: getModTime(aMessage),
|
modTime: getModTime(msg),
|
||||||
|
deleted: removed,
|
||||||
}
|
}
|
||||||
|
|
||||||
return len(byteArray), nil
|
return len(bs), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stream represents a single item retrieved from exchange
|
// Stream represents a single item retrieved from exchange
|
||||||
@ -450,6 +486,9 @@ type Stream struct {
|
|||||||
// TODO(ashmrtn): Can probably eventually be sourced from info as there's a
|
// TODO(ashmrtn): Can probably eventually be sourced from info as there's a
|
||||||
// request to provide modtime in ItemInfo structs.
|
// request to provide modtime in ItemInfo structs.
|
||||||
modTime time.Time
|
modTime time.Time
|
||||||
|
|
||||||
|
// true if the item was marked by graph as deleted.
|
||||||
|
deleted bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (od *Stream) UUID() string {
|
func (od *Stream) UUID() string {
|
||||||
@ -460,9 +499,8 @@ func (od *Stream) ToReader() io.ReadCloser {
|
|||||||
return io.NopCloser(bytes.NewReader(od.message))
|
return io.NopCloser(bytes.NewReader(od.message))
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(ashmrtn): Fill in once delta tokens return deleted items.
|
|
||||||
func (od Stream) Deleted() bool {
|
func (od Stream) Deleted() bool {
|
||||||
return false
|
return od.deleted
|
||||||
}
|
}
|
||||||
|
|
||||||
func (od *Stream) Info() details.ItemInfo {
|
func (od *Stream) Info() details.ItemInfo {
|
||||||
|
|||||||
@ -8,6 +8,8 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
|
"github.com/alcionai/corso/src/pkg/control"
|
||||||
"github.com/alcionai/corso/src/pkg/path"
|
"github.com/alcionai/corso/src/pkg/path"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -104,3 +106,53 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeCollection_AddJob() {
|
|||||||
|
|
||||||
suite.Equal(len(shopping), len(eoc.jobs))
|
suite.Equal(len(shopping), len(eoc.jobs))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (suite *ExchangeDataCollectionSuite) TestNewCollection_state() {
|
||||||
|
fooP, err := path.Builder{}.
|
||||||
|
Append("foo").
|
||||||
|
ToDataLayerExchangePathForCategory("t", "u", path.EmailCategory, false)
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
barP, err := path.Builder{}.
|
||||||
|
Append("bar").
|
||||||
|
ToDataLayerExchangePathForCategory("t", "u", path.EmailCategory, false)
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
|
||||||
|
table := []struct {
|
||||||
|
name string
|
||||||
|
prev path.Path
|
||||||
|
curr path.Path
|
||||||
|
expect data.CollectionState
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "new",
|
||||||
|
curr: fooP,
|
||||||
|
expect: data.NewState,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "not moved",
|
||||||
|
prev: fooP,
|
||||||
|
curr: fooP,
|
||||||
|
expect: data.NotMovedState,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "moved",
|
||||||
|
prev: fooP,
|
||||||
|
curr: barP,
|
||||||
|
expect: data.MovedState,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "deleted",
|
||||||
|
prev: fooP,
|
||||||
|
expect: data.DeletedState,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, test := range table {
|
||||||
|
suite.T().Run(test.name, func(t *testing.T) {
|
||||||
|
c := NewCollection(
|
||||||
|
"u",
|
||||||
|
test.curr, test.prev,
|
||||||
|
0, nil, nil, control.Options{})
|
||||||
|
assert.Equal(t, test.expect, c.State())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -79,6 +79,9 @@ func (mc *mailFolderCache) Populate(
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Even though this uses the `Delta` query, we do no store or re-use
|
||||||
|
// the delta-link tokens like with other queries. The goal is always
|
||||||
|
// to retrieve the complete history of folders.
|
||||||
query := mc.
|
query := mc.
|
||||||
gs.
|
gs.
|
||||||
Client().
|
Client().
|
||||||
|
|||||||
@ -11,6 +11,7 @@ import (
|
|||||||
msuser "github.com/microsoftgraph/msgraph-sdk-go/users"
|
msuser "github.com/microsoftgraph/msgraph-sdk-go/users"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/src/internal/common"
|
||||||
"github.com/alcionai/corso/src/internal/connector/graph"
|
"github.com/alcionai/corso/src/internal/connector/graph"
|
||||||
"github.com/alcionai/corso/src/internal/connector/support"
|
"github.com/alcionai/corso/src/internal/connector/support"
|
||||||
"github.com/alcionai/corso/src/internal/data"
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
@ -56,9 +57,12 @@ func FilterContainersAndFillCollections(
|
|||||||
var (
|
var (
|
||||||
errs error
|
errs error
|
||||||
oi = CategoryToOptionIdentifier(qp.Category)
|
oi = CategoryToOptionIdentifier(qp.Category)
|
||||||
// folder ID -> delta url for folder.
|
// folder ID -> delta url or folder path lookups
|
||||||
deltaURLs = map[string]string{}
|
deltaURLs = map[string]string{}
|
||||||
currPaths = map[string]string{}
|
currPaths = map[string]string{}
|
||||||
|
// copy of previousPaths. any folder found in the resolver get
|
||||||
|
// deleted from this map, leaving only the deleted maps behind
|
||||||
|
deletedPaths = common.CopyMap(dps.paths)
|
||||||
)
|
)
|
||||||
|
|
||||||
for _, c := range resolver.Items() {
|
for _, c := range resolver.Items() {
|
||||||
@ -68,26 +72,23 @@ func FilterContainersAndFillCollections(
|
|||||||
|
|
||||||
cID := *c.GetId()
|
cID := *c.GetId()
|
||||||
|
|
||||||
dirPath, ok := pathAndMatch(qp, c, scope)
|
delete(deletedPaths, cID)
|
||||||
|
|
||||||
|
// Only create a collection if the path matches the scope.
|
||||||
|
currPath, ok := pathAndMatch(qp, c, scope)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
var prevPath path.Path
|
var prevPath path.Path
|
||||||
|
|
||||||
if ps, ok := dps.paths[cID]; ok {
|
if p, ok := dps.paths[cID]; ok {
|
||||||
// see below for the issue with building paths for root
|
var err error
|
||||||
// folders that have no displayName.
|
if prevPath, err = pathFromPrevString(p); err != nil {
|
||||||
ps = strings.TrimSuffix(ps, rootFolderAlias)
|
logger.Ctx(ctx).Error(err)
|
||||||
|
|
||||||
if pp, err := path.FromDataLayerPath(ps, false); err != nil {
|
|
||||||
logger.Ctx(ctx).Error("parsing previous path string")
|
|
||||||
} else {
|
|
||||||
prevPath = pp
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create only those that match
|
|
||||||
service, err := createService(qp.Credentials)
|
service, err := createService(qp.Credentials)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
|
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
|
||||||
@ -100,11 +101,11 @@ func FilterContainersAndFillCollections(
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
var deleted bool
|
var deletedInFlight bool
|
||||||
|
|
||||||
jobs, delta, err := fetchFunc(ctx, service, qp.ResourceOwner, cID, dps.deltas[cID])
|
jobs, delta, err := fetchFunc(ctx, service, qp.ResourceOwner, cID, dps.deltas[cID])
|
||||||
if err != nil && !errors.Is(err, errContainerDeleted) {
|
if err != nil && !errors.Is(err, errContainerDeleted) {
|
||||||
deleted = true
|
deletedInFlight = true
|
||||||
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
|
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -114,13 +115,13 @@ func FilterContainersAndFillCollections(
|
|||||||
|
|
||||||
// Delay creating the new container so we can handle setting the current
|
// Delay creating the new container so we can handle setting the current
|
||||||
// path correctly if the folder was deleted.
|
// path correctly if the folder was deleted.
|
||||||
if deleted {
|
if deletedInFlight {
|
||||||
dirPath = nil
|
currPath = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
edc := NewCollection(
|
edc := NewCollection(
|
||||||
qp.ResourceOwner,
|
qp.ResourceOwner,
|
||||||
dirPath,
|
currPath,
|
||||||
prevPath,
|
prevPath,
|
||||||
oi,
|
oi,
|
||||||
service,
|
service,
|
||||||
@ -129,7 +130,7 @@ func FilterContainersAndFillCollections(
|
|||||||
)
|
)
|
||||||
collections[cID] = &edc
|
collections[cID] = &edc
|
||||||
|
|
||||||
if deleted {
|
if deletedInFlight {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -137,7 +138,36 @@ func FilterContainersAndFillCollections(
|
|||||||
|
|
||||||
// add the current path for the container ID to be used in the next backup
|
// add the current path for the container ID to be used in the next backup
|
||||||
// as the "previous path", for reference in case of a rename or relocation.
|
// as the "previous path", for reference in case of a rename or relocation.
|
||||||
currPaths[cID] = dirPath.Folder()
|
currPaths[cID] = currPath.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// any path that wasn't present in the resolver was deleted by the user.
|
||||||
|
// relocations and renames will have removed the dir by id earlier. What's
|
||||||
|
// left in deletedPaths are only the previous paths that did not appear as
|
||||||
|
// children of the root.
|
||||||
|
for fID, ps := range deletedPaths {
|
||||||
|
service, err := createService(qp.Credentials)
|
||||||
|
if err != nil {
|
||||||
|
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
prevPath, err := pathFromPrevString(ps)
|
||||||
|
if err != nil {
|
||||||
|
logger.Ctx(ctx).Error(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
edc := NewCollection(
|
||||||
|
qp.ResourceOwner,
|
||||||
|
nil, // marks the collection as deleted
|
||||||
|
prevPath,
|
||||||
|
oi,
|
||||||
|
service,
|
||||||
|
statusUpdater,
|
||||||
|
ctrlOpts,
|
||||||
|
)
|
||||||
|
collections[fID] = &edc
|
||||||
}
|
}
|
||||||
|
|
||||||
entries := []graph.MetadataCollectionEntry{
|
entries := []graph.MetadataCollectionEntry{
|
||||||
@ -164,6 +194,15 @@ func FilterContainersAndFillCollections(
|
|||||||
return errs
|
return errs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func pathFromPrevString(ps string) (path.Path, error) {
|
||||||
|
p, err := path.FromDataLayerPath(ps, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "parsing previous path string")
|
||||||
|
}
|
||||||
|
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
func IterativeCollectContactContainers(
|
func IterativeCollectContactContainers(
|
||||||
containers map[string]graph.Container,
|
containers map[string]graph.Container,
|
||||||
nameContains string,
|
nameContains string,
|
||||||
|
|||||||
27
src/internal/connector/graph/consts.go
Normal file
27
src/internal/connector/graph/consts.go
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
package graph
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// item response AdditionalData
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
const (
|
||||||
|
// AddtlDataRemoved is the key value in the AdditionalData map
|
||||||
|
// for when an item was deleted.
|
||||||
|
//nolint:lll
|
||||||
|
// https://learn.microsoft.com/en-us/graph/delta-query-overview?tabs=http#resource-representation-in-the-delta-query-response
|
||||||
|
AddtlDataRemoved = "@removed"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Metadata Files
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
const (
|
||||||
|
// DeltaURLsFileName is the name of the file containing delta token(s) for a
|
||||||
|
// given endpoint. The endpoint granularity varies by service.
|
||||||
|
DeltaURLsFileName = "delta"
|
||||||
|
|
||||||
|
// PreviousPathFileName is the name of the file containing previous path(s) for a
|
||||||
|
// given endpoint.
|
||||||
|
PreviousPathFileName = "previouspath"
|
||||||
|
)
|
||||||
@ -9,15 +9,6 @@ import (
|
|||||||
"github.com/alcionai/corso/src/pkg/path"
|
"github.com/alcionai/corso/src/pkg/path"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
// DeltaURLsFileName is the name of the file containing delta token(s) for a
|
|
||||||
// given endpoint. The endpoint granularity varies by service.
|
|
||||||
DeltaURLsFileName = "delta"
|
|
||||||
// PreviousPathFileName is the name of the file containing previous path(s) for a
|
|
||||||
// given endpoint.
|
|
||||||
PreviousPathFileName = "previouspath"
|
|
||||||
)
|
|
||||||
|
|
||||||
// AllMetadataFileNames produces the standard set of filenames used to store graph
|
// AllMetadataFileNames produces the standard set of filenames used to store graph
|
||||||
// metadata such as delta tokens and folderID->path references.
|
// metadata such as delta tokens and folderID->path references.
|
||||||
func AllMetadataFileNames() []string {
|
func AllMetadataFileNames() []string {
|
||||||
|
|||||||
@ -129,6 +129,9 @@ type Item struct {
|
|||||||
id string
|
id string
|
||||||
data io.ReadCloser
|
data io.ReadCloser
|
||||||
info details.ItemInfo
|
info details.ItemInfo
|
||||||
|
|
||||||
|
// true if the item was marked by graph as deleted.
|
||||||
|
deleted bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (od *Item) UUID() string {
|
func (od *Item) UUID() string {
|
||||||
@ -141,7 +144,7 @@ func (od *Item) ToReader() io.ReadCloser {
|
|||||||
|
|
||||||
// TODO(ashmrtn): Fill in once delta tokens return deleted items.
|
// TODO(ashmrtn): Fill in once delta tokens return deleted items.
|
||||||
func (od Item) Deleted() bool {
|
func (od Item) Deleted() bool {
|
||||||
return false
|
return od.deleted
|
||||||
}
|
}
|
||||||
|
|
||||||
func (od *Item) Info() details.ItemInfo {
|
func (od *Item) Info() details.ItemInfo {
|
||||||
|
|||||||
@ -91,6 +91,9 @@ type Item struct {
|
|||||||
data io.ReadCloser
|
data io.ReadCloser
|
||||||
info *details.SharePointInfo
|
info *details.SharePointInfo
|
||||||
modTime time.Time
|
modTime time.Time
|
||||||
|
|
||||||
|
// true if the item was marked by graph as deleted.
|
||||||
|
deleted bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sd *Item) UUID() string {
|
func (sd *Item) UUID() string {
|
||||||
@ -101,9 +104,8 @@ func (sd *Item) ToReader() io.ReadCloser {
|
|||||||
return sd.data
|
return sd.data
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(ashmrtn): Fill in once delta tokens return deleted items.
|
|
||||||
func (sd Item) Deleted() bool {
|
func (sd Item) Deleted() bool {
|
||||||
return false
|
return sd.deleted
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sd *Item) Info() details.ItemInfo {
|
func (sd *Item) Info() details.ItemInfo {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user