diff --git a/src/internal/archive/zip.go b/src/internal/archive/zip.go new file mode 100644 index 000000000..f3e02ad66 --- /dev/null +++ b/src/internal/archive/zip.go @@ -0,0 +1,99 @@ +package archive + +import ( + "archive/zip" + "context" + "io" + "path" + + "github.com/alcionai/clues" + + "github.com/alcionai/corso/src/internal/common/dttm" + "github.com/alcionai/corso/src/pkg/export" +) + +const ( + // ZipCopyBufferSize is the size of the copy buffer for zip + // write operations + // TODO(meain): tweak this value + ZipCopyBufferSize = 5 * 1024 * 1024 +) + +type zipCollection struct { + reader io.ReadCloser +} + +func (z zipCollection) BasePath() string { + return "" +} + +func (z zipCollection) Items(ctx context.Context) <-chan export.Item { + rc := make(chan export.Item, 1) + defer close(rc) + + rc <- export.Item{ + Data: export.ItemData{ + Name: "Corso_Export_" + dttm.FormatNow(dttm.HumanReadable) + ".zip", + Body: z.reader, + }, + } + + return rc +} + +// ZipExportCollection takes a list of export collections and zips +// them into a single collection. +func ZipExportCollection( + ctx context.Context, + expCollections []export.Collection, +) (export.Collection, error) { + if len(expCollections) == 0 { + return nil, clues.New("no export collections provided") + } + + reader, writer := io.Pipe() + wr := zip.NewWriter(writer) + + go func() { + defer writer.Close() + defer wr.Close() + + buf := make([]byte, ZipCopyBufferSize) + + for _, ec := range expCollections { + folder := ec.BasePath() + items := ec.Items(ctx) + + for item := range items { + err := item.Error + if err != nil { + writer.CloseWithError(clues.Wrap(err, "getting export item").With("id", item.ID)) + return + } + + name := item.Data.Name + + // We assume folder and name to not contain any path separators. + // Also, this should always use `/` as this is + // created within a zip file and not written to disk. + // TODO(meain): Exchange paths might contain a path + // separator and will have to have special handling. 
+ + //nolint:forbidigo + f, err := wr.Create(path.Join(folder, name)) + if err != nil { + writer.CloseWithError(clues.Wrap(err, "creating zip entry").With("name", name).With("id", item.ID)) + return + } + + _, err = io.CopyBuffer(f, item.Data.Body, buf) + if err != nil { + writer.CloseWithError(clues.Wrap(err, "writing zip entry").With("name", name).With("id", item.ID)) + return + } + } + } + }() + + return zipCollection{reader}, nil +} diff --git a/src/internal/data/data_collection.go b/src/internal/data/data_collection.go index b85e1e977..cec096783 100644 --- a/src/internal/data/data_collection.go +++ b/src/internal/data/data_collection.go @@ -91,6 +91,11 @@ func (c NoFetchRestoreCollection) FetchItemByName(context.Context, string) (Stre return nil, ErrNotFound } +type FetchRestoreCollection struct { + Collection + FetchItemByNamer +} + // Stream represents a single item within a Collection // that can be consumed as a stream (it embeds io.Reader) type Stream interface { diff --git a/src/internal/events/events.go b/src/internal/events/events.go index 1252052f7..99c1651ac 100644 --- a/src/internal/events/events.go +++ b/src/internal/events/events.go @@ -35,6 +35,8 @@ const ( BackupEnd = "Backup End" RestoreStart = "Restore Start" RestoreEnd = "Restore End" + ExportStart = "Export Start" + ExportEnd = "Export End" MaintenanceStart = "Maintenance Start" MaintenanceEnd = "Maintenance End" @@ -49,6 +51,7 @@ const ( ItemsWritten = "items_written" Resources = "resources" RestoreID = "restore_id" + ExportID = "export_id" Service = "service" StartTime = "start_time" Status = "status" diff --git a/src/internal/m365/controller.go b/src/internal/m365/controller.go index 4dd2c19e8..051ab5fb2 100644 --- a/src/internal/m365/controller.go +++ b/src/internal/m365/controller.go @@ -24,6 +24,7 @@ import ( var ( _ inject.BackupProducer = &Controller{} _ inject.RestoreConsumer = &Controller{} + _ inject.ExportConsumer = &Controller{} ) // Controller is a struct used to wrap the GraphServiceClient and diff --git a/src/internal/m365/export.go b/src/internal/m365/export.go new file mode 100644 index 000000000..3840d377b --- /dev/null +++ b/src/internal/m365/export.go @@ -0,0 +1,61 @@ +package m365 + +import ( + "context" + + "github.com/alcionai/clues" + + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/diagnostics" + "github.com/alcionai/corso/src/internal/m365/graph" + "github.com/alcionai/corso/src/internal/m365/onedrive" + "github.com/alcionai/corso/src/internal/m365/support" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/export" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/selectors" +) + +// ExportRestoreCollections exports data from the specified collections +func (ctrl *Controller) ExportRestoreCollections( + ctx context.Context, + backupVersion int, + sels selectors.Selector, + exportCfg control.ExportConfig, + opts control.Options, + dcs []data.RestoreCollection, + errs *fault.Bus, +) ([]export.Collection, error) { + ctx, end := diagnostics.Span(ctx, "m365:export") + defer end() + + ctx = graph.BindRateLimiterConfig(ctx, graph.LimiterCfg{Service: sels.PathService()}) + ctx = clues.Add(ctx, "export_config", exportCfg) // TODO(meain): needs PII control + + var ( + expCollections []export.Collection + status *support.ControllerOperationStatus + deets = &details.Builder{} + err error + ) + + switch sels.Service { + case 
selectors.ServiceOneDrive: + expCollections, err = onedrive.ExportRestoreCollections( + ctx, + backupVersion, + exportCfg, + opts, + dcs, + deets, + errs) + default: + err = clues.Wrap(clues.New(sels.Service.String()), "service not supported") + } + + ctrl.incrementAwaitingMessages() + ctrl.UpdateStatus(status) + + return expCollections, err +} diff --git a/src/internal/m365/mock/connector.go b/src/internal/m365/mock/connector.go index 977306883..2c2ece635 100644 --- a/src/internal/m365/mock/connector.go +++ b/src/internal/m365/mock/connector.go @@ -10,6 +10,7 @@ import ( "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/count" + "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" @@ -71,3 +72,15 @@ func (ctrl Controller) ConsumeRestoreCollections( } func (ctrl Controller) CacheItemInfo(dii details.ItemInfo) {} + +func (ctrl Controller) ExportRestoreCollections( + _ context.Context, + _ int, + _ selectors.Selector, + _ control.ExportConfig, + _ control.Options, + _ []data.RestoreCollection, + _ *fault.Bus, +) ([]export.Collection, error) { + return nil, ctrl.Err +} diff --git a/src/internal/m365/onedrive/export.go b/src/internal/m365/onedrive/export.go new file mode 100644 index 000000000..bf68ede8d --- /dev/null +++ b/src/internal/m365/onedrive/export.go @@ -0,0 +1,166 @@ +package onedrive + +import ( + "context" + "strings" + + "github.com/alcionai/clues" + + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/m365/onedrive/metadata" + "github.com/alcionai/corso/src/internal/version" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/export" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/path" +) + +var _ export.Collection = &exportCollection{} + +// exportCollection is the implementation of export.ExportCollection for OneDrive +type exportCollection struct { + // baseDir contains the path of the collection + baseDir string + + // backingCollection is the restore collection from which we will + // create the export collection. + backingCollection data.RestoreCollection + + // backupVersion is the backupVersion of the backup this collection was part + // of. This is required to figure out how to get the name of the + // item. 
+	backupVersion int
+}
+
+func (ec exportCollection) BasePath() string {
+	return ec.baseDir
+}
+
+func (ec exportCollection) Items(ctx context.Context) <-chan export.Item {
+	ch := make(chan export.Item)
+	go items(ctx, ec, ch)
+
+	return ch
+}
+
+// items converts items in backing collection to export items
+func items(ctx context.Context, ec exportCollection, ch chan<- export.Item) {
+	defer close(ch)
+
+	errs := fault.New(false)
+
+	// There will only be a single item in the backingCollections
+	// for OneDrive
+	for item := range ec.backingCollection.Items(ctx, errs) {
+		itemUUID := item.UUID()
+		if isMetadataFile(itemUUID, ec.backupVersion) {
+			continue
+		}
+
+		name, err := getItemName(ctx, itemUUID, ec.backupVersion, ec.backingCollection)
+
+		ch <- export.Item{
+			ID: itemUUID,
+			Data: export.ItemData{
+				Name: name,
+				Body: item.ToReader(),
+			},
+			Error: err,
+		}
+	}
+
+	eitems, erecoverable := errs.ItemsAndRecovered()
+
+	// Return all the items that we failed to get from kopia at the end
+	for _, ei := range eitems {
+		ei := ei // copy so the pointer below does not alias the loop variable
+		ch <- export.Item{
+			ID:    ei.ID,
+			Error: &ei,
+		}
+	}
+
+	for _, e := range erecoverable {
+		ch <- export.Item{
+			Error: e,
+		}
+	}
+}
+
+// isMetadataFile is used to determine if a path corresponds to a
+// metadata file. This is OneDrive specific logic and depends on the
+// version of the backup unlike metadata.IsMetadataFile which only has
+// to be concerned about the current version.
+func isMetadataFile(id string, backupVersion int) bool {
+	if backupVersion < version.OneDrive1DataAndMetaFiles {
+		return false
+	}
+
+	return strings.HasSuffix(id, metadata.MetaFileSuffix) ||
+		strings.HasSuffix(id, metadata.DirMetaFileSuffix)
+}
+
+// getItemName is used to get the name of the item.
+// How we get the name depends on the version of the backup.
+func getItemName(
+	ctx context.Context,
+	id string,
+	backupVersion int,
+	fin data.FetchItemByNamer,
+) (string, error) {
+	if backupVersion < version.OneDrive1DataAndMetaFiles {
+		return id, nil
+	}
+
+	if backupVersion < version.OneDrive5DirMetaNoName {
+		return strings.TrimSuffix(id, metadata.DataFileSuffix), nil
+	}
+
+	if strings.HasSuffix(id, metadata.DataFileSuffix) {
+		trimmedName := strings.TrimSuffix(id, metadata.DataFileSuffix)
+		metaName := trimmedName + metadata.MetaFileSuffix
+
+		meta, err := fetchAndReadMetadata(ctx, fin, metaName)
+		if err != nil {
+			return "", clues.Wrap(err, "getting metadata").WithClues(ctx)
+		}
+
+		return meta.FileName, nil
+	}
+
+	return "", clues.New("invalid item id").WithClues(ctx)
+}
+
+// ExportRestoreCollections will create the export collections for the
+// given restore collections.
+func ExportRestoreCollections(
+	ctx context.Context,
+	backupVersion int,
+	exportCfg control.ExportConfig,
+	opts control.Options,
+	dcs []data.RestoreCollection,
+	deets *details.Builder,
+	errs *fault.Bus,
+) ([]export.Collection, error) {
+	var (
+		el = errs.Local()
+		ec = make([]export.Collection, 0, len(dcs))
+	)
+
+	for _, dc := range dcs {
+		drivePath, err := path.ToDrivePath(dc.FullPath())
+		if err != nil {
+			return nil, clues.Wrap(err, "transforming path to drive path").WithClues(ctx)
+		}
+
+		baseDir := path.Builder{}.Append(drivePath.Folders...)
+ + ec = append(ec, exportCollection{ + baseDir: baseDir.String(), + backingCollection: dc, + backupVersion: backupVersion, + }) + } + + return ec, el.Failure() +} diff --git a/src/internal/m365/onedrive/export_test.go b/src/internal/m365/onedrive/export_test.go new file mode 100644 index 000000000..b28b5f3a5 --- /dev/null +++ b/src/internal/m365/onedrive/export_test.go @@ -0,0 +1,463 @@ +package onedrive + +import ( + "bytes" + "context" + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/src/internal/data" + odConsts "github.com/alcionai/corso/src/internal/m365/onedrive/consts" + "github.com/alcionai/corso/src/internal/m365/onedrive/metadata" + "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/internal/version" + "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/export" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/path" +) + +type ExportUnitSuite struct { + tester.Suite +} + +func TestExportUnitSuite(t *testing.T) { + suite.Run(t, &ExportUnitSuite{Suite: tester.NewUnitSuite(t)}) +} + +func (suite *ExportUnitSuite) TestIsMetadataFile() { + table := []struct { + name string + id string + backupVersion int + isMeta bool + }{ + { + name: "legacy", + backupVersion: version.OneDrive1DataAndMetaFiles, + isMeta: false, + }, + { + name: "metadata file", + backupVersion: version.OneDrive3IsMetaMarker, + id: "name" + metadata.MetaFileSuffix, + isMeta: true, + }, + { + name: "dir metadata file", + backupVersion: version.OneDrive3IsMetaMarker, + id: "name" + metadata.DirMetaFileSuffix, + isMeta: true, + }, + { + name: "non metadata file", + backupVersion: version.OneDrive3IsMetaMarker, + id: "name" + metadata.DataFileSuffix, + isMeta: false, + }, + } + + for _, test := range table { + suite.Run(test.name, func() { + assert.Equal(suite.T(), test.isMeta, isMetadataFile(test.id, test.backupVersion), "is metadata") + }) + } +} + +type metadataStream struct { + id string + name string +} + +func (ms metadataStream) ToReader() io.ReadCloser { + return io.NopCloser(bytes.NewBufferString(`{"filename": "` + ms.name + `"}`)) +} +func (ms metadataStream) UUID() string { return ms.id } +func (ms metadataStream) Deleted() bool { return false } + +type finD struct { + id string + name string + err error +} + +func (fd finD) FetchItemByName(ctx context.Context, name string) (data.Stream, error) { + if fd.err != nil { + return nil, fd.err + } + + if name == fd.id { + return metadataStream{id: fd.id, name: fd.name}, nil + } + + return nil, assert.AnError +} + +func (suite *ExportUnitSuite) TestGetItemName() { + table := []struct { + tname string + id string + backupVersion int + name string + fin data.FetchItemByNamer + errFunc assert.ErrorAssertionFunc + }{ + { + tname: "legacy", + id: "name", + backupVersion: version.OneDrive1DataAndMetaFiles, + name: "name", + errFunc: assert.NoError, + }, + { + tname: "name in filename", + id: "name.data", + backupVersion: version.OneDrive4DirIncludesPermissions, + name: "name", + errFunc: assert.NoError, + }, + { + tname: "name in metadata", + id: "id.data", + backupVersion: version.Backup, + name: "name", + fin: finD{id: "id.meta", name: "name"}, + errFunc: assert.NoError, + }, + { + tname: "name in metadata but error", + id: "id.data", + backupVersion: version.Backup, + name: "", + fin: finD{err: assert.AnError}, + errFunc: assert.Error, + }, + } + + for _, test := range table { + suite.Run(test.tname, 
func() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + name, err := getItemName( + ctx, + test.id, + test.backupVersion, + test.fin, + ) + test.errFunc(t, err) + + assert.Equal(t, test.name, name, "name") + }) + } +} + +type mockRestoreCollection struct { + path path.Path + items []mockDataStream +} + +func (rc mockRestoreCollection) Items(ctx context.Context, errs *fault.Bus) <-chan data.Stream { + ch := make(chan data.Stream) + + go func() { + defer close(ch) + + el := errs.Local() + + for _, item := range rc.items { + if item.err != nil { + el.AddRecoverable(ctx, item.err) + continue + } + + ch <- item + } + }() + + return ch +} + +func (rc mockRestoreCollection) FullPath() path.Path { + return rc.path +} + +type mockDataStream struct { + id string + data string + err error +} + +func (ms mockDataStream) ToReader() io.ReadCloser { + if ms.data != "" { + return io.NopCloser(bytes.NewBufferString(ms.data)) + } + + return nil +} +func (ms mockDataStream) UUID() string { return ms.id } +func (ms mockDataStream) Deleted() bool { return false } + +func (suite *ExportUnitSuite) TestGetItems() { + table := []struct { + name string + version int + backingCollection data.RestoreCollection + expectedItems []export.Item + }{ + { + name: "single item", + version: 1, + backingCollection: data.NoFetchRestoreCollection{ + Collection: mockRestoreCollection{ + items: []mockDataStream{ + {id: "name1", data: "body1"}, + }, + }, + }, + expectedItems: []export.Item{ + { + ID: "name1", + Data: export.ItemData{ + Name: "name1", + Body: io.NopCloser((bytes.NewBufferString("body1"))), + }, + }, + }, + }, + { + name: "multiple items", + version: 1, + backingCollection: data.NoFetchRestoreCollection{ + Collection: mockRestoreCollection{ + items: []mockDataStream{ + {id: "name1", data: "body1"}, + {id: "name2", data: "body2"}, + }, + }, + }, + expectedItems: []export.Item{ + { + ID: "name1", + Data: export.ItemData{ + Name: "name1", + Body: io.NopCloser((bytes.NewBufferString("body1"))), + }, + }, + { + ID: "name2", + Data: export.ItemData{ + Name: "name2", + Body: io.NopCloser((bytes.NewBufferString("body2"))), + }, + }, + }, + }, + { + name: "single item with data suffix", + version: 2, + backingCollection: data.NoFetchRestoreCollection{ + Collection: mockRestoreCollection{ + items: []mockDataStream{ + {id: "name1.data", data: "body1"}, + }, + }, + }, + expectedItems: []export.Item{ + { + ID: "name1.data", + Data: export.ItemData{ + Name: "name1", + Body: io.NopCloser((bytes.NewBufferString("body1"))), + }, + }, + }, + }, + { + name: "single item name from metadata", + version: version.Backup, + backingCollection: data.FetchRestoreCollection{ + Collection: mockRestoreCollection{ + items: []mockDataStream{ + {id: "id1.data", data: "body1"}, + }, + }, + FetchItemByNamer: finD{id: "id1.meta", name: "name1"}, + }, + expectedItems: []export.Item{ + { + ID: "id1.data", + Data: export.ItemData{ + Name: "name1", + Body: io.NopCloser((bytes.NewBufferString("body1"))), + }, + }, + }, + }, + { + name: "single item name from metadata with error", + version: version.Backup, + backingCollection: data.FetchRestoreCollection{ + Collection: mockRestoreCollection{ + items: []mockDataStream{ + {id: "id1.data"}, + }, + }, + FetchItemByNamer: finD{err: assert.AnError}, + }, + expectedItems: []export.Item{ + { + ID: "id1.data", + Error: assert.AnError, + }, + }, + }, + { + name: "items with success and metadata read error", + version: version.Backup, + backingCollection: data.FetchRestoreCollection{ + 
Collection: mockRestoreCollection{ + items: []mockDataStream{ + {id: "missing.data"}, + {id: "id1.data", data: "body1"}, + }, + }, + FetchItemByNamer: finD{id: "id1.meta", name: "name1"}, + }, + expectedItems: []export.Item{ + { + ID: "missing.data", + Error: assert.AnError, + }, + { + ID: "id1.data", + Data: export.ItemData{ + Name: "name1", + Body: io.NopCloser(bytes.NewBufferString("body1")), + }, + }, + }, + }, + { + name: "items with success and fetch error", + version: version.OneDrive1DataAndMetaFiles, + backingCollection: data.FetchRestoreCollection{ + Collection: mockRestoreCollection{ + items: []mockDataStream{ + {id: "name0", data: "body0"}, + {id: "name1", err: assert.AnError}, + {id: "name2", data: "body2"}, + }, + }, + }, + expectedItems: []export.Item{ + { + ID: "name0", + Data: export.ItemData{ + Name: "name0", + Body: io.NopCloser(bytes.NewBufferString("body0")), + }, + }, + { + ID: "name2", + Data: export.ItemData{ + Name: "name2", + Body: io.NopCloser(bytes.NewBufferString("body2")), + }, + }, + { + ID: "", + Error: assert.AnError, + }, + }, + }, + } + + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + ec := exportCollection{ + baseDir: "", + backingCollection: test.backingCollection, + backupVersion: test.version, + } + + items := ec.Items(ctx) + + fitems := []export.Item{} + for item := range items { + fitems = append(fitems, item) + } + + assert.Len(t, fitems, len(test.expectedItems), "num of items") + + // We do not have any grantees about the ordering of the + // items in the SDK, but leaving the test this way for now + // to simplify testing. + for i, item := range fitems { + assert.Equal(t, test.expectedItems[i].ID, item.ID, "id") + assert.Equal(t, test.expectedItems[i].Data.Name, item.Data.Name, "name") + assert.Equal(t, test.expectedItems[i].Data.Body, item.Data.Body, "body") + assert.ErrorIs(t, item.Error, test.expectedItems[i].Error) + } + }) + } +} + +func (suite *ExportUnitSuite) TestExportRestoreCollections() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + dpb := odConsts.DriveFolderPrefixBuilder("driveID1") + + p, err := dpb.ToDataLayerOneDrivePath("t", "u", false) + assert.NoError(t, err, "build path") + + dcs := []data.RestoreCollection{ + data.FetchRestoreCollection{ + Collection: mockRestoreCollection{ + path: p, + items: []mockDataStream{ + {id: "id1.data", data: "body1"}, + }, + }, + FetchItemByNamer: finD{id: "id1.meta", name: "name1"}, + }, + } + + expectedItems := []export.Item{ + { + ID: "id1.data", + Data: export.ItemData{ + Name: "name1", + Body: io.NopCloser((bytes.NewBufferString("body1"))), + }, + }, + } + + exportCfg := control.ExportConfig{} + ecs, err := ExportRestoreCollections(ctx, int(version.Backup), exportCfg, control.Options{}, dcs, nil, fault.New(true)) + assert.NoError(t, err, "export collections error") + + assert.Len(t, ecs, 1, "num of collections") + + items := ecs[0].Items(ctx) + + fitems := []export.Item{} + for item := range items { + fitems = append(fitems, item) + } + + assert.Equal(t, expectedItems, fitems, "items") +} diff --git a/src/internal/m365/support/status.go b/src/internal/m365/support/status.go index f241909fe..b1a7d2449 100644 --- a/src/internal/m365/support/status.go +++ b/src/internal/m365/support/status.go @@ -40,6 +40,7 @@ const ( OpUnknown Operation = iota Backup Restore + Export ) // Constructor for ConnectorOperationStatus. If the counts do not agree, an error is returned. 
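Note: the export collections produced above surface their contents lazily through an Items channel, and per-item failures travel on the items themselves rather than as a collection-level error. A minimal consumer-side sketch (not part of the diff; drainCollection and the use of io.Discard are illustrative, and the context, io, and export imports are assumed):

func drainCollection(ctx context.Context, col export.Collection) error {
	for item := range col.Items(ctx) {
		// items that could not be fetched or named carry the error (and the ID when known)
		if item.Error != nil {
			return item.Error
		}

		// the consumer owns item.Data.Body and must close it
		if _, err := io.Copy(io.Discard, item.Data.Body); err != nil {
			item.Data.Body.Close()
			return err
		}

		item.Data.Body.Close()
	}

	return nil
}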
diff --git a/src/internal/operations/export.go b/src/internal/operations/export.go
new file mode 100644
index 000000000..dc4823935
--- /dev/null
+++ b/src/internal/operations/export.go
@@ -0,0 +1,359 @@
+package operations
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/alcionai/clues"
+	"github.com/google/uuid"
+
+	"github.com/alcionai/corso/src/internal/archive"
+	"github.com/alcionai/corso/src/internal/common/crash"
+	"github.com/alcionai/corso/src/internal/common/dttm"
+	"github.com/alcionai/corso/src/internal/data"
+	"github.com/alcionai/corso/src/internal/diagnostics"
+	"github.com/alcionai/corso/src/internal/events"
+	"github.com/alcionai/corso/src/internal/kopia"
+	"github.com/alcionai/corso/src/internal/model"
+	"github.com/alcionai/corso/src/internal/observe"
+	"github.com/alcionai/corso/src/internal/operations/inject"
+	"github.com/alcionai/corso/src/internal/stats"
+	"github.com/alcionai/corso/src/internal/streamstore"
+	"github.com/alcionai/corso/src/pkg/account"
+	"github.com/alcionai/corso/src/pkg/control"
+	"github.com/alcionai/corso/src/pkg/count"
+	"github.com/alcionai/corso/src/pkg/export"
+	"github.com/alcionai/corso/src/pkg/fault"
+	"github.com/alcionai/corso/src/pkg/logger"
+	"github.com/alcionai/corso/src/pkg/selectors"
+	"github.com/alcionai/corso/src/pkg/store"
+)
+
+const (
+	// CopyBufferSize is the size of the copy buffer for disk
+	// write operations
+	// TODO(meain): tweak this value
+	CopyBufferSize = 5 * 1024 * 1024
+)
+
+// ExportOperation wraps an operation with export-specific props.
+type ExportOperation struct {
+	operation
+
+	BackupID  model.StableID
+	Results   RestoreResults
+	Selectors selectors.Selector
+	ExportCfg control.ExportConfig
+	Version   string
+
+	acct account.Account
+	ec   inject.ExportConsumer
+}
+
+// NewExportOperation constructs and validates an export operation.
+func NewExportOperation(
+	ctx context.Context,
+	opts control.Options,
+	kw *kopia.Wrapper,
+	sw *store.Wrapper,
+	ec inject.ExportConsumer,
+	acct account.Account,
+	backupID model.StableID,
+	sel selectors.Selector,
+	exportCfg control.ExportConfig,
+	bus events.Eventer,
+) (ExportOperation, error) {
+	op := ExportOperation{
+		operation: newOperation(opts, bus, count.New(), kw, sw),
+		acct:      acct,
+		BackupID:  backupID,
+		ExportCfg: exportCfg,
+		Selectors: sel,
+		Version:   "v0",
+		ec:        ec,
+	}
+	if err := op.validate(); err != nil {
+		return ExportOperation{}, err
+	}
+
+	return op, nil
+}
+
+func (op ExportOperation) validate() error {
+	if op.ec == nil {
+		return clues.New("missing export consumer")
+	}
+
+	return op.operation.validate()
+}
+
+// aggregates stats from the export.Run().
+// primarily used so that the defer can take in a
+// pointer wrapping the values, while those values
+// get populated asynchronously.
+type exportStats struct {
+	cs            []data.RestoreCollection
+	ctrl          *data.CollectionStats
+	bytesRead     *stats.ByteCounter
+	resourceCount int
+
+	// a transient value only used to pair up start-end events.
+	exportID string
+}
+
+// Run begins a synchronous export operation.
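+// It loads the backup and its details from the store, produces restore
+// collections from the kopia snapshot, hands them to the export consumer to
+// convert into export collections, and wraps the result in a single zip
+// collection when ExportConfig.Archive is set.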
+func (op *ExportOperation) Run(ctx context.Context) ( + expColl []export.Collection, + err error, +) { + defer func() { + if crErr := crash.Recovery(ctx, recover(), "export"); crErr != nil { + err = crErr + } + }() + + var ( + opStats = exportStats{ + bytesRead: &stats.ByteCounter{}, + exportID: uuid.NewString(), + } + start = time.Now() + sstore = streamstore.NewStreamer(op.kopia, op.acct.ID(), op.Selectors.PathService()) + ) + + // ----- + // Setup + // ----- + + ctx, end := diagnostics.Span(ctx, "operations:export:run") + defer func() { + end() + // wait for the progress display to clean up + observe.Complete() + }() + + ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx}) + defer flushMetrics() + + ctx = clues.Add( + ctx, + "tenant_id", clues.Hide(op.acct.ID()), + "backup_id", op.BackupID, + "service", op.Selectors.Service) + + defer func() { + op.bus.Event( + ctx, + events.ExportEnd, + map[string]any{ + events.BackupID: op.BackupID, + events.DataRetrieved: op.Results.BytesRead, + events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt), + events.EndTime: dttm.Format(op.Results.CompletedAt), + events.ItemsRead: op.Results.ItemsRead, + events.ItemsWritten: op.Results.ItemsWritten, + events.Resources: op.Results.ResourceOwners, + events.ExportID: opStats.exportID, + events.Service: op.Selectors.Service.String(), + events.StartTime: dttm.Format(op.Results.StartedAt), + events.Status: op.Status.String(), + }) + }() + + // ----- + // Execution + // ----- + + expCollections, err := op.do(ctx, &opStats, sstore, start) + if err != nil { + // No return here! We continue down to persistResults, even in case of failure. + logger.CtxErr(ctx, err).Error("running export") + + if errors.Is(err, kopia.ErrNoRestorePath) { + op.Errors.Fail(clues.New("empty backup or unknown path provided")) + } + + op.Errors.Fail(clues.Wrap(err, "running export")) + } + + finalizeErrorHandling(ctx, op.Options, op.Errors, "running export") + LogFaultErrors(ctx, op.Errors.Errors(), "running export") + + // ----- + // Persistence + // ----- + + err = op.finalizeMetrics(ctx, start, &opStats) + if err != nil { + op.Errors.Fail(clues.Wrap(err, "finalizing export metrics")) + return nil, op.Errors.Failure() + } + + logger.Ctx(ctx).Infow("completed export", "results", op.Results) + + return expCollections, nil +} + +func (op *ExportOperation) do( + ctx context.Context, + opStats *exportStats, + detailsStore streamstore.Reader, + start time.Time, +) ([]export.Collection, error) { + logger.Ctx(ctx). + With("control_options", op.Options, "selectors", op.Selectors). 
+ Info("exporting selection") + + bup, deets, err := getBackupAndDetailsFromID( + ctx, + op.BackupID, + op.store, + detailsStore, + op.Errors) + if err != nil { + return nil, clues.Wrap(err, "getting backup and details") + } + + observe.Message(ctx, "Exporting", observe.Bullet, clues.Hide(bup.Selector.DiscreteOwner)) + + paths, err := formatDetailsForRestoration(ctx, bup.Version, op.Selectors, deets, op.ec, op.Errors) + if err != nil { + return nil, clues.Wrap(err, "formatting paths from details") + } + + ctx = clues.Add( + ctx, + "resource_owner_id", bup.Selector.ID(), + "resource_owner_name", clues.Hide(bup.Selector.Name()), + "details_entries", len(deets.Entries), + "details_paths", len(paths), + "backup_snapshot_id", bup.SnapshotID, + "backup_version", bup.Version) + + op.bus.Event( + ctx, + events.ExportStart, + map[string]any{ + events.StartTime: start, + events.BackupID: op.BackupID, + events.BackupCreateTime: bup.CreationTime, + events.ExportID: opStats.exportID, + }) + + observe.Message(ctx, fmt.Sprintf("Discovered %d items in backup %s to export", len(paths), op.BackupID)) + + kopiaComplete := observe.MessageWithCompletion(ctx, "Enumerating items in repository") + defer close(kopiaComplete) + + dcs, err := op.kopia.ProduceRestoreCollections(ctx, bup.SnapshotID, paths, opStats.bytesRead, op.Errors) + if err != nil { + return nil, clues.Wrap(err, "producing collections to export") + } + + kopiaComplete <- struct{}{} + + ctx = clues.Add(ctx, "coll_count", len(dcs)) + + // should always be 1, since backups are 1:1 with resourceOwners. + opStats.resourceCount = 1 + opStats.cs = dcs + + expCollections, err := exportRestoreCollections( + ctx, + op.ec, + bup.Version, + op.Selectors, + op.ExportCfg, + op.Options, + dcs, + op.Errors) + if err != nil { + return nil, clues.Wrap(err, "exporting collections") + } + + opStats.ctrl = op.ec.Wait() + + logger.Ctx(ctx).Debug(opStats.ctrl) + + if op.ExportCfg.Archive { + zc, err := archive.ZipExportCollection(ctx, expCollections) + if err != nil { + return nil, clues.Wrap(err, "zipping export collections") + } + + return []export.Collection{zc}, nil + } + + return expCollections, nil +} + +// persists details and statistics about the export operation. 
+func (op *ExportOperation) finalizeMetrics( + ctx context.Context, + started time.Time, + opStats *exportStats, +) error { + op.Results.StartedAt = started + op.Results.CompletedAt = time.Now() + + op.Status = Completed + + if op.Errors.Failure() != nil { + op.Status = Failed + } + + op.Results.BytesRead = opStats.bytesRead.NumBytes + op.Results.ItemsRead = len(opStats.cs) // TODO: file count, not collection count + op.Results.ResourceOwners = opStats.resourceCount + + if opStats.ctrl == nil { + op.Status = Failed + return clues.New("restoration never completed") + } + + if op.Status != Failed && opStats.ctrl.IsZero() { + op.Status = NoData + } + + // We don't have data on what all items were written + // op.Results.ItemsWritten = opStats.ctrl.Successes + + return op.Errors.Failure() +} + +// --------------------------------------------------------------------------- +// Exporter funcs +// --------------------------------------------------------------------------- + +func exportRestoreCollections( + ctx context.Context, + ec inject.ExportConsumer, + backupVersion int, + sel selectors.Selector, + exportCfg control.ExportConfig, + opts control.Options, + dcs []data.RestoreCollection, + errs *fault.Bus, +) ([]export.Collection, error) { + complete := observe.MessageWithCompletion(ctx, "Preparing export") + defer func() { + complete <- struct{}{} + close(complete) + }() + + expCollections, err := ec.ExportRestoreCollections( + ctx, + backupVersion, + sel, + exportCfg, + opts, + dcs, + errs) + if err != nil { + return nil, clues.Wrap(err, "exporting collections") + } + + return expCollections, nil +} diff --git a/src/internal/operations/export_test.go b/src/internal/operations/export_test.go new file mode 100644 index 000000000..c81114da9 --- /dev/null +++ b/src/internal/operations/export_test.go @@ -0,0 +1,321 @@ +package operations + +import ( + "archive/zip" + "bytes" + "context" + "io" + "strings" + "testing" + "time" + + "github.com/alcionai/clues" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/src/internal/archive" + "github.com/alcionai/corso/src/internal/data" + evmock "github.com/alcionai/corso/src/internal/events/mock" + "github.com/alcionai/corso/src/internal/kopia" + exchMock "github.com/alcionai/corso/src/internal/m365/exchange/mock" + "github.com/alcionai/corso/src/internal/m365/mock" + "github.com/alcionai/corso/src/internal/stats" + "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/export" + "github.com/alcionai/corso/src/pkg/selectors" + "github.com/alcionai/corso/src/pkg/store" +) + +type ExportOpSuite struct { + tester.Suite +} + +func TestExportOpSuite(t *testing.T) { + suite.Run(t, &ExportOpSuite{Suite: tester.NewUnitSuite(t)}) +} + +func (suite *ExportOpSuite) TestExportOperation_PersistResults() { + var ( + kw = &kopia.Wrapper{} + sw = &store.Wrapper{} + ctrl = &mock.Controller{} + now = time.Now() + exportCfg = control.DefaultExportConfig() + ) + + table := []struct { + expectStatus OpStatus + expectErr assert.ErrorAssertionFunc + stats exportStats + fail error + }{ + { + expectStatus: Completed, + expectErr: assert.NoError, + stats: exportStats{ + resourceCount: 1, + bytesRead: &stats.ByteCounter{ + NumBytes: 42, + }, + cs: []data.RestoreCollection{ + data.NoFetchRestoreCollection{ + Collection: &exchMock.DataCollection{}, + }, + }, + ctrl: 
&data.CollectionStats{ + Objects: 1, + Successes: 1, + }, + }, + }, + { + expectStatus: Failed, + expectErr: assert.Error, + fail: assert.AnError, + stats: exportStats{ + bytesRead: &stats.ByteCounter{}, + ctrl: &data.CollectionStats{}, + }, + }, + { + expectStatus: NoData, + expectErr: assert.NoError, + stats: exportStats{ + bytesRead: &stats.ByteCounter{}, + cs: []data.RestoreCollection{}, + ctrl: &data.CollectionStats{}, + }, + }, + } + for _, test := range table { + suite.Run(test.expectStatus.String(), func() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + op, err := NewExportOperation( + ctx, + control.Defaults(), + kw, + sw, + ctrl, + account.Account{}, + "foo", + selectors.Selector{DiscreteOwner: "test"}, + exportCfg, + evmock.NewBus()) + require.NoError(t, err, clues.ToCore(err)) + + op.Errors.Fail(test.fail) + + err = op.finalizeMetrics(ctx, now, &test.stats) + test.expectErr(t, err, clues.ToCore(err)) + + assert.Equal(t, test.expectStatus.String(), op.Status.String(), "status") + assert.Equal(t, len(test.stats.cs), op.Results.ItemsRead, "items read") + assert.Equal(t, test.stats.bytesRead.NumBytes, op.Results.BytesRead, "resource owners") + assert.Equal(t, test.stats.resourceCount, op.Results.ResourceOwners, "resource owners") + assert.Equal(t, now, op.Results.StartedAt, "started at") + assert.Less(t, now, op.Results.CompletedAt, "completed at") + }) + } +} + +type expCol struct { + base string + items []export.Item +} + +func (ec expCol) BasePath() string { return ec.base } +func (ec expCol) Items(ctx context.Context) <-chan export.Item { + ch := make(chan export.Item) + + go func() { + defer close(ch) + + for _, item := range ec.items { + ch <- item + } + }() + + return ch +} + +// ReadSeekCloser implements io.ReadSeekCloser. +type ReadSeekCloser struct { + *bytes.Reader +} + +// NewReadSeekCloser creates a new ReadSeekCloser from a byte slice. +func NewReadSeekCloser(byts []byte) *ReadSeekCloser { + return &ReadSeekCloser{ + Reader: bytes.NewReader(byts), + } +} + +// Close implements the io.Closer interface. +func (r *ReadSeekCloser) Close() error { + // Nothing to close for a byte slice. 
+ return nil +} + +func (suite *ExportOpSuite) TestZipExports() { + table := []struct { + name string + collection []export.Collection + shouldErr bool + readErr bool + }{ + { + name: "nothing", + collection: []export.Collection{}, + shouldErr: true, + }, + { + name: "empty", + collection: []export.Collection{ + expCol{ + base: "", + items: []export.Item{}, + }, + }, + }, + { + name: "one item", + collection: []export.Collection{ + expCol{ + base: "", + items: []export.Item{ + { + ID: "id1", + Data: export.ItemData{ + Name: "test", + Body: NewReadSeekCloser([]byte("test")), + }, + }, + }, + }, + }, + }, + { + name: "multiple items", + collection: []export.Collection{ + expCol{ + base: "", + items: []export.Item{ + { + ID: "id1", + Data: export.ItemData{ + Name: "test", + Body: NewReadSeekCloser([]byte("test")), + }, + }, + }, + }, + expCol{ + base: "/fold", + items: []export.Item{ + { + ID: "id2", + Data: export.ItemData{ + Name: "test2", + Body: NewReadSeekCloser([]byte("test2")), + }, + }, + }, + }, + }, + }, + { + name: "one item with err", + collection: []export.Collection{ + expCol{ + base: "", + items: []export.Item{ + { + ID: "id3", + Error: assert.AnError, + }, + }, + }, + }, + readErr: true, + }, + } + + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + zc, err := archive.ZipExportCollection(ctx, test.collection) + + if test.shouldErr { + assert.Error(t, err, "error") + return + } + + require.NoError(t, err, "error") + assert.Empty(t, zc.BasePath(), "base path") + + zippedItems := []export.ItemData{} + + count := 0 + for item := range zc.Items(ctx) { + assert.True(t, strings.HasPrefix(item.Data.Name, "Corso_Export_"), "name prefix") + assert.True(t, strings.HasSuffix(item.Data.Name, ".zip"), "name suffix") + + data, err := io.ReadAll(item.Data.Body) + if test.readErr { + assert.Error(t, err, "read error") + return + } + + size := int64(len(data)) + + item.Data.Body.Close() + + reader, err := zip.NewReader(bytes.NewReader(data), size) + require.NoError(t, err, "zip reader") + + for _, f := range reader.File { + rc, err := f.Open() + assert.NoError(t, err, "open file in zip") + + data, err := io.ReadAll(rc) + require.NoError(t, err, "read zip file content") + + rc.Close() + + zippedItems = append(zippedItems, export.ItemData{ + Name: f.Name, + Body: NewReadSeekCloser([]byte(data)), + }) + } + + count++ + } + + assert.Equal(t, 1, count, "single item") + + expectedZippedItems := []export.ItemData{} + for _, col := range test.collection { + for item := range col.Items(ctx) { + if col.BasePath() != "" { + item.Data.Name = strings.Join([]string{col.BasePath(), item.Data.Name}, "/") + } + _, err := item.Data.Body.(io.ReadSeeker).Seek(0, io.SeekStart) + require.NoError(t, err, "seek") + expectedZippedItems = append(expectedZippedItems, item.Data) + } + } + assert.Equal(t, expectedZippedItems, zippedItems, "items") + }) + } +} diff --git a/src/internal/operations/inject/inject.go b/src/internal/operations/inject/inject.go index 912b46743..5f6da9230 100644 --- a/src/internal/operations/inject/inject.go +++ b/src/internal/operations/inject/inject.go @@ -12,6 +12,7 @@ import ( "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control/repository" "github.com/alcionai/corso/src/pkg/count" + "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" @@ -59,6 +60,22 @@ 
type ( CacheItemInfo(v details.ItemInfo) } + ExportConsumer interface { + ExportRestoreCollections( + ctx context.Context, + backupVersion int, + selector selectors.Selector, + exportCfg control.ExportConfig, + opts control.Options, + dcs []data.RestoreCollection, + errs *fault.Bus, + ) ([]export.Collection, error) + + Wait() *data.CollectionStats + + CacheItemInfoer + } + RepoMaintenancer interface { RepoMaintenance(ctx context.Context, opts repository.Maintenance) error } diff --git a/src/pkg/control/export.go b/src/pkg/control/export.go new file mode 100644 index 000000000..e58633a5f --- /dev/null +++ b/src/pkg/control/export.go @@ -0,0 +1,22 @@ +package control + +// ExportConfig contains config for exports +type ExportConfig struct { + // Archive decides if we should create an archive from the data + // instead of just returning all the files. If Archive is set to + // true, we return a single collection with a single file which is + // the archive. + Archive bool + + // DataFormat decides the format in which we return the data. This is + // only useful for outlook exports, for example they can be in eml + // or pst for emails. + // TODO: Enable once we support outlook exports + // DataFormat string +} + +func DefaultExportConfig() ExportConfig { + return ExportConfig{ + Archive: false, + } +} diff --git a/src/pkg/export/export.go b/src/pkg/export/export.go new file mode 100644 index 000000000..76a6b6d8b --- /dev/null +++ b/src/pkg/export/export.go @@ -0,0 +1,45 @@ +package export + +import ( + "context" + "io" +) + +// Collection is the interface that is returned to the SDK consumer +type Collection interface { + // BasePath gets the base path of the collection + BasePath() string + + // Items gets the items within the collection(folder) + Items(context.Context) <-chan Item +} + +// ItemData is the data for an individual item. +type ItemData struct { + // Name is the name of the item. This is the name that the item + // would have had in the service. + Name string + + // Body is the body of the item. This is an io.ReadCloser and the + // SDK consumer is responsible for closing it. + Body io.ReadCloser +} + +// Item is the item that is returned to the SDK consumer +type Item struct { + // ID will be a unique id for the item. This is same as the id + // that is used to store the data. This is not the name and is + // mostly used just for tracking. + ID string + + // Data contains the actual data of the item. It will have both + // the name of the item and an io.ReadCloser which contains the + // body of the item. + Data ItemData + + // Error will contain any error that happened while trying to get + // the item/items like when trying to resolve the name of the item. + // In case we have the error bound to a particular item, we will + // also return the id of the item. 
+ Error error +} diff --git a/src/pkg/fault/fault.go b/src/pkg/fault/fault.go index 97cc0bae3..488656fa4 100644 --- a/src/pkg/fault/fault.go +++ b/src/pkg/fault/fault.go @@ -208,6 +208,32 @@ func (e *Bus) Errors() *Errors { } } +// ItemsAndRecovered returns the items that failed along with other +// recoverable errors +func (e *Bus) ItemsAndRecovered() ([]Item, []error) { + var ( + is = map[string]Item{} + non = []error{} + ) + + for _, err := range e.recoverable { + var ie *Item + if !errors.As(err, &ie) { + non = append(non, err) + continue + } + + is[ie.dedupeID()] = *ie + } + + var ie *Item + if errors.As(e.failure, &ie) { + is[ie.dedupeID()] = *ie + } + + return maps.Values(is), non +} + // --------------------------------------------------------------------------- // Errors Data // --------------------------------------------------------------------------- diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index 8a8abb33e..1e344bc7a 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -74,6 +74,12 @@ type Repository interface { sel selectors.Selector, restoreCfg control.RestoreConfig, ) (operations.RestoreOperation, error) + NewExport( + ctx context.Context, + backupID string, + sel selectors.Selector, + exportCfg control.ExportConfig, + ) (operations.ExportOperation, error) NewMaintenance( ctx context.Context, mOpts rep.Maintenance, @@ -349,6 +355,31 @@ func (r repository) NewBackupWithLookup( r.Bus) } +// NewExport generates a exportOperation runner. +func (r repository) NewExport( + ctx context.Context, + backupID string, + sel selectors.Selector, + exportCfg control.ExportConfig, +) (operations.ExportOperation, error) { + ctrl, err := connectToM365(ctx, sel.PathService(), r.Account, r.Opts) + if err != nil { + return operations.ExportOperation{}, clues.Wrap(err, "connecting to m365") + } + + return operations.NewExportOperation( + ctx, + r.Opts, + r.dataLayer, + store.NewKopiaStore(r.modelStore), + ctrl, + r.Account, + model.StableID(backupID), + sel, + exportCfg, + r.Bus) +} + // NewRestore generates a restoreOperation runner. func (r repository) NewRestore( ctx context.Context,
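Taken together, a rough end-to-end usage sketch from the SDK side (not part of this change; runExport, r, sel, and backupID are illustrative, error handling is trimmed, and the actual file writes are elided):

func runExport(ctx context.Context, r repository.Repository, backupID string, sel selectors.Selector) error {
	cfg := control.DefaultExportConfig()
	cfg.Archive = true // ask for a single zip collection instead of individual files

	op, err := r.NewExport(ctx, backupID, sel, cfg)
	if err != nil {
		return err
	}

	colls, err := op.Run(ctx)
	if err != nil {
		return err
	}

	for _, c := range colls {
		for item := range c.Items(ctx) {
			if item.Error != nil {
				return item.Error
			}

			// write item.Data.Body under c.BasePath()/item.Data.Name, then close it
			item.Data.Body.Close()
		}
	}

	return nil
}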