The job system uses the cja framework to process threads asynchronously. Each job execution handles one "step" of a thread, then re-enqueues itself for the next step unless the thread is complete.
ProcessThreadStep is the main job that drives thread execution.
Input:
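```rust
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Serialize, Deserialize)]
struct ProcessThreadStepInput {
    thread_id: Uuid, // thread to advance by one step
}
```
Logic: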
- Load thread from DB and check status
- Lock thread (set status to 'running')
- Process any `pending_child_results` and create `thread_result` Stitches
- Find the last Stitch in the thread
- Based on the type of the last Stitch:
  - If it is an LLM response with a tool call: Execute the requested tool
  - If it is tool output: Send it to the LLM to choose the next action
  - If there are no Stitches yet: Send the initial prompt, including the goal, to the LLM
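- Create a new Stitch with the result
- Handle any `spawn_child_thread` calls by creating child threads and enqueueing ProcessThreadStep for each
- If `complete_thread` was called: Mark the thread complete and, if it has a parent, enqueue NotifyParentThread
- Otherwise: Re-enqueue ProcessThreadStep for this thread_id
- Unlock thread (set status back to 'pending', or to 'waiting' if it is waiting on a child thread), unless it was marked complete above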
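The "based on the type of the last Stitch" branch is the core decision in ProcessThreadStep. A minimal sketch of that dispatch as a pure function, assuming a simplified, hypothetical `Stitch` enum (the real Stitch schema is not shown in this document) and hypothetical `NextAction` names:

```rust
/// Hypothetical, simplified Stitch variants; the real schema is not defined here.
#[derive(Debug, Clone, PartialEq)]
enum Stitch {
    LlmResponse { tool_call: Option<String> },
    ToolOutput { output: String },
    ThreadResult { summary: String },
}

/// Hypothetical labels for what ProcessThreadStep does next.
#[derive(Debug, PartialEq)]
enum NextAction {
    SendInitialPrompt,
    ExecuteTool(String),
    SendToLlm,
    NoAction,
}

/// Chooses the next action from the thread's last Stitch (None = no Stitches yet).
fn next_action(last: Option<&Stitch>) -> NextAction {
    match last {
        // No Stitches yet: send the initial prompt with the goal.
        None => NextAction::SendInitialPrompt,
        // The LLM asked for a tool: execute it.
        Some(Stitch::LlmResponse { tool_call: Some(tool) }) => NextAction::ExecuteTool(tool.clone()),
        // Tool output goes back to the LLM. Treating thread_result Stitches the
        // same way is an assumption; the document does not say.
        Some(Stitch::ToolOutput { .. }) | Some(Stitch::ThreadResult { .. }) => NextAction::SendToLlm,
        // An LLM response without a tool call is not covered by the document (assumption).
        Some(Stitch::LlmResponse { tool_call: None }) => NextAction::NoAction,
    }
}
```

Keeping the decision separate from the DB and LLM calls makes it easy to unit-test each branch on its own.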
NotifyParentThread handles child thread completion notifications.
Input:
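```rust
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Serialize, Deserialize)]
struct NotifyParentThreadInput {
    child_thread_id: Uuid,
    result_summary: String,
    result_data: Option<serde_json::Value>, // optional structured result
}
```
Logic: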
- Lock parent thread row
- Check if parent has a running Stitch
  - If running: Append to `pending_child_results`
  - If not: Create a `thread_result` Stitch immediately
- If parent was waiting for this child: Enqueue ProcessThreadStep for parent
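The NotifyParentThread branching can be sketched without a database. Assumptions: `ThreadId` is a hypothetical `u64` stand-in for `Uuid`, `result_data` is omitted, and the parent's "waiting for this child" state is modelled as a hypothetical `waiting_on` list:

```rust
type ThreadId = u64; // hypothetical stand-in for uuid::Uuid

#[derive(Debug, Clone, PartialEq)]
struct ChildResult {
    child_thread_id: ThreadId,
    result_summary: String,
}

/// Hypothetical in-memory view of the parent row; the real job locks this row in the DB.
#[derive(Debug, Default)]
struct ParentThread {
    has_running_stitch: bool,
    pending_child_results: Vec<ChildResult>,
    thread_result_stitches: Vec<ChildResult>,
    waiting_on: Vec<ThreadId>, // hypothetical: children the parent is waiting for
}

/// Applies one child's result to its parent. The `&mut` borrow stands in for the
/// row lock. Returns true when ProcessThreadStep should be enqueued for the parent.
fn notify_parent(parent: &mut ParentThread, result: ChildResult) -> bool {
    let child = result.child_thread_id;
    if parent.has_running_stitch {
        // A step is in flight: buffer the result for that step to pick up.
        parent.pending_child_results.push(result);
    } else {
        // Parent is idle: record the result as a thread_result Stitch right away.
        parent.thread_result_stitches.push(result);
    }
    let was_waiting = parent.waiting_on.contains(&child);
    parent.waiting_on.retain(|id| *id != child);
    was_waiting
}
```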
CreateDailyStandup is an example cron job that creates threads on a schedule.
Schedule: Daily at 9 AM
Logic:
- Create new Thread with goal "Generate daily standup report"
- Set initial tasks
- Enqueue ProcessThreadStep for the new thread
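A sketch of the CreateDailyStandup body, with in-memory vectors standing in for the threads table and the job queue. The initial tasks are passed in because the document does not list them, and the 'pending' starting status is an assumption:

```rust
type ThreadId = u64; // hypothetical stand-in for uuid::Uuid

#[derive(Debug)]
struct Thread {
    id: ThreadId,
    goal: String,
    tasks: Vec<String>,
    status: &'static str,
}

#[derive(Debug, PartialEq)]
enum Job {
    ProcessThreadStep { thread_id: ThreadId },
}

/// Body of the daily cron job: create the thread, then enqueue its first step.
/// Per the "always use DB transactions" guidance, a real version would ideally
/// perform both writes in one transaction.
fn create_daily_standup(
    id: ThreadId,
    initial_tasks: Vec<String>,
    threads: &mut Vec<Thread>,
    queue: &mut Vec<Job>,
) -> ThreadId {
    threads.push(Thread {
        id,
        goal: "Generate daily standup report".to_string(),
        tasks: initial_tasks,
        status: "pending", // assumed initial status
    });
    queue.push(Job::ProcessThreadStep { thread_id: id });
    id
}
```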
ThreadHealthCheck monitors thread health and handles stuck threads.
Schedule: Every 5 minutes
Logic:
- Find threads stuck in 'running' status for > 10 minutes
- Reset their status to 'pending'
- Re-enqueue ProcessThreadStep for each
- Alert on threads that have failed multiple times
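The health-check pass can be sketched as a function over thread rows. Assumptions: timestamps are plain seconds, `status_changed_at` is a hypothetical column recording when the status last changed, and "failed multiple times" is read as a hypothetical threshold of 3 failures:

```rust
type ThreadId = u64; // hypothetical stand-in for uuid::Uuid

#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pending,
    Running,
    Waiting,
}

#[derive(Debug)]
struct ThreadRow {
    id: ThreadId,
    status: Status,
    status_changed_at: u64, // hypothetical column, seconds
    failure_count: u32,
}

const STUCK_AFTER_SECS: u64 = 10 * 60;
const ALERT_AFTER_FAILURES: u32 = 3; // hypothetical threshold

struct HealthReport {
    requeued: Vec<ThreadId>, // reset to Pending; ProcessThreadStep should be re-enqueued
    alerts: Vec<ThreadId>,
}

fn health_check(threads: &mut [ThreadRow], now_secs: u64) -> HealthReport {
    let mut report = HealthReport { requeued: Vec::new(), alerts: Vec::new() };
    for t in threads.iter_mut() {
        let stuck = t.status == Status::Running
            && now_secs.saturating_sub(t.status_changed_at) > STUCK_AFTER_SECS;
        if stuck {
            t.status = Status::Pending;
            t.status_changed_at = now_secs;
            report.requeued.push(t.id);
        }
        if t.failure_count >= ALERT_AFTER_FAILURES {
            report.alerts.push(t.id);
        }
    }
    report
}
```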
Job flow:

```
CreateDailyStandup (cron)
  │
  ├─> Creates Thread
  │
  └─> Enqueues ProcessThreadStep
        │
        ├─> Processes one step
        │
        ├─> If spawns child:
        │     ├─> Creates child Thread
        │     └─> Enqueues ProcessThreadStep (child)
        │
        ├─> If completes:
        │     └─> Enqueues NotifyParentThread
        │
        └─> Otherwise:
              └─> Re-enqueues ProcessThreadStep (self)
```
Error handling:
- ProcessThreadStep: Max 3 retries with exponential backoff
- Tool execution failures: Store error in Stitch, let LLM decide next action
- LLM API failures: Retry with backoff, eventually mark thread as failed
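The retry policy can be expressed as a small pure function. A sketch, assuming a hypothetical 2-second base delay that doubles on each retry; whether cja computes the backoff itself is not specified here:

```rust
use std::time::Duration;

const MAX_RETRIES: u32 = 3;
const BASE_DELAY: Duration = Duration::from_secs(2); // hypothetical base delay

/// Delay before retry number `attempt` (1-based), or None once retries are exhausted.
fn retry_delay(attempt: u32) -> Option<Duration> {
    if attempt == 0 || attempt > MAX_RETRIES {
        return None;
    }
    Some(BASE_DELAY * 2u32.pow(attempt - 1)) // 2s, 4s, 8s
}

#[derive(Debug, PartialEq)]
enum OnLlmFailure {
    RetryAfter(Duration),
    MarkThreadFailed,
}

/// LLM API failures: retry with backoff, then mark the thread as failed.
fn on_llm_failure(attempt: u32) -> OnLlmFailure {
    match retry_delay(attempt) {
        Some(delay) => OnLlmFailure::RetryAfter(delay),
        None => OnLlmFailure::MarkThreadFailed,
    }
}
```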
Concurrency:
- Always use DB transactions when updating thread state
- Use row-level locks to prevent concurrent modifications
- Timeout stuck threads via ThreadHealthCheck cron
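The locking rules above can be modelled in memory. A minimal sketch, assuming a hypothetical `ThreadTable` type: a `Mutex` stands in for the database row lock (in PostgreSQL this would typically be `SELECT ... FOR UPDATE` inside a transaction), and locking only succeeds when the thread is not already 'running'. Treating 'waiting' as lockable is an assumption, inferred from NotifyParentThread enqueueing ProcessThreadStep for a waiting parent.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

type ThreadId = u64; // hypothetical stand-in for uuid::Uuid

#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pending,
    Running,
    Waiting,
}

/// Hypothetical in-memory threads table; the Mutex plays the role of the row lock.
struct ThreadTable {
    rows: Mutex<HashMap<ThreadId, Status>>,
}

impl ThreadTable {
    /// Atomically moves a thread to Running. Returns false if it is already
    /// Running (another worker holds it) or the thread does not exist.
    fn try_lock(&self, id: ThreadId) -> bool {
        let mut rows = self.rows.lock().unwrap();
        match rows.get_mut(&id) {
            Some(status) if matches!(*status, Status::Pending | Status::Waiting) => {
                *status = Status::Running;
                true
            }
            _ => false,
        }
    }

    /// Releases the thread by setting its post-step status.
    fn unlock(&self, id: ThreadId, next: Status) {
        let mut rows = self.rows.lock().unwrap();
        if let Some(status) = rows.get_mut(&id) {
            *status = next;
        }
    }
}
```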
Metrics to track:
- Thread creation rate
- Average thread completion time
- Tool usage frequency
- Child thread spawn rate
- Error rates by thread goal type
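Two of these metrics can be derived from finished-thread records. A sketch over a hypothetical `FinishedThread` record (field names are illustrative, not a schema from this document); averaging duration over successful threads only is an assumption about how "completion time" is defined:

```rust
use std::collections::HashMap;

/// Hypothetical record of a finished thread.
struct FinishedThread {
    goal_type: String,
    duration_secs: f64,
    failed: bool,
}

/// Mean duration of threads that completed successfully, or None if there are none.
fn average_completion_secs(threads: &[FinishedThread]) -> Option<f64> {
    let done: Vec<f64> = threads.iter().filter(|t| !t.failed).map(|t| t.duration_secs).collect();
    if done.is_empty() {
        None
    } else {
        Some(done.iter().sum::<f64>() / done.len() as f64)
    }
}

/// Fraction of finished threads that failed, per goal type.
fn error_rate_by_goal_type(threads: &[FinishedThread]) -> HashMap<String, f64> {
    let mut counts: HashMap<String, (u32, u32)> = HashMap::new(); // (failed, total)
    for t in threads {
        let entry = counts.entry(t.goal_type.clone()).or_insert((0, 0));
        if t.failed {
            entry.0 += 1;
        }
        entry.1 += 1;
    }
    counts
        .into_iter()
        .map(|(goal, (failed, total))| (goal, failed as f64 / total as f64))
        .collect()
}
```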
Logging:
- Log thread state transitions
- Log LLM prompts and responses (with sampling)
- Log tool executions and results
- Log parent-child relationships
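For "with sampling", one option (an assumption, not a documented choice) is to sample per thread rather than per call, by hashing the thread ID, so a sampled thread's prompts and responses are logged end to end. The sketch also includes a hypothetical key=value format for state-transition lines. `DefaultHasher` output is not guaranteed to stay the same across Rust releases, so a durable sampler would pin a specific hash function.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Per-thread sampling decision for LLM prompt/response logging.
/// The same thread_id always gets the same answer within one build.
fn should_log_llm(thread_id: u64, sample_rate: f64) -> bool {
    if sample_rate <= 0.0 {
        return false;
    }
    if sample_rate >= 1.0 {
        return true;
    }
    let mut hasher = DefaultHasher::new();
    thread_id.hash(&mut hasher);
    let bucket = hasher.finish() as f64 / u64::MAX as f64; // roughly uniform in [0, 1]
    bucket < sample_rate
}

/// Hypothetical log line format for a thread state transition.
fn transition_log_line(thread_id: u64, from: &str, to: &str) -> String {
    format!("thread={thread_id} transition={from}->{to}")
}
```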
Possible extensions:
- Priority Queues: High-priority threads process first
- Resource Limits: Max concurrent threads per goal type
- Thread Pooling: Reuse completed threads for similar goals
- Webhooks: Notify external systems on thread completion
- Batch Processing: Process multiple Stitches in one job execution for efficiency
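A priority queue for thread steps can be sketched with `std::collections::BinaryHeap`, which is a max-heap. This is an in-memory illustration under assumptions (a hypothetical `u8` priority where higher runs first, FIFO within a priority level via a sequence number); whether cja's queue supports priorities is not stated in this document.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

type ThreadId = u64; // hypothetical stand-in for uuid::Uuid

#[derive(Debug, PartialEq, Eq)]
struct QueuedStep {
    priority: u8,
    seq: u64,
    thread_id: ThreadId,
}

impl Ord for QueuedStep {
    fn cmp(&self, other: &Self) -> Ordering {
        // Higher priority first; among equal priorities, earlier seq first.
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.seq.cmp(&self.seq))
            .then_with(|| self.thread_id.cmp(&other.thread_id))
    }
}

impl PartialOrd for QueuedStep {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Hypothetical in-memory queue of ProcessThreadStep jobs.
#[derive(Default)]
struct StepQueue {
    heap: BinaryHeap<QueuedStep>,
    next_seq: u64,
}

impl StepQueue {
    fn push(&mut self, thread_id: ThreadId, priority: u8) {
        self.heap.push(QueuedStep { priority, seq: self.next_seq, thread_id });
        self.next_seq += 1;
    }

    fn pop(&mut self) -> Option<ThreadId> {
        self.heap.pop().map(|step| step.thread_id)
    }
}
```

The sequence number keeps ordering fair among threads with the same priority, which `BinaryHeap` alone does not guarantee.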