Prerequisites
Before a branch rule can execute, these conditions must be met. All of them are checked automatically by the execution plan manager; you don’t need to verify them manually.
- Active status - The rule’s status must be `active`
- Not archived - `is_archived` must be `false`
- Not force stopped - `is_force_stopped` must be `false`
- Branch active - The associated branch must be active
- Within operation window - Current time must be within `operation_time_from` and `operation_time_to`
- Past next run time - Current time must be past `next_run_time`
- Not currently running - `is_currently_running` must be `false`
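Taken together, the prerequisites amount to a single readiness predicate. The sketch below is illustrative only; the field names mirror the list above, but the dict-based shape and same-day window handling are assumptions (overnight windows are covered later on this page):

```python
from datetime import datetime, time


def is_rule_ready(rule: dict, now: datetime) -> bool:
    """Return True only if a branch rule passes every prerequisite.

    `rule` is assumed to be a dict mirroring the branch_rules fields
    described above; the real execution plan manager performs the
    equivalent checks internally.
    """
    if rule["status"] != "active":
        return False
    if rule["is_archived"] or rule["is_force_stopped"]:
        return False
    if not rule["branch_is_active"]:
        return False
    # Same-day operation window check.
    if not (rule["operation_time_from"] <= now.time() <= rule["operation_time_to"]):
        return False
    if now < rule["next_run_time"]:
        return False
    if rule["is_currently_running"]:
        return False
    return True
```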
Execution flow
Step 1: Master thread fetches jobs
The master thread runs on a configurable interval (default: 60 seconds) and queries the execution plan for ready rules. It fetches up to 10 jobs per cycle, filtering rules using the criteria listed above.

Step 2: Job enters the queue
Each fetched rule is added to the job queue with deduplication. The queue uses a HashMap to prevent duplicates. Deduplication checks:
- Job not already in the queue (HashMap lookup)
- Job not recently processed (within 30 seconds)
- Job not currently active (being executed by another worker)
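A minimal sketch of these three checks (class and attribute names are assumptions; the real queue manager also handles worker ownership and locking):

```python
import time


class JobQueue:
    """Deduplicating job queue backed by hash maps, as described above."""

    RECENT_WINDOW_SECONDS = 30  # "recently processed" window

    def __init__(self):
        self.pending = {}        # rule_id -> job (the HashMap-backed queue)
        self.active = set()      # rule_ids currently held by workers
        self.recently_done = {}  # rule_id -> completion timestamp

    def try_enqueue(self, rule_id: str, job: dict) -> bool:
        """Add a job unless any deduplication check rejects it."""
        if rule_id in self.pending:   # already in the queue
            return False
        if rule_id in self.active:    # being executed by another worker
            return False
        done_at = self.recently_done.get(rule_id)
        if done_at is not None and time.time() - done_at < self.RECENT_WINDOW_SECONDS:
            return False              # processed within the last 30 seconds
        self.pending[rule_id] = job
        return True
```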
Step 3: Worker claims the job
A worker thread pulls the job from the queue and immediately marks it as running in the execution plan. This atomic claim prevents race conditions where multiple workers might try to execute the same job. The worker then calls `pre_process()` on the branch rule, which attempts to claim the job in the database. If another worker has already claimed it, the function returns `false` and the worker moves on to the next job.
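The claim in `pre_process()` amounts to a conditional update. The following sketch uses SQLite as a stand-in for the real database; the table and column names come from this page, everything else is assumed:

```python
import sqlite3


def claim_job(conn: sqlite3.Connection, rule_id: str) -> bool:
    """Atomically flip is_currently_running from 0 to 1.

    The WHERE clause makes the claim conditional: if another worker
    already set the flag, zero rows change and the claim fails.
    """
    cur = conn.execute(
        "UPDATE branch_rules SET is_currently_running = 1 "
        "WHERE id = ? AND is_currently_running = 0",
        (rule_id,),
    )
    conn.commit()
    return cur.rowcount == 1
```

Because the check and the write happen in one statement, two workers racing on the same rule cannot both succeed.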
Step 4: Workflow execution
The workflow engine processes the branch rule’s action sequence. This involves several sub-steps:

1. Load workflow configuration - The workflow config is retrieved from `rule.action_template_connection.workflow_config`. This defines the sequence of actions and conditions.
2. Execute start node - Execution begins at the `start` node defined in the workflow config.
3. Process action nodes - Each node executes its action:
   - Detection actions analyze camera streams
   - Business actions create violations or send notifications
   - Results are recorded in `aggregated_actions_details`
4. Evaluate edge conditions - After each action, edge conditions are evaluated to determine the next node. For example:
   - If `feature_result` equals `detection_complete`, proceed to the business action
   - If `is_success` is `false`, skip the business action
5. Apply consolidation logic - When transitioning from detection to business actions, the consolidation logic is evaluated. This is a Python expression that determines whether business actions should run based on aggregated results. If consolidation returns `false`, business actions are skipped; this is a normal business outcome, not a failure.

Step 5: Post-process and reschedule
After workflow completion, `post_process()` is called with the execution results.
Post-process updates:
- Sets `is_currently_running` to `false`
- Calls the `branch_rules_set_NextRunTime` edge function to calculate and set `next_run_time`
- Updates `last_run_time` to the current time
- Increments execution counters
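As a sketch, the post-process bookkeeping looks roughly like this (the dict-backed rule record and the `call_edge_function` helper are assumptions; only the field and edge-function names come from this page):

```python
from datetime import datetime


def post_process(rule: dict, now: datetime, call_edge_function) -> None:
    """Release the rule and reschedule it after a workflow run.

    `call_edge_function` stands in for the RPC that invokes the
    branch_rules_set_NextRunTime edge function and returns the
    calculated next_run_time.
    """
    rule["is_currently_running"] = False
    rule["next_run_time"] = call_edge_function(
        "branch_rules_set_NextRunTime", rule["id"]
    )
    rule["last_run_time"] = now
    rule["execution_count"] = rule.get("execution_count", 0) + 1
```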
Step 6: Execution plan update
The local execution plan is updated with the new `next_run_time`. This ensures the local cache matches the database, preventing the rule from being picked up again immediately.
Next run time calculation
After a branch rule completes execution, RES calls the `branch_rules_set_NextRunTime` edge function to calculate when the rule should run next. This calculation considers both the cron expression and the operation time window.
How it works
1. Fetch rule configuration - The edge function retrieves:
   - The branch rule’s `operation_time_from` and `operation_time_to`
   - The brand rule’s `cron_expression` (or falls back to the rule’s cron expression)
2. Calculate next cron occurrence - Using the cron expression, find the next scheduled run time starting from the current time.
3. Check operation window - Verify the calculated time falls within the operation window. If not, iterate through cron occurrences until one falls within the window.
4. Handle window boundaries - If no valid run time exists within today’s window, schedule for the start of tomorrow’s operation window.
5. Update database - Write the calculated `next_run_time` to the `branch_rules` table.

Cron and operation window interaction
The next run time must satisfy both the cron schedule and the operation window:

| Scenario | Current time | Cron | Operation window | Next run |
|---|---|---|---|---|
| Normal | 14:00 | `*/30 * * * *` | 08:00-18:00 | 14:30 |
| End of window | 17:45 | `*/30 * * * *` | 08:00-18:00 | 08:00 next day |
| Overnight window | 23:00 | `0 * * * *` | 22:00-06:00 | 00:00 (midnight) |
| Outside window | 20:00 | `*/15 * * * *` | 08:00-18:00 | 08:00 next day |
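The same-day rows of this table can be reproduced with a pure-stdlib sketch restricted to simple `*/N`-minute crons (an illustrative assumption: real cron parsing and the overnight-window case need more care; the window end is treated as exclusive, matching the "End of window" row):

```python
from datetime import datetime, time, timedelta


def next_run(now: datetime, every_minutes: int,
             window_from: time, window_to: time) -> datetime:
    """Next */N-minutes occurrence inside a same-day operation window."""
    # Start from the next whole minute strictly after `now`.
    t = now.replace(second=0, microsecond=0) + timedelta(minutes=1)
    while True:
        if t.minute % every_minutes == 0:          # a cron occurrence
            if window_from <= t.time() < window_to:
                return t                           # inside the window
            if t.time() >= window_to:
                # No valid slot left today: start of tomorrow's window.
                return datetime.combine(t.date() + timedelta(days=1), window_from)
            # Cron fired before the window opens: jump to today's opening.
            t = datetime.combine(t.date(), window_from)
        else:
            t += timedelta(minutes=1)
```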
Failed execution handling
If the workflow execution fails, the edge function uses a simplified retry strategy:
- Next run time: Current time + 5 minutes
- Purpose: Allow quick retry without waiting for the next cron occurrence
- Behavior: The rule will be picked up again in the next master thread cycle after the 5-minute delay
Fallback behavior
If the edge function call fails (network issue, database error), RES falls back to a simpler local calculation so that rules continue to be scheduled even during temporary service disruptions. The fallback returns: `current_time + 30 minutes`
Timeline overview
A typical execution follows this timeline:

| Time | Component | Action |
|---|---|---|
| T+0s | Master thread | Fetch jobs from execution plan |
| T+0.1s | Master thread | Add job to queue (with deduplication) |
| T+0.2s | Worker thread | Pull job from queue, mark as running |
| T+0.3s | Worker thread | Call pre_process() to claim in database |
| T+0.5s | Workflow engine | Execute workflow (detect → create_violation) |
| T+45s | Worker thread | Call post_process() to update next run time |
| T+45.1s | Worker thread | Update local execution plan |
| T+60s | Master thread | Next fetch cycle begins |
Operation time window enforcement
RES enforces operation time windows to ensure rules only run during specified hours.

Deadline calculation
When a job starts, a deadline is calculated based on `operation_time_to`. For example, if the operation window ends at 18:00, the deadline is set to 18:00 on the current day.
Overnight window handling: For windows like 22:00 to 06:00, the system correctly handles the day boundary. If the current time is after 22:00, the deadline is set to 06:00 the next day.
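A sketch of the deadline calculation, including the overnight case (the function shape is an assumption; only the window semantics come from this page):

```python
from datetime import datetime, time, timedelta


def job_deadline(now: datetime, operation_time_from: time,
                 operation_time_to: time) -> datetime:
    """Deadline at which a running job must stop.

    For a normal window the deadline is today's operation_time_to.
    For an overnight window (from > to, e.g. 22:00-06:00), a job
    started after the window opens must stop at the to-time tomorrow.
    """
    overnight = operation_time_from > operation_time_to
    if overnight and now.time() >= operation_time_from:
        return datetime.combine(now.date() + timedelta(days=1), operation_time_to)
    return datetime.combine(now.date(), operation_time_to)
```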
Graceful shutdown
If the deadline is reached during execution:
- A signal is set: `operation_window_exceeded = true`
- Detection actions check this signal and stop gracefully
- The workflow completes with status `operation_window_exceeded`
- Results up to that point are saved
Error handling
Workflow failures
If a workflow fails with an exception:
- The error is logged with full details
- `post_process()` is called with `is_last_run_failed = true`
- `is_currently_running` is reset to `false`
- The rule is rescheduled based on cron (retry at the next scheduled time)
Claim failures
If `pre_process()` fails to claim the job (another worker claimed it first):
- The worker abandons this job
- `is_currently_running` stays `true` (the other worker has it)
- The worker moves on to the next job in the queue
Monitoring execution
Execution logs
Each workflow execution creates an execution log with:
- Start and end times
- Workflow status (running, completed, failed)
- Node results with action details
- System metrics (memory, CPU)
- Error messages if failed
Queue statistics
The job queue manager provides statistics including:

| Metric | Description |
|---|---|
| `queue_size` | Jobs waiting in the queue |
| `active_jobs` | Jobs currently being executed |
| `alive_workers` | Number of active worker threads |
Troubleshooting
Rule not being picked up
Check these conditions:
- Is the rule status `active`?
- Is the current time within `operation_time_from` and `operation_time_to`?
- Is `next_run_time` in the past?
- Is `is_currently_running` set to `false`?
Rule running but not completing
Check for:
- Memory throttling (RAM > 90% pauses new jobs)
- Worker thread death (check `alive_workers` in stats)
- Stream connection issues (check stream pool stats)
- Long-running detections (check the `max_runtime_seconds` setting)
Duplicate executions
This shouldn’t happen due to deduplication, but if it does:
- Check whether `is_currently_running` is being set properly
- Verify the HashMap deduplication is working
- Check for database synchronization issues between RES instances