From 152d78de21852d8a884e9add1c3a2c84cc658f95 Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Fri, 26 Jan 2024 18:55:47 -0800 Subject: [PATCH] Refactor metadata read code --- .../m365/collection/groups/collection.go | 115 ++++++++++++++---- 1 file changed, 91 insertions(+), 24 deletions(-) diff --git a/src/internal/m365/collection/groups/collection.go b/src/internal/m365/collection/groups/collection.go index 3f622ac92..e3020c2aa 100644 --- a/src/internal/m365/collection/groups/collection.go +++ b/src/internal/m365/collection/groups/collection.go @@ -179,7 +179,16 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul defer wg.Done() defer func() { <-semaphoreCh }() - col.stream <- data.NewDeletedItem(id) + // This is a no-op for conversations, as there is no way to detect + // deleted items in a conversation. It might be added in the future + // if graph supports it, so make sure we put up both .data and .meta + // files for deletions. + if col.getAndAugment.supportsItemMetadata() { + col.stream <- data.NewDeletedItem(id + metadata.DataFileSuffix) + col.stream <- data.NewDeletedItem(id + metadata.MetaFileSuffix) + } else { + col.stream <- data.NewDeletedItem(id) + } atomic.AddInt64(&streamedItems, 1) col.Counter.Inc(count.StreamItemsRemoved) @@ -203,6 +212,18 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul defer wg.Done() defer func() { <-semaphoreCh }() + ictx := clues.Add( + ctx, + "item_id", id, + "parent_path", path.LoggableDir(col.LocationPath().String())) + + // Conversation posts carry a .data suffix, while channel messages + // don't have any suffix. Metadata files are only supported for conversations. 
+ dataFile := id + if col.getAndAugment.supportsItemMetadata() { + dataFile += metadata.DataFileSuffix + } + writer := kjson.NewJsonSerializationWriter() defer writer.Close() @@ -237,24 +258,48 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul info.ParentPath = col.LocationPath().String() - storeItem, err := data.NewPrefetchedItemWithInfo( + dataItem, err := data.NewPrefetchedItemWithInfo( io.NopCloser(bytes.NewReader(itemData)), - id, + dataFile, details.ItemInfo{Groups: info}) if err != nil { - err := clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation) - el.AddRecoverable(ctx, err) + err := clues.StackWC(ictx, err).Label(fault.LabelForceNoBackupCreation) + el.AddRecoverable(ictx, err) return } - col.stream <- storeItem + // Handle metadata before data so that if metadata file fails, + // we are not left with an orphaned data file. No-op for channel + // messages. + if col.getAndAugment.supportsItemMetadata() { + metaFile := id + metadata.MetaFileSuffix + + // Use modTime from item info as it's the latest. + metaItem, err := downloadItemMeta[C, I]( + ictx, + col.getAndAugment, + col.contains, + metaFile, + info.Modified, + el) + if err != nil { + return + } + + col.stream <- metaItem + } + + // Add data file to collection only if the metadata file (if any) was + // successfully added. This also helps to maintain consistency between + // prefetch & lazy fetch collection behaviors. 
+ col.stream <- dataItem atomic.AddInt64(&streamedItems, 1) atomic.AddInt64(&totalBytes, info.Size) if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 { - logger.Ctx(ctx).Infow("item stream progress", "stats", col.Counter.Values()) + logger.Ctx(ictx).Infow("item stream progress", "stats", col.Counter.Values()) } col.Counter.Add(count.StreamBytesAdded, info.Size) @@ -401,29 +446,18 @@ func (col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fau dataFile += metadata.DataFileSuffix metaFile := id + metadata.MetaFileSuffix - itemMeta, _, err := col.getAndAugment.getItemMetadata( + metaItem, err := downloadItemMeta[C, I]( ictx, - col.contains.container) - if err != nil { - errs.AddRecoverable(ctx, clues.StackWC(ctx, err)) - - return - } - - // Skip adding progress reader for metadata files. It doesn't add - // much value. - storeItem, err := data.NewPrefetchedItem( - itemMeta, + col.getAndAugment, + col.contains, metaFile, - // Use the same last modified time as post's. - modTime) + modTime, + el) if err != nil { - errs.AddRecoverable(ctx, clues.StackWC(ctx, err)) - return } - col.stream <- storeItem + col.stream <- metaItem } col.stream <- data.NewLazyItemWithInfo( @@ -519,3 +553,36 @@ func (lig *lazyItemGetter[C, I]) GetData( false, nil } + +func downloadItemMeta[C graph.GetIDer, I groupsItemer]( + ctx context.Context, + gim getItemMetadataer[C, I], + c container[C], + metaFile string, + modTime time.Time, + errs *fault.Bus, +) (data.Item, error) { + itemMeta, _, err := gim.getItemMetadata( + ctx, + c.container) + if err != nil { + errs.AddRecoverable(ctx, clues.StackWC(ctx, err)) + + return nil, err + } + + // Skip adding progress reader for metadata files. It doesn't add + // much value. + storeItem, err := data.NewPrefetchedItem( + itemMeta, + metaFile, + // Use the same last modified time as post's. + modTime) + if err != nil { + errs.AddRecoverable(ctx, clues.StackWC(ctx, err)) + + return nil, err + } + + return storeItem, nil +}