diff --git a/src/internal/kopia/upload.go b/src/internal/kopia/upload.go index 764182901..8c77abae3 100644 --- a/src/internal/kopia/upload.go +++ b/src/internal/kopia/upload.go @@ -723,16 +723,25 @@ func addMergeLocation(col data.BackupCollection, toMerge *mergeDetails) error { return nil } +type pathUpdate struct { + p path.Path + state data.CollectionState +} + func inflateCollectionTree( ctx context.Context, collections []data.BackupCollection, toMerge *mergeDetails, -) (map[string]*treeMap, map[string]path.Path, error) { +) (map[string]*treeMap, map[string]pathUpdate, error) { + // firstErr is temporary and just allows us to log all conflicts before + // returning an error. + var firstErr error + roots := make(map[string]*treeMap) // Contains the old path for collections that are not new. // Allows resolving what the new path should be when walking the base // snapshot(s)'s hierarchy. Nil represents a collection that was deleted. - updatedPaths := make(map[string]path.Path) + updatedPaths := make(map[string]pathUpdate) // Temporary variable just to track the things that have been marked as // changed while keeping a reference to their path. changedPaths := []path.Path{} @@ -752,24 +761,39 @@ func inflateCollectionTree( changedPaths = append(changedPaths, s.PreviousPath()) - if _, ok := updatedPaths[s.PreviousPath().String()]; ok { - return nil, nil, clues.New("multiple previous state changes to collection"). - WithClues(ictx) + if p, ok := updatedPaths[s.PreviousPath().String()]; ok { + err := clues.New("multiple previous state changes"). + WithClues(ictx). 
+ With("updated_path", p, "current_state", data.DeletedState) + logger.CtxErr(ictx, err).Error("previous path state collision") + + if firstErr == nil { + firstErr = err + } } - updatedPaths[s.PreviousPath().String()] = nil + updatedPaths[s.PreviousPath().String()] = pathUpdate{state: data.DeletedState} continue case data.MovedState: changedPaths = append(changedPaths, s.PreviousPath()) - if _, ok := updatedPaths[s.PreviousPath().String()]; ok { - return nil, nil, clues.New("multiple previous state changes to collection"). - WithClues(ictx) + if p, ok := updatedPaths[s.PreviousPath().String()]; ok { + err := clues.New("multiple previous state changes"). + WithClues(ictx). + With("updated_path", p, "current_state", data.MovedState) + logger.CtxErr(ictx, err).Error("previous path state collision") + + if firstErr == nil { + firstErr = err + } } - updatedPaths[s.PreviousPath().String()] = s.FullPath() + updatedPaths[s.PreviousPath().String()] = pathUpdate{ + p: s.FullPath(), + state: data.MovedState, + } // Only safe when collections are moved since we only need prefix matching // if a nested folder's path changed in some way that didn't generate a @@ -780,14 +804,24 @@ func inflateCollectionTree( return nil, nil, clues.Wrap(err, "adding merge location"). WithClues(ictx) } + case data.NotMovedState: p := s.PreviousPath().String() - if _, ok := updatedPaths[p]; ok { - return nil, nil, clues.New("multiple previous state changes to collection"). - WithClues(ictx) + if p, ok := updatedPaths[p]; ok { + err := clues.New("multiple previous state changes"). + WithClues(ictx). 
+ With("updated_path", p, "current_state", data.NotMovedState) + logger.CtxErr(ictx, err).Error("previous path state collision") + + if firstErr == nil { + firstErr = err + } } - updatedPaths[p] = s.FullPath() + updatedPaths[p] = pathUpdate{ + p: s.FullPath(), + state: data.NotMovedState, + } } if s.FullPath() == nil || len(s.FullPath().Elements()) == 0 { @@ -821,18 +855,22 @@ func inflateCollectionTree( } if node.collection != nil && node.collection.State() == data.NotMovedState { - return nil, nil, clues.New("conflicting states for collection"). - WithClues(ctx). - With("changed_path", p) + err := clues.New("conflicting states for collection"). + WithClues(ctx) + logger.CtxErr(ctx, err).Error("adding node to tree") + + if firstErr == nil { + firstErr = err + } } } - return roots, updatedPaths, nil + return roots, updatedPaths, clues.Stack(firstErr).OrNil() } func subtreeChanged( roots map[string]*treeMap, - updatedPaths map[string]path.Path, + updatedPaths map[string]pathUpdate, oldDirPath *path.Builder, currentPath *path.Builder, ) bool { @@ -889,7 +927,7 @@ func subtreeChanged( func traverseBaseDir( ctx context.Context, depth int, - updatedPaths map[string]path.Path, + updatedPaths map[string]pathUpdate, oldDirPath *path.Builder, expectedDirPath *path.Builder, dir fs.Directory, @@ -934,14 +972,14 @@ func traverseBaseDir( if upb, ok := updatedPaths[oldDirPath.String()]; ok { // This directory was deleted. - if upb == nil { + if upb.p == nil { currentPath = nil stats.Inc(statDel) } else { // This directory was explicitly mentioned and the new (possibly // unchanged) location is in upb. - currentPath = upb.ToBuilder() + currentPath = upb.p.ToBuilder() // Below we check if the collection was marked as new or DoNotMerge which // disables merging behavior. 
That means we can't directly update stats @@ -1087,7 +1125,7 @@ func inflateBaseTree( ctx context.Context, loader snapshotLoader, base BackupBase, - updatedPaths map[string]path.Path, + updatedPaths map[string]pathUpdate, roots map[string]*treeMap, ) error { bupID := "no_backup_id" @@ -1158,8 +1196,8 @@ func inflateBaseTree( // otherwise unchecked in tree inflation below this point. newSubtreePath := subtreePath.ToBuilder() - if p, ok := updatedPaths[subtreePath.String()]; ok { - newSubtreePath = p.ToBuilder() + if up, ok := updatedPaths[subtreePath.String()]; ok { + newSubtreePath = up.p.ToBuilder() } stats := count.New() diff --git a/src/internal/m365/collection/drive/collections.go b/src/internal/m365/collection/drive/collections.go index 4469ded21..7014642bd 100644 --- a/src/internal/m365/collection/drive/collections.go +++ b/src/internal/m365/collection/drive/collections.go @@ -522,6 +522,8 @@ func (c *Collections) Get( collections = append(collections, md) } + logger.Ctx(ctx).Infow("produced collections", "count_collections", len(collections)) + return collections, canUsePrevBackup, nil } @@ -737,6 +739,8 @@ func (c *Collections) PopulateDriveCollections( seenFolders = map[string]string{} ) + ctx = clues.Add(ctx, "invalid_prev_delta", invalidPrevDelta) + if !invalidPrevDelta { maps.Copy(newPrevPaths, oldPrevPaths) } @@ -755,6 +759,7 @@ func (c *Collections) PopulateDriveCollections( } if reset { + ctx = clues.Add(ctx, "delta_reset_occurred", true) newPrevPaths = map[string]string{} currPrevPaths = map[string]string{} c.CollectionMap[driveID] = map[string]*Collection{} diff --git a/src/internal/m365/collection/exchange/backup.go b/src/internal/m365/collection/exchange/backup.go index 7bc210dac..9ff603f4d 100644 --- a/src/internal/m365/collection/exchange/backup.go +++ b/src/internal/m365/collection/exchange/backup.go @@ -278,6 +278,8 @@ func populateCollections( collections["metadata"] = col + logger.Ctx(ctx).Infow("produced collections", "count_collections", 
len(collections)) + return collections, el.Failure() } diff --git a/src/internal/m365/collection/groups/backup.go b/src/internal/m365/collection/groups/backup.go index de5814577..1b317c785 100644 --- a/src/internal/m365/collection/groups/backup.go +++ b/src/internal/m365/collection/groups/backup.go @@ -109,7 +109,7 @@ func populateCollections( el = errs.Local() ) - logger.Ctx(ctx).Info("filling collections", "len_deltapaths", len(dps)) + logger.Ctx(ctx).Infow("filling collections", "len_deltapaths", len(dps)) for _, c := range channels { if el.Failure() != nil { @@ -267,5 +267,7 @@ func populateCollections( collections["metadata"] = col + logger.Ctx(ctx).Infow("produced collections", "count_collections", len(collections)) + return collections, el.Failure() } diff --git a/src/internal/observe/observe.go b/src/internal/observe/observe.go index a9c7b56b0..4281f2814 100644 --- a/src/internal/observe/observe.go +++ b/src/internal/observe/observe.go @@ -505,14 +505,14 @@ func CollectionProgress( counted++ // Log every 1000 items that are processed if counted%1000 == 0 { - log.Infow("uploading", "count", counted) + log.Infow("uploading", "count_items", counted) } } if obs.hidden() || len(plain) == 0 { go listen(ctx, ch, nop, incCount) - defer log.Infow("done - "+message, "count", counted) + defer log.Infow("done - "+message, "count_items", counted) return ch } @@ -546,7 +546,7 @@ func CollectionProgress( }) go waitAndCloseBar(ctx, bar, obs.wg, func() { - log.Infow("done - "+message, "count", counted) + log.Infow("done - "+message, "count_items", counted) })() return ch diff --git a/src/pkg/services/m365/api/pagers/pagers.go b/src/pkg/services/m365/api/pagers/pagers.go index 34715501f..da1556afc 100644 --- a/src/pkg/services/m365/api/pagers/pagers.go +++ b/src/pkg/services/m365/api/pagers/pagers.go @@ -191,7 +191,8 @@ func EnumerateItems[T any]( defer npr.close() var ( - result = make([]T, 0) + pageCount = 0 + itemCount = 0 // stubbed initial value to ensure we enter the 
loop. nextLink = "do-while" ) @@ -204,7 +205,12 @@ func EnumerateItems[T any]( return } - if err := npr.writeNextPage(ctx, page.GetValue(), false); err != nil { + pageResults := page.GetValue() + + itemCount += len(pageResults) + pageCount++ + + if err := npr.writeNextPage(ctx, pageResults, false); err != nil { npr.err = clues.Stack(err) return } @@ -214,7 +220,10 @@ func EnumerateItems[T any]( pager.SetNextLink(nextLink) } - logger.Ctx(ctx).Infow("completed delta item enumeration", "result_count", len(result)) + logger.Ctx(ctx).Infow( + "completed item enumeration", + "item_count", itemCount, + "page_count", pageCount) } func BatchEnumerateItems[T any]( @@ -262,7 +271,8 @@ func DeltaEnumerateItems[T any]( defer npr.close() var ( - result = make([]T, 0) + pageCount = 0 + itemCount = 0 // stubbed initial value to ensure we enter the loop. newDeltaLink = "" invalidPrevDelta = len(prevDeltaLink) == 0 @@ -304,6 +314,9 @@ func DeltaEnumerateItems[T any]( // Reset tells the pager to try again after ditching its delta history. pager.Reset(ctx) + pageCount = 0 + itemCount = 0 + if err := npr.writeNextPage(ctx, nil, true); err != nil { npr.err = clues.Stack(err) return @@ -317,7 +330,12 @@ func DeltaEnumerateItems[T any]( return } - if err := npr.writeNextPage(ctx, page.GetValue(), false); err != nil { + pageResults := page.GetValue() + + itemCount += len(pageResults) + pageCount++ + + if err := npr.writeNextPage(ctx, pageResults, false); err != nil { npr.err = clues.Stack(err) return } @@ -331,7 +349,10 @@ func DeltaEnumerateItems[T any]( pager.SetNextLink(nextLink) } - logger.Ctx(ctx).Debugw("completed delta item enumeration", "result_count", len(result)) + logger.Ctx(ctx).Infow( + "completed delta item enumeration", + "item_count", itemCount, + "page_count", pageCount) npr.du = DeltaUpdate{ URL: newDeltaLink,