Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions cmd/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,7 @@ func (b *blnkInstance) processTransaction(ctx context.Context, t *asynq.Task) er
return nil
}

handled, err := b.blnk.TryRecordQueuedTransactionBatch(ctx, &txn)
if b.cnf.Queue.EnableHotLane && t.Type() == b.cnf.Queue.HotQueueName {
handled, err = b.blnk.TryRecordQueuedTransactionBatchForHotLane(ctx, &txn)
}
if err != nil {
logrus.WithError(err).Warnf("coalesced processing attempt failed for transaction %s", txn.TransactionID)
}
if handled {
return nil
}

_, err = b.blnk.RecordTransaction(ctx, &txn)
_, err = b.blnk.ProcessQueuedTransaction(ctx, &txn, b.cnf.Queue.EnableHotLane && t.Type() == b.cnf.Queue.HotQueueName)
if err != nil {
// Handle reference already used error
if strings.Contains(strings.ToLower(err.Error()), "reference") && strings.Contains(strings.ToLower(err.Error()), "already been used") {
Expand Down Expand Up @@ -136,7 +125,6 @@ func (b *blnkInstance) processTransaction(ctx context.Context, t *asynq.Task) er
return err
}

logrus.Infof("Transaction %s processed successfully", txn.TransactionID)
return nil
}

Expand Down
18 changes: 11 additions & 7 deletions internal/hooks/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (m *redisHookManager) ExecutePreHooks(ctx context.Context, transactionID st
return err
}

return m.executeHooks(ctx, hooks, PreTransaction, transactionID, data)
return m.ExecuteHooks(ctx, hooks, PreTransaction, transactionID, data)
}

// ExecutePostHooks queues all active post-transaction hooks for execution.
Expand All @@ -270,10 +270,10 @@ func (m *redisHookManager) ExecutePostHooks(ctx context.Context, transactionID s
return err
}

return m.executeHooks(ctx, hooks, PostTransaction, transactionID, data)
return m.ExecuteHooks(ctx, hooks, PostTransaction, transactionID, data)
}

// executeHooks marshals the hook data and queues each active hook for execution.
// ExecuteHooks marshals the hook data and queues each active hook for execution.
//
// Parameters:
// - ctx: The context for the operation.
Expand All @@ -284,7 +284,7 @@ func (m *redisHookManager) ExecutePostHooks(ctx context.Context, transactionID s
//
// Returns:
// - error: An error if data marshalling fails.
func (m *redisHookManager) executeHooks(ctx context.Context, hooks []*Hook, hookType HookType, transactionID string, data interface{}) error {
func (m *redisHookManager) ExecuteHooks(ctx context.Context, hooks []*Hook, hookType HookType, transactionID string, data interface{}) error {
dataBytes, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("failed to marshal hook data: %w", err)
Expand Down Expand Up @@ -332,9 +332,13 @@ func (m *redisHookManager) queueHook(ctx context.Context, hook *Hook, payload Ho
return fmt.Errorf("failed to marshal hook task payload: %w", err)
}

conf, err := config.Fetch()
if err != nil {
return fmt.Errorf("failed to fetch config: %w", err)
conf := m.config
if conf == nil {
conf, err = config.Fetch()
if err != nil {
return fmt.Errorf("failed to fetch config: %w", err)
}
m.config = conf
}

// Use webhook queue from config
Expand Down
1 change: 1 addition & 0 deletions internal/hooks/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type HookManager interface {
DeleteHook(ctx context.Context, hookID string) error
GetHook(ctx context.Context, hookID string) (*Hook, error)
ListHooks(ctx context.Context, hookType HookType) ([]*Hook, error)
ExecuteHooks(ctx context.Context, hooks []*Hook, hookType HookType, transactionID string, data interface{}) error
ExecutePreHooks(ctx context.Context, transactionID string, data interface{}) error
ExecutePostHooks(ctx context.Context, transactionID string, data interface{}) error
ProcessHookTask(ctx context.Context, task *asynq.Task) error
Expand Down
84 changes: 38 additions & 46 deletions queue_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,35 @@ import (
)

type QueuedTransactionRecoveryProcessor struct {
blnk *Blnk
batchSize int
maxWorkers int
pollInterval time.Duration
stuckThreshold time.Duration
maxRecoveryAttempts int
stopCh chan struct{}
wg sync.WaitGroup
running bool
mu sync.Mutex
tryBatch func(ctx context.Context, txn *model.Transaction) (bool, error)
tryHotBatch func(ctx context.Context, txn *model.Transaction) (bool, error)
recordTransaction func(ctx context.Context, txn *model.Transaction) (*model.Transaction, error)
blnk *Blnk
batchSize int
maxWorkers int
pollInterval time.Duration
stuckThreshold time.Duration
maxRecoveryAttempts int
stopCh chan struct{}
wg sync.WaitGroup
running bool
mu sync.Mutex
processQueuedTransaction func(ctx context.Context, txn *model.Transaction, hotLane bool) (transactionExecutionResult, error)
}

// NewQueuedTransactionRecoveryProcessor creates the stuck queued-transaction recovery loop
// with conservative single-worker defaults to avoid recovery-induced lock storms.
func NewQueuedTransactionRecoveryProcessor(blnk *Blnk) *QueuedTransactionRecoveryProcessor {
return &QueuedTransactionRecoveryProcessor{
blnk: blnk,
batchSize: 100,
maxWorkers: 1,
pollInterval: 30 * time.Second,
stuckThreshold: 2 * time.Hour,
maxRecoveryAttempts: 3,
stopCh: make(chan struct{}),
tryBatch: blnk.TryRecordQueuedTransactionBatch,
tryHotBatch: blnk.TryRecordQueuedTransactionBatchForHotLane,
recordTransaction: blnk.RecordTransaction,
blnk: blnk,
batchSize: 100,
maxWorkers: 1,
pollInterval: 30 * time.Second,
stuckThreshold: 2 * time.Hour,
maxRecoveryAttempts: 3,
stopCh: make(chan struct{}),
processQueuedTransaction: blnk.processQueuedTransaction,
}
}

// Start begins the background recovery loop for stuck queued transactions.
func (p *QueuedTransactionRecoveryProcessor) Start(ctx context.Context) {
p.mu.Lock()
if p.running {
Expand All @@ -77,6 +76,7 @@ func (p *QueuedTransactionRecoveryProcessor) Start(ctx context.Context) {
logrus.Info("Queued transaction recovery processor started")
}

// Stop shuts down the background recovery loop and waits for the worker goroutine to exit.
func (p *QueuedTransactionRecoveryProcessor) Stop() {
p.mu.Lock()
if !p.running {
Expand All @@ -91,12 +91,14 @@ func (p *QueuedTransactionRecoveryProcessor) Stop() {
logrus.Info("Queued transaction recovery processor stopped")
}

// IsRunning reports whether the recovery processor is currently active.
func (p *QueuedTransactionRecoveryProcessor) IsRunning() bool {
p.mu.Lock()
defer p.mu.Unlock()
return p.running
}

// run executes the poll loop that periodically scans for stuck queued transactions.
func (p *QueuedTransactionRecoveryProcessor) run(ctx context.Context) {
ticker := time.NewTicker(p.pollInterval)
defer ticker.Stop()
Expand All @@ -115,6 +117,7 @@ func (p *QueuedTransactionRecoveryProcessor) run(ctx context.Context) {
}
}

// processBatch performs one periodic stuck-queue recovery pass using the configured threshold.
func (p *QueuedTransactionRecoveryProcessor) processBatch(ctx context.Context) {
p.recoverWithThreshold(ctx, p.stuckThreshold)
}
Expand All @@ -130,6 +133,7 @@ func (b *Blnk) RecoverQueuedTransactions(ctx context.Context, threshold time.Dur
return processor.recoverWithThreshold(ctx, threshold), nil
}

// recoverWithThreshold loads currently stuck queued transactions and reprocesses them serially.
func (p *QueuedTransactionRecoveryProcessor) recoverWithThreshold(ctx context.Context, threshold time.Duration) int {
stuckTxns, err := p.blnk.datasource.GetStuckQueuedTransactions(ctx, threshold, p.batchSize)
if err != nil {
Expand All @@ -152,6 +156,8 @@ func (p *QueuedTransactionRecoveryProcessor) recoverWithThreshold(ctx context.Co
return len(stuckTxns)
}

// processStuckTransaction replays one stuck queued transaction, preserving the existing recovery
// attempt tracking and rejection semantics while preferring the shared queued executor path.
func (p *QueuedTransactionRecoveryProcessor) processStuckTransaction(ctx context.Context, stuckTxn *model.Transaction) error {
restoreTransactionFlagsFromMetadata(stuckTxn)

Expand Down Expand Up @@ -197,7 +203,7 @@ func (p *QueuedTransactionRecoveryProcessor) processStuckTransaction(ctx context
}

queueCopy := createQueueCopy(stuckTxn, stuckTxn.Reference)
handled, err := p.tryRecordRecoveredTransaction(ctx, queueCopy)
result, err := p.tryRecordRecoveredTransaction(ctx, queueCopy)
if err != nil {
if isReferenceAlreadyUsedError(err) {
logrus.Infof("Stuck transaction %s already processed (reference %s already used)", stuckTxn.TransactionID, queueCopy.Reference)
Expand All @@ -209,7 +215,7 @@ func (p *QueuedTransactionRecoveryProcessor) processStuckTransaction(ctx context
return err
}

if handled {
if result.usedCoalescing() {
logrus.Infof("Successfully recovered stuck transaction %s via coalesced batch", stuckTxn.TransactionID)
} else {
logrus.Infof("Successfully recovered stuck transaction %s via queue copy %s", stuckTxn.TransactionID, queueCopy.TransactionID)
Expand All @@ -218,29 +224,15 @@ func (p *QueuedTransactionRecoveryProcessor) processStuckTransaction(ctx context
return nil
}

func (p *QueuedTransactionRecoveryProcessor) tryRecordRecoveredTransaction(ctx context.Context, queueCopy *model.Transaction) (bool, error) {
if hotpairs.QueueLaneFromMetadata(queueCopy.MetaData) == hotpairs.LaneHot {
handled, err := p.tryHotBatch(ctx, queueCopy)
if err != nil {
logrus.WithError(err).Warnf("coalesced hot-lane recovery attempt failed for transaction %s", queueCopy.TransactionID)
}
if handled {
return true, nil
}
} else {
handled, err := p.tryBatch(ctx, queueCopy)
if err != nil {
logrus.WithError(err).Warnf("coalesced recovery attempt failed for transaction %s", queueCopy.TransactionID)
}
if handled {
return true, nil
}
}

_, err := p.recordTransaction(ctx, queueCopy)
return false, err
// tryRecordRecoveredTransaction routes stuck-transaction replay through the shared queued
// processing path, selecting hot-lane execution when the queued metadata requires it.
func (p *QueuedTransactionRecoveryProcessor) tryRecordRecoveredTransaction(ctx context.Context, queueCopy *model.Transaction) (transactionExecutionResult, error) {
hotLane := hotpairs.QueueLaneFromMetadata(queueCopy.MetaData) == hotpairs.LaneHot
return p.processQueuedTransaction(ctx, queueCopy, hotLane)
}

// updateRecoveryMetadata stores recovery attempt and status information on the stuck parent
// transaction so later recovery passes can make bounded retry decisions.
func (p *QueuedTransactionRecoveryProcessor) updateRecoveryMetadata(ctx context.Context, txn *model.Transaction, attempts int, status string) {
if txn.MetaData == nil {
txn.MetaData = make(map[string]interface{})
Expand Down
53 changes: 15 additions & 38 deletions queue_recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,10 @@ func TestProcessStuckTransaction_UsesCoalescingBeforeDirectReplay(t *testing.T)
MetaData: map[string]interface{}{},
}

var recorded bool
processor.tryBatch = func(ctx context.Context, txn *model.Transaction) (bool, error) {
return true, nil
}
processor.tryHotBatch = func(ctx context.Context, txn *model.Transaction) (bool, error) {
t.Fatalf("hot lane coalescing should not be used for normal lane recovery")
return false, nil
}
processor.recordTransaction = func(ctx context.Context, txn *model.Transaction) (*model.Transaction, error) {
recorded = true
return txn, nil
var hotLane bool
processor.processQueuedTransaction = func(ctx context.Context, txn *model.Transaction, gotHotLane bool) (transactionExecutionResult, error) {
hotLane = gotHotLane
return transactionExecutionResult{mode: transactionExecutionModeQueuedBatch, transaction: txn}, nil
}

mockDS.On("UpdateTransactionMetadata", mock.Anything, stuckTxn.TransactionID, mock.MatchedBy(func(metadata map[string]interface{}) bool {
Expand All @@ -52,7 +45,7 @@ func TestProcessStuckTransaction_UsesCoalescingBeforeDirectReplay(t *testing.T)

err := processor.processStuckTransaction(context.Background(), stuckTxn)
assert.NoError(t, err)
assert.False(t, recorded)
assert.False(t, hotLane)
mockDS.AssertExpectations(t)
}

Expand All @@ -73,18 +66,10 @@ func TestProcessStuckTransaction_UsesHotLaneCoalescingWhenMarkedHot(t *testing.T
},
}

var normalBatchCalled bool
var recorded bool
processor.tryBatch = func(ctx context.Context, txn *model.Transaction) (bool, error) {
normalBatchCalled = true
return false, nil
}
processor.tryHotBatch = func(ctx context.Context, txn *model.Transaction) (bool, error) {
return true, nil
}
processor.recordTransaction = func(ctx context.Context, txn *model.Transaction) (*model.Transaction, error) {
recorded = true
return txn, nil
var hotLane bool
processor.processQueuedTransaction = func(ctx context.Context, txn *model.Transaction, gotHotLane bool) (transactionExecutionResult, error) {
hotLane = gotHotLane
return transactionExecutionResult{mode: transactionExecutionModeHotQueuedBatch, transaction: txn}, nil
}

mockDS.On("UpdateTransactionMetadata", mock.Anything, stuckTxn.TransactionID, mock.MatchedBy(func(metadata map[string]interface{}) bool {
Expand All @@ -93,8 +78,7 @@ func TestProcessStuckTransaction_UsesHotLaneCoalescingWhenMarkedHot(t *testing.T

err := processor.processStuckTransaction(context.Background(), stuckTxn)
assert.NoError(t, err)
assert.False(t, normalBatchCalled)
assert.False(t, recorded)
assert.True(t, hotLane)
mockDS.AssertExpectations(t)
}

Expand All @@ -113,17 +97,10 @@ func TestProcessStuckTransaction_FallsBackToDirectReplayWhenBatchNotHandled(t *t
MetaData: map[string]interface{}{},
}

var recorded bool
processor.tryBatch = func(ctx context.Context, txn *model.Transaction) (bool, error) {
return false, nil
}
processor.tryHotBatch = func(ctx context.Context, txn *model.Transaction) (bool, error) {
t.Fatalf("hot lane coalescing should not be used for normal lane recovery")
return false, nil
}
processor.recordTransaction = func(ctx context.Context, txn *model.Transaction) (*model.Transaction, error) {
recorded = true
return txn, nil
var hotLane bool
processor.processQueuedTransaction = func(ctx context.Context, txn *model.Transaction, gotHotLane bool) (transactionExecutionResult, error) {
hotLane = gotHotLane
return transactionExecutionResult{mode: transactionExecutionModeSingle, transaction: txn}, nil
}

mockDS.On("UpdateTransactionMetadata", mock.Anything, stuckTxn.TransactionID, mock.MatchedBy(func(metadata map[string]interface{}) bool {
Expand All @@ -132,6 +109,6 @@ func TestProcessStuckTransaction_FallsBackToDirectReplayWhenBatchNotHandled(t *t

err := processor.processStuckTransaction(context.Background(), stuckTxn)
assert.NoError(t, err)
assert.True(t, recorded)
assert.False(t, hotLane)
mockDS.AssertExpectations(t)
}
Loading
Loading