path pkg rename resouceOwner to protectedResource (#4193)

updaing the path package to the current naming convention. No logic changes.

---

#### Does this PR need a docs update or release note?

- [x]  No

#### Type of change

- [x] 🧹 Tech Debt/Cleanup

#### Test Plan

- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
Keepers 2023-09-12 16:05:23 -06:00 committed by GitHub
parent 1fe37e4ba9
commit a2e80a178a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 724 additions and 1285 deletions

View File

@ -806,12 +806,12 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() {
reasons := []identity.Reasoner{
NewReason(
testTenant,
suite.storePath1.ResourceOwner(),
suite.storePath1.ProtectedResource(),
suite.storePath1.Service(),
suite.storePath1.Category()),
NewReason(
testTenant,
suite.storePath2.ResourceOwner(),
suite.storePath2.ProtectedResource(),
suite.storePath2.Service(),
suite.storePath2.Category()),
}
@ -1072,7 +1072,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_NoDetailsForMeta() {
reasons := []identity.Reasoner{
NewReason(
testTenant,
storePath.ResourceOwner(),
storePath.ProtectedResource(),
storePath.Service(),
storePath.Category()),
}

View File

@ -249,7 +249,7 @@ func (c *Collections) Get(
// Enumerate drives for the specified resourceOwner
pager := c.handler.NewDrivePager(c.resourceOwner, nil)
drives, err := api.GetAllDrives(ctx, pager, true, maxDrivesRetries)
drives, err := api.GetAllDrives(ctx, pager)
if err != nil {
return nil, false, err
}

View File

@ -12,8 +12,6 @@ import (
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
const maxDrivesRetries = 3
// DeltaUpdate holds the results of a current delta token. It normally
// gets produced when aggregating the addition and removal of items in
// a delta-queryable folder.
@ -72,7 +70,7 @@ func collectItems(
if !invalidPrevDelta {
maps.Copy(newPaths, oldPaths)
pager.SetNext(prevDelta)
pager.SetNextLink(prevDelta)
}
for {
@ -94,10 +92,7 @@ func collectItems(
return DeltaUpdate{}, nil, nil, graph.Wrap(ctx, err, "getting page")
}
vals, err := pager.ValuesIn(page)
if err != nil {
return DeltaUpdate{}, nil, nil, graph.Wrap(ctx, err, "extracting items from response")
}
vals := page.GetValue()
err = collector(
ctx,
@ -126,7 +121,7 @@ func collectItems(
}
logger.Ctx(ctx).Debugw("Found nextLink", "link", nextLink)
pager.SetNext(nextLink)
pager.SetNextLink(nextLink)
}
return DeltaUpdate{URL: newDeltaURL, Reset: invalidPrevDelta}, newPaths, excluded, nil

View File

@ -1,7 +1,6 @@
package drive
import (
"context"
"testing"
"github.com/alcionai/clues"
@ -72,18 +71,9 @@ func (suite *ItemCollectorUnitSuite) TestDrives() {
resultDrives = append(resultDrives, d)
}
tooManyRetries := make([]mock.PagerResult[models.Driveable], 0, maxDrivesRetries+1)
for i := 0; i < maxDrivesRetries+1; i++ {
tooManyRetries = append(tooManyRetries, mock.PagerResult[models.Driveable]{
Err: context.DeadlineExceeded,
})
}
table := []struct {
name string
pagerResults []mock.PagerResult[models.Driveable]
retry bool
expectedErr assert.ErrorAssertionFunc
expectedResults []models.Driveable
}{
@ -96,7 +86,6 @@ func (suite *ItemCollectorUnitSuite) TestDrives() {
Err: nil,
},
},
retry: false,
expectedErr: assert.NoError,
expectedResults: resultDrives,
},
@ -109,7 +98,6 @@ func (suite *ItemCollectorUnitSuite) TestDrives() {
Err: nil,
},
},
retry: false,
expectedErr: assert.NoError,
expectedResults: resultDrives,
},
@ -127,7 +115,6 @@ func (suite *ItemCollectorUnitSuite) TestDrives() {
Err: nil,
},
},
retry: false,
expectedErr: assert.NoError,
expectedResults: resultDrives,
},
@ -145,7 +132,6 @@ func (suite *ItemCollectorUnitSuite) TestDrives() {
Err: nil,
},
},
retry: false,
expectedErr: assert.NoError,
expectedResults: resultDrives,
},
@ -163,7 +149,6 @@ func (suite *ItemCollectorUnitSuite) TestDrives() {
Err: assert.AnError,
},
},
retry: true,
expectedErr: assert.Error,
expectedResults: nil,
},
@ -176,7 +161,6 @@ func (suite *ItemCollectorUnitSuite) TestDrives() {
Err: graph.Stack(ctx, mySiteURLNotFound),
},
},
retry: true,
expectedErr: assert.NoError,
expectedResults: nil,
},
@ -189,71 +173,9 @@ func (suite *ItemCollectorUnitSuite) TestDrives() {
Err: graph.Stack(ctx, mySiteNotFound),
},
},
retry: true,
expectedErr: assert.NoError,
expectedResults: nil,
},
{
name: "SplitResultsContextTimeoutWithRetries",
pagerResults: []mock.PagerResult[models.Driveable]{
{
Values: resultDrives[:numDriveResults/2],
NextLink: &link,
Err: nil,
},
{
Values: nil,
NextLink: nil,
Err: context.DeadlineExceeded,
},
{
Values: resultDrives[numDriveResults/2:],
NextLink: &emptyLink,
Err: nil,
},
},
retry: true,
expectedErr: assert.NoError,
expectedResults: resultDrives,
},
{
name: "SplitResultsContextTimeoutNoRetries",
pagerResults: []mock.PagerResult[models.Driveable]{
{
Values: resultDrives[:numDriveResults/2],
NextLink: &link,
Err: nil,
},
{
Values: nil,
NextLink: nil,
Err: context.DeadlineExceeded,
},
{
Values: resultDrives[numDriveResults/2:],
NextLink: &emptyLink,
Err: nil,
},
},
retry: false,
expectedErr: assert.Error,
expectedResults: nil,
},
{
name: "TooManyRetries",
pagerResults: append(
[]mock.PagerResult[models.Driveable]{
{
Values: resultDrives[:numDriveResults/2],
NextLink: &link,
Err: nil,
},
},
tooManyRetries...),
retry: true,
expectedErr: assert.Error,
expectedResults: nil,
},
}
for _, test := range table {
suite.Run(test.name, func() {
@ -266,7 +188,7 @@ func (suite *ItemCollectorUnitSuite) TestDrives() {
ToReturn: test.pagerResults,
}
drives, err := api.GetAllDrives(ctx, pager, test.retry, maxDrivesRetries)
drives, err := api.GetAllDrives(ctx, pager)
test.expectedErr(t, err, clues.ToCore(err))
assert.ElementsMatch(t, test.expectedResults, drives)

View File

@ -51,7 +51,7 @@ func (suite *ItemIntegrationSuite) SetupSuite() {
pager := suite.service.ac.Drives().NewUserDrivePager(suite.user, nil)
odDrives, err := api.GetAllDrives(ctx, pager, true, maxDrivesRetries)
odDrives, err := api.GetAllDrives(ctx, pager)
require.NoError(t, err, clues.ToCore(err))
// Test Requirement 1: Need a drive
require.Greaterf(t, len(odDrives), 0, "user %s does not have a drive", suite.user)

View File

@ -68,9 +68,7 @@ func (rc *restoreCaches) Populate(
) error {
drives, err := api.GetAllDrives(
ctx,
gdparf.NewDrivePager(protectedResourceID, nil),
true,
maxDrivesRetries)
gdparf.NewDrivePager(protectedResourceID, nil))
if err != nil {
return clues.Wrap(err, "getting drives")
}

View File

@ -704,9 +704,9 @@ func (suite *BackupIntgSuite) TestContactSerializationRegression() {
}{
{
name: "Default Contact Folder",
scope: selectors.NewExchangeBackup(users).ContactFolders(
[]string{api.DefaultContacts},
selectors.PrefixMatch())[0],
scope: selectors.
NewExchangeBackup(users).
ContactFolders([]string{api.DefaultContacts}, selectors.PrefixMatch())[0],
},
}
@ -742,8 +742,10 @@ func (suite *BackupIntgSuite) TestContactSerializationRegression() {
require.GreaterOrEqual(t, 2, len(edcs), "expected 1 <= num collections <= 2")
for _, edc := range edcs {
isMetadata := edc.FullPath().Service() == path.ExchangeMetadataService
count := 0
var (
isMetadata = edc.FullPath().Service() == path.ExchangeMetadataService
count = 0
)
for stream := range edc.Items(ctx, fault.New(true)) {
buf := &bytes.Buffer{}

View File

@ -227,7 +227,7 @@ func (sc *Collection) retrieveLists(
lists, err := loadSiteLists(
ctx,
sc.client.Stable,
sc.fullPath.ResourceOwner(),
sc.fullPath.ProtectedResource(),
sc.jobs,
errs)
if err != nil {
@ -290,14 +290,14 @@ func (sc *Collection) retrievePages(
return metrics, clues.New("beta service required").WithClues(ctx)
}
parent, err := as.GetByID(ctx, sc.fullPath.ResourceOwner())
parent, err := as.GetByID(ctx, sc.fullPath.ProtectedResource())
if err != nil {
return metrics, err
}
root := ptr.Val(parent.GetWebUrl())
pages, err := betaAPI.GetSitePages(ctx, betaService, sc.fullPath.ResourceOwner(), sc.jobs, errs)
pages, err := betaAPI.GetSitePages(ctx, betaService, sc.fullPath.ProtectedResource(), sc.jobs, errs)
if err != nil {
return metrics, err
}

View File

@ -69,7 +69,7 @@ func ConsumeRestoreCollections(
ictx = clues.Add(ctx,
"category", category,
"restore_location", clues.Hide(rcc.RestoreConfig.Location),
"resource_owner", clues.Hide(dc.FullPath().ResourceOwner()),
"resource_owner", clues.Hide(dc.FullPath().ProtectedResource()),
"full_path", dc.FullPath())
)
@ -219,7 +219,7 @@ func RestoreListCollection(
var (
metrics = support.CollectionMetrics{}
directory = dc.FullPath()
siteID = directory.ResourceOwner()
siteID = directory.ProtectedResource()
items = dc.Items(ctx, errs)
el = errs.Local()
)
@ -291,7 +291,7 @@ func RestorePageCollection(
var (
metrics = support.CollectionMetrics{}
directory = dc.FullPath()
siteID = directory.ResourceOwner()
siteID = directory.ProtectedResource()
)
trace.Log(ctx, "m365:sharepoint:restorePageCollection", directory.String())

View File

@ -293,7 +293,7 @@ func Wrap(ctx context.Context, e error, msg string) *clues.Err {
var oDataError odataerrors.ODataErrorable
if !errors.As(e, &oDataError) {
return clues.Wrap(e, msg).WithClues(ctx)
return clues.Wrap(e, msg).WithClues(ctx).WithTrace(1)
}
mainMsg, data, innerMsg := errData(oDataError)
@ -316,7 +316,7 @@ func Stack(ctx context.Context, e error) *clues.Err {
var oDataError *odataerrors.ODataError
if !errors.As(e, &oDataError) {
return clues.Stack(e).WithClues(ctx)
return clues.Stack(e).WithClues(ctx).WithTrace(1)
}
mainMsg, data, innerMsg := errData(oDataError)

View File

@ -356,16 +356,14 @@ func (aw *adapterWrap) Send(
ictx := clues.Add(ctx, "request_retry_iter", i)
sp, err = aw.RequestAdapter.Send(ctx, requestInfo, constructor, errorMappings)
if err != nil &&
!(IsErrConnectionReset(err) ||
connectionEnded.Compare(err.Error())) {
return nil, Stack(ictx, err)
}
if err == nil {
break
}
if !IsErrConnectionReset(err) && !connectionEnded.Compare(err.Error()) {
return nil, clues.Stack(err).WithTrace(1).WithClues(ictx)
}
logger.Ctx(ictx).Debug("http connection error")
events.Inc(events.APICall, "connectionerror")

View File

@ -939,7 +939,7 @@ func checkHasCollections(
p, err := loc.ToDataLayerPath(
fp.Tenant(),
fp.ResourceOwner(),
fp.ProtectedResource(),
fp.Service(),
fp.Category(),
false)

View File

@ -57,7 +57,7 @@ func ConsumeRestoreCollections(
ictx = clues.Add(ctx,
"category", category,
"restore_location", clues.Hide(rcc.RestoreConfig.Location),
"protected_resource", clues.Hide(dc.FullPath().ResourceOwner()),
"protected_resource", clues.Hide(dc.FullPath().ProtectedResource()),
"full_path", dc.FullPath())
)

View File

@ -61,7 +61,7 @@ func ConsumeRestoreCollections(
ictx = clues.Add(ctx,
"category", category,
"restore_location", clues.Hide(rcc.RestoreConfig.Location),
"resource_owner", clues.Hide(dc.FullPath().ResourceOwner()),
"resource_owner", clues.Hide(dc.FullPath().ProtectedResource()),
"full_path", dc.FullPath())
)

View File

@ -183,7 +183,7 @@ func backupOutputPathFromRestore(
return path.Build(
inputPath.Tenant(),
inputPath.ResourceOwner(),
inputPath.ProtectedResource(),
inputPath.Service(),
inputPath.Category(),
false,

View File

@ -547,7 +547,7 @@ func consumeBackupCollections(
func matchesReason(reasons []identity.Reasoner, p path.Path) bool {
for _, reason := range reasons {
if p.ResourceOwner() == reason.ProtectedResource() &&
if p.ProtectedResource() == reason.ProtectedResource() &&
p.Service() == reason.Service() &&
p.Category() == reason.Category() {
return true

View File

@ -592,12 +592,12 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_MergeBackupDetails_AddsItems
pathReason1 = kopia.NewReason(
"",
itemPath1.ResourceOwner(),
itemPath1.ProtectedResource(),
itemPath1.Service(),
itemPath1.Category())
pathReason3 = kopia.NewReason(
"",
itemPath3.ResourceOwner(),
itemPath3.ProtectedResource(),
itemPath3.Service(),
itemPath3.Category())
@ -618,7 +618,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_MergeBackupDetails_AddsItems
exchangeLocationPath1 = path.Builder{}.Append("work-display-name")
exchangePathReason1 = kopia.NewReason(
"",
exchangeItemPath1.ResourceOwner(),
exchangeItemPath1.ProtectedResource(),
exchangeItemPath1.Service(),
exchangeItemPath1.Category())
)
@ -728,7 +728,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_MergeBackupDetails_AddsItems
[]string{
itemPath1.Tenant(),
itemPath1.Service().String(),
itemPath1.ResourceOwner(),
itemPath1.ProtectedResource(),
path.UnknownCategory.String(),
},
itemPath1.Folders()...)...),
@ -755,7 +755,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_MergeBackupDetails_AddsItems
[]string{
itemPath1.Tenant(),
path.OneDriveService.String(),
itemPath1.ResourceOwner(),
itemPath1.ProtectedResource(),
path.FilesCategory.String(),
"personal",
"item1",
@ -1269,7 +1269,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_MergeBackupDetails_AddsFolde
pathReason1 = kopia.NewReason(
"",
itemPath1.ResourceOwner(),
itemPath1.ProtectedResource(),
itemPath1.Service(),
itemPath1.Category())

View File

@ -44,7 +44,7 @@ func basicLocationPath(repoRef path.Path, locRef *path.Builder) (path.Path, erro
if len(locRef.Elements()) == 0 {
res, err := path.BuildPrefix(
repoRef.Tenant(),
repoRef.ResourceOwner(),
repoRef.ProtectedResource(),
repoRef.Service(),
repoRef.Category())
if err != nil {
@ -56,7 +56,7 @@ func basicLocationPath(repoRef path.Path, locRef *path.Builder) (path.Path, erro
return locRef.ToDataLayerPath(
repoRef.Tenant(),
repoRef.ResourceOwner(),
repoRef.ProtectedResource(),
repoRef.Service(),
repoRef.Category(),
false)

View File

@ -40,7 +40,7 @@ func (suite *RestorePathTransformerUnitSuite) TestGetPaths() {
Append(
repoRef.Tenant(),
repoRef.Service().String(),
repoRef.ResourceOwner(),
repoRef.ProtectedResource(),
repoRef.Category().String()).
Append(unescapedFolders...).
String()

View File

@ -63,28 +63,28 @@ func (suite *ExchangeBackupIntgSuite) TestBackup_Run_exchange() {
category path.CategoryType
metadataFiles []string
}{
{
name: "Mail",
selector: func() *selectors.ExchangeBackup {
sel := selectors.NewExchangeBackup([]string{suite.its.user.ID})
sel.Include(sel.MailFolders([]string{api.MailInbox}, selectors.PrefixMatch()))
sel.DiscreteOwner = suite.its.user.ID
// {
// name: "Mail",
// selector: func() *selectors.ExchangeBackup {
// sel := selectors.NewExchangeBackup([]string{suite.its.user.ID})
// sel.Include(sel.MailFolders([]string{api.MailInbox}, selectors.PrefixMatch()))
// sel.DiscreteOwner = suite.its.user.ID
return sel
},
category: path.EmailCategory,
metadataFiles: exchange.MetadataFileNames(path.EmailCategory),
},
{
name: "Contacts",
selector: func() *selectors.ExchangeBackup {
sel := selectors.NewExchangeBackup([]string{suite.its.user.ID})
sel.Include(sel.ContactFolders([]string{api.DefaultContacts}, selectors.PrefixMatch()))
return sel
},
category: path.ContactsCategory,
metadataFiles: exchange.MetadataFileNames(path.ContactsCategory),
},
// return sel
// },
// category: path.EmailCategory,
// metadataFiles: exchange.MetadataFileNames(path.EmailCategory),
// },
// {
// name: "Contacts",
// selector: func() *selectors.ExchangeBackup {
// sel := selectors.NewExchangeBackup([]string{suite.its.user.ID})
// sel.Include(sel.ContactFolders([]string{api.DefaultContacts}, selectors.PrefixMatch()))
// return sel
// },
// category: path.ContactsCategory,
// metadataFiles: exchange.MetadataFileNames(path.ContactsCategory),
// },
{
name: "Calendar Events",
selector: func() *selectors.ExchangeBackup {

View File

@ -381,7 +381,7 @@ func (suite *SharePointRestoreNightlyIntgSuite) TestRestore_Run_sharepointDelete
Drives().
NewSiteDrivePager(suite.its.site.ID, []string{"id", "name"})
drives, err := api.GetAllDrives(ctx, pgr, false, -1)
drives, err := api.GetAllDrives(ctx, pgr)
require.NoError(t, err, clues.ToCore(err))
var created models.Driveable

View File

@ -104,7 +104,7 @@ func (p repoRefAndLocRef) locationAsRepoRef() path.Path {
res, err := tmp.ToDataLayerPath(
p.RR.Tenant(),
p.RR.ResourceOwner(),
p.RR.ProtectedResource(),
p.RR.Service(),
p.RR.Category(),
len(p.ItemLocation()) > 0)
@ -133,7 +133,7 @@ func mustPathRep(ref string, isItem bool) repoRefAndLocRef {
rr, err := rrPB.ToDataLayerPath(
tmp.Tenant(),
tmp.ResourceOwner(),
tmp.ProtectedResource(),
tmp.Service(),
tmp.Category(),
isItem)

View File

@ -21,7 +21,7 @@ var (
// Resource-specific paths allow access to more information like segments in the
// path. Builders that are turned into resource paths later on do not need to
// manually add prefixes for items that normally appear in the data layer (ex.
// tenant ID, service, user ID, etc).
// tenant ID, service, resource ID, etc).
type Builder struct {
// Unescaped version of elements.
elements Elements
@ -258,7 +258,7 @@ func (pb Builder) ToStreamStorePath(
}
func (pb Builder) ToServiceCategoryMetadataPath(
tenant, protectedResource string,
tenant, resource string,
service ServiceType,
category CategoryType,
isItem bool,
@ -267,7 +267,7 @@ func (pb Builder) ToServiceCategoryMetadataPath(
return nil, err
}
if err := verifyInputValues(tenant, protectedResource); err != nil {
if err := verifyInputValues(tenant, resource); err != nil {
return nil, err
}
@ -292,7 +292,7 @@ func (pb Builder) ToServiceCategoryMetadataPath(
Builder: *pb.withPrefix(
tenant,
metadataService.String(),
protectedResource,
resource,
category.String()),
service: metadataService,
category: category,
@ -303,7 +303,7 @@ func (pb Builder) ToServiceCategoryMetadataPath(
}
func (pb Builder) ToDataLayerPath(
tenant, user string,
tenant, resource string,
service ServiceType,
category CategoryType,
isItem bool,
@ -313,14 +313,14 @@ func (pb Builder) ToDataLayerPath(
return nil, err
}
if err := pb.verifyPrefix(tenant, user); err != nil {
if err := pb.verifyPrefix(tenant, resource); err != nil {
return nil, err
}
prefixItems := append([]string{
tenant,
service.String(),
user,
resource,
category.String(),
}, elems...)
@ -333,18 +333,18 @@ func (pb Builder) ToDataLayerPath(
}
func (pb Builder) ToDataLayerExchangePathForCategory(
tenant, user string,
tenant, resource string,
category CategoryType,
isItem bool,
) (Path, error) {
return pb.ToDataLayerPath(tenant, user, ExchangeService, category, isItem)
return pb.ToDataLayerPath(tenant, resource, ExchangeService, category, isItem)
}
func (pb Builder) ToDataLayerOneDrivePath(
tenant, user string,
tenant, resource string,
isItem bool,
) (Path, error) {
return pb.ToDataLayerPath(tenant, user, OneDriveService, FilesCategory, isItem)
return pb.ToDataLayerPath(tenant, resource, OneDriveService, FilesCategory, isItem)
}
func (pb Builder) ToDataLayerSharePointPath(

View File

@ -59,7 +59,7 @@ func (suite *OneDrivePathSuite) Test_ToOneDrivePath() {
suite.Run(tt.name, func() {
t := suite.T()
p, err := path.Build("tenant", "user", path.OneDriveService, path.FilesCategory, false, tt.pathElements...)
p, err := path.Build("tenant", "resource", path.OneDriveService, path.FilesCategory, false, tt.pathElements...)
require.NoError(suite.T(), err, clues.ToCore(err))
got, err := path.ToDrivePath(p)

View File

@ -82,7 +82,7 @@ type Path interface {
Service() ServiceType
Category() CategoryType
Tenant() string
ResourceOwner() string
ProtectedResource() string
Folder(escaped bool) string
Folders() Elements
Item() string

View File

@ -291,7 +291,7 @@ func (suite *PathUnitSuite) TestFromDataLayerPathErrors() {
func (suite *PathUnitSuite) TestFromDataLayerPath() {
const (
testTenant = "tenant"
testUser = "user"
testResource = "resource"
testElement1 = "folder/"
testElementTrimmed = "folder"
testElement2 = "folder2"
@ -331,7 +331,7 @@ func (suite *PathUnitSuite) TestFromDataLayerPath() {
unescapedPath: fmt.Sprintf(
"%s/%%s/%s/%%s/%s/%s/%s",
testTenant,
testUser,
testResource,
testElement1,
testElement2,
testElement3),
@ -360,7 +360,7 @@ func (suite *PathUnitSuite) TestFromDataLayerPath() {
unescapedPath: fmt.Sprintf(
"/%s//%%s//%s//%%s//%s///%s//%s//",
testTenant,
testUser,
testResource,
testElementTrimmed,
testElement2,
testElement3),
@ -401,7 +401,7 @@ func (suite *PathUnitSuite) TestFromDataLayerPath() {
assert.Equal(t, service, p.Service(), "service")
assert.Equal(t, cat, p.Category(), "category")
assert.Equal(t, testTenant, p.Tenant(), "tenant")
assert.Equal(t, testUser, p.ResourceOwner(), "resource owner")
assert.Equal(t, testResource, p.ProtectedResource(), "resource owner")
fld := p.Folder(false)
escfld := p.Folder(true)

View File

@ -38,9 +38,9 @@ func (rp dataLayerResourcePath) Category() CategoryType {
return rp.category
}
// ResourceOwner returns the user ID or group ID embedded in the
// ResourceOwner returns the resource ID or group ID embedded in the
// dataLayerResourcePath.
func (rp dataLayerResourcePath) ResourceOwner() string {
func (rp dataLayerResourcePath) ProtectedResource() string {
return rp.Builder.elements[2]
}

View File

@ -16,7 +16,7 @@ import (
const (
testTenant = "aTenant"
testUser = "aUser"
testResource = "aResources"
)
var (
@ -27,25 +27,25 @@ var (
missingInfo = []struct {
name string
tenant string
user string
resource string
rest []string
}{
{
name: "NoTenant",
tenant: "",
user: testUser,
resource: testResource,
rest: rest,
},
{
name: "NoResourceOwner",
tenant: testTenant,
user: "",
resource: "",
rest: rest,
},
{
name: "NoFolderOrItem",
tenant: testTenant,
user: testUser,
resource: testResource,
rest: nil,
},
}
@ -74,55 +74,55 @@ var (
serviceCategories = []struct {
service path.ServiceType
category path.CategoryType
pathFunc func(pb *path.Builder, tenant, user string, isItem bool) (path.Path, error)
pathFunc func(pb *path.Builder, tenant, resource string, isItem bool) (path.Path, error)
}{
{
service: path.ExchangeService,
category: path.EmailCategory,
pathFunc: func(pb *path.Builder, tenant, user string, isItem bool) (path.Path, error) {
return pb.ToDataLayerExchangePathForCategory(tenant, user, path.EmailCategory, isItem)
pathFunc: func(pb *path.Builder, tenant, resource string, isItem bool) (path.Path, error) {
return pb.ToDataLayerExchangePathForCategory(tenant, resource, path.EmailCategory, isItem)
},
},
{
service: path.ExchangeService,
category: path.ContactsCategory,
pathFunc: func(pb *path.Builder, tenant, user string, isItem bool) (path.Path, error) {
return pb.ToDataLayerExchangePathForCategory(tenant, user, path.ContactsCategory, isItem)
pathFunc: func(pb *path.Builder, tenant, resource string, isItem bool) (path.Path, error) {
return pb.ToDataLayerExchangePathForCategory(tenant, resource, path.ContactsCategory, isItem)
},
},
{
service: path.ExchangeService,
category: path.EventsCategory,
pathFunc: func(pb *path.Builder, tenant, user string, isItem bool) (path.Path, error) {
return pb.ToDataLayerExchangePathForCategory(tenant, user, path.EventsCategory, isItem)
pathFunc: func(pb *path.Builder, tenant, resource string, isItem bool) (path.Path, error) {
return pb.ToDataLayerExchangePathForCategory(tenant, resource, path.EventsCategory, isItem)
},
},
{
service: path.OneDriveService,
category: path.FilesCategory,
pathFunc: func(pb *path.Builder, tenant, user string, isItem bool) (path.Path, error) {
return pb.ToDataLayerOneDrivePath(tenant, user, isItem)
pathFunc: func(pb *path.Builder, tenant, resource string, isItem bool) (path.Path, error) {
return pb.ToDataLayerOneDrivePath(tenant, resource, isItem)
},
},
{
service: path.SharePointService,
category: path.LibrariesCategory,
pathFunc: func(pb *path.Builder, tenant, site string, isItem bool) (path.Path, error) {
return pb.ToDataLayerSharePointPath(tenant, site, path.LibrariesCategory, isItem)
pathFunc: func(pb *path.Builder, tenant, resource string, isItem bool) (path.Path, error) {
return pb.ToDataLayerSharePointPath(tenant, resource, path.LibrariesCategory, isItem)
},
},
{
service: path.SharePointService,
category: path.ListsCategory,
pathFunc: func(pb *path.Builder, tenant, site string, isItem bool) (path.Path, error) {
return pb.ToDataLayerSharePointPath(tenant, site, path.ListsCategory, isItem)
pathFunc: func(pb *path.Builder, tenant, resource string, isItem bool) (path.Path, error) {
return pb.ToDataLayerSharePointPath(tenant, resource, path.ListsCategory, isItem)
},
},
{
service: path.SharePointService,
category: path.PagesCategory,
pathFunc: func(pb *path.Builder, tenant, site string, isItem bool) (path.Path, error) {
return pb.ToDataLayerSharePointPath(tenant, site, path.PagesCategory, isItem)
pathFunc: func(pb *path.Builder, tenant, resource string, isItem bool) (path.Path, error) {
return pb.ToDataLayerSharePointPath(tenant, resource, path.PagesCategory, isItem)
},
},
}
@ -154,7 +154,7 @@ func (suite *DataLayerResourcePath) TestMissingInfoErrors() {
_, err := types.pathFunc(
b,
test.tenant,
test.user,
test.resource,
m.isItem)
assert.Error(t, err)
})
@ -176,7 +176,7 @@ func (suite *DataLayerResourcePath) TestMailItemNoFolder() {
p, err := types.pathFunc(
b,
testTenant,
testUser,
testResource,
true)
require.NoError(t, err, clues.ToCore(err))
@ -189,7 +189,7 @@ func (suite *DataLayerResourcePath) TestMailItemNoFolder() {
func (suite *DataLayerResourcePath) TestPopFront() {
expected := path.Builder{}.Append(append(
[]string{path.ExchangeService.String(), testUser, path.EmailCategory.String()},
[]string{path.ExchangeService.String(), testResource, path.EmailCategory.String()},
rest...)...)
for _, m := range modes {
@ -199,7 +199,7 @@ func (suite *DataLayerResourcePath) TestPopFront() {
pb := path.Builder{}.Append(rest...)
p, err := pb.ToDataLayerExchangePathForCategory(
testTenant,
testUser,
testResource,
path.EmailCategory,
m.isItem)
require.NoError(t, err, clues.ToCore(err))
@ -214,7 +214,7 @@ func (suite *DataLayerResourcePath) TestDir() {
elements := []string{
testTenant,
path.ExchangeService.String(),
testUser,
testResource,
path.EmailCategory.String(),
}
@ -223,7 +223,7 @@ func (suite *DataLayerResourcePath) TestDir() {
pb := path.Builder{}.Append(rest...)
p, err := pb.ToDataLayerExchangePathForCategory(
testTenant,
testUser,
testResource,
path.EmailCategory,
m.isItem)
require.NoError(suite.T(), err, clues.ToCore(err))
@ -251,7 +251,7 @@ func (suite *DataLayerResourcePath) TestDir() {
func (suite *DataLayerResourcePath) TestToServiceCategoryMetadataPath() {
tenant := "a-tenant"
user := "a-user"
resource := "a-resource"
table := []struct {
name string
service path.ServiceType
@ -341,7 +341,7 @@ func (suite *DataLayerResourcePath) TestToServiceCategoryMetadataPath() {
t := suite.T()
p, err := path.BuildMetadata(
tenant,
user,
resource,
test.service,
test.category,
false,
@ -393,7 +393,7 @@ func (suite *DataLayerResourcePath) TestToExchangePathForCategory() {
p, err := b.ToDataLayerExchangePathForCategory(
testTenant,
testUser,
testResource,
test.category,
m.isItem)
test.check(t, err, clues.ToCore(err))
@ -405,7 +405,7 @@ func (suite *DataLayerResourcePath) TestToExchangePathForCategory() {
assert.Equal(t, testTenant, p.Tenant())
assert.Equal(t, path.ExchangeService, p.Service())
assert.Equal(t, test.category, p.Category())
assert.Equal(t, testUser, p.ResourceOwner())
assert.Equal(t, testResource, p.ProtectedResource())
assert.Equal(t, strings.Join(m.expectedFolders, "/"), p.Folder(false))
assert.Equal(t, path.Elements(m.expectedFolders), p.Folders())
assert.Equal(t, m.expectedItem, p.Item())
@ -432,7 +432,7 @@ func (suite *PopulatedDataLayerResourcePath) SetupSuite() {
for _, t := range []bool{true, false} {
p, err := base.ToDataLayerExchangePathForCategory(
testTenant,
testUser,
testResource,
path.EmailCategory,
t)
require.NoError(suite.T(), err, clues.ToCore(err))
@ -471,12 +471,12 @@ func (suite *PopulatedDataLayerResourcePath) TestCategory() {
}
}
func (suite *PopulatedDataLayerResourcePath) TestResourceOwner() {
func (suite *PopulatedDataLayerResourcePath) TestProtectedResource() {
for _, m := range modes {
suite.Run(m.name, func() {
t := suite.T()
assert.Equal(t, testUser, suite.paths[m.isItem].ResourceOwner())
assert.Equal(t, testResource, suite.paths[m.isItem].ProtectedResource())
})
}
}
@ -614,7 +614,7 @@ func (suite *PopulatedDataLayerResourcePath) TestUpdateParent() {
buildPath := func(t *testing.T, pth string, isItem bool) path.Path {
pathBuilder := path.Builder{}.Append(strings.Split(pth, "/")...)
item, err := pathBuilder.ToDataLayerOneDrivePath("tenant", "user", isItem)
item, err := pathBuilder.ToDataLayerOneDrivePath("tenant", "resource", isItem)
require.NoError(t, err, "err building path")
return item

View File

@ -348,7 +348,7 @@ func ensureAllUsersInDetails(
continue
}
ro := p.ResourceOwner()
ro := p.ProtectedResource()
if !assert.NotEmpty(t, ro, "resource owner in path: "+rr) {
continue
}

View File

@ -798,7 +798,7 @@ func (suite *ExchangeSelectorSuite) TestExchangeRestore_Reduce() {
joinedFldrs := strings.Join(newElems, "/")
return stubRepoRef(p.Service(), p.Category(), p.ResourceOwner(), joinedFldrs, p.Item())
return stubRepoRef(p.Service(), p.Category(), p.ProtectedResource(), joinedFldrs, p.Item())
}
makeDeets := func(refs ...path.Path) *details.Details {

View File

@ -545,7 +545,7 @@ func reduce[T scopeT, C categoryT](
}
// first check, every entry needs to match the selector's resource owners.
if !matchesResourceOwner.Compare(repoPath.ResourceOwner()) {
if !matchesResourceOwner.Compare(repoPath.ProtectedResource()) {
continue
}

View File

@ -24,13 +24,13 @@ type channelMessageDeltaPageCtrl struct {
options *teams.ItemChannelsItemMessagesDeltaRequestBuilderGetRequestConfiguration
}
func (p *channelMessageDeltaPageCtrl) SetNext(nextLink string) {
func (p *channelMessageDeltaPageCtrl) SetNextLink(nextLink string) {
p.builder = teams.NewItemChannelsItemMessagesDeltaRequestBuilder(nextLink, p.gs.Adapter())
}
func (p *channelMessageDeltaPageCtrl) GetPage(
ctx context.Context,
) (DeltaPageLinker, error) {
) (DeltaLinkValuer[models.ChatMessageable], error) {
resp, err := p.builder.Get(ctx, p.options)
return resp, graph.Stack(ctx, err).OrNil()
}
@ -46,10 +46,6 @@ func (p *channelMessageDeltaPageCtrl) Reset(context.Context) {
Delta()
}
func (p *channelMessageDeltaPageCtrl) ValuesIn(l PageLinker) ([]models.ChatMessageable, error) {
return getValues[models.ChatMessageable](l)
}
func (c Channels) NewChannelMessageDeltaPager(
teamID, channelID, prevDelta string,
selectProps ...string,
@ -100,71 +96,30 @@ func (c Channels) GetChannelMessageIDsDelta(
// all replies to the message.
// selectProps = idAnd()
pager = c.NewChannelMessageDeltaPager(teamID, channelID, prevDelta)
invalidPrevDelta = len(prevDelta) == 0
newDeltaLink string
)
// Loop through all pages returned by Graph API.
for {
page, err := pager.GetPage(graph.ConsumeNTokens(ctx, graph.SingleGetOrDeltaLC))
if graph.IsErrInvalidDelta(err) {
logger.Ctx(ctx).Infow("Invalid previous delta", "delta_link", prevDelta)
invalidPrevDelta = true
added = map[string]struct{}{}
deleted = map[string]struct{}{}
pager.Reset(ctx)
continue
}
results, du, err := deltaEnumerateItems[models.ChatMessageable](ctx, pager, prevDelta)
if graph.IsErrInvalidDelta(err) {
logger.Ctx(ctx).Infow("delta token not supported", "delta_link", prevDelta)
added = map[string]struct{}{}
deleted = map[string]struct{}{}
break
return added, deleted, du, nil
}
if err != nil {
return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "retrieving page of channel messages")
}
vals, err := pager.ValuesIn(page)
if err != nil {
return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "extracting channel messages from response")
}
for _, v := range vals {
if v.GetDeletedDateTime() == nil {
added[ptr.Val(v.GetId())] = struct{}{}
for _, r := range results {
if r.GetAdditionalData()[graph.AddtlDataRemoved] == nil {
added[ptr.Val(r.GetId())] = struct{}{}
} else {
deleted[ptr.Val(v.GetId())] = struct{}{}
deleted[ptr.Val(r.GetId())] = struct{}{}
}
}
nextLink, deltaLink := NextAndDeltaLink(page)
if len(deltaLink) > 0 {
newDeltaLink = deltaLink
}
if len(nextLink) == 0 {
break
}
pager.SetNext(nextLink)
}
logger.Ctx(ctx).Debugf("retrieved %d channel messages", len(added))
du := DeltaUpdate{
URL: newDeltaLink,
Reset: invalidPrevDelta,
}
return added, deleted, du, nil
}
@ -180,13 +135,13 @@ type channelMessageRepliesPageCtrl struct {
options *teams.ItemChannelsItemMessagesItemRepliesRequestBuilderGetRequestConfiguration
}
func (p *channelMessageRepliesPageCtrl) SetNext(nextLink string) {
func (p *channelMessageRepliesPageCtrl) SetNextLink(nextLink string) {
p.builder = teams.NewItemChannelsItemMessagesItemRepliesRequestBuilder(nextLink, p.gs.Adapter())
}
func (p *channelMessageRepliesPageCtrl) GetPage(
ctx context.Context,
) (PageLinker, error) {
) (NextLinkValuer[models.ChatMessageable], error) {
resp, err := p.builder.Get(ctx, p.options)
return resp, graph.Stack(ctx, err).OrNil()
}
@ -195,10 +150,6 @@ func (p *channelMessageRepliesPageCtrl) GetOdataNextLink() *string {
return ptr.To("")
}
func (p *channelMessageRepliesPageCtrl) ValuesIn(l PageLinker) ([]models.ChatMessageable, error) {
return getValues[models.ChatMessageable](l)
}
func (c Channels) NewChannelMessageRepliesPager(
teamID, channelID, messageID string,
selectProps ...string,
@ -233,42 +184,7 @@ func (c Channels) GetChannelMessageReplies(
ctx context.Context,
teamID, channelID, messageID string,
) ([]models.ChatMessageable, error) {
var (
vs = []models.ChatMessageable{}
// select is not currently enabled for replies.
// selectProps = idAnd(
// "messageType",
// "createdDateTime",
// "from",
// "body")
pager = c.NewChannelMessageRepliesPager(teamID, channelID, messageID)
)
// Loop through all pages returned by Graph API.
for {
page, err := pager.GetPage(ctx)
if err != nil {
return nil, graph.Wrap(ctx, err, "retrieving page of channels")
}
vals, err := pager.ValuesIn(page)
if err != nil {
return nil, graph.Wrap(ctx, err, "extracting channels from response")
}
vs = append(vs, vals...)
nextLink := ptr.Val(page.GetOdataNextLink())
if len(nextLink) == 0 {
break
}
pager.SetNext(nextLink)
}
logger.Ctx(ctx).Debugf("retrieved %d channel message replies", len(vs))
return vs, nil
return enumerateItems[models.ChatMessageable](ctx, c.NewChannelMessageRepliesPager(teamID, channelID, messageID))
}
// ---------------------------------------------------------------------------
@ -283,21 +199,17 @@ type channelPageCtrl struct {
options *teams.ItemChannelsRequestBuilderGetRequestConfiguration
}
func (p *channelPageCtrl) SetNext(nextLink string) {
func (p *channelPageCtrl) SetNextLink(nextLink string) {
p.builder = teams.NewItemChannelsRequestBuilder(nextLink, p.gs.Adapter())
}
func (p *channelPageCtrl) GetPage(
ctx context.Context,
) (PageLinker, error) {
) (NextLinkValuer[models.Channelable], error) {
resp, err := p.builder.Get(ctx, p.options)
return resp, graph.Stack(ctx, err).OrNil()
}
func (p *channelPageCtrl) ValuesIn(l PageLinker) ([]models.Channelable, error) {
return getValues[models.Channelable](l)
}
func (c Channels) NewChannelPager(
teamID string,
) *channelPageCtrl {
@ -323,34 +235,5 @@ func (c Channels) GetChannels(
ctx context.Context,
teamID string,
) ([]models.Channelable, error) {
var (
vs = []models.Channelable{}
pager = c.NewChannelPager(teamID)
)
// Loop through all pages returned by Graph API.
for {
page, err := pager.GetPage(ctx)
if err != nil {
return nil, graph.Wrap(ctx, err, "retrieving page of channels")
}
vals, err := pager.ValuesIn(page)
if err != nil {
return nil, graph.Wrap(ctx, err, "extracting channels from response")
}
vs = append(vs, vals...)
nextLink := ptr.Val(page.GetOdataNextLink())
if len(nextLink) == 0 {
break
}
pager.SetNext(nextLink)
}
logger.Ctx(ctx).Debugf("retrieved %d channels", len(vs))
return vs, nil
return enumerateItems[models.Channelable](ctx, c.NewChannelPager(teamID))
}

View File

@ -10,7 +10,7 @@ import (
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/path"
)
// ---------------------------------------------------------------------------
@ -22,6 +22,7 @@ import (
// fn(cf) on each one.
// Folder hierarchy is represented in its current state, and does
// not contain historical data.
// TODO: use enumerateItems for containers
func (c Contacts) EnumerateContainers(
ctx context.Context,
userID, baseContainerID string,
@ -90,7 +91,7 @@ func (c Contacts) EnumerateContainers(
// item pager
// ---------------------------------------------------------------------------
var _ itemPager[models.Contactable] = &contactsPageCtrl{}
var _ Pager[models.Contactable] = &contactsPageCtrl{}
type contactsPageCtrl struct {
gs graph.Servicer
@ -100,10 +101,11 @@ type contactsPageCtrl struct {
func (c Contacts) NewContactsPager(
userID, containerID string,
immutableIDs bool,
selectProps ...string,
) itemPager[models.Contactable] {
) Pager[models.Contactable] {
options := &users.ItemContactFoldersItemContactsRequestBuilderGetRequestConfiguration{
Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize)),
Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize), preferImmutableIDs(immutableIDs)),
QueryParameters: &users.ItemContactFoldersItemContactsRequestBuilderGetQueryParameters{},
// do NOT set Top. It limits the total items received.
}
@ -123,28 +125,23 @@ func (c Contacts) NewContactsPager(
return &contactsPageCtrl{c.Stable, builder, options}
}
//lint:ignore U1000 False Positive
func (p *contactsPageCtrl) getPage(ctx context.Context) (PageLinkValuer[models.Contactable], error) {
func (p *contactsPageCtrl) GetPage(
ctx context.Context,
) (NextLinkValuer[models.Contactable], error) {
resp, err := p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return EmptyDeltaLinker[models.Contactable]{PageLinkValuer: resp}, nil
return resp, graph.Stack(ctx, err).OrNil()
}
//lint:ignore U1000 False Positive
func (p *contactsPageCtrl) setNext(nextLink string) {
func (p *contactsPageCtrl) SetNextLink(nextLink string) {
p.builder = users.NewItemContactFoldersItemContactsRequestBuilder(nextLink, p.gs.Adapter())
}
//lint:ignore U1000 False Positive
func (c Contacts) GetItemsInContainerByCollisionKey(
ctx context.Context,
userID, containerID string,
) (map[string]string, error) {
ctx = clues.Add(ctx, "container_id", containerID)
pager := c.NewContactsPager(userID, containerID, contactCollisionKeyProps()...)
pager := c.NewContactsPager(userID, containerID, false, contactCollisionKeyProps()...)
items, err := enumerateItems(ctx, pager)
if err != nil {
@ -165,7 +162,7 @@ func (c Contacts) GetItemIDsInContainer(
userID, containerID string,
) (map[string]struct{}, error) {
ctx = clues.Add(ctx, "container_id", containerID)
pager := c.NewContactsPager(userID, containerID, "id")
pager := c.NewContactsPager(userID, containerID, false, idAnd()...)
items, err := enumerateItems(ctx, pager)
if err != nil {
@ -181,69 +178,13 @@ func (c Contacts) GetItemIDsInContainer(
return m, nil
}
// ---------------------------------------------------------------------------
// item ID pager
// ---------------------------------------------------------------------------
var _ DeltaPager[getIDAndAddtler] = &contactIDPager{}
type contactIDPager struct {
gs graph.Servicer
builder *users.ItemContactFoldersItemContactsRequestBuilder
options *users.ItemContactFoldersItemContactsRequestBuilderGetRequestConfiguration
}
func (c Contacts) NewContactIDsPager(
ctx context.Context,
userID, containerID string,
immutableIDs bool,
) DeltaPager[getIDAndAddtler] {
config := &users.ItemContactFoldersItemContactsRequestBuilderGetRequestConfiguration{
QueryParameters: &users.ItemContactFoldersItemContactsRequestBuilderGetQueryParameters{
Select: idAnd(parentFolderID),
// do NOT set Top. It limits the total items received.
},
Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize), preferImmutableIDs(immutableIDs)),
}
builder := c.Stable.
Client().
Users().
ByUserIdString(userID).
ContactFolders().
ByContactFolderIdString(containerID).
Contacts()
return &contactIDPager{c.Stable, builder, config}
}
func (p *contactIDPager) GetPage(ctx context.Context) (DeltaPageLinker, error) {
resp, err := p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return EmptyDeltaLinker[models.Contactable]{PageLinkValuer: resp}, nil
}
func (p *contactIDPager) SetNext(nextLink string) {
p.builder = users.NewItemContactFoldersItemContactsRequestBuilder(nextLink, p.gs.Adapter())
}
// non delta pagers don't need reset
func (p *contactIDPager) Reset(context.Context) {}
func (p *contactIDPager) ValuesIn(pl PageLinker) ([]getIDAndAddtler, error) {
return toValues[models.Contactable](pl)
}
// ---------------------------------------------------------------------------
// delta item ID pager
// ---------------------------------------------------------------------------
var _ DeltaPager[getIDAndAddtler] = &contactDeltaIDPager{}
var _ DeltaPager[models.Contactable] = &contactDeltaPager{}
type contactDeltaIDPager struct {
type contactDeltaPager struct {
gs graph.Servicer
userID string
containerID string
@ -255,7 +196,6 @@ func getContactDeltaBuilder(
ctx context.Context,
gs graph.Servicer,
userID, containerID string,
options *users.ItemContactFoldersItemContactsDeltaRequestBuilderGetRequestConfiguration,
) *users.ItemContactFoldersItemContactsDeltaRequestBuilder {
builder := gs.Client().
Users().
@ -268,63 +208,75 @@ func getContactDeltaBuilder(
return builder
}
func (c Contacts) NewContactDeltaIDsPager(
func (c Contacts) NewContactsDeltaPager(
ctx context.Context,
userID, containerID, oldDelta string,
userID, containerID, prevDeltaLink string,
immutableIDs bool,
) DeltaPager[getIDAndAddtler] {
selectProps ...string,
) DeltaPager[models.Contactable] {
options := &users.ItemContactFoldersItemContactsDeltaRequestBuilderGetRequestConfiguration{
QueryParameters: &users.ItemContactFoldersItemContactsDeltaRequestBuilderGetQueryParameters{
Select: idAnd(parentFolderID),
// do NOT set Top. It limits the total items received.
},
QueryParameters: &users.ItemContactFoldersItemContactsDeltaRequestBuilderGetQueryParameters{},
Headers: newPreferHeaders(preferPageSize(c.options.DeltaPageSize), preferImmutableIDs(immutableIDs)),
}
if len(selectProps) > 0 {
options.QueryParameters.Select = selectProps
}
var builder *users.ItemContactFoldersItemContactsDeltaRequestBuilder
if oldDelta != "" {
builder = users.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, c.Stable.Adapter())
if len(prevDeltaLink) > 0 {
builder = users.NewItemContactFoldersItemContactsDeltaRequestBuilder(prevDeltaLink, c.Stable.Adapter())
} else {
builder = getContactDeltaBuilder(ctx, c.Stable, userID, containerID, options)
builder = getContactDeltaBuilder(ctx, c.Stable, userID, containerID)
}
return &contactDeltaIDPager{c.Stable, userID, containerID, builder, options}
return &contactDeltaPager{c.Stable, userID, containerID, builder, options}
}
func (p *contactDeltaIDPager) GetPage(ctx context.Context) (DeltaPageLinker, error) {
func (p *contactDeltaPager) GetPage(
ctx context.Context,
) (DeltaLinkValuer[models.Contactable], error) {
resp, err := p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return resp, nil
return resp, graph.Stack(ctx, err).OrNil()
}
func (p *contactDeltaIDPager) SetNext(nextLink string) {
func (p *contactDeltaPager) SetNextLink(nextLink string) {
p.builder = users.NewItemContactFoldersItemContactsDeltaRequestBuilder(nextLink, p.gs.Adapter())
}
func (p *contactDeltaIDPager) Reset(ctx context.Context) {
p.builder = getContactDeltaBuilder(ctx, p.gs, p.userID, p.containerID, p.options)
}
func (p *contactDeltaIDPager) ValuesIn(pl PageLinker) ([]getIDAndAddtler, error) {
return toValues[models.Contactable](pl)
func (p *contactDeltaPager) Reset(ctx context.Context) {
p.builder = getContactDeltaBuilder(ctx, p.gs, p.userID, p.containerID)
}
func (c Contacts) GetAddedAndRemovedItemIDs(
ctx context.Context,
userID, containerID, oldDelta string,
userID, containerID, prevDeltaLink string,
immutableIDs bool,
canMakeDeltaQueries bool,
) ([]string, []string, DeltaUpdate, error) {
ctx = clues.Add(
ctx,
"category", selectors.ExchangeContact,
"data_category", path.ContactsCategory,
"container_id", containerID)
pager := c.NewContactIDsPager(ctx, userID, containerID, immutableIDs)
deltaPager := c.NewContactDeltaIDsPager(ctx, userID, containerID, oldDelta, immutableIDs)
deltaPager := c.NewContactsDeltaPager(
ctx,
userID,
containerID,
prevDeltaLink,
immutableIDs,
idAnd()...)
pager := c.NewContactsPager(
userID,
containerID,
immutableIDs,
idAnd()...)
return getAddedAndRemovedItemIDs(ctx, c.Stable, pager, deltaPager, oldDelta, canMakeDeltaQueries)
return getAddedAndRemovedItemIDs[models.Contactable](
ctx,
pager,
deltaPager,
prevDeltaLink,
canMakeDeltaQueries)
}

View File

@ -2,8 +2,6 @@ package api
import (
"context"
"fmt"
"time"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/drives"
@ -21,7 +19,7 @@ import (
// non-delta item pager
// ---------------------------------------------------------------------------
var _ itemPager[models.DriveItemable] = &driveItemPageCtrl{}
var _ Pager[models.DriveItemable] = &driveItemPageCtrl{}
type driveItemPageCtrl struct {
gs graph.Servicer
@ -32,7 +30,7 @@ type driveItemPageCtrl struct {
func (c Drives) NewDriveItemPager(
driveID, containerID string,
selectProps ...string,
) itemPager[models.DriveItemable] {
) Pager[models.DriveItemable] {
options := &drives.ItemItemsItemChildrenRequestBuilderGetRequestConfiguration{
QueryParameters: &drives.ItemItemsItemChildrenRequestBuilderGetQueryParameters{},
}
@ -52,18 +50,14 @@ func (c Drives) NewDriveItemPager(
return &driveItemPageCtrl{c.Stable, builder, options}
}
//lint:ignore U1000 False Positive
func (p *driveItemPageCtrl) getPage(ctx context.Context) (PageLinkValuer[models.DriveItemable], error) {
func (p *driveItemPageCtrl) GetPage(
ctx context.Context,
) (NextLinkValuer[models.DriveItemable], error) {
page, err := p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return EmptyDeltaLinker[models.DriveItemable]{PageLinkValuer: page}, nil
return page, graph.Stack(ctx, err).OrNil()
}
//lint:ignore U1000 False Positive
func (p *driveItemPageCtrl) setNext(nextLink string) {
func (p *driveItemPageCtrl) SetNextLink(nextLink string) {
p.builder = drives.NewItemItemsItemChildrenRequestBuilder(nextLink, p.gs.Adapter())
}
@ -171,21 +165,14 @@ func (c Drives) NewDriveItemDeltaPager(
return res
}
func (p *DriveItemDeltaPageCtrl) GetPage(ctx context.Context) (DeltaPageLinker, error) {
var (
resp DeltaPageLinker
err error
)
resp, err = p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return resp, nil
func (p *DriveItemDeltaPageCtrl) GetPage(
ctx context.Context,
) (DeltaLinkValuer[models.DriveItemable], error) {
resp, err := p.builder.Get(ctx, p.options)
return resp, graph.Stack(ctx, err).OrNil()
}
func (p *DriveItemDeltaPageCtrl) SetNext(link string) {
func (p *DriveItemDeltaPageCtrl) SetNextLink(link string) {
p.builder = drives.NewItemItemsItemDeltaRequestBuilder(link, p.gs.Adapter())
}
@ -198,10 +185,6 @@ func (p *DriveItemDeltaPageCtrl) Reset(context.Context) {
Delta()
}
func (p *DriveItemDeltaPageCtrl) ValuesIn(l PageLinker) ([]models.DriveItemable, error) {
return getValues[models.DriveItemable](l)
}
// ---------------------------------------------------------------------------
// user's drives pager
// ---------------------------------------------------------------------------
@ -239,59 +222,36 @@ func (c Drives) NewUserDrivePager(
return res
}
type nopUserDrivePageLinker struct {
type nopUserDrivePage struct {
drive models.Driveable
}
func (nl nopUserDrivePageLinker) GetOdataNextLink() *string { return nil }
func (nl nopUserDrivePage) GetValue() []models.Driveable {
return []models.Driveable{nl.drive}
}
func (p *userDrivePager) GetPage(ctx context.Context) (PageLinker, error) {
var (
resp PageLinker
err error
)
func (nl nopUserDrivePage) GetOdataNextLink() *string {
return nil
}
func (p *userDrivePager) GetPage(
ctx context.Context,
) (NextLinkValuer[models.Driveable], error) {
// we only ever want to return the user's default drive.
d, err := p.gs.
Client().
Users().
ByUserIdString(p.userID).
Drive().
Get(ctx, nil)
if err != nil {
return nil, graph.Stack(ctx, err)
}
resp = &nopUserDrivePageLinker{drive: d}
// TODO(keepers): turn back on when we can separate drive enumeration
// from default drive lookup.
// resp, err = p.builder.Get(ctx, p.options)
// if err != nil {
// return nil, graph.Stack(ctx, err)
// }
return resp, nil
return &nopUserDrivePage{drive: d}, graph.Stack(ctx, err).OrNil()
}
func (p *userDrivePager) SetNext(link string) {
func (p *userDrivePager) SetNextLink(link string) {
p.builder = users.NewItemDrivesRequestBuilder(link, p.gs.Adapter())
}
func (p *userDrivePager) ValuesIn(l PageLinker) ([]models.Driveable, error) {
nl, ok := l.(*nopUserDrivePageLinker)
if !ok || nl == nil {
return nil, clues.New(fmt.Sprintf("improper page linker struct for user drives: %T", l))
}
// TODO(keepers): turn back on when we can separate drive enumeration
// from default drive lookup.
// return getValues[models.Driveable](l)
return []models.Driveable{nl.drive}, nil
}
// ---------------------------------------------------------------------------
// site's libraries pager
// ---------------------------------------------------------------------------
@ -332,28 +292,17 @@ func (c Drives) NewSiteDrivePager(
return res
}
func (p *siteDrivePager) GetPage(ctx context.Context) (PageLinker, error) {
var (
resp PageLinker
err error
)
resp, err = p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return resp, nil
func (p *siteDrivePager) GetPage(
ctx context.Context,
) (NextLinkValuer[models.Driveable], error) {
resp, err := p.builder.Get(ctx, p.options)
return resp, graph.Stack(ctx, err).OrNil()
}
func (p *siteDrivePager) SetNext(link string) {
func (p *siteDrivePager) SetNextLink(link string) {
p.builder = sites.NewItemDrivesRequestBuilder(link, p.gs.Adapter())
}
func (p *siteDrivePager) ValuesIn(l PageLinker) ([]models.Driveable, error) {
return getValues[models.Driveable](l)
}
// ---------------------------------------------------------------------------
// drive pager
// ---------------------------------------------------------------------------
@ -362,74 +311,13 @@ func (p *siteDrivePager) ValuesIn(l PageLinker) ([]models.Driveable, error) {
func GetAllDrives(
ctx context.Context,
pager Pager[models.Driveable],
retry bool,
maxRetryCount int,
) ([]models.Driveable, error) {
ds := []models.Driveable{}
if !retry {
maxRetryCount = 0
}
// Loop through all pages returned by Graph API.
for {
var (
err error
page PageLinker
)
// Retry Loop for Drive retrieval. Request can timeout
for i := 0; i <= maxRetryCount; i++ {
page, err = pager.GetPage(ctx)
if err != nil {
if clues.HasLabel(err, graph.LabelsMysiteNotFound) || clues.HasLabel(err, graph.LabelsNoSharePointLicense) {
ds, err := enumerateItems(ctx, pager)
if err != nil && (clues.HasLabel(err, graph.LabelsMysiteNotFound) ||
clues.HasLabel(err, graph.LabelsNoSharePointLicense)) {
logger.CtxErr(ctx, err).Infof("resource owner does not have a drive")
return make([]models.Driveable, 0), nil // no license or drives.
}
if graph.IsErrTimeout(err) && i < maxRetryCount {
time.Sleep(time.Duration(3*(i+1)) * time.Second)
continue
}
return nil, graph.Wrap(ctx, err, "retrieving drives")
}
// No error encountered, break the retry loop so we can extract results
// and see if there's another page to fetch.
break
}
tmp, err := pager.ValuesIn(page)
if err != nil {
return nil, graph.Wrap(ctx, err, "extracting drives from response")
}
ds = append(ds, tmp...)
nextLink := ptr.Val(page.GetOdataNextLink())
if len(nextLink) == 0 {
break
}
pager.SetNext(nextLink)
}
logger.Ctx(ctx).Debugf("retrieved %d valid drives", len(ds))
return ds, nil
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
func getValues[T any](l PageLinker) ([]T, error) {
page, ok := l.(interface{ GetValue() []T })
if !ok {
return nil, clues.New("page does not comply with GetValue() interface").
With("page_item_type", fmt.Sprintf("%T", l))
}
return page.GetValue(), nil
return ds, graph.Stack(ctx, err).OrNil()
}

View File

@ -14,9 +14,7 @@ import (
"github.com/alcionai/corso/src/pkg/path"
)
const (
eventBetaDeltaURLTemplate = "https://graph.microsoft.com/beta/users/%s/calendars/%s/events/delta"
)
const eventBetaDeltaURLTemplate = "https://graph.microsoft.com/beta/users/%s/calendars/%s/events/delta"
// ---------------------------------------------------------------------------
// container pager
@ -98,7 +96,7 @@ func (c Events) EnumerateContainers(
// item pager
// ---------------------------------------------------------------------------
var _ itemPager[models.Eventable] = &eventsPageCtrl{}
var _ Pager[models.Eventable] = &eventsPageCtrl{}
type eventsPageCtrl struct {
gs graph.Servicer
@ -108,13 +106,13 @@ type eventsPageCtrl struct {
func (c Events) NewEventsPager(
userID, containerID string,
immutableIDs bool,
selectProps ...string,
) itemPager[models.Eventable] {
) Pager[models.Eventable] {
options := &users.ItemCalendarsItemEventsRequestBuilderGetRequestConfiguration{
Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize)),
QueryParameters: &users.ItemCalendarsItemEventsRequestBuilderGetQueryParameters{
Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize), preferImmutableIDs(immutableIDs)),
QueryParameters: &users.ItemCalendarsItemEventsRequestBuilderGetQueryParameters{},
// do NOT set Top. It limits the total items received.
},
}
if len(selectProps) > 0 {
@ -132,28 +130,23 @@ func (c Events) NewEventsPager(
return &eventsPageCtrl{c.Stable, builder, options}
}
//lint:ignore U1000 False Positive
func (p *eventsPageCtrl) getPage(ctx context.Context) (PageLinkValuer[models.Eventable], error) {
func (p *eventsPageCtrl) GetPage(
ctx context.Context,
) (NextLinkValuer[models.Eventable], error) {
resp, err := p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return resp, nil
return resp, graph.Stack(ctx, err).OrNil()
}
//lint:ignore U1000 False Positive
func (p *eventsPageCtrl) setNext(nextLink string) {
func (p *eventsPageCtrl) SetNextLink(nextLink string) {
p.builder = users.NewItemCalendarsItemEventsRequestBuilder(nextLink, p.gs.Adapter())
}
//lint:ignore U1000 False Positive
func (c Events) GetItemsInContainerByCollisionKey(
ctx context.Context,
userID, containerID string,
) (map[string]string, error) {
ctx = clues.Add(ctx, "container_id", containerID)
pager := c.NewEventsPager(userID, containerID, eventCollisionKeyProps()...)
pager := c.NewEventsPager(userID, containerID, false, eventCollisionKeyProps()...)
items, err := enumerateItems(ctx, pager)
if err != nil {
@ -169,68 +162,34 @@ func (c Events) GetItemsInContainerByCollisionKey(
return m, nil
}
// ---------------------------------------------------------------------------
// item ID pager
// ---------------------------------------------------------------------------
var _ DeltaPager[getIDAndAddtler] = &eventIDPager{}
type eventIDPager struct {
gs graph.Servicer
builder *users.ItemCalendarsItemEventsRequestBuilder
options *users.ItemCalendarsItemEventsRequestBuilderGetRequestConfiguration
}
func (c Events) NewEventIDsPager(
func (c Events) GetItemIDsInContainer(
ctx context.Context,
userID, containerID string,
immutableIDs bool,
) (DeltaPager[getIDAndAddtler], error) {
options := &users.ItemCalendarsItemEventsRequestBuilderGetRequestConfiguration{
Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize), preferImmutableIDs(immutableIDs)),
QueryParameters: &users.ItemCalendarsItemEventsRequestBuilderGetQueryParameters{
// do NOT set Top. It limits the total items received.
},
}
) (map[string]struct{}, error) {
ctx = clues.Add(ctx, "container_id", containerID)
pager := c.NewEventsPager(userID, containerID, false, idAnd()...)
builder := c.Stable.
Client().
Users().
ByUserIdString(userID).
Calendars().
ByCalendarIdString(containerID).
Events()
return &eventIDPager{c.Stable, builder, options}, nil
}
func (p *eventIDPager) GetPage(ctx context.Context) (DeltaPageLinker, error) {
resp, err := p.builder.Get(ctx, p.options)
items, err := enumerateItems(ctx, pager)
if err != nil {
return nil, graph.Stack(ctx, err)
return nil, graph.Wrap(ctx, err, "enumerating events")
}
return EmptyDeltaLinker[models.Eventable]{PageLinkValuer: resp}, nil
}
m := map[string]struct{}{}
func (p *eventIDPager) SetNext(nextLink string) {
p.builder = users.NewItemCalendarsItemEventsRequestBuilder(nextLink, p.gs.Adapter())
}
for _, item := range items {
m[ptr.Val(item.GetId())] = struct{}{}
}
// non delta pagers don't need reset
func (p *eventIDPager) Reset(context.Context) {}
func (p *eventIDPager) ValuesIn(pl PageLinker) ([]getIDAndAddtler, error) {
return toValues[models.Eventable](pl)
return m, nil
}
// ---------------------------------------------------------------------------
// delta item ID pager
// ---------------------------------------------------------------------------
var _ DeltaPager[getIDAndAddtler] = &eventDeltaIDPager{}
var _ DeltaPager[models.Eventable] = &eventDeltaPager{}
type eventDeltaIDPager struct {
type eventDeltaPager struct {
gs graph.Servicer
userID string
containerID string
@ -238,87 +197,85 @@ type eventDeltaIDPager struct {
options *users.ItemCalendarsItemEventsDeltaRequestBuilderGetRequestConfiguration
}
func (c Events) NewEventDeltaIDsPager(
ctx context.Context,
userID, containerID, oldDelta string,
immutableIDs bool,
) (DeltaPager[getIDAndAddtler], error) {
options := &users.ItemCalendarsItemEventsDeltaRequestBuilderGetRequestConfiguration{
Headers: newPreferHeaders(preferPageSize(c.options.DeltaPageSize), preferImmutableIDs(immutableIDs)),
QueryParameters: &users.ItemCalendarsItemEventsDeltaRequestBuilderGetQueryParameters{
// do NOT set Top. It limits the total items received.
},
}
var builder *users.ItemCalendarsItemEventsDeltaRequestBuilder
if oldDelta == "" {
builder = getEventDeltaBuilder(ctx, c.Stable, userID, containerID, options)
} else {
builder = users.NewItemCalendarsItemEventsDeltaRequestBuilder(oldDelta, c.Stable.Adapter())
}
return &eventDeltaIDPager{c.Stable, userID, containerID, builder, options}, nil
}
func getEventDeltaBuilder(
ctx context.Context,
gs graph.Servicer,
userID, containerID string,
options *users.ItemCalendarsItemEventsDeltaRequestBuilderGetRequestConfiguration,
) *users.ItemCalendarsItemEventsDeltaRequestBuilder {
// Graph SDK only supports delta queries against events on the beta version, so we're
// manufacturing use of the beta version url to make the call instead.
// See: https://learn.microsoft.com/ko-kr/graph/api/event-delta?view=graph-rest-beta&tabs=http
// Note that the delta item body is skeletal compared to the actual event struct. Lucky
// for us, we only need the item ID. As a result, even though we hacked the version, the
// response body parses properly into the v1.0 structs and complies with our wanted interfaces.
// Likewise, the NextLink and DeltaLink odata tags carry our hack forward, so the rest of the code
// works as intended (until, at least, we want to _not_ call the beta anymore).
rawURL := fmt.Sprintf(eventBetaDeltaURLTemplate, userID, containerID)
builder := users.NewItemCalendarsItemEventsDeltaRequestBuilder(rawURL, gs.Adapter())
return builder
return users.NewItemCalendarsItemEventsDeltaRequestBuilder(rawURL, gs.Adapter())
}
func (p *eventDeltaIDPager) GetPage(ctx context.Context) (DeltaPageLinker, error) {
resp, err := p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
func (c Events) NewEventsDeltaPager(
ctx context.Context,
userID, containerID, prevDeltaLink string,
immutableIDs bool,
selectProps ...string,
) DeltaPager[models.Eventable] {
options := &users.ItemCalendarsItemEventsDeltaRequestBuilderGetRequestConfiguration{
// do NOT set Top. It limits the total items received.
QueryParameters: &users.ItemCalendarsItemEventsDeltaRequestBuilderGetQueryParameters{},
Headers: newPreferHeaders(preferPageSize(c.options.DeltaPageSize), preferImmutableIDs(immutableIDs)),
}
return resp, nil
if len(selectProps) > 0 {
options.QueryParameters.Select = selectProps
}
var builder *users.ItemCalendarsItemEventsDeltaRequestBuilder
if len(prevDeltaLink) > 0 {
builder = users.NewItemCalendarsItemEventsDeltaRequestBuilder(prevDeltaLink, c.Stable.Adapter())
} else {
builder = getEventDeltaBuilder(ctx, c.Stable, userID, containerID)
}
return &eventDeltaPager{c.Stable, userID, containerID, builder, options}
}
func (p *eventDeltaIDPager) SetNext(nextLink string) {
func (p *eventDeltaPager) GetPage(
ctx context.Context,
) (DeltaLinkValuer[models.Eventable], error) {
resp, err := p.builder.Get(ctx, p.options)
return resp, graph.Stack(ctx, err).OrNil()
}
func (p *eventDeltaPager) SetNextLink(nextLink string) {
p.builder = users.NewItemCalendarsItemEventsDeltaRequestBuilder(nextLink, p.gs.Adapter())
}
func (p *eventDeltaIDPager) Reset(ctx context.Context) {
p.builder = getEventDeltaBuilder(ctx, p.gs, p.userID, p.containerID, p.options)
}
func (p *eventDeltaIDPager) ValuesIn(pl PageLinker) ([]getIDAndAddtler, error) {
return toValues[models.Eventable](pl)
func (p *eventDeltaPager) Reset(ctx context.Context) {
p.builder = getEventDeltaBuilder(ctx, p.gs, p.userID, p.containerID)
}
func (c Events) GetAddedAndRemovedItemIDs(
ctx context.Context,
userID, containerID, oldDelta string,
userID, containerID, prevDeltaLink string,
immutableIDs bool,
canMakeDeltaQueries bool,
) ([]string, []string, DeltaUpdate, error) {
ctx = clues.Add(ctx, "container_id", containerID)
ctx = clues.Add(
ctx,
"data_category", path.EventsCategory,
"container_id", containerID)
pager, err := c.NewEventIDsPager(ctx, userID, containerID, immutableIDs)
if err != nil {
return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "creating non-delta pager")
}
deltaPager := c.NewEventsDeltaPager(
ctx,
userID,
containerID,
prevDeltaLink,
immutableIDs,
idAnd()...)
pager := c.NewEventsPager(
userID,
containerID,
immutableIDs,
idAnd()...)
deltaPager, err := c.NewEventDeltaIDsPager(ctx, userID, containerID, oldDelta, immutableIDs)
if err != nil {
return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "creating delta pager")
}
return getAddedAndRemovedItemIDs(ctx, c.Stable, pager, deltaPager, oldDelta, canMakeDeltaQueries)
return getAddedAndRemovedItemIDs[models.Eventable](
ctx,
pager,
deltaPager,
prevDeltaLink,
canMakeDeltaQueries)
}

View File

@ -16,53 +16,37 @@ import (
// common interfaces
// ---------------------------------------------------------------------------
type DeltaPager[T any] interface {
DeltaGetPager
Resetter
SetNextLinker
ValuesInPageLinker[T]
type GetPager[T any] interface {
GetPage(context.Context) (T, error)
}
type Pager[T any] interface {
GetPager
SetNextLinker
ValuesInPageLinker[T]
type NextLinkValuer[T any] interface {
NextLinker
Valuer[T]
}
type DeltaGetPager interface {
GetPage(context.Context) (DeltaPageLinker, error)
type NextLinker interface {
GetOdataNextLink() *string
}
type GetPager interface {
GetPage(context.Context) (PageLinker, error)
type SetNextLinker interface {
SetNextLink(nextLink string)
}
type DeltaLinker interface {
NextLinker
GetOdataDeltaLink() *string
}
type DeltaLinkValuer[T any] interface {
DeltaLinker
Valuer[T]
}
type Valuer[T any] interface {
GetValue() []T
}
type ValuesInPageLinker[T any] interface {
ValuesIn(PageLinker) ([]T, error)
}
type PageLinker interface {
GetOdataNextLink() *string
}
type DeltaPageLinker interface {
PageLinker
GetOdataDeltaLink() *string
}
type PageLinkValuer[T any] interface {
PageLinker
Valuer[T]
}
type SetNextLinker interface {
SetNext(nextLink string)
}
type Resetter interface {
Reset(context.Context)
}
@ -76,41 +60,26 @@ func IsNextLinkValid(next string) bool {
return !strings.Contains(next, `users//`)
}
func NextLink(pl PageLinker) string {
func NextLink(pl NextLinker) string {
return ptr.Val(pl.GetOdataNextLink())
}
func NextAndDeltaLink(pl DeltaPageLinker) (string, string) {
func NextAndDeltaLink(pl DeltaLinker) (string, string) {
return NextLink(pl), ptr.Val(pl.GetOdataDeltaLink())
}
// EmptyDeltaLinker is used to convert PageLinker to DeltaPageLinker
type EmptyDeltaLinker[T any] struct {
PageLinkValuer[T]
}
func (EmptyDeltaLinker[T]) GetOdataDeltaLink() *string {
return ptr.To("")
}
func (e EmptyDeltaLinker[T]) GetValue() []T {
return e.PageLinkValuer.GetValue()
}
// ---------------------------------------------------------------------------
// generic handler for non-delta item paging in a container
// non-delta item paging
// ---------------------------------------------------------------------------
type itemPager[T any] interface {
// getPage get a page with the specified options from graph
getPage(context.Context) (PageLinkValuer[T], error)
// setNext is used to pass in the next url got from graph
setNext(string)
type Pager[T any] interface {
GetPager[NextLinkValuer[T]]
SetNextLinker
}
func enumerateItems[T any](
ctx context.Context,
pager itemPager[T],
pager Pager[T],
) ([]T, error) {
var (
result = make([]T, 0)
@ -120,52 +89,118 @@ func enumerateItems[T any](
for len(nextLink) > 0 {
// get the next page of data, check for standard errors
resp, err := pager.getPage(ctx)
page, err := pager.GetPage(ctx)
if err != nil {
return nil, graph.Stack(ctx, err)
}
result = append(result, resp.GetValue()...)
nextLink = NextLink(resp)
result = append(result, page.GetValue()...)
nextLink = NextLink(page)
pager.setNext(nextLink)
pager.SetNextLink(nextLink)
}
logger.Ctx(ctx).Infow("completed enumeration", "count", len(result))
logger.Ctx(ctx).Infow("completed delta item enumeration", "result_count", len(result))
return result, nil
}
// ---------------------------------------------------------------------------
// generic handler for delta-based ittem paging in a container
// generic handler for delta-based item paging
// ---------------------------------------------------------------------------
// uses a models interface compliant with { GetValues() []T }
// to transform its results into a slice of getIDer interfaces.
// Generics used here to handle the variation of msoft interfaces
// that all _almost_ comply with GetValue, but all return a different
// interface.
func toValues[T any](a any) ([]getIDAndAddtler, error) {
gv, ok := a.(interface{ GetValue() []T })
if !ok {
return nil, clues.New(fmt.Sprintf("type does not comply with the GetValue() interface: %T", a))
type DeltaPager[T any] interface {
GetPager[DeltaLinkValuer[T]]
Resetter
SetNextLinker
}
func deltaEnumerateItems[T any](
ctx context.Context,
pager DeltaPager[T],
prevDeltaLink string,
) ([]T, DeltaUpdate, error) {
var (
result = make([]T, 0)
// stubbed initial value to ensure we enter the loop.
newDeltaLink = ""
invalidPrevDelta = len(prevDeltaLink) == 0
nextLink = "do-while"
)
// Loop through all pages returned by Graph API.
for len(nextLink) > 0 {
page, err := pager.GetPage(graph.ConsumeNTokens(ctx, graph.SingleGetOrDeltaLC))
if graph.IsErrInvalidDelta(err) {
logger.Ctx(ctx).Infow("invalid previous delta", "delta_link", prevDeltaLink)
invalidPrevDelta = true
result = make([]T, 0)
// Reset tells the pager to try again after ditching its delta history.
pager.Reset(ctx)
continue
}
items := gv.GetValue()
r := make([]getIDAndAddtler, 0, len(items))
for _, item := range items {
var a any = item
ri, ok := a.(getIDAndAddtler)
if !ok {
return nil, clues.New(fmt.Sprintf("type does not comply with the getIDAndAddtler interface: %T", item))
if err != nil {
return nil, DeltaUpdate{}, graph.Wrap(ctx, err, "retrieving page")
}
r = append(r, ri)
result = append(result, page.GetValue()...)
nl, deltaLink := NextAndDeltaLink(page)
if len(deltaLink) > 0 {
newDeltaLink = deltaLink
}
return r, nil
nextLink = nl
pager.SetNextLink(nextLink)
}
logger.Ctx(ctx).Debugw("completed delta item enumeration", "result_count", len(result))
du := DeltaUpdate{
URL: newDeltaLink,
Reset: invalidPrevDelta,
}
return result, du, nil
}
// ---------------------------------------------------------------------------
// shared enumeration runner funcs
// ---------------------------------------------------------------------------
func getAddedAndRemovedItemIDs[T any](
ctx context.Context,
pager Pager[T],
deltaPager DeltaPager[T],
prevDeltaLink string,
canMakeDeltaQueries bool,
) ([]string, []string, DeltaUpdate, error) {
if canMakeDeltaQueries {
ts, du, err := deltaEnumerateItems[T](ctx, deltaPager, prevDeltaLink)
if err != nil && (!graph.IsErrInvalidDelta(err) || len(prevDeltaLink) == 0) {
return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err)
}
if err == nil {
a, r, err := addedAndRemovedByAddtlData(ts)
return a, r, du, graph.Stack(ctx, err).OrNil()
}
}
du := DeltaUpdate{Reset: true}
ts, err := enumerateItems(ctx, pager)
if err != nil {
return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err)
}
a, r, err := addedAndRemovedByAddtlData[T](ts)
return a, r, du, graph.Stack(ctx, err).OrNil()
}
type getIDAndAddtler interface {
@ -173,122 +208,25 @@ type getIDAndAddtler interface {
GetAdditionalData() map[string]any
}
func getAddedAndRemovedItemIDs(
ctx context.Context,
service graph.Servicer,
pager DeltaPager[getIDAndAddtler],
deltaPager DeltaPager[getIDAndAddtler],
oldDelta string,
canMakeDeltaQueries bool,
) ([]string, []string, DeltaUpdate, error) {
var (
pgr DeltaPager[getIDAndAddtler]
resetDelta bool
)
func addedAndRemovedByAddtlData[T any](items []T) ([]string, []string, error) {
added, removed := []string{}, []string{}
if canMakeDeltaQueries {
pgr = deltaPager
resetDelta = len(oldDelta) == 0
} else {
pgr = pager
resetDelta = true
}
added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
// note: happy path, not the error condition
if err == nil {
return added, removed, DeltaUpdate{deltaURL, resetDelta}, err
}
// If we already tried with a non-delta url, we can return
if !canMakeDeltaQueries {
return nil, nil, DeltaUpdate{}, err
}
// return error if invalid not delta error or oldDelta was empty
if !graph.IsErrInvalidDelta(err) || len(oldDelta) == 0 {
return nil, nil, DeltaUpdate{}, err
}
// reset deltaPager
pgr.Reset(ctx)
added, removed, deltaURL, err = getItemsAddedAndRemovedFromContainer(ctx, pgr)
if err != nil {
return nil, nil, DeltaUpdate{}, err
}
return added, removed, DeltaUpdate{deltaURL, true}, nil
}
// generic controller for retrieving all item ids in a container.
func getItemsAddedAndRemovedFromContainer(
ctx context.Context,
pager DeltaPager[getIDAndAddtler],
) ([]string, []string, string, error) {
var (
addedIDs = []string{}
removedIDs = []string{}
deltaURL string
itemCount int
page int
)
for {
// get the next page of data, check for standard errors
resp, err := pager.GetPage(ctx)
if err != nil {
return nil, nil, deltaURL, graph.Stack(ctx, err)
}
// each category type responds with a different interface, but all
// of them comply with GetValue, which is where we'll get our item data.
items, err := pager.ValuesIn(resp)
if err != nil {
return nil, nil, "", graph.Stack(ctx, err)
}
itemCount += len(items)
page++
// Log every ~1000 items (the page size we use is 200)
if page%5 == 0 {
logger.Ctx(ctx).Infow("queried items", "count", itemCount)
}
// iterate through the items in the page
for _, item := range items {
giaa, ok := any(item).(getIDAndAddtler)
if !ok {
return nil, nil, clues.New("item does not provide id and additional data getters").
With("item_type", fmt.Sprintf("%T", item))
}
// if the additional data contains a `@removed` key, the value will either
// be 'changed' or 'deleted'. We don't really care about the cause: both
// cases are handled the same way in storage.
if item.GetAdditionalData()[graph.AddtlDataRemoved] == nil {
addedIDs = append(addedIDs, ptr.Val(item.GetId()))
if giaa.GetAdditionalData()[graph.AddtlDataRemoved] == nil {
added = append(added, ptr.Val(giaa.GetId()))
} else {
removedIDs = append(removedIDs, ptr.Val(item.GetId()))
removed = append(removed, ptr.Val(giaa.GetId()))
}
}
nextLink, deltaLink := NextAndDeltaLink(resp)
// the deltaLink is kind of like a cursor for overall data state.
// once we run through pages of nextLinks, the last query will
// produce a deltaLink instead (if supported), which we'll use on
// the next backup to only get the changes since this run.
if len(deltaLink) > 0 {
deltaURL = deltaLink
}
// the nextLink is our page cursor within this query.
// if we have more data to retrieve, we'll have a
// nextLink instead of a deltaLink.
if len(nextLink) == 0 {
break
}
pager.SetNext(nextLink)
}
logger.Ctx(ctx).Infow("completed enumeration", "count", itemCount)
return addedIDs, removedIDs, deltaURL, nil
return added, removed, nil
}

View File

@ -6,6 +6,7 @@ import (
"testing"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -39,22 +40,6 @@ func (l deltaNextLink) GetOdataDeltaLink() *string {
return l.deltaLink
}
// mock values
type testPagerValue struct {
id string
removed bool
}
func (v testPagerValue) GetId() *string { return &v.id } //revive:disable-line:var-naming
func (v testPagerValue) GetAdditionalData() map[string]any {
if v.removed {
return map[string]any{graph.AddtlDataRemoved: true}
}
return map[string]any{}
}
// mock page
type testPage struct {
@ -77,7 +62,7 @@ func (p testPage) GetValue() []any {
// mock item pager
var _ itemPager[any] = &testPager{}
var _ Pager[any] = &testPager{}
type testPager struct {
t *testing.T
@ -85,17 +70,15 @@ type testPager struct {
pageErr error
}
//lint:ignore U1000 False Positive
func (p *testPager) getPage(ctx context.Context) (PageLinkValuer[any], error) {
func (p *testPager) GetPage(ctx context.Context) (NextLinkValuer[any], error) {
return p.pager, p.pageErr
}
//lint:ignore U1000 False Positive
func (p *testPager) setNext(nextLink string) {}
func (p *testPager) SetNextLink(nextLink string) {}
// mock id pager
var _ DeltaPager[getIDAndAddtler] = &testIDsPager{}
var _ Pager[any] = &testIDsPager{}
type testIDsPager struct {
t *testing.T
@ -105,7 +88,9 @@ type testIDsPager struct {
needsReset bool
}
func (p *testIDsPager) GetPage(ctx context.Context) (DeltaPageLinker, error) {
func (p *testIDsPager) GetPage(
ctx context.Context,
) (NextLinkValuer[any], error) {
if p.errorCode != "" {
ierr := odataerrors.NewMainError()
ierr.SetCode(&p.errorCode)
@ -116,9 +101,28 @@ func (p *testIDsPager) GetPage(ctx context.Context) (DeltaPageLinker, error) {
return nil, err
}
return testPage{}, nil
values := make([]any, 0, len(p.added)+len(p.removed))
for _, a := range p.added {
// contact chosen arbitrarily, any exchange model should work
itm := models.NewContact()
itm.SetId(ptr.To(a))
values = append(values, itm)
}
for _, r := range p.removed {
// contact chosen arbitrarily, any exchange model should work
itm := models.NewContact()
itm.SetId(ptr.To(r))
itm.SetAdditionalData(map[string]any{graph.AddtlDataRemoved: struct{}{}})
values = append(values, itm)
}
return testPage{values}, nil
}
func (p *testIDsPager) SetNext(string) {}
func (p *testIDsPager) SetNextLink(string) {}
func (p *testIDsPager) Reset(context.Context) {
if !p.needsReset {
require.Fail(p.t, "reset should not be called")
@ -128,36 +132,76 @@ func (p *testIDsPager) Reset(context.Context) {
p.errorCode = ""
}
func (p *testIDsPager) ValuesIn(pl PageLinker) ([]getIDAndAddtler, error) {
items := []getIDAndAddtler{}
var _ DeltaPager[any] = &testIDsDeltaPager{}
for _, id := range p.added {
items = append(items, testPagerValue{id: id})
type testIDsDeltaPager struct {
t *testing.T
added []string
removed []string
errorCode string
needsReset bool
}
func (p *testIDsDeltaPager) GetPage(
ctx context.Context,
) (DeltaLinkValuer[any], error) {
if p.errorCode != "" {
ierr := odataerrors.NewMainError()
ierr.SetCode(&p.errorCode)
err := odataerrors.NewODataError()
err.SetErrorEscaped(ierr)
return nil, err
}
for _, id := range p.removed {
items = append(items, testPagerValue{id: id, removed: true})
values := make([]any, 0, len(p.added)+len(p.removed))
for _, a := range p.added {
// contact chosen arbitrarily, any exchange model should work
itm := models.NewContact()
itm.SetId(ptr.To(a))
values = append(values, itm)
}
return items, nil
for _, r := range p.removed {
// contact chosen arbitrarily, any exchange model should work
itm := models.NewContact()
itm.SetId(ptr.To(r))
itm.SetAdditionalData(map[string]any{graph.AddtlDataRemoved: struct{}{}})
values = append(values, itm)
}
return testPage{values}, nil
}
func (p *testIDsDeltaPager) SetNextLink(string) {}
func (p *testIDsDeltaPager) Reset(context.Context) {
if !p.needsReset {
require.Fail(p.t, "reset should not be called")
}
p.needsReset = false
p.errorCode = ""
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
type ItemPagerUnitSuite struct {
type PagerUnitSuite struct {
tester.Suite
}
func TestItemPagerUnitSuite(t *testing.T) {
suite.Run(t, &ItemPagerUnitSuite{Suite: tester.NewUnitSuite(t)})
func TestPagerUnitSuite(t *testing.T) {
suite.Run(t, &PagerUnitSuite{Suite: tester.NewUnitSuite(t)})
}
func (suite *ItemPagerUnitSuite) TestEnumerateItems() {
func (suite *PagerUnitSuite) TestEnumerateItems() {
tests := []struct {
name string
getPager func(*testing.T, context.Context) itemPager[any]
getPager func(*testing.T, context.Context) Pager[any]
expect []any
expectErr require.ErrorAssertionFunc
}{
@ -166,7 +210,7 @@ func (suite *ItemPagerUnitSuite) TestEnumerateItems() {
getPager: func(
t *testing.T,
ctx context.Context,
) itemPager[any] {
) Pager[any] {
return &testPager{
t: t,
pager: testPage{[]any{"foo", "bar"}},
@ -180,7 +224,7 @@ func (suite *ItemPagerUnitSuite) TestEnumerateItems() {
getPager: func(
t *testing.T,
ctx context.Context,
) itemPager[any] {
) Pager[any] {
return &testPager{
t: t,
pageErr: assert.AnError,
@ -206,188 +250,125 @@ func (suite *ItemPagerUnitSuite) TestEnumerateItems() {
}
}
func (suite *ItemPagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
type expected struct {
added []string
removed []string
deltaUpdate DeltaUpdate
}
tests := []struct {
name string
pagerGetter func(
*testing.T,
context.Context,
graph.Servicer,
string, string,
bool,
) (DeltaPager[getIDAndAddtler], error)
) Pager[any]
deltaPagerGetter func(
*testing.T,
context.Context,
graph.Servicer,
string, string, string,
bool,
) (DeltaPager[getIDAndAddtler], error)
added []string
removed []string
deltaUpdate DeltaUpdate
delta string
canMakeDeltaQueries bool
) DeltaPager[any]
prevDelta string
expect expected
canDelta bool
}{
{
name: "no prev delta",
pagerGetter: func(
t *testing.T,
ctx context.Context,
gs graph.Servicer,
user string,
directory string,
immutableIDs bool,
) (DeltaPager[getIDAndAddtler], error) {
// this should not be called
return nil, assert.AnError
pagerGetter: func(t *testing.T) Pager[any] {
return nil
},
deltaPagerGetter: func(
t *testing.T,
ctx context.Context,
gs graph.Servicer,
user string,
directory string,
delta string,
immutableIDs bool,
) (DeltaPager[getIDAndAddtler], error) {
return &testIDsPager{
deltaPagerGetter: func(t *testing.T) DeltaPager[any] {
return &testIDsDeltaPager{
t: t,
added: []string{"uno", "dos"},
removed: []string{"tres", "quatro"},
}, nil
}
},
expect: expected{
added: []string{"uno", "dos"},
removed: []string{"tres", "quatro"},
deltaUpdate: DeltaUpdate{Reset: true},
canMakeDeltaQueries: true,
},
canDelta: true,
},
{
name: "with prev delta",
pagerGetter: func(
t *testing.T,
ctx context.Context,
gs graph.Servicer,
user string,
directory string,
immutableIDs bool,
) (DeltaPager[getIDAndAddtler], error) {
// this should not be called
return nil, assert.AnError
pagerGetter: func(t *testing.T) Pager[any] {
return nil
},
deltaPagerGetter: func(
t *testing.T,
ctx context.Context,
gs graph.Servicer,
user string,
directory string,
delta string,
immutableIDs bool,
) (DeltaPager[getIDAndAddtler], error) {
return &testIDsPager{
deltaPagerGetter: func(t *testing.T) DeltaPager[any] {
return &testIDsDeltaPager{
t: t,
added: []string{"uno", "dos"},
removed: []string{"tres", "quatro"},
}, nil
}
},
prevDelta: "delta",
expect: expected{
added: []string{"uno", "dos"},
removed: []string{"tres", "quatro"},
delta: "delta",
deltaUpdate: DeltaUpdate{Reset: false},
canMakeDeltaQueries: true,
},
canDelta: true,
},
{
name: "delta expired",
pagerGetter: func(
t *testing.T,
ctx context.Context,
gs graph.Servicer,
user string,
directory string,
immutableIDs bool,
) (DeltaPager[getIDAndAddtler], error) {
// this should not be called
return nil, assert.AnError
pagerGetter: func(t *testing.T) Pager[any] {
return nil
},
deltaPagerGetter: func(
t *testing.T,
ctx context.Context,
gs graph.Servicer,
user string,
directory string,
delta string,
immutableIDs bool,
) (DeltaPager[getIDAndAddtler], error) {
return &testIDsPager{
deltaPagerGetter: func(t *testing.T) DeltaPager[any] {
return &testIDsDeltaPager{
t: t,
added: []string{"uno", "dos"},
removed: []string{"tres", "quatro"},
errorCode: "SyncStateNotFound",
needsReset: true,
}, nil
}
},
prevDelta: "delta",
expect: expected{
added: []string{"uno", "dos"},
removed: []string{"tres", "quatro"},
delta: "delta",
deltaUpdate: DeltaUpdate{Reset: true},
canMakeDeltaQueries: true,
},
canDelta: true,
},
{
name: "quota exceeded",
pagerGetter: func(
t *testing.T,
ctx context.Context,
gs graph.Servicer,
user string,
directory string,
immutableIDs bool,
) (DeltaPager[getIDAndAddtler], error) {
name: "delta not allowed",
pagerGetter: func(t *testing.T) Pager[any] {
return &testIDsPager{
t: t,
added: []string{"uno", "dos"},
removed: []string{"tres", "quatro"},
}, nil
}
},
deltaPagerGetter: func(
t *testing.T,
ctx context.Context,
gs graph.Servicer,
user string,
directory string,
delta string,
immutableIDs bool,
) (DeltaPager[getIDAndAddtler], error) {
return &testIDsPager{errorCode: "ErrorQuotaExceeded"}, nil
deltaPagerGetter: func(t *testing.T) DeltaPager[any] {
return nil
},
expect: expected{
added: []string{"uno", "dos"},
removed: []string{"tres", "quatro"},
deltaUpdate: DeltaUpdate{Reset: true},
canMakeDeltaQueries: false,
},
canDelta: false,
},
}
for _, tt := range tests {
suite.Run(tt.name, func() {
for _, test := range tests {
suite.Run(test.name, func() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
pager, _ := tt.pagerGetter(t, ctx, graph.Service{}, "user", "directory", false)
deltaPager, _ := tt.deltaPagerGetter(t, ctx, graph.Service{}, "user", "directory", tt.delta, false)
added, removed, deltaUpdate, err := getAddedAndRemovedItemIDs(
added, removed, deltaUpdate, err := getAddedAndRemovedItemIDs[any](
ctx,
graph.Service{},
pager,
deltaPager,
tt.delta,
tt.canMakeDeltaQueries)
test.pagerGetter(t),
test.deltaPagerGetter(t),
test.prevDelta,
test.canDelta)
require.NoError(t, err, "getting added and removed item IDs")
require.EqualValues(t, tt.added, added, "added item IDs")
require.EqualValues(t, tt.removed, removed, "removed item IDs")
require.Equal(t, tt.deltaUpdate, deltaUpdate, "delta update")
require.NoErrorf(t, err, "getting added and removed item IDs: %+v", clues.ToCore(err))
require.EqualValues(t, test.expect.added, added, "added item IDs")
require.EqualValues(t, test.expect.removed, removed, "removed item IDs")
require.Equal(t, test.expect.deltaUpdate, deltaUpdate, "delta update")
})
}
}
@ -423,7 +404,7 @@ var (
}
)
func (suite *ItemPagerUnitSuite) TestNextAndDeltaLink() {
func (suite *PagerUnitSuite) TestNextAndDeltaLink() {
deltaTable := []testInput{
{
name: "empty",
@ -467,7 +448,7 @@ func (suite *ItemPagerUnitSuite) TestNextAndDeltaLink() {
// Related to: https://github.com/alcionai/corso/issues/2520
//
//nolint:lll
func (suite *ItemPagerUnitSuite) TestIsLinkValid() {
func (suite *PagerUnitSuite) TestIsLinkValid() {
invalidString := `https://graph.microsoft.com/v1.0/users//mailFolders//messages/microsoft.graph.delta()?$select=id%2CisRead`
tests := []struct {
name string

View File

@ -11,7 +11,7 @@ import (
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/path"
)
// ---------------------------------------------------------------------------
@ -31,7 +31,7 @@ func (c Mail) NewMailFolderPager(userID string) mailFolderPager {
return mailFolderPager{c.Stable, builder}
}
func (p *mailFolderPager) getPage(ctx context.Context) (PageLinker, error) {
func (p *mailFolderPager) getPage(ctx context.Context) (NextLinker, error) {
page, err := p.builder.Get(ctx, nil)
if err != nil {
return nil, graph.Stack(ctx, err)
@ -40,11 +40,11 @@ func (p *mailFolderPager) getPage(ctx context.Context) (PageLinker, error) {
return page, nil
}
func (p *mailFolderPager) setNext(nextLink string) {
func (p *mailFolderPager) SetNextLink(nextLink string) {
p.builder = users.NewItemMailFoldersRequestBuilder(nextLink, p.service.Adapter())
}
func (p *mailFolderPager) valuesIn(pl PageLinker) ([]models.MailFolderable, error) {
func (p *mailFolderPager) valuesIn(pl NextLinker) ([]models.MailFolderable, error) {
// Ideally this should be `users.ItemMailFoldersResponseable`, but
// that is not a thing as stable returns different result
page, ok := pl.(models.MailFolderCollectionResponseable)
@ -111,7 +111,7 @@ func (c Mail) EnumerateContainers(
break
}
pgr.setNext(link)
pgr.SetNextLink(link)
}
return el.Failure()
@ -121,9 +121,9 @@ func (c Mail) EnumerateContainers(
// item pager
// ---------------------------------------------------------------------------
var _ itemPager[models.Messageable] = &mailPageCtrl{}
var _ Pager[models.Messageable] = &mailsPageCtrl{}
type mailPageCtrl struct {
type mailsPageCtrl struct {
gs graph.Servicer
builder *users.ItemMailFoldersItemMessagesRequestBuilder
options *users.ItemMailFoldersItemMessagesRequestBuilderGetRequestConfiguration
@ -131,13 +131,13 @@ type mailPageCtrl struct {
func (c Mail) NewMailPager(
userID, containerID string,
immutableIDs bool,
selectProps ...string,
) itemPager[models.Messageable] {
) Pager[models.Messageable] {
options := &users.ItemMailFoldersItemMessagesRequestBuilderGetRequestConfiguration{
Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize)),
QueryParameters: &users.ItemMailFoldersItemMessagesRequestBuilderGetQueryParameters{
Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize), preferImmutableIDs(immutableIDs)),
QueryParameters: &users.ItemMailFoldersItemMessagesRequestBuilderGetQueryParameters{},
// do NOT set Top. It limits the total items received.
},
}
if len(selectProps) > 0 {
@ -152,90 +152,30 @@ func (c Mail) NewMailPager(
ByMailFolderIdString(containerID).
Messages()
return &mailPageCtrl{c.Stable, builder, options}
return &mailsPageCtrl{c.Stable, builder, options}
}
//lint:ignore U1000 False Positive
func (p *mailPageCtrl) getPage(ctx context.Context) (PageLinkValuer[models.Messageable], error) {
page, err := p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return EmptyDeltaLinker[models.Messageable]{PageLinkValuer: page}, nil
}
//lint:ignore U1000 False Positive
func (p *mailPageCtrl) setNext(nextLink string) {
p.builder = users.NewItemMailFoldersItemMessagesRequestBuilder(nextLink, p.gs.Adapter())
}
// ---------------------------------------------------------------------------
// item ID pager
// ---------------------------------------------------------------------------
var _ DeltaPager[getIDAndAddtler] = &mailIDPager{}
type mailIDPager struct {
gs graph.Servicer
builder *users.ItemMailFoldersItemMessagesRequestBuilder
options *users.ItemMailFoldersItemMessagesRequestBuilderGetRequestConfiguration
}
func (c Mail) NewMailIDsPager(
func (p *mailsPageCtrl) GetPage(
ctx context.Context,
userID, containerID string,
immutableIDs bool,
) DeltaPager[getIDAndAddtler] {
config := &users.ItemMailFoldersItemMessagesRequestBuilderGetRequestConfiguration{
QueryParameters: &users.ItemMailFoldersItemMessagesRequestBuilderGetQueryParameters{
Select: idAnd("isRead"),
// do NOT set Top. It limits the total items received.
},
Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize), preferImmutableIDs(immutableIDs)),
}
builder := c.Stable.
Client().
Users().
ByUserIdString(userID).
MailFolders().
ByMailFolderIdString(containerID).
Messages()
return &mailIDPager{c.Stable, builder, config}
) (NextLinkValuer[models.Messageable], error) {
resp, err := p.builder.Get(ctx, p.options)
return resp, graph.Stack(ctx, err).OrNil()
}
func (p *mailIDPager) GetPage(ctx context.Context) (DeltaPageLinker, error) {
page, err := p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return EmptyDeltaLinker[models.Messageable]{PageLinkValuer: page}, nil
}
func (p *mailIDPager) SetNext(nextLink string) {
func (p *mailsPageCtrl) SetNextLink(nextLink string) {
p.builder = users.NewItemMailFoldersItemMessagesRequestBuilder(nextLink, p.gs.Adapter())
}
// non delta pagers don't have reset
func (p *mailIDPager) Reset(context.Context) {}
func (p *mailIDPager) ValuesIn(pl PageLinker) ([]getIDAndAddtler, error) {
return toValues[models.Messageable](pl)
}
func (c Mail) GetItemsInContainerByCollisionKey(
ctx context.Context,
userID, containerID string,
) (map[string]string, error) {
ctx = clues.Add(ctx, "container_id", containerID)
pager := c.NewMailPager(userID, containerID, mailCollisionKeyProps()...)
pager := c.NewMailPager(userID, containerID, false, mailCollisionKeyProps()...)
items, err := enumerateItems(ctx, pager)
if err != nil {
return nil, graph.Wrap(ctx, err, "enumerating mail")
return nil, graph.Wrap(ctx, err, "enumerating mails")
}
m := map[string]string{}
@ -252,11 +192,11 @@ func (c Mail) GetItemIDsInContainer(
userID, containerID string,
) (map[string]struct{}, error) {
ctx = clues.Add(ctx, "container_id", containerID)
pager := c.NewMailPager(userID, containerID, "id")
pager := c.NewMailPager(userID, containerID, false, idAnd()...)
items, err := enumerateItems(ctx, pager)
if err != nil {
return nil, graph.Wrap(ctx, err, "enumerating contacts")
return nil, graph.Wrap(ctx, err, "enumerating mails")
}
m := map[string]struct{}{}
@ -272,9 +212,9 @@ func (c Mail) GetItemIDsInContainer(
// delta item ID pager
// ---------------------------------------------------------------------------
var _ DeltaPager[getIDAndAddtler] = &mailDeltaIDPager{}
var _ DeltaPager[models.Messageable] = &mailDeltaPager{}
type mailDeltaIDPager struct {
type mailDeltaPager struct {
gs graph.Servicer
userID string
containerID string
@ -285,13 +225,11 @@ type mailDeltaIDPager struct {
func getMailDeltaBuilder(
ctx context.Context,
gs graph.Servicer,
user, containerID string,
options *users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration,
userID, containerID string,
) *users.ItemMailFoldersItemMessagesDeltaRequestBuilder {
builder := gs.
Client().
builder := gs.Client().
Users().
ByUserIdString(user).
ByUserIdString(userID).
MailFolders().
ByMailFolderIdString(containerID).
Messages().
@ -300,71 +238,75 @@ func getMailDeltaBuilder(
return builder
}
func (c Mail) NewMailDeltaIDsPager(
func (c Mail) NewMailDeltaPager(
ctx context.Context,
userID, containerID, oldDelta string,
userID, containerID, prevDeltaLink string,
immutableIDs bool,
) DeltaPager[getIDAndAddtler] {
config := &users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration{
QueryParameters: &users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetQueryParameters{
Select: idAnd("isRead"),
selectProps ...string,
) DeltaPager[models.Messageable] {
options := &users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration{
// do NOT set Top. It limits the total items received.
},
QueryParameters: &users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetQueryParameters{},
Headers: newPreferHeaders(preferPageSize(c.options.DeltaPageSize), preferImmutableIDs(immutableIDs)),
}
if len(selectProps) > 0 {
options.QueryParameters.Select = selectProps
}
var builder *users.ItemMailFoldersItemMessagesDeltaRequestBuilder
if len(oldDelta) > 0 {
builder = users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, c.Stable.Adapter())
if len(prevDeltaLink) > 0 {
builder = users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(prevDeltaLink, c.Stable.Adapter())
} else {
builder = getMailDeltaBuilder(ctx, c.Stable, userID, containerID, config)
builder = getMailDeltaBuilder(ctx, c.Stable, userID, containerID)
}
return &mailDeltaIDPager{c.Stable, userID, containerID, builder, config}
return &mailDeltaPager{c.Stable, userID, containerID, builder, options}
}
func (p *mailDeltaIDPager) GetPage(ctx context.Context) (DeltaPageLinker, error) {
page, err := p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return page, nil
func (p *mailDeltaPager) GetPage(
ctx context.Context,
) (DeltaLinkValuer[models.Messageable], error) {
resp, err := p.builder.Get(ctx, p.options)
return resp, graph.Stack(ctx, err).OrNil()
}
func (p *mailDeltaIDPager) SetNext(nextLink string) {
func (p *mailDeltaPager) SetNextLink(nextLink string) {
p.builder = users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(nextLink, p.gs.Adapter())
}
func (p *mailDeltaIDPager) Reset(ctx context.Context) {
p.builder = p.gs.
Client().
Users().
ByUserIdString(p.userID).
MailFolders().
ByMailFolderIdString(p.containerID).
Messages().
Delta()
}
func (p *mailDeltaIDPager) ValuesIn(pl PageLinker) ([]getIDAndAddtler, error) {
return toValues[models.Messageable](pl)
func (p *mailDeltaPager) Reset(ctx context.Context) {
p.builder = getMailDeltaBuilder(ctx, p.gs, p.userID, p.containerID)
}
func (c Mail) GetAddedAndRemovedItemIDs(
ctx context.Context,
userID, containerID, oldDelta string,
userID, containerID, prevDeltaLink string,
immutableIDs bool,
canMakeDeltaQueries bool,
) ([]string, []string, DeltaUpdate, error) {
ctx = clues.Add(
ctx,
"category", selectors.ExchangeMail,
"data_category", path.EmailCategory,
"container_id", containerID)
pager := c.NewMailIDsPager(ctx, userID, containerID, immutableIDs)
deltaPager := c.NewMailDeltaIDsPager(ctx, userID, containerID, oldDelta, immutableIDs)
deltaPager := c.NewMailDeltaPager(
ctx,
userID,
containerID,
prevDeltaLink,
immutableIDs,
idAnd()...)
pager := c.NewMailPager(
userID,
containerID,
immutableIDs,
idAnd()...)
return getAddedAndRemovedItemIDs(ctx, c.Stable, pager, deltaPager, oldDelta, canMakeDeltaQueries)
return getAddedAndRemovedItemIDs[models.Messageable](
ctx,
pager,
deltaPager,
prevDeltaLink,
canMakeDeltaQueries)
}

View File

@ -8,16 +8,21 @@ import (
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
type DeltaNextLinks struct {
type DeltaNextLinkValues[T any] struct {
Next *string
Delta *string
Values []T
}
func (dnl *DeltaNextLinks) GetOdataNextLink() *string {
func (dnl *DeltaNextLinkValues[T]) GetValue() []T {
return dnl.Values
}
func (dnl *DeltaNextLinkValues[T]) GetOdataNextLink() *string {
return dnl.Next
}
func (dnl *DeltaNextLinks) GetOdataDeltaLink() *string {
func (dnl *DeltaNextLinkValues[T]) GetOdataDeltaLink() *string {
return dnl.Delta
}
@ -37,7 +42,9 @@ type Pager[T any] struct {
getIdx int
}
func (p *Pager[T]) GetPage(context.Context) (api.PageLinker, error) {
func (p *Pager[T]) GetPage(
context.Context,
) (api.NextLinkValuer[T], error) {
if len(p.ToReturn) <= p.getIdx {
return nil, clues.New("index out of bounds").
With("index", p.getIdx, "values", p.ToReturn)
@ -46,27 +53,15 @@ func (p *Pager[T]) GetPage(context.Context) (api.PageLinker, error) {
idx := p.getIdx
p.getIdx++
link := DeltaNextLinks{Next: p.ToReturn[idx].NextLink}
link := DeltaNextLinkValues[T]{
Next: p.ToReturn[idx].NextLink,
Values: p.ToReturn[idx].Values,
}
return &link, p.ToReturn[idx].Err
}
func (p *Pager[T]) SetNext(string) {}
func (p *Pager[T]) ValuesIn(api.PageLinker) ([]T, error) {
idx := p.getIdx
if idx > 0 {
// Return values lag by one since we increment in GetPage().
idx--
}
if len(p.ToReturn) <= idx {
return nil, clues.New("index out of bounds").
With("index", idx, "values", p.ToReturn)
}
return p.ToReturn[idx].Values, nil
}
func (p *Pager[T]) SetNextLink(string) {}
// ---------------------------------------------------------------------------
// delta pager
@ -77,7 +72,9 @@ type DeltaPager[T any] struct {
getIdx int
}
func (p *DeltaPager[T]) GetPage(context.Context) (api.DeltaPageLinker, error) {
func (p *DeltaPager[T]) GetPage(
context.Context,
) (api.DeltaLinkValuer[T], error) {
if len(p.ToReturn) <= p.getIdx {
return nil, clues.New("index out of bounds").
With("index", p.getIdx, "values", p.ToReturn)
@ -86,28 +83,14 @@ func (p *DeltaPager[T]) GetPage(context.Context) (api.DeltaPageLinker, error) {
idx := p.getIdx
p.getIdx++
link := DeltaNextLinks{
link := DeltaNextLinkValues[T]{
Next: p.ToReturn[idx].NextLink,
Delta: p.ToReturn[idx].DeltaLink,
Values: p.ToReturn[idx].Values,
}
return &link, p.ToReturn[idx].Err
}
func (p *DeltaPager[T]) SetNext(string) {}
func (p *DeltaPager[T]) SetNextLink(string) {}
func (p *DeltaPager[T]) Reset(context.Context) {}
func (p *DeltaPager[T]) ValuesIn(api.PageLinker) ([]T, error) {
idx := p.getIdx
if idx > 0 {
// Return values lag by one since we increment in GetPage().
idx--
}
if len(p.ToReturn) <= idx {
return nil, clues.New("index out of bounds").
With("index", idx, "values", p.ToReturn)
}
return p.ToReturn[idx].Values, nil
}