Export data from OneDrive (#3819)

Core logic for exporting data from OneDrive

Next: https://github.com/alcionai/corso/pull/3820

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [x] 🕐 Yes, but in a later PR
- [ ]  No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* https://github.com/alcionai/corso/issues/3670
* https://github.com/alcionai/corso/pull/3797

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x]  Unit test
- [ ] 💚 E2E
16 changed files with 1633 additions and 0 deletions

@@ -0,0 +1,99 @@
package archive
import (
"archive/zip"
"context"
"io"
"path"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/common/dttm"
"github.com/alcionai/corso/src/pkg/export"
)
const (
// ZipCopyBufferSize is the size of the copy buffer for zip
// write operations
// TODO(meain): tweak this value
ZipCopyBufferSize = 5 * 1024 * 1024
)
type zipCollection struct {
reader io.ReadCloser
}
func (z zipCollection) BasePath() string {
return ""
}
func (z zipCollection) Items(ctx context.Context) <-chan export.Item {
rc := make(chan export.Item, 1)
defer close(rc)
rc <- export.Item{
Data: export.ItemData{
Name: "Corso_Export_" + dttm.FormatNow(dttm.HumanReadable) + ".zip",
Body: z.reader,
},
}
return rc
}
// ZipExportCollection takes a list of export collections and zips
// them into a single collection.
func ZipExportCollection(
ctx context.Context,
expCollections []export.Collection,
) (export.Collection, error) {
if len(expCollections) == 0 {
return nil, clues.New("no export collections provided")
}
reader, writer := io.Pipe()
wr := zip.NewWriter(writer)
go func() {
defer writer.Close()
defer wr.Close()
buf := make([]byte, ZipCopyBufferSize)
for _, ec := range expCollections {
folder := ec.BasePath()
items := ec.Items(ctx)
for item := range items {
err := item.Error
if err != nil {
writer.CloseWithError(clues.Wrap(err, "getting export item").With("id", item.ID))
return
}
name := item.Data.Name
// We assume folder and name do not contain any path separators.
// Also, this should always use `/`, since the entry is created
// within a zip file and not written to disk.
// TODO(meain): Exchange paths might contain a path
// separator and will have to have special handling.
//nolint:forbidigo
f, err := wr.Create(path.Join(folder, name))
if err != nil {
writer.CloseWithError(clues.Wrap(err, "creating zip entry").With("name", name).With("id", item.ID))
return
}
_, err = io.CopyBuffer(f, item.Data.Body, buf)
if err != nil {
writer.CloseWithError(clues.Wrap(err, "writing zip entry").With("name", name).With("id", item.ID))
return
}
}
}
}()
return zipCollection{reader}, nil
}
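
Since the zip is streamed through an `io.Pipe`, failures inside the writer goroutine surface as read errors on the item body (via `writer.CloseWithError`) rather than as an error return from `ZipExportCollection`. A minimal sketch of how a consumer might drain the zipped collection to disk; `writeZippedExport` and `targetDir` are hypothetical, and error handling is abbreviated:

```go
package example

import (
	"context"
	"io"
	"os"
	"path/filepath"

	"github.com/alcionai/corso/src/internal/archive"
	"github.com/alcionai/corso/src/pkg/export"
)

// writeZippedExport drains the single zip item produced by
// ZipExportCollection into targetDir.
func writeZippedExport(
	ctx context.Context,
	colls []export.Collection,
	targetDir string,
) error {
	zc, err := archive.ZipExportCollection(ctx, colls)
	if err != nil {
		return err
	}

	for item := range zc.Items(ctx) {
		if item.Error != nil {
			return item.Error
		}

		f, err := os.Create(filepath.Join(targetDir, item.Data.Name))
		if err != nil {
			return err
		}

		// Failures in the zip-writer goroutine show up here as read
		// errors, courtesy of writer.CloseWithError.
		_, err = io.Copy(f, item.Data.Body)
		item.Data.Body.Close()
		f.Close()

		if err != nil {
			return err
		}
	}

	return nil
}
```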

@@ -91,6 +91,11 @@ func (c NoFetchRestoreCollection) FetchItemByName(context.Context, string) (Stre
return nil, ErrNotFound
}
type FetchRestoreCollection struct {
Collection
FetchItemByNamer
}
// Stream represents a single item within a Collection
// that can be consumed as a stream (it embeds io.Reader)
type Stream interface {

@@ -35,6 +35,8 @@ const (
BackupEnd = "Backup End"
RestoreStart = "Restore Start"
RestoreEnd = "Restore End"
ExportStart = "Export Start"
ExportEnd = "Export End"
MaintenanceStart = "Maintenance Start"
MaintenanceEnd = "Maintenance End"
@@ -49,6 +51,7 @@ const (
ItemsWritten = "items_written"
Resources = "resources"
RestoreID = "restore_id"
ExportID = "export_id"
Service = "service"
StartTime = "start_time"
Status = "status"

@@ -24,6 +24,7 @@ import (
var (
_ inject.BackupProducer = &Controller{}
_ inject.RestoreConsumer = &Controller{}
_ inject.ExportConsumer = &Controller{}
)
// Controller is a struct used to wrap the GraphServiceClient and

@@ -0,0 +1,61 @@
package m365
import (
"context"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/diagnostics"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/onedrive"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/export"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/selectors"
)
// ExportRestoreCollections exports data from the specified collections
func (ctrl *Controller) ExportRestoreCollections(
ctx context.Context,
backupVersion int,
sels selectors.Selector,
exportCfg control.ExportConfig,
opts control.Options,
dcs []data.RestoreCollection,
errs *fault.Bus,
) ([]export.Collection, error) {
ctx, end := diagnostics.Span(ctx, "m365:export")
defer end()
ctx = graph.BindRateLimiterConfig(ctx, graph.LimiterCfg{Service: sels.PathService()})
ctx = clues.Add(ctx, "export_config", exportCfg) // TODO(meain): needs PII control
var (
expCollections []export.Collection
status *support.ControllerOperationStatus
deets = &details.Builder{}
err error
)
switch sels.Service {
case selectors.ServiceOneDrive:
expCollections, err = onedrive.ExportRestoreCollections(
ctx,
backupVersion,
exportCfg,
opts,
dcs,
deets,
errs)
default:
err = clues.Wrap(clues.New(sels.Service.String()), "service not supported")
}
ctrl.incrementAwaitingMessages()
ctrl.UpdateStatus(status)
return expCollections, err
}

@@ -10,6 +10,7 @@ import (
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/export"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
@@ -71,3 +72,15 @@ func (ctrl Controller) ConsumeRestoreCollections(
}
func (ctrl Controller) CacheItemInfo(dii details.ItemInfo) {}
func (ctrl Controller) ExportRestoreCollections(
_ context.Context,
_ int,
_ selectors.Selector,
_ control.ExportConfig,
_ control.Options,
_ []data.RestoreCollection,
_ *fault.Bus,
) ([]export.Collection, error) {
return nil, ctrl.Err
}

@@ -0,0 +1,166 @@
package onedrive
import (
"context"
"strings"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/onedrive/metadata"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/export"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
var _ export.Collection = &exportCollection{}
// exportCollection is the implementation of export.Collection for OneDrive
type exportCollection struct {
// baseDir contains the path of the collection
baseDir string
// backingCollection is the restore collection from which we will
// create the export collection.
backingCollection data.RestoreCollection
// backupVersion is the version of the backup this collection was
// part of. This is required to figure out how to get the name of
// the item.
backupVersion int
}
func (ec exportCollection) BasePath() string {
return ec.baseDir
}
func (ec exportCollection) Items(ctx context.Context) <-chan export.Item {
ch := make(chan export.Item)
go items(ctx, ec, ch)
return ch
}
// items converts items in the backing collection into export items
func items(ctx context.Context, ec exportCollection, ch chan<- export.Item) {
defer close(ch)
errs := fault.New(false)
// There will only be a single item in the backingCollection
// for OneDrive
for item := range ec.backingCollection.Items(ctx, errs) {
itemUUID := item.UUID()
if isMetadataFile(itemUUID, ec.backupVersion) {
continue
}
name, err := getItemName(ctx, itemUUID, ec.backupVersion, ec.backingCollection)
ch <- export.Item{
ID: itemUUID,
Data: export.ItemData{
Name: name,
Body: item.ToReader(),
},
Error: err,
}
}
eitems, erecoverable := errs.ItemsAndRecovered()
// Return all the items that we failed to get from kopia at the end
for _, err := range eitems {
err := err // rebind so &err below does not alias the shared loop variable
ch <- export.Item{
ID: err.ID,
Error: &err,
}
}
for _, e := range erecoverable {
ch <- export.Item{
Error: e,
}
}
}
// isMetadataFile is used to determine if a path corresponds to a
// metadata file. This is OneDrive-specific logic and, unlike
// metadata.IsMetadataFile which only has to be concerned with the
// current version, it depends on the version of the backup.
func isMetadataFile(id string, backupVersion int) bool {
if backupVersion < version.OneDrive1DataAndMetaFiles {
return false
}
return strings.HasSuffix(id, metadata.MetaFileSuffix) ||
strings.HasSuffix(id, metadata.DirMetaFileSuffix)
}
// getItemName is used to get the name of the item.
// How we get the name depends on the version of the backup.
func getItemName(
ctx context.Context,
id string,
backupVersion int,
fin data.FetchItemByNamer,
) (string, error) {
if backupVersion < version.OneDrive1DataAndMetaFiles {
return id, nil
}
if backupVersion < version.OneDrive5DirMetaNoName {
return strings.TrimSuffix(id, metadata.DataFileSuffix), nil
}
if strings.HasSuffix(id, metadata.DataFileSuffix) {
trimmedName := strings.TrimSuffix(id, metadata.DataFileSuffix)
metaName := trimmedName + metadata.MetaFileSuffix
meta, err := fetchAndReadMetadata(ctx, fin, metaName)
if err != nil {
return "", clues.Wrap(err, "getting metadata").WithClues(ctx)
}
return meta.FileName, nil
}
return "", clues.New("invalid item id").WithClues(ctx)
}
// ExportRestoreCollections will create the export collections for the
// given restore collections.
func ExportRestoreCollections(
ctx context.Context,
backupVersion int,
exportCfg control.ExportConfig,
opts control.Options,
dcs []data.RestoreCollection,
deets *details.Builder,
errs *fault.Bus,
) ([]export.Collection, error) {
var (
el = errs.Local()
ec = make([]export.Collection, 0, len(dcs))
)
for _, dc := range dcs {
drivePath, err := path.ToDrivePath(dc.FullPath())
if err != nil {
return nil, clues.Wrap(err, "transforming path to drive path").WithClues(ctx)
}
baseDir := path.Builder{}.Append(drivePath.Folders...)
ec = append(ec, exportCollection{
baseDir: baseDir.String(),
backingCollection: dc,
backupVersion: backupVersion,
})
}
return ec, el.Failure()
}
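
To make the version gates in getItemName concrete, here is a rough summary of how an item id maps to its exported name (the file names are invented; the `.data`/`.meta` suffixes mirror the metadata constants exercised in the tests below):

```go
// backupVersion < version.OneDrive1DataAndMetaFiles:
//   id "report.docx"      -> name "report.docx" (the id is the name)
//
// OneDrive1DataAndMetaFiles <= backupVersion < OneDrive5DirMetaNoName:
//   id "report.docx.data" -> name "report.docx" (DataFileSuffix trimmed)
//
// backupVersion >= OneDrive5DirMetaNoName:
//   id "id123.data"       -> "id123.meta" is fetched via FetchItemByName
//                            and the FileName stored in it is used
```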

@@ -0,0 +1,463 @@
package onedrive
import (
"bytes"
"context"
"io"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/data"
odConsts "github.com/alcionai/corso/src/internal/m365/onedrive/consts"
"github.com/alcionai/corso/src/internal/m365/onedrive/metadata"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/export"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
type ExportUnitSuite struct {
tester.Suite
}
func TestExportUnitSuite(t *testing.T) {
suite.Run(t, &ExportUnitSuite{Suite: tester.NewUnitSuite(t)})
}
func (suite *ExportUnitSuite) TestIsMetadataFile() {
table := []struct {
name string
id string
backupVersion int
isMeta bool
}{
{
name: "legacy",
backupVersion: version.OneDrive1DataAndMetaFiles,
isMeta: false,
},
{
name: "metadata file",
backupVersion: version.OneDrive3IsMetaMarker,
id: "name" + metadata.MetaFileSuffix,
isMeta: true,
},
{
name: "dir metadata file",
backupVersion: version.OneDrive3IsMetaMarker,
id: "name" + metadata.DirMetaFileSuffix,
isMeta: true,
},
{
name: "non metadata file",
backupVersion: version.OneDrive3IsMetaMarker,
id: "name" + metadata.DataFileSuffix,
isMeta: false,
},
}
for _, test := range table {
suite.Run(test.name, func() {
assert.Equal(suite.T(), test.isMeta, isMetadataFile(test.id, test.backupVersion), "is metadata")
})
}
}
type metadataStream struct {
id string
name string
}
func (ms metadataStream) ToReader() io.ReadCloser {
return io.NopCloser(bytes.NewBufferString(`{"filename": "` + ms.name + `"}`))
}
func (ms metadataStream) UUID() string { return ms.id }
func (ms metadataStream) Deleted() bool { return false }
type finD struct {
id string
name string
err error
}
func (fd finD) FetchItemByName(ctx context.Context, name string) (data.Stream, error) {
if fd.err != nil {
return nil, fd.err
}
if name == fd.id {
return metadataStream{id: fd.id, name: fd.name}, nil
}
return nil, assert.AnError
}
func (suite *ExportUnitSuite) TestGetItemName() {
table := []struct {
tname string
id string
backupVersion int
name string
fin data.FetchItemByNamer
errFunc assert.ErrorAssertionFunc
}{
{
tname: "legacy",
id: "name",
backupVersion: version.OneDrive1DataAndMetaFiles,
name: "name",
errFunc: assert.NoError,
},
{
tname: "name in filename",
id: "name.data",
backupVersion: version.OneDrive4DirIncludesPermissions,
name: "name",
errFunc: assert.NoError,
},
{
tname: "name in metadata",
id: "id.data",
backupVersion: version.Backup,
name: "name",
fin: finD{id: "id.meta", name: "name"},
errFunc: assert.NoError,
},
{
tname: "name in metadata but error",
id: "id.data",
backupVersion: version.Backup,
name: "",
fin: finD{err: assert.AnError},
errFunc: assert.Error,
},
}
for _, test := range table {
suite.Run(test.tname, func() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
name, err := getItemName(
ctx,
test.id,
test.backupVersion,
test.fin,
)
test.errFunc(t, err)
assert.Equal(t, test.name, name, "name")
})
}
}
type mockRestoreCollection struct {
path path.Path
items []mockDataStream
}
func (rc mockRestoreCollection) Items(ctx context.Context, errs *fault.Bus) <-chan data.Stream {
ch := make(chan data.Stream)
go func() {
defer close(ch)
el := errs.Local()
for _, item := range rc.items {
if item.err != nil {
el.AddRecoverable(ctx, item.err)
continue
}
ch <- item
}
}()
return ch
}
func (rc mockRestoreCollection) FullPath() path.Path {
return rc.path
}
type mockDataStream struct {
id string
data string
err error
}
func (ms mockDataStream) ToReader() io.ReadCloser {
if ms.data != "" {
return io.NopCloser(bytes.NewBufferString(ms.data))
}
return nil
}
func (ms mockDataStream) UUID() string { return ms.id }
func (ms mockDataStream) Deleted() bool { return false }
func (suite *ExportUnitSuite) TestGetItems() {
table := []struct {
name string
version int
backingCollection data.RestoreCollection
expectedItems []export.Item
}{
{
name: "single item",
version: 1,
backingCollection: data.NoFetchRestoreCollection{
Collection: mockRestoreCollection{
items: []mockDataStream{
{id: "name1", data: "body1"},
},
},
},
expectedItems: []export.Item{
{
ID: "name1",
Data: export.ItemData{
Name: "name1",
Body: io.NopCloser((bytes.NewBufferString("body1"))),
},
},
},
},
{
name: "multiple items",
version: 1,
backingCollection: data.NoFetchRestoreCollection{
Collection: mockRestoreCollection{
items: []mockDataStream{
{id: "name1", data: "body1"},
{id: "name2", data: "body2"},
},
},
},
expectedItems: []export.Item{
{
ID: "name1",
Data: export.ItemData{
Name: "name1",
Body: io.NopCloser((bytes.NewBufferString("body1"))),
},
},
{
ID: "name2",
Data: export.ItemData{
Name: "name2",
Body: io.NopCloser((bytes.NewBufferString("body2"))),
},
},
},
},
{
name: "single item with data suffix",
version: 2,
backingCollection: data.NoFetchRestoreCollection{
Collection: mockRestoreCollection{
items: []mockDataStream{
{id: "name1.data", data: "body1"},
},
},
},
expectedItems: []export.Item{
{
ID: "name1.data",
Data: export.ItemData{
Name: "name1",
Body: io.NopCloser((bytes.NewBufferString("body1"))),
},
},
},
},
{
name: "single item name from metadata",
version: version.Backup,
backingCollection: data.FetchRestoreCollection{
Collection: mockRestoreCollection{
items: []mockDataStream{
{id: "id1.data", data: "body1"},
},
},
FetchItemByNamer: finD{id: "id1.meta", name: "name1"},
},
expectedItems: []export.Item{
{
ID: "id1.data",
Data: export.ItemData{
Name: "name1",
Body: io.NopCloser((bytes.NewBufferString("body1"))),
},
},
},
},
{
name: "single item name from metadata with error",
version: version.Backup,
backingCollection: data.FetchRestoreCollection{
Collection: mockRestoreCollection{
items: []mockDataStream{
{id: "id1.data"},
},
},
FetchItemByNamer: finD{err: assert.AnError},
},
expectedItems: []export.Item{
{
ID: "id1.data",
Error: assert.AnError,
},
},
},
{
name: "items with success and metadata read error",
version: version.Backup,
backingCollection: data.FetchRestoreCollection{
Collection: mockRestoreCollection{
items: []mockDataStream{
{id: "missing.data"},
{id: "id1.data", data: "body1"},
},
},
FetchItemByNamer: finD{id: "id1.meta", name: "name1"},
},
expectedItems: []export.Item{
{
ID: "missing.data",
Error: assert.AnError,
},
{
ID: "id1.data",
Data: export.ItemData{
Name: "name1",
Body: io.NopCloser(bytes.NewBufferString("body1")),
},
},
},
},
{
name: "items with success and fetch error",
version: version.OneDrive1DataAndMetaFiles,
backingCollection: data.FetchRestoreCollection{
Collection: mockRestoreCollection{
items: []mockDataStream{
{id: "name0", data: "body0"},
{id: "name1", err: assert.AnError},
{id: "name2", data: "body2"},
},
},
},
expectedItems: []export.Item{
{
ID: "name0",
Data: export.ItemData{
Name: "name0",
Body: io.NopCloser(bytes.NewBufferString("body0")),
},
},
{
ID: "name2",
Data: export.ItemData{
Name: "name2",
Body: io.NopCloser(bytes.NewBufferString("body2")),
},
},
{
ID: "",
Error: assert.AnError,
},
},
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
ec := exportCollection{
baseDir: "",
backingCollection: test.backingCollection,
backupVersion: test.version,
}
items := ec.Items(ctx)
fitems := []export.Item{}
for item := range items {
fitems = append(fitems, item)
}
assert.Len(t, fitems, len(test.expectedItems), "num of items")
// We do not have any guarantees about the ordering of the
// items in the SDK, but leaving the test this way for now
// to simplify testing.
for i, item := range fitems {
assert.Equal(t, test.expectedItems[i].ID, item.ID, "id")
assert.Equal(t, test.expectedItems[i].Data.Name, item.Data.Name, "name")
assert.Equal(t, test.expectedItems[i].Data.Body, item.Data.Body, "body")
assert.ErrorIs(t, item.Error, test.expectedItems[i].Error)
}
})
}
}
func (suite *ExportUnitSuite) TestExportRestoreCollections() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
dpb := odConsts.DriveFolderPrefixBuilder("driveID1")
p, err := dpb.ToDataLayerOneDrivePath("t", "u", false)
assert.NoError(t, err, "build path")
dcs := []data.RestoreCollection{
data.FetchRestoreCollection{
Collection: mockRestoreCollection{
path: p,
items: []mockDataStream{
{id: "id1.data", data: "body1"},
},
},
FetchItemByNamer: finD{id: "id1.meta", name: "name1"},
},
}
expectedItems := []export.Item{
{
ID: "id1.data",
Data: export.ItemData{
Name: "name1",
Body: io.NopCloser((bytes.NewBufferString("body1"))),
},
},
}
exportCfg := control.ExportConfig{}
ecs, err := ExportRestoreCollections(ctx, int(version.Backup), exportCfg, control.Options{}, dcs, nil, fault.New(true))
assert.NoError(t, err, "export collections error")
assert.Len(t, ecs, 1, "num of collections")
items := ecs[0].Items(ctx)
fitems := []export.Item{}
for item := range items {
fitems = append(fitems, item)
}
assert.Equal(t, expectedItems, fitems, "items")
}

@@ -40,6 +40,7 @@ const (
OpUnknown Operation = iota
Backup
Restore
Export
)
// Constructor for ConnectorOperationStatus. If the counts do not agree, an error is returned.

@@ -0,0 +1,359 @@
package operations
import (
"context"
"errors"
"fmt"
"time"
"github.com/alcionai/clues"
"github.com/google/uuid"
"github.com/alcionai/corso/src/internal/archive"
"github.com/alcionai/corso/src/internal/common/crash"
"github.com/alcionai/corso/src/internal/common/dttm"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/diagnostics"
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/internal/model"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/internal/stats"
"github.com/alcionai/corso/src/internal/streamstore"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/export"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/store"
)
const (
// CopyBufferSize is the size of the copy buffer for disk
// write operations
// TODO(meain): tweak this value
CopyBufferSize = 5 * 1024 * 1024
)
// ExportOperation wraps an operation with export-specific props.
type ExportOperation struct {
operation
BackupID model.StableID
Results RestoreResults
Selectors selectors.Selector
ExportCfg control.ExportConfig
Version string
acct account.Account
ec inject.ExportConsumer
}
// NewExportOperation constructs and validates an export operation.
func NewExportOperation(
ctx context.Context,
opts control.Options,
kw *kopia.Wrapper,
sw *store.Wrapper,
ec inject.ExportConsumer,
acct account.Account,
backupID model.StableID,
sel selectors.Selector,
exportCfg control.ExportConfig,
bus events.Eventer,
) (ExportOperation, error) {
op := ExportOperation{
operation: newOperation(opts, bus, count.New(), kw, sw),
acct: acct,
BackupID: backupID,
ExportCfg: exportCfg,
Selectors: sel,
Version: "v0",
ec: ec,
}
if err := op.validate(); err != nil {
return ExportOperation{}, err
}
return op, nil
}
func (op ExportOperation) validate() error {
if op.ec == nil {
return clues.New("missing export consumer")
}
return op.operation.validate()
}
// exportStats aggregates stats from an export operation's Run().
// Primarily used so that the defer can take in a pointer wrapping
// the values, while those values get populated asynchronously.
type exportStats struct {
cs []data.RestoreCollection
ctrl *data.CollectionStats
bytesRead *stats.ByteCounter
resourceCount int
// a transient value only used to pair up start-end events.
exportID string
}
// Run begins a synchronous export operation.
func (op *ExportOperation) Run(ctx context.Context) (
expColl []export.Collection,
err error,
) {
defer func() {
if crErr := crash.Recovery(ctx, recover(), "export"); crErr != nil {
err = crErr
}
}()
var (
opStats = exportStats{
bytesRead: &stats.ByteCounter{},
exportID: uuid.NewString(),
}
start = time.Now()
sstore = streamstore.NewStreamer(op.kopia, op.acct.ID(), op.Selectors.PathService())
)
// -----
// Setup
// -----
ctx, end := diagnostics.Span(ctx, "operations:export:run")
defer func() {
end()
// wait for the progress display to clean up
observe.Complete()
}()
ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx})
defer flushMetrics()
ctx = clues.Add(
ctx,
"tenant_id", clues.Hide(op.acct.ID()),
"backup_id", op.BackupID,
"service", op.Selectors.Service)
defer func() {
op.bus.Event(
ctx,
events.ExportEnd,
map[string]any{
events.BackupID: op.BackupID,
events.DataRetrieved: op.Results.BytesRead,
events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt),
events.EndTime: dttm.Format(op.Results.CompletedAt),
events.ItemsRead: op.Results.ItemsRead,
events.ItemsWritten: op.Results.ItemsWritten,
events.Resources: op.Results.ResourceOwners,
events.ExportID: opStats.exportID,
events.Service: op.Selectors.Service.String(),
events.StartTime: dttm.Format(op.Results.StartedAt),
events.Status: op.Status.String(),
})
}()
// -----
// Execution
// -----
expCollections, err := op.do(ctx, &opStats, sstore, start)
if err != nil {
// No return here! We continue down to finalizeErrorHandling and finalizeMetrics, even in case of failure.
logger.CtxErr(ctx, err).Error("running export")
if errors.Is(err, kopia.ErrNoRestorePath) {
op.Errors.Fail(clues.New("empty backup or unknown path provided"))
}
op.Errors.Fail(clues.Wrap(err, "running export"))
}
finalizeErrorHandling(ctx, op.Options, op.Errors, "running export")
LogFaultErrors(ctx, op.Errors.Errors(), "running export")
// -----
// Persistence
// -----
err = op.finalizeMetrics(ctx, start, &opStats)
if err != nil {
op.Errors.Fail(clues.Wrap(err, "finalizing export metrics"))
return nil, op.Errors.Failure()
}
logger.Ctx(ctx).Infow("completed export", "results", op.Results)
return expCollections, nil
}
func (op *ExportOperation) do(
ctx context.Context,
opStats *exportStats,
detailsStore streamstore.Reader,
start time.Time,
) ([]export.Collection, error) {
logger.Ctx(ctx).
With("control_options", op.Options, "selectors", op.Selectors).
Info("exporting selection")
bup, deets, err := getBackupAndDetailsFromID(
ctx,
op.BackupID,
op.store,
detailsStore,
op.Errors)
if err != nil {
return nil, clues.Wrap(err, "getting backup and details")
}
observe.Message(ctx, "Exporting", observe.Bullet, clues.Hide(bup.Selector.DiscreteOwner))
paths, err := formatDetailsForRestoration(ctx, bup.Version, op.Selectors, deets, op.ec, op.Errors)
if err != nil {
return nil, clues.Wrap(err, "formatting paths from details")
}
ctx = clues.Add(
ctx,
"resource_owner_id", bup.Selector.ID(),
"resource_owner_name", clues.Hide(bup.Selector.Name()),
"details_entries", len(deets.Entries),
"details_paths", len(paths),
"backup_snapshot_id", bup.SnapshotID,
"backup_version", bup.Version)
op.bus.Event(
ctx,
events.ExportStart,
map[string]any{
events.StartTime: start,
events.BackupID: op.BackupID,
events.BackupCreateTime: bup.CreationTime,
events.ExportID: opStats.exportID,
})
observe.Message(ctx, fmt.Sprintf("Discovered %d items in backup %s to export", len(paths), op.BackupID))
kopiaComplete := observe.MessageWithCompletion(ctx, "Enumerating items in repository")
defer close(kopiaComplete)
dcs, err := op.kopia.ProduceRestoreCollections(ctx, bup.SnapshotID, paths, opStats.bytesRead, op.Errors)
if err != nil {
return nil, clues.Wrap(err, "producing collections to export")
}
kopiaComplete <- struct{}{}
ctx = clues.Add(ctx, "coll_count", len(dcs))
// should always be 1, since backups are 1:1 with resourceOwners.
opStats.resourceCount = 1
opStats.cs = dcs
expCollections, err := exportRestoreCollections(
ctx,
op.ec,
bup.Version,
op.Selectors,
op.ExportCfg,
op.Options,
dcs,
op.Errors)
if err != nil {
return nil, clues.Wrap(err, "exporting collections")
}
opStats.ctrl = op.ec.Wait()
logger.Ctx(ctx).Debug(opStats.ctrl)
if op.ExportCfg.Archive {
zc, err := archive.ZipExportCollection(ctx, expCollections)
if err != nil {
return nil, clues.Wrap(err, "zipping export collections")
}
return []export.Collection{zc}, nil
}
return expCollections, nil
}
// finalizeMetrics aggregates final statistics and status for the export operation.
func (op *ExportOperation) finalizeMetrics(
ctx context.Context,
started time.Time,
opStats *exportStats,
) error {
op.Results.StartedAt = started
op.Results.CompletedAt = time.Now()
op.Status = Completed
if op.Errors.Failure() != nil {
op.Status = Failed
}
op.Results.BytesRead = opStats.bytesRead.NumBytes
op.Results.ItemsRead = len(opStats.cs) // TODO: file count, not collection count
op.Results.ResourceOwners = opStats.resourceCount
if opStats.ctrl == nil {
op.Status = Failed
return clues.New("restoration never completed")
}
if op.Status != Failed && opStats.ctrl.IsZero() {
op.Status = NoData
}
// We don't have data on which items were written
// op.Results.ItemsWritten = opStats.ctrl.Successes
return op.Errors.Failure()
}
// ---------------------------------------------------------------------------
// Exporter funcs
// ---------------------------------------------------------------------------
func exportRestoreCollections(
ctx context.Context,
ec inject.ExportConsumer,
backupVersion int,
sel selectors.Selector,
exportCfg control.ExportConfig,
opts control.Options,
dcs []data.RestoreCollection,
errs *fault.Bus,
) ([]export.Collection, error) {
complete := observe.MessageWithCompletion(ctx, "Preparing export")
defer func() {
complete <- struct{}{}
close(complete)
}()
expCollections, err := ec.ExportRestoreCollections(
ctx,
backupVersion,
sel,
exportCfg,
opts,
dcs,
errs)
if err != nil {
return nil, clues.Wrap(err, "exporting collections")
}
return expCollections, nil
}

@@ -0,0 +1,321 @@
package operations
import (
"archive/zip"
"bytes"
"context"
"io"
"strings"
"testing"
"time"
"github.com/alcionai/clues"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/archive"
"github.com/alcionai/corso/src/internal/data"
evmock "github.com/alcionai/corso/src/internal/events/mock"
"github.com/alcionai/corso/src/internal/kopia"
exchMock "github.com/alcionai/corso/src/internal/m365/exchange/mock"
"github.com/alcionai/corso/src/internal/m365/mock"
"github.com/alcionai/corso/src/internal/stats"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/export"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/store"
)
type ExportOpSuite struct {
tester.Suite
}
func TestExportOpSuite(t *testing.T) {
suite.Run(t, &ExportOpSuite{Suite: tester.NewUnitSuite(t)})
}
func (suite *ExportOpSuite) TestExportOperation_PersistResults() {
var (
kw = &kopia.Wrapper{}
sw = &store.Wrapper{}
ctrl = &mock.Controller{}
now = time.Now()
exportCfg = control.DefaultExportConfig()
)
table := []struct {
expectStatus OpStatus
expectErr assert.ErrorAssertionFunc
stats exportStats
fail error
}{
{
expectStatus: Completed,
expectErr: assert.NoError,
stats: exportStats{
resourceCount: 1,
bytesRead: &stats.ByteCounter{
NumBytes: 42,
},
cs: []data.RestoreCollection{
data.NoFetchRestoreCollection{
Collection: &exchMock.DataCollection{},
},
},
ctrl: &data.CollectionStats{
Objects: 1,
Successes: 1,
},
},
},
{
expectStatus: Failed,
expectErr: assert.Error,
fail: assert.AnError,
stats: exportStats{
bytesRead: &stats.ByteCounter{},
ctrl: &data.CollectionStats{},
},
},
{
expectStatus: NoData,
expectErr: assert.NoError,
stats: exportStats{
bytesRead: &stats.ByteCounter{},
cs: []data.RestoreCollection{},
ctrl: &data.CollectionStats{},
},
},
}
for _, test := range table {
suite.Run(test.expectStatus.String(), func() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
op, err := NewExportOperation(
ctx,
control.Defaults(),
kw,
sw,
ctrl,
account.Account{},
"foo",
selectors.Selector{DiscreteOwner: "test"},
exportCfg,
evmock.NewBus())
require.NoError(t, err, clues.ToCore(err))
op.Errors.Fail(test.fail)
err = op.finalizeMetrics(ctx, now, &test.stats)
test.expectErr(t, err, clues.ToCore(err))
assert.Equal(t, test.expectStatus.String(), op.Status.String(), "status")
assert.Equal(t, len(test.stats.cs), op.Results.ItemsRead, "items read")
assert.Equal(t, test.stats.bytesRead.NumBytes, op.Results.BytesRead, "bytes read")
assert.Equal(t, test.stats.resourceCount, op.Results.ResourceOwners, "resource owners")
assert.Equal(t, now, op.Results.StartedAt, "started at")
assert.Less(t, now, op.Results.CompletedAt, "completed at")
})
}
}
type expCol struct {
base string
items []export.Item
}
func (ec expCol) BasePath() string { return ec.base }
func (ec expCol) Items(ctx context.Context) <-chan export.Item {
ch := make(chan export.Item)
go func() {
defer close(ch)
for _, item := range ec.items {
ch <- item
}
}()
return ch
}
// ReadSeekCloser implements io.ReadSeekCloser.
type ReadSeekCloser struct {
*bytes.Reader
}
// NewReadSeekCloser creates a new ReadSeekCloser from a byte slice.
func NewReadSeekCloser(byts []byte) *ReadSeekCloser {
return &ReadSeekCloser{
Reader: bytes.NewReader(byts),
}
}
// Close implements the io.Closer interface.
func (r *ReadSeekCloser) Close() error {
// Nothing to close for a byte slice.
return nil
}
func (suite *ExportOpSuite) TestZipExports() {
table := []struct {
name string
collection []export.Collection
shouldErr bool
readErr bool
}{
{
name: "nothing",
collection: []export.Collection{},
shouldErr: true,
},
{
name: "empty",
collection: []export.Collection{
expCol{
base: "",
items: []export.Item{},
},
},
},
{
name: "one item",
collection: []export.Collection{
expCol{
base: "",
items: []export.Item{
{
ID: "id1",
Data: export.ItemData{
Name: "test",
Body: NewReadSeekCloser([]byte("test")),
},
},
},
},
},
},
{
name: "multiple items",
collection: []export.Collection{
expCol{
base: "",
items: []export.Item{
{
ID: "id1",
Data: export.ItemData{
Name: "test",
Body: NewReadSeekCloser([]byte("test")),
},
},
},
},
expCol{
base: "/fold",
items: []export.Item{
{
ID: "id2",
Data: export.ItemData{
Name: "test2",
Body: NewReadSeekCloser([]byte("test2")),
},
},
},
},
},
},
{
name: "one item with err",
collection: []export.Collection{
expCol{
base: "",
items: []export.Item{
{
ID: "id3",
Error: assert.AnError,
},
},
},
},
readErr: true,
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
zc, err := archive.ZipExportCollection(ctx, test.collection)
if test.shouldErr {
assert.Error(t, err, "error")
return
}
require.NoError(t, err, "error")
assert.Empty(t, zc.BasePath(), "base path")
zippedItems := []export.ItemData{}
count := 0
for item := range zc.Items(ctx) {
assert.True(t, strings.HasPrefix(item.Data.Name, "Corso_Export_"), "name prefix")
assert.True(t, strings.HasSuffix(item.Data.Name, ".zip"), "name suffix")
data, err := io.ReadAll(item.Data.Body)
if test.readErr {
assert.Error(t, err, "read error")
return
}
size := int64(len(data))
item.Data.Body.Close()
reader, err := zip.NewReader(bytes.NewReader(data), size)
require.NoError(t, err, "zip reader")
for _, f := range reader.File {
rc, err := f.Open()
assert.NoError(t, err, "open file in zip")
data, err := io.ReadAll(rc)
require.NoError(t, err, "read zip file content")
rc.Close()
zippedItems = append(zippedItems, export.ItemData{
Name: f.Name,
Body: NewReadSeekCloser([]byte(data)),
})
}
count++
}
assert.Equal(t, 1, count, "single item")
expectedZippedItems := []export.ItemData{}
for _, col := range test.collection {
for item := range col.Items(ctx) {
if col.BasePath() != "" {
item.Data.Name = strings.Join([]string{col.BasePath(), item.Data.Name}, "/")
}
_, err := item.Data.Body.(io.ReadSeeker).Seek(0, io.SeekStart)
require.NoError(t, err, "seek")
expectedZippedItems = append(expectedZippedItems, item.Data)
}
}
assert.Equal(t, expectedZippedItems, zippedItems, "items")
})
}
}

@@ -12,6 +12,7 @@ import (
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/control/repository"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/export"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
@@ -59,6 +60,22 @@ type (
CacheItemInfo(v details.ItemInfo)
}
ExportConsumer interface {
ExportRestoreCollections(
ctx context.Context,
backupVersion int,
selector selectors.Selector,
exportCfg control.ExportConfig,
opts control.Options,
dcs []data.RestoreCollection,
errs *fault.Bus,
) ([]export.Collection, error)
Wait() *data.CollectionStats
CacheItemInfoer
}
RepoMaintenancer interface {
RepoMaintenance(ctx context.Context, opts repository.Maintenance) error
}

src/pkg/control/export.go

@@ -0,0 +1,22 @@
package control
// ExportConfig contains config for exports
type ExportConfig struct {
// Archive decides if we should create an archive from the data
// instead of just returning all the files. If Archive is set to
// true, we return a single collection with a single file which is
// the archive.
Archive bool
// DataFormat decides the format in which we return the data. This
// is currently only useful for Outlook exports; emails, for
// example, could be exported as eml or pst.
// TODO: Enable once we support outlook exports
// DataFormat string
}
func DefaultExportConfig() ExportConfig {
return ExportConfig{
Archive: false,
}
}
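
For instance, to request the archive behavior described above, a consumer could presumably do (a sketch):

```go
cfg := control.DefaultExportConfig()
// With Archive set, the export returns a single collection holding
// one zip file rather than one file per exported item.
cfg.Archive = true
```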

src/pkg/export/export.go

@@ -0,0 +1,45 @@
package export
import (
"context"
"io"
)
// Collection is the interface that is returned to the SDK consumer
type Collection interface {
// BasePath gets the base path of the collection
BasePath() string
// Items gets the items within the collection (folder)
Items(context.Context) <-chan Item
}
// ItemData is the data for an individual item.
type ItemData struct {
// Name is the name of the item. This is the name that the item
// would have had in the service.
Name string
// Body is the body of the item. This is an io.ReadCloser and the
// SDK consumer is responsible for closing it.
Body io.ReadCloser
}
// Item is the item that is returned to the SDK consumer
type Item struct {
// ID will be a unique id for the item. This is the same as the id
// that is used to store the data. It is not the name, and is
// mostly used just for tracking.
ID string
// Data contains the actual data of the item. It will have both
// the name of the item and an io.ReadCloser which contains the
// body of the item.
Data ItemData
// Error will contain any error that occurred while trying to get
// the item(s), for example while resolving the name of an item.
// When the error is bound to a particular item, the item's ID is
// also set.
Error error
}
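
A sketch of how an SDK consumer might materialize these collections on disk; `writeCollections` and `root` are hypothetical, `BasePath` segments are `/`-separated per the archive code above, and the consumer owns and must close each `Body`:

```go
package example

import (
	"context"
	"io"
	"os"
	"path/filepath"

	"github.com/alcionai/corso/src/pkg/export"
)

// writeCollections materializes export collections under root,
// joining each collection's BasePath with each item's Name.
func writeCollections(ctx context.Context, root string, colls []export.Collection) error {
	for _, c := range colls {
		dir := filepath.Join(root, filepath.FromSlash(c.BasePath()))
		if err := os.MkdirAll(dir, 0o755); err != nil {
			return err
		}

		for item := range c.Items(ctx) {
			if item.Error != nil {
				return item.Error
			}

			f, err := os.Create(filepath.Join(dir, item.Data.Name))
			if err != nil {
				return err
			}

			// The consumer owns the item body and must close it.
			_, err = io.Copy(f, item.Data.Body)
			item.Data.Body.Close()
			f.Close()

			if err != nil {
				return err
			}
		}
	}

	return nil
}
```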

@@ -208,6 +208,32 @@ func (e *Bus) Errors() *Errors {
}
}
// ItemsAndRecovered returns the items that failed along with other
// recoverable errors
func (e *Bus) ItemsAndRecovered() ([]Item, []error) {
var (
is = map[string]Item{}
non = []error{}
)
for _, err := range e.recoverable {
var ie *Item
if !errors.As(err, &ie) {
non = append(non, err)
continue
}
is[ie.dedupeID()] = *ie
}
var ie *Item
if errors.As(e.failure, &ie) {
is[ie.dedupeID()] = *ie
}
return maps.Values(is), non
}
// ---------------------------------------------------------------------------
// Errors Data
// ---------------------------------------------------------------------------

@@ -74,6 +74,12 @@ type Repository interface {
sel selectors.Selector,
restoreCfg control.RestoreConfig,
) (operations.RestoreOperation, error)
NewExport(
ctx context.Context,
backupID string,
sel selectors.Selector,
exportCfg control.ExportConfig,
) (operations.ExportOperation, error)
NewMaintenance(
ctx context.Context,
mOpts rep.Maintenance,
@@ -349,6 +355,31 @@ func (r repository) NewBackupWithLookup(
r.Bus)
}
// NewExport generates an ExportOperation runner.
func (r repository) NewExport(
ctx context.Context,
backupID string,
sel selectors.Selector,
exportCfg control.ExportConfig,
) (operations.ExportOperation, error) {
ctrl, err := connectToM365(ctx, sel.PathService(), r.Account, r.Opts)
if err != nil {
return operations.ExportOperation{}, clues.Wrap(err, "connecting to m365")
}
return operations.NewExportOperation(
ctx,
r.Opts,
r.dataLayer,
store.NewKopiaStore(r.modelStore),
ctrl,
r.Account,
model.StableID(backupID),
sel,
exportCfg,
r.Bus)
}
// NewRestore generates a restoreOperation runner.
func (r repository) NewRestore(
ctx context.Context,