add doNotMergeItems func to collections

Adds a new func to the data.Collections iface:
DoNotMergeItems.  This propagates a flag that
tells kopia when it should not add items from
previous snapshots into the current snapshot
for the given collection.  This is needed in cases
like delta token expiration, where we are forced
to re-discover all items in the container instead
of performing an incremental lookup.
This commit is contained in:
ryanfkeepers 2022-12-22 13:19:11 -07:00
parent 3089879cec
commit f4ec628df9
9 changed files with 56 additions and 12 deletions

View File

@ -69,6 +69,9 @@ type Collection struct {
prevPath path.Path prevPath path.Path
state data.CollectionState state data.CollectionState
// doNotMergeItems should only be true if the old delta token expired.
doNotMergeItems bool
} }
// NewExchangeDataCollection creates an ExchangeDataCollection. // NewExchangeDataCollection creates an ExchangeDataCollection.
@ -156,12 +159,14 @@ func (col Collection) PreviousPath() path.Path {
return nil return nil
} }
// TODO(ashmrtn): Fill in once GraphConnector compares old and new folder
// hierarchies.
func (col Collection) State() data.CollectionState { func (col Collection) State() data.CollectionState {
return col.state return col.state
} }
func (col Collection) DoNotMergeItems() bool {
return col.doNotMergeItems
}
// populateByOptionIdentifier is a utility function that uses col.collectionType to be able to serialize // populateByOptionIdentifier is a utility function that uses col.collectionType to be able to serialize
// all the M365IDs defined in the jobs field. data channel is closed by this function // all the M365IDs defined in the jobs field. data channel is closed by this function
func (col *Collection) populateByOptionIdentifier(ctx context.Context) { func (col *Collection) populateByOptionIdentifier(ctx context.Context) {

View File

@ -127,6 +127,10 @@ func (md MetadataCollection) State() data.CollectionState {
return data.NewState return data.NewState
} }
func (md MetadataCollection) DoNotMergeItems() bool {
return false
}
func (md MetadataCollection) Items() <-chan data.Stream { func (md MetadataCollection) Items() <-chan data.Stream {
res := make(chan data.Stream) res := make(chan data.Stream)

View File

@ -15,14 +15,15 @@ import (
// MockExchangeDataCollection represents a mock exchange mailbox // MockExchangeDataCollection represents a mock exchange mailbox
type MockExchangeDataCollection struct { type MockExchangeDataCollection struct {
fullPath path.Path fullPath path.Path
messageCount int messageCount int
Data [][]byte Data [][]byte
Names []string Names []string
ModTimes []time.Time ModTimes []time.Time
ColState data.CollectionState ColState data.CollectionState
PrevPath path.Path PrevPath path.Path
DeletedItems []bool DeletedItems []bool
doNotMergeItems bool
} }
var ( var (
@ -104,6 +105,10 @@ func (medc MockExchangeDataCollection) State() data.CollectionState {
return medc.ColState return medc.ColState
} }
func (medc MockExchangeDataCollection) DoNotMergeItems() bool {
return medc.doNotMergeItems
}
// Items returns a channel that has the next items in the collection. The // Items returns a channel that has the next items in the collection. The
// channel is closed when there are no more items available. // channel is closed when there are no more items available.
func (medc *MockExchangeDataCollection) Items() <-chan data.Stream { func (medc *MockExchangeDataCollection) Items() <-chan data.Stream {

View File

@ -57,6 +57,9 @@ type Collection struct {
statusUpdater support.StatusUpdater statusUpdater support.StatusUpdater
itemReader itemReaderFunc itemReader itemReaderFunc
ctrl control.Options ctrl control.Options
// should only be true if the old delta token expired
doNotMergeItems bool
} }
// itemReadFunc returns a reader for the specified item // itemReadFunc returns a reader for the specified item
@ -123,6 +126,10 @@ func (oc Collection) State() data.CollectionState {
return data.NewState return data.NewState
} }
func (oc Collection) DoNotMergeItems() bool {
return oc.doNotMergeItems
}
// Item represents a single item retrieved from OneDrive // Item represents a single item retrieved from OneDrive
type Item struct { type Item struct {
id string id string

View File

@ -81,12 +81,14 @@ func (sc Collection) PreviousPath() path.Path {
return nil return nil
} }
// TODO(ashmrtn): Fill in once GraphConnector compares old and new folder
// hierarchies.
func (sc Collection) State() data.CollectionState { func (sc Collection) State() data.CollectionState {
return data.NewState return data.NewState
} }
func (sc Collection) DoNotMergeItems() bool {
return false
}
func (sc *Collection) Items() <-chan data.Stream { func (sc *Collection) Items() <-chan data.Stream {
go sc.populate(context.TODO()) go sc.populate(context.TODO())
return sc.data return sc.data

View File

@ -47,6 +47,15 @@ type Collection interface {
// backup along with all items and Collections below them in the hierarchy // backup along with all items and Collections below them in the hierarchy
// unless said items/Collections were moved. // unless said items/Collections were moved.
State() CollectionState State() CollectionState
// DoNotMergeItems informs kopia that the collection is rebuilding its contents
// from scratch, and that any items currently stored in that collection should
// be skipped during the process of merging historical data into the new backup.
// This flag is normally expected to be false. It should only be flagged under
// specific circumstances. Example: if the link token used for incremental queries
// expires or otherwise becomes unusable, thus requiring the backup producer to
// re-discover all data in the container. This flag only affects the path of the
// collection, and does not cascade to subfolders.
DoNotMergeItems() bool
} }
// Stream represents a single item within a Collection // Stream represents a single item within a Collection

View File

@ -30,6 +30,10 @@ func (mc mockColl) State() CollectionState {
return NewState return NewState
} }
func (mc mockColl) DoNotMergeItems() bool {
return false
}
type CollectionSuite struct { type CollectionSuite struct {
suite.Suite suite.Suite
} }

View File

@ -43,6 +43,10 @@ func (kdc kopiaDataCollection) State() data.CollectionState {
return data.NewState return data.NewState
} }
func (kdc kopiaDataCollection) DoNotMergeItems() bool {
return false
}
type kopiaDataStream struct { type kopiaDataStream struct {
reader io.ReadCloser reader io.ReadCloser
uuid string uuid string

View File

@ -177,6 +177,10 @@ func (dc *streamCollection) State() data.CollectionState {
return data.NewState return data.NewState
} }
func (dc *streamCollection) DoNotMergeItems() bool {
return false
}
// Items() always returns a channel with a single data.Stream // Items() always returns a channel with a single data.Stream
// representing the object to be persisted // representing the object to be persisted
func (dc *streamCollection) Items() <-chan data.Stream { func (dc *streamCollection) Items() <-chan data.Stream {