lists prefetch (#4943)

Refactors the SharePoint lists collection by:
- renaming the `Collection` type to `prefetchCollection`
- introducing a `lazyFetchCollection` that defers fetching list content until the item is read, falling back to `prefetchCollection` when Graph returns a zero last-modified time (sketched below)
- extracting the `finishPopulation` method into a standalone `updateStatus` function
- serializing lists via the shared `serializeContent` function (sketched below)
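A minimal, self-contained sketch of the zero-mod-time fallback that `CollectLists` now applies. The `backupCollection` interface and the two stub types here are simplified stand-ins for corso's actual types, not its real API:

```go
package main

import (
	"fmt"
	"time"
)

// backupCollection stands in for corso's data.BackupCollection; the real
// interface is larger.
type backupCollection interface {
	AddItem(id string, modTime time.Time)
}

// lazyFetchCol and prefetchCol are illustrative stand-ins for the
// lazyFetchCollection and prefetchCollection types in this diff.
type lazyFetchCol struct{}

func (lazyFetchCol) AddItem(id string, modTime time.Time) {}

type prefetchCol struct{}

func (prefetchCol) AddItem(id string, modTime time.Time) {}

// pickCollection mirrors the fallback in CollectLists: prefer the lazy
// collection, but when graph reports a zero last-modified time there is
// nothing to compare against, so fall back to eager prefetch.
func pickCollection(modTime time.Time) backupCollection {
	if modTime.IsZero() {
		return prefetchCol{}
	}

	return lazyFetchCol{}
}

func main() {
	fmt.Printf("%T\n", pickCollection(time.Time{})) // main.prefetchCol
	fmt.Printf("%T\n", pickCollection(time.Now()))  // main.lazyFetchCol
}
```

And a runnable sketch of the extracted `serializeContent` helper, mirroring the version in this diff. The `main` wrapper, the sample list, and the `fmt.Errorf` wrapping (the real code wraps errors with `graph.Wrap`) are illustrative only:

```go
package main

import (
	"context"
	"fmt"

	"github.com/microsoft/kiota-abstractions-go/serialization"
	kjson "github.com/microsoft/kiota-serialization-json-go"
	"github.com/microsoftgraph/msgraph-sdk-go/models"
)

// serializeContent owns the writer's lifecycle per call, so concurrent
// goroutines no longer share a single long-lived serialization writer.
func serializeContent(ctx context.Context, obj serialization.Parsable) ([]byte, error) {
	writer := kjson.NewJsonSerializationWriter()
	defer writer.Close()

	if err := writer.WriteObjectValue("", obj); err != nil {
		return nil, fmt.Errorf("writing object: %w", err)
	}

	return writer.GetSerializedContent()
}

func main() {
	// Illustrative sample: serialize an empty SharePoint list model.
	list := models.NewList()
	name := "example-list"
	list.SetDisplayName(&name)

	bs, err := serializeContent(context.Background(), list)
	if err != nil {
		panic(err)
	}

	fmt.Println(string(bs))
}
```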

#### Does this PR need a docs update or release note?
- [x] No

#### Type of change
- [x] 🧹 Tech Debt/Cleanup

#### Issue(s)
#4754 

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [x] 💪 Manual
- [x] Unit test
- [x] 💚 E2E
Hitesh Pattanayak 2023-12-30 20:38:29 +05:30 committed by GitHub
parent 823fcdc559
commit 13f549c6cb
7 changed files with 563 additions and 139 deletions

View File

@@ -4,6 +4,7 @@ import (
     "context"
     "fmt"
     stdpath "path"
+    "time"

     "github.com/alcionai/clues"
@@ -123,7 +124,7 @@ func CollectPages(
             el.AddRecoverable(ctx, clues.WrapWC(ctx, err, "creating page collection path"))
         }

-        collection := NewCollection(
+        collection := NewPrefetchCollection(
             nil,
             dir,
             ac,
@@ -131,7 +132,7 @@ func CollectPages(
             su,
             bpc.Options)
         collection.SetBetaService(betaService)
-        collection.AddItem(tuple.ID)
+        collection.AddItem(tuple.ID, time.Now())

         spcs = append(spcs, collection)
     }
@@ -148,16 +149,19 @@ func CollectLists(
     scope selectors.SharePointScope,
     su support.StatusUpdater,
     errs *fault.Bus,
+    counter *count.Bus,
 ) ([]data.BackupCollection, error) {
     logger.Ctx(ctx).Debug("Creating SharePoint List Collections")

     var (
+        collection data.BackupCollection
         el   = errs.Local()
+        cl   = counter.Local()
         spcs = make([]data.BackupCollection, 0)
-        acc  = api.CallConfig{Select: idAnd("list")}
+        cfg  = api.CallConfig{Select: idAnd("list", "lastModifiedDateTime")}
     )

-    lists, err := bh.GetItems(ctx, acc)
+    lists, err := bh.GetItems(ctx, cfg)
     if err != nil {
         return nil, err
     }
@@ -171,6 +175,8 @@ func CollectLists(
             continue
         }

+        modTime := ptr.Val(list.GetLastModifiedDateTime())
+
         dir, err := path.Build(
             tenantID,
             bpc.ProtectedResource.ID(),
@@ -182,14 +188,35 @@ func CollectLists(
             el.AddRecoverable(ctx, clues.WrapWC(ctx, err, "creating list collection path"))
         }

-        collection := NewCollection(
-            bh,
-            dir,
-            ac,
-            scope,
-            su,
-            bpc.Options)
-        collection.AddItem(ptr.Val(list.GetId()))
+        lazyFetchCol := NewLazyFetchCollection(
+            bh,
+            dir,
+            su,
+            cl)
+
+        lazyFetchCol.AddItem(
+            ptr.Val(list.GetId()),
+            modTime)
+
+        collection = lazyFetchCol
+
+        // Always use lazyFetchCol.
+        // In case we receive zero mod time from graph fallback to prefetchCol.
+        if modTime.IsZero() {
+            prefetchCol := NewPrefetchCollection(
+                bh,
+                dir,
+                ac,
+                scope,
+                su,
+                bpc.Options)
+
+            prefetchCol.AddItem(
+                ptr.Val(list.GetId()),
+                modTime)
+
+            collection = prefetchCol
+        }

         spcs = append(spcs, collection)
     }

View File

@@ -121,7 +121,8 @@ func (suite *SharePointSuite) TestCollectLists() {
         creds.AzureTenantID,
         sel.Lists(selectors.Any())[0],
         (&MockGraphService{}).UpdateStatus,
-        fault.New(true))
+        fault.New(true),
+        count.New())
     require.NoError(t, err, clues.ToCore(err))
     assert.Less(t, 0, len(col))
 }

View File

@@ -4,12 +4,16 @@ import (
     "bytes"
     "context"
     "io"
+    "net/http"
     "sync"
+    "sync/atomic"
+    "time"

     "github.com/alcionai/clues"
     "github.com/microsoft/kiota-abstractions-go/serialization"
     kjson "github.com/microsoft/kiota-serialization-json-go"
     "github.com/microsoftgraph/msgraph-sdk-go/models"
+    "golang.org/x/exp/maps"

     "github.com/alcionai/corso/src/internal/common/ptr"
     "github.com/alcionai/corso/src/internal/data"
@@ -18,6 +22,7 @@ import (
     "github.com/alcionai/corso/src/internal/observe"
     "github.com/alcionai/corso/src/pkg/backup/details"
     "github.com/alcionai/corso/src/pkg/control"
+    "github.com/alcionai/corso/src/pkg/count"
    "github.com/alcionai/corso/src/pkg/fault"
    "github.com/alcionai/corso/src/pkg/logger"
    "github.com/alcionai/corso/src/pkg/path"
@@ -41,19 +46,24 @@ const (
     Pages DataCategory = 2
 )

-var _ data.BackupCollection = &Collection{}
+var (
+    _ data.BackupCollection = &prefetchCollection{}
+    _ data.BackupCollection = &lazyFetchCollection{}
+)

 // Collection is the SharePoint.List or SharePoint.Page implementation of data.Collection.
 // SharePoint.Libraries collections are supported by the oneDrive.Collection
 // as the calls are identical for populating the Collection
-type Collection struct {
-    // stream is the container for each individual SharePoint item of (page/list)
-    stream chan data.Item
+type prefetchCollection struct {
+    // stream is a container for each individual SharePoint item (page/list) category,
+    // where the category type serves as the key, and the associated channel holds the items.
+    stream map[path.CategoryType]chan data.Item
     // fullPath indicates the hierarchy within the collection
     fullPath path.Path
-    // jobs contain the SharePoint.List.IDs or SharePoint.Page.IDs
-    items []string
+    // items contains the SharePoint.List.IDs or SharePoint.Page.IDs
+    // and their corresponding last modified time
+    items map[string]time.Time
     // M365 IDs of the items of this collection
     category path.CategoryType
     client   api.Sites
@@ -63,20 +73,20 @@ type Collection struct {
     getter getItemByIDer
 }

-// NewCollection helper function for creating a Collection
-func NewCollection(
+// NewPrefetchCollection constructor function for creating a prefetchCollection
+func NewPrefetchCollection(
     getter getItemByIDer,
     folderPath path.Path,
     ac api.Client,
     scope selectors.SharePointScope,
     statusUpdater support.StatusUpdater,
     ctrlOpts control.Options,
-) *Collection {
-    c := &Collection{
+) *prefetchCollection {
+    c := &prefetchCollection{
         fullPath:      folderPath,
-        items:         make([]string, 0),
+        items:         make(map[string]time.Time),
         getter:        getter,
-        stream:        make(chan data.Item, collectionChannelBufferSize),
+        stream:        make(map[path.CategoryType]chan data.Item),
         client:        ac.Sites(),
         statusUpdater: statusUpdater,
         category:      scope.Category().PathType(),
@@ -86,82 +96,67 @@ func NewCollection(
     return c
 }

-func (sc *Collection) SetBetaService(betaService *betaAPI.BetaService) {
-    sc.betaService = betaService
+func (pc *prefetchCollection) SetBetaService(betaService *betaAPI.BetaService) {
+    pc.betaService = betaService
 }

 // AddItem appends additional itemID to items field
-func (sc *Collection) AddItem(itemID string) {
-    sc.items = append(sc.items, itemID)
+func (pc *prefetchCollection) AddItem(itemID string, lastModifedTime time.Time) {
+    pc.items[itemID] = lastModifedTime
 }

-func (sc *Collection) FullPath() path.Path {
-    return sc.fullPath
+func (pc *prefetchCollection) FullPath() path.Path {
+    return pc.fullPath
 }

 // TODO(ashmrtn): Fill in with previous path once the Controller compares old
 // and new folder hierarchies.
-func (sc Collection) PreviousPath() path.Path {
+func (pc prefetchCollection) PreviousPath() path.Path {
     return nil
 }

-func (sc Collection) LocationPath() *path.Builder {
-    return path.Builder{}.Append(sc.fullPath.Folders()...)
+func (pc prefetchCollection) LocationPath() *path.Builder {
+    return path.Builder{}.Append(pc.fullPath.Folders()...)
 }

-func (sc Collection) State() data.CollectionState {
+func (pc prefetchCollection) State() data.CollectionState {
     return data.NewState
 }

-func (sc Collection) DoNotMergeItems() bool {
+func (pc prefetchCollection) DoNotMergeItems() bool {
     return false
 }

-func (sc *Collection) Items(
+func (pc *prefetchCollection) Items(
     ctx context.Context,
     errs *fault.Bus,
 ) <-chan data.Item {
-    go sc.streamItems(ctx, errs)
-    return sc.stream
-}
-
-func (sc *Collection) finishPopulation(
-    ctx context.Context,
-    metrics *support.CollectionMetrics,
-) {
-    close(sc.stream)
-
-    status := support.CreateStatus(
-        ctx,
-        support.Backup,
-        1, // 1 folder
-        *metrics,
-        sc.fullPath.Folder(false))
-
-    logger.Ctx(ctx).Debug(status.String())
-
-    if sc.statusUpdater != nil {
-        sc.statusUpdater(status)
-    }
+    if _, ok := pc.stream[pc.category]; !ok {
+        pc.stream[pc.category] = make(chan data.Item, collectionChannelBufferSize)
+    }
+
+    go pc.streamItems(ctx, errs)
+
+    return pc.stream[pc.category]
 }

 // streamItems utility function to retrieve data from back store for a given collection
-func (sc *Collection) streamItems(
+func (pc *prefetchCollection) streamItems(
     ctx context.Context,
     errs *fault.Bus,
 ) {
     // Switch retrieval function based on category
-    switch sc.category {
+    switch pc.category {
     case path.ListsCategory:
-        sc.streamLists(ctx, errs)
+        pc.streamLists(ctx, errs)
     case path.PagesCategory:
-        sc.retrievePages(ctx, sc.client, errs)
+        pc.streamPages(ctx, pc.client, errs)
     }
 }

 // streamLists utility function for collection that downloads and serializes
 // models.Listable objects based on M365 IDs from the jobs field.
-func (sc *Collection) streamLists(
+func (pc *prefetchCollection) streamLists(
     ctx context.Context,
     errs *fault.Bus,
 ) {
@@ -169,12 +164,20 @@ func (sc *Collection) streamLists(
         metrics support.CollectionMetrics
         el      = errs.Local()
         wg      sync.WaitGroup
+
+        objects         int64
+        objectBytes     int64
+        objectSuccesses int64
     )

-    defer sc.finishPopulation(ctx, &metrics)
+    defer updateStatus(
+        ctx,
+        pc.stream[path.ListsCategory],
+        pc.statusUpdater,
+        pc.fullPath,
+        &metrics)

     // TODO: Insert correct ID for CollectionProgress
-    progress := observe.CollectionProgress(ctx, sc.fullPath.Category().HumanString(), sc.fullPath.Folders())
+    progress := observe.CollectionProgress(ctx, pc.fullPath.Category().HumanString(), pc.fullPath.Folders())
     defer close(progress)

     semaphoreCh := make(chan struct{}, fetchChannelSize)
@@ -182,7 +185,7 @@ func (sc *Collection) streamLists(
     // For each models.Listable, object is serialized and the metrics are collected.
     // The progress is objected via the passed in channel.
-    for _, listID := range sc.items {
+    for listID := range pc.items {
         if el.Failure() != nil {
             break
         }
@@ -190,15 +193,26 @@ func (sc *Collection) streamLists(
         wg.Add(1)
         semaphoreCh <- struct{}{}

-        sc.handleListItems(ctx, semaphoreCh, progress, listID, el, &metrics)
-
-        wg.Done()
+        go pc.handleListItems(
+            ctx,
+            semaphoreCh,
+            progress,
+            &wg,
+            listID,
+            &objects,
+            &objectBytes,
+            &objectSuccesses,
+            el)
     }

     wg.Wait()
+
+    metrics.Objects = int(objects)
+    metrics.Bytes = objectBytes
+    metrics.Successes = int(objectSuccesses)
 }

-func (sc *Collection) retrievePages(
+func (pc *prefetchCollection) streamPages(
     ctx context.Context,
     as api.Sites,
     errs *fault.Bus,
@@ -208,22 +222,24 @@ func (sc *Collection) retrievePages(
         el      = errs.Local()
     )

-    defer sc.finishPopulation(ctx, &metrics)
+    defer updateStatus(
+        ctx,
+        pc.stream[path.PagesCategory],
+        pc.statusUpdater,
+        pc.fullPath,
+        &metrics)

     // TODO: Insert correct ID for CollectionProgress
-    progress := observe.CollectionProgress(ctx, sc.fullPath.Category().HumanString(), sc.fullPath.Folders())
+    progress := observe.CollectionProgress(ctx, pc.fullPath.Category().HumanString(), pc.fullPath.Folders())
     defer close(progress)

-    wtr := kjson.NewJsonSerializationWriter()
-    defer wtr.Close()
-
-    betaService := sc.betaService
+    betaService := pc.betaService
     if betaService == nil {
         logger.Ctx(ctx).Error(clues.New("beta service required"))
         return
     }

-    parent, err := as.GetByID(ctx, sc.fullPath.ProtectedResource(), api.CallConfig{})
+    parent, err := as.GetByID(ctx, pc.fullPath.ProtectedResource(), api.CallConfig{})
     if err != nil {
         logger.Ctx(ctx).Error(err)
@@ -232,7 +248,9 @@ func (sc *Collection) retrievePages(
     root := ptr.Val(parent.GetWebUrl())

-    pages, err := betaAPI.GetSitePages(ctx, betaService, sc.fullPath.ProtectedResource(), sc.items, errs)
+    pageIDs := maps.Keys(pc.items)
+
+    pages, err := betaAPI.GetSitePages(ctx, betaService, pc.fullPath.ProtectedResource(), pageIDs, errs)
     if err != nil {
         logger.Ctx(ctx).Error(err)
@@ -248,7 +266,7 @@ func (sc *Collection) retrievePages(
             break
         }

-        byteArray, err := serializeContent(ctx, wtr, pg)
+        byteArray, err := serializeContent(ctx, pg)
         if err != nil {
             el.AddRecoverable(ctx, clues.WrapWC(ctx, err, "serializing page").Label(fault.LabelForceNoBackupCreation))
             continue
@@ -272,51 +290,32 @@ func (sc *Collection) retrievePages(
             continue
         }

-        sc.stream <- item
+        pc.stream[path.PagesCategory] <- item

         progress <- struct{}{}
     }
 }

-func serializeContent(
-    ctx context.Context,
-    writer *kjson.JsonSerializationWriter,
-    obj serialization.Parsable,
-) ([]byte, error) {
-    defer writer.Close()
-
-    err := writer.WriteObjectValue("", obj)
-    if err != nil {
-        return nil, graph.Wrap(ctx, err, "writing object")
-    }
-
-    byteArray, err := writer.GetSerializedContent()
-    if err != nil {
-        return nil, graph.Wrap(ctx, err, "getting content from writer")
-    }
-
-    return byteArray, nil
-}
-
-func (sc *Collection) handleListItems(
+func (pc *prefetchCollection) handleListItems(
     ctx context.Context,
     semaphoreCh chan struct{},
     progress chan<- struct{},
+    wg *sync.WaitGroup,
     listID string,
+    objects *int64,
+    objectBytes *int64,
+    objectSuccesses *int64,
     el *fault.Bus,
-    metrics *support.CollectionMetrics,
 ) {
+    defer wg.Done()
     defer func() { <-semaphoreCh }()

-    writer := kjson.NewJsonSerializationWriter()
-    defer writer.Close()
-
     var (
         list models.Listable
         info *details.SharePointInfo
         err  error
     )

-    list, info, err = sc.getter.GetItemByID(ctx, listID)
+    list, info, err = pc.getter.GetItemByID(ctx, listID)
     if err != nil {
         err = clues.WrapWC(ctx, err, "getting list data").Label(fault.LabelForceNoBackupCreation)
         el.AddRecoverable(ctx, err)
@@ -324,20 +323,11 @@ func (sc *Collection) handleListItems(
         return
     }

-    metrics.Objects++
+    atomic.AddInt64(objects, 1)

-    if err := writer.WriteObjectValue("", list); err != nil {
-        err = clues.WrapWC(ctx, err, "writing list to serializer").Label(fault.LabelForceNoBackupCreation)
-        el.AddRecoverable(ctx, err)
-
-        return
-    }
-
-    entryBytes, err := writer.GetSerializedContent()
+    entryBytes, err := serializeContent(ctx, list)
     if err != nil {
-        err = clues.WrapWC(ctx, err, "serializing list").Label(fault.LabelForceNoBackupCreation)
         el.AddRecoverable(ctx, err)
-
         return
     }
@@ -347,8 +337,8 @@ func (sc *Collection) handleListItems(
         return
     }

-    metrics.Bytes += size
-    metrics.Successes++
+    atomic.AddInt64(objectBytes, size)
+    atomic.AddInt64(objectSuccesses, 1)

     rc := io.NopCloser(bytes.NewReader(entryBytes))
     itemInfo := details.ItemInfo{
@@ -363,6 +353,200 @@ func (sc *Collection) handleListItems(
         return
     }

-    sc.stream <- item
+    pc.stream[path.ListsCategory] <- item

     progress <- struct{}{}
 }
+
+type lazyFetchCollection struct {
+    // stream is the container for each individual SharePoint item of list
+    stream chan data.Item
+    // fullPath indicates the hierarchy within the collection
+    fullPath path.Path
+    // jobs contain the SharePoint.List.IDs and their last modified time
+    items map[string]time.Time
+
+    statusUpdater support.StatusUpdater
+
+    getter getItemByIDer
+
+    counter *count.Bus
+}
+
+func NewLazyFetchCollection(
+    getter getItemByIDer,
+    folderPath path.Path,
+    statusUpdater support.StatusUpdater,
+    counter *count.Bus,
+) *lazyFetchCollection {
+    c := &lazyFetchCollection{
+        fullPath:      folderPath,
+        items:         make(map[string]time.Time),
+        getter:        getter,
+        stream:        make(chan data.Item, collectionChannelBufferSize),
+        statusUpdater: statusUpdater,
+        counter:       counter,
+    }
+
+    return c
+}
+
+func (lc *lazyFetchCollection) AddItem(itemID string, lastModifiedTime time.Time) {
+    lc.items[itemID] = lastModifiedTime
+    lc.counter.Add(count.ItemsAdded, 1)
+}
+
+func (lc *lazyFetchCollection) FullPath() path.Path {
+    return lc.fullPath
+}
+
+func (lc lazyFetchCollection) LocationPath() *path.Builder {
+    return path.Builder{}.Append(lc.fullPath.Folders()...)
+}
+
+// TODO(hitesh): Implement PreviousPath, State, DoNotMergeItems
+// once the Controller compares old and new folder hierarchies.
+func (lc lazyFetchCollection) PreviousPath() path.Path {
+    return nil
+}
+
+func (lc lazyFetchCollection) State() data.CollectionState {
+    return data.NewState
+}
+
+func (lc lazyFetchCollection) DoNotMergeItems() bool {
+    return false
+}
+
+func (lc lazyFetchCollection) Items(
+    ctx context.Context,
+    errs *fault.Bus,
+) <-chan data.Item {
+    go lc.streamItems(ctx, errs)
+    return lc.stream
+}
+
+func (lc *lazyFetchCollection) streamItems(
+    ctx context.Context,
+    errs *fault.Bus,
+) {
+    var (
+        metrics  support.CollectionMetrics
+        el       = errs.Local()
+        numLists int64
+    )
+
+    defer updateStatus(
+        ctx,
+        lc.stream,
+        lc.statusUpdater,
+        lc.fullPath,
+        &metrics)
+
+    progress := observe.CollectionProgress(ctx, lc.fullPath.Category().HumanString(), lc.fullPath.Folders())
+    defer close(progress)
+
+    for listID, modTime := range lc.items {
+        if el.Failure() != nil {
+            break
+        }
+
+        lc.stream <- data.NewLazyItemWithInfo(
+            ctx,
+            &lazyItemGetter{
+                itemID:  listID,
+                getter:  lc.getter,
+                modTime: modTime,
+            },
+            listID,
+            modTime,
+            lc.counter,
+            el)
+
+        metrics.Successes++
+
+        progress <- struct{}{}
+    }
+
+    metrics.Objects += int(numLists)
+}
+
+type lazyItemGetter struct {
+    getter  getItemByIDer
+    itemID  string
+    modTime time.Time
+}
+
+func (lig *lazyItemGetter) GetData(
+    ctx context.Context,
+    el *fault.Bus,
+) (io.ReadCloser, *details.ItemInfo, bool, error) {
+    list, info, err := lig.getter.GetItemByID(ctx, lig.itemID)
+    if err != nil {
+        if clues.HasLabel(err, graph.LabelStatus(http.StatusNotFound)) || graph.IsErrDeletedInFlight(err) {
+            logger.CtxErr(ctx, err).Info("item deleted in flight. skipping")
+
+            // Returning delInFlight as true here for correctness, although the caller is going
+            // to ignore it since we are returning an error.
+            return nil, nil, true, clues.Wrap(err, "deleted item").Label(graph.LabelsSkippable)
+        }
+
+        err = clues.WrapWC(ctx, err, "getting list data").Label(fault.LabelForceNoBackupCreation)
+        el.AddRecoverable(ctx, err)
+
+        return nil, nil, false, err
+    }
+
+    entryBytes, err := serializeContent(ctx, list)
+    if err != nil {
+        el.AddRecoverable(ctx, err)
+        return nil, nil, false, err
+    }
+
+    info.Modified = lig.modTime
+
+    return io.NopCloser(bytes.NewReader(entryBytes)),
+        &details.ItemInfo{SharePoint: info},
+        false,
+        nil
+}
+
+func serializeContent(
+    ctx context.Context,
+    obj serialization.Parsable,
+) ([]byte, error) {
+    writer := kjson.NewJsonSerializationWriter()
+    defer writer.Close()
+
+    err := writer.WriteObjectValue("", obj)
+    if err != nil {
+        return nil, graph.Wrap(ctx, err, "writing to serializer").Label(fault.LabelForceNoBackupCreation)
+    }
+
+    byteArray, err := writer.GetSerializedContent()
+    if err != nil {
+        return nil, graph.Wrap(ctx, err, "getting content from writer").Label(fault.LabelForceNoBackupCreation)
+    }
+
+    return byteArray, nil
+}
+
+func updateStatus(
+    ctx context.Context,
+    stream chan data.Item,
+    su support.StatusUpdater,
+    fullPath path.Path,
+    metrics *support.CollectionMetrics,
+) {
+    close(stream)
+
+    status := support.CreateStatus(
+        ctx,
+        support.Backup,
+        1, // 1 folder
+        *metrics,
+        fullPath.Folder(false))
+
+    logger.Ctx(ctx).Debug(status.String())
+
+    if su != nil {
+        su(status)
+    }
+}

View File

@@ -3,7 +3,9 @@ package site
 import (
     "bytes"
     "io"
+    "slices"
     "testing"
+    "time"

     "github.com/alcionai/clues"
     kioser "github.com/microsoft/kiota-serialization-json-go"
@@ -11,10 +13,12 @@ import (
     "github.com/stretchr/testify/require"
     "github.com/stretchr/testify/suite"

+    "github.com/alcionai/corso/src/internal/common/readers"
     "github.com/alcionai/corso/src/internal/data"
     "github.com/alcionai/corso/src/internal/m365/collection/site/mock"
     betaAPI "github.com/alcionai/corso/src/internal/m365/service/sharepoint/api"
     spMock "github.com/alcionai/corso/src/internal/m365/service/sharepoint/mock"
+    "github.com/alcionai/corso/src/internal/m365/support"
     "github.com/alcionai/corso/src/internal/tester"
     "github.com/alcionai/corso/src/internal/tester/tconfig"
     "github.com/alcionai/corso/src/pkg/account"
@@ -25,6 +29,7 @@ import (
     "github.com/alcionai/corso/src/pkg/path"
     "github.com/alcionai/corso/src/pkg/selectors"
     "github.com/alcionai/corso/src/pkg/services/m365/api"
+    "github.com/alcionai/corso/src/pkg/services/m365/api/graph"
 )

 type SharePointCollectionSuite struct {
@@ -63,7 +68,7 @@ func TestSharePointCollectionSuite(t *testing.T) {
 // TestListCollection tests basic functionality to create
 // SharePoint collection and to use the data stream channel.
-func (suite *SharePointCollectionSuite) TestCollection_Items() {
+func (suite *SharePointCollectionSuite) TestPrefetchCollection_Items() {
     var (
         tenant = "some"
         user   = "user"
@@ -74,6 +79,7 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
     tables := []struct {
         name, itemName string
+        cat            path.CategoryType
         scope          selectors.SharePointScope
         getter         getItemByIDer
         getDir         func(t *testing.T) path.Path
@@ -82,6 +88,7 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
         {
             name:     "List",
             itemName: "MockListing",
+            cat:      path.ListsCategory,
             scope:    sel.Lists(selectors.Any())[0],
             getter:   &mock.ListHandler{},
             getDir: func(t *testing.T) path.Path {
@@ -123,6 +130,7 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
         {
             name:     "Pages",
             itemName: "MockPages",
+            cat:      path.PagesCategory,
             scope:    sel.Pages(selectors.Any())[0],
             getter:   nil,
             getDir: func(t *testing.T) path.Path {
@@ -160,14 +168,15 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
             ctx, flush := tester.NewContext(t)
             defer flush()

-            col := NewCollection(
+            col := NewPrefetchCollection(
                 test.getter,
                 test.getDir(t),
                 suite.ac,
                 test.scope,
                 nil,
                 control.DefaultOptions())
-            col.stream <- test.getItem(t, test.itemName)
+            col.stream[test.cat] = make(chan data.Item, collectionChannelBufferSize)
+            col.stream[test.cat] <- test.getItem(t, test.itemName)

             readItems := []data.Item{}
@@ -189,3 +198,173 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
         })
     }
 }
+
+func (suite *SharePointCollectionSuite) TestLazyCollection_Items() {
+    var (
+        t             = suite.T()
+        errs          = fault.New(true)
+        start         = time.Now().Add(-time.Second)
+        statusUpdater = func(*support.ControllerOperationStatus) {}
+    )
+
+    fullPath, err := path.Build(
+        "t", "pr", path.SharePointService, path.ListsCategory, false, "listid")
+    require.NoError(t, err, clues.ToCore(err))
+
+    tables := []struct {
+        name            string
+        items           map[string]time.Time
+        expectItemCount int
+        expectReads     []string
+    }{
+        {
+            name: "no lists",
+        },
+        {
+            name: "added lists",
+            items: map[string]time.Time{
+                "list1": start.Add(time.Minute),
+                "list2": start.Add(2 * time.Minute),
+                "list3": start.Add(3 * time.Minute),
+            },
+            expectItemCount: 3,
+            expectReads: []string{
+                "list1",
+                "list2",
+                "list3",
+            },
+        },
+    }
+
+    for _, test := range tables {
+        suite.Run(test.name, func() {
+            itemCount := 0
+
+            ctx, flush := tester.NewContext(t)
+            defer flush()
+
+            getter := &mock.ListHandler{}
+            defer getter.Check(t, test.expectReads)
+
+            col := &lazyFetchCollection{
+                stream:        make(chan data.Item),
+                fullPath:      fullPath,
+                items:         test.items,
+                getter:        getter,
+                statusUpdater: statusUpdater,
+            }
+
+            for item := range col.Items(ctx, errs) {
+                itemCount++
+
+                modTime, aok := test.items[item.ID()]
+                require.True(t, aok, "item must have been added: %q", item.ID())
+
+                assert.Implements(t, (*data.ItemModTime)(nil), item)
+                assert.Equal(t, modTime, item.(data.ItemModTime).ModTime(), "item mod time")
+
+                if slices.Contains(test.expectReads, item.ID()) {
+                    r := item.ToReader()
+
+                    _, err := io.ReadAll(r)
+                    assert.NoError(t, err, clues.ToCore(err))
+
+                    r.Close()
+
+                    assert.Implements(t, (*data.ItemInfo)(nil), item)
+
+                    info, err := item.(data.ItemInfo).Info()
+                    assert.NoError(t, err, clues.ToCore(err))
+                    assert.Equal(t, modTime, info.Modified(), "ItemInfo mod time")
+                }
+            }
+
+            assert.NoError(t, errs.Failure())
+            assert.Equal(
+                t,
+                test.expectItemCount,
+                itemCount,
+                "should see all expected items")
+        })
+    }
+}
+
+func (suite *SharePointCollectionSuite) TestLazyItem() {
+    var (
+        t   = suite.T()
+        now = time.Now()
+    )
+
+    ctx, flush := tester.NewContext(t)
+    defer flush()
+
+    lh := mock.ListHandler{}
+
+    li := data.NewLazyItemWithInfo(
+        ctx,
+        &lazyItemGetter{
+            itemID:  "itemID",
+            getter:  &lh,
+            modTime: now,
+        },
+        "itemID",
+        now,
+        count.New(),
+        fault.New(true))
+
+    assert.Equal(
+        t,
+        now,
+        li.ModTime(),
+        "item mod time")
+
+    r, err := readers.NewVersionedRestoreReader(li.ToReader())
+    require.NoError(t, err, clues.ToCore(err))
+
+    assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
+    assert.False(t, r.Format().DelInFlight)
+
+    readData, err := io.ReadAll(r)
+    require.NoError(t, err, "reading item data: %v", clues.ToCore(err))
+
+    assert.NotEmpty(t, readData, "read item data")
+
+    info, err := li.Info()
+    require.NoError(t, err, "getting item info: %v", clues.ToCore(err))
+
+    assert.Equal(t, now, info.Modified())
+}
+
+func (suite *SharePointCollectionSuite) TestLazyItem_ReturnsEmptyReaderOnDeletedInFlight() {
+    var (
+        t   = suite.T()
+        now = time.Now()
+    )
+
+    ctx, flush := tester.NewContext(t)
+    defer flush()
+
+    lh := mock.ListHandler{
+        Err: graph.ErrDeletedInFlight,
+    }
+
+    li := data.NewLazyItemWithInfo(
+        ctx,
+        &lazyItemGetter{
+            itemID:  "itemID",
+            getter:  &lh,
+            modTime: now,
+        },
+        "itemID",
+        now,
+        count.New(),
+        fault.New(true))
+
+    assert.False(t, li.Deleted(), "item shouldn't be marked deleted")
+    assert.Equal(
+        t,
+        now,
+        li.ModTime(),
+        "item mod time")
+
+    r, err := readers.NewVersionedRestoreReader(li.ToReader())
+    assert.ErrorIs(t, err, graph.ErrDeletedInFlight, "item should be marked deleted in flight")
+    assert.Nil(t, r)
+}

View File

@@ -2,8 +2,11 @@ package mock
 import (
     "context"
+    "slices"
+    "testing"

     "github.com/microsoftgraph/msgraph-sdk-go/models"
+    "github.com/stretchr/testify/assert"

     "github.com/alcionai/corso/src/internal/common/ptr"
     "github.com/alcionai/corso/src/pkg/backup/details"
@@ -11,6 +14,7 @@ import (
 type ListHandler struct {
     List    models.Listable
+    ListIDs []string
     Err     error
 }
@@ -18,6 +22,8 @@ func (lh *ListHandler) GetItemByID(
     ctx context.Context,
     itemID string,
 ) (models.Listable, *details.SharePointInfo, error) {
+    lh.ListIDs = append(lh.ListIDs, itemID)
+
     ls := models.NewList()
     lh.List = ls
@@ -30,6 +36,13 @@ func (lh *ListHandler) GetItemByID(
     return ls, info, lh.Err
 }

+func (lh *ListHandler) Check(t *testing.T, expected []string) {
+    slices.Sort(lh.ListIDs)
+    slices.Sort(expected)
+
+    assert.Equal(t, expected, lh.ListIDs, "expected calls")
+}
+
 type ListRestoreHandler struct {
     List models.Listable
     Err  error

View File

@@ -65,7 +65,8 @@ func ProduceBackupCollections(
             creds.AzureTenantID,
             scope,
             su,
-            errs)
+            errs,
+            counter)
         if err != nil {
             el.AddRecoverable(ctx, err)
             continue

View File

@@ -3,6 +3,7 @@ package api
 import (
     "context"
     "testing"
+    "time"

     "github.com/alcionai/clues"
     "github.com/h2non/gock"
@@ -14,6 +15,7 @@ import (
     "github.com/alcionai/corso/src/internal/common/ptr"
     "github.com/alcionai/corso/src/internal/tester"
     "github.com/alcionai/corso/src/internal/tester/tconfig"
+    "github.com/alcionai/corso/src/pkg/dttm"
     graphTD "github.com/alcionai/corso/src/pkg/services/m365/api/graph/testdata"
 )
@@ -40,6 +42,7 @@ func (suite *ListsPagerIntgSuite) TestEnumerateLists_withAssociatedRelationships
         ac                   = suite.its.gockAC.Lists()
         listID               = "fake-list-id"
+        listLastModifiedTime = time.Now()
         siteID               = suite.its.site.id
         textColumnDefID      = "fake-text-column-id"
         textColumnDefName    = "itemName"
@@ -60,7 +63,8 @@ func (suite *ListsPagerIntgSuite) TestEnumerateLists_withAssociatedRelationships
     defer gock.Off()

-    suite.setStubListAndItsRelationShip(listID,
+    suite.setStubListAndItsRelationShip(
+        listID,
         siteID,
         textColumnDefID,
         textColumnDefName,
@@ -69,11 +73,24 @@ func (suite *ListsPagerIntgSuite) TestEnumerateLists_withAssociatedRelationships
         colLinkID,
         cTypeID,
         listItemID,
+        listLastModifiedTime,
         fieldsData)

     lists, err := ac.GetLists(ctx, suite.its.site.id, CallConfig{})
     require.NoError(t, err)
     require.Equal(t, 1, len(lists))
+    assert.Equal(t, listID, ptr.Val(lists[0].GetId()))
+
+    expectedLmt, err := dttm.ExtractTime(listLastModifiedTime.String())
+    require.NoError(t, err)
+
+    actualLmt, err := dttm.ExtractTime(ptr.Val(lists[0].GetLastModifiedDateTime()).String())
+    require.NoError(t, err)
+
+    assert.Equal(
+        t,
+        expectedLmt,
+        actualLmt)

     for _, list := range lists {
         suite.testEnumerateListItems(ctx, list, listItemID, fieldsData)
@@ -242,10 +259,12 @@ func (suite *ListsPagerIntgSuite) setStubListAndItsRelationShip(
     colLinkID,
     cTypeID,
     listItemID string,
+    listLastModifiedTime time.Time,
     fieldsData map[string]any,
 ) {
     list := models.NewList()
-    list.SetId(&listID)
+    list.SetId(ptr.To(listID))
+    list.SetLastModifiedDateTime(ptr.To(listLastModifiedTime))

     listCol := models.NewListCollectionResponse()
     listCol.SetValue([]models.Listable{list})