This document covers n8n's distributed execution architecture, which enables horizontal scaling by distributing workflow executions across multiple worker processes. This includes the queue-based execution system, Bull/Redis integration, worker coordination, concurrency control, and the communication patterns between main and worker processes.
For information about the workflow execution lifecycle within a single process, see Workflow Execution Lifecycle. For details on the overall runtime architecture including process types, see Runtime Architecture and Process Models.
n8n supports two primary execution modes that determine how workflow executions are processed:
| Mode | Description | Use Case |
|---|---|---|
| Regular | Workflows execute directly in the main process | Single-instance deployments, development |
| Queue | Workflows are enqueued to Redis and processed by worker instances | Production deployments requiring horizontal scaling |
The execution mode is determined by the EXECUTIONS_MODE environment variable and accessed via ExecutionsConfig.mode.
Mode Selection in WorkflowRunner
When WorkflowRunner.run() is called, it decides whether to enqueue or execute directly:
```
shouldEnqueue = mode === 'queue' && executionMode !== 'manual'
```
By default, manual executions run in the main process even in queue mode, though this can be overridden with OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS=true.
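The decision above can be sketched as a small predicate. This is illustrative, not n8n's actual internal function; the two mode strings and the `OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS` override are from the text, while the function and parameter names are assumptions:

```typescript
// Sketch of the enqueue decision described above. Names other than the
// environment-variable semantics are illustrative.
type ExecutionMode = 'manual' | 'webhook' | 'trigger' | 'cli';

function shouldEnqueue(
  executionsMode: 'regular' | 'queue', // EXECUTIONS_MODE
  executionMode: ExecutionMode,
  offloadManualToWorkers: boolean, // OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS
): boolean {
  // Regular mode always executes in the main process.
  if (executionsMode !== 'queue') return false;
  // Manual executions stay in the main process unless explicitly offloaded.
  if (executionMode === 'manual' && !offloadManualToWorkers) return false;
  return true;
}
```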
Sources: packages/cli/src/workflow-runner.ts170-179
n8n uses Bull, a Redis-backed job queue, to coordinate distributed execution. The queue infrastructure is managed by ScalingService.
Queue Initialization
The queue is initialized with a prefix (default: bull) and settings including maxStalledCount: 0 to disable Bull's automatic stall recovery (n8n implements its own queue recovery mechanism).
Sources: packages/cli/src/scaling/scaling.service.ts57-81
Each job in the queue carries the following data:
| Field | Type | Description |
|---|---|---|
| executionId | string | Unique execution identifier |
| workflowId | string | Workflow being executed |
| loadStaticData | boolean | Whether to load workflow static data from DB |
| pushRef | string? | WebSocket reference for UI updates |
| streamingEnabled | boolean? | Whether streaming responses are enabled |
Sources: packages/cli/src/scaling/scaling.types.ts17-23
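The table corresponds to a payload shaped roughly like the following. This is a sketch derived from the table, not the exact declaration in scaling.types.ts:

```typescript
// Sketch of the job payload carried on the Bull queue (fields from the
// table above; optionality follows the `?` markers in the Type column).
interface JobData {
  executionId: string;       // unique execution identifier
  workflowId: string;        // workflow being executed
  loadStaticData: boolean;   // load workflow static data from DB
  pushRef?: string;          // WebSocket reference for UI updates
  streamingEnabled?: boolean; // streaming responses enabled
}

// Example payload; IDs are made up for illustration.
const jobData: JobData = {
  executionId: '1042',
  workflowId: 'abc123',
  loadStaticData: true,
};
```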
Sources: packages/cli/src/workflow-runner.ts370-410 packages/cli/src/scaling/job-processor.ts54-276 packages/cli/src/scaling/scaling.service.ts87-106
The main process enqueues executions when in queue mode. The WorkflowRunner.enqueueExecution() method handles this:
Enqueue Process
1. If ScalingService is not yet initialized, it is dynamically imported and the queue is set up
2. A JobData object is built with execution metadata
3. ScalingService.addJob() enqueues the job with a priority (realtime: 50, non-realtime: 100)
4. A PCancelable wrapper is returned that allows cancellation via stopJob()

```
jobData = {
  workflowId,
  executionId,
  loadStaticData: boolean,
  pushRef,
  streamingEnabled
}
```
Sources: packages/cli/src/workflow-runner.ts370-410 packages/cli/src/scaling/scaling.service.ts191-214
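The priority rule above, plus the removeOnComplete/removeOnFail cleanup options mentioned later in this page, can be sketched as a helper. `buildJobOptions` and `isRealtime` are illustrative names, not n8n's API; the values match the ones described in this document:

```typescript
// Bull job options as described: realtime jobs get priority 50, others 100
// (lower value = dequeued first), and jobs are removed from Redis after
// completion or failure since execution data is persisted in the database.
interface JobOptions {
  priority: number;
  removeOnComplete: boolean;
  removeOnFail: boolean;
}

function buildJobOptions(isRealtime: boolean): JobOptions {
  return {
    priority: isRealtime ? 50 : 100,
    removeOnComplete: true,
    removeOnFail: true,
  };
}
```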
The main process uses getLifecycleHooksForScalingMain() to set up minimal hooks that only fire workflowExecuteBefore. The actual post-execution hooks run on the worker to avoid split execution logging.
Sources: packages/cli/src/workflow-runner.ts398-402
Main and webhook processes register listeners on the global:progress event to receive messages from workers:
Message Types Handled by Main
| Message Kind | Purpose | Handler Action |
|---|---|---|
| send-chunk | Stream chunk to client | Forward to ActiveExecutions.sendChunk() |
| respond-to-webhook | Send webhook response | Resolve response promise with decoded response |
| job-finished | Execution completed | Resolve response promise, log completion |
| job-failed | Execution failed | Log error with stack trace |
Main/Webhook Listener Implementation
The listener examines the message kind and routes accordingly:
```typescript
switch (msg.kind) {
  case 'send-chunk':
    activeExecutions.sendChunk(msg.executionId, msg.chunkText);
    break;
  case 'respond-to-webhook':
    const decodedResponse = decodeWebhookResponse(msg.response);
    activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse);
    break;
  case 'job-finished':
    activeExecutions.resolveResponsePromise(msg.executionId, success ? {} : error);
    break;
  case 'job-failed':
    logger.error(msg.errorMsg + msg.errorStack);
    break;
}
```
Sources: packages/cli/src/scaling/scaling.service.ts300-370
The leader main instance periodically checks for "dangling" executions: those marked as running in the database but missing from the queue. This handles cases where executions were not properly cleaned up.
Recovery Process
Configuration
- N8N_EXECUTIONS_QUEUE_RECOVERY_INTERVAL: Minutes between checks (default: 180)
- N8N_EXECUTIONS_QUEUE_RECOVERY_BATCH_SIZE: Max executions per check (default: 100)

The recovery mechanism accelerates if it finds a full batch, indicating more dangling executions may exist.
Sources: packages/cli/src/scaling/scaling.service.ts458-523
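The "accelerate on a full batch" behavior can be sketched as a scheduling rule. The halving factor below is an assumption for illustration; the source only states that recovery speeds up when a full batch is found:

```typescript
// Sketch: choose the wait before the next recovery pass. A full batch
// suggests more dangling executions remain, so check again sooner.
// The divide-by-two factor is illustrative, not n8n's actual constant.
function nextRecoveryWaitMs(
  foundCount: number,     // dangling executions found in this pass
  batchSize: number,      // N8N_EXECUTIONS_QUEUE_RECOVERY_BATCH_SIZE
  baseIntervalMs: number, // N8N_EXECUTIONS_QUEUE_RECOVERY_INTERVAL, as ms
): number {
  return foundCount >= batchSize ? baseIntervalMs / 2 : baseIntervalMs;
}
```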
When N8N_METRICS_INCLUDE_QUEUE_METRICS=true, the main instance collects queue metrics periodically:
Metrics Collected
- active: Jobs currently being processed
- waiting: Jobs in queue waiting for workers
- completed: Jobs finished in current interval
- failed: Jobs failed in current interval

These are emitted as job-counts-updated events for Prometheus exposition.
Sources: packages/cli/src/scaling/scaling.service.ts422-447
Workers are initialized via the n8n worker command, which calls ScalingService.setupWorker(concurrency):
Worker Configuration
```typescript
queue.process(JOB_TYPE_NAME, concurrency, async (job) => {
  await jobProcessor.processJob(job);
});
```
The concurrency parameter (from N8N_CONCURRENCY_PRODUCTION_LIMIT) determines how many jobs a single worker can process simultaneously.
Sources: packages/cli/src/scaling/scaling.service.ts83-109
JobProcessor.processJob() Workflow
Key Components
- Loads the execution's workflowData, data, and mode
- Loads workflow static data when loadStaticData=true
- Uses getLifecycleHooksForScalingWorker(), which includes the full hook suite
- Runs the execution via WorkflowExecute or ManualExecutionService based on execution type

Sources: packages/cli/src/scaling/job-processor.ts54-276
Workers use getLifecycleHooksForScalingWorker() which includes handlers for:
Hook Handlers
| Hook | Purpose |
|---|---|
| sendResponse | Send webhook responses via job.progress(RespondToWebhookMessage) |
| sendChunk | Stream chunks to main via job.progress(SendChunkMessage) |
| workflowExecuteBefore | Standard pre-execution hook |
| workflowExecuteAfter | Full post-execution processing including DB save, error workflows, statistics |
| nodeExecuteBefore | Node-level pre-execution |
| nodeExecuteAfter | Node-level post-execution, save progress if enabled |
The key difference from main hooks: workers handle the full workflowExecuteAfter lifecycle including database persistence, while main instances only handle minimal UI notification.
Sources: packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts331-392
Workers listen for abort-job messages on the global:progress event:
```typescript
queue.on('global:progress', (jobId, msg) => {
  if (msg.kind === 'abort-job') {
    jobProcessor.stopJob(jobId);
  }
});
```
When received, the worker cancels the running execution via workflowExecution.cancel().
Sources: packages/cli/src/scaling/scaling.service.ts274-279 packages/cli/src/scaling/job-processor.ts287-301
Main and worker processes communicate via Bull's job.progress() mechanism, which publishes messages to the Redis pub/sub channel. All messages implement the JobMessage union type.
Job Message Types
Message Flow Examples
- SendChunkMessage → Main → ActiveExecutions.sendChunk() → HTTP Response
- RespondToWebhookMessage → Main → ActiveExecutions.resolveResponsePromise()
- JobFinishedMessage → Main → Resolve post-execute promise
- AbortJobMessage → Worker → Cancel PCancelable

Sources: packages/cli/src/scaling/scaling.types.ts40-72 packages/cli/src/scaling/job-processor.ts149-169 packages/cli/src/scaling/scaling.service.ts312-364
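The JobMessage union can be sketched as a discriminated union on `kind`. The `kind` strings and most fields appear elsewhere on this page; the exact field lists are a simplification of the types in scaling.types.ts:

```typescript
// Sketch of the JobMessage union described above; field sets are
// approximate, discriminated on the `kind` tag.
type JobMessage =
  | { kind: 'send-chunk'; executionId: string; chunkText: string }
  | { kind: 'respond-to-webhook'; executionId: string; response: unknown }
  | { kind: 'job-finished'; executionId: string; success: boolean }
  | { kind: 'job-failed'; executionId: string; errorMsg: string; errorStack: string }
  | { kind: 'abort-job' };

// TypeScript narrows the union by checking `kind`, which is how the
// main-process listener routes messages.
function describe(msg: JobMessage): string {
  return msg.kind === 'job-failed' ? `failed: ${msg.errorMsg}` : msg.kind;
}
```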
When sending webhook responses that contain buffers, workers encode them as base64 strings:
```typescript
if (Buffer.isBuffer(response.body)) {
  response.body = {
    '__@N8nEncodedBuffer@__': response.body.toString(BINARY_ENCODING),
  };
}
```

Main decodes these before resolving the response promise:

```typescript
if ('__@N8nEncodedBuffer@__' in response.body) {
  response.body = Buffer.from(response.body['__@N8nEncodedBuffer@__'], BINARY_ENCODING);
}
```
Sources: packages/cli/src/scaling/job-processor.ts311-321 packages/cli/src/scaling/scaling.service.ts379-393
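The encode/decode pair round-trips as a self-contained sketch. The sentinel key is copied from the snippets above; BINARY_ENCODING is assumed to be 'base64', and the helper function names are illustrative:

```typescript
// Round-trip of the buffer encoding shown above, assuming
// BINARY_ENCODING === 'base64'. Function names are illustrative.
const ENCODED_BUFFER_KEY = '__@N8nEncodedBuffer@__';

function encodeBody(body: unknown): unknown {
  if (Buffer.isBuffer(body)) {
    // Buffers cannot travel through Redis pub/sub as-is, so wrap as base64.
    return { [ENCODED_BUFFER_KEY]: body.toString('base64') };
  }
  return body;
}

function decodeBody(body: unknown): unknown {
  if (typeof body === 'object' && body !== null && ENCODED_BUFFER_KEY in body) {
    // Restore the original Buffer on the receiving side.
    return Buffer.from((body as Record<string, string>)[ENCODED_BUFFER_KEY], 'base64');
  }
  return body;
}
```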
While job messages handle execution coordination, UI updates flow through the Push service (WebSocket-based). Workers do not directly send push messages; instead, their lifecycle hooks persist execution data to the database, and main instances poll or receive events to update the UI.
n8n implements concurrency throttling to limit the number of simultaneous executions, preventing resource exhaustion. This system is separate from the Bull queue's job concurrency.
The ConcurrencyControlService maintains two separate queues:
| Queue Type | Purpose | Config Variable |
|---|---|---|
production | Production executions (webhook, trigger, etc.) | N8N_CONCURRENCY_PRODUCTION_LIMIT |
evaluation | Evaluation/manual test executions | N8N_CONCURRENCY_EVALUATION_LIMIT |
Each queue has an independent capacity. Setting a limit to -1 disables throttling for that queue.
Sources: packages/cli/src/concurrency/concurrency-control.service.ts15-77
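The queue selection and the "-1 disables throttling" rule can be sketched as two small helpers; these names are illustrative, not the service's actual methods:

```typescript
// Sketch: route an execution to the production or evaluation queue, and
// check whether a configured limit disables throttling entirely.
type QueueType = 'production' | 'evaluation';

function queueFor(mode: string): QueueType {
  // Evaluation executions get their own queue; everything else
  // (webhook, trigger, etc.) counts as production.
  return mode === 'evaluation' ? 'evaluation' : 'production';
}

function isThrottlingDisabled(limit: number): boolean {
  return limit === -1; // -1 means unlimited for that queue
}
```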
Concurrency Control Flow
ConcurrencyQueue Implementation
The ConcurrencyQueue maintains a capacity counter and a FIFO queue of waiting executions:
```
capacity = limit
queue = [{executionId, resolve}]

enqueue(executionId):
  capacity--
  if capacity < 0:
    return new Promise(resolve => queue.push({executionId, resolve}))

dequeue():
  capacity++
  if queue.length > 0:
    {executionId, resolve} = queue.shift()
    resolve()
```
Sources: packages/cli/src/concurrency/concurrency-queue.ts1-62 packages/cli/src/concurrency/concurrency-control.service.ts107-188
Executions reserve capacity before starting and release it upon completion. The ConcurrencyCapacityReservation class encapsulates this:
```typescript
const reservation = new ConcurrencyCapacityReservation(concurrencyControl);
try {
  await reservation.reserve({ mode, executionId });
  // ... run execution ...
} finally {
  reservation.release();
}
```
This ensures capacity is always released, even if execution creation fails.
Sources: packages/cli/src/concurrency/concurrency-capacity-reservation.ts1-48 packages/cli/src/active-executions.ts56-154
Important: Concurrency control is disabled in queue mode (EXECUTIONS_MODE=queue). The throttling mechanism only applies to regular mode. In queue mode, concurrency is controlled by:
- Bull's job concurrency (N8N_CONCURRENCY_PRODUCTION_LIMIT on each worker)

Sources: packages/cli/src/concurrency/concurrency-control.service.ts64-69
Multiple main instances can run simultaneously for high availability. One instance is elected as the "leader" and handles singleton responsibilities:
Leader Responsibilities
Leader Election
n8n uses Redis-based leader election via the @n8n/multi-main-setup package. The InstanceSettings.isLeader flag determines leader status, and decorators trigger actions:
- @OnLeaderTakeover(): Executed when becoming leader
- @OnLeaderStepdown(): Executed when losing leadership

Sources: packages/cli/src/scaling/scaling.service.ts458-488
Worker processes can be horizontally scaled by running multiple n8n worker instances. Each worker:
Worker Scaling Considerations
In large deployments, webhook handling can be offloaded to dedicated processes:
Webhook instances:
This separation allows scaling webhook ingestion independently from execution processing.
Sources: packages/cli/src/scaling/scaling.service.ts260-269
The ActiveExecutions service tracks in-progress executions in the current process, not globally. This is crucial in distributed mode.
State Tracking
Each active execution stores:
Sources: packages/cli/src/interfaces.ts116-126 packages/cli/src/active-executions.ts56-154
When a workflow enters a waiting state (e.g., via Wait node or Form node), it remains in activeExecutions but releases the workflowExecution reference. This prevents memory leaks while maintaining the ability to resume.
Resume Behavior
On resume, ActiveExecutions.add() is called with the existing executionId. The service:
- Resets the startedAt timestamp
- Creates a new responsePromise for webhook workflows
- Sets the status back to running

Sources: packages/cli/src/active-executions.ts193-207
During graceful shutdown:
Regular Mode

- Active executions are cancelled with cancelAll=true

Queue Mode (Worker)
Sources: packages/cli/src/active-executions.ts290-322 packages/cli/src/scaling/scaling.service.ts155-169
| Variable | Default | Description |
|---|---|---|
| EXECUTIONS_MODE | regular | Execution mode: regular or queue |
| OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS | false | Whether to enqueue manual executions in queue mode |
| Variable | Default | Description |
|---|---|---|
| QUEUE_BULL_PREFIX | bull | Redis key prefix for Bull |
| QUEUE_BULL_REDIS_HOST | localhost | Redis host |
| QUEUE_BULL_REDIS_PORT | 6379 | Redis port |
| QUEUE_BULL_REDIS_DB | 0 | Redis database number |
| QUEUE_BULL_REDIS_PASSWORD | - | Redis password |
| QUEUE_BULL_REDIS_TIMEOUT_THRESHOLD | 10000 | Redis connection timeout (ms) |
| Variable | Default | Description |
|---|---|---|
| N8N_CONCURRENCY_PRODUCTION_LIMIT | -1 | Max concurrent production executions per worker |
| Variable | Default | Description |
|---|---|---|
| N8N_EXECUTIONS_QUEUE_RECOVERY_INTERVAL | 180 | Minutes between recovery checks |
| N8N_EXECUTIONS_QUEUE_RECOVERY_BATCH_SIZE | 100 | Max executions per recovery check |
| Variable | Default | Description |
|---|---|---|
| N8N_CONCURRENCY_PRODUCTION_LIMIT | -1 | Max concurrent production executions |
| N8N_CONCURRENCY_EVALUATION_LIMIT | -1 | Max concurrent evaluation executions |
Sources: packages/cli/src/scaling/scaling.service.ts57-73 packages/cli/src/concurrency/concurrency-control.service.ts26-77
Bull has a built-in stalled job detection mechanism, but n8n disables it (maxStalledCount: 0) in favor of its own queue recovery mechanism.
If a "job stalled" error is detected, n8n wraps it in MaxStalledCountError and emits a job-stalled event.
Sources: packages/cli/src/scaling/scaling.service.ts70 packages/cli/src/workflow-runner.ts434-447
The queue recovery mechanism (described earlier) handles dangling executions by:
- Marking them as crashed

This runs periodically on the leader instance to catch executions that were never properly cleaned up due to crashes or network issues.
Sources: packages/cli/src/scaling/scaling.service.ts490-523
Jobs can be cancelled in two ways:
1. Before Processing (Waiting in Queue)

```typescript
await job.remove();
```

2. During Processing (Active)

```typescript
await job.progress({ kind: 'abort-job' });
await job.discard(); // prevent retries
await job.moveToFailed(error, true);
```
The worker receives the abort message and cancels the execution via PCancelable.cancel().
Sources: packages/cli/src/scaling/scaling.service.ts226-252
Jobs are enqueued with priority values:
Lower priority values are processed first.
Sources: packages/cli/src/workflow-runner.ts396 packages/cli/src/scaling/scaling.service.ts191-214
Worker Concurrency
- Set N8N_CONCURRENCY_PRODUCTION_LIMIT on each worker
- Total Capacity = Workers × Per-Worker Concurrency

Regular Mode Concurrency

- Set N8N_CONCURRENCY_PRODUCTION_LIMIT on the main instance

Waiting Executions

- Release their workflowExecution reference to free memory
- Remain tracked in ActiveExecutions

Job Data

- Removed from Redis after completion via removeOnComplete: true and removeOnFail: true in job options

Sources: packages/cli/src/scaling/scaling.service.ts194-198 packages/cli/src/active-executions.ts136-148
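The capacity formula above is simple multiplication; the helper name below is illustrative:

```typescript
// Total cluster throughput = number of workers times per-worker
// concurrency (N8N_CONCURRENCY_PRODUCTION_LIMIT on each worker).
function totalCapacity(workers: number, perWorkerConcurrency: number): number {
  return workers * perWorkerConcurrency;
}
```

For example, four workers each allowing ten concurrent jobs give a cluster capacity of forty simultaneous executions.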