From 13f549c6cbac5437417f4ec1e544055101c9b659 Mon Sep 17 00:00:00 2001
From: Hitesh Pattanayak <48874082+HiteshRepo@users.noreply.github.com>
Date: Sat, 30 Dec 2023 20:38:29 +0530
Subject: [PATCH] lists prefetch (#4943)

Refactors the SharePoint lists collection by:
- renaming the `Collection` type to `prefetchCollection`
- introducing a `lazyFetchCollection` that defers fetching list content until items are read, falling back to `prefetchCollection` when graph reports a zero mod time
- extracting the `finishPopulation` method into the standalone `updateStatus` function
- serializing lists and pages through the shared `serializeContent` function

(Illustrative Go sketches of the selection rule, the deferred-fetch pattern, and the serializer usage follow the diff.)

#### Does this PR need a docs update or release note?

- [x] :no_entry: No

#### Type of change

- [x] :broom: Tech Debt/Cleanup

#### Issue(s)

#4754

#### Test Plan

- [x] :muscle: Manual
- [x] :zap: Unit test
- [x] :green_heart: E2E
---
 src/internal/m365/collection/site/backup.go   |  49 ++-
 .../m365/collection/site/backup_test.go       |   3 +-
 .../m365/collection/site/collection.go        | 404 +++++++++++++-----
 .../m365/collection/site/collection_test.go   | 185 +++++++-
 .../m365/collection/site/mock/list.go         |  17 +-
 .../m365/service/sharepoint/backup.go         |   3 +-
 src/pkg/services/m365/api/lists_pager_test.go |  41 +-
 7 files changed, 563 insertions(+), 139 deletions(-)

diff --git a/src/internal/m365/collection/site/backup.go b/src/internal/m365/collection/site/backup.go
index 3c6a61c27..a181a5d50 100644
--- a/src/internal/m365/collection/site/backup.go
+++ b/src/internal/m365/collection/site/backup.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	stdpath "path"
+	"time"
 
 	"github.com/alcionai/clues"
 
@@ -123,7 +124,7 @@ func CollectPages(
 			el.AddRecoverable(ctx, clues.WrapWC(ctx, err, "creating page collection path"))
 		}
 
-		collection := NewCollection(
+		collection := NewPrefetchCollection(
 			nil,
 			dir,
 			ac,
@@ -131,7 +132,7 @@ func CollectPages(
 			su,
 			bpc.Options)
 		collection.SetBetaService(betaService)
-		collection.AddItem(tuple.ID)
+		collection.AddItem(tuple.ID, time.Now())
 
 		spcs = append(spcs, collection)
 	}
@@ -148,16 +149,19 @@ func CollectLists(
 	scope selectors.SharePointScope,
 	su support.StatusUpdater,
 	errs *fault.Bus,
+	counter *count.Bus,
 ) ([]data.BackupCollection, error) {
 	logger.Ctx(ctx).Debug("Creating SharePoint List Collections")
 
 	var (
-		el   = errs.Local()
-		spcs = make([]data.BackupCollection, 0)
-		acc  = api.CallConfig{Select: idAnd("list")}
+		collection data.BackupCollection
+		el         = errs.Local()
+		cl         = counter.Local()
+		spcs       = make([]data.BackupCollection, 0)
+		cfg        = api.CallConfig{Select: idAnd("list", "lastModifiedDateTime")}
 	)
 
-	lists, err := bh.GetItems(ctx, acc)
+	lists, err := bh.GetItems(ctx, cfg)
 	if err != nil {
 		return nil, err
 	}
@@ -171,6 +175,8 @@ func CollectLists(
 			continue
 		}
 
+		modTime := ptr.Val(list.GetLastModifiedDateTime())
+
 		dir, err := path.Build(
 			tenantID,
 			bpc.ProtectedResource.ID(),
@@ -182,14 +188,35 @@ func CollectLists(
 			el.AddRecoverable(ctx, clues.WrapWC(ctx, err, "creating list collection path"))
 		}
 
-		collection := NewCollection(
+		lazyFetchCol := NewLazyFetchCollection(
 			bh,
 			dir,
-			ac,
-			scope,
 			su,
-			bpc.Options)
-		collection.AddItem(ptr.Val(list.GetId()))
+			cl)
+
+		lazyFetchCol.AddItem(
+			ptr.Val(list.GetId()),
+			modTime)
+
+		collection = lazyFetchCol
+
+		// Prefer lazyFetchCol by default.
+		// If graph returns a zero mod time, fall back to prefetchCol.
+ if modTime.IsZero() { + prefetchCol := NewPrefetchCollection( + bh, + dir, + ac, + scope, + su, + bpc.Options) + + prefetchCol.AddItem( + ptr.Val(list.GetId()), + modTime) + + collection = prefetchCol + } spcs = append(spcs, collection) } diff --git a/src/internal/m365/collection/site/backup_test.go b/src/internal/m365/collection/site/backup_test.go index 4f7251b79..83989b8ed 100644 --- a/src/internal/m365/collection/site/backup_test.go +++ b/src/internal/m365/collection/site/backup_test.go @@ -121,7 +121,8 @@ func (suite *SharePointSuite) TestCollectLists() { creds.AzureTenantID, sel.Lists(selectors.Any())[0], (&MockGraphService{}).UpdateStatus, - fault.New(true)) + fault.New(true), + count.New()) require.NoError(t, err, clues.ToCore(err)) assert.Less(t, 0, len(col)) } diff --git a/src/internal/m365/collection/site/collection.go b/src/internal/m365/collection/site/collection.go index 505028048..b5e7474d3 100644 --- a/src/internal/m365/collection/site/collection.go +++ b/src/internal/m365/collection/site/collection.go @@ -4,12 +4,16 @@ import ( "bytes" "context" "io" + "net/http" "sync" + "sync/atomic" + "time" "github.com/alcionai/clues" "github.com/microsoft/kiota-abstractions-go/serialization" kjson "github.com/microsoft/kiota-serialization-json-go" "github.com/microsoftgraph/msgraph-sdk-go/models" + "golang.org/x/exp/maps" "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/data" @@ -18,6 +22,7 @@ import ( "github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -41,19 +46,24 @@ const ( Pages DataCategory = 2 ) -var _ data.BackupCollection = &Collection{} +var ( + _ data.BackupCollection = &prefetchCollection{} + _ data.BackupCollection = &lazyFetchCollection{} +) // Collection is the SharePoint.List or SharePoint.Page implementation of data.Collection. // SharePoint.Libraries collections are supported by the oneDrive.Collection // as the calls are identical for populating the Collection -type Collection struct { - // stream is the container for each individual SharePoint item of (page/list) - stream chan data.Item +type prefetchCollection struct { + // stream is a container for each individual SharePoint item (page/list) category, + // where the category type serves as the key, and the associated channel holds the items. 
+	stream map[path.CategoryType]chan data.Item
 	// fullPath indicates the hierarchy within the collection
 	fullPath path.Path
-	// jobs contain the SharePoint.List.IDs or SharePoint.Page.IDs
-	items []string
+	// items maps the SharePoint.List.IDs or SharePoint.Page.IDs
+	// to their corresponding last modified time
+	items map[string]time.Time
 	category      path.CategoryType
 	client        api.Sites
 	betaService   *betaAPI.BetaService
 	statusUpdater support.StatusUpdater
 	ctrl          control.Options
 	getter        getItemByIDer
 }
 
-// NewCollection helper function for creating a Collection
-func NewCollection(
+// NewPrefetchCollection is a constructor for prefetchCollection
+func NewPrefetchCollection(
 	getter getItemByIDer,
 	folderPath path.Path,
 	ac api.Client,
 	scope selectors.SharePointScope,
 	statusUpdater support.StatusUpdater,
 	ctrlOpts control.Options,
-) *Collection {
-	c := &Collection{
+) *prefetchCollection {
+	c := &prefetchCollection{
 		fullPath:      folderPath,
-		items:         make([]string, 0),
+		items:         make(map[string]time.Time),
 		getter:        getter,
-		stream:        make(chan data.Item, collectionChannelBufferSize),
+		stream:        make(map[path.CategoryType]chan data.Item),
 		client:        ac.Sites(),
 		statusUpdater: statusUpdater,
 		category:      scope.Category().PathType(),
@@ -86,95 +96,88 @@ func (sc *Collection) SetBetaService(betaService *betaAPI.BetaService) {
-	sc.betaService = betaService
+func (pc *prefetchCollection) SetBetaService(betaService *betaAPI.BetaService) {
+	pc.betaService = betaService
 }
 
 // AddItem appends additional itemID to items field
-func (sc *Collection) AddItem(itemID string) {
-	sc.items = append(sc.items, itemID)
+func (pc *prefetchCollection) AddItem(itemID string, lastModifiedTime time.Time) {
+	pc.items[itemID] = lastModifiedTime
 }
 
-func (sc *Collection) FullPath() path.Path {
-	return sc.fullPath
+func (pc *prefetchCollection) FullPath() path.Path {
+	return pc.fullPath
 }
 
 // TODO(ashmrtn): Fill in with previous path once the Controller compares old
 // and new folder hierarchies.
-func (sc Collection) PreviousPath() path.Path {
+func (pc prefetchCollection) PreviousPath() path.Path {
 	return nil
 }
 
-func (sc Collection) LocationPath() *path.Builder {
-	return path.Builder{}.Append(sc.fullPath.Folders()...)
+func (pc prefetchCollection) LocationPath() *path.Builder {
+	return path.Builder{}.Append(pc.fullPath.Folders()...)
 }
 
-func (sc Collection) State() data.CollectionState {
+func (pc prefetchCollection) State() data.CollectionState {
 	return data.NewState
 }
 
-func (sc Collection) DoNotMergeItems() bool {
+func (pc prefetchCollection) DoNotMergeItems() bool {
 	return false
 }
 
-func (sc *Collection) Items(
+func (pc *prefetchCollection) Items(
 	ctx context.Context,
 	errs *fault.Bus,
 ) <-chan data.Item {
-	go sc.streamItems(ctx, errs)
-	return sc.stream
-}
-
-func (sc *Collection) finishPopulation(
-	ctx context.Context,
-	metrics *support.CollectionMetrics,
-) {
-	close(sc.stream)
-
-	status := support.CreateStatus(
-		ctx,
-		support.Backup,
-		1, // 1 folder
-		*metrics,
-		sc.fullPath.Folder(false))
-
-	logger.Ctx(ctx).Debug(status.String())
-
-	if sc.statusUpdater != nil {
-		sc.statusUpdater(status)
+	if _, ok := pc.stream[pc.category]; !ok {
+		pc.stream[pc.category] = make(chan data.Item, collectionChannelBufferSize)
 	}
+
+	go pc.streamItems(ctx, errs)
+
+	return pc.stream[pc.category]
 }
 
 // streamItems utility function to retrieve data from the backing store for a given collection
-func (sc *Collection) streamItems(
+func (pc *prefetchCollection) streamItems(
 	ctx context.Context,
 	errs *fault.Bus,
 ) {
 	// Switch retrieval function based on category
-	switch sc.category {
+	switch pc.category {
 	case path.ListsCategory:
-		sc.streamLists(ctx, errs)
+		pc.streamLists(ctx, errs)
 	case path.PagesCategory:
-		sc.retrievePages(ctx, sc.client, errs)
+		pc.streamPages(ctx, pc.client, errs)
 	}
 }
 
 // streamLists utility function for collection that downloads and serializes
 // models.Listable objects based on M365 IDs from the items field.
-func (sc *Collection) streamLists(
+func (pc *prefetchCollection) streamLists(
 	ctx context.Context,
 	errs *fault.Bus,
 ) {
 	var (
-		metrics support.CollectionMetrics
-		el      = errs.Local()
-		wg      sync.WaitGroup
+		metrics         support.CollectionMetrics
+		el              = errs.Local()
+		wg              sync.WaitGroup
+		objects         int64
+		objectBytes     int64
+		objectSuccesses int64
 	)
 
-	defer sc.finishPopulation(ctx, &metrics)
+	defer updateStatus(
+		ctx,
+		pc.stream[path.ListsCategory],
+		pc.statusUpdater,
+		pc.fullPath,
+		&metrics)
 
 	// TODO: Insert correct ID for CollectionProgress
-	progress := observe.CollectionProgress(ctx, sc.fullPath.Category().HumanString(), sc.fullPath.Folders())
+	progress := observe.CollectionProgress(ctx, pc.fullPath.Category().HumanString(), pc.fullPath.Folders())
 	defer close(progress)
 
 	semaphoreCh := make(chan struct{}, fetchChannelSize)
 
@@ -182,7 +185,7 @@ func (sc *Collection) streamLists(
 	// For each models.Listable, object is serialized and the metrics are collected.
 	// The progress is reported via the passed-in channel.
- for _, listID := range sc.items { + for listID := range pc.items { if el.Failure() != nil { break } @@ -190,15 +193,26 @@ func (sc *Collection) streamLists( wg.Add(1) semaphoreCh <- struct{}{} - sc.handleListItems(ctx, semaphoreCh, progress, listID, el, &metrics) - - wg.Done() + go pc.handleListItems( + ctx, + semaphoreCh, + progress, + &wg, + listID, + &objects, + &objectBytes, + &objectSuccesses, + el) } wg.Wait() + + metrics.Objects = int(objects) + metrics.Bytes = objectBytes + metrics.Successes = int(objectSuccesses) } -func (sc *Collection) retrievePages( +func (pc *prefetchCollection) streamPages( ctx context.Context, as api.Sites, errs *fault.Bus, @@ -208,22 +222,24 @@ func (sc *Collection) retrievePages( el = errs.Local() ) - defer sc.finishPopulation(ctx, &metrics) + defer updateStatus( + ctx, + pc.stream[path.PagesCategory], + pc.statusUpdater, + pc.fullPath, + &metrics) // TODO: Insert correct ID for CollectionProgress - progress := observe.CollectionProgress(ctx, sc.fullPath.Category().HumanString(), sc.fullPath.Folders()) + progress := observe.CollectionProgress(ctx, pc.fullPath.Category().HumanString(), pc.fullPath.Folders()) defer close(progress) - wtr := kjson.NewJsonSerializationWriter() - defer wtr.Close() - - betaService := sc.betaService + betaService := pc.betaService if betaService == nil { logger.Ctx(ctx).Error(clues.New("beta service required")) return } - parent, err := as.GetByID(ctx, sc.fullPath.ProtectedResource(), api.CallConfig{}) + parent, err := as.GetByID(ctx, pc.fullPath.ProtectedResource(), api.CallConfig{}) if err != nil { logger.Ctx(ctx).Error(err) @@ -232,7 +248,9 @@ func (sc *Collection) retrievePages( root := ptr.Val(parent.GetWebUrl()) - pages, err := betaAPI.GetSitePages(ctx, betaService, sc.fullPath.ProtectedResource(), sc.items, errs) + pageIDs := maps.Keys(pc.items) + + pages, err := betaAPI.GetSitePages(ctx, betaService, pc.fullPath.ProtectedResource(), pageIDs, errs) if err != nil { logger.Ctx(ctx).Error(err) @@ -248,7 +266,7 @@ func (sc *Collection) retrievePages( break } - byteArray, err := serializeContent(ctx, wtr, pg) + byteArray, err := serializeContent(ctx, pg) if err != nil { el.AddRecoverable(ctx, clues.WrapWC(ctx, err, "serializing page").Label(fault.LabelForceNoBackupCreation)) continue @@ -272,51 +290,32 @@ func (sc *Collection) retrievePages( continue } - sc.stream <- item + pc.stream[path.PagesCategory] <- item progress <- struct{}{} } } -func serializeContent( - ctx context.Context, - writer *kjson.JsonSerializationWriter, - obj serialization.Parsable, -) ([]byte, error) { - defer writer.Close() - - err := writer.WriteObjectValue("", obj) - if err != nil { - return nil, graph.Wrap(ctx, err, "writing object") - } - - byteArray, err := writer.GetSerializedContent() - if err != nil { - return nil, graph.Wrap(ctx, err, "getting content from writer") - } - - return byteArray, nil -} - -func (sc *Collection) handleListItems( +func (pc *prefetchCollection) handleListItems( ctx context.Context, semaphoreCh chan struct{}, progress chan<- struct{}, + wg *sync.WaitGroup, listID string, + objects *int64, + objectBytes *int64, + objectSuccesses *int64, el *fault.Bus, - metrics *support.CollectionMetrics, ) { + defer wg.Done() defer func() { <-semaphoreCh }() - writer := kjson.NewJsonSerializationWriter() - defer writer.Close() - var ( list models.Listable info *details.SharePointInfo err error ) - list, info, err = sc.getter.GetItemByID(ctx, listID) + list, info, err = pc.getter.GetItemByID(ctx, listID) if err != nil { err = 
clues.WrapWC(ctx, err, "getting list data").Label(fault.LabelForceNoBackupCreation)
 		el.AddRecoverable(ctx, err)
 
@@ -324,20 +323,11 @@ func (sc *Collection) handleListItems(
 		return
 	}
 
-	metrics.Objects++
+	atomic.AddInt64(objects, 1)
 
-	if err := writer.WriteObjectValue("", list); err != nil {
-		err = clues.WrapWC(ctx, err, "writing list to serializer").Label(fault.LabelForceNoBackupCreation)
-		el.AddRecoverable(ctx, err)
-
-		return
-	}
-
-	entryBytes, err := writer.GetSerializedContent()
+	entryBytes, err := serializeContent(ctx, list)
 	if err != nil {
-		err = clues.WrapWC(ctx, err, "serializing list").Label(fault.LabelForceNoBackupCreation)
 		el.AddRecoverable(ctx, err)
-
 		return
 	}
 
@@ -347,8 +337,8 @@ func (sc *Collection) handleListItems(
 		return
 	}
 
-	metrics.Bytes += size
-	metrics.Successes++
+	atomic.AddInt64(objectBytes, size)
+	atomic.AddInt64(objectSuccesses, 1)
 
 	rc := io.NopCloser(bytes.NewReader(entryBytes))
 	itemInfo := details.ItemInfo{
@@ -363,6 +353,200 @@ func (sc *Collection) handleListItems(
 		return
 	}
 
-	sc.stream <- item
+	pc.stream[path.ListsCategory] <- item
 	progress <- struct{}{}
 }
+
+type lazyFetchCollection struct {
+	// stream is the container for each individual SharePoint list item
+	stream chan data.Item
+	// fullPath indicates the hierarchy within the collection
+	fullPath path.Path
+	// items maps the SharePoint.List.IDs to their last modified time
+	items map[string]time.Time
+	statusUpdater support.StatusUpdater
+	getter getItemByIDer
+	counter *count.Bus
+}
+
+func NewLazyFetchCollection(
+	getter getItemByIDer,
+	folderPath path.Path,
+	statusUpdater support.StatusUpdater,
+	counter *count.Bus,
+) *lazyFetchCollection {
+	c := &lazyFetchCollection{
+		fullPath:      folderPath,
+		items:         make(map[string]time.Time),
+		getter:        getter,
+		stream:        make(chan data.Item, collectionChannelBufferSize),
+		statusUpdater: statusUpdater,
+		counter:       counter,
+	}
+
+	return c
+}
+
+func (lc *lazyFetchCollection) AddItem(itemID string, lastModifiedTime time.Time) {
+	lc.items[itemID] = lastModifiedTime
+	lc.counter.Add(count.ItemsAdded, 1)
+}
+
+func (lc *lazyFetchCollection) FullPath() path.Path {
+	return lc.fullPath
+}
+
+func (lc lazyFetchCollection) LocationPath() *path.Builder {
+	return path.Builder{}.Append(lc.fullPath.Folders()...)
+}
+
+// TODO(hitesh): Implement PreviousPath, State, DoNotMergeItems
+// once the Controller compares old and new folder hierarchies.
+func (lc lazyFetchCollection) PreviousPath() path.Path {
+	return nil
+}
+
+func (lc lazyFetchCollection) State() data.CollectionState {
+	return data.NewState
+}
+
+func (lc lazyFetchCollection) DoNotMergeItems() bool {
+	return false
+}
+
+func (lc lazyFetchCollection) Items(
+	ctx context.Context,
+	errs *fault.Bus,
+) <-chan data.Item {
+	go lc.streamItems(ctx, errs)
+	return lc.stream
+}
+
+func (lc *lazyFetchCollection) streamItems(
+	ctx context.Context,
+	errs *fault.Bus,
+) {
+	var (
+		metrics  support.CollectionMetrics
+		el       = errs.Local()
+		numLists int64
+	)
+
+	defer updateStatus(
+		ctx,
+		lc.stream,
+		lc.statusUpdater,
+		lc.fullPath,
+		&metrics)
+
+	progress := observe.CollectionProgress(ctx, lc.fullPath.Category().HumanString(), lc.fullPath.Folders())
+	defer close(progress)
+
+	for listID, modTime := range lc.items {
+		if el.Failure() != nil {
+			break
+		}
+
+		lc.stream <- data.NewLazyItemWithInfo(
+			ctx,
+			&lazyItemGetter{
+				itemID:  listID,
+				getter:  lc.getter,
+				modTime: modTime,
+			},
+			listID,
+			modTime,
+			lc.counter,
+			el)
+
+		metrics.Successes++
+		numLists++
+
+		progress <- struct{}{}
+	}
+
+	metrics.Objects += int(numLists)
+}
+
+type lazyItemGetter struct {
+	getter  getItemByIDer
+	itemID  string
+	modTime time.Time
+}
+
+func (lig *lazyItemGetter) GetData(
+	ctx context.Context,
+	el *fault.Bus,
+) (io.ReadCloser, *details.ItemInfo, bool, error) {
+	list, info, err := lig.getter.GetItemByID(ctx, lig.itemID)
+	if err != nil {
+		if clues.HasLabel(err, graph.LabelStatus(http.StatusNotFound)) || graph.IsErrDeletedInFlight(err) {
+			logger.CtxErr(ctx, err).Info("item deleted in flight. skipping")
+
+			// Returning delInFlight as true here for correctness, although the caller is going
+			// to ignore it since we are returning an error.
+			return nil, nil, true, clues.Wrap(err, "deleted item").Label(graph.LabelsSkippable)
+		}
+
+		err = clues.WrapWC(ctx, err, "getting list data").Label(fault.LabelForceNoBackupCreation)
+		el.AddRecoverable(ctx, err)
+
+		return nil, nil, false, err
+	}
+
+	entryBytes, err := serializeContent(ctx, list)
+	if err != nil {
+		el.AddRecoverable(ctx, err)
+
+		return nil, nil, false, err
+	}
+
+	info.Modified = lig.modTime
+
+	return io.NopCloser(bytes.NewReader(entryBytes)),
+		&details.ItemInfo{SharePoint: info},
+		false,
+		nil
+}
+
+func serializeContent(
+	ctx context.Context,
+	obj serialization.Parsable,
+) ([]byte, error) {
+	writer := kjson.NewJsonSerializationWriter()
+	defer writer.Close()
+
+	err := writer.WriteObjectValue("", obj)
+	if err != nil {
+		return nil, graph.Wrap(ctx, err, "writing to serializer").Label(fault.LabelForceNoBackupCreation)
+	}
+
+	byteArray, err := writer.GetSerializedContent()
+	if err != nil {
+		return nil, graph.Wrap(ctx, err, "getting content from writer").Label(fault.LabelForceNoBackupCreation)
+	}
+
+	return byteArray, nil
+}
+
+func updateStatus(
+	ctx context.Context,
+	stream chan data.Item,
+	su support.StatusUpdater,
+	fullPath path.Path,
+	metrics *support.CollectionMetrics,
+) {
+	close(stream)
+
+	status := support.CreateStatus(
+		ctx,
+		support.Backup,
+		1, // 1 folder
+		*metrics,
+		fullPath.Folder(false))
+
+	logger.Ctx(ctx).Debug(status.String())
+
+	if su != nil {
+		su(status)
+	}
+}
diff --git a/src/internal/m365/collection/site/collection_test.go b/src/internal/m365/collection/site/collection_test.go
index 547503a6a..2748e1eb6 100644
--- a/src/internal/m365/collection/site/collection_test.go
+++ b/src/internal/m365/collection/site/collection_test.go
@@ -3,7 +3,9 @@ package site
 import (
 	"bytes"
 	"io"
+	"slices"
"testing" + "time" "github.com/alcionai/clues" kioser "github.com/microsoft/kiota-serialization-json-go" @@ -11,10 +13,12 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/common/readers" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/m365/collection/site/mock" betaAPI "github.com/alcionai/corso/src/internal/m365/service/sharepoint/api" spMock "github.com/alcionai/corso/src/internal/m365/service/sharepoint/mock" + "github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/account" @@ -25,6 +29,7 @@ import ( "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/services/m365/api" + "github.com/alcionai/corso/src/pkg/services/m365/api/graph" ) type SharePointCollectionSuite struct { @@ -63,7 +68,7 @@ func TestSharePointCollectionSuite(t *testing.T) { // TestListCollection tests basic functionality to create // SharePoint collection and to use the data stream channel. -func (suite *SharePointCollectionSuite) TestCollection_Items() { +func (suite *SharePointCollectionSuite) TestPrefetchCollection_Items() { var ( tenant = "some" user = "user" @@ -74,6 +79,7 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() { tables := []struct { name, itemName string + cat path.CategoryType scope selectors.SharePointScope getter getItemByIDer getDir func(t *testing.T) path.Path @@ -82,6 +88,7 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() { { name: "List", itemName: "MockListing", + cat: path.ListsCategory, scope: sel.Lists(selectors.Any())[0], getter: &mock.ListHandler{}, getDir: func(t *testing.T) path.Path { @@ -123,6 +130,7 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() { { name: "Pages", itemName: "MockPages", + cat: path.PagesCategory, scope: sel.Pages(selectors.Any())[0], getter: nil, getDir: func(t *testing.T) path.Path { @@ -160,14 +168,15 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() { ctx, flush := tester.NewContext(t) defer flush() - col := NewCollection( + col := NewPrefetchCollection( test.getter, test.getDir(t), suite.ac, test.scope, nil, control.DefaultOptions()) - col.stream <- test.getItem(t, test.itemName) + col.stream[test.cat] = make(chan data.Item, collectionChannelBufferSize) + col.stream[test.cat] <- test.getItem(t, test.itemName) readItems := []data.Item{} @@ -189,3 +198,173 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() { }) } } + +func (suite *SharePointCollectionSuite) TestLazyCollection_Items() { + var ( + t = suite.T() + errs = fault.New(true) + start = time.Now().Add(-time.Second) + statusUpdater = func(*support.ControllerOperationStatus) {} + ) + + fullPath, err := path.Build( + "t", "pr", path.SharePointService, path.ListsCategory, false, "listid") + require.NoError(t, err, clues.ToCore(err)) + + tables := []struct { + name string + items map[string]time.Time + expectItemCount int + expectReads []string + }{ + { + name: "no lists", + }, + { + name: "added lists", + items: map[string]time.Time{ + "list1": start.Add(time.Minute), + "list2": start.Add(2 * time.Minute), + "list3": start.Add(3 * time.Minute), + }, + expectItemCount: 3, + expectReads: []string{ + "list1", + "list2", + "list3", + }, + }, + } + + for _, test := range tables { + suite.Run(test.name, func() { + 
itemCount := 0 + + ctx, flush := tester.NewContext(t) + defer flush() + + getter := &mock.ListHandler{} + defer getter.Check(t, test.expectReads) + + col := &lazyFetchCollection{ + stream: make(chan data.Item), + fullPath: fullPath, + items: test.items, + getter: getter, + statusUpdater: statusUpdater, + } + + for item := range col.Items(ctx, errs) { + itemCount++ + + modTime, aok := test.items[item.ID()] + require.True(t, aok, "item must have been added: %q", item.ID()) + assert.Implements(t, (*data.ItemModTime)(nil), item) + assert.Equal(t, modTime, item.(data.ItemModTime).ModTime(), "item mod time") + + if slices.Contains(test.expectReads, item.ID()) { + r := item.ToReader() + + _, err := io.ReadAll(r) + assert.NoError(t, err, clues.ToCore(err)) + + r.Close() + + assert.Implements(t, (*data.ItemInfo)(nil), item) + info, err := item.(data.ItemInfo).Info() + + assert.NoError(t, err, clues.ToCore(err)) + assert.Equal(t, modTime, info.Modified(), "ItemInfo mod time") + } + } + + assert.NoError(t, errs.Failure()) + assert.Equal( + t, + test.expectItemCount, + itemCount, + "should see all expected items") + }) + } +} + +func (suite *SharePointCollectionSuite) TestLazyItem() { + var ( + t = suite.T() + now = time.Now() + ) + + ctx, flush := tester.NewContext(t) + defer flush() + + lh := mock.ListHandler{} + + li := data.NewLazyItemWithInfo( + ctx, + &lazyItemGetter{ + itemID: "itemID", + getter: &lh, + modTime: now, + }, + "itemID", + now, + count.New(), + fault.New(true)) + + assert.Equal( + t, + now, + li.ModTime(), + "item mod time") + + r, err := readers.NewVersionedRestoreReader(li.ToReader()) + require.NoError(t, err, clues.ToCore(err)) + + assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version) + assert.False(t, r.Format().DelInFlight) + + readData, err := io.ReadAll(r) + require.NoError(t, err, "reading item data: %v", clues.ToCore(err)) + assert.NotEmpty(t, readData, "read item data") + + info, err := li.Info() + require.NoError(t, err, "getting item info: %v", clues.ToCore(err)) + assert.Equal(t, now, info.Modified()) +} + +func (suite *SharePointCollectionSuite) TestLazyItem_ReturnsEmptyReaderOnDeletedInFlight() { + var ( + t = suite.T() + now = time.Now() + ) + + ctx, flush := tester.NewContext(t) + defer flush() + + lh := mock.ListHandler{ + Err: graph.ErrDeletedInFlight, + } + + li := data.NewLazyItemWithInfo( + ctx, + &lazyItemGetter{ + itemID: "itemID", + getter: &lh, + modTime: now, + }, + "itemID", + now, + count.New(), + fault.New(true)) + + assert.False(t, li.Deleted(), "item shouldn't be marked deleted") + assert.Equal( + t, + now, + li.ModTime(), + "item mod time") + + r, err := readers.NewVersionedRestoreReader(li.ToReader()) + assert.ErrorIs(t, err, graph.ErrDeletedInFlight, "item should be marked deleted in flight") + assert.Nil(t, r) +} diff --git a/src/internal/m365/collection/site/mock/list.go b/src/internal/m365/collection/site/mock/list.go index 0f66057fd..f75a4a406 100644 --- a/src/internal/m365/collection/site/mock/list.go +++ b/src/internal/m365/collection/site/mock/list.go @@ -2,22 +2,28 @@ package mock import ( "context" + "slices" + "testing" "github.com/microsoftgraph/msgraph-sdk-go/models" + "github.com/stretchr/testify/assert" "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/pkg/backup/details" ) type ListHandler struct { - List models.Listable - Err error + List models.Listable + ListIDs []string + Err error } func (lh *ListHandler) GetItemByID( ctx context.Context, itemID string, ) (models.Listable, 
*details.SharePointInfo, error) { + lh.ListIDs = append(lh.ListIDs, itemID) + ls := models.NewList() lh.List = ls @@ -30,6 +36,13 @@ func (lh *ListHandler) GetItemByID( return ls, info, lh.Err } +func (lh *ListHandler) Check(t *testing.T, expected []string) { + slices.Sort(lh.ListIDs) + slices.Sort(expected) + + assert.Equal(t, expected, lh.ListIDs, "expected calls") +} + type ListRestoreHandler struct { List models.Listable Err error diff --git a/src/internal/m365/service/sharepoint/backup.go b/src/internal/m365/service/sharepoint/backup.go index d4f82c0bf..b3733c237 100644 --- a/src/internal/m365/service/sharepoint/backup.go +++ b/src/internal/m365/service/sharepoint/backup.go @@ -65,7 +65,8 @@ func ProduceBackupCollections( creds.AzureTenantID, scope, su, - errs) + errs, + counter) if err != nil { el.AddRecoverable(ctx, err) continue diff --git a/src/pkg/services/m365/api/lists_pager_test.go b/src/pkg/services/m365/api/lists_pager_test.go index f7d7fa9dd..573c001ec 100644 --- a/src/pkg/services/m365/api/lists_pager_test.go +++ b/src/pkg/services/m365/api/lists_pager_test.go @@ -3,6 +3,7 @@ package api import ( "context" "testing" + "time" "github.com/alcionai/clues" "github.com/h2non/gock" @@ -14,6 +15,7 @@ import ( "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester/tconfig" + "github.com/alcionai/corso/src/pkg/dttm" graphTD "github.com/alcionai/corso/src/pkg/services/m365/api/graph/testdata" ) @@ -39,15 +41,16 @@ func (suite *ListsPagerIntgSuite) TestEnumerateLists_withAssociatedRelationships t = suite.T() ac = suite.its.gockAC.Lists() - listID = "fake-list-id" - siteID = suite.its.site.id - textColumnDefID = "fake-text-column-id" - textColumnDefName = "itemName" - numColumnDefID = "fake-num-column-id" - numColumnDefName = "itemSize" - colLinkID = "fake-collink-id" - cTypeID = "fake-ctype-id" - listItemID = "fake-list-item-id" + listID = "fake-list-id" + listLastModifiedTime = time.Now() + siteID = suite.its.site.id + textColumnDefID = "fake-text-column-id" + textColumnDefName = "itemName" + numColumnDefID = "fake-num-column-id" + numColumnDefName = "itemSize" + colLinkID = "fake-collink-id" + cTypeID = "fake-ctype-id" + listItemID = "fake-list-item-id" fieldsData = map[string]any{ "itemName": "item1", @@ -60,7 +63,8 @@ func (suite *ListsPagerIntgSuite) TestEnumerateLists_withAssociatedRelationships defer gock.Off() - suite.setStubListAndItsRelationShip(listID, + suite.setStubListAndItsRelationShip( + listID, siteID, textColumnDefID, textColumnDefName, @@ -69,11 +73,24 @@ func (suite *ListsPagerIntgSuite) TestEnumerateLists_withAssociatedRelationships colLinkID, cTypeID, listItemID, + listLastModifiedTime, fieldsData) lists, err := ac.GetLists(ctx, suite.its.site.id, CallConfig{}) require.NoError(t, err) require.Equal(t, 1, len(lists)) + assert.Equal(t, listID, ptr.Val(lists[0].GetId())) + + expectedLmt, err := dttm.ExtractTime(listLastModifiedTime.String()) + require.NoError(t, err) + + actualLmt, err := dttm.ExtractTime(ptr.Val(lists[0].GetLastModifiedDateTime()).String()) + require.NoError(t, err) + + assert.Equal( + t, + expectedLmt, + actualLmt) for _, list := range lists { suite.testEnumerateListItems(ctx, list, listItemID, fieldsData) @@ -242,10 +259,12 @@ func (suite *ListsPagerIntgSuite) setStubListAndItsRelationShip( colLinkID, cTypeID, listItemID string, + listLastModifiedTime time.Time, fieldsData map[string]any, ) { list := models.NewList() - list.SetId(&listID) + 
list.SetId(ptr.To(listID)) + list.SetLastModifiedDateTime(ptr.To(listLastModifiedTime)) listCol := models.NewListCollectionResponse() listCol.SetValue([]models.Listable{list})
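
#### Appendix: how `CollectLists` picks a collection

The selection rule added to `CollectLists` is worth seeing in isolation. The snippet below is a minimal, self-contained sketch of that rule only; `backupCollection` and both concrete types are simplified stand-ins, not corso's actual `data.BackupCollection` API.

```go
package main

import (
	"fmt"
	"time"
)

// backupCollection stands in for corso's data.BackupCollection interface,
// which is much larger; only the selection logic matters here.
type backupCollection interface {
	Kind() string
}

type lazyCollection struct{}

func (lazyCollection) Kind() string { return "lazy" }

type prefetchCollection struct{}

func (prefetchCollection) Kind() string { return "prefetch" }

// pickCollection mirrors the rule in CollectLists: prefer the lazy
// collection, but fall back to prefetch when graph reports a zero
// lastModifiedDateTime, since lazy backups rely on a trustworthy mod time.
func pickCollection(modTime time.Time) backupCollection {
	if modTime.IsZero() {
		return prefetchCollection{}
	}

	return lazyCollection{}
}

func main() {
	fmt.Println(pickCollection(time.Now()).Kind())  // lazy
	fmt.Println(pickCollection(time.Time{}).Kind()) // prefetch
}
```

A zero mod time gives change detection nothing to compare against, so the always-download prefetch path is the safe fallback.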
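#### Appendix: the deferred-fetch pattern behind `lazyFetchCollection`

`lazyFetchCollection` emits items whose bytes are fetched only when the backup writer first reads them, via `data.NewLazyItemWithInfo` and `lazyItemGetter.GetData`. The sketch below shows that deferral pattern with simplified stand-in types; the real corso machinery additionally threads through fault buses, counters, and item info.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"time"
)

// itemGetter plays the role of lazyItemGetter: it fetches and serializes
// an item only when asked.
type itemGetter interface {
	GetData() (io.ReadCloser, error)
}

// lazyItem is a stand-in for the items produced by data.NewLazyItemWithInfo.
type lazyItem struct {
	id      string
	modTime time.Time
	getter  itemGetter
}

func (li lazyItem) ID() string         { return li.id }
func (li lazyItem) ModTime() time.Time { return li.modTime }

// ToReader defers the expensive fetch: nothing is downloaded or serialized
// until the consumer actually reads the item.
func (li lazyItem) ToReader() io.ReadCloser {
	return &deferredReader{getter: li.getter}
}

type deferredReader struct {
	getter itemGetter
	rc     io.ReadCloser
}

func (d *deferredReader) Read(p []byte) (int, error) {
	if d.rc == nil { // the first Read triggers the real fetch
		rc, err := d.getter.GetData()
		if err != nil {
			return 0, err
		}

		d.rc = rc
	}

	return d.rc.Read(p)
}

func (d *deferredReader) Close() error {
	if d.rc == nil {
		return nil
	}

	return d.rc.Close()
}

type fakeGetter struct{}

func (fakeGetter) GetData() (io.ReadCloser, error) {
	return io.NopCloser(bytes.NewReader([]byte(`{"list":"data"}`))), nil
}

func main() {
	li := lazyItem{id: "listid", modTime: time.Now(), getter: fakeGetter{}}

	b, _ := io.ReadAll(li.ToReader())
	fmt.Println(string(b))
}
```

This is why a lazy collection can record a mod time per item up front yet skip the graph call entirely for items the writer never reads.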
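#### Appendix: `serializeContent` usage

`serializeContent` centralizes the kiota JSON writer boilerplate that the old code repeated at each call site. Assuming the `msgraph-sdk-go` and `kiota-serialization-json-go` modules are available, the same calls can be exercised standalone:

```go
package main

import (
	"fmt"

	kjson "github.com/microsoft/kiota-serialization-json-go"
	"github.com/microsoftgraph/msgraph-sdk-go/models"
)

func main() {
	list := models.NewList()
	id := "fake-list-id"
	list.SetId(&id)

	writer := kjson.NewJsonSerializationWriter()
	defer writer.Close()

	// An empty key serializes the object itself rather than nesting it
	// under a property name.
	if err := writer.WriteObjectValue("", list); err != nil {
		panic(err)
	}

	content, err := writer.GetSerializedContent()
	if err != nil {
		panic(err)
	}

	fmt.Println(string(content))
}
```

Creating one writer per serialized object, instead of sharing the removed `wtr`, also keeps the helper safe to call from the now-concurrent `handleListItems` goroutines.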