Compare commits

...

3 Commits

Author SHA1 Message Date
Ashlie Martinez
5209e8be93 PoC for updated kopia iterator
Some small refinements like using the iterator during merging could be
done. This fixes streaming directory traversal.
2023-10-04 08:05:37 -07:00
Ashlie Martinez
deaa48f472 Use local fs for non-retention tests
Switch kopia package tests that don't require retention to use the local
file system for speed. Tests that do check retention settings require
S3.
2023-10-02 12:16:26 -07:00
Ashlie Martinez
933ce690ca Use separate config dirs for kopia
Fixes possible issues of opening the incorrect repo if tests are run in
parallel.
2023-10-02 12:16:10 -07:00
8 changed files with 389 additions and 288 deletions

View File

@ -2,7 +2,8 @@ module github.com/alcionai/corso/src
go 1.21 go 1.21
replace github.com/kopia/kopia => github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3 //replace github.com/kopia/kopia => github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3
replace github.com/kopia/kopia => ./submodules/kopia
require ( require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.1 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.1
@ -61,8 +62,8 @@ require (
github.com/subosito/gotenv v1.4.2 // indirect github.com/subosito/gotenv v1.4.2 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.48.0 // indirect github.com/valyala/fasthttp v1.48.0 // indirect
go.opentelemetry.io/otel/metric v1.18.0 // indirect go.opentelemetry.io/otel/metric v1.19.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect
) )
require ( require (
@ -84,7 +85,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.7 // indirect github.com/klauspost/compress v1.17.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/klauspost/pgzip v1.2.6 // indirect github.com/klauspost/pgzip v1.2.6 // indirect
github.com/klauspost/reedsolomon v1.11.8 // indirect github.com/klauspost/reedsolomon v1.11.8 // indirect
@ -117,8 +118,8 @@ require (
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
github.com/zeebo/blake3 v0.2.3 // indirect github.com/zeebo/blake3 v0.2.3 // indirect
go.opentelemetry.io/otel v1.18.0 // indirect go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.18.0 // indirect go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.uber.org/multierr v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.13.0 // indirect golang.org/x/crypto v0.13.0 // indirect
golang.org/x/mod v0.12.0 // indirect golang.org/x/mod v0.12.0 // indirect
@ -126,7 +127,7 @@ require (
golang.org/x/sync v0.3.0 // indirect golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.12.0 // indirect golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect
google.golang.org/grpc v1.57.0 // indirect google.golang.org/grpc v1.58.2 // indirect
google.golang.org/protobuf v1.31.0 // indirect google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect

View File

@ -57,8 +57,6 @@ github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpH
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
github.com/alcionai/clues v0.0.0-20230920212840-728ac1a1d8b8 h1:8KZyOE9IOxJ9Dg/n4pJKukcOxDAO3fxUcEP2MM0tNCg= github.com/alcionai/clues v0.0.0-20230920212840-728ac1a1d8b8 h1:8KZyOE9IOxJ9Dg/n4pJKukcOxDAO3fxUcEP2MM0tNCg=
github.com/alcionai/clues v0.0.0-20230920212840-728ac1a1d8b8/go.mod h1:iyJK9p061Zb1CqE+DFsofdjAc7/QTHGxdbJsf/mtVHo= github.com/alcionai/clues v0.0.0-20230920212840-728ac1a1d8b8/go.mod h1:iyJK9p061Zb1CqE+DFsofdjAc7/QTHGxdbJsf/mtVHo=
github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3 h1:6YjRGjEZr/Bmux1XkS13Re1m1LI7VAcbFsA3PiqO2BI=
github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3/go.mod h1:u5wAx1XN07PJsO1BLBkGicwSrbmAC1biONnumSCA210=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@ -246,8 +244,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c= github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
@ -449,12 +447,12 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opentelemetry.io/otel v1.18.0 h1:TgVozPGZ01nHyDZxK5WGPFB9QexeTMXEH7+tIClWfzs= go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs=
go.opentelemetry.io/otel v1.18.0/go.mod h1:9lWqYO0Db579XzVuCKFNPDl4s73Voa+zEck3wHaAYQI= go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY=
go.opentelemetry.io/otel/metric v1.18.0 h1:JwVzw94UYmbx3ej++CwLUQZxEODDj/pOuTCvzhtRrSQ= go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE=
go.opentelemetry.io/otel/metric v1.18.0/go.mod h1:nNSpsVDjWGfb7chbRLUNW+PBNdcSTHD4Uu5pfFMOI0k= go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
go.opentelemetry.io/otel/trace v1.18.0 h1:NY+czwbHbmndxojTEKiSMHkG2ClNH2PwmcHrdo0JY10= go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
go.opentelemetry.io/otel/trace v1.18.0/go.mod h1:T2+SGJGuYZY3bjj5rgh/hN7KIrlpWC5nS8Mjvzckz+0= go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
@ -756,8 +754,8 @@ google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577 h1:wukfNtZmZUurLN/atp2hiIeTKn7QJWIQdHzqmsOnAOk= google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 h1:N3bU/SQDCDyD6R528GJ/PwW9KjYcJA3dgyH+MovAkIM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13/go.mod h1:KSqppvjFjtoCI+KGd4PELB0qLNxdJHRGqRI09mB6pQA=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
@ -774,8 +772,8 @@ google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8= google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8=
google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= google.golang.org/grpc v1.58.2 h1:SXUpjxeVF3FKrTYQI4f4KvbGD5u2xccdYdurwowix5I=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= google.golang.org/grpc v1.58.2/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

View File

@ -22,6 +22,20 @@ import (
storeTD "github.com/alcionai/corso/src/pkg/storage/testdata" storeTD "github.com/alcionai/corso/src/pkg/storage/testdata"
) )
func openLocalKopiaRepo(
t tester.TestT,
ctx context.Context, //revive:disable-line:context-as-argument
) (*conn, error) {
st := storeTD.NewFilesystemStorage(t)
k := NewConn(st)
if err := k.Initialize(ctx, repository.Options{}, repository.Retention{}); err != nil {
return nil, err
}
return k, nil
}
func openKopiaRepo( func openKopiaRepo(
t tester.TestT, t tester.TestT,
ctx context.Context, //revive:disable-line:context-as-argument ctx context.Context, //revive:disable-line:context-as-argument
@ -81,7 +95,7 @@ func (suite *WrapperIntegrationSuite) TestRepoExistsError() {
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
st := storeTD.NewPrefixedS3Storage(t) st := storeTD.NewFilesystemStorage(t)
k := NewConn(st) k := NewConn(st)
err := k.Initialize(ctx, repository.Options{}, repository.Retention{}) err := k.Initialize(ctx, repository.Options{}, repository.Retention{})
@ -101,7 +115,7 @@ func (suite *WrapperIntegrationSuite) TestBadProviderErrors() {
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
st := storeTD.NewPrefixedS3Storage(t) st := storeTD.NewFilesystemStorage(t)
st.Provider = storage.ProviderUnknown st.Provider = storage.ProviderUnknown
k := NewConn(st) k := NewConn(st)
@ -115,7 +129,7 @@ func (suite *WrapperIntegrationSuite) TestConnectWithoutInitErrors() {
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
st := storeTD.NewPrefixedS3Storage(t) st := storeTD.NewFilesystemStorage(t)
k := NewConn(st) k := NewConn(st)
err := k.Connect(ctx, repository.Options{}) err := k.Connect(ctx, repository.Options{})
@ -408,7 +422,7 @@ func (suite *WrapperIntegrationSuite) TestSetUserAndHost() {
Host: "bar", Host: "bar",
} }
st := storeTD.NewPrefixedS3Storage(t) st := storeTD.NewFilesystemStorage(t)
k := NewConn(st) k := NewConn(st)
err := k.Initialize(ctx, opts, repository.Retention{}) err := k.Initialize(ctx, opts, repository.Retention{})

View File

@ -29,7 +29,7 @@ type fooModel struct {
//revive:disable-next-line:context-as-argument //revive:disable-next-line:context-as-argument
func getModelStore(t *testing.T, ctx context.Context) *ModelStore { func getModelStore(t *testing.T, ctx context.Context) *ModelStore {
c, err := openKopiaRepo(t, ctx) c, err := openLocalKopiaRepo(t, ctx)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
return &ModelStore{c: c, modelVersion: globalModelVersion} return &ModelStore{c: c, modelVersion: globalModelVersion}
@ -856,7 +856,7 @@ func openConnAndModelStore(
t *testing.T, t *testing.T,
ctx context.Context, //revive:disable-line:context-as-argument ctx context.Context, //revive:disable-line:context-as-argument
) (*conn, *ModelStore) { ) (*conn, *ModelStore) {
st := storeTD.NewPrefixedS3Storage(t) st := storeTD.NewFilesystemStorage(t)
c := NewConn(st) c := NewConn(st)
err := c.Initialize(ctx, repository.Options{}, repository.Retention{}) err := c.Initialize(ctx, repository.Options{}, repository.Retention{})

View File

@ -20,7 +20,6 @@ import (
"github.com/alcionai/corso/src/internal/common/prefixmatcher" "github.com/alcionai/corso/src/internal/common/prefixmatcher"
"github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/diagnostics"
"github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/graph/metadata" "github.com/alcionai/corso/src/internal/m365/graph/metadata"
"github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/backup/details"
@ -235,38 +234,77 @@ func (cp *corsoProgress) get(k string) *itemDetails {
return cp.pending[k] return cp.pending[k]
} }
func collectionEntries( // These define a small state machine as to which source to return an entry from
ctx context.Context, // next. Since these are in-memory only values we can use iota. All iterators
ctr func(context.Context, fs.Entry) error, // start by streaming static entries.
streamedEnts data.BackupCollection, //
progress *corsoProgress, // Since some phases require initialization of the underlying data source we
) (map[string]struct{}, error) { // insert additional phases to allow that. Once initialization is completed the
if streamedEnts == nil { // phase should be updated to the next phase.
return nil, nil //
} // A similar tactic can be used to handle tearing down resources for underlying
// data sources if needed.
const (
staticEntsPhase = iota
preStreamEntsPhase
streamEntsPhase
preBaseDirEntsPhase
baseDirEntsPhase
postBaseDirEntsPhase
terminationPhase
)
var ( type corsoDirectoryIterator struct {
params snapshotParams
staticEnts []fs.Entry
globalExcludeSet prefixmatcher.StringSetReader
progress *corsoProgress
// seenEnts contains the encoded names of entries that we've already streamed
// so we can skip returning them again when looking at base entries.
seenEnts map[string]struct{}
// locationPath contains the human-readable location of the underlying
// collection.
locationPath *path.Builder locationPath *path.Builder
// Track which items have already been seen so we can skip them if we see
// them again in the data from the base snapshot.
seen = map[string]struct{}{}
items = streamedEnts.Items(ctx, progress.errs)
)
if lp, ok := streamedEnts.(data.LocationPather); ok { // excludeSet is the individual exclude set to use for the longest prefix for
locationPath = lp.LocationPath() // this iterator.
} excludeSet map[string]struct{}
// traversalPhase is the current state in the state machine.
traversalPhase int
// streamItemsChan contains the channel for the backing collection if there is
// one. Once the backing collection has been traversed this is set to nil.
streamItemsChan <-chan data.Item
// staticEntsIdx contains the index in staticEnts of the next item to be
// returned. Once all static entries have been traversed this is set to
// len(staticEnts).
staticEntsIdx int
// baseDirIter contains the handle to the iterator for the base directory
// found during hierarchy merging. Once all base directory entries have been
// traversed this is set to nil.
baseDirIter fs.DirectoryIterator
}
func (d *corsoDirectoryIterator) nextStreamEnt(
ctx context.Context,
) (fs.Entry, error) {
// Loop over results until we get something we can return. Required because we
// could see deleted items.
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return seen, clues.Stack(ctx.Err()).WithClues(ctx) return nil, clues.Stack(ctx.Err()).WithClues(ctx)
case e, ok := <-items: case e, ok := <-d.streamItemsChan:
// Channel was closed, no more entries to return.
if !ok { if !ok {
return seen, nil return nil, nil
} }
// Got an entry to process, see if it's a deletion marker or something to
// return to kopia.
encodedName := encodeAsPath(e.ID()) encodedName := encodeAsPath(e.ID())
// Even if this item has been deleted and should not appear at all in // Even if this item has been deleted and should not appear at all in
@ -282,13 +320,13 @@ func collectionEntries(
// items we need to track. Namely, we can track the created time of the // items we need to track. Namely, we can track the created time of the
// item and if it's after the base snapshot was finalized we can skip it // item and if it's after the base snapshot was finalized we can skip it
// because it's not possible for the base snapshot to contain that item. // because it's not possible for the base snapshot to contain that item.
seen[encodedName] = struct{}{} d.seenEnts[encodedName] = struct{}{}
// For now assuming that item IDs don't need escaping. // For now assuming that item IDs don't need escaping.
itemPath, err := streamedEnts.FullPath().AppendItem(e.ID()) itemPath, err := d.params.currentPath.AppendItem(e.ID())
if err != nil { if err != nil {
err = clues.Wrap(err, "getting full item path") err = clues.Wrap(err, "getting full item path")
progress.errs.AddRecoverable(ctx, err) d.progress.errs.AddRecoverable(ctx, err)
logger.CtxErr(ctx, err).Error("getting full item path") logger.CtxErr(ctx, err).Error("getting full item path")
@ -316,7 +354,7 @@ func collectionEntries(
// Relative path given to us in the callback is missing the root // Relative path given to us in the callback is missing the root
// element. Add to pending set before calling the callback to avoid race // element. Add to pending set before calling the callback to avoid race
// conditions when the item is completed. // conditions when the item is completed.
d := &itemDetails{ deetsEnt := &itemDetails{
infoer: ei, infoer: ei,
repoPath: itemPath, repoPath: itemPath,
// Also use the current path as the previous path for this item. This // Also use the current path as the previous path for this item. This
@ -327,75 +365,47 @@ func collectionEntries(
// This all works out because cached item checks in kopia are direct // This all works out because cached item checks in kopia are direct
// path + metadata comparisons. // path + metadata comparisons.
prevPath: itemPath, prevPath: itemPath,
locationPath: locationPath, locationPath: d.locationPath,
modTime: &modTime, modTime: &modTime,
} }
progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d) d.progress.put(
encodeAsPath(itemPath.PopFront().Elements()...),
deetsEnt)
} }
entry := virtualfs.StreamingFileWithModTimeFromReader( return virtualfs.StreamingFileWithModTimeFromReader(
encodedName, encodedName,
modTime, modTime,
e.ToReader()) e.ToReader()), nil
err = ctr(ctx, entry)
if err != nil {
// Kopia's uploader swallows errors in most cases, so if we see
// something here it's probably a big issue and we should return.
return seen, clues.Wrap(err, "executing callback").
WithClues(ctx).
With("item_path", itemPath)
}
} }
} }
} }
func streamBaseEntries( func (d *corsoDirectoryIterator) nextBaseEnt(
ctx context.Context, ctx context.Context,
ctr func(context.Context, fs.Entry) error, ) (fs.Entry, error) {
params snapshotParams,
locationPath *path.Builder,
encodedSeen map[string]struct{},
globalExcludeSet prefixmatcher.StringSetReader,
progress *corsoProgress,
) error {
if params.baseDir == nil {
return nil
}
var ( var (
longest string entry fs.Entry
excludeSet map[string]struct{} err error
) )
if globalExcludeSet != nil { for entry, err = d.baseDirIter.Next(ctx); entry != nil && err == nil; entry, err = d.baseDirIter.Next(ctx) {
longest, excludeSet, _ = globalExcludeSet.LongestPrefix(
params.currentPath.String())
}
ctx = clues.Add(
ctx,
"current_directory_path", params.currentPath,
"longest_prefix", path.LoggableDir(longest))
err := params.baseDir.IterateEntries(
ctx,
func(innerCtx context.Context, entry fs.Entry) error {
if err := innerCtx.Err(); err != nil {
return clues.Stack(err).WithClues(ctx)
}
entName, err := decodeElement(entry.Name()) entName, err := decodeElement(entry.Name())
if err != nil { if err != nil {
return clues.Wrap(err, "decoding entry name"). err = clues.Wrap(err, "decoding entry name").
WithClues(ctx). WithClues(ctx).
With("entry_name", entry.Name()) With("entry_name", clues.Hide(entry.Name()))
d.progress.errs.AddRecoverable(ctx, err)
continue
} }
if d, ok := entry.(fs.Directory); ok { ctx = clues.Add(ctx, "entry_name", clues.Hide(entName))
if dir, ok := entry.(fs.Directory); ok {
// Don't walk subdirectories in this function. // Don't walk subdirectories in this function.
if !params.streamBaseEnts { if !d.params.streamBaseEnts {
return nil continue
} }
// Do walk subdirectories. The previous and current path of the // Do walk subdirectories. The previous and current path of the
@ -404,66 +414,78 @@ func streamBaseEntries(
// no BackupCollection associated with it (part of the criteria for // no BackupCollection associated with it (part of the criteria for
// allowing walking directories in this function) there shouldn't be any // allowing walking directories in this function) there shouldn't be any
// LocationPath information associated with the directory. // LocationPath information associated with the directory.
newP, err := params.currentPath.Append(false, entName) newP, err := d.params.currentPath.Append(false, entName)
if err != nil { if err != nil {
return clues.Wrap(err, "getting current directory path"). err = clues.Wrap(err, "getting current directory path").
WithClues(ctx) WithClues(ctx)
d.progress.errs.AddRecoverable(ctx, err)
continue
} }
oldP, err := params.prevPath.Append(false, entName) ctx = clues.Add(ctx, "child_directory_path", newP)
oldP, err := d.params.prevPath.Append(false, entName)
if err != nil { if err != nil {
return clues.Wrap(err, "getting previous directory path"). err = clues.Wrap(err, "getting previous directory path").
WithClues(ctx) WithClues(ctx)
d.progress.errs.AddRecoverable(ctx, err)
continue
} }
e := virtualfs.NewStreamingDirectory( return virtualfs.NewStreamingDirectory(
entry.Name(), entry.Name(),
getStreamItemFunc( &corsoDirectoryIterator{
snapshotParams{ params: snapshotParams{
currentPath: newP, currentPath: newP,
prevPath: oldP, prevPath: oldP,
collection: nil, collection: nil,
baseDir: d, baseDir: dir,
streamBaseEnts: params.streamBaseEnts, streamBaseEnts: d.params.streamBaseEnts,
}, },
nil, globalExcludeSet: d.globalExcludeSet,
globalExcludeSet, progress: d.progress,
progress)) }), nil
return clues.Wrap(ctr(ctx, e), "executing callback on subdirectory").
WithClues(ctx).
With("directory_path", newP).
OrNil()
} }
// This entry was either updated or deleted. In either case, the external // This entry was either updated or deleted. In either case, the external
// service notified us about it and it's already been handled so we should // service notified us about it and it's already been handled so we should
// skip it here. // skip it here.
if _, ok := encodedSeen[entry.Name()]; ok { if _, ok := d.seenEnts[entry.Name()]; ok {
return nil continue
} }
// This entry was marked as deleted by a service that can't tell us the // This entry was marked as deleted by a service that can't tell us the
// previous path of deleted items, only the item ID. // previous path of deleted items, only the item ID.
if _, ok := excludeSet[entName]; ok { if _, ok := d.excludeSet[entName]; ok {
return nil continue
} }
// For now assuming that item IDs don't need escaping. // This is a path used in corso not kopia so it doesn't need to encode the
itemPath, err := params.currentPath.AppendItem(entName) // item name.
itemPath, err := d.params.currentPath.AppendItem(entName)
if err != nil { if err != nil {
return clues.Wrap(err, "getting full item path for base entry"). err = clues.Wrap(err, "getting full item path for base entry").
WithClues(ctx) WithClues(ctx)
d.progress.errs.AddRecoverable(ctx, err)
continue
} }
ctx = clues.Add(ctx, "item_path", itemPath)
// We need the previous path so we can find this item in the base snapshot's // We need the previous path so we can find this item in the base snapshot's
// backup details. If the item moved and we had only the new path, we'd be // backup details. If the item moved and we had only the new path, we'd be
// unable to find it in the old backup details because we wouldn't know what // unable to find it in the old backup details because we wouldn't know what
// to look for. // to look for.
prevItemPath, err := params.prevPath.AppendItem(entName) prevItemPath, err := d.params.prevPath.AppendItem(entName)
if err != nil { if err != nil {
return clues.Wrap(err, "getting previous full item path for base entry"). err = clues.Wrap(err, "getting previous full item path for base entry").
WithClues(ctx) WithClues(ctx)
d.progress.errs.AddRecoverable(ctx, err)
continue
} }
// Meta files aren't in backup details since it's the set of items the // Meta files aren't in backup details since it's the set of items the
@ -477,79 +499,137 @@ func streamBaseEntries(
// sure we have enough metadata to find those entries. To do that we add // sure we have enough metadata to find those entries. To do that we add
// the item to progress and having progress aggregate everything for // the item to progress and having progress aggregate everything for
// later. // later.
d := &itemDetails{ detailsEnt := &itemDetails{
repoPath: itemPath, repoPath: itemPath,
prevPath: prevItemPath, prevPath: prevItemPath,
locationPath: locationPath, locationPath: d.locationPath,
modTime: ptr.To(entry.ModTime()), modTime: ptr.To(entry.ModTime()),
} }
progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d) d.progress.put(
encodeAsPath(itemPath.PopFront().Elements()...),
detailsEnt)
} }
if err := ctr(ctx, entry); err != nil { return entry, nil
return clues.Wrap(err, "executing callback on item").
WithClues(ctx).
With("item_path", itemPath)
} }
return nil return nil, clues.Stack(err).OrNil()
})
if err != nil {
return clues.Wrap(err, "traversing items in base snapshot directory").
WithClues(ctx)
}
return nil
} }
// Next implements kopia's fs.DirectoryIterator for a streamed corso
// directory. It returns the next entry in the directory, or (nil, nil) once
// every underlying source has been exhausted.
//
// Entries are produced from three sources, in this order:
//  1. staticEnts — pre-built child directory entries (subdirectories),
//  2. the backing DataCollection's item stream (streamEntsPhase),
//  3. the base (previous) snapshot's directory entries (baseDirEntsPhase),
//     which lets unchanged items be carried forward without re-upload.
func (d *corsoDirectoryIterator) Next(ctx context.Context) (fs.Entry, error) {
	// TODO(ashmrtn): Figure out the expected way to do error handling.

	// Execute the state machine until either:
	// * we get an entry to return
	// * we get an error to return
	// * we exhaust all underlying data sources (end of iteration)
	//
	// Multiple executions of the state machine may be required for things like
	// setting up underlying data sources or finding that there's no more entries
	// in the current data source and needing to switch to the next one.
	//
	// Returned entries and errors are handled with inline return statements.
	for d.traversalPhase != terminationPhase {
		switch d.traversalPhase {
		case staticEntsPhase:
			// Return static entries in this directory first, one per call.
			if d.staticEntsIdx < len(d.staticEnts) {
				ent := d.staticEnts[d.staticEntsIdx]
				d.staticEntsIdx++

				return ent, nil
			}

			d.traversalPhase = preStreamEntsPhase

		case preStreamEntsPhase:
			// One-time setup before draining the collection's item stream.
			if d.params.collection != nil {
				if lp, ok := d.params.collection.(data.LocationPather); ok {
					d.locationPath = lp.LocationPath()
				}

				d.streamItemsChan = d.params.collection.Items(ctx, d.progress.errs)
				// seenEnts tracks stream item names so base snapshot entries with
				// the same name are skipped later (stream version wins).
				d.seenEnts = map[string]struct{}{}
				d.traversalPhase = streamEntsPhase
			} else {
				// No collection backing this directory; go straight to base entries.
				d.traversalPhase = preBaseDirEntsPhase
			}

		case streamEntsPhase:
			ent, err := d.nextStreamEnt(ctx)
			if ent != nil {
				return ent, nil
			} else if err != nil {
				// This assumes that once we hit an error we won't generate any more
				// valid entries.
				d.traversalPhase = preBaseDirEntsPhase
				return nil, clues.Stack(err)
			}

			// Done iterating through stream entries, advance the state machine state.
			d.traversalPhase = preBaseDirEntsPhase

		case preBaseDirEntsPhase:
			// One-time setup before merging in entries from the base snapshot.
			if d.params.baseDir != nil {
				var err error

				d.baseDirIter, err = d.params.baseDir.Iterate(ctx)
				if err != nil {
					// We have no iterator from which to pull entries, switch to the next
					// state machine state.
					d.traversalPhase = postBaseDirEntsPhase
					return nil, clues.Wrap(err, "getting base directory iterator")
				}

				if d.globalExcludeSet != nil {
					// Find the exclude set (deleted/updated item names) whose prefix
					// matches this directory's repo path.
					longest, excludeSet, _ := d.globalExcludeSet.LongestPrefix(
						d.params.currentPath.String())
					d.excludeSet = excludeSet

					logger.Ctx(ctx).Debugw("found exclude set", "set_prefix", longest)
				}

				d.traversalPhase = baseDirEntsPhase
			} else {
				// We have no iterator from which to pull entries, switch to the next
				// state machine state.
				d.traversalPhase = postBaseDirEntsPhase
			}

		case baseDirEntsPhase:
			ent, err := d.nextBaseEnt(ctx)
			if ent != nil {
				return ent, nil
			} else if err != nil {
				// This assumes that once we hit an error we won't generate any more
				// valid entries.
				d.traversalPhase = postBaseDirEntsPhase
				return nil, clues.Stack(err)
			}

			// Done iterating through base entries, advance the state machine state.
			d.traversalPhase = postBaseDirEntsPhase

		case postBaseDirEntsPhase:
			// Making a separate phase so adding additional phases after this one is
			// less error prone if we ever need to do that.
			if d.baseDirIter != nil {
				d.baseDirIter.Close()
				d.baseDirIter = nil
			}

			d.seenEnts = nil
			d.excludeSet = nil
			d.traversalPhase = terminationPhase
		}
	}

	// (nil, nil) signals end of iteration to kopia.
	return nil, nil
}
// Close releases any remaining resources the iterator may have at the end of
// iteration.
//
// kopia may stop calling Next before the state machine reaches
// postBaseDirEntsPhase (e.g. on upload cancellation), in which case the base
// directory iterator and bookkeeping maps would otherwise be leaked. The
// cleanup here mirrors postBaseDirEntsPhase and is safe to run even if that
// phase already executed: baseDirIter is nil-checked and nil-ed out so the
// inner Close is never called twice.
func (d *corsoDirectoryIterator) Close() {
	if d.baseDirIter != nil {
		d.baseDirIter.Close()
		d.baseDirIter = nil
	}

	d.seenEnts = nil
	d.excludeSet = nil
	d.traversalPhase = terminationPhase
}
// buildKopiaDirs recursively builds a directory hierarchy from the roots up. // buildKopiaDirs recursively builds a directory hierarchy from the roots up.
// Returned directories are virtualfs.StreamingDirectory. // Returned directories are virtualfs.StreamingDirectory.
func buildKopiaDirs( func buildKopiaDirs(
@ -586,11 +666,12 @@ func buildKopiaDirs(
return virtualfs.NewStreamingDirectory( return virtualfs.NewStreamingDirectory(
encodeAsPath(dirName), encodeAsPath(dirName),
getStreamItemFunc( &corsoDirectoryIterator{
dir.snapshotParams, params: dir.snapshotParams,
childDirs, staticEnts: childDirs,
globalExcludeSet, globalExcludeSet: globalExcludeSet,
progress)), nil progress: progress,
}), nil
} }
type snapshotParams struct { type snapshotParams struct {
@ -958,7 +1039,10 @@ func traverseBaseDir(
var hasItems bool var hasItems bool
if changed { if changed {
err = dir.IterateEntries(ctx, func(innerCtx context.Context, entry fs.Entry) error { err = fs.IterateEntries(
ctx,
dir,
func(innerCtx context.Context, entry fs.Entry) error {
dEntry, ok := entry.(fs.Directory) dEntry, ok := entry.(fs.Directory)
if !ok { if !ok {
hasItems = true hasItems = true

View File

@ -2553,12 +2553,13 @@ type mockStaticDirectory struct {
iterateCount int iterateCount int
} }
func (msd *mockStaticDirectory) IterateEntries( func (msd *mockStaticDirectory) Iterate(
ctx context.Context, ctx context.Context,
callback func(context.Context, fs.Entry) error, ) (fs.DirectoryIterator, error) {
) error {
msd.iterateCount++ msd.iterateCount++
return msd.Directory.IterateEntries(ctx, callback) iter, err := msd.Directory.Iterate(ctx)
return iter, clues.Stack(err).OrNil()
} }
func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreePruning() { func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreePruning() {

View File

@ -184,7 +184,7 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_FirstRun_NoChanges() {
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
k, err := openKopiaRepo(t, ctx) k, err := openLocalKopiaRepo(t, ctx)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
w := &Wrapper{k} w := &Wrapper{k}
@ -204,7 +204,7 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_WrongUser_NoForce_Fails
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
k, err := openKopiaRepo(t, ctx) k, err := openLocalKopiaRepo(t, ctx)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
w := &Wrapper{k} w := &Wrapper{k}
@ -241,7 +241,7 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_WrongUser_Force_Succeed
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
k, err := openKopiaRepo(t, ctx) k, err := openLocalKopiaRepo(t, ctx)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
w := &Wrapper{k} w := &Wrapper{k}
@ -754,7 +754,7 @@ func (suite *KopiaIntegrationSuite) SetupTest() {
t := suite.T() t := suite.T()
suite.ctx, suite.flush = tester.NewContext(t) suite.ctx, suite.flush = tester.NewContext(t)
c, err := openKopiaRepo(t, suite.ctx) c, err := openLocalKopiaRepo(t, suite.ctx)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
suite.w = &Wrapper{c} suite.w = &Wrapper{c}
@ -1245,7 +1245,7 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() {
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
k, err := openKopiaRepo(t, ctx) k, err := openLocalKopiaRepo(t, ctx)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
err = k.Compression(ctx, "s2-default") err = k.Compression(ctx, "s2-default")
@ -1559,7 +1559,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
//nolint:forbidigo //nolint:forbidigo
suite.ctx, _ = logger.CtxOrSeed(context.Background(), ls) suite.ctx, _ = logger.CtxOrSeed(context.Background(), ls)
c, err := openKopiaRepo(t, suite.ctx) c, err := openLocalKopiaRepo(t, suite.ctx)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
suite.w = &Wrapper{c} suite.w = &Wrapper{c}

View File

@ -68,6 +68,9 @@ func NewFilesystemStorage(t tester.TestT) storage.Storage {
}, },
storage.CommonConfig{ storage.CommonConfig{
Corso: GetAndInsertCorso(""), Corso: GetAndInsertCorso(""),
// Use separate kopia configs for each instance. Place in a new folder to
// avoid mixing data.
KopiaCfgDir: t.TempDir(),
}) })
require.NoError(t, err, "creating storage", clues.ToCore(err)) require.NoError(t, err, "creating storage", clues.ToCore(err))