update branch to latest design
This commit is contained in:
parent
2b7d76faef
commit
3c44837418
@ -85,8 +85,8 @@ type GetItemer interface {
|
||||
type EnumerateDriveItemsDeltaer interface {
|
||||
EnumerateDriveItemsDelta(
|
||||
ctx context.Context,
|
||||
ch chan<- api.NextPage[models.DriveItemable],
|
||||
driveID, prevDeltaLink string,
|
||||
cc api.CallConfig,
|
||||
) api.NextPageResulter[models.DriveItemable]
|
||||
}
|
||||
|
||||
|
||||
@ -136,11 +136,10 @@ func (h itemBackupHandler) IncludesDir(dir string) bool {
|
||||
|
||||
func (h itemBackupHandler) EnumerateDriveItemsDelta(
|
||||
ctx context.Context,
|
||||
ch chan<- api.NextPage[models.DriveItemable],
|
||||
driveID, prevDeltaLink string,
|
||||
selectProps []string,
|
||||
) (api.NextPageResulter[models.DriveItemable], error) {
|
||||
return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, selectProps)
|
||||
cc api.CallConfig,
|
||||
) api.NextPageResulter[models.DriveItemable] {
|
||||
return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, cc)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@ -139,11 +139,10 @@ func (h libraryBackupHandler) IncludesDir(dir string) bool {
|
||||
|
||||
func (h libraryBackupHandler) EnumerateDriveItemsDelta(
|
||||
ctx context.Context,
|
||||
ch chan<- api.NextPage[models.DriveItemable],
|
||||
driveID, prevDeltaLink string,
|
||||
selectProps []string,
|
||||
cc api.CallConfig,
|
||||
) api.NextPageResulter[models.DriveItemable] {
|
||||
return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, selectProps)
|
||||
return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, cc)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@ -157,47 +157,32 @@ func (uc *urlCache) refreshCache(
|
||||
// Issue a delta query to graph
|
||||
logger.Ctx(ctx).Info("refreshing url cache")
|
||||
|
||||
var (
|
||||
ch = make(chan api.NextPage[models.DriveItemable], 1)
|
||||
cacheErr error
|
||||
wg = sync.WaitGroup{}
|
||||
)
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
for pg := range ch {
|
||||
if cacheErr != nil {
|
||||
continue
|
||||
}
|
||||
pager := uc.enumerator.EnumerateDriveItemsDelta(
|
||||
ctx,
|
||||
uc.driveID,
|
||||
uc.prevDelta,
|
||||
api.CallConfig{
|
||||
Select: api.URLCacheDriveItemProps(),
|
||||
})
|
||||
|
||||
for page, reset, done := pager.NextPage(); !done; {
|
||||
uc.deltaQueryCount++
|
||||
|
||||
err := uc.updateCache(
|
||||
ctx,
|
||||
pg.Items,
|
||||
pg.Reset,
|
||||
page,
|
||||
reset,
|
||||
uc.errs)
|
||||
if err != nil {
|
||||
cacheErr = clues.Wrap(err, "updating cache")
|
||||
return clues.Wrap(err, "updating cache")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
|
||||
du, err := uc.enumerator.EnumerateDriveItemsDelta(ctx, ch, uc.driveID, uc.prevDelta)
|
||||
du, err := pager.Results()
|
||||
if err != nil {
|
||||
uc.idToProps = make(map[string]itemProps)
|
||||
return clues.Stack(err)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
if cacheErr != nil {
|
||||
return clues.Stack(cacheErr)
|
||||
}
|
||||
|
||||
logger.Ctx(ctx).Info("url cache refreshed")
|
||||
|
||||
// Update last refresh time
|
||||
|
||||
@ -96,26 +96,20 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
|
||||
require.NotNil(t, newFolder.GetId())
|
||||
|
||||
nfid := ptr.Val(newFolder.GetId())
|
||||
ch := make(chan api.NextPage[models.DriveItemable], 1)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
// no-op, we just need the previous delta
|
||||
// but also need to drain the channel to
|
||||
// prevent deadlock.
|
||||
_, ok := <-ch
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Get the previous delta to feed into url cache
|
||||
_, du, err := ac.EnumerateDriveItemsDelta(
|
||||
pager := ac.EnumerateDriveItemsDelta(
|
||||
ctx,
|
||||
suite.driveID,
|
||||
"",
|
||||
api.URLCacheDriveItemProps())
|
||||
api.CallConfig{
|
||||
Select: api.URLCacheDriveItemProps(),
|
||||
})
|
||||
|
||||
// normally we'd page through all the pager.NextPage
|
||||
// enumerations first. But Results should make sure
|
||||
// that we don't need to drain lower-level communication first.
|
||||
du, err := pager.Results()
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
require.NotEmpty(t, du.URL)
|
||||
|
||||
@ -210,7 +204,7 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
|
||||
|
||||
table := []struct {
|
||||
name string
|
||||
pages []api.NextPage[models.DriveItemable]
|
||||
pages []mock.NextPage[models.DriveItemable]
|
||||
pagerErr error
|
||||
expectedItemProps map[string]itemProps
|
||||
expectErr assert.ErrorAssertionFunc
|
||||
@ -218,7 +212,7 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
|
||||
}{
|
||||
{
|
||||
name: "single item in cache",
|
||||
pages: []api.NextPage[models.DriveItemable]{
|
||||
pages: []mock.NextPage[models.DriveItemable]{
|
||||
{Items: []models.DriveItemable{
|
||||
fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
|
||||
}},
|
||||
@ -238,7 +232,7 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
|
||||
},
|
||||
{
|
||||
name: "multiple items in cache",
|
||||
pages: []api.NextPage[models.DriveItemable]{
|
||||
pages: []mock.NextPage[models.DriveItemable]{
|
||||
{Items: []models.DriveItemable{
|
||||
fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
|
||||
fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
|
||||
@ -278,7 +272,7 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
|
||||
},
|
||||
{
|
||||
name: "multiple pages",
|
||||
pages: []api.NextPage[models.DriveItemable]{
|
||||
pages: []mock.NextPage[models.DriveItemable]{
|
||||
{Items: []models.DriveItemable{
|
||||
fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
|
||||
fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
|
||||
@ -320,7 +314,7 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
|
||||
},
|
||||
{
|
||||
name: "multiple pages with resets",
|
||||
pages: []api.NextPage[models.DriveItemable]{
|
||||
pages: []mock.NextPage[models.DriveItemable]{
|
||||
{
|
||||
Items: []models.DriveItemable{
|
||||
fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
|
||||
@ -377,7 +371,7 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
|
||||
},
|
||||
{
|
||||
name: "multiple pages with resets and combo reset+items in page",
|
||||
pages: []api.NextPage[models.DriveItemable]{
|
||||
pages: []mock.NextPage[models.DriveItemable]{
|
||||
{
|
||||
Items: []models.DriveItemable{
|
||||
fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
|
||||
@ -431,7 +425,7 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
|
||||
},
|
||||
{
|
||||
name: "duplicate items with potentially new urls",
|
||||
pages: []api.NextPage[models.DriveItemable]{
|
||||
pages: []mock.NextPage[models.DriveItemable]{
|
||||
{Items: []models.DriveItemable{
|
||||
fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
|
||||
fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
|
||||
@ -463,7 +457,7 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
|
||||
},
|
||||
{
|
||||
name: "deleted items",
|
||||
pages: []api.NextPage[models.DriveItemable]{
|
||||
pages: []mock.NextPage[models.DriveItemable]{
|
||||
{Items: []models.DriveItemable{
|
||||
fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
|
||||
fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
|
||||
@ -489,7 +483,7 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
|
||||
},
|
||||
{
|
||||
name: "item not found in cache",
|
||||
pages: []api.NextPage[models.DriveItemable]{
|
||||
pages: []mock.NextPage[models.DriveItemable]{
|
||||
{Items: []models.DriveItemable{
|
||||
fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
|
||||
}},
|
||||
@ -506,7 +500,7 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
|
||||
},
|
||||
{
|
||||
name: "delta query error",
|
||||
pages: []api.NextPage[models.DriveItemable]{
|
||||
pages: []mock.NextPage[models.DriveItemable]{
|
||||
{Items: []models.DriveItemable{}},
|
||||
},
|
||||
pagerErr: errors.New("delta query error"),
|
||||
@ -524,7 +518,7 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
|
||||
|
||||
{
|
||||
name: "folder item",
|
||||
pages: []api.NextPage[models.DriveItemable]{
|
||||
pages: []mock.NextPage[models.DriveItemable]{
|
||||
{Items: []models.DriveItemable{
|
||||
fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
|
||||
driveItem("2", "folder2", "root", "root", false, true, false),
|
||||
@ -552,15 +546,13 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
medi := mock.EnumeratesDriveItemsDelta[models.DriveItemable]{
|
||||
Pages: map[string][]api.NextPage[models.DriveItemable]{
|
||||
driveID: test.pages,
|
||||
medi := mock.EnumerateItemsDeltaByDrive{
|
||||
DrivePagers: map[string]mock.DriveItemsDeltaPager{
|
||||
driveID: mock.DriveItemsDeltaPager{
|
||||
Pages: test.pages,
|
||||
Err: test.pagerErr,
|
||||
DeltaUpdate: api.DeltaUpdate{URL: deltaString},
|
||||
},
|
||||
Err: map[string]error{
|
||||
driveID: test.pagerErr,
|
||||
},
|
||||
DeltaUpdate: map[string]api.DeltaUpdate{
|
||||
driveID: {URL: deltaString},
|
||||
},
|
||||
}
|
||||
|
||||
@ -612,7 +604,7 @@ func (suite *URLCacheUnitSuite) TestNeedsRefresh() {
|
||||
driveID,
|
||||
"",
|
||||
refreshInterval,
|
||||
&mock.EnumeratesDriveItemsDelta[models.DriveItemable]{},
|
||||
&mock.EnumerateItemsDeltaByDrive{},
|
||||
fault.New(true))
|
||||
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
@ -647,7 +639,7 @@ func (suite *URLCacheUnitSuite) TestNewURLCache() {
|
||||
name: "invalid driveID",
|
||||
driveID: "",
|
||||
refreshInt: 1 * time.Hour,
|
||||
itemPager: &mock.EnumeratesDriveItemsDelta[models.DriveItemable]{},
|
||||
itemPager: &mock.EnumerateItemsDeltaByDrive{},
|
||||
errors: fault.New(true),
|
||||
expectErr: require.Error,
|
||||
},
|
||||
@ -655,7 +647,7 @@ func (suite *URLCacheUnitSuite) TestNewURLCache() {
|
||||
name: "invalid refresh interval",
|
||||
driveID: "drive1",
|
||||
refreshInt: 100 * time.Millisecond,
|
||||
itemPager: &mock.EnumeratesDriveItemsDelta[models.DriveItemable]{},
|
||||
itemPager: &mock.EnumerateItemsDeltaByDrive{},
|
||||
errors: fault.New(true),
|
||||
expectErr: require.Error,
|
||||
},
|
||||
@ -671,7 +663,7 @@ func (suite *URLCacheUnitSuite) TestNewURLCache() {
|
||||
name: "valid",
|
||||
driveID: "drive1",
|
||||
refreshInt: 1 * time.Hour,
|
||||
itemPager: &mock.EnumeratesDriveItemsDelta[models.DriveItemable]{},
|
||||
itemPager: &mock.EnumerateItemsDeltaByDrive{},
|
||||
errors: fault.New(true),
|
||||
expectErr: require.NoError,
|
||||
},
|
||||
|
||||
@ -9,7 +9,6 @@ import (
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/common/idname"
|
||||
"github.com/alcionai/corso/src/internal/common/ptr"
|
||||
odConsts "github.com/alcionai/corso/src/internal/m365/service/onedrive/consts"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
@ -28,7 +27,7 @@ type BackupHandler[T any] struct {
|
||||
// and plug in the selector scope there.
|
||||
Sel selectors.Selector
|
||||
|
||||
DriveItemEnumeration EnumeratesDriveItemsDelta[T]
|
||||
DriveItemEnumeration EnumerateItemsDeltaByDrive
|
||||
|
||||
GI GetsItem
|
||||
GIP GetsItemPermission
|
||||
@ -67,7 +66,7 @@ func DefaultOneDriveBH(resourceOwner string) *BackupHandler[models.DriveItemable
|
||||
Extension: &details.ExtensionData{},
|
||||
},
|
||||
Sel: sel.Selector,
|
||||
DriveItemEnumeration: EnumeratesDriveItemsDelta[models.DriveItemable]{},
|
||||
DriveItemEnumeration: EnumerateItemsDeltaByDrive{},
|
||||
GI: GetsItem{Err: clues.New("not defined")},
|
||||
GIP: GetsItemPermission{Err: clues.New("not defined")},
|
||||
PathPrefixFn: defaultOneDrivePathPrefixer,
|
||||
@ -107,7 +106,7 @@ func DefaultSharePointBH(resourceOwner string) *BackupHandler[models.DriveItemab
|
||||
}
|
||||
|
||||
func (h BackupHandler[T]) PathPrefix(tID, driveID string) (path.Path, error) {
|
||||
pp, err := h.PathPrefixFn(tID, h.ResourceOwner, driveID)
|
||||
pp, err := h.PathPrefixFn(tID, h.ProtectedResource.ID(), driveID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -116,7 +115,7 @@ func (h BackupHandler[T]) PathPrefix(tID, driveID string) (path.Path, error) {
|
||||
}
|
||||
|
||||
func (h BackupHandler[T]) MetadataPathPrefix(tID string) (path.Path, error) {
|
||||
pp, err := h.MetadataPathPrefixFn(tID, h.ResourceOwner)
|
||||
pp, err := h.MetadataPathPrefixFn(tID, h.ProtectedResource.ID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -125,7 +124,7 @@ func (h BackupHandler[T]) MetadataPathPrefix(tID string) (path.Path, error) {
|
||||
}
|
||||
|
||||
func (h BackupHandler[T]) CanonicalPath(pb *path.Builder, tID string) (path.Path, error) {
|
||||
cp, err := h.CanonPathFn(pb, tID, h.ProtectedResource)
|
||||
cp, err := h.CanonPathFn(pb, tID, h.ProtectedResource.ID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -149,7 +148,7 @@ func (h BackupHandler[T]) NewLocationIDer(driveID string, elems ...string) detai
|
||||
return h.LocationIDFn(driveID, elems...)
|
||||
}
|
||||
|
||||
func (h BackupHandler) AugmentItemInfo(
|
||||
func (h BackupHandler[T]) AugmentItemInfo(
|
||||
details.ItemInfo,
|
||||
idname.Provider,
|
||||
models.DriveItemable,
|
||||
@ -173,15 +172,14 @@ func (h *BackupHandler[T]) Get(context.Context, string, map[string]string) (*htt
|
||||
|
||||
func (h BackupHandler[T]) EnumerateDriveItemsDelta(
|
||||
ctx context.Context,
|
||||
ch chan<- api.NextPage[T],
|
||||
driveID, prevDeltaLink string,
|
||||
selectProps []string,
|
||||
) ([]models.DriveItemable, api.DeltaUpdate, error) {
|
||||
cc api.CallConfig,
|
||||
) api.NextPageResulter[models.DriveItemable] {
|
||||
return h.DriveItemEnumeration.EnumerateDriveItemsDelta(
|
||||
ctx,
|
||||
driveID,
|
||||
prevDeltaLink,
|
||||
selectProps)
|
||||
cc)
|
||||
}
|
||||
|
||||
func (h BackupHandler[T]) GetItem(ctx context.Context, _, _ string) (models.DriveItemable, error) {
|
||||
@ -294,26 +292,46 @@ func (m GetsItem) GetItem(
|
||||
// Enumerates Drive Items
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type EnumeratesDriveItemsDelta[T any] struct {
|
||||
Pages map[string][]api.NextPage[T]
|
||||
DeltaUpdate map[string]api.DeltaUpdate
|
||||
Err map[string]error
|
||||
type NextPage[T any] struct {
|
||||
Items []T
|
||||
Reset bool
|
||||
}
|
||||
|
||||
func (edi EnumeratesDriveItemsDelta[T]) EnumerateDriveItemsDelta(
|
||||
type EnumerateItemsDeltaByDrive struct {
|
||||
DrivePagers map[string]DriveItemsDeltaPager
|
||||
}
|
||||
|
||||
var _ api.NextPageResulter[models.DriveItemable] = &DriveItemsDeltaPager{}
|
||||
|
||||
type DriveItemsDeltaPager struct {
|
||||
Idx int
|
||||
Pages []NextPage[models.DriveItemable]
|
||||
DeltaUpdate api.DeltaUpdate
|
||||
Err error
|
||||
}
|
||||
|
||||
func (edibd EnumerateItemsDeltaByDrive) EnumerateDriveItemsDelta(
|
||||
_ context.Context,
|
||||
ch chan<- api.NextPage[T],
|
||||
driveID, _ string,
|
||||
_ []string,
|
||||
_ api.CallConfig,
|
||||
) api.NextPageResulter[models.DriveItemable] {
|
||||
return edi.Items[driveID], edi.DeltaUpdate[driveID], edi.Err[driveID]
|
||||
didp := edibd.DrivePagers[driveID]
|
||||
return &didp
|
||||
}
|
||||
|
||||
for _, page := range edi.Pages[driveID] {
|
||||
ch <- page
|
||||
func (edi *DriveItemsDeltaPager) NextPage() ([]models.DriveItemable, bool, bool) {
|
||||
if edi.Idx >= len(edi.Pages) {
|
||||
return nil, false, true
|
||||
}
|
||||
|
||||
return edi.DeltaUpdate[driveID], edi.Err[driveID]
|
||||
np := edi.Pages[edi.Idx]
|
||||
edi.Idx++
|
||||
|
||||
return np.Items, np.Reset, false
|
||||
}
|
||||
|
||||
func (edi *DriveItemsDeltaPager) Results() (api.DeltaUpdate, error) {
|
||||
return edi.DeltaUpdate, edi.Err
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@ -136,6 +136,7 @@ func (c Client) Post(
|
||||
|
||||
type CallConfig struct {
|
||||
Expand []string
|
||||
Select []string
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@ -133,7 +133,7 @@ type DriveItemDeltaPageCtrl struct {
|
||||
|
||||
func (c Drives) newDriveItemDeltaPager(
|
||||
driveID, prevDeltaLink string,
|
||||
selectProps ...string,
|
||||
cc CallConfig,
|
||||
) *DriveItemDeltaPageCtrl {
|
||||
preferHeaderItems := []string{
|
||||
"deltashowremovedasdeleted",
|
||||
@ -147,8 +147,8 @@ func (c Drives) newDriveItemDeltaPager(
|
||||
QueryParameters: &drives.ItemItemsItemDeltaRequestBuilderGetQueryParameters{},
|
||||
}
|
||||
|
||||
if len(selectProps) > 0 {
|
||||
options.QueryParameters.Select = selectProps
|
||||
if len(cc.Select) > 0 {
|
||||
options.QueryParameters.Select = cc.Select
|
||||
}
|
||||
|
||||
builder := c.Stable.
|
||||
@ -203,12 +203,12 @@ func (c Drives) EnumerateDriveItemsDelta(
|
||||
ctx context.Context,
|
||||
driveID string,
|
||||
prevDeltaLink string,
|
||||
selectProps []string,
|
||||
cc CallConfig,
|
||||
) NextPageResulter[models.DriveItemable] {
|
||||
deltaPager := c.newDriveItemDeltaPager(
|
||||
driveID,
|
||||
prevDeltaLink,
|
||||
selectProps...)
|
||||
cc)
|
||||
|
||||
npr := &nextPageResults[models.DriveItemable]{
|
||||
pages: make(chan nextPage[models.DriveItemable]),
|
||||
|
||||
@ -191,7 +191,13 @@ func (suite *DrivePagerIntgSuite) TestEnumerateDriveItems() {
|
||||
pager := suite.its.
|
||||
ac.
|
||||
Drives().
|
||||
EnumerateDriveItemsDelta(ctx, suite.its.user.driveID, "", api.DefaultDriveItemProps())
|
||||
EnumerateDriveItemsDelta(
|
||||
ctx,
|
||||
suite.its.user.driveID,
|
||||
"",
|
||||
api.CallConfig{
|
||||
Select: api.DefaultDriveItemProps(),
|
||||
})
|
||||
|
||||
for page, reset, done := pager.NextPage(); !done; {
|
||||
items = append(items, page...)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user