Hacky prototype for conv KAI

Abhishek Pandey 2023-12-19 17:43:50 -08:00
parent 276ace62f0
commit c5cf93deb3
2 changed files with 143 additions and 56 deletions

View File

@@ -4,7 +4,6 @@ import (
 	"context"
 
 	"github.com/alcionai/clues"
-	"golang.org/x/exp/maps"
 
 	"github.com/alcionai/corso/src/internal/common/pii"
 	"github.com/alcionai/corso/src/internal/common/ptr"
@@ -174,7 +173,7 @@ func populateCollections[C graph.GetIDer, I groupsItemer](
 			continue
 		}
 
-		added := str.SliceToMap(maps.Keys(addAndRem.Added))
+		added := addAndRem.Added
 		removed := str.SliceToMap(addAndRem.Removed)
 
 		cl.Add(count.ItemsAdded, int64(len(added)))
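
A minimal sketch of the data shape this change assumes: with the prototype, addAndRem.Added is evidently already a map keyed by item ID whose values are modification times (matching the map[string]time.Time that NewCollection accepts in the file below), so the old maps.Keys/str.SliceToMap flattening into a bare ID set is dropped. The names in the sketch are illustrative stand-ins, not the repository's real types.

    package sketch

    import "time"

    // addedAndRemoved stands in for the real delta-result type, which is not
    // shown in this diff; the Added value type is assumed from NewCollection.
    type addedAndRemoved struct {
        Added   map[string]time.Time // item ID -> last modified time (assumed)
        Removed []string             // item IDs
    }

    // addedItems mirrors the changed line above: the map is passed through
    // as-is so per-item mod times survive down into the collection.
    func addedItems(addAndRem addedAndRemoved) map[string]time.Time {
        return addAndRem.Added
    }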

View File

@@ -3,9 +3,11 @@ package groups
 import (
 	"bytes"
 	"context"
+	"fmt"
 	"io"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/alcionai/clues"
 	kjson "github.com/microsoft/kiota-serialization-json-go"
@@ -17,6 +19,7 @@ import (
 	"github.com/alcionai/corso/src/pkg/count"
 	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/logger"
+	"github.com/alcionai/corso/src/pkg/path"
 	"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
 )
@@ -35,7 +38,7 @@ type Collection[C graph.GetIDer, I groupsItemer] struct {
 	contains container[C]
 
 	// added is a list of existing item IDs that were added to a container
-	added map[string]struct{}
+	added map[string]time.Time
 
 	// removed is a list of item IDs that were deleted from, or moved out, of a container
 	removed map[string]struct{}
@@ -54,7 +57,7 @@ func NewCollection[C graph.GetIDer, I groupsItemer](
 	baseCol data.BaseCollection,
 	getAndAugment getItemAndAugmentInfoer[C, I],
 	protectedResource string,
-	added map[string]struct{},
+	added map[string]time.Time,
 	removed map[string]struct{},
 	contains container[C],
 	statusUpdater support.StatusUpdater,
@@ -135,81 +138,97 @@ func (col *Collection[C, I]) streamItems(ctx context.Context, errs *fault.Bus) {
 	}
 
 	// add any new items
-	for id := range col.added {
+	for id, modTime := range col.added {
 		if el.Failure() != nil {
 			break
 		}
 
-		wg.Add(1)
-		semaphoreCh <- struct{}{}
+		col.stream <- data.NewLazyItemWithInfo(
+			ctx,
+			&lazyItemGetter[C, I]{
+				modTime:       modTime,
+				getAndAugment: col.getAndAugment,
+				userID:        col.protectedResource,
+				itemID:        id,
+				containerIDs:  col.FullPath().Folders(),
+				contains:      col.contains,
+				parentPath:    col.LocationPath().String(),
+			},
+			id,
+			modTime,
+			col.Counter,
+			el)
 
-		go func(id string) {
-			defer wg.Done()
-			defer func() { <-semaphoreCh }()
+		// wg.Add(1)
+		// semaphoreCh <- struct{}{}
 
-			writer := kjson.NewJsonSerializationWriter()
-			defer writer.Close()
+		// go func(id string) {
+		// 	defer wg.Done()
+		// 	defer func() { <-semaphoreCh }()
 
-			item, info, err := col.getAndAugment.getItem(
-				ctx,
-				col.protectedResource,
-				col.FullPath().Folders(),
-				id)
-			if err != nil {
-				err = clues.Wrap(err, "getting channel message data").Label(fault.LabelForceNoBackupCreation)
-				el.AddRecoverable(ctx, err)
+		// 	writer := kjson.NewJsonSerializationWriter()
+		// 	defer writer.Close()
 
-				return
-			}
+		// 	item, info, err := col.getAndAugment.getItem(
+		// 		ctx,
+		// 		col.protectedResource,
+		// 		col.FullPath().Folders(),
+		// 		id)
+		// 	if err != nil {
+		// 		err = clues.Wrap(err, "getting channel message data").Label(fault.LabelForceNoBackupCreation)
+		// 		el.AddRecoverable(ctx, err)
 
-			col.getAndAugment.augmentItemInfo(info, col.contains.container)
+		// 		return
+		// 	}
 
-			if err := writer.WriteObjectValue("", item); err != nil {
-				err = clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation)
-				el.AddRecoverable(ctx, err)
+		// 	col.getAndAugment.augmentItemInfo(info, col.contains.container)
 
-				return
-			}
+		// 	if err := writer.WriteObjectValue("", item); err != nil {
+		// 		err = clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation)
+		// 		el.AddRecoverable(ctx, err)
 
-			itemData, err := writer.GetSerializedContent()
-			if err != nil {
-				err = clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation)
-				el.AddRecoverable(ctx, err)
+		// 		return
+		// 	}
 
-				return
-			}
+		// 	itemData, err := writer.GetSerializedContent()
+		// 	if err != nil {
+		// 		err = clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation)
+		// 		el.AddRecoverable(ctx, err)
 
-			info.ParentPath = col.LocationPath().String()
+		// 		return
+		// 	}
 
-			storeItem, err := data.NewPrefetchedItemWithInfo(
-				io.NopCloser(bytes.NewReader(itemData)),
-				id,
-				details.ItemInfo{Groups: info})
-			if err != nil {
-				err := clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation)
-				el.AddRecoverable(ctx, err)
+		// 	info.ParentPath = col.LocationPath().String()
 
-				return
-			}
+		// 	storeItem, err := data.NewPrefetchedItemWithInfo(
+		// 		io.NopCloser(bytes.NewReader(itemData)),
+		// 		id,
+		// 		details.ItemInfo{Groups: info})
+		// 	if err != nil {
+		// 		err := clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation)
+		// 		el.AddRecoverable(ctx, err)
 
-			col.stream <- storeItem
+		// 		return
+		// 	}
 
-			atomic.AddInt64(&streamedItems, 1)
-			atomic.AddInt64(&totalBytes, info.Size)
+		// 	col.stream <- storeItem
 
-			if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 {
-				logger.Ctx(ctx).Infow("item stream progress", "stats", col.Counter.Values())
-			}
+		// 	atomic.AddInt64(&streamedItems, 1)
+		// 	atomic.AddInt64(&totalBytes, info.Size)
 
-			col.Counter.Add(count.StreamBytesAdded, info.Size)
+		// 	if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 {
+		// 		logger.Ctx(ctx).Infow("item stream progress", "stats", col.Counter.Values())
+		// 	}
 
-			if colProgress != nil {
-				colProgress <- struct{}{}
-			}
-		}(id)
+		// 	col.Counter.Add(count.StreamBytesAdded, info.Size)
+
+		if colProgress != nil {
+			colProgress <- struct{}{}
+		}
+		// }(id)
 	}
 
-	wg.Wait()
+	// wg.Wait()
 }
 
 // finishPopulation is a utility function used to close a Collection's data channel
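
For orientation, a minimal sketch of the lazy-item pattern this hunk switches to, under assumed shapes rather than Corso's actual data package API: the old path fetched and serialized every added item up front inside a semaphore-bounded goroutine pool and pushed prefetched items onto the stream, while the new path pushes a lazy item that carries only the item ID and its modification time and defers the Graph fetch and JSON serialization to the getter's GetData call (defined in the next hunk).

    package sketch

    import (
        "io"
        "time"
    )

    // dataGetter mirrors the role lazyItemGetter plays in the next hunk: the
    // expensive work runs only when GetData is called by the stream consumer.
    // The interface and types here are illustrative, not Corso's data package.
    type dataGetter interface {
        GetData() (io.ReadCloser, error)
    }

    // lazyItem is queued immediately with cheap metadata; its payload is
    // produced on demand when the backup actually reads the item.
    type lazyItem struct {
        ID       string
        Modified time.Time
        getter   dataGetter
    }

    // ToReader triggers the deferred fetch-and-serialize step.
    func (li lazyItem) ToReader() (io.ReadCloser, error) {
        return li.getter.GetData()
    }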
@@ -237,3 +256,72 @@ func (col *Collection[C, I]) finishPopulation(
 	col.statusUpdater(status)
 }
+
+type lazyItemGetter[C graph.GetIDer, I groupsItemer] struct {
+	getAndAugment getItemAndAugmentInfoer[C, I]
+	userID        string
+	itemID        string
+	containerIDs  path.Elements
+	parentPath    string
+	modTime       time.Time
+	contains      container[C]
+}
+
+func (lig *lazyItemGetter[C, I]) GetData(
+	ctx context.Context,
+	errs *fault.Bus,
+) (io.ReadCloser, *details.ItemInfo, bool, error) {
+	writer := kjson.NewJsonSerializationWriter()
+	defer writer.Close()
+
+	fmt.Println("getdata itemID: ", lig.itemID)
+
+	item, info, err := lig.getAndAugment.getItem(
+		ctx,
+		lig.userID,
+		lig.containerIDs,
+		lig.itemID)
+	if err != nil {
+		// If an item was deleted then return an empty file so we don't fail
+		// the backup and return a sentinel error when asked for ItemInfo so
+		// we don't display the item in the backup.
+		//
+		// The item will be deleted from kopia on the next backup when the
+		// delta token shows it's removed.
+		if graph.IsErrDeletedInFlight(err) {
+			logger.CtxErr(ctx, err).Info("item not found")
+			return nil, nil, true, nil
+		}
+
+		err = clues.Wrap(err, "getting channel message data").Label(fault.LabelForceNoBackupCreation)
+		errs.AddRecoverable(ctx, err)
+
+		return nil, nil, false, err
+	}
+
+	lig.getAndAugment.augmentItemInfo(info, lig.contains.container)
+
+	if err := writer.WriteObjectValue("", item); err != nil {
+		err = clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation)
+		errs.AddRecoverable(ctx, err)
+
+		return nil, nil, false, err
+	}
+
+	itemData, err := writer.GetSerializedContent()
+	if err != nil {
+		err = clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation)
+		errs.AddRecoverable(ctx, err)
+
+		return nil, nil, false, err
+	}
+
+	info.ParentPath = lig.parentPath
+	// Update the mod time to what we already told kopia about. This is required
+	// for proper details merging.
+	info.Modified = lig.modTime
+
+	return io.NopCloser(bytes.NewReader(itemData)),
+		&details.ItemInfo{Groups: info},
+		false,
+		nil
+}
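
A hedged usage sketch of the GetData contract written above: the four return values are the payload reader, the item info, a deleted-in-flight flag, and an error. The consumer below is illustrative (the real caller is Corso's lazy-item plumbing, which is not part of this diff), and the signature is simplified so the sketch stands alone, with the *details.ItemInfo result and the *fault.Bus parameter omitted.

    package sketch

    import (
        "context"
        "io"
    )

    // getDataFunc is a simplified stand-in for lazyItemGetter.GetData above.
    type getDataFunc func(ctx context.Context) (io.ReadCloser, bool, error)

    // consume shows how a caller would treat the three outcomes: items deleted
    // between enumeration and fetch are skipped without failing the backup,
    // other errors propagate, and otherwise the serialized payload is read.
    func consume(ctx context.Context, get getDataFunc) ([]byte, bool, error) {
        rc, deletedInFlight, err := get(ctx)
        if err != nil {
            return nil, false, err
        }

        if deletedInFlight {
            // Mirrors the "return nil, nil, true, nil" branch above: treat the
            // item as gone and let the next delta pass clean it up.
            return nil, true, nil
        }

        defer rc.Close()

        bs, err := io.ReadAll(rc)

        return bs, false, err
    }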