queue.Queue. This design keeps all execution in a single process, allowing AI models to be loaded once and shared across all workers.
Architecture overview
The job queue system consists of four main components working together:

| Component | Role |
|---|---|
| JobQueueManager | Orchestrates the entire system, manages workers and master thread |
| Master thread | Fetches jobs from the execution plan and adds them to the queue |
| Worker threads | Pull jobs from the queue and execute workflows |
| JobQueue | Thread-safe queue with deduplication and active job tracking |
Benefits of this architecture
No external dependencies
Uses Python’s built-in `queue` module. No Redis, Celery, or other external services required.
Shared model memory
All workers share the same process memory, so AI models are loaded once and shared.
Race condition prevention
Single process eliminates race conditions that occur with multi-process architectures.
Production stable
Built-in Python threading is battle-tested and production-ready.
Components
Master thread
The master thread fetches jobs from the execution plan and adds them to the queue. Responsibilities:

- Query the execution plan for ready rules every 60 seconds
- Deduplicate jobs before adding to queue
- Monitor memory and pause fetching when memory is high
- Log statistics about job processing
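A single pass of the master loop might look like the following sketch. The function and parameter names (`master_fetch_pass`, `fetch_ready_rules`, `memory_is_high`) are illustrative, not the actual API; in the real system this pass repeats every 60 seconds.

```python
def master_fetch_pass(fetch_ready_rules, job_queue, memory_is_high):
    """One pass of the master loop: skip fetching under memory pressure,
    otherwise query the execution plan and enqueue jobs (add() dedupes)."""
    if memory_is_high():
        print("Memory throttle active: master pausing fetch")
        return 0
    jobs = fetch_ready_rules()                        # ready rules from the execution plan
    added = sum(1 for job in jobs if job_queue.add(job))  # add() returns False for duplicates
    print(f"Master fetched {len(jobs)} jobs, queued {added}")
    return added
```
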
Worker threads
Worker threads pull jobs from the queue and execute them. Responsibilities:

- Pull jobs from the queue (blocking until one is available)
- Mark jobs as running in execution plan
- Execute workflow via the workflow engine
- Handle errors and cleanup
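The responsibilities above can be sketched as a worker loop. This is a minimal illustration, not the actual implementation; the sentinel-based shutdown is an assumption for the sketch.

```python
import queue

def worker_loop(job_queue: "queue.Queue", mark_running, execute, stop=None):
    """Sketch of one worker thread's loop. Enqueueing the `stop`
    sentinel object tells the worker to exit."""
    while True:
        job = job_queue.get()            # blocks until a job is available
        if job is stop:
            job_queue.task_done()
            break
        try:
            mark_running(job)            # flag the job as running in the execution plan
            execute(job)                 # run the workflow via the workflow engine
        except Exception as exc:
            print(f"FATAL: worker failed on job {job}: {exc}")
        finally:
            job_queue.task_done()        # cleanup happens even on error
```

Note that the `finally` block guarantees `task_done()` is called whether the workflow succeeds or raises, so the queue's bookkeeping stays consistent.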
Job queue
The job queue provides thread-safe job management with deduplication. Features:

- Thread-safe using Python’s `queue.Queue`
- HashMap for O(1) duplicate detection
- Active job tracking
- Recently processed tracking (prevents immediate re-queue)
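The features above could be combined roughly as follows. Class and attribute names are illustrative, not the real implementation; the 30-second window matches the recently-processed behavior described under Deduplication below.

```python
import queue
import threading
import time

class DedupJobQueue:
    """Minimal sketch of a thread-safe job queue with O(1) dedup,
    active-job tracking, and a recently-processed window."""

    RECENT_WINDOW_SECONDS = 30          # finished jobs may not re-queue for 30 s

    def __init__(self):
        self._queue = queue.Queue()
        self._lock = threading.Lock()
        self._queued_ids = set()        # O(1) membership test for queued jobs
        self._active_ids = set()        # jobs currently held by workers
        self._recent = {}               # job_id -> completion timestamp

    def add(self, job_id) -> bool:
        """Enqueue job_id unless it is queued, active, or recently processed."""
        with self._lock:
            finished_at = self._recent.get(job_id)
            if (job_id in self._queued_ids
                    or job_id in self._active_ids
                    or (finished_at is not None
                        and time.monotonic() - finished_at < self.RECENT_WINDOW_SECONDS)):
                return False            # duplicate: skip
            self._queued_ids.add(job_id)
        self._queue.put(job_id)
        return True

    def get(self):
        """Blocking pull; moves the job from queued to active."""
        job_id = self._queue.get()
        with self._lock:
            self._queued_ids.discard(job_id)
            self._active_ids.add(job_id)
        return job_id

    def done(self, job_id):
        """Mark a job finished; starts its recently-processed window."""
        with self._lock:
            self._active_ids.discard(job_id)
            self._recent[job_id] = time.monotonic()
        self._queue.task_done()
```
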
Memory throttling
The job queue implements memory throttling to prevent out-of-memory crashes.

Thresholds
| Resource | Throttle threshold | Resume threshold |
|---|---|---|
| RAM | 90% | 80% |
| GPU | 85% | 75% |
| Swap | 70% | N/A |
Behavior
When memory exceeds thresholds:

- The master thread pauses job fetching
- Worker threads pause pulling new jobs
- System logs warning with memory statistics
- Resume when memory drops below resume threshold
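The separate throttle and resume thresholds form a hysteresis band: once throttled, the system stays throttled until usage drops below the lower resume threshold, which prevents rapid on/off flapping around a single cutoff. A sketch of that decision, using the defaults from the table above (the function itself is illustrative):

```python
def should_throttle(ram_pct: float, gpu_pct: float, throttled: bool,
                    ram_high: float = 90, ram_resume: float = 80,
                    gpu_high: float = 85, gpu_resume: float = 75) -> bool:
    """Return True if the system should be (or stay) throttled.
    Once throttled, only dropping below the *resume* thresholds releases it."""
    if throttled:
        return ram_pct >= ram_resume or gpu_pct >= gpu_resume
    return ram_pct >= ram_high or gpu_pct >= gpu_high
```
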
Configuration
Override thresholds via environment variables:

| Variable | Default | Description |
|---|---|---|
| `MEMORY_THROTTLE_THRESHOLD` | 90 | RAM percentage to trigger throttle |
| `MEMORY_RESUME_THRESHOLD` | 80 | RAM percentage to resume |
| `GPU_THROTTLE_THRESHOLD` | 85 | GPU memory percentage to trigger throttle |
| `GPU_RESUME_THRESHOLD` | 75 | GPU memory percentage to resume |
| `MEMORY_CHECK_INTERVAL` | 5 | Seconds between memory checks |
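Reading one of these overrides might look like the following sketch; the variable names come from the table above, but the helper itself is hypothetical.

```python
import os

def env_threshold(name: str, default: float) -> float:
    """Return a threshold from the environment, falling back to the default."""
    raw = os.environ.get(name)
    return float(raw) if raw is not None else float(default)

# e.g. env_threshold("MEMORY_THROTTLE_THRESHOLD", 90)
```
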
Single-job mode
Workers can operate in single-job mode for guaranteed memory cleanup.

How it works

When `WORKER_SINGLE_JOB_MODE=true` (the default):
- Worker executes one job
- Worker thread exits after job completion
- JobQueueManager automatically restarts the worker
- New thread starts with fresh memory state
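The restart cycle above can be sketched as a supervision loop. This is an illustration only: the `supervise` helper and its `jobs_to_run` bound are hypothetical (the real JobQueueManager loops until shutdown).

```python
import threading

def supervise(run_one_job, jobs_to_run: int, name: str = "Worker-1"):
    """Sketch of single-job mode supervision: each job gets a brand-new
    thread, so thread-local state is discarded after every job."""
    for _ in range(jobs_to_run):
        worker = threading.Thread(target=run_one_job, name=name)
        worker.start()      # the thread executes exactly one job...
        worker.join()       # ...then exits; the next job gets a fresh thread
```
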
Benefits
- Complete cleanup of thread-local storage
- Prevents memory leaks from accumulating
- Ensures fresh state for each job
Configuration
| Variable | Default | Description |
|---|---|---|
| `WORKER_SINGLE_JOB_MODE` | true | Exit worker after each job |
Job lifecycle
States
| State | Location | Description |
|---|---|---|
| Pending | Database | Rule ready but not yet fetched |
| Queued | Job queue | Waiting for a worker |
| Active | Worker thread | Currently executing |
| Complete | Done | Workflow finished |
Transitions
| From | To | Trigger |
|---|---|---|
| Pending | Queued | Master adds to queue |
| Queued | Active | Worker pulls from queue |
| Active | Complete | Workflow finishes |
| Active | Pending | Error or timeout |
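The transitions table can be expressed as a small validator. The state names (lowercased) come from the tables above; the helper itself is illustrative.

```python
# Legal transitions, taken from the transitions table
TRANSITIONS = {
    "pending": {"queued"},               # master adds to queue
    "queued": {"active"},                # worker pulls from queue
    "active": {"complete", "pending"},   # finish, or error/timeout back to pending
}

def transition(current: str, target: str) -> str:
    """Return the new state, or raise on an illegal transition."""
    if target not in TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition: {current} -> {target}")
    return target
```
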
Deduplication
The job queue prevents duplicate execution using multiple checks:

1. HashMap check: O(1) lookup in the set of queued job IDs. If the job ID is already in the set, it is skipped.
2. Recently processed check: jobs completed within the last 30 seconds are skipped, preventing rapid re-queueing of the same job.
3. Active jobs check: jobs currently being executed by any worker are skipped.
4. Execution plan check: the `is_currently_running` flag in the execution plan prevents re-fetching from the database.

Configuration options
Timing configuration
| Setting | Default | Description |
|---|---|---|
| `MASTER_GRACE_PERIOD_SECONDS` | 15 | Wait before the first fetch after restart |
| `WORKER_STAGGER_SECONDS` | 1 | Delay between worker startups |
| `WORKER_COOLDOWN_SECONDS` | 10 | Delay between jobs per worker |
Worker count
Worker count is calculated from CPU cores: 90% of available cores, with a minimum of 10 workers. This can be overridden when creating the JobQueueManager.

Monitoring
Queue statistics
The job queue manager provides statistics for monitoring:

| Metric | Description |
|---|---|
| `queue_size` | Number of jobs waiting in the queue |
| `active_jobs` | Number of jobs currently being executed |
| `active_job_ids` | List of IDs for active jobs |
| `queued_job_ids` | List of IDs in the queue |
| `total_workers` | Configured number of workers |
| `alive_workers` | Number of currently running workers |
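Assembling these metrics could look like the following sketch. It assumes direct access to queue internals and the list of worker threads; the function name and parameters are illustrative, not the real API.

```python
import queue

def queue_stats(q: "queue.Queue", queued_ids: set, active_ids: set,
                workers: list) -> dict:
    """Hypothetical snapshot of the monitoring metrics listed above."""
    return {
        "queue_size": q.qsize(),
        "active_jobs": len(active_ids),
        "active_job_ids": sorted(active_ids),
        "queued_job_ids": sorted(queued_ids),
        "total_workers": len(workers),
        "alive_workers": sum(1 for w in workers if w.is_alive()),
    }
```
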
Log messages
Key log messages to monitor:

| Message | Meaning |
|---|---|
| 👑 Master fetched X jobs from database | Master successfully fetched jobs |
| 📥 Added job 'Rule Name' to queue | Job added to queue |
| 👷 Worker-N pulled job from queue | Worker started processing |
| ✅ Worker-N finished job | Job completed successfully |
| 🚨 MEMORY THROTTLE ACTIVE: RAM at X% | Memory throttling engaged |
| ✅ MEMORY THROTTLE RELEASED | Memory dropped below the resume threshold |
Troubleshooting
Jobs not being picked up

Check:

- Is the master thread running? Check `master_thread_alive` in stats
- Is the memory throttle active? Look for `MEMORY THROTTLE` in logs
- Are workers alive? Check `alive_workers` in stats

Fixes:

- Restart the application if the master thread is dead
- Wait for memory to drop below the resume threshold
- Workers auto-restart in single-job mode
Duplicate job execution

Symptoms: the same job running twice simultaneously.

Check:

- Is `is_currently_running` being set properly?
- Are the HashMap locks working correctly?
- This shouldn’t happen with the current deduplication

Fixes:

- Check the execution plan for stale `is_currently_running` flags
- Review logs for “already in queue” messages
Workers dying frequently

Symptoms: the `alive_workers` count dropping.

Check:

- Are there exceptions in the worker logs?
- Is memory pressure causing OOM kills?
- Single-job mode workers exit normally after each job (this is expected)

Fixes:

- Check for exceptions: look for `FATAL` in logs
- Monitor memory usage before crashes
High memory usage

Check:

- Are memory thresholds configured correctly?
- Is single-job mode enabled?
- Are models being cached?

Fixes:

- Lower `MEMORY_THROTTLE_THRESHOLD`
- Enable single-job mode: set `WORKER_SINGLE_JOB_MODE=true`
- Check for memory leaks in action code