Compare commits: main...rl_slider_

11 commits (author and date columns were empty in this capture):
aef9d14b32, a5a2152cf1, b28392d743, e53b0303dd, 09ca1b4ae9, cb31e00cdf, 4f6e60558a, e7fa269532, 969d1fb6b6, 110b477346, 07740938fb
CHANGELOG.md

@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased] (beta)
 
+### Changed
+
+- Change file extension of messages export to json to match the content
+
 ## [v0.15.0] (beta) - 2023-10-31
 
 ### Added
go.mod

@@ -34,6 +34,7 @@ require (
 	github.com/tidwall/pretty v1.2.1
 	github.com/tomlazar/table v0.1.2
 	github.com/vbauerster/mpb/v8 v8.1.6
+	go.uber.org/goleak v1.3.0
 	go.uber.org/zap v1.26.0
 	golang.org/x/exp v0.0.0-20230905200255-921286631fa9
 	golang.org/x/time v0.3.0
go.sum

@@ -468,8 +468,8 @@ go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPi
 go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
 go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
 go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
-go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
-go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
+go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
+go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
 go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
 go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
 go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
src/internal/common/limiters/limiter.go (new file, 8 lines)

package limiters

import "context"

type Limiter interface {
	Wait(ctx context.Context) error
	Shutdown()
}
src/internal/common/limiters/sliding_window.go (new file, 197 lines)

package limiters

import (
	"context"
	"sync"
	"time"

	"github.com/alcionai/clues"
)

type token struct{}

type fixedWindow struct {
	count []int
}

var _ Limiter = &slidingWindow{}

type slidingWindow struct {
	// capacity is the maximum number of requests allowed in a sliding window at
	// any given time.
	capacity int
	// windowSize is the total duration of the sliding window. The limiter will
	// allow at most capacity requests in this duration.
	windowSize time.Duration
	// slideInterval controls how frequently the window slides. A smaller
	// interval provides better accuracy at the cost of more frequent sliding &
	// more memory usage.
	slideInterval time.Duration

	// numIntervals is the number of intervals in the window. Calculated as
	// windowSize / slideInterval.
	numIntervals int
	// currentInterval tracks the current slide interval.
	currentInterval int

	// Each request acquires a token from the permits channel. If the channel
	// is empty, the request is blocked until a permit is available or the
	// context is cancelled.
	permits chan token

	// curr and prev are fixed windows of size windowSize. Each window contains
	// a slice of intervals which hold a count of the number of tokens granted
	// during that interval.
	curr fixedWindow
	prev fixedWindow

	// mu synchronizes access to the curr and prev windows.
	mu sync.Mutex
	// stopTicker stops the recurring slide ticker.
	stopTicker chan struct{}
}

func NewSlidingWindowLimiter(
	windowSize, slideInterval time.Duration,
	capacity int,
) (Limiter, error) {
	if err := validate(windowSize, slideInterval, capacity); err != nil {
		return nil, err
	}

	ni := int(windowSize / slideInterval)

	s := &slidingWindow{
		windowSize:    windowSize,
		slideInterval: slideInterval,
		capacity:      capacity,
		permits:       make(chan token, capacity),
		numIntervals:  ni,
		prev: fixedWindow{
			count: make([]int, ni),
		},
		curr: fixedWindow{
			count: make([]int, ni),
		},
		currentInterval: -1,
		stopTicker:      make(chan struct{}),
	}

	s.initialize()

	return s, nil
}

// Wait blocks a request until a token is available or the context is cancelled.
// TODO(pandeyabs): Implement WaitN.
func (s *slidingWindow) Wait(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return clues.Stack(ctx.Err())
	case <-s.permits:
		s.mu.Lock()
		defer s.mu.Unlock()

		s.curr.count[s.currentInterval]++
	}

	return nil
}

// Shutdown cleans up the slide goroutine. If Shutdown is not called, the slide
// goroutine will continue to run until the program exits.
func (s *slidingWindow) Shutdown() {
	close(s.stopTicker)
}

// initialize starts the slide goroutine and prefills tokens to full capacity.
func (s *slidingWindow) initialize() {
	// OK to not hold the mutex here since nothing else is running yet.
	s.nextInterval()

	// Start a goroutine which runs every slideInterval. This goroutine will
	// continue to run until the program exits or until Shutdown is called.
	go func() {
		ticker := time.NewTicker(s.slideInterval)

		for {
			select {
			case <-ticker.C:
				s.slide()
			case <-s.stopTicker:
				ticker.Stop()
				return
			}
		}
	}()

	// Prefill permits to allow tokens to be granted immediately.
	for i := 0; i < s.capacity; i++ {
		s.permits <- token{}
	}
}

// nextInterval increments the current interval and slides the fixed
// windows if needed. Should be called with the mutex held.
func (s *slidingWindow) nextInterval() {
	// Increment current interval.
	s.currentInterval = (s.currentInterval + 1) % s.numIntervals

	// Slide the fixed windows if windowSize time has elapsed.
	if s.currentInterval == 0 {
		s.prev = s.curr
		s.curr = fixedWindow{
			count: make([]int, s.numIntervals),
		}
	}
}

// slide moves the window forward by one interval. It reclaims tokens from the
// interval that we slid past and adds them back to available permits. If the
// permits are already at capacity, excess tokens are discarded.
func (s *slidingWindow) slide() {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.nextInterval()

	for i := 0; i < s.prev.count[s.currentInterval]; i++ {
		s.permits <- token{}
	}
}

func validate(
	windowSize, slideInterval time.Duration,
	capacity int,
) error {
	if windowSize <= 0 {
		return clues.New("invalid window size")
	}

	if slideInterval <= 0 {
		return clues.New("invalid slide interval")
	}

	// Allow capacity to be 0 for testing purposes.
	if capacity < 0 {
		return clues.New("invalid window capacity")
	}

	if windowSize < slideInterval {
		return clues.New("window too small to fit intervals")
	}

	if windowSize%slideInterval != 0 {
		return clues.New("window not divisible by slide interval")
	}

	return nil
}
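For orientation, a minimal usage sketch of the new API (standalone example, not part of the diff; it assumes only the constructor and Limiter methods defined above, and would need to live inside the corso module since the package path is internal):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/alcionai/corso/src/internal/common/limiters"
)

func main() {
	// At most 5 requests in any rolling 1s window, slid in 100ms steps.
	lim, err := limiters.NewSlidingWindowLimiter(time.Second, 100*time.Millisecond, 5)
	if err != nil {
		panic(err)
	}
	// Stop the internal slide goroutine when done.
	defer lim.Shutdown()

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	for i := 0; i < 12; i++ {
		// Blocks until a token is free or ctx expires.
		if err := lim.Wait(ctx); err != nil {
			fmt.Println("gave up waiting:", err)
			return
		}
		fmt.Println("request", i, "admitted")
	}
}

The first 5 calls are admitted immediately from the prefilled permits; later calls block until slide() reclaims tokens from intervals that have aged out of the window.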
src/internal/common/limiters/sliding_window_test.go (new file, 260 lines)

package limiters

import (
	"context"
	"math/rand"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"
	"go.uber.org/goleak"

	"github.com/alcionai/corso/src/internal/tester"
)

type SlidingWindowUnitTestSuite struct {
	tester.Suite
}

func TestSlidingWindowLimiterSuite(t *testing.T) {
	suite.Run(t, &SlidingWindowUnitTestSuite{Suite: tester.NewUnitSuite(t)})
}

// TestWaitBasic tests the Wait() functionality of the limiter with multiple
// concurrent requests.
func (suite *SlidingWindowUnitTestSuite) TestWaitBasic() {
	var (
		t          = suite.T()
		windowSize = 1 * time.Second
		// Assume slide interval is equal to window size for simplicity.
		slideInterval   = 1 * time.Second
		capacity        = 100
		startTime       = time.Now()
		numRequests     = 3 * capacity
		wg              sync.WaitGroup
		mu              sync.Mutex
		intervalToCount = make(map[time.Duration]int)
	)

	defer goleak.VerifyNone(t)

	ctx, flush := tester.NewContext(t)
	defer flush()

	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
	require.NoError(t, err)

	defer s.Shutdown()

	// Check if all tokens are available for use post initialization.
	require.Equal(t, capacity, len(s.(*slidingWindow).permits))

	// Make concurrent requests to the limiter.
	for i := 0; i < numRequests; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			err := s.Wait(ctx)
			require.NoError(t, err)

			// Number of seconds since startTime.
			bucket := time.Since(startTime).Truncate(windowSize)

			mu.Lock()
			intervalToCount[bucket]++
			mu.Unlock()
		}()
	}

	wg.Wait()

	// Verify that the number of requests allowed in each window is less than
	// or equal to the window capacity.
	for _, c := range intervalToCount {
		require.True(t, c <= capacity, "count: %d, capacity: %d", c, capacity)
	}
}

// TestWaitSliding tests the sliding window functionality of the limiter with
// time distributed Wait() calls.
func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() {
	var (
		t             = suite.T()
		windowSize    = 1 * time.Second
		slideInterval = 10 * time.Millisecond
		capacity      = 100
		// Test will run for a duration of 2 windowSize.
		numRequests = 2 * capacity
		wg          sync.WaitGroup
	)

	defer goleak.VerifyNone(t)

	ctx, flush := tester.NewContext(t)
	defer flush()

	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
	require.NoError(t, err)

	defer s.Shutdown()

	// Make concurrent requests to the limiter.
	for i := 0; i < numRequests; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			// Sleep for a random duration to spread out requests over multiple slide
			// intervals & windows, so that we can test the sliding window logic better.
			// Without this, the requests will be bunched up in the very first intervals
			// of the 2 windows. The rest of the intervals will be empty.
			time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)

			err := s.Wait(ctx)
			require.NoError(t, err)
		}()
	}
	wg.Wait()

	// Verify that the number of requests allowed in each window is less than
	// or equal to the window capacity.
	sw := s.(*slidingWindow)
	data := append(sw.prev.count, sw.curr.count...)

	sums := slidingSums(data, sw.numIntervals)

	for _, sum := range sums {
		require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity)
	}
}

func (suite *SlidingWindowUnitTestSuite) TestContextCancellation() {
	var (
		t             = suite.T()
		windowSize    = 100 * time.Millisecond
		slideInterval = 10 * time.Millisecond
		wg            sync.WaitGroup
	)

	defer goleak.VerifyNone(t)

	ctx, flush := tester.NewContext(t)
	defer flush()

	// Initialize limiter with capacity = 0 to test context cancellations.
	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, 0)
	require.NoError(t, err)

	defer s.Shutdown()

	ctx, cancel := context.WithTimeout(ctx, 2*windowSize)
	defer cancel()

	wg.Add(1)

	go func() {
		defer wg.Done()

		err := s.Wait(ctx)
		require.ErrorIs(t, err, context.DeadlineExceeded)
	}()

	wg.Wait()
}

func (suite *SlidingWindowUnitTestSuite) TestNewSlidingWindowLimiter() {
	tests := []struct {
		name          string
		windowSize    time.Duration
		slideInterval time.Duration
		capacity      int
		expectErr     assert.ErrorAssertionFunc
	}{
		{
			name:          "Invalid window size",
			windowSize:    0,
			slideInterval: 10 * time.Millisecond,
			capacity:      100,
			expectErr:     assert.Error,
		},
		{
			name:          "Invalid slide interval",
			windowSize:    100 * time.Millisecond,
			slideInterval: 0,
			capacity:      100,
			expectErr:     assert.Error,
		},
		{
			name:          "Slide interval > window size",
			windowSize:    10 * time.Millisecond,
			slideInterval: 100 * time.Millisecond,
			capacity:      100,
			expectErr:     assert.Error,
		},
		{
			name:          "Invalid capacity",
			windowSize:    100 * time.Millisecond,
			slideInterval: 10 * time.Millisecond,
			capacity:      -1,
			expectErr:     assert.Error,
		},
		{
			name:          "Window not divisible by slide interval",
			windowSize:    100 * time.Millisecond,
			slideInterval: 11 * time.Millisecond,
			capacity:      100,
			expectErr:     assert.Error,
		},
		{
			name:          "Valid parameters",
			windowSize:    100 * time.Millisecond,
			slideInterval: 10 * time.Millisecond,
			capacity:      100,
			expectErr:     assert.NoError,
		},
	}

	for _, test := range tests {
		suite.Run(test.name, func() {
			t := suite.T()

			defer goleak.VerifyNone(t)

			s, err := NewSlidingWindowLimiter(
				test.windowSize,
				test.slideInterval,
				test.capacity)
			if s != nil {
				s.Shutdown()
			}

			test.expectErr(t, err)
		})
	}
}

func slidingSums(data []int, w int) []int {
	var (
		sum = 0
		res = make([]int, len(data)-w+1)
	)

	for i := 0; i < w; i++ {
		sum += data[i]
	}

	res[0] = sum

	for i := 1; i < len(data)-w+1; i++ {
		sum = sum - data[i-1] + data[i+w-1]
		res[i] = sum
	}

	return res
}
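The slidingSums helper used by TestWaitSliding is a plain rolling-window sum: drop the leftmost count, add the next one. A standalone illustration with made-up counts (not values from the test):

package main

import "fmt"

func main() {
	// Per-interval grant counts; rolling window of 3 intervals.
	data := []int{4, 2, 0, 6, 1}
	w := 3

	// Sum of the first window.
	sum := 0
	for i := 0; i < w; i++ {
		sum += data[i]
	}

	res := []int{sum}

	// Slide one interval at a time.
	for i := 1; i <= len(data)-w; i++ {
		sum = sum - data[i-1] + data[i+w-1]
		res = append(res, sum)
	}

	fmt.Println(res) // [6 8 7]: every rolling 3-interval sum
}

The test then asserts that each of these rolling sums stays at or below capacity, which is exactly the limiter's invariant.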
@@ -61,10 +61,12 @@ func streamItems(
 		stats.UpdateResourceCount(path.ChannelMessagesCategory)
 		body = data.ReaderWithStats(body, path.ChannelMessagesCategory, stats)
 
+		// messages are exported as json and should be named as such
+		name := item.ID() + ".json"
+
 		ch <- export.Item{
 			ID:   item.ID(),
-			// channel message items have no name
-			Name: item.ID(),
+			Name: name,
 			Body: body,
 		}
 	}
 }
@@ -46,7 +46,7 @@ func (suite *ExportUnitSuite) TestStreamItems() {
 				},
 			},
 		},
-		expectName: "zim",
+		expectName: "zim.json",
 		expectErr:  assert.NoError,
 	},
 	{
@@ -71,7 +71,7 @@ func (suite *ExportUnitSuite) TestStreamItems() {
 					clues.New("I miss my cupcake."),
 				},
 			},
 		},
-		expectName: "gir",
+		expectName: "gir.json",
 		expectErr:  assert.Error,
 	},
 }
@@ -11,6 +11,7 @@ import (
 	khttp "github.com/microsoft/kiota-http-go"
 	"golang.org/x/time/rate"
 
+	limiters "github.com/alcionai/corso/src/internal/common/limiters"
 	"github.com/alcionai/corso/src/pkg/count"
 	"github.com/alcionai/corso/src/pkg/logger"
 	"github.com/alcionai/corso/src/pkg/path"
@@ -102,6 +103,10 @@ var (
 	driveLimiter = rate.NewLimiter(drivePerSecond, driveMaxCap)
 	// also used as the exchange service limiter
 	defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap)
+
+	// 10 min window, 1 second sliding interval, 10k capacity
+	//exchangeLimiter = limiters.NewLimiter(10*time.Minute, 100*time.Millisecond, 10000)
+	exchangeLimiter = limiters.NewLimiter(10*time.Minute, 1*time.Second, 9800)
 )
 
 type LimiterCfg struct {
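For scale, these parameters mean the limiter keeps 600 one-second buckets per 10-minute window and admits at most 9800 requests across any rolling 10-minute span (980 per minute on average). A quick standalone check of that arithmetic, using the computation and validation rules from sliding_window.go (the choice of 9800 over a round 10000 is not explained in the diff):

package main

import (
	"fmt"
	"time"
)

func main() {
	windowSize := 10 * time.Minute
	slideInterval := 1 * time.Second
	capacity := 9800

	// Mirrors numIntervals in NewSlidingWindowLimiter.
	numIntervals := int(windowSize / slideInterval)

	fmt.Println(numIntervals)                  // 600 buckets per window
	fmt.Println(windowSize%slideInterval == 0) // true, so validate() passes
	fmt.Println(capacity / 10)                 // 980 requests per minute, averaged
}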
@@ -204,7 +204,9 @@ func internalMiddleware(
 		&LoggingMiddleware{},
 		throttler,
 		&RateLimiterMiddleware{},
-		&MetricsMiddleware{},
+		&MetricsMiddleware{
+			counter: counter,
+		},
 	}
 
 	if len(cc.appendMiddleware) > 0 {
@@ -70,3 +70,12 @@ func getRespDump(ctx context.Context, resp *http.Response, getBody bool) string
 
 	return string(respDump)
 }
+
+func getReqDump(ctx context.Context, req *http.Request) string {
+	reqDump, err := httputil.DumpRequest(req, true)
+	if err != nil {
+		logger.CtxErr(ctx, err).Error("dumping http request")
+	}
+
+	return string(reqDump)
+}
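One property worth knowing about the stdlib call used here: httputil.DumpRequest(req, true) reads the body into memory and replaces req.Body with a fresh reader, so the request stays sendable afterwards; on error the returned bytes may be nil and string(reqDump) is simply empty. A tiny standalone demonstration:

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httputil"
	"strings"
)

func main() {
	req, _ := http.NewRequest(http.MethodPost, "https://example.com", strings.NewReader("hello"))

	dump, err := httputil.DumpRequest(req, true)
	fmt.Println(err == nil)    // true
	fmt.Println(len(dump) > 0) // true: request line, headers, body

	// The body is still readable after dumping.
	b, _ := io.ReadAll(req.Body)
	fmt.Println(string(b)) // hello
}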
@@ -334,6 +334,12 @@ func (mw *MetricsMiddleware) Intercept(
 	middlewareIndex int,
 	req *http.Request,
 ) (*http.Response, error) {
+	// Log request
+	dump := getReqDump(req.Context(), req)
+	logger.Ctx(req.Context()).Debugw("making graph api req: ",
+		"request", dump,
+		"requests_so_far", mw.counter.Get(count.APICallTokensConsumed))
+
 	var (
 		start     = time.Now()
 		resp, err = pipeline.Next(req, middlewareIndex)
@@ -306,7 +306,9 @@ func kiotaMiddlewares(
 		mw,
 		throttler,
 		&RateLimiterMiddleware{},
-		&MetricsMiddleware{})
+		&MetricsMiddleware{
+			counter: counter,
+		})
 
 	if len(cc.appendMiddleware) > 0 {
 		mw = append(mw, cc.appendMiddleware...)
@@ -70,7 +70,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_messages() {
 	expectedItems = []export.Item{
 		{
 			ID:   itemID,
-			Name: dii.Groups.ItemName,
+			Name: dii.Groups.ItemName + ".json",
 			// Body: body, not checked
 		},
 	}
@@ -83,9 +83,10 @@ func NewBackupOperation(
 	selector selectors.Selector,
 	owner idname.Provider,
 	bus events.Eventer,
+	counter *count.Bus,
 ) (BackupOperation, error) {
 	op := BackupOperation{
-		operation:     newOperation(opts, bus, count.New(), kw, sw),
+		operation:     newOperation(opts, bus, counter, kw, sw),
 		ResourceOwner: owner,
 		Selectors:     selector,
 		Version:       "v0",
@@ -429,7 +429,8 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_PersistResults() {
 			acct,
 			sel,
 			sel,
-			evmock.NewBus())
+			evmock.NewBus(),
+			count.New())
 		require.NoError(t, err, clues.ToCore(err))
 
 		op.Errors.Fail(test.fail)
@@ -1487,7 +1488,8 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
 				test.acct,
 				sel,
 				sel,
-				evmock.NewBus())
+				evmock.NewBus(),
+				count.New())
 			test.errCheck(t, err, clues.ToCore(err))
 		})
 	}
@@ -1930,7 +1932,8 @@ func (suite *AssistBackupIntegrationSuite) TestBackupTypesForFailureModes() {
 			acct,
 			osel.Selector,
 			selectors.Selector{DiscreteOwner: userID},
-			evmock.NewBus())
+			evmock.NewBus(),
+			count.New())
 		require.NoError(t, err, clues.ToCore(err))
 
 		err = bo.Run(ctx)
@@ -2245,7 +2248,8 @@ func (suite *AssistBackupIntegrationSuite) TestExtensionsIncrementals() {
 			acct,
 			osel.Selector,
 			selectors.Selector{DiscreteOwner: userID},
-			evmock.NewBus())
+			evmock.NewBus(),
+			count.New())
 		require.NoError(t, err, clues.ToCore(err))
 
 		err = bo.Run(ctx)
@@ -26,6 +26,7 @@ import (
 	"github.com/alcionai/corso/src/pkg/account"
 	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/control/repository"
+	"github.com/alcionai/corso/src/pkg/count"
 	"github.com/alcionai/corso/src/pkg/path"
 	"github.com/alcionai/corso/src/pkg/selectors"
 	selTD "github.com/alcionai/corso/src/pkg/selectors/testdata"
@@ -181,7 +182,8 @@ func (suite *MaintenanceOpNightlySuite) TestRepoMaintenance_GarbageCollection()
 		acct,
 		osel.Selector,
 		selectors.Selector{DiscreteOwner: userID},
-		evmock.NewBus())
+		evmock.NewBus(),
+		count.New())
 	require.NoError(t, err, clues.ToCore(err))
 
 	err = bo.Run(ctx)
@@ -118,12 +118,13 @@ func (suite *ExchangeBackupIntgSuite) TestBackup_Run_exchange() {
 
 	var (
 		mb      = evmock.NewBus()
+		counter = count.New()
 		sel     = test.selector().Selector
 		opts    = control.DefaultOptions()
 		whatSet = deeTD.CategoryFromRepoRef
 	)
 
-	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel, opts, version.Backup)
+	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel, opts, version.Backup, counter)
 	defer bod.close(t, ctx)
 
 	sel = bod.sel
@@ -185,7 +186,8 @@ func (suite *ExchangeBackupIntgSuite) TestBackup_Run_exchange() {
 			ctx,
 			bod,
 			incMB,
-			opts)
+			opts,
+			counter)
 	)
 
 	runAndCheckBackup(t, ctx, &incBO, incMB, true)
@@ -238,10 +240,11 @@ func (suite *ExchangeBackupIntgSuite) TestBackup_Run_exchangeBasic_groups9Versio
 	defer flush()
 
 	var (
-		mb   = evmock.NewBus()
-		sel  = selectors.NewExchangeBackup([]string{suite.its.user.ID})
-		opts = control.DefaultOptions()
-		ws   = deeTD.DriveIDFromRepoRef
+		mb      = evmock.NewBus()
+		counter = count.New()
+		sel     = selectors.NewExchangeBackup([]string{suite.its.user.ID})
+		opts    = control.DefaultOptions()
+		ws      = deeTD.DriveIDFromRepoRef
 	)
 
 	sel.Include(
@@ -255,7 +258,8 @@ func (suite *ExchangeBackupIntgSuite) TestBackup_Run_exchangeBasic_groups9Versio
 		mb,
 		sel.Selector,
 		opts,
-		version.All8MigrateUserPNToID)
+		version.All8MigrateUserPNToID,
+		counter)
 	defer bod.close(t, ctx)
 
 	runAndCheckBackup(t, ctx, &bo, mb, false)
@@ -290,12 +294,14 @@ func (suite *ExchangeBackupIntgSuite) TestBackup_Run_exchangeBasic_groups9Versio
 		false)
 
 	mb = evmock.NewBus()
+	counter = count.New()
 	notForcedFull := newTestBackupOp(
 		t,
 		ctx,
 		bod,
 		mb,
-		opts)
+		opts,
+		counter)
 	notForcedFull.BackupVersion = version.Groups9Update
 
 	runAndCheckBackup(t, ctx, &notForcedFull, mb, false)
@@ -357,6 +363,7 @@ func testExchangeContinuousBackups(suite *ExchangeBackupIntgSuite, toggles contr
 	var (
 		acct       = tconfig.NewM365Account(t)
 		mb         = evmock.NewBus()
+		counter    = count.New()
 		now        = dttm.Now()
 		service    = path.ExchangeService
 		categories = map[path.CategoryType][][]string{
@@ -379,7 +386,7 @@ func testExchangeContinuousBackups(suite *ExchangeBackupIntgSuite, toggles contr
 	)
 
 	opts.ToggleFeatures = toggles
-	ctrl, sels := ControllerWithSelector(t, ctx, acct, sel.Selector, nil, nil)
+	ctrl, sels := ControllerWithSelector(t, ctx, acct, sel.Selector, nil, nil, counter)
 	sel.DiscreteOwner = sels.ID()
 	sel.DiscreteOwnerName = sels.Name()
 
@@ -578,7 +585,7 @@ func testExchangeContinuousBackups(suite *ExchangeBackupIntgSuite, toggles contr
 		}
 	}
 
-	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel.Selector, opts, version.Backup)
+	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel.Selector, opts, version.Backup, counter)
 	defer bod.close(t, ctx)
 
 	// run the initial backup
@@ -911,15 +918,16 @@ func testExchangeContinuousBackups(suite *ExchangeBackupIntgSuite, toggles contr
 	for _, test := range table {
 		suite.Run(test.name, func() {
 			var (
-				t     = suite.T()
-				incMB = evmock.NewBus()
-				atid  = creds.AzureTenantID
+				t       = suite.T()
+				incMB   = evmock.NewBus()
+				counter = count.New()
+				atid    = creds.AzureTenantID
 			)
 
 			ctx, flush := tester.WithContext(t, ctx)
 			defer flush()
 
-			incBO := newTestBackupOp(t, ctx, bod, incMB, opts)
+			incBO := newTestBackupOp(t, ctx, bod, incMB, opts, counter)
 
 			suite.Run("PreTestSetup", func() {
 				t := suite.T()
@@ -1043,11 +1051,12 @@ func (suite *ExchangeRestoreNightlyIntgSuite) TestRestore_Run_exchangeWithAdvanc
 	baseSel.DiscreteOwner = suite.its.user.ID
 
 	var (
-		mb   = evmock.NewBus()
-		opts = control.DefaultOptions()
+		mb      = evmock.NewBus()
+		counter = count.New()
+		opts    = control.DefaultOptions()
 	)
 
-	bo, bod := prepNewTestBackupOp(t, ctx, mb, baseSel.Selector, opts, version.Backup)
+	bo, bod := prepNewTestBackupOp(t, ctx, mb, baseSel.Selector, opts, version.Backup, counter)
 	defer bod.close(t, ctx)
 
 	runAndCheckBackup(t, ctx, &bo, mb, false)
@@ -1353,11 +1362,12 @@ func (suite *ExchangeRestoreNightlyIntgSuite) TestRestore_Run_exchangeAlternateP
 	baseSel.DiscreteOwner = suite.its.user.ID
 
 	var (
-		mb   = evmock.NewBus()
-		opts = control.DefaultOptions()
+		mb      = evmock.NewBus()
+		counter = count.New()
+		opts    = control.DefaultOptions()
 	)
 
-	bo, bod := prepNewTestBackupOp(t, ctx, mb, baseSel.Selector, opts, version.Backup)
+	bo, bod := prepNewTestBackupOp(t, ctx, mb, baseSel.Selector, opts, version.Backup, counter)
 	defer bod.close(t, ctx)
 
 	runAndCheckBackup(t, ctx, &bo, mb, false)
@@ -88,6 +88,7 @@ func (suite *GroupsBackupIntgSuite) TestBackup_Run_groupsBasic_groups9VersionBum
 
 	var (
 		mb      = evmock.NewBus()
+		counter = count.New()
 		sel     = selectors.NewGroupsBackup([]string{suite.its.group.ID})
 		opts    = control.DefaultOptions()
 		whatSet = deeTD.CategoryFromRepoRef
@@ -103,7 +104,8 @@ func (suite *GroupsBackupIntgSuite) TestBackup_Run_groupsBasic_groups9VersionBum
 		mb,
 		sel.Selector,
 		opts,
-		version.All8MigrateUserPNToID)
+		version.All8MigrateUserPNToID,
+		counter)
 	defer bod.close(t, ctx)
 
 	runAndCheckBackup(t, ctx, &bo, mb, false)
@@ -138,12 +140,14 @@ func (suite *GroupsBackupIntgSuite) TestBackup_Run_groupsBasic_groups9VersionBum
 		false)
 
 	mb = evmock.NewBus()
+	counter = count.New()
 	forcedFull := newTestBackupOp(
 		t,
 		ctx,
 		bod,
 		mb,
-		opts)
+		opts,
+		counter)
 	forcedFull.BackupVersion = version.Groups9Update
 
 	runAndCheckBackup(t, ctx, &forcedFull, mb, false)
@@ -203,6 +207,7 @@ func (suite *GroupsBackupIntgSuite) TestBackup_Run_groupsBasic() {
 
 	var (
 		mb      = evmock.NewBus()
+		counter = count.New()
 		sel     = selectors.NewGroupsBackup([]string{suite.its.group.ID})
 		opts    = control.DefaultOptions()
 		whatSet = deeTD.CategoryFromRepoRef
@@ -212,7 +217,7 @@ func (suite *GroupsBackupIntgSuite) TestBackup_Run_groupsBasic() {
 		selTD.GroupsBackupLibraryFolderScope(sel),
 		selTD.GroupsBackupChannelScope(sel))
 
-	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel.Selector, opts, version.Backup)
+	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel.Selector, opts, version.Backup, counter)
 	defer bod.close(t, ctx)
 
 	runAndCheckBackup(t, ctx, &bo, mb, false)
@@ -254,12 +259,13 @@ func (suite *GroupsBackupIntgSuite) TestBackup_Run_groupsExtensions() {
 	defer flush()
 
 	var (
-		mb    = evmock.NewBus()
-		sel   = selectors.NewGroupsBackup([]string{suite.its.group.ID})
-		opts  = control.DefaultOptions()
-		tenID = tconfig.M365TenantID(t)
-		svc   = path.GroupsService
-		ws    = deeTD.DriveIDFromRepoRef
+		mb      = evmock.NewBus()
+		counter = count.New()
+		sel     = selectors.NewGroupsBackup([]string{suite.its.group.ID})
+		opts    = control.DefaultOptions()
+		tenID   = tconfig.M365TenantID(t)
+		svc     = path.GroupsService
+		ws      = deeTD.DriveIDFromRepoRef
 	)
 
 	opts.ItemExtensionFactory = getTestExtensionFactories()
@@ -267,7 +273,7 @@ func (suite *GroupsBackupIntgSuite) TestBackup_Run_groupsExtensions() {
 	// does not apply to channel messages
 	sel.Include(selTD.GroupsBackupLibraryFolderScope(sel))
 
-	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel.Selector, opts, version.Backup)
+	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel.Selector, opts, version.Backup, counter)
 	defer bod.close(t, ctx)
 
 	runAndCheckBackup(t, ctx, &bo, mb, false)
@@ -115,6 +115,7 @@ func prepNewTestBackupOp(
 	sel selectors.Selector,
 	opts control.Options,
 	backupVersion int,
+	counter *count.Bus,
 ) (
 	operations.BackupOperation,
 	*backupOpDependencies,
@@ -161,14 +162,16 @@ func prepNewTestBackupOp(
 		bod.acct,
 		sel,
 		nil,
-		bod.close)
+		bod.close,
+		counter)
 
 	bo := newTestBackupOp(
 		t,
 		ctx,
 		bod,
 		bus,
-		opts)
+		opts,
+		counter)
 	bo.BackupVersion = backupVersion
 
 	bod.sss = streamstore.NewStreamer(
@@ -189,6 +192,7 @@ func newTestBackupOp(
 	bod *backupOpDependencies,
 	bus events.Eventer,
 	opts control.Options,
+	counter *count.Bus,
 ) operations.BackupOperation {
 	bod.ctrl.IDNameLookup = idname.NewCache(map[string]string{bod.sel.ID(): bod.sel.Name()})
 
@@ -201,7 +205,8 @@ func newTestBackupOp(
 		bod.acct,
 		bod.sel,
 		bod.sel,
-		bus)
+		bus,
+		counter)
 	if !assert.NoError(t, err, clues.ToCore(err)) {
 		bod.close(t, ctx)
 		t.FailNow()
@@ -561,13 +566,14 @@ func ControllerWithSelector(
 	sel selectors.Selector,
 	ins idname.Cacher,
 	onFail func(*testing.T, context.Context),
+	counter *count.Bus,
 ) (*m365.Controller, selectors.Selector) {
 	ctrl, err := m365.NewController(
 		ctx,
 		acct,
 		sel.PathService(),
 		control.DefaultOptions(),
-		count.New())
+		counter)
 	if !assert.NoError(t, err, clues.ToCore(err)) {
 		if onFail != nil {
 			onFail(t, ctx)
@@ -71,18 +71,19 @@ func (suite *OneDriveBackupIntgSuite) TestBackup_Run_oneDrive() {
 	defer flush()
 
 	var (
-		tenID  = tconfig.M365TenantID(t)
-		mb     = evmock.NewBus()
-		userID = tconfig.SecondaryM365UserID(t)
-		osel   = selectors.NewOneDriveBackup([]string{userID})
-		ws     = deeTD.DriveIDFromRepoRef
-		svc    = path.OneDriveService
-		opts   = control.DefaultOptions()
+		tenID   = tconfig.M365TenantID(t)
+		mb      = evmock.NewBus()
+		counter = count.New()
+		userID  = tconfig.SecondaryM365UserID(t)
+		osel    = selectors.NewOneDriveBackup([]string{userID})
+		ws      = deeTD.DriveIDFromRepoRef
+		svc     = path.OneDriveService
+		opts    = control.DefaultOptions()
 	)
 
 	osel.Include(selTD.OneDriveBackupFolderScope(osel))
 
-	bo, bod := prepNewTestBackupOp(t, ctx, mb, osel.Selector, opts, version.Backup)
+	bo, bod := prepNewTestBackupOp(t, ctx, mb, osel.Selector, opts, version.Backup, counter)
 	defer bod.close(t, ctx)
 
 	runAndCheckBackup(t, ctx, &bo, mb, false)
@@ -117,11 +118,12 @@ func (suite *OneDriveBackupIntgSuite) TestBackup_Run_oneDriveBasic_groups9Versio
 	defer flush()
 
 	var (
-		mb     = evmock.NewBus()
-		userID = tconfig.SecondaryM365UserID(t)
-		osel   = selectors.NewOneDriveBackup([]string{userID})
-		ws     = deeTD.DriveIDFromRepoRef
-		opts   = control.DefaultOptions()
+		mb      = evmock.NewBus()
+		counter = count.New()
+		userID  = tconfig.SecondaryM365UserID(t)
+		osel    = selectors.NewOneDriveBackup([]string{userID})
+		ws      = deeTD.DriveIDFromRepoRef
+		opts    = control.DefaultOptions()
 	)
 
 	osel.Include(selTD.OneDriveBackupFolderScope(osel))
@@ -132,7 +134,8 @@ func (suite *OneDriveBackupIntgSuite) TestBackup_Run_oneDriveBasic_groups9Versio
 		mb,
 		osel.Selector,
 		opts,
-		version.All8MigrateUserPNToID)
+		version.All8MigrateUserPNToID,
+		counter)
 	defer bod.close(t, ctx)
 
 	runAndCheckBackup(t, ctx, &bo, mb, false)
@@ -167,12 +170,14 @@ func (suite *OneDriveBackupIntgSuite) TestBackup_Run_oneDriveBasic_groups9Versio
 		false)
 
 	mb = evmock.NewBus()
+	counter = count.New()
 	notForcedFull := newTestBackupOp(
 		t,
 		ctx,
 		bod,
 		mb,
-		opts)
+		opts,
+		counter)
 	notForcedFull.BackupVersion = version.Groups9Update
 
 	runAndCheckBackup(t, ctx, &notForcedFull, mb, false)
@@ -282,10 +287,11 @@ func runDriveIncrementalTest(
 	defer flush()
 
 	var (
-		acct = tconfig.NewM365Account(t)
-		opts = control.DefaultOptions()
-		mb   = evmock.NewBus()
-		ws   = deeTD.DriveIDFromRepoRef
+		acct    = tconfig.NewM365Account(t)
+		opts    = control.DefaultOptions()
+		mb      = evmock.NewBus()
+		counter = count.New()
+		ws      = deeTD.DriveIDFromRepoRef
 
 		// `now` has to be formatted with SimpleDateTimeTesting as
 		// some drives cannot have `:` in file/folder names
@@ -315,7 +321,7 @@ func runDriveIncrementalTest(
 	creds, err := acct.M365Config()
 	require.NoError(t, err, clues.ToCore(err))
 
-	ctrl, sel := ControllerWithSelector(t, ctx, acct, sel, nil, nil)
+	ctrl, sel := ControllerWithSelector(t, ctx, acct, sel, nil, nil, counter)
 	ac := ctrl.AC.Drives()
 	rh := getRestoreHandler(ctrl.AC)
 
@@ -436,7 +442,7 @@ func runDriveIncrementalTest(
 			locRef)
 	}
 
-	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel, opts, version.Backup)
+	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel, opts, version.Backup, counter)
 	defer bod.close(t, ctx)
 
 	sel = bod.sel
@@ -825,14 +831,16 @@ func runDriveIncrementalTest(
 	bod.ctrl = cleanCtrl
 
 	var (
-		t     = suite.T()
-		incMB = evmock.NewBus()
-		incBO = newTestBackupOp(
+		t       = suite.T()
+		incMB   = evmock.NewBus()
+		counter = count.New()
+		incBO   = newTestBackupOp(
 			t,
 			ctx,
 			bod,
 			incMB,
-			opts)
+			opts,
+			counter)
 	)
 
 	ctx, flush := tester.WithContext(t, ctx)
@@ -986,6 +994,7 @@ func runDriveAssistBaseGroupsUpdate(
 	var (
 		whatSet = deeTD.CategoryFromRepoRef
 		mb      = evmock.NewBus()
+		counter = count.New()
 		opts    = control.DefaultOptions()
 	)
 
@@ -1001,7 +1010,8 @@ func runDriveAssistBaseGroupsUpdate(
 		mb,
 		sel,
 		opts,
-		version.All8MigrateUserPNToID)
+		version.All8MigrateUserPNToID,
+		counter)
 	defer bod.close(t, ctx)
 
 	suite.Run("makeAssistBackup", func() {
@@ -1035,8 +1045,9 @@ func runDriveAssistBaseGroupsUpdate(
 	defer flush()
 
 	var (
-		mb   = evmock.NewBus()
-		opts = control.DefaultOptions()
+		mb      = evmock.NewBus()
+		counter = count.New()
+		opts    = control.DefaultOptions()
 	)
 
 	forcedFull := newTestBackupOp(
@@ -1044,7 +1055,8 @@ func runDriveAssistBaseGroupsUpdate(
 		ctx,
 		bod,
 		mb,
-		opts)
+		opts,
+		counter)
 	forcedFull.BackupVersion = version.Groups9Update
 
 	runAndCheckBackup(t, ctx, &forcedFull, mb, false)
@@ -1106,9 +1118,10 @@ func (suite *OneDriveBackupIntgSuite) TestBackup_Run_oneDriveOwnerMigration() {
 	defer flush()
 
 	var (
-		acct = tconfig.NewM365Account(t)
-		opts = control.DefaultOptions()
-		mb   = evmock.NewBus()
+		acct    = tconfig.NewM365Account(t)
+		opts    = control.DefaultOptions()
+		mb      = evmock.NewBus()
+		counter = count.New()
 
 		categories = map[path.CategoryType][][]string{
 			path.FilesCategory: {{bupMD.DeltaURLsFileName}, {bupMD.PreviousPathFileName}},
@@ -1123,7 +1136,7 @@ func (suite *OneDriveBackupIntgSuite) TestBackup_Run_oneDriveOwnerMigration() {
 		acct,
 		path.OneDriveService,
 		control.DefaultOptions(),
-		count.New())
+		counter)
 	require.NoError(t, err, clues.ToCore(err))
 
 	userable, err := ctrl.AC.Users().GetByID(ctx, suite.its.user.ID)
@@ -1135,7 +1148,7 @@ func (suite *OneDriveBackupIntgSuite) TestBackup_Run_oneDriveOwnerMigration() {
 	oldsel := selectors.NewOneDriveBackup([]string{uname})
 	oldsel.Include(selTD.OneDriveBackupFolderScope(oldsel))
 
-	bo, bod := prepNewTestBackupOp(t, ctx, mb, oldsel.Selector, opts, 0)
+	bo, bod := prepNewTestBackupOp(t, ctx, mb, oldsel.Selector, opts, 0, counter)
 	defer bod.close(t, ctx)
 
 	sel := bod.sel
@@ -1163,7 +1176,7 @@ func (suite *OneDriveBackupIntgSuite) TestBackup_Run_oneDriveOwnerMigration() {
 	var (
 		incMB = evmock.NewBus()
 		// the incremental backup op should have a proper user ID for the id.
		incBO = newTestBackupOp(t, ctx, bod, incMB, opts, counter)
 	)
 
 	require.NotEqualf(
@@ -1234,20 +1247,21 @@ func (suite *OneDriveBackupIntgSuite) TestBackup_Run_oneDriveExtensions() {
 	defer flush()
 
 	var (
-		tenID  = tconfig.M365TenantID(t)
-		mb     = evmock.NewBus()
-		userID = tconfig.SecondaryM365UserID(t)
-		osel   = selectors.NewOneDriveBackup([]string{userID})
-		ws     = deeTD.DriveIDFromRepoRef
-		svc    = path.OneDriveService
-		opts   = control.DefaultOptions()
+		tenID   = tconfig.M365TenantID(t)
+		mb      = evmock.NewBus()
+		counter = count.New()
+		userID  = tconfig.SecondaryM365UserID(t)
+		osel    = selectors.NewOneDriveBackup([]string{userID})
+		ws      = deeTD.DriveIDFromRepoRef
+		svc     = path.OneDriveService
+		opts    = control.DefaultOptions()
 	)
 
 	opts.ItemExtensionFactory = getTestExtensionFactories()
 
 	osel.Include(selTD.OneDriveBackupFolderScope(osel))
 
-	bo, bod := prepNewTestBackupOp(t, ctx, mb, osel.Selector, opts, version.Backup)
+	bo, bod := prepNewTestBackupOp(t, ctx, mb, osel.Selector, opts, version.Backup, counter)
 	defer bod.close(t, ctx)
 
 	runAndCheckBackup(t, ctx, &bo, mb, false)
@@ -1326,11 +1340,12 @@ func runDriveRestoreWithAdvancedOptions(
 	// a backup is required to run restores
 
 	var (
-		mb   = evmock.NewBus()
-		opts = control.DefaultOptions()
+		mb      = evmock.NewBus()
+		counter = count.New()
+		opts    = control.DefaultOptions()
 	)
 
-	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel, opts, version.Backup)
+	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel, opts, version.Backup, counter)
 	defer bod.close(t, ctx)
 
 	runAndCheckBackup(t, ctx, &bo, mb, false)
@@ -1598,11 +1613,12 @@ func runDriveRestoreToAlternateProtectedResource(
 	// a backup is required to run restores
 
 	var (
-		mb   = evmock.NewBus()
-		opts = control.DefaultOptions()
+		mb      = evmock.NewBus()
+		counter = count.New()
+		opts    = control.DefaultOptions()
 	)
 
-	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel, opts, version.Backup)
+	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel, opts, version.Backup, counter)
 	defer bod.close(t, ctx)
 
 	runAndCheckBackup(t, ctx, &bo, mb, false)
@@ -68,7 +68,7 @@ func prepNewTestRestoreOp(
 	backupStore storage.Storage,
 	backupID model.StableID,
 	bus events.Eventer,
-	ctr *count.Bus,
+	counter *count.Bus,
 	sel selectors.Selector,
 	opts control.Options,
 	restoreCfg control.RestoreConfig,
@@ -112,7 +112,8 @@ func prepNewTestRestoreOp(
 		rod.acct,
 		sel,
 		nil,
-		rod.close)
+		rod.close,
+		counter)
 
 	ro := newTestRestoreOp(
 		t,
@@ -120,7 +121,7 @@ func prepNewTestRestoreOp(
 		rod,
 		backupID,
 		bus,
-		ctr,
+		counter,
 		opts,
 		restoreCfg)
 
@@ -142,7 +143,7 @@ func newTestRestoreOp(
 	rod *restoreOpDependencies,
 	backupID model.StableID,
 	bus events.Eventer,
-	ctr *count.Bus,
+	counter *count.Bus,
 	opts control.Options,
 	restoreCfg control.RestoreConfig,
 ) operations.RestoreOperation {
@@ -159,7 +160,7 @@ func newTestRestoreOp(
 		rod.sel,
 		restoreCfg,
 		bus,
-		ctr)
+		counter)
 	if !assert.NoError(t, err, clues.ToCore(err)) {
 		rod.close(t, ctx)
 		t.FailNow()
@@ -53,10 +53,11 @@ func (suite *SharePointBackupIntgSuite) TestBackup_Run_sharePointBasic_groups9Ve
 	defer flush()
 
 	var (
-		mb   = evmock.NewBus()
-		sel  = selectors.NewSharePointBackup([]string{suite.its.site.ID})
-		opts = control.DefaultOptions()
-		ws   = deeTD.DriveIDFromRepoRef
+		mb      = evmock.NewBus()
+		counter = count.New()
+		sel     = selectors.NewSharePointBackup([]string{suite.its.site.ID})
+		opts    = control.DefaultOptions()
+		ws      = deeTD.DriveIDFromRepoRef
 	)
 
 	sel.Include(selTD.SharePointBackupFolderScope(sel))
@@ -67,7 +68,8 @@ func (suite *SharePointBackupIntgSuite) TestBackup_Run_sharePointBasic_groups9Ve
 		mb,
 		sel.Selector,
 		opts,
-		version.All8MigrateUserPNToID)
+		version.All8MigrateUserPNToID,
+		counter)
 	defer bod.close(t, ctx)
 
 	runAndCheckBackup(t, ctx, &bo, mb, false)
@@ -102,12 +104,14 @@ func (suite *SharePointBackupIntgSuite) TestBackup_Run_sharePointBasic_groups9Ve
 		false)
 
 	mb = evmock.NewBus()
+	counter = count.New()
 	notForcedFull := newTestBackupOp(
 		t,
 		ctx,
 		bod,
 		mb,
-		opts)
+		opts,
+		counter)
 	notForcedFull.BackupVersion = version.Groups9Update
 
 	runAndCheckBackup(t, ctx, &notForcedFull, mb, false)
@@ -207,14 +211,15 @@ func (suite *SharePointBackupIntgSuite) TestBackup_Run_sharePointBasic() {
 	defer flush()
 
 	var (
-		mb   = evmock.NewBus()
-		sel  = selectors.NewSharePointBackup([]string{suite.its.site.ID})
-		opts = control.DefaultOptions()
+		mb      = evmock.NewBus()
+		counter = count.New()
+		sel     = selectors.NewSharePointBackup([]string{suite.its.site.ID})
+		opts    = control.DefaultOptions()
 	)
 
 	sel.Include(selTD.SharePointBackupFolderScope(sel))
 
-	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel.Selector, opts, version.Backup)
+	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel.Selector, opts, version.Backup, counter)
 	defer bod.close(t, ctx)
 
 	runAndCheckBackup(t, ctx, &bo, mb, false)
@@ -236,19 +241,20 @@ func (suite *SharePointBackupIntgSuite) TestBackup_Run_sharePointExtensions() {
 	defer flush()
 
 	var (
-		mb    = evmock.NewBus()
-		sel   = selectors.NewSharePointBackup([]string{suite.its.site.ID})
-		opts  = control.DefaultOptions()
-		tenID = tconfig.M365TenantID(t)
-		svc   = path.SharePointService
-		ws    = deeTD.DriveIDFromRepoRef
+		mb      = evmock.NewBus()
+		counter = count.New()
+		sel     = selectors.NewSharePointBackup([]string{suite.its.site.ID})
+		opts    = control.DefaultOptions()
+		tenID   = tconfig.M365TenantID(t)
+		svc     = path.SharePointService
+		ws      = deeTD.DriveIDFromRepoRef
 	)
 
 	opts.ItemExtensionFactory = getTestExtensionFactories()
 
 	sel.Include(selTD.SharePointBackupFolderScope(sel))
 
-	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel.Selector, opts, version.Backup)
+	bo, bod := prepNewTestBackupOp(t, ctx, mb, sel.Selector, opts, version.Backup, counter)
 	defer bod.close(t, ctx)
 
 	runAndCheckBackup(t, ctx, &bo, mb, false)
@@ -385,6 +391,7 @@ func (suite *SharePointRestoreNightlyIntgSuite) TestRestore_Run_sharepointDelete
 	// run a backup
 	var (
 		mb          = evmock.NewBus()
+		counter     = count.New()
 		opts        = control.DefaultOptions()
 		graphClient = suite.its.ac.Stable.Client()
 	)
@@ -394,7 +401,7 @@ func (suite *SharePointRestoreNightlyIntgSuite) TestRestore_Run_sharepointDelete
 	bsel.Filter(bsel.Library(rc.Location))
 	bsel.DiscreteOwner = suite.its.site.ID
 
-	bo, bod := prepNewTestBackupOp(t, ctx, mb, bsel.Selector, opts, version.Backup)
+	bo, bod := prepNewTestBackupOp(t, ctx, mb, bsel.Selector, opts, version.Backup, counter)
 	defer bod.close(t, ctx)
 
 	runAndCheckBackup(t, ctx, &bo, mb, false)
@@ -93,7 +93,8 @@ func (r repository) NewBackupWithLookup(
 		r.Account,
 		sel,
 		sel, // the selector acts as an IDNamer for its discrete resource owner.
-		r.Bus)
+		r.Bus,
+		r.counter)
 }
 
 // Backup retrieves a backup by id.
@@ -85,7 +85,7 @@ func connectToM365(
 		r.Account,
 		pst,
 		r.Opts,
-		r.counter.Local())
+		r.counter)
 	if err != nil {
 		return nil, clues.Wrap(err, "creating m365 client controller")
 	}