some minor logging touchups (#4617)
#### Does this PR need a docs update or release note? - [x] ⛔ No #### Type of change - [x] 🤖 Supportability/Tests #### Test Plan - [x] ⚡ Unit test - [x] 💚 E2E
This commit is contained in:
parent
fedde050ea
commit
85d13b382a
@ -723,16 +723,25 @@ func addMergeLocation(col data.BackupCollection, toMerge *mergeDetails) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type pathUpdate struct {
|
||||
p path.Path
|
||||
state data.CollectionState
|
||||
}
|
||||
|
||||
func inflateCollectionTree(
|
||||
ctx context.Context,
|
||||
collections []data.BackupCollection,
|
||||
toMerge *mergeDetails,
|
||||
) (map[string]*treeMap, map[string]path.Path, error) {
|
||||
) (map[string]*treeMap, map[string]pathUpdate, error) {
|
||||
// failed is temporary and just allows us to log all conflicts before
|
||||
// returning an error.
|
||||
var firstErr error
|
||||
|
||||
roots := make(map[string]*treeMap)
|
||||
// Contains the old path for collections that are not new.
|
||||
// Allows resolving what the new path should be when walking the base
|
||||
// snapshot(s)'s hierarchy. Nil represents a collection that was deleted.
|
||||
updatedPaths := make(map[string]path.Path)
|
||||
updatedPaths := make(map[string]pathUpdate)
|
||||
// Temporary variable just to track the things that have been marked as
|
||||
// changed while keeping a reference to their path.
|
||||
changedPaths := []path.Path{}
|
||||
@ -752,24 +761,39 @@ func inflateCollectionTree(
|
||||
|
||||
changedPaths = append(changedPaths, s.PreviousPath())
|
||||
|
||||
if _, ok := updatedPaths[s.PreviousPath().String()]; ok {
|
||||
return nil, nil, clues.New("multiple previous state changes to collection").
|
||||
WithClues(ictx)
|
||||
if p, ok := updatedPaths[s.PreviousPath().String()]; ok {
|
||||
err := clues.New("multiple previous state changes").
|
||||
WithClues(ictx).
|
||||
With("updated_path", p, "current_state", data.DeletedState)
|
||||
logger.CtxErr(ictx, err).Error("previous path state collision")
|
||||
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
|
||||
updatedPaths[s.PreviousPath().String()] = nil
|
||||
updatedPaths[s.PreviousPath().String()] = pathUpdate{state: data.DeletedState}
|
||||
|
||||
continue
|
||||
|
||||
case data.MovedState:
|
||||
changedPaths = append(changedPaths, s.PreviousPath())
|
||||
|
||||
if _, ok := updatedPaths[s.PreviousPath().String()]; ok {
|
||||
return nil, nil, clues.New("multiple previous state changes to collection").
|
||||
WithClues(ictx)
|
||||
if p, ok := updatedPaths[s.PreviousPath().String()]; ok {
|
||||
err := clues.New("multiple previous state changes").
|
||||
WithClues(ictx).
|
||||
With("updated_path", p, "current_state", data.MovedState)
|
||||
logger.CtxErr(ictx, err).Error("previous path state collision")
|
||||
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
|
||||
updatedPaths[s.PreviousPath().String()] = s.FullPath()
|
||||
updatedPaths[s.PreviousPath().String()] = pathUpdate{
|
||||
p: s.FullPath(),
|
||||
state: data.MovedState,
|
||||
}
|
||||
|
||||
// Only safe when collections are moved since we only need prefix matching
|
||||
// if a nested folder's path changed in some way that didn't generate a
|
||||
@ -780,14 +804,24 @@ func inflateCollectionTree(
|
||||
return nil, nil, clues.Wrap(err, "adding merge location").
|
||||
WithClues(ictx)
|
||||
}
|
||||
|
||||
case data.NotMovedState:
|
||||
p := s.PreviousPath().String()
|
||||
if _, ok := updatedPaths[p]; ok {
|
||||
return nil, nil, clues.New("multiple previous state changes to collection").
|
||||
WithClues(ictx)
|
||||
if p, ok := updatedPaths[p]; ok {
|
||||
err := clues.New("multiple previous state changes").
|
||||
WithClues(ictx).
|
||||
With("updated_path", p, "current_state", data.NotMovedState)
|
||||
logger.CtxErr(ictx, err).Error("previous path state collision")
|
||||
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
|
||||
updatedPaths[p] = s.FullPath()
|
||||
updatedPaths[p] = pathUpdate{
|
||||
p: s.FullPath(),
|
||||
state: data.NotMovedState,
|
||||
}
|
||||
}
|
||||
|
||||
if s.FullPath() == nil || len(s.FullPath().Elements()) == 0 {
|
||||
@ -821,18 +855,22 @@ func inflateCollectionTree(
|
||||
}
|
||||
|
||||
if node.collection != nil && node.collection.State() == data.NotMovedState {
|
||||
return nil, nil, clues.New("conflicting states for collection").
|
||||
WithClues(ctx).
|
||||
With("changed_path", p)
|
||||
err := clues.New("conflicting states for collection").
|
||||
WithClues(ctx)
|
||||
logger.CtxErr(ctx, err).Error("adding node to tree")
|
||||
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return roots, updatedPaths, nil
|
||||
return roots, updatedPaths, clues.Stack(firstErr).OrNil()
|
||||
}
|
||||
|
||||
func subtreeChanged(
|
||||
roots map[string]*treeMap,
|
||||
updatedPaths map[string]path.Path,
|
||||
updatedPaths map[string]pathUpdate,
|
||||
oldDirPath *path.Builder,
|
||||
currentPath *path.Builder,
|
||||
) bool {
|
||||
@ -889,7 +927,7 @@ func subtreeChanged(
|
||||
func traverseBaseDir(
|
||||
ctx context.Context,
|
||||
depth int,
|
||||
updatedPaths map[string]path.Path,
|
||||
updatedPaths map[string]pathUpdate,
|
||||
oldDirPath *path.Builder,
|
||||
expectedDirPath *path.Builder,
|
||||
dir fs.Directory,
|
||||
@ -934,14 +972,14 @@ func traverseBaseDir(
|
||||
|
||||
if upb, ok := updatedPaths[oldDirPath.String()]; ok {
|
||||
// This directory was deleted.
|
||||
if upb == nil {
|
||||
if upb.p == nil {
|
||||
currentPath = nil
|
||||
|
||||
stats.Inc(statDel)
|
||||
} else {
|
||||
// This directory was explicitly mentioned and the new (possibly
|
||||
// unchanged) location is in upb.
|
||||
currentPath = upb.ToBuilder()
|
||||
currentPath = upb.p.ToBuilder()
|
||||
|
||||
// Below we check if the collection was marked as new or DoNotMerge which
|
||||
// disables merging behavior. That means we can't directly update stats
|
||||
@ -1087,7 +1125,7 @@ func inflateBaseTree(
|
||||
ctx context.Context,
|
||||
loader snapshotLoader,
|
||||
base BackupBase,
|
||||
updatedPaths map[string]path.Path,
|
||||
updatedPaths map[string]pathUpdate,
|
||||
roots map[string]*treeMap,
|
||||
) error {
|
||||
bupID := "no_backup_id"
|
||||
@ -1158,8 +1196,8 @@ func inflateBaseTree(
|
||||
// otherwise unchecked in tree inflation below this point.
|
||||
newSubtreePath := subtreePath.ToBuilder()
|
||||
|
||||
if p, ok := updatedPaths[subtreePath.String()]; ok {
|
||||
newSubtreePath = p.ToBuilder()
|
||||
if up, ok := updatedPaths[subtreePath.String()]; ok {
|
||||
newSubtreePath = up.p.ToBuilder()
|
||||
}
|
||||
|
||||
stats := count.New()
|
||||
|
||||
@ -522,6 +522,8 @@ func (c *Collections) Get(
|
||||
collections = append(collections, md)
|
||||
}
|
||||
|
||||
logger.Ctx(ctx).Infow("produced collections", "count_collections", len(collections))
|
||||
|
||||
return collections, canUsePrevBackup, nil
|
||||
}
|
||||
|
||||
@ -737,6 +739,8 @@ func (c *Collections) PopulateDriveCollections(
|
||||
seenFolders = map[string]string{}
|
||||
)
|
||||
|
||||
ctx = clues.Add(ctx, "invalid_prev_delta", invalidPrevDelta)
|
||||
|
||||
if !invalidPrevDelta {
|
||||
maps.Copy(newPrevPaths, oldPrevPaths)
|
||||
}
|
||||
@ -755,6 +759,7 @@ func (c *Collections) PopulateDriveCollections(
|
||||
}
|
||||
|
||||
if reset {
|
||||
ctx = clues.Add(ctx, "delta_reset_occurred", true)
|
||||
newPrevPaths = map[string]string{}
|
||||
currPrevPaths = map[string]string{}
|
||||
c.CollectionMap[driveID] = map[string]*Collection{}
|
||||
|
||||
@ -278,6 +278,8 @@ func populateCollections(
|
||||
|
||||
collections["metadata"] = col
|
||||
|
||||
logger.Ctx(ctx).Infow("produced collections", "count_collections", len(collections))
|
||||
|
||||
return collections, el.Failure()
|
||||
}
|
||||
|
||||
|
||||
@ -109,7 +109,7 @@ func populateCollections(
|
||||
el = errs.Local()
|
||||
)
|
||||
|
||||
logger.Ctx(ctx).Info("filling collections", "len_deltapaths", len(dps))
|
||||
logger.Ctx(ctx).Infow("filling collections", "len_deltapaths", len(dps))
|
||||
|
||||
for _, c := range channels {
|
||||
if el.Failure() != nil {
|
||||
@ -267,5 +267,7 @@ func populateCollections(
|
||||
|
||||
collections["metadata"] = col
|
||||
|
||||
logger.Ctx(ctx).Infow("produced collections", "count_collections", len(collections))
|
||||
|
||||
return collections, el.Failure()
|
||||
}
|
||||
|
||||
@ -505,14 +505,14 @@ func CollectionProgress(
|
||||
counted++
|
||||
// Log every 1000 items that are processed
|
||||
if counted%1000 == 0 {
|
||||
log.Infow("uploading", "count", counted)
|
||||
log.Infow("uploading", "count_items", counted)
|
||||
}
|
||||
}
|
||||
|
||||
if obs.hidden() || len(plain) == 0 {
|
||||
go listen(ctx, ch, nop, incCount)
|
||||
|
||||
defer log.Infow("done - "+message, "count", counted)
|
||||
defer log.Infow("done - "+message, "count_items", counted)
|
||||
|
||||
return ch
|
||||
}
|
||||
@ -546,7 +546,7 @@ func CollectionProgress(
|
||||
})
|
||||
|
||||
go waitAndCloseBar(ctx, bar, obs.wg, func() {
|
||||
log.Infow("done - "+message, "count", counted)
|
||||
log.Infow("done - "+message, "count_items", counted)
|
||||
})()
|
||||
|
||||
return ch
|
||||
|
||||
@ -191,7 +191,8 @@ func EnumerateItems[T any](
|
||||
defer npr.close()
|
||||
|
||||
var (
|
||||
result = make([]T, 0)
|
||||
pageCount = 0
|
||||
itemCount = 0
|
||||
// stubbed initial value to ensure we enter the loop.
|
||||
nextLink = "do-while"
|
||||
)
|
||||
@ -204,7 +205,12 @@ func EnumerateItems[T any](
|
||||
return
|
||||
}
|
||||
|
||||
if err := npr.writeNextPage(ctx, page.GetValue(), false); err != nil {
|
||||
pageResults := page.GetValue()
|
||||
|
||||
itemCount += len(pageResults)
|
||||
pageCount++
|
||||
|
||||
if err := npr.writeNextPage(ctx, pageResults, false); err != nil {
|
||||
npr.err = clues.Stack(err)
|
||||
return
|
||||
}
|
||||
@ -214,7 +220,10 @@ func EnumerateItems[T any](
|
||||
pager.SetNextLink(nextLink)
|
||||
}
|
||||
|
||||
logger.Ctx(ctx).Infow("completed delta item enumeration", "result_count", len(result))
|
||||
logger.Ctx(ctx).Infow(
|
||||
"completed item enumeration",
|
||||
"item_count", itemCount,
|
||||
"page_count", pageCount)
|
||||
}
|
||||
|
||||
func BatchEnumerateItems[T any](
|
||||
@ -262,7 +271,8 @@ func DeltaEnumerateItems[T any](
|
||||
defer npr.close()
|
||||
|
||||
var (
|
||||
result = make([]T, 0)
|
||||
pageCount = 0
|
||||
itemCount = 0
|
||||
// stubbed initial value to ensure we enter the loop.
|
||||
newDeltaLink = ""
|
||||
invalidPrevDelta = len(prevDeltaLink) == 0
|
||||
@ -304,6 +314,9 @@ func DeltaEnumerateItems[T any](
|
||||
// Reset tells the pager to try again after ditching its delta history.
|
||||
pager.Reset(ctx)
|
||||
|
||||
pageCount = 0
|
||||
itemCount = 0
|
||||
|
||||
if err := npr.writeNextPage(ctx, nil, true); err != nil {
|
||||
npr.err = clues.Stack(err)
|
||||
return
|
||||
@ -317,7 +330,12 @@ func DeltaEnumerateItems[T any](
|
||||
return
|
||||
}
|
||||
|
||||
if err := npr.writeNextPage(ctx, page.GetValue(), false); err != nil {
|
||||
pageResults := page.GetValue()
|
||||
|
||||
itemCount += len(pageResults)
|
||||
pageCount++
|
||||
|
||||
if err := npr.writeNextPage(ctx, pageResults, false); err != nil {
|
||||
npr.err = clues.Stack(err)
|
||||
return
|
||||
}
|
||||
@ -331,7 +349,10 @@ func DeltaEnumerateItems[T any](
|
||||
pager.SetNextLink(nextLink)
|
||||
}
|
||||
|
||||
logger.Ctx(ctx).Debugw("completed delta item enumeration", "result_count", len(result))
|
||||
logger.Ctx(ctx).Infow(
|
||||
"completed delta item enumeration",
|
||||
"item_count", itemCount,
|
||||
"page_count", pageCount)
|
||||
|
||||
npr.du = DeltaUpdate{
|
||||
URL: newDeltaLink,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user