The RES job queue implements a master-worker architecture using Python’s built-in queue.Queue. This design keeps all execution in a single process, allowing AI models to be loaded once and shared across all workers.

Architecture overview

The job queue system consists of three main components working together:
| Component | Role |
| --- | --- |
| JobQueueManager | Orchestrates the entire system; manages workers and the master thread |
| Master thread | Fetches jobs from the execution plan and adds them to the queue |
| Worker threads | Pull jobs from the queue and execute workflows |
| JobQueue | Thread-safe queue with deduplication and active job tracking |

Benefits of this architecture

No external dependencies

Uses Python’s built-in queue module. No Redis, Celery, or external services required.

Shared model memory

All workers share the same process memory, so AI models are loaded once and shared.

Race condition prevention

Single process eliminates race conditions that occur with multi-process architectures.

Production stable

Built-in Python threading is battle-tested and production-ready.

Components

Master thread

The master thread fetches jobs from the execution plan and adds them to the queue. Responsibilities:
  • Query the execution plan for ready rules every 60 seconds
  • Deduplicate jobs before adding them to the queue
  • Monitor memory and pause fetching when usage is high
  • Log statistics about job processing
Behavior: The master thread runs continuously, checking memory safety before each fetch cycle. If memory usage exceeds the configured thresholds, it pauses and waits for usage to drop before resuming.
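The fetch-and-enqueue cycle might be sketched like this; `fetch_ready_jobs`, `memory_is_safe`, and the function name are illustrative stand-ins, not the actual RES API:

```python
FETCH_INTERVAL_SECONDS = 60  # documented fetch cadence

def master_loop(job_queue, fetch_ready_jobs, memory_is_safe, stop_event):
    """Illustrative master loop. fetch_ready_jobs stands in for the
    execution-plan query; memory_is_safe for the memory check."""
    while not stop_event.is_set():
        if memory_is_safe():
            for job_id in fetch_ready_jobs():
                # The queue deduplicates internally, so duplicates are
                # simply ignored here.
                job_queue.put(job_id)
        # Wait out the interval, waking early if shutdown is requested.
        stop_event.wait(FETCH_INTERVAL_SECONDS)
```

Using `threading.Event.wait` instead of `time.sleep` lets the loop exit promptly on shutdown.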

Worker threads

Worker threads pull jobs from the queue and execute them. Responsibilities:
  • Pull jobs from the queue (blocking until one is available)
  • Mark jobs as running in the execution plan
  • Execute the workflow via the workflow engine
  • Handle errors and clean up
Behavior: Each worker checks memory safety before pulling a new job; if usage is high, the worker waits. Once a job is pulled, the worker executes it and marks it complete.
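A worker's loop could look like the following sketch; `execute_workflow` and `memory_is_safe` are illustrative callbacks, and the execution-plan state updates are elided:

```python
import queue

def worker_loop(job_queue, execute_workflow, memory_is_safe, stop_event,
                wait_seconds=5):
    """Illustrative worker loop: check memory, pull a job, execute it."""
    while not stop_event.is_set():
        if not memory_is_safe():
            # Memory is high: wait instead of pulling a new job.
            stop_event.wait(wait_seconds)
            continue
        try:
            # Block until a job arrives (with a timeout so shutdown
            # requests are noticed).
            job_id = job_queue.get(timeout=wait_seconds)
        except queue.Empty:
            continue
        try:
            # Marking the job running/complete in the execution plan
            # is omitted here.
            execute_workflow(job_id)
        except Exception as exc:
            print(f"job {job_id} failed: {exc}")
```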

Job queue

The job queue provides thread-safe job management with deduplication. Features:
  • Thread-safe, built on Python’s queue.Queue
  • HashMap-backed set for O(1) duplicate detection
  • Active job tracking
  • Recently processed tracking (prevents immediate re-queueing)
Deduplication: When a job is added, the queue checks whether it is already queued, currently active, or was recently processed. Only jobs that pass all checks are added.
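The thread-safe wrapper with O(1) duplicate detection might look like this minimal sketch (class and method names are illustrative, not the actual RES classes):

```python
import queue
import threading

class JobQueue:
    """Minimal sketch: queue.Queue for thread safety plus a set for
    O(1) duplicate detection."""
    def __init__(self):
        self._queue = queue.Queue()
        self._queued_ids = set()
        self._lock = threading.Lock()

    def add(self, job_id):
        with self._lock:
            if job_id in self._queued_ids:
                return False  # already queued: skip the duplicate
            self._queued_ids.add(job_id)
        self._queue.put(job_id)
        return True

    def get(self, timeout=None):
        job_id = self._queue.get(timeout=timeout)
        with self._lock:
            self._queued_ids.discard(job_id)
        return job_id
```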

Memory throttling

The job queue implements memory throttling to prevent out-of-memory crashes.

Thresholds

| Resource | Throttle threshold | Resume threshold |
| --- | --- | --- |
| RAM | 90% | 80% |
| GPU | 85% | 75% |
| Swap | 70% | N/A |

Behavior

When memory exceeds thresholds:
  1. Master thread pauses job fetching
  2. Worker threads pause pulling new jobs
  3. System logs warning with memory statistics
  4. Resume when memory drops below resume threshold
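The throttle/resume pair forms a hysteresis band, which can be sketched as below. The class name is illustrative; the RAM percentage would come from a library such as psutil (`psutil.virtual_memory().percent`), which is an assumption, not confirmed by this document:

```python
RAM_THROTTLE = 90.0  # documented RAM throttle threshold (%)
RAM_RESUME = 80.0    # documented RAM resume threshold (%)

class MemoryThrottle:
    """Illustrative hysteresis: engage at the throttle threshold,
    release only once usage drops below the resume threshold."""
    def __init__(self, throttle=RAM_THROTTLE, resume=RAM_RESUME):
        self.throttle = throttle
        self.resume = resume
        self.active = False

    def update(self, ram_percent):
        """Return True if fetching/pulling should stay paused."""
        if not self.active and ram_percent >= self.throttle:
            self.active = True
            print(f"MEMORY THROTTLE ACTIVE: RAM at {ram_percent:.0f}%")
        elif self.active and ram_percent < self.resume:
            self.active = False
            print("MEMORY THROTTLE RELEASED")
        return self.active
```

The gap between 90% and 80% prevents the system from flapping between paused and running when usage hovers near a single threshold.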

Configuration

Override thresholds via environment variables:
| Variable | Default | Description |
| --- | --- | --- |
| MEMORY_THROTTLE_THRESHOLD | 90 | RAM percentage that triggers throttling |
| MEMORY_RESUME_THRESHOLD | 80 | RAM percentage at which fetching resumes |
| GPU_THROTTLE_THRESHOLD | 85 | GPU memory percentage that triggers throttling |
| GPU_RESUME_THRESHOLD | 75 | GPU memory percentage at which fetching resumes |
| MEMORY_CHECK_INTERVAL | 5 | Seconds between memory checks |

Single-job mode

Workers can operate in single-job mode for guaranteed memory cleanup.

How it works

When WORKER_SINGLE_JOB_MODE=true (default):
  1. Worker executes one job
  2. Worker thread exits after job completion
  3. JobQueueManager automatically restarts the worker
  4. New thread starts with fresh memory state
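The restart cycle above can be sketched as follows; the function names are illustrative, and `jobs_to_run` bounds the demo loop where the real manager runs until shutdown:

```python
import threading

def run_one_job(job_queue, execute_workflow):
    """Single-job mode: execute exactly one job, then return so the
    thread exits and its thread-local storage is reclaimed."""
    job_id = job_queue.get()
    execute_workflow(job_id)

def supervise_worker(job_queue, execute_workflow, jobs_to_run):
    """Sketch of the manager restarting a worker after each job."""
    for _ in range(jobs_to_run):
        t = threading.Thread(target=run_one_job,
                             args=(job_queue, execute_workflow))
        t.start()
        t.join()  # thread exits after one job; a fresh one replaces it
```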

Benefits

  • Complete cleanup of thread-local storage
  • Prevents memory leaks from accumulating
  • Ensures fresh state for each job

Configuration

| Variable | Default | Description |
| --- | --- | --- |
| WORKER_SINGLE_JOB_MODE | true | Exit the worker thread after each job |

Job lifecycle

States

| State | Location | Description |
| --- | --- | --- |
| Pending | Database | Rule is ready but not yet fetched |
| Queued | Job queue | Waiting for a worker |
| Active | Worker thread | Currently executing |
| Complete | Done | Workflow finished |

Transitions

| From | To | Trigger |
| --- | --- | --- |
| Pending | Queued | Master adds the job to the queue |
| Queued | Active | Worker pulls the job from the queue |
| Active | Complete | Workflow finishes |
| Active | Pending | Error or timeout |
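The transition rules can be captured as data for validation; the names below are illustrative, not the actual RES schema:

```python
# Allowed (from, to) pairs, mirroring the transition table above.
VALID_TRANSITIONS = {
    ("pending", "queued"): "master adds to queue",
    ("queued", "active"): "worker pulls from queue",
    ("active", "complete"): "workflow finishes",
    ("active", "pending"): "error or timeout",
}

def can_transition(state_from, state_to):
    """Return True if the lifecycle allows this state change."""
    return (state_from, state_to) in VALID_TRANSITIONS
```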

Deduplication

The job queue prevents duplicate execution using four checks:
  1. HashMap check: O(1) lookup in the set of queued job IDs. If the job ID is already in the set, the job is skipped.
  2. Recently processed check: jobs completed within the last 30 seconds are skipped, preventing rapid re-queueing of the same job.
  3. Active jobs check: jobs currently being executed by any worker are skipped.
  4. Execution plan check: the is_currently_running flag in the execution plan prevents the job from being re-fetched from the database.
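The four checks could be combined into a single gate like this sketch; the function name and parameters are illustrative, and `is_currently_running` stands in for the execution-plan flag lookup:

```python
import time

RECENT_WINDOW_SECONDS = 30  # documented recently-processed window

def should_enqueue(job_id, queued_ids, active_ids, recently_done,
                   is_currently_running, now=None):
    """Illustrative deduplication gate. queued_ids/active_ids are sets,
    recently_done maps job_id -> completion time."""
    now = time.monotonic() if now is None else now
    if job_id in queued_ids:                 # 1. HashMap check
        return False
    done_at = recently_done.get(job_id)
    if done_at is not None and now - done_at < RECENT_WINDOW_SECONDS:
        return False                         # 2. recently processed
    if job_id in active_ids:                 # 3. active jobs check
        return False
    if is_currently_running(job_id):         # 4. execution plan flag
        return False
    return True
```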

Configuration options

Timing configuration

| Setting | Default | Description |
| --- | --- | --- |
| MASTER_GRACE_PERIOD_SECONDS | 15 | Wait before the first fetch after a restart |
| WORKER_STAGGER_SECONDS | 1 | Delay between worker startups |
| WORKER_COOLDOWN_SECONDS | 10 | Delay between jobs per worker |

Worker count

Worker count is calculated based on CPU cores: 90% of available CPU cores, with a minimum of 10 workers. This can be overridden when creating the JobQueueManager.
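The calculation described above can be sketched as follows; using `os.cpu_count()` as the source of the core count is an assumption:

```python
import os

def default_worker_count(cpu_count=None):
    """90% of available CPU cores, with a minimum of 10 workers."""
    cpu_count = os.cpu_count() if cpu_count is None else cpu_count
    return max(10, int(cpu_count * 0.9))
```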

Monitoring

Queue statistics

The job queue manager provides statistics for monitoring:
| Metric | Description |
| --- | --- |
| queue_size | Number of jobs waiting in the queue |
| active_jobs | Number of jobs currently being executed |
| active_job_ids | IDs of the active jobs |
| queued_job_ids | IDs of the queued jobs |
| total_workers | Configured number of workers |
| alive_workers | Number of currently running worker threads |
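The document does not show the accessor itself; a stats snapshot assembling the metric names above from illustrative internal state might look like:

```python
def get_stats(state):
    """Build a monitoring snapshot; `state` is a hypothetical dict of
    the manager's internal bookkeeping."""
    queued = sorted(state["queued_ids"])
    active = sorted(state["active_ids"])
    return {
        "queue_size": len(queued),
        "active_jobs": len(active),
        "active_job_ids": active,
        "queued_job_ids": queued,
        "total_workers": state["total_workers"],
        "alive_workers": state["alive_workers"],
    }
```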

Log messages

Key log messages to monitor:
| Message | Meaning |
| --- | --- |
| 👑 Master fetched X jobs from database | Master successfully fetched jobs |
| 📥 Added job 'Rule Name' to queue | Job added to the queue |
| 👷 Worker-N pulled job from queue | Worker started processing |
| ✅ Worker-N finished job | Job completed successfully |
| 🚨 MEMORY THROTTLE ACTIVE: RAM at X% | Memory throttling engaged |
| ✅ MEMORY THROTTLE RELEASED | Memory dropped below the resume threshold |

Troubleshooting

Symptoms: Jobs are not being processed.
Check:
  • Is master thread running? Check master_thread_alive in stats
  • Is memory throttle active? Look for MEMORY THROTTLE in logs
  • Are workers alive? Check alive_workers in stats
Resolution:
  • Restart the application if master is dead
  • Wait for memory to drop below threshold
  • Workers auto-restart in single-job mode
Symptoms: Same job running twice simultaneously.
Check:
  • Is is_currently_running being set properly?
  • Are HashMap locks working correctly?
Resolution:
  • This shouldn’t happen with current deduplication
  • Check execution plan for stale is_currently_running flags
  • Review logs for “already in queue” messages
Symptoms: alive_workers count dropping.
Check:
  • Are there exceptions in worker logs?
  • Is memory causing OOM kills?
Resolution:
  • Single-job mode workers exit normally after each job (this is expected)
  • Check for exceptions: look for FATAL in logs
  • Monitor memory usage before crashes
Symptoms: Memory usage keeps growing.
Check:
  • Are memory thresholds configured correctly?
  • Is single-job mode enabled?
  • Are models being cached?
Resolution:
  • Lower MEMORY_THROTTLE_THRESHOLD
  • Enable single-job mode: set WORKER_SINGLE_JOB_MODE=true
  • Check for memory leaks in action code