Use a WaitGroup for AwaitStatus (#716)

## Description

This builds on the `MergeStatus` proposal and the `WaitGroup` discussion in #494.
It is required for OneDrive, where we operate on multiple collections.

## Type of change

Please check the type of change your PR introduces:
- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 🐹 Trivial/Minor

## Issue(s)
#494 

## Test Plan

<!-- How will this be tested prior to merging.-->

- [ ] 💪 Manual
- [x] ⚡ Unit test
- [ ] 💚 E2E
This commit is contained in:
Vaibhav Kamra 2022-09-02 10:17:17 -07:00 committed by GitHub
parent 21c2e4af14
commit ed1a4bebce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 219 additions and 93 deletions

View File

@ -50,7 +50,7 @@ type Collection struct {
service graph.Service
collectionType optionIdentifier
statusCh chan<- *support.ConnectorOperationStatus
statusUpdater support.StatusUpdater
// FullPath is the slice representation of the action context passed down through the hierarchy.
// The original request can be gleaned from the slice. (e.g. {<tenant ID>, <user ID>, "emails"})
fullPath []string
@ -62,14 +62,14 @@ func NewCollection(
fullPath []string,
collectionType optionIdentifier,
service graph.Service,
statusCh chan<- *support.ConnectorOperationStatus,
statusUpdater support.StatusUpdater,
) Collection {
collection := Collection{
user: user,
data: make(chan data.Stream, collectionChannelBufferSize),
jobs: make([]string, 0),
service: service,
statusCh: statusCh,
statusUpdater: statusUpdater,
fullPath: fullPath,
collectionType: collectionType,
}
@ -169,7 +169,7 @@ func (col *Collection) finishPopulation(ctx context.Context, success int, errs e
attempted := len(col.jobs)
status := support.CreateStatus(ctx, support.Backup, attempted, success, 1, errs)
logger.Ctx(ctx).Debug(status.String())
col.statusCh <- status
col.statusUpdater(status)
}
// GraphSerializeFunc are class of functions that are used by Collections to transform GraphRetrievalFunc

View File

@ -33,7 +33,7 @@ type GraphIterateFunc func(
qp graph.QueryParams,
errs error,
collections map[string]*Collection,
graphStatusChannel chan<- *support.ConnectorOperationStatus,
statusUpdater support.StatusUpdater,
) func(any) bool
// IterateSelectAllDescendablesForCollection utility function for
@ -45,7 +45,7 @@ func IterateSelectAllDescendablesForCollections(
qp graph.QueryParams,
errs error,
collections map[string]*Collection,
statusCh chan<- *support.ConnectorOperationStatus,
statusUpdater support.StatusUpdater,
) func(any) bool {
var (
isCategorySet bool
@ -88,7 +88,7 @@ func IterateSelectAllDescendablesForCollections(
[]string{qp.Credentials.TenantID, qp.User, category, directory},
collectionType,
service,
statusCh,
statusUpdater,
)
collections[directory] = &edc
}
@ -108,7 +108,7 @@ func IterateSelectAllEventsForCollections(
qp graph.QueryParams,
errs error,
collections map[string]*Collection,
statusCh chan<- *support.ConnectorOperationStatus,
statusUpdater support.StatusUpdater,
) func(any) bool {
return func(eventItem any) bool {
event, ok := eventItem.(models.Eventable)
@ -170,7 +170,7 @@ func IterateSelectAllEventsForCollections(
[]string{qp.Credentials.TenantID, qp.User, eventsCategory, directory},
events,
service,
statusCh,
statusUpdater,
)
collections[directory] = &edc
}
@ -189,7 +189,7 @@ func IterateAndFilterMessagesForCollections(
qp graph.QueryParams,
errs error,
collections map[string]*Collection,
statusCh chan<- *support.ConnectorOperationStatus,
statusUpdater support.StatusUpdater,
) func(any) bool {
var isFilterSet bool
@ -199,7 +199,7 @@ func IterateAndFilterMessagesForCollections(
ctx,
qp,
collections,
statusCh,
statusUpdater,
)
if err != nil {
errs = support.WrapAndAppend(qp.User, err, errs)
@ -231,7 +231,7 @@ func IterateFilterFolderDirectoriesForCollections(
qp graph.QueryParams,
errs error,
collections map[string]*Collection,
statusCh chan<- *support.ConnectorOperationStatus,
statusUpdater support.StatusUpdater,
) func(any) bool {
var (
service graph.Service
@ -279,7 +279,7 @@ func IterateFilterFolderDirectoriesForCollections(
[]string{qp.Credentials.TenantID, qp.User, mailCategory, directory},
messages,
service,
statusCh,
statusUpdater,
)
collections[directory] = &temp

View File

@ -126,7 +126,7 @@ func CollectMailFolders(
ctx context.Context,
qp graph.QueryParams,
collections map[string]*Collection,
statusCh chan<- *support.ConnectorOperationStatus,
statusUpdater support.StatusUpdater,
) error {
queryService, err := createService(qp.Credentials, qp.FailFast)
if err != nil {
@ -156,7 +156,7 @@ func CollectMailFolders(
qp,
err,
collections,
statusCh,
statusUpdater,
)
iterateFailure := pageIterator.Iterate(callbackFunc)

View File

@ -7,7 +7,7 @@ import (
"context"
"fmt"
"strings"
"sync/atomic"
"sync"
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core"
@ -28,16 +28,19 @@ import (
// bookkeeping and interfacing with other component.
type GraphConnector struct {
graphService
tenant string
Users map[string]string // key<email> value<id>
status *support.ConnectorOperationStatus // contains the status of the last run status
statusCh chan *support.ConnectorOperationStatus
awaitingMessages int32
credentials account.M365Config
tenant string
Users map[string]string // key<email> value<id>
credentials account.M365Config
// wg is used to track completion of GC tasks
wg *sync.WaitGroup
// mutex used to synchronize updates to `status`
mu sync.Mutex
status support.ConnectorOperationStatus // contains the status of the last run status
}
// Service returns the GC's embedded graph.Service
func (gc GraphConnector) Service() graph.Service {
func (gc *GraphConnector) Service() graph.Service {
return gc.graphService
}
@ -70,8 +73,7 @@ func NewGraphConnector(acct account.Account) (*GraphConnector, error) {
gc := GraphConnector{
tenant: m365.TenantID,
Users: make(map[string]string, 0),
status: nil,
statusCh: make(chan *support.ConnectorOperationStatus),
wg: &sync.WaitGroup{},
credentials: m365,
}
@ -204,7 +206,8 @@ func buildFromMap(isKey bool, mapping map[string]string) []string {
// ExchangeDataCollection returns a DataCollection which the caller can
// use to read mailbox data out for the specified user
// Assumption: User exists
// Add iota to this call -> mail, contacts, calendar, etc.
//
// Add iota to this call -> mail, contacts, calendar, etc.
func (gc *GraphConnector) ExchangeDataCollection(
ctx context.Context,
selector selectors.Selector,
@ -303,10 +306,7 @@ func (gc *GraphConnector) RestoreExchangeDataCollection(
gc.incrementAwaitingMessages()
status := support.CreateStatus(ctx, support.Restore, attempts, successes, len(pathCounter), errs)
// set the channel asynchronously so that this func doesn't block.
go func(cos *support.ConnectorOperationStatus) {
gc.statusCh <- cos
}(status)
gc.UpdateStatus(status)
return errs
}
@ -356,7 +356,7 @@ func (gc *GraphConnector) createCollections(
// callbackFunc iterates through all M365 object target and fills exchange.Collection.jobs[]
// with corresponding item M365IDs. New collections are created for each directory.
// Each directory uses the M365 identifier. The use of IDs prevents collisions between users
callbackFunc := gIter(ctx, qp, errs, collections, gc.statusCh)
callbackFunc := gIter(ctx, qp, errs, collections, gc.UpdateStatus)
iterateError := pageIterator.Iterate(callbackFunc)
if iterateError != nil {
@ -377,32 +377,32 @@ func (gc *GraphConnector) createCollections(
return allCollections, errs
}
// AwaitStatus updates status field based on item within statusChannel.
// AwaitStatus waits for all gc tasks to complete and then returns status
func (gc *GraphConnector) AwaitStatus() *support.ConnectorOperationStatus {
if gc.awaitingMessages > 0 {
atomic.AddInt32(&gc.awaitingMessages, -1)
gc.status = <-gc.statusCh
}
gc.wg.Wait()
return &gc.status
}
return gc.status
// UpdateStatus is the completion callback for tasks started by gc. While
// holding gc.mu it merges the task's status into gc.status, and then marks
// one task as done on gc.wg.
func (gc *GraphConnector) UpdateStatus(status *support.ConnectorOperationStatus) {
	gc.mu.Lock()
	defer gc.mu.Unlock()

	// Fold the reported status into the running aggregate.
	merged := support.MergeStatus(gc.status, *status)
	gc.status = merged

	gc.wg.Done()
}
// Status returns the current status of the graphConnector operation.
func (gc *GraphConnector) Status() *support.ConnectorOperationStatus {
func (gc *GraphConnector) Status() support.ConnectorOperationStatus {
return gc.status
}
// PrintableStatus returns a string formatted version of the GC status.
func (gc *GraphConnector) PrintableStatus() string {
if gc.status == nil {
return ""
}
return gc.status.String()
}
func (gc *GraphConnector) incrementAwaitingMessages() {
atomic.AddInt32(&gc.awaitingMessages, 1)
gc.wg.Add(1)
}
// IsRecoverableError returns true iff error is a RecoverableGCEerror

View File

@ -2,6 +2,7 @@ package connector
import (
"context"
"sync"
"testing"
"github.com/pkg/errors"
@ -93,29 +94,39 @@ func (suite *DisconnectedGraphConnectorSuite) TestInterfaceAlignment() {
assert.NotNil(suite.T(), dc)
}
// statusTestTask simulates one GraphConnector task for tests: it builds a
// Restore status from the given object, success, and folder counts together
// with a fixed chain of wrapped errors, and reports it via gc.UpdateStatus.
func statusTestTask(gc *GraphConnector, objects, success, folder int) {
	// Assemble the error chain innermost-first.
	inner := support.WrapAndAppend("arc376", errors.New("one"), errors.New("two"))
	errs := support.WrapAndAppend("tres", errors.New("three"), inner)

	gc.UpdateStatus(
		support.CreateStatus(context.Background(), support.Restore, objects, success, folder, errs),
	)
}
func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_Status() {
gc := GraphConnector{
statusCh: make(chan *support.ConnectorOperationStatus),
}
suite.Equal(len(gc.PrintableStatus()), 0)
gc := GraphConnector{wg: &sync.WaitGroup{}}
// Two tasks
gc.incrementAwaitingMessages()
gc.incrementAwaitingMessages()
go func() {
status := support.CreateStatus(
context.Background(),
support.Restore,
12, 9, 8,
support.WrapAndAppend(
"tres",
errors.New("three"),
support.WrapAndAppend("arc376", errors.New("one"), errors.New("two")),
),
)
gc.statusCh <- status
}()
// Each helper task processes 4 objects, 1 success, 3 errors, 1 folder
go statusTestTask(&gc, 4, 1, 1)
go statusTestTask(&gc, 4, 1, 1)
gc.AwaitStatus()
suite.Greater(len(gc.PrintableStatus()), 0)
suite.Greater(gc.Status().ObjectCount, 0)
suite.NotEmpty(gc.PrintableStatus())
// Expect 8 objects
suite.Equal(8, gc.Status().ObjectCount)
// Expect 2 success
suite.Equal(2, gc.Status().Successful)
// Expect 2 folders
suite.Equal(2, gc.Status().FolderCount)
}
func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_ErrorChecking() {

View File

@ -64,8 +64,6 @@ func (suite *GraphConnectorIntegrationSuite) TestSetTenantUsers() {
newConnector := GraphConnector{
tenant: "test_tenant",
Users: make(map[string]string, 0),
status: nil,
statusCh: make(chan *support.ConnectorOperationStatus),
credentials: suite.connector.credentials,
}
@ -94,8 +92,9 @@ func (suite *GraphConnectorIntegrationSuite) TestExchangeDataCollection() {
collectionList, err := connector.ExchangeDataCollection(context.Background(), sel.Selector)
assert.NotNil(t, collectionList, "collection list")
assert.NoError(t, err)
assert.True(t, connector.awaitingMessages > 0)
assert.Nil(t, connector.status)
assert.Zero(t, connector.status.ObjectCount)
assert.Zero(t, connector.status.FolderCount)
assert.Zero(t, connector.status.Successful)
streams := make(map[string]<-chan data.Stream)
// Verify Items() call returns an iterable channel(e.g. a channel that has been closed)
@ -105,10 +104,8 @@ func (suite *GraphConnectorIntegrationSuite) TestExchangeDataCollection() {
streams[testName] = temp
}
for i := 0; i < int(connector.awaitingMessages); i++ {
status := connector.AwaitStatus()
assert.NotNil(t, status)
}
status := connector.AwaitStatus()
assert.NotZero(t, status.Successful)
for name, channel := range streams {
suite.T().Run(name, func(t *testing.T) {
@ -294,8 +291,8 @@ func (suite *GraphConnectorIntegrationSuite) TestAccessOfInboxAllUsers() {
// Exchange Functions
//-------------------------------------------------------
// TestCreateAndDeleteMailFolder ensures GraphConnector has the ability
// to create and remove folders within the tenant
// TestCreateAndDeleteMailFolder ensures GraphConnector has the ability
// to create and remove folders within the tenant
func (suite *GraphConnectorIntegrationSuite) TestCreateAndDeleteMailFolder() {
now := time.Now()
folderName := "TestFolder: " + common.FormatSimpleDateTime(now)

View File

@ -34,10 +34,10 @@ type Collection struct {
// M365 IDs of file items within this collection
driveItemIDs []string
// M365 ID of the drive this collection was created from
driveID string
service graph.Service
statusCh chan<- *support.ConnectorOperationStatus
itemReader itemReaderFunc
driveID string
service graph.Service
statusUpdater support.StatusUpdater
itemReader itemReaderFunc
}
// itemReaderFunc returns a reader for the specified item
@ -49,15 +49,15 @@ type itemReaderFunc func(
// NewCollection creates a Collection
func NewCollection(folderPath, driveID string, service graph.Service,
statusCh chan<- *support.ConnectorOperationStatus,
statusUpdater support.StatusUpdater,
) *Collection {
c := &Collection{
folderPath: folderPath,
driveItemIDs: []string{},
driveID: driveID,
service: service,
data: make(chan data.Stream, collectionChannelBufferSize),
statusCh: statusCh,
folderPath: folderPath,
driveItemIDs: []string{},
driveID: driveID,
service: service,
data: make(chan data.Stream, collectionChannelBufferSize),
statusUpdater: statusUpdater,
}
// Allows tests to set a mock populator
c.itemReader = driveItemReader
@ -133,5 +133,5 @@ func (oc *Collection) populateItems(ctx context.Context) {
1, // num folders (always 1)
errs)
logger.Ctx(ctx).Debug(status.String())
oc.statusCh <- status
oc.statusUpdater(status)
}

View File

@ -6,6 +6,7 @@ import (
"errors"
"io"
"path/filepath"
"sync"
"testing"
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
@ -14,6 +15,7 @@ import (
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/internal/connector/graph"
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/data"
)
@ -40,9 +42,23 @@ func TestOneDriveCollectionSuite(t *testing.T) {
suite.Run(t, new(OneDriveCollectionSuite))
}
// testStatusUpdater builds a support.StatusUpdater for tests. The returned
// callback logs the status it receives, copies it into statusToUpdate, and
// then calls wg.Done().
func (suite *OneDriveCollectionSuite) testStatusUpdater(
	wg *sync.WaitGroup,
	statusToUpdate *support.ConnectorOperationStatus,
) support.StatusUpdater {
	record := func(s *support.ConnectorOperationStatus) {
		suite.T().Logf("Update status %v, count %d, success %d", s, s.ObjectCount, s.Successful)

		// Store the status before signalling, so a waiter sees the copied value.
		*statusToUpdate = *s
		wg.Done()
	}

	return record
}
func (suite *OneDriveCollectionSuite) TestOneDriveCollection() {
wg := sync.WaitGroup{}
collStatus := support.ConnectorOperationStatus{}
folderPath := "dir1/dir2/dir3"
coll := NewCollection(folderPath, "fakeDriveID", suite, nil)
coll := NewCollection(folderPath, "fakeDriveID", suite, suite.testStatusUpdater(&wg, &collStatus))
require.NotNil(suite.T(), coll)
assert.Equal(suite.T(), filepath.SplitList(folderPath), coll.FullPath())
@ -58,13 +74,16 @@ func (suite *OneDriveCollectionSuite) TestOneDriveCollection() {
}
// Read items from the collection
wg.Add(1)
readItems := []data.Stream{}
for item := range coll.Items() {
readItems = append(readItems, item)
}
wg.Wait()
// Expect only 1 item
require.Len(suite.T(), readItems, 1)
require.Equal(suite.T(), 1, collStatus.ObjectCount)
require.Equal(suite.T(), 1, collStatus.Successful)
// Validate item info and data
readItem := readItems[0]
@ -82,7 +101,11 @@ func (suite *OneDriveCollectionSuite) TestOneDriveCollection() {
}
func (suite *OneDriveCollectionSuite) TestOneDriveCollectionReadError() {
coll := NewCollection("folderPath", "fakeDriveID", suite, nil)
wg := sync.WaitGroup{}
collStatus := support.ConnectorOperationStatus{}
wg.Add(1)
coll := NewCollection("folderPath", "fakeDriveID", suite, suite.testStatusUpdater(&wg, &collStatus))
coll.Add("testItemID")
readError := errors.New("Test error")
@ -91,6 +114,9 @@ func (suite *OneDriveCollectionSuite) TestOneDriveCollectionReadError() {
return "", nil, readError
}
coll.Items()
wg.Wait()
// Expect no items
require.Len(suite.T(), coll.Items(), 0)
require.Equal(suite.T(), 1, collStatus.ObjectCount)
require.Equal(suite.T(), 0, collStatus.Successful)
}

View File

@ -20,7 +20,7 @@ type Collections struct {
// for a OneDrive folder
collectionMap map[string]data.Collection
service graph.Service
statusCh chan<- *support.ConnectorOperationStatus
statusUpdater support.StatusUpdater
// Track stats from drive enumeration
numItems int
@ -32,13 +32,13 @@ type Collections struct {
func NewCollections(
user string,
service graph.Service,
statusCh chan<- *support.ConnectorOperationStatus,
statusUpdater support.StatusUpdater,
) *Collections {
return &Collections{
user: user,
collectionMap: map[string]data.Collection{},
service: service,
statusCh: statusCh,
statusUpdater: statusUpdater,
}
}
@ -80,7 +80,7 @@ func (c *Collections) updateCollections(ctx context.Context, driveID string, ite
// Create a collection for the parent of this item
collectionPath := *item.GetParentReference().GetPath()
if _, found := c.collectionMap[collectionPath]; !found {
c.collectionMap[collectionPath] = NewCollection(collectionPath, driveID, c.service, c.statusCh)
c.collectionMap[collectionPath] = NewCollection(collectionPath, driveID, c.service, c.statusUpdater)
}
switch {
case item.GetFolder() != nil, item.GetPackage() != nil:
@ -89,7 +89,7 @@ func (c *Collections) updateCollections(ctx context.Context, driveID string, ite
// e.g. a ".folderMetadataFile"
itemPath := path.Join(*item.GetParentReference().GetPath(), *item.GetName())
if _, found := c.collectionMap[itemPath]; !found {
c.collectionMap[itemPath] = NewCollection(itemPath, driveID, c.service, c.statusCh)
c.collectionMap[itemPath] = NewCollection(itemPath, driveID, c.service, c.statusUpdater)
}
case item.GetFile() != nil:
collection := c.collectionMap[collectionPath].(*Collection)

View File

@ -62,6 +62,40 @@ func CreateStatus(
return &status
}
// StatusUpdater is the function signature for a status updater.
// It defines a function that an async connector task can call
// on completion with its ConnectorOperationStatus.
type StatusUpdater func(*ConnectorOperationStatus)
// MergeStatus combines two ConnectorOperationStatus values into a single status.
//
// If either argument has lastOperation == OpUnknown, the other argument is
// returned unchanged. Otherwise the result keeps one's lastOperation, sums the
// object, folder, success, and error counts, and is marked incomplete when
// either input is incomplete. The incomplete reasons are joined with a single
// space, skipping empty reasons so that merging two statuses without a reason
// does not produce a whitespace-only reason.
func MergeStatus(one, two ConnectorOperationStatus) ConnectorOperationStatus {
	if one.lastOperation == OpUnknown {
		return two
	}

	if two.lastOperation == OpUnknown {
		return one
	}

	// Only insert the separator when both sides actually carry a reason.
	reason := one.incompleteReason

	switch {
	case reason == "":
		reason = two.incompleteReason
	case two.incompleteReason != "":
		reason += " " + two.incompleteReason
	}

	return ConnectorOperationStatus{
		lastOperation:    one.lastOperation,
		ObjectCount:      one.ObjectCount + two.ObjectCount,
		FolderCount:      one.FolderCount + two.FolderCount,
		Successful:       one.Successful + two.Successful,
		errorCount:       one.errorCount + two.errorCount,
		incomplete:       one.incomplete || two.incomplete,
		incompleteReason: reason,
	}
}
func (cos *ConnectorOperationStatus) String() string {
message := fmt.Sprintf("Action: %s performed on %d of %d objects within %d directories.", cos.lastOperation.String(),
cos.Successful, cos.ObjectCount, cos.FolderCount)

View File

@ -78,3 +78,61 @@ func (suite *GCStatusTestSuite) TestCreateStatus_InvalidStatus() {
)
})
}
// TestMergeStatus is a table-driven test for MergeStatus. It checks the merged
// folder, object, and success counts, the resulting operation type, and the
// incomplete flag for: a status merged with an unknown (zero-value) status in
// either order, two successful statuses, and a successful status merged with
// one that carries errors.
func (suite *GCStatusTestSuite) TestMergeStatus() {
	simpleContext := context.Background()
	table := []struct {
		name         string
		one          ConnectorOperationStatus
		two          ConnectorOperationStatus
		expected     statusParams
		isIncomplete assert.BoolAssertionFunc
	}{
		{
			name:         "Test:  Status + unknown",
			one:          *CreateStatus(simpleContext, Backup, 1, 1, 1, nil),
			two:          ConnectorOperationStatus{},
			expected:     statusParams{Backup, 1, 1, 1, nil},
			isIncomplete: assert.False,
		},
		{
			name:         "Test: unknown + Status",
			one:          ConnectorOperationStatus{},
			two:          *CreateStatus(simpleContext, Backup, 1, 1, 1, nil),
			expected:     statusParams{Backup, 1, 1, 1, nil},
			isIncomplete: assert.False,
		},
		{
			name:         "Test: Successful + Successful",
			one:          *CreateStatus(simpleContext, Backup, 1, 1, 1, nil),
			two:          *CreateStatus(simpleContext, Backup, 3, 3, 3, nil),
			expected:     statusParams{Backup, 4, 4, 4, nil},
			isIncomplete: assert.False,
		},
		{
			name: "Test: Successful + Unsuccessful",
			one:  *CreateStatus(simpleContext, Backup, 17, 17, 13, nil),
			two: *CreateStatus(
				simpleContext,
				Backup,
				12,
				9,
				8,
				WrapAndAppend("tres", errors.New("three"), WrapAndAppend("arc376", errors.New("one"), errors.New("two"))),
			),
			expected:     statusParams{Backup, 29, 26, 21, nil},
			isIncomplete: assert.True,
		},
	}

	for _, test := range table {
		suite.T().Run(test.name, func(t *testing.T) {
			returned := MergeStatus(test.one, test.two)

			// Assert against the subtest's t (not the suite) so a failure is
			// reported for the specific table entry, with expected value first.
			assert.Equal(t, test.expected.folders, returned.FolderCount)
			assert.Equal(t, test.expected.objects, returned.ObjectCount)
			assert.Equal(t, test.expected.operationType, returned.lastOperation)
			assert.Equal(t, test.expected.success, returned.Successful)
			test.isIncomplete(t, returned.incomplete)
		})
	}
}