From f4ec628df9b4dfae948ec678027be99db4716453 Mon Sep 17 00:00:00 2001 From: ryanfkeepers Date: Thu, 22 Dec 2022 13:19:11 -0700 Subject: [PATCH] add doNotMergeItems func to collections Adds a new func to the data.Collections iface: DoNotMergeItems. This propagates a flag that tells kopia when it should not add items from previous snapshots into the current snapshot for the given collection. This is needed in cases like delta token expiration, where we are forced to re-discover all items in the container instead of performing an incremental lookup. --- .../exchange/exchange_data_collection.go | 9 ++++++-- .../connector/graph/metadata_collection.go | 4 ++++ .../mockconnector/mock_data_collection.go | 21 ++++++++++++------- src/internal/connector/onedrive/collection.go | 7 +++++++ .../connector/sharepoint/collection.go | 6 ++++-- src/internal/data/data_collection.go | 9 ++++++++ src/internal/data/data_collection_test.go | 4 ++++ src/internal/kopia/data_collection.go | 4 ++++ src/internal/streamstore/streamstore.go | 4 ++++ 9 files changed, 56 insertions(+), 12 deletions(-) diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index 62e05d52d..9157f8f47 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -69,6 +69,9 @@ type Collection struct { prevPath path.Path state data.CollectionState + + // doNotMergeItems should only be true if the old delta token expired. + doNotMergeItems bool } // NewExchangeDataCollection creates an ExchangeDataCollection. @@ -156,12 +159,14 @@ func (col Collection) PreviousPath() path.Path { return nil } -// TODO(ashmrtn): Fill in once GraphConnector compares old and new folder -// hierarchies. 
func (col Collection) State() data.CollectionState { return col.state } +func (col Collection) DoNotMergeItems() bool { + return col.doNotMergeItems +} + // populateByOptionIdentifier is a utility function that uses col.collectionType to be able to serialize // all the M365IDs defined in the jobs field. data channel is closed by this function func (col *Collection) populateByOptionIdentifier(ctx context.Context) { diff --git a/src/internal/connector/graph/metadata_collection.go b/src/internal/connector/graph/metadata_collection.go index acab27b2a..6036bfbe4 100644 --- a/src/internal/connector/graph/metadata_collection.go +++ b/src/internal/connector/graph/metadata_collection.go @@ -127,6 +127,10 @@ func (md MetadataCollection) State() data.CollectionState { return data.NewState } +func (md MetadataCollection) DoNotMergeItems() bool { + return false +} + func (md MetadataCollection) Items() <-chan data.Stream { res := make(chan data.Stream) diff --git a/src/internal/connector/mockconnector/mock_data_collection.go b/src/internal/connector/mockconnector/mock_data_collection.go index e7c5d83ae..584b411a2 100644 --- a/src/internal/connector/mockconnector/mock_data_collection.go +++ b/src/internal/connector/mockconnector/mock_data_collection.go @@ -15,14 +15,15 @@ import ( // MockExchangeDataCollection represents a mock exchange mailbox type MockExchangeDataCollection struct { - fullPath path.Path - messageCount int - Data [][]byte - Names []string - ModTimes []time.Time - ColState data.CollectionState - PrevPath path.Path - DeletedItems []bool + fullPath path.Path + messageCount int + Data [][]byte + Names []string + ModTimes []time.Time + ColState data.CollectionState + PrevPath path.Path + DeletedItems []bool + doNotMergeItems bool } var ( @@ -104,6 +105,10 @@ func (medc MockExchangeDataCollection) State() data.CollectionState { return medc.ColState } +func (medc MockExchangeDataCollection) DoNotMergeItems() bool { + return medc.doNotMergeItems +} + // Items returns a 
channel that has the next items in the collection. The // channel is closed when there are no more items available. func (medc *MockExchangeDataCollection) Items() <-chan data.Stream { diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index c225b8571..8e4e2e3f0 100644 --- a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -57,6 +57,9 @@ type Collection struct { statusUpdater support.StatusUpdater itemReader itemReaderFunc ctrl control.Options + + // should only be true if the old delta token expired + doNotMergeItems bool } // itemReadFunc returns a reader for the specified item @@ -123,6 +126,10 @@ func (oc Collection) State() data.CollectionState { return data.NewState } +func (oc Collection) DoNotMergeItems() bool { + return oc.doNotMergeItems +} + // Item represents a single item retrieved from OneDrive type Item struct { id string diff --git a/src/internal/connector/sharepoint/collection.go b/src/internal/connector/sharepoint/collection.go index fee8d33e9..14d0beb34 100644 --- a/src/internal/connector/sharepoint/collection.go +++ b/src/internal/connector/sharepoint/collection.go @@ -81,12 +81,14 @@ func (sc Collection) PreviousPath() path.Path { return nil } -// TODO(ashmrtn): Fill in once GraphConnector compares old and new folder -// hierarchies. func (sc Collection) State() data.CollectionState { return data.NewState } +func (sc Collection) DoNotMergeItems() bool { + return false +} + func (sc *Collection) Items() <-chan data.Stream { go sc.populate(context.TODO()) return sc.data diff --git a/src/internal/data/data_collection.go b/src/internal/data/data_collection.go index 0db720105..20f5f74cc 100644 --- a/src/internal/data/data_collection.go +++ b/src/internal/data/data_collection.go @@ -47,6 +47,15 @@ type Collection interface { // backup along with all items and Collections below them in the hierarchy // unless said items/Collections were moved. 
State() CollectionState + // DoNotMergeItems informs kopia that the collection is rebuilding its contents + // from scratch, and that any items currently stored in that collection should + // be skipped during the process of merging historical data into the new backup. + // This flag is normally expected to be false. It should only be set to true under + // specific circumstances. Example: if the delta token used for incremental queries + // expires or otherwise becomes unusable, thus requiring the backup producer to + // re-discover all data in the container. This flag only affects the path of the + // collection, and does not cascade to subfolders. + DoNotMergeItems() bool } // Stream represents a single item within a Collection diff --git a/src/internal/data/data_collection_test.go b/src/internal/data/data_collection_test.go index b9facad4a..091762daf 100644 --- a/src/internal/data/data_collection_test.go +++ b/src/internal/data/data_collection_test.go @@ -30,6 +30,10 @@ func (mc mockColl) State() CollectionState { return NewState } +func (mc mockColl) DoNotMergeItems() bool { + return false +} + type CollectionSuite struct { suite.Suite } diff --git a/src/internal/kopia/data_collection.go b/src/internal/kopia/data_collection.go index 0afa39c35..4197f754c 100644 --- a/src/internal/kopia/data_collection.go +++ b/src/internal/kopia/data_collection.go @@ -43,6 +43,10 @@ func (kdc kopiaDataCollection) State() data.CollectionState { return data.NewState } +func (kdc kopiaDataCollection) DoNotMergeItems() bool { + return false +} + type kopiaDataStream struct { reader io.ReadCloser uuid string diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index 287a23413..9938e0627 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -177,6 +177,10 @@ func (dc *streamCollection) State() data.CollectionState { return data.NewState } +func (dc *streamCollection) DoNotMergeItems() bool { + return
false +} + // Items() always returns a channel with a single data.Stream // representing the object to be persisted func (dc *streamCollection) Items() <-chan data.Stream {