diff --git a/CHANGELOG.md b/CHANGELOG.md index f8d4ffa72..5e4217d78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Return a ServiceNotEnabled error when a tenant has no active SharePoint license. +- Added retries for http/2 stream connection failures when downloading large item content. ### Known issues - If a link share is created for an item with inheritance disabled diff --git a/src/cmd/getM365/onedrive/get_item.go b/src/cmd/getM365/onedrive/get_item.go index 4e29098fd..05b5395ce 100644 --- a/src/cmd/getM365/onedrive/get_item.go +++ b/src/cmd/getM365/onedrive/get_item.go @@ -194,13 +194,13 @@ func getDriveItemContent( resp, err := gr.Request(ctx, http.MethodGet, *url, nil, nil) if err != nil { - return nil, clues.New("downloading item").With("error", err) + return nil, clues.New("requesting item content").With("error", err) } defer resp.Body.Close() content, err := io.ReadAll(resp.Body) if err != nil { - return nil, clues.New("read downloaded item").With("error", err) + return nil, clues.New("reading item content").With("error", err) } return content, nil diff --git a/src/internal/m365/graph/http_wrapper.go b/src/internal/m365/graph/http_wrapper.go index 7003f3079..8a55d3ca6 100644 --- a/src/internal/m365/graph/http_wrapper.go +++ b/src/internal/m365/graph/http_wrapper.go @@ -4,11 +4,15 @@ import ( "context" "io" "net/http" + "regexp" + "time" "github.com/alcionai/clues" khttp "github.com/microsoft/kiota-http-go" + "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/version" + "github.com/alcionai/corso/src/pkg/logger" ) // --------------------------------------------------------------------------- @@ -70,6 +74,8 @@ func NewNoTimeoutHTTPWrapper(opts ...Option) *httpWrapper { // requests // --------------------------------------------------------------------------- +var streamErrRE = regexp.MustCompile(`stream error: stream ID \d+; .+; received from peer`) + // Request does the provided request. func (hw httpWrapper) Request( ctx context.Context, @@ -91,7 +97,35 @@ func (hw httpWrapper) Request( // See https://learn.microsoft.com/en-us/sharepoint/dev/general-development/how-to-avoid-getting-throttled-or-blocked-in-sharepoint-online#how-to-decorate-your-http-traffic req.Header.Set("User-Agent", "ISV|Alcion|Corso/"+version.Version) - resp, err := hw.client.Do(req) + var resp *http.Response + + i := 0 + + // stream errors from http/2 will fail before we reach + // client middleware handling, therefore we don't get to + // make use of the retry middleware. This external + // retry wrapper is unsophisticated, but should only + // retry in the event of a `stream error`, which is not + // a common expectation. + for i < 3 { + ictx := clues.Add(ctx, "request_retry_iter", i) + + resp, err = hw.client.Do(req) + if err != nil && !streamErrRE.MatchString(err.Error()) { + return nil, Stack(ictx, err) + } + + if err == nil { + break + } + + logger.Ctx(ictx).Debug("retrying after stream error") + events.Inc(events.APICall, "streamerror") + + time.Sleep(3 * time.Second) + i++ + } + if err != nil { return nil, Stack(ctx, err) } diff --git a/src/internal/m365/onedrive/collection.go b/src/internal/m365/onedrive/collection.go index f5d98da93..e393554ab 100644 --- a/src/internal/m365/onedrive/collection.go +++ b/src/internal/m365/onedrive/collection.go @@ -271,21 +271,20 @@ func (oc *Collection) getDriveItemContent( var ( itemID = ptr.Val(item.GetId()) itemName = ptr.Val(item.GetName()) - el = errs.Local() ) itemData, err := downloadContent(ctx, oc.handler, oc.urlCache, item, oc.driveID) if err != nil { if clues.HasLabel(err, graph.LabelsMalware) || (item != nil && item.GetMalware() != nil) { logger.CtxErr(ctx, err).With("skipped_reason", fault.SkipMalware).Info("item flagged as malware") - el.AddSkip(ctx, fault.FileSkip(fault.SkipMalware, driveID, itemID, itemName, graph.ItemInfo(item))) + errs.AddSkip(ctx, fault.FileSkip(fault.SkipMalware, driveID, itemID, itemName, graph.ItemInfo(item))) return nil, clues.Wrap(err, "malware item").Label(graph.LabelsSkippable) } if clues.HasLabel(err, graph.LabelStatus(http.StatusNotFound)) || graph.IsErrDeletedInFlight(err) { logger.CtxErr(ctx, err).With("skipped_reason", fault.SkipNotFound).Info("item not found") - el.AddSkip(ctx, fault.FileSkip(fault.SkipNotFound, driveID, itemID, itemName, graph.ItemInfo(item))) + errs.AddSkip(ctx, fault.FileSkip(fault.SkipNotFound, driveID, itemID, itemName, graph.ItemInfo(item))) return nil, clues.Wrap(err, "deleted item").Label(graph.LabelsSkippable) } @@ -300,13 +299,13 @@ func (oc *Collection) getDriveItemContent( // restore, or we have to handle it separately by somehow // deleting the entire collection. logger.CtxErr(ctx, err).With("skipped_reason", fault.SkipBigOneNote).Info("max OneNote file size exceeded") - el.AddSkip(ctx, fault.FileSkip(fault.SkipBigOneNote, driveID, itemID, itemName, graph.ItemInfo(item))) + errs.AddSkip(ctx, fault.FileSkip(fault.SkipBigOneNote, driveID, itemID, itemName, graph.ItemInfo(item))) return nil, clues.Wrap(err, "max oneNote item").Label(graph.LabelsSkippable) } - logger.CtxErr(ctx, err).Error("downloading item") - el.AddRecoverable(ctx, clues.Stack(err).WithClues(ctx).Label(fault.LabelForceNoBackupCreation)) + logger.CtxErr(ctx, err).Error("downloading item content") + errs.AddRecoverable(ctx, clues.Stack(err).WithClues(ctx).Label(fault.LabelForceNoBackupCreation)) // return err, not el.Err(), because the lazy reader needs to communicate to // the data consumer that this item is unreadable, regardless of the fault state. @@ -416,7 +415,6 @@ type driveStats struct { // and uses the collection `itemReader` to read the item func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) { var ( - el = errs.Local() stats driveStats wg sync.WaitGroup ) @@ -442,7 +440,7 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) { defer close(semaphoreCh) for _, item := range oc.driveItems { - if el.Failure() != nil { + if errs.Failure() != nil { break } @@ -481,7 +479,6 @@ func (oc *Collection) populateDriveItem( errs *fault.Bus, ) { var ( - el = errs.Local() itemID = ptr.Val(item.GetId()) itemName = ptr.Val(item.GetName()) itemSize = ptr.Val(item.GetSize()) @@ -518,7 +515,7 @@ func (oc *Collection) populateDriveItem( // Fetch metadata for the file itemMeta, itemMetaSize, err = downloadItemMeta(ctx, oc.handler, oc.driveID, item) if err != nil { - el.AddRecoverable(ctx, clues.Wrap(err, "getting item metadata").Label(fault.LabelForceNoBackupCreation)) + errs.AddRecoverable(ctx, clues.Wrap(err, "getting item metadata").Label(fault.LabelForceNoBackupCreation)) return } @@ -546,7 +543,7 @@ func (oc *Collection) populateDriveItem( itemExtensionFactory) if err != nil { err := clues.Wrap(err, "adding extensions").Label(fault.LabelForceNoBackupCreation) - el.AddRecoverable(ctx, err) + errs.AddRecoverable(ctx, err) return nil, err } diff --git a/src/pkg/fault/fault.go b/src/pkg/fault/fault.go index d69569edc..97cc0bae3 100644 --- a/src/pkg/fault/fault.go +++ b/src/pkg/fault/fault.go @@ -137,9 +137,9 @@ func (e *Bus) logAndAddRecoverable(ctx context.Context, err error, skip int) { isFail := e.addRecoverableErr(err) if isFail { - log.Error("recoverable error") + log.Errorf("recoverable error: %v", err) } else { - log.Info("recoverable error") + log.Infof("recoverable error: %v", err) } } diff --git a/src/pkg/selectors/selectors.go b/src/pkg/selectors/selectors.go index 86a1cb56c..1e148e04b 100644 --- a/src/pkg/selectors/selectors.go +++ b/src/pkg/selectors/selectors.go @@ -34,7 +34,7 @@ var serviceToPathType = map[service]path.ServiceType{ var ( ErrorBadSelectorCast = clues.New("wrong selector service type") - ErrorNoMatchingItems = clues.New("no items match the specified selectors") + ErrorNoMatchingItems = clues.New("no items match the provided selectors") ErrorUnrecognizedService = clues.New("unrecognized service") )