diff --git a/src/internal/m365/collection/groups/backup.go b/src/internal/m365/collection/groups/backup.go index f5f283ac3..25e1d613a 100644 --- a/src/internal/m365/collection/groups/backup.go +++ b/src/internal/m365/collection/groups/backup.go @@ -4,7 +4,6 @@ import ( "context" "github.com/alcionai/clues" - "golang.org/x/exp/maps" "github.com/alcionai/corso/src/internal/common/pii" "github.com/alcionai/corso/src/internal/common/ptr" @@ -174,7 +173,7 @@ func populateCollections[C graph.GetIDer, I groupsItemer]( continue } - added := str.SliceToMap(maps.Keys(addAndRem.Added)) + added := addAndRem.Added removed := str.SliceToMap(addAndRem.Removed) cl.Add(count.ItemsAdded, int64(len(added))) diff --git a/src/internal/m365/collection/groups/collection.go b/src/internal/m365/collection/groups/collection.go index a3a72479d..e4ae4cb60 100644 --- a/src/internal/m365/collection/groups/collection.go +++ b/src/internal/m365/collection/groups/collection.go @@ -3,9 +3,11 @@ package groups import ( "bytes" "context" + "fmt" "io" "sync" "sync/atomic" + "time" "github.com/alcionai/clues" kjson "github.com/microsoft/kiota-serialization-json-go" @@ -17,6 +19,7 @@ import ( "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api/graph" ) @@ -35,7 +38,7 @@ type Collection[C graph.GetIDer, I groupsItemer] struct { contains container[C] // added is a list of existing item IDs that were added to a container - added map[string]struct{} + added map[string]time.Time // removed is a list of item IDs that were deleted from, or moved out, of a container removed map[string]struct{} @@ -54,7 +57,7 @@ func NewCollection[C graph.GetIDer, I groupsItemer]( baseCol data.BaseCollection, getAndAugment getItemAndAugmentInfoer[C, I], protectedResource string, - added map[string]struct{}, + added map[string]time.Time, removed map[string]struct{}, contains container[C], statusUpdater support.StatusUpdater, @@ -135,81 +138,97 @@ func (col *Collection[C, I]) streamItems(ctx context.Context, errs *fault.Bus) { } // add any new items - for id := range col.added { + for id, modTime := range col.added { if el.Failure() != nil { break } - wg.Add(1) - semaphoreCh <- struct{}{} + col.stream <- data.NewLazyItemWithInfo( + ctx, + &lazyItemGetter[C, I]{ + modTime: modTime, + getAndAugment: col.getAndAugment, + userID: col.protectedResource, + itemID: id, + containerIDs: col.FullPath().Folders(), + contains: col.contains, + parentPath: col.LocationPath().String(), + }, + id, + modTime, + col.Counter, + el) - go func(id string) { - defer wg.Done() - defer func() { <-semaphoreCh }() + // wg.Add(1) + // semaphoreCh <- struct{}{} - writer := kjson.NewJsonSerializationWriter() - defer writer.Close() + // go func(id string) { + // defer wg.Done() + // defer func() { <-semaphoreCh }() - item, info, err := col.getAndAugment.getItem( - ctx, - col.protectedResource, - col.FullPath().Folders(), - id) - if err != nil { - err = clues.Wrap(err, "getting channel message data").Label(fault.LabelForceNoBackupCreation) - el.AddRecoverable(ctx, err) + // writer := kjson.NewJsonSerializationWriter() + // defer writer.Close() - return - } + // item, info, err := col.getAndAugment.getItem( + // ctx, + // col.protectedResource, + // col.FullPath().Folders(), + // id) + // if err != nil { + // err = clues.Wrap(err, "getting channel message data").Label(fault.LabelForceNoBackupCreation) + // el.AddRecoverable(ctx, err) - col.getAndAugment.augmentItemInfo(info, col.contains.container) + // return + // } - if err := writer.WriteObjectValue("", item); err != nil { - err = clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation) - el.AddRecoverable(ctx, err) + // col.getAndAugment.augmentItemInfo(info, col.contains.container) - return - } + // if err := writer.WriteObjectValue("", item); err != nil { + // err = clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation) + // el.AddRecoverable(ctx, err) - itemData, err := writer.GetSerializedContent() - if err != nil { - err = clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation) - el.AddRecoverable(ctx, err) + // return + // } - return - } + // itemData, err := writer.GetSerializedContent() + // if err != nil { + // err = clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation) + // el.AddRecoverable(ctx, err) - info.ParentPath = col.LocationPath().String() + // return + // } - storeItem, err := data.NewPrefetchedItemWithInfo( - io.NopCloser(bytes.NewReader(itemData)), - id, - details.ItemInfo{Groups: info}) - if err != nil { - err := clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation) - el.AddRecoverable(ctx, err) + // info.ParentPath = col.LocationPath().String() - return - } + // storeItem, err := data.NewPrefetchedItemWithInfo( + // io.NopCloser(bytes.NewReader(itemData)), + // id, + // details.ItemInfo{Groups: info}) + // if err != nil { + // err := clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation) + // el.AddRecoverable(ctx, err) - col.stream <- storeItem + // return + // } - atomic.AddInt64(&streamedItems, 1) - atomic.AddInt64(&totalBytes, info.Size) + // col.stream <- storeItem - if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 { - logger.Ctx(ctx).Infow("item stream progress", "stats", col.Counter.Values()) - } + // atomic.AddInt64(&streamedItems, 1) + // atomic.AddInt64(&totalBytes, info.Size) - col.Counter.Add(count.StreamBytesAdded, info.Size) + // if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 { + // logger.Ctx(ctx).Infow("item stream progress", "stats", col.Counter.Values()) + // } - if colProgress != nil { - colProgress <- struct{}{} - } - }(id) + // col.Counter.Add(count.StreamBytesAdded, info.Size) + + if colProgress != nil { + colProgress <- struct{}{} + } + // }(id) } - wg.Wait() + // wg.Wait() } // finishPopulation is a utility function used to close a Collection's data channel @@ -237,3 +256,72 @@ func (col *Collection[C, I]) finishPopulation( col.statusUpdater(status) } + +type lazyItemGetter[C graph.GetIDer, I groupsItemer] struct { + getAndAugment getItemAndAugmentInfoer[C, I] + userID string + itemID string + containerIDs path.Elements + parentPath string + modTime time.Time + contains container[C] +} + +func (lig *lazyItemGetter[C, I]) GetData( + ctx context.Context, + errs *fault.Bus, +) (io.ReadCloser, *details.ItemInfo, bool, error) { + writer := kjson.NewJsonSerializationWriter() + defer writer.Close() + + fmt.Println("getdata itemID: ", lig.itemID) + item, info, err := lig.getAndAugment.getItem( + ctx, + lig.userID, + lig.containerIDs, + lig.itemID) + if err != nil { + // If an item was deleted then return an empty file so we don't fail + // the backup and return a sentinel error when asked for ItemInfo so + // we don't display the item in the backup. + // + // The item will be deleted from kopia on the next backup when the + // delta token shows it's removed. + if graph.IsErrDeletedInFlight(err) { + logger.CtxErr(ctx, err).Info("item not found") + return nil, nil, true, nil + } + + err = clues.Wrap(err, "getting channel message data").Label(fault.LabelForceNoBackupCreation) + errs.AddRecoverable(ctx, err) + + return nil, nil, false, err + } + + lig.getAndAugment.augmentItemInfo(info, lig.contains.container) + + if err := writer.WriteObjectValue("", item); err != nil { + err = clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation) + errs.AddRecoverable(ctx, err) + + return nil, nil, false, err + } + + itemData, err := writer.GetSerializedContent() + if err != nil { + err = clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation) + errs.AddRecoverable(ctx, err) + + return nil, nil, false, err + } + + info.ParentPath = lig.parentPath + // Update the mod time to what we already told kopia about. This is required + // for proper details merging. + info.Modified = lig.modTime + + return io.NopCloser(bytes.NewReader(itemData)), + &details.ItemInfo{Groups: info}, + false, + nil +}