Implement ModTime interface in service streams (#1670)

## Description

Add ModTime to Exchange, OneDrive and SharePoint list stream items. This enables kopia-assisted incrementals for those items. Backup details still contains a complete set of information for all items in the backup regardless of if kopia uploaded data for the item or not.

Kopia-assisted incrementals does come with some caveats though. If changes are made to an item in M365 and that change does not cause the modified time reported by M365 to update, then the change will not be backed up. Currently, only marking an email as read/unread is known to hit this edge case.

This patch does not lazily fetch data from Graph API. This means that kopia may upload less data, but the same amount of data will still be pulled from Graph

## Type of change

- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🐹 Trivial/Minor

## Issue(s)

* closes #622 

## Test Plan

<!-- How will this be tested prior to merging.-->
- [x] 💪 Manual
- [x]  Unit test
- [ ] 💚 E2E
This commit is contained in:
ashmrtn 2022-12-05 10:01:38 -08:00 committed by GitHub
parent b1b0d2ba63
commit 8fb55ec886
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 90 additions and 19 deletions

View File

@ -31,6 +31,7 @@ var (
_ data.Collection = &Collection{} _ data.Collection = &Collection{}
_ data.Stream = &Stream{} _ data.Stream = &Stream{}
_ data.StreamInfo = &Stream{} _ data.StreamInfo = &Stream{}
_ data.StreamModTime = &Stream{}
) )
const ( const (
@ -222,6 +223,20 @@ func (col *Collection) finishPopulation(ctx context.Context, success int, totalB
col.statusUpdater(status) col.statusUpdater(status)
} }
type modTimer interface {
GetLastModifiedDateTime() *time.Time
}
func getModTime(mt modTimer) time.Time {
res := time.Now()
if t := mt.GetLastModifiedDateTime(); t != nil {
res = *t
}
return res
}
// GraphSerializeFunc are class of functions that are used by Collections to transform GraphRetrievalFunc // GraphSerializeFunc are class of functions that are used by Collections to transform GraphRetrievalFunc
// responses into data.Stream items contained within the Collection // responses into data.Stream items contained within the Collection
type GraphSerializeFunc func( type GraphSerializeFunc func(
@ -290,7 +305,12 @@ func eventToDataCollection(
} }
if len(byteArray) > 0 { if len(byteArray) > 0 {
dataChannel <- &Stream{id: *event.GetId(), message: byteArray, info: EventInfo(event, int64(len(byteArray)))} dataChannel <- &Stream{
id: *event.GetId(),
message: byteArray,
info: EventInfo(event, int64(len(byteArray))),
modTime: getModTime(event),
}
} }
return len(byteArray), nil return len(byteArray), nil
@ -323,7 +343,12 @@ func contactToDataCollection(
} }
if len(byteArray) > 0 { if len(byteArray) > 0 {
dataChannel <- &Stream{id: *contact.GetId(), message: byteArray, info: ContactInfo(contact, int64(len(byteArray)))} dataChannel <- &Stream{
id: *contact.GetId(),
message: byteArray,
info: ContactInfo(contact, int64(len(byteArray))),
modTime: getModTime(contact),
}
} }
return len(byteArray), nil return len(byteArray), nil
@ -382,7 +407,12 @@ func messageToDataCollection(
return 0, support.SetNonRecoverableError(err) return 0, support.SetNonRecoverableError(err)
} }
dataChannel <- &Stream{id: *aMessage.GetId(), message: byteArray, info: MessageInfo(aMessage, int64(len(byteArray)))} dataChannel <- &Stream{
id: *aMessage.GetId(),
message: byteArray,
info: MessageInfo(aMessage, int64(len(byteArray))),
modTime: getModTime(aMessage),
}
return len(byteArray), nil return len(byteArray), nil
} }
@ -395,6 +425,9 @@ type Stream struct {
// some structured type in here (serialization to []byte can be done in `Read`) // some structured type in here (serialization to []byte can be done in `Read`)
message []byte message []byte
info *details.ExchangeInfo // temporary change to bring populate function into directory info *details.ExchangeInfo // temporary change to bring populate function into directory
// TODO(ashmrtn): Can probably eventually be sourced from info as there's a
// request to provide modtime in ItemInfo structs.
modTime time.Time
} }
func (od *Stream) UUID() string { func (od *Stream) UUID() string {
@ -409,11 +442,16 @@ func (od *Stream) Info() details.ItemInfo {
return details.ItemInfo{Exchange: od.info} return details.ItemInfo{Exchange: od.info}
} }
func (od *Stream) ModTime() time.Time {
return od.modTime
}
// NewStream constructor for exchange.Stream object // NewStream constructor for exchange.Stream object
func NewStream(identifier string, dataBytes []byte, detail details.ExchangeInfo) Stream { func NewStream(identifier string, dataBytes []byte, detail details.ExchangeInfo, modTime time.Time) Stream {
return Stream{ return Stream{
id: identifier, id: identifier,
message: dataBytes, message: dataBytes,
info: &detail, info: &detail,
modTime: modTime,
} }
} }

View File

@ -633,6 +633,10 @@ func compareItem(
category path.CategoryType, category path.CategoryType,
item data.Stream, item data.Stream,
) { ) {
if mt, ok := item.(data.StreamModTime); ok {
assert.NotZero(t, mt.ModTime())
}
switch service { switch service {
case path.ExchangeService: case path.ExchangeService:
switch category { switch category {

View File

@ -34,6 +34,7 @@ var (
_ data.Collection = &Collection{} _ data.Collection = &Collection{}
_ data.Stream = &Item{} _ data.Stream = &Item{}
_ data.StreamInfo = &Item{} _ data.StreamInfo = &Item{}
_ data.StreamModTime = &Item{}
) )
// Collection represents a set of OneDrive objects retreived from M365 // Collection represents a set of OneDrive objects retreived from M365
@ -115,6 +116,10 @@ func (od *Item) Info() details.ItemInfo {
return details.ItemInfo{OneDrive: od.info} return details.ItemInfo{OneDrive: od.info}
} }
func (od *Item) ModTime() time.Time {
return od.info.Modified
}
// populateItems iterates through items added to the collection // populateItems iterates through items added to the collection
// and uses the collection `itemReader` to read the item // and uses the collection `itemReader` to read the item
func (oc *Collection) populateItems(ctx context.Context) { func (oc *Collection) populateItems(ctx context.Context) {

View File

@ -7,6 +7,7 @@ import (
"io" "io"
"sync" "sync"
"testing" "testing"
"time"
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -59,6 +60,7 @@ func (suite *OneDriveCollectionSuite) TestOneDriveCollection() {
t := suite.T() t := suite.T()
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
collStatus := support.ConnectorOperationStatus{} collStatus := support.ConnectorOperationStatus{}
now := time.Now()
folderPath, err := GetCanonicalPath("drive/driveID1/root:/dir1/dir2/dir3", "a-tenant", "a-user", OneDriveSource) folderPath, err := GetCanonicalPath("drive/driveID1/root:/dir1/dir2/dir3", "a-tenant", "a-user", OneDriveSource)
require.NoError(t, err) require.NoError(t, err)
@ -77,7 +79,10 @@ func (suite *OneDriveCollectionSuite) TestOneDriveCollection() {
coll.Add(testItemID) coll.Add(testItemID)
coll.itemReader = func(context.Context, graph.Service, string, string) (*details.OneDriveInfo, io.ReadCloser, error) { coll.itemReader = func(context.Context, graph.Service, string, string) (*details.OneDriveInfo, io.ReadCloser, error) {
return &details.OneDriveInfo{ItemName: testItemName}, io.NopCloser(bytes.NewReader(testItemData)), nil return &details.OneDriveInfo{
ItemName: testItemName,
Modified: now,
}, io.NopCloser(bytes.NewReader(testItemData)), nil
} }
// Read items from the collection // Read items from the collection
@ -101,6 +106,11 @@ func (suite *OneDriveCollectionSuite) TestOneDriveCollection() {
readItemInfo := readItem.(data.StreamInfo) readItemInfo := readItem.(data.StreamInfo)
assert.Equal(t, testItemName, readItem.UUID()) assert.Equal(t, testItemName, readItem.UUID())
require.Implements(t, (*data.StreamModTime)(nil), readItem)
mt := readItem.(data.StreamModTime)
assert.Equal(t, now, mt.ModTime())
readData, err := io.ReadAll(readItem.ToReader()) readData, err := io.ReadAll(readItem.ToReader())
require.NoError(t, err) require.NoError(t, err)

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"io" "io"
"time"
kw "github.com/microsoft/kiota-serialization-json-go" kw "github.com/microsoft/kiota-serialization-json-go"
@ -29,6 +30,8 @@ const (
var ( var (
_ data.Collection = &Collection{} _ data.Collection = &Collection{}
_ data.Stream = &Item{} _ data.Stream = &Item{}
_ data.StreamInfo = &Item{}
_ data.StreamModTime = &Item{}
) )
type Collection struct { type Collection struct {
@ -75,6 +78,7 @@ type Item struct {
id string id string
data io.ReadCloser data io.ReadCloser
info *details.SharePointInfo info *details.SharePointInfo
modTime time.Time
} }
func (sd *Item) UUID() string { func (sd *Item) UUID() string {
@ -89,6 +93,10 @@ func (sd *Item) Info() details.ItemInfo {
return details.ItemInfo{SharePoint: sd.info} return details.ItemInfo{SharePoint: sd.info}
} }
func (sd *Item) ModTime() time.Time {
return sd.modTime
}
func (sc *Collection) finishPopulation(ctx context.Context, success int, totalBytes int64, errs error) { func (sc *Collection) finishPopulation(ctx context.Context, success int, totalBytes int64, errs error) {
close(sc.data) close(sc.data)
attempted := len(sc.jobs) attempted := len(sc.jobs)
@ -150,6 +158,11 @@ func (sc *Collection) populate(ctx context.Context) {
arrayLength = int64(len(byteArray)) arrayLength = int64(len(byteArray))
if arrayLength > 0 { if arrayLength > 0 {
t := time.Now()
if t1 := lst.GetLastModifiedDateTime(); t1 != nil {
t = *t1
}
totalBytes += arrayLength totalBytes += arrayLength
success++ success++
@ -157,6 +170,7 @@ func (sc *Collection) populate(ctx context.Context) {
id: *lst.GetId(), id: *lst.GetId(),
data: io.NopCloser(bytes.NewReader(byteArray)), data: io.NopCloser(bytes.NewReader(byteArray)),
info: sharePointListInfo(lst, arrayLength), info: sharePointListInfo(lst, arrayLength),
modTime: t,
} }
colProgress <- struct{}{} colProgress <- struct{}{}