add multi-delta enumeration to the collection tree

Tests are coming in a follow-up PR.
This commit is contained in:
ryanfkeepers 2023-11-29 13:49:19 -07:00
parent 14225ad616
commit 3ab500a7d8

View File

@ -277,65 +277,94 @@ func (c *Collections) populateTree(
ctx = clues.Add(ctx, "invalid_prev_delta", len(prevDeltaLink) == 0) ctx = clues.Add(ctx, "invalid_prev_delta", len(prevDeltaLink) == 0)
var ( var (
driveID = ptr.Val(drv.GetId()) currDeltaLink = prevDeltaLink
el = errs.Local() driveID = ptr.Val(drv.GetId())
el = errs.Local()
du pagers.DeltaUpdate
finished bool
hitLimit bool
) )
// TODO(keepers): to end in a correct state, we'll eventually need to run this for !hitLimit && !finished && el.Failure() == nil {
// query multiple times over, until it ends in an empty change set. var (
pager := c.handler.EnumerateDriveItemsDelta( pageCount int
ctx, pageItemCount int
driveID, hadReset bool
prevDeltaLink, err error
api.CallConfig{ )
Select: api.DefaultDriveItemProps(),
})
for page, reset, done := pager.NextPage(); !done; page, reset, done = pager.NextPage() { // TODO(keepers): to end in a correct state, we'll eventually need to run this
if el.Failure() != nil { // query multiple times over, until it ends in an empty change set.
break pager := c.handler.EnumerateDriveItemsDelta(
}
if reset {
counter.Inc(count.PagerResets)
tree.reset()
c.resetStats()
}
err := c.enumeratePageOfItems(
ctx, ctx,
tree, driveID,
drv, currDeltaLink,
page, api.CallConfig{
limiter, Select: api.DefaultDriveItemProps(),
counter, })
errs)
if err != nil { for page, reset, done := pager.NextPage(); !done; page, reset, done = pager.NextPage() {
if errors.Is(err, errHitLimit) { if el.Failure() != nil {
return du, el.Failure()
}
if reset {
counter.Inc(count.PagerResets)
tree.reset()
c.resetStats()
pageCount = 0
pageItemCount = 0
hadReset = true
}
err = c.enumeratePageOfItems(
ctx,
tree,
drv,
page,
limiter,
counter,
errs)
if err != nil {
if errors.Is(err, errHitLimit) {
hitLimit = true
break
}
el.AddRecoverable(ctx, clues.Stack(err))
}
counter.Inc(count.PagesEnumerated)
// Stop enumeration early if we've reached the page limit. Keep this
// at the end of the loop so we don't request another page (pager.NextPage)
// before seeing we've passed the limit.
if limiter.hitPageLimit(int(counter.Get(count.PagesEnumerated))) {
hitLimit = true
break break
} }
el.AddRecoverable(ctx, clues.Stack(err)) pageCount++
pageItemCount += len(page)
} }
counter.Inc(count.PagesEnumerated) // Always cancel the pager so that even if we exit early from the loop above
// we don't deadlock. Cancelling a pager that's already completed is
// essentially a noop.
pager.Cancel()
// Stop enumeration early if we've reached the page limit. Keep this du, err = pager.Results()
// at the end of the loop so we don't request another page (pager.NextPage) if err != nil {
// before seeing we've passed the limit. return du, clues.Stack(err)
if limiter.hitPageLimit(int(counter.Get(count.PagesEnumerated))) {
break
} }
}
// Always cancel the pager so that even if we exit early from the loop above currDeltaLink = du.URL
// we don't deadlock. Cancelling a pager that's already completed is
// essentially a noop.
pager.Cancel()
du, err := pager.Results() // 0 pages is never expected. We should at least have one (empty) page to
if err != nil { // consume. But checking pageCount == 1 is brittle in a non-helpful way.
return du, clues.Stack(err) finished = pageCount < 2 && pageItemCount == 0 && !hadReset
} }
logger.Ctx(ctx).Infow("enumerated collection delta", "stats", counter.Values()) logger.Ctx(ctx).Infow("enumerated collection delta", "stats", counter.Values())