track tombstones by id, not path

This commit is contained in:
ryanfkeepers 2022-12-22 12:37:55 -07:00
parent f0c638e8b9
commit 3089879cec

View File

@ -72,13 +72,9 @@ func filterContainersAndFillCollections(
} }
cID := *c.GetId() cID := *c.GetId()
delete(tombstones, cID)
currPath, ok := includeContainer(qp, c, scope) currPath, ok := includeContainer(qp, c, scope)
if currPath != nil {
// this path exists, do not delete it, even if it isn't
// included in this backup.
delete(tombstones, currPath.String())
}
// Only create a collection if the path matches the scope. // Only create a collection if the path matches the scope.
if !ok { if !ok {
continue continue
@ -107,7 +103,7 @@ func filterContainersAndFillCollections(
if graph.IsErrDeletedInFlight(err) == nil { if graph.IsErrDeletedInFlight(err) == nil {
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
} else { } else {
tombstones[currPath.String()] = struct{}{} tombstones[cID] = dp.path
} }
continue continue
@ -122,7 +118,7 @@ func filterContainersAndFillCollections(
// We need to mark the collection as New, and tombstone // We need to mark the collection as New, and tombstone
// the path to clear out any data for a clean refresh. // the path to clear out any data for a clean refresh.
prevPath = nil prevPath = nil
tombstones[currPath.String()] = struct{}{} tombstones[cID] = dp.path
} }
edc := NewCollection( edc := NewCollection(
@ -150,16 +146,24 @@ func filterContainersAndFillCollections(
// to the same container (ex: container_1 gets deleted, then container_2 // to the same container (ex: container_1 gets deleted, then container_2
// gets created with the same name), it is assumed that the backup consumer // gets created with the same name), it is assumed that the backup consumer
// processes deletions before creations, making the combined operation safe. // processes deletions before creations, making the combined operation safe.
for p := range tombstones { for id, p := range tombstones {
service, err := createService(qp.Credentials) service, err := createService(qp.Credentials)
if err != nil { if err != nil {
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
continue continue
} }
// only occurs if it was a new folder that we picked up during the container
// resolver phase that got deleted in flight by the time we hit this stage.
if len(p) == 0 {
continue
}
prevPath, err := pathFromPrevString(p) prevPath, err := pathFromPrevString(p)
if err != nil { if err != nil {
logger.Ctx(ctx).Error(err) // technically shouldn't ever happen. But just in case, we need to catch
// it for protection.
logger.Ctx(ctx).Errorw("parsing tombstone path", "err", err)
continue continue
} }
@ -172,7 +176,7 @@ func filterContainersAndFillCollections(
statusUpdater, statusUpdater,
ctrlOpts, ctrlOpts,
) )
collections[p] = &edc collections[id] = &edc
} }
entries := []graph.MetadataCollectionEntry{ entries := []graph.MetadataCollectionEntry{
@ -199,14 +203,14 @@ func filterContainersAndFillCollections(
return errs return errs
} }
// produces a set keyed by path strings from the deltapaths map. // produces a set of id:path pairs from the deltapaths map.
// Each entry in the set will, if not removed, produce a collection // Each entry in the set will, if not removed, produce a collection
// that will delete the tombstone by path. // that will delete the tombstone by path.
func makeTombstones(dps DeltaPaths) map[string]struct{} { func makeTombstones(dps DeltaPaths) map[string]string {
r := make(map[string]struct{}, len(dps)) r := make(map[string]string, len(dps))
for _, v := range dps { for id, v := range dps {
r[v.path] = struct{}{} r[id] = v.path
} }
return r return r