172 changes: 143 additions & 29 deletions crdt.go
@@ -243,6 +243,10 @@ type Datastore struct {
// keep track of children to be fetched so only one job does every
// child
queuedChildren *cidSafeSet
// keep track of nodes currently being processed across all active
// walks so isProcessedOrInFlight returns true for in-flight nodes,
// preventing duplicate concurrent walks of the same branch.
inFlightCids *cidSafeSet
}

type dagJob struct {
@@ -252,6 +256,7 @@ type dagJob struct {
root Head // the root of the branch we are walking down
delta Delta // the current delta
node ipld.Node // the current ipld Node
walk *walkState // shared state for the branch-walk this job belongs to
}

type broadcastBatchHead struct {
@@ -344,6 +349,7 @@ func New(
jobQueue: make(chan *dagJob, opts.NumWorkers),
sendJobs: make(chan *dagJob),
queuedChildren: newCidSafeSet(),
inFlightCids: newCidSafeSet(),
}

err = dstore.applyMigrations(ctx)
@@ -461,11 +467,11 @@ func (store *Datastore) handleNext(ctx context.Context) {

// markHeadsAsSeen asap so that we don't rebroadcast them.
for _, heads := range receivedHeads {
store.seenHeadsMux.Lock()
for _, head := range heads {
store.seenHeadsMux.Lock()
store.seenHeads[head.Cid] = struct{}{}
store.seenHeadsMux.Unlock()
}
store.seenHeadsMux.Unlock()
}

var wg sync.WaitGroup
@@ -720,7 +726,7 @@ func (store *Datastore) rebroadcastHeads(ctx context.Context) {
var headsToBroadcast []Head
store.seenHeadsMux.RLock()
{
headsToBroadcast = make([]Head, 0, len(store.seenHeads))
headsToBroadcast = make([]Head, 0, len(heads))
for _, h := range heads {
if _, ok := store.seenHeads[h.Cid]; !ok {
headsToBroadcast = append(headsToBroadcast, h)
@@ -773,7 +779,7 @@ func (store *Datastore) handleBlock(ctx context.Context, h Head) error {
// This includes the case when the block is a current
// head.
c := h.Cid
isProcessed, err := store.isProcessed(ctx, c)
isProcessed, err := store.isProcessedOrInFlight(ctx, c)
if err != nil {
return fmt.Errorf("error checking for known block %s: %w", c, err)
}
@@ -797,10 +803,28 @@ func (store *Datastore) handleBranch(ctx context.Context, head Head, c cid.Cid)
dg = &crdtNodeGetter{NodeGetter: sessionMaker.Session(cctx)}
}

walk := newWalkState(cancel)
defer func() {
// Remove all in-flight CIDs for this walk regardless of outcome.
// On success they are already on disk; on failure they are not, so
// removing them lets the next broadcast retry cleanly.
for _, c := range walk.pending.Keys() {
store.inFlightCids.Remove(c)
}
}()

var session sync.WaitGroup
err := store.sendNewJobs(ctx, &session, dg, head, []cid.Cid{c})
err := store.sendNewJobs(cctx, &session, dg, head, []cid.Cid{c}, walk)
session.Wait()
return err

if err != nil {
return err
}
if walkErr := walk.err(); walkErr != nil {
return walkErr
}

return store.commitProcessed(ctx, walk.pending)
}

// dagWorker should run in its own goroutine. Workers are launched during
@@ -822,18 +846,19 @@ func (store *Datastore) dagWorker() {
job.root,
job.delta,
job.node,
job.walk,
)
if err != nil {
store.logger.Error(err)
store.MarkDirty(ctx)
job.walk.recordError(err)
job.session.Done()
continue
}
go func(j *dagJob) {
err := store.sendNewJobs(ctx, j.session, j.nodeGetter, j.root, children)
err := store.sendNewJobs(ctx, j.session, j.nodeGetter, j.root, children, j.walk)
if err != nil {
store.logger.Error(err)
store.MarkDirty(ctx)
j.walk.recordError(err)
}
j.session.Done()
}(job)
@@ -843,7 +868,7 @@
// sendNewJobs calls getDeltas (GetMany) on the crdtNodeGetter with the given
// children and sends each response to the workers. It will block until all
// jobs have been queued.
func (store *Datastore) sendNewJobs(ctx context.Context, session *sync.WaitGroup, ng *crdtNodeGetter, root Head, children []cid.Cid) error {
func (store *Datastore) sendNewJobs(ctx context.Context, session *sync.WaitGroup, ng *crdtNodeGetter, root Head, children []cid.Cid, walk *walkState) error {
if len(children) == 0 {
return nil
}
@@ -888,6 +913,7 @@ loop:
root: root,
delta: delta,
node: deltaOpt.node,
walk: walk,
}
select {
case store.sendJobs <- job:
@@ -922,10 +948,6 @@ func (store *Datastore) sendJobWorker(ctx context.Context) {
for {
select {
case <-ctx.Done():
if len(store.sendJobs) > 0 {
// we left something in the queue
store.MarkDirty(ctx)
}
close(store.jobQueue)
return
case j := <-store.sendJobs:
@@ -942,8 +964,44 @@ func (store *Datastore) isProcessed(ctx context.Context, c cid.Cid) (bool, error
return store.store.Has(ctx, store.processedBlockKey(c))
}

func (store *Datastore) markProcessed(ctx context.Context, c cid.Cid) error {
return store.store.Put(ctx, store.processedBlockKey(c), nil)
func (store *Datastore) isProcessedOrInFlight(ctx context.Context, c cid.Cid) (bool, error) {
if store.inFlightCids.Has(c) {
return true, nil
}
return store.isProcessed(ctx, c)
}

// commitProcessed writes all CIDs in the set to the processed-blocks namespace.
// Uses a batch write when the underlying store supports it.
func (store *Datastore) commitProcessed(ctx context.Context, pending *cidSafeSet) error {
cids := pending.Keys()
if len(cids) == 0 {
return nil
}

var w ds.Write = store.store
var batch ds.Batch
if bds, ok := store.store.(ds.Batching); ok {
var err error
batch, err = bds.Batch(ctx)
if err != nil {
return fmt.Errorf("error creating batch for processed blocks: %w", err)
}
w = batch
}

for _, c := range cids {
if err := w.Put(ctx, store.processedBlockKey(c), nil); err != nil {
return fmt.Errorf("error marking %s as processed: %w", c, err)
}
}

if batch != nil {
if err := batch.Commit(ctx); err != nil {
return fmt.Errorf("error committing processed blocks batch: %w", err)
}
}
return nil
}

func (store *Datastore) dirtyKey() ds.Key {
@@ -979,7 +1037,7 @@ func (store *Datastore) MarkClean(ctx context.Context) {

// processNode merges the delta in a node and has the logic about what to do
// then.
func (store *Datastore) processNode(ctx context.Context, ng *crdtNodeGetter, root Head, delta Delta, node ipld.Node) ([]cid.Cid, error) {
func (store *Datastore) processNode(ctx context.Context, ng *crdtNodeGetter, root Head, delta Delta, node ipld.Node, walk *walkState) ([]cid.Cid, error) {
// First, merge the delta in this node.
current := node.Cid()
blockKey := dshelp.MultihashToDsKey(current.Hash()).String()
@@ -991,13 +1049,22 @@ func (store *Datastore) processNode(ctx context.Context, ng *crdtNodeGetter, roo
return nil, fmt.Errorf("error merging delta from %s: %w", current, err)
}

// Record that we have processed the node so that any other worker
// can skip it.
err = store.markProcessed(ctx, current)
if err != nil {
// marking as dirty here will not help, as we have not made this block a head, so we will not re-traverse it when fixing the datastore.
return nil, fmt.Errorf("error recording %s as processed: %w", current, err)
}
// Mark this node as in-flight globally and track it in the walk's
// pending set. inFlightCids makes isProcessedOrInFlight return true for
// the duration of this walk, preventing concurrent broadcasts from
// starting a duplicate walk for the same branch. pending is used to
// commit all CIDs to disk atomically at the end of a successful walk and
// to clean up inFlightCids when the walk finishes (success or failure).
//
// Invariant: during an active walk, the CRDT set may contain merged
// entries for CIDs that have no corresponding processedBlock key yet.
// This is intentional — processedBlock is written only when the whole
// walk succeeds (commitProcessed). If the walk fails, the merged set
// entries remain but the CIDs are not in processedBlocks, so the next
// broadcast will retry cleanly. Re-merging on retry is safe because
// CRDT joins are idempotent.
store.inFlightCids.Visit(current)
walk.pending.Visit(current)

// Remove from the set that has the children which are queued for
// processing.
@@ -1032,11 +1099,8 @@ func (store *Datastore) processNode(ctx context.Context, ng *crdtNodeGetter, roo
child := l.Cid

oldHead, isHead := store.heads.Get(ctx, child)
if err != nil {
return nil, fmt.Errorf("error checking if %s is head: %w", child, err)
}

isProcessed, err := store.isProcessed(ctx, child)
isProcessed, err := store.isProcessedOrInFlight(ctx, child)
if err != nil {
return nil, fmt.Errorf("error checking for known block %s: %w", child, err)
}
@@ -1075,8 +1139,8 @@ func (store *Datastore) processNode(ctx context.Context, ng *crdtNodeGetter, roo
// from processing the other links.
store.logger.Error(fmt.Errorf("error adding head %s: %w", root, err))
}
addedAsHead = true
}
addedAsHead = true
continue
}

@@ -1538,12 +1602,19 @@ func (store *Datastore) addDAGNode(ctx context.Context, delta Delta) (Head, []He
// returning. Since our block references current heads, children
// should be empty
store.logger.Debugf("processing generated block %s", nd.Cid())
localWalk := newWalkState(func() {})
defer func() {
for _, c := range localWalk.pending.Keys() {
store.inFlightCids.Remove(c)
}
}()
children, err := store.processNode(
ctx,
&crdtNodeGetter{store.dagService},
newHead,
delta,
nd,
localWalk,
)
if err != nil {
// store.MarkDirty(ctx) // Keep disabled: Since we are
@@ -1556,6 +1627,9 @@ func (store *Datastore) addDAGNode(ctx context.Context, delta Delta) (Head, []He
// datastore dirty.
return newHead, nil, fmt.Errorf("error processing new block: %w", err)
}
if err := store.commitProcessed(ctx, localWalk.pending); err != nil {
return newHead, nil, fmt.Errorf("error committing processed block %s: %w", nd.Cid(), err)
}
if len(children) != 0 {
store.logger.Warnf("bug: created a block to unknown children")
}
@@ -1915,3 +1989,43 @@ func (s *cidSafeSet) Has(c cid.Cid) (ok bool) {
s.mux.RUnlock()
return
}

func (s *cidSafeSet) Keys() []cid.Cid {
s.mux.RLock()
defer s.mux.RUnlock()
keys := make([]cid.Cid, 0, len(s.set))
for c := range s.set {
keys = append(keys, c)
}
return keys
}

// walkState tracks the in-flight state for a single branch-walk session.
// Processed CIDs are accumulated in memory and only committed to disk when
// the entire walk succeeds. The first error encountered (in any worker) is
// stored so handleBranch can decide whether to commit or discard.
type walkState struct {
pending *cidSafeSet
cancel context.CancelFunc
mu sync.RWMutex
firstErr error
}

func newWalkState(cancel context.CancelFunc) *walkState {
return &walkState{pending: newCidSafeSet(), cancel: cancel}
}

func (w *walkState) recordError(err error) {
w.mu.Lock()
if w.firstErr == nil {
w.firstErr = err
w.cancel()
}
w.mu.Unlock()
}

func (w *walkState) err() error {
w.mu.RLock()
defer w.mu.RUnlock()
return w.firstErr
}
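
For readers following the new control flow, below is a minimal standalone sketch (illustrative only, not part of the diff above) of the first-error-wins pattern that walkState implements: concurrent workers record failures into shared state, the first failure cancels the walk's context so sibling workers stop early, and the coordinator checks err() afterwards to decide whether to commit or discard. The worker loop and the error are invented for the example; only the walkState shape mirrors the diff.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// walkState mirrors the shape added in the diff: it remembers the first
// error seen by any worker and cancels the shared context so the other
// workers on the same walk can bail out early.
type walkState struct {
	mu       sync.RWMutex
	firstErr error
	cancel   context.CancelFunc
}

func (w *walkState) recordError(err error) {
	w.mu.Lock()
	if w.firstErr == nil {
		w.firstErr = err
		w.cancel() // first error wins and stops the rest of the walk
	}
	w.mu.Unlock()
}

func (w *walkState) err() error {
	w.mu.RLock()
	defer w.mu.RUnlock()
	return w.firstErr
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	walk := &walkState{cancel: cancel}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			select {
			case <-ctx.Done():
				return // another worker already failed; skip the work
			default:
			}
			if i == 2 { // simulate one worker failing to fetch a child
				walk.recordError(errors.New("fetch failed"))
			}
		}(i)
	}
	wg.Wait()

	// The coordinator (handleBranch in the diff) commits only on success.
	if err := walk.err(); err != nil {
		fmt.Println("walk failed, not committing:", err)
	} else {
		fmt.Println("walk succeeded, committing processed CIDs")
	}
}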