Purpose: This page documents the Canvas execution engine and the domain-specific language (DSL) that together power RAGFlow's visual workflow system. The Canvas engine orchestrates directed acyclic graphs (DAGs) of components, manages state, resolves variables, and executes workflows. For information about individual component implementations, see Built-in Components. For details on state management, see State and Variable Management. For API endpoints and management, see Canvas API and Management.
Scope: This page covers the JSON DSL format, the Graph and Canvas classes, component initialization, execution algorithms, variable resolution patterns, and persistence mechanisms. It does not cover component internals or tool integration (see Agent Tools and ReAct Loop).
The Canvas DSL is a JSON structure that defines a complete workflow. Each Canvas workflow is stored as a serialized DSL in the user_canvas.dsl database field and can be loaded, executed, and modified at runtime.
DSL Root Fields:
| Field | Type | Description |
|---|---|---|
| components | Object | Map of component_id → component definition with obj, downstream, upstream |
| history | Array | Conversation history as [role, content] tuples |
| path | Array | Ordered list of component IDs representing execution path |
| retrieval | Array | Retrieved document chunks and aggregations per execution step |
| globals | Object | Global variables including sys.* system vars and env.* user vars |
| variables | Object | User-defined workflow variables with type information |
| memory | Array | Long-term memory as [user_msg, assistant_msg, summary] tuples |
| task_id | String | UUID for tracking workflow execution |
Sources: agent/canvas.py42-78 agent/canvas.py281-316
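To make the root fields above concrete, here is a minimal sketch of a DSL built as a Python dict, plus a small check for missing root fields. The component IDs (`begin_0`, `llm_0`), the component names, the parameter values, the flat `sys.query` key layout inside `globals`, and the helper names `make_minimal_dsl` and `missing_root_fields` are all hypothetical and not taken from the RAGFlow source.

```python
import uuid

# Root fields listed in the table above.
ROOT_FIELDS = {"components", "history", "path", "retrieval",
               "globals", "variables", "memory", "task_id"}


def make_minimal_dsl() -> dict:
    """Build a hypothetical two-component workflow (illustrative values only)."""
    return {
        "components": {
            "begin_0": {
                "obj": {"component_name": "Begin", "params": {}},
                "downstream": ["llm_0"],
                "upstream": [],
            },
            "llm_0": {
                "obj": {"component_name": "LLM", "params": {"prompt": "{sys.query}"}},
                "downstream": [],
                "upstream": ["begin_0"],
            },
        },
        "history": [],
        "path": [],
        "retrieval": [],
        "globals": {"sys.query": ""},  # assumed key layout
        "variables": {},
        "memory": [],
        "task_id": str(uuid.uuid4()),
    }


def missing_root_fields(dsl: dict) -> set:
    """Return the documented root fields that are absent from a DSL dict."""
    return ROOT_FIELDS - set(dsl)
```

A check like this can catch a truncated or hand-edited DSL before it is handed to the engine.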
Each component in the components object has the following structure:
Component Fields:
| Field | Type | Description |
|---|---|---|
| obj.component_name | String | Component type (e.g., "LLM", "Retrieval", "Agent") |
| obj.params | Object | Component-specific parameters and configuration |
| downstream | Array | List of component IDs to execute after this component |
| upstream | Array | List of component IDs that execute before this component |
| parent_id | String | Optional parent component ID for nested structures (Iteration, Loop) |
Sources: agent/canvas.py42-67 agent/canvas.py92-106
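Because each edge appears twice (in one component's `downstream` and the other's `upstream`), the two lists can drift apart when a DSL is edited by hand. The sketch below is a sanity check for that, assuming the two lists are meant to mirror each other; the page does not say whether the engine enforces this, and `find_link_errors` is a hypothetical helper.

```python
def find_link_errors(components: dict) -> list:
    """Report edges that are declared on one side only or point to unknown IDs."""
    errors = []
    for cid, comp in components.items():
        for down in comp.get("downstream", []):
            if down not in components:
                errors.append(f"{cid}: unknown downstream {down}")
            elif cid not in components[down].get("upstream", []):
                errors.append(f"{down}: missing upstream {cid}")
        for up in comp.get("upstream", []):
            if up not in components:
                errors.append(f"{cid}: unknown upstream {up}")
            elif cid not in components[up].get("downstream", []):
                errors.append(f"{up}: missing downstream {cid}")
    return errors
```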
The Canvas execution engine is built on a two-class hierarchy: Graph (base) and Canvas (implementation).
Sources: agent/canvas.py40-279 agent/canvas.py281-831 agent/component/base.py365-585
When a Canvas is instantiated from DSL, components are dynamically loaded and initialized through the component_class registry.
Key Methods:
- Graph.load() agent/canvas.py92-107: Iterates through DSL components, instantiates parameter objects, validates them, and creates component instances using the component_class registry
- component_class(name) agent/component/__init__.py: Dynamic registry that maps component names to their classes
- ComponentParam.update(conf) agent/component/base.py127-187: Recursively updates parameter values from DSL configuration
- ComponentParam.check() agent/component/base.py57-58: Validates parameter constraints (implemented by each component)

Sources: agent/canvas.py92-107 agent/component/base.py40-363 agent/canvas.py295-316
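The loading steps above (look up classes by name, update parameters from the DSL, validate, instantiate) can be sketched with a plain dictionary as the registry. This is a simplified illustration, not the RAGFlow implementation: the `Echo` component, its `EchoParam` class, the `<name>Param` lookup convention, and the constructor signature are assumptions, and `update()` here is flat rather than recursive.

```python
class EchoParam:
    """Hypothetical parameter object for a hypothetical Echo component."""

    def __init__(self):
        self.text = ""

    def update(self, conf: dict) -> None:
        # Flat update only; the real update() is described as recursive.
        for key, value in conf.items():
            setattr(self, key, value)

    def check(self) -> None:
        # Each component validates its own constraints.
        if not isinstance(self.text, str):
            raise ValueError("text must be a string")


class Echo:
    def __init__(self, component_id: str, param: EchoParam):
        self.component_id = component_id
        self.param = param


# Stand-in for the dynamic registry: maps names to classes.
_REGISTRY = {"Echo": Echo, "EchoParam": EchoParam}


def component_class(name: str):
    return _REGISTRY[name]


def load(components: dict) -> dict:
    """Instantiate every component described in a DSL 'components' map."""
    loaded = {}
    for cid, spec in components.items():
        name = spec["obj"]["component_name"]
        param = component_class(name + "Param")()  # assumed naming convention
        param.update(spec["obj"]["params"])
        param.check()
        loaded[cid] = component_class(name)(cid, param)
    return loaded
```

Validating before instantiation means a malformed DSL fails at load time rather than midway through a run.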
The Canvas execution engine traverses the component DAG in topological order, invoking components and updating the execution path.
Key Execution Methods:
| Method | Location | Description |
|---|---|---|
| Canvas.run() | agent/canvas.py369-662 | Main async execution loop, yields SSE events |
| _run_batch() | agent/canvas.py422-469 | Batch executes components using asyncio.gather with semaphore |
| _invoke_one() | agent/canvas.py433-438 | Invokes single component (async or sync via thread pool) |
| component.invoke() | agent/component/base.py407-419 | Synchronous component invocation wrapper |
| component.invoke_async() | agent/component/base.py421-447 | Async component invocation with timing |
Execution Path Updates:
After each component invocation, the path is updated based on component type:
- _next output (line 613)

Sources: agent/canvas.py369-662 agent/canvas.py422-469 agent/component/base.py407-447
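The batch pattern named in the table (asyncio.gather bounded by a semaphore, with synchronous components pushed to a thread pool) can be sketched as follows. The function names `invoke_one` and `run_batch`, the `max_concurrency` parameter, and the use of bare callables in place of component objects are assumptions made for the example.

```python
import asyncio
import inspect


async def invoke_one(fn, sem: asyncio.Semaphore):
    """Run one callable under the semaphore; sync callables go to a thread pool."""
    async with sem:
        if inspect.iscoroutinefunction(fn):
            return await fn()
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, fn)


async def run_batch(callables, max_concurrency: int = 4):
    """Execute a batch concurrently; results come back in input order."""
    sem = asyncio.Semaphore(max_concurrency)
    return await asyncio.gather(*(invoke_one(fn, sem) for fn in callables))
```

The semaphore caps how many components run at once, while `asyncio.gather` keeps results aligned with the order of the batch regardless of completion order.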
Canvas supports a variable reference system that lets components reference outputs from other components or global variables.
Variables are referenced using the syntax {component_id@output_var} or {sys.variable} or {env.variable}.
Variable Types:
| Syntax Pattern | Example | Description |
|---|---|---|
| {component_id@output} | {retrieval_0@content} | Reference output from another component |
| {[email protected]} | {[email protected]} | Reference nested field in component output |
| {sys.*} | {sys.query} | System variables (query, user_id, files, etc.) |
| {env.*} | {env.api_key} | User-defined environment variables |
Key Resolution Methods:
| Method | Location | Description |
|---|---|---|
| get_value_with_variable() | agent/canvas.py164-189 | Scans string for variable patterns, resolves and replaces them |
| get_variable_value() | agent/canvas.py191-206 | Resolves single variable expression to its value |
| get_variable_param_value() | agent/canvas.py208-235 | Navigates nested paths in objects (dict/list/object) |
| set_variable_value() | agent/canvas.py237-255 | Sets variable value (updates globals or component output) |
Variable Pattern Regex: agent/canvas.py165
Sources: agent/canvas.py164-267 agent/component/base.py500-511
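The resolution flow described above (scan a string, resolve each reference, substitute the value) can be illustrated with a small self-contained sketch. The regular expression here is written for this example and is not the pattern at agent/canvas.py165; the `globals_` and `outputs` dictionaries, the dot-separated nested path, and the choice to render unresolved references as an empty string are likewise assumptions.

```python
import re

# Illustrative pattern only: {component_id@output[.nested]} or {sys.x} / {env.x}.
VAR_PATTERN = re.compile(
    r"\{([A-Za-z0-9_]+@[A-Za-z0-9_.]+|(?:sys|env)\.[A-Za-z0-9_.]+)\}"
)


def get_variable_value(expr: str, globals_: dict, outputs: dict):
    """Resolve one reference such as 'sys.query' or 'retrieval_0@meta.n'."""
    if "@" not in expr:
        return globals_.get(expr)
    component_id, path = expr.split("@", 1)
    head, *rest = path.split(".")
    value = outputs.get(component_id, {}).get(head)
    for key in rest:  # walk nested dicts, lists, or attributes
        if isinstance(value, dict):
            value = value.get(key)
        elif isinstance(value, list) and key.isdigit():
            value = value[int(key)] if int(key) < len(value) else None
        else:
            value = getattr(value, key, None)
    return value


def get_value_with_variable(text: str, globals_: dict, outputs: dict) -> str:
    """Replace every reference in text with its resolved value."""
    def _substitute(match):
        value = get_variable_value(match.group(1), globals_, outputs)
        return "" if value is None else str(value)

    return VAR_PATTERN.sub(_substitute, text)
```

Text in braces that does not match the pattern is left untouched, so literal JSON or prose containing braces passes through unchanged.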
Canvas maintains five distinct state buckets that components can read from and write to.
| Bucket | Type | Purpose | Read/Write | Persistence |
|---|---|---|---|---|
| globals | Object | System and environment variables | Both | Serialized in DSL |
| history | Array | Conversation turns | Read (via get_history()) | Serialized in DSL |
| retrieval | Array | Retrieved chunks per execution step | Both | Serialized in DSL |
| memory | Array | Long-term tool call summaries | Both (via Agent) | Serialized in DSL |
| variables | Object | Workflow variable definitions | Read (for type info) | Serialized in DSL |
Global Variable Reset Logic agent/canvas.py324-367:
When Canvas.reset() is called:
- sys.* variables are reset to default values by type (string → "", int → 0, list → [], etc.)
- env.* variables are reset based on their type definition in variables dict
- mem=True)

Sources: agent/canvas.py284-316 agent/canvas.py324-367 agent/canvas.py718-731
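The type-based reset of sys.* variables can be sketched as below. Only the string, int, and list defaults come from the text above; the defaults for bool, float, dict, and other types are assumptions, as is the variable name `sys.turns` used in the usage note. The sketch leaves env.* entries untouched because their reset depends on type definitions not shown on this page.

```python
def default_for(value):
    """Return an empty default matching the type of the current value."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return False
    if isinstance(value, str):
        return ""
    if isinstance(value, int):
        return 0
    if isinstance(value, float):
        return 0.0
    if isinstance(value, list):
        return []
    if isinstance(value, dict):
        return {}
    return None


def reset_sys_globals(globals_: dict) -> dict:
    """Reset every sys.* entry by type; other entries are copied unchanged."""
    return {
        key: default_for(value) if key.startswith("sys.") else value
        for key, value in globals_.items()
    }
```

For example, `reset_sys_globals({"sys.query": "hi", "sys.turns": 3, "sys.files": ["a"]})` yields empty values of the same types.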
Canvas provides methods for managing retrieval results (references) and agent memory.
Adding References agent/canvas.py803-816:
- canvas.add_reference(chunks, doc_infos) to register retrieved documents
- Chunks are stored in retrieval[-1]["chunks"]
- Document aggregations are stored in retrieval[-1]["doc_aggs"]

Getting References agent/canvas.py818-821:
- canvas.get_reference() returns the latest retrieval results

Adding Memory agent/canvas.py823-824:
- canvas.add_memory(user, assist, summ) to store tool call summaries
- Each entry is a (user_request, assistant_response, summary) tuple

Getting Memory agent/canvas.py826-827:
Tool Use Callback agent/canvas.py779-801:
- {task_id}-{message_id}-logs

Sources: agent/canvas.py779-827
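The reference and memory operations above amount to appending to two lists on the Canvas. The sketch below models that on a small stand-in class. `CanvasState` and `get_memory` are hypothetical names, and storing chunks and document aggregations as plain lists is an assumption; the real containers may be shaped differently.

```python
class CanvasState:
    """Hypothetical stand-in holding only the retrieval and memory buckets."""

    def __init__(self):
        self.retrieval = []
        self.memory = []

    def add_reference(self, chunks: list, doc_infos: list) -> None:
        # References accumulate on the latest retrieval entry.
        if not self.retrieval:
            self.retrieval.append({"chunks": [], "doc_aggs": []})
        self.retrieval[-1]["chunks"].extend(chunks)
        self.retrieval[-1]["doc_aggs"].extend(doc_infos)

    def get_reference(self) -> dict:
        if not self.retrieval:
            return {"chunks": [], "doc_aggs": []}
        return self.retrieval[-1]

    def add_memory(self, user: str, assist: str, summ: str) -> None:
        self.memory.append((user, assist, summ))

    def get_memory(self) -> list:
        # Return a copy so callers cannot mutate the stored history.
        return list(self.memory)
```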
Canvas DSL is stored in the database and supports version history for rollback.
Serialization agent/canvas.py109-128:
The __str__() method serializes Canvas state to JSON:
- json.dumps(dsl, ensure_ascii=False)

Version Control api/apps/canvas_app.py96-97:
On each save:
- user_canvas.dsl
- user_canvas_version with timestamp

Loading api/apps/canvas_app.py146-164:
When loading a Canvas:
- user_canvas table
- Canvas(dsl_json, tenant_id, canvas_id=id)
- Graph.__init__() which calls load()

Sources: agent/canvas.py109-128 api/apps/canvas_app.py73-98 api/apps/canvas_app.py146-164 api/db/services/canvas_service.py41-189
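The save-and-version flow can be modelled with an in-memory store. This is only an illustration of the idea (overwrite the current DSL, append a timestamped copy); `InMemoryCanvasStore` and its methods are hypothetical, and the real code persists to the user_canvas and user_canvas_version tables through the service layer.

```python
import json
import time


class InMemoryCanvasStore:
    """Hypothetical stand-in for the user_canvas and user_canvas_version tables."""

    def __init__(self):
        self.user_canvas = {}          # canvas_id -> serialized DSL
        self.user_canvas_version = []  # (canvas_id, timestamp, serialized DSL)

    def save(self, canvas_id: str, dsl: dict) -> None:
        # ensure_ascii=False keeps non-ASCII text readable in storage.
        serialized = json.dumps(dsl, ensure_ascii=False)
        self.user_canvas[canvas_id] = serialized
        self.user_canvas_version.append((canvas_id, time.time(), serialized))

    def load(self, canvas_id: str) -> dict:
        return json.loads(self.user_canvas[canvas_id])

    def versions(self, canvas_id: str) -> list:
        """Return every saved version of a canvas, oldest first."""
        return [json.loads(s) for cid, _, s in self.user_canvas_version
                if cid == canvas_id]
```

Rolling back then amounts to calling `save()` again with an older entry from `versions()`.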
Canvas supports real-time task cancellation using Redis flags.
Cancellation Methods:
| Method | Location | Description |
|---|---|---|
| Graph.is_canceled() | agent/canvas.py269-270 | Checks Redis for cancellation flag |
| Graph.cancel_task() | agent/canvas.py272-278 | Sets Redis cancellation flag |
| ComponentBase.check_if_canceled() | agent/component/base.py396-405 | Component-level cancellation check |
Cancellation Points:
Task Cleanup agent/canvas.py135-138:
On Canvas.reset(), Redis keys are deleted:
- {task_id}-logs: Execution trace logs
- {task_id}-cancel: Cancellation flag

Sources: agent/canvas.py269-278 agent/canvas.py135-138 api/apps/canvas_app.py261-268 agent/component/base.py393-405
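Flag-based cancellation can be sketched with a tiny key-value stand-in for Redis. The key names follow the ones listed above. `FakeKeyValueStore`, `TaskCancelled`, the flag value, and the choice to raise an exception from `check_if_canceled` are assumptions for the example; the real component-level check may signal cancellation differently.

```python
class FakeKeyValueStore:
    """Hypothetical in-memory stand-in for the Redis connection."""

    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)

    def delete(self, *keys):
        for key in keys:
            self._data.pop(key, None)


class TaskCancelled(Exception):
    pass


def cancel_task(store, task_id: str) -> None:
    store.set(f"{task_id}-cancel", "x")  # flag value is arbitrary here


def is_canceled(store, task_id: str) -> bool:
    return store.get(f"{task_id}-cancel") is not None


def check_if_canceled(store, task_id: str) -> None:
    """Component-level check: stop work if the flag is set (assumed behavior)."""
    if is_canceled(store, task_id):
        raise TaskCancelled(task_id)


def cleanup(store, task_id: str) -> None:
    """Delete the per-task keys, as described for Canvas.reset()."""
    store.delete(f"{task_id}-logs", f"{task_id}-cancel")
```

Keeping the flag in a shared store lets an API worker cancel a task that is running in a different process.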
Canvas execution yields Server-Sent Events (SSE) to stream workflow progress to clients.
| Event | Description | Data Fields |
|---|---|---|
| workflow_started | Workflow begins | inputs |
| node_started | Component begins execution | component_id, component_name, component_type, thoughts |
| node_finished | Component completes | inputs, outputs, error, elapsed_time |
| message | Streaming content chunk | content, audio_binary, start_to_think, end_to_think |
| message_end | Message streaming complete | status, attachment, reference |
| user_inputs | Awaiting user input | inputs, tips |
| workflow_finished | Workflow completes | inputs, outputs, elapsed_time |
Event Decoration agent/canvas.py399-408:
Each event is wrapped with metadata:
Streaming Pattern api/apps/canvas_app.py166-186:
Sources: agent/canvas.py369-662 agent/canvas.py399-408 api/apps/canvas_app.py166-186
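As a rough illustration of how such events reach a client, the sketch below frames each event as a Server-Sent Events `data:` line followed by a blank line, which is the standard SSE wire format. The envelope shape (`event`, `data`, plus arbitrary metadata keys) and the function names are assumptions; the actual metadata fields added by the engine are not listed on this page.

```python
import json


def sse_event(event: str, data: dict, **meta) -> str:
    """Frame one workflow event as an SSE message (hypothetical envelope)."""
    payload = {"event": event, "data": data, **meta}
    return "data: " + json.dumps(payload, ensure_ascii=False) + "\n\n"


def stream(events, **meta):
    """Yield SSE-framed strings for an iterable of (event, data) pairs."""
    for event, data in events:
        yield sse_event(event, data, **meta)
```

A web handler would return this generator with the `text/event-stream` content type so the client can render progress as it arrives.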
Canvas provides exception handling at both the component and workflow level.
Components can define exception handlers in their parameters agent/component/base.py567-581:
Exception Flow agent/canvas.py572-581:
When a component errors:
- Check whether exception_handler() is defined
- If goto is specified, append those components to path
- If default_value is specified, yield that as a message

Canvas maintains an error state agent/canvas.py84:
Error Output agent/canvas.py654-661:
If cancelled:
Sources: agent/canvas.py572-625 agent/component/base.py567-581
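The exception flow described above can be sketched as a small function that takes a component's handler configuration and the current execution path. Representing the handler as a dict with optional `goto` and `default_value` keys, and the shape of the returned result, are assumptions made for this illustration.

```python
def handle_component_error(handler, path: list, error: Exception) -> dict:
    """Apply a component's exception handler to a failed invocation.

    handler: None, or a dict with optional 'goto' (list of component IDs)
             and 'default_value' (fallback message). Assumed shape.
    path:    the execution path, extended in place when 'goto' is set.
    """
    if not handler:
        # No handler defined: the error propagates to the workflow level.
        return {"handled": False, "message": None, "error": str(error)}

    if handler.get("goto"):
        path.extend(handler["goto"])

    message = handler.get("default_value")
    return {"handled": True, "message": message, "error": str(error)}
```

Routing to `goto` targets lets a workflow recover along an alternative branch, while `default_value` gives the user a fallback reply without further execution.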
The Canvas engine integrates with the REST API through service layer abstractions.
| Endpoint | Method | Description | Handler |
|---|---|---|---|
| /canvas/set | POST | Save/update Canvas DSL | api/apps/canvas_app.py73-98 |
| /canvas/get/:id | GET | Retrieve Canvas by ID | api/apps/canvas_app.py101-107 |
| /canvas/completion | POST | Execute Canvas workflow | api/apps/canvas_app.py132-186 |
| /canvas/reset | POST | Reset Canvas state | api/apps/canvas_app.py271-291 |
| /canvas/debug | POST | Debug single component | api/apps/canvas_app.py332-366 |
| /canvas/cancel/:task_id | PUT | Cancel running task | api/apps/canvas_app.py261-268 |
completion() api/db/services/canvas_service.py192-251:
High-level workflow execution:
UserCanvasService api/db/services/canvas_service.py41-189:
CRUD operations for Canvas storage:
- get_by_canvas_id(): Fetch Canvas with user metadata
- update_by_id(): Persist DSL changes
- accessible(): Check tenant permissions

Sources: api/apps/canvas_app.py132-186 api/db/services/canvas_service.py192-251 api/db/services/canvas_service.py41-189
Batch Processing agent/canvas.py422-469:
Components are executed in batches using asyncio.gather():
- run_in_executor()

Components use token-aware message truncation rag/prompts/generator.py62-96:
- message_fit_in() ensures messages fit within LLM context limits

Large outputs use functools.partial for lazy evaluation agent/canvas.py506-549:
This avoids materializing full responses in memory until needed.
Sources: agent/canvas.py422-469 agent/canvas.py506-549 rag/prompts/generator.py62-96
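The lazy-evaluation idea can be shown in a few lines: instead of storing a finished string as a component output, store a `functools.partial` that produces the content only when a consumer calls it. The `stream_chunks` generator, the `outputs` dict, and the `materialize` helper are hypothetical names used for illustration.

```python
from functools import partial


def stream_chunks(text: str, size: int):
    """Yield text in fixed-size pieces, standing in for a streaming LLM reply."""
    for start in range(0, len(text), size):
        yield text[start:start + size]


# Store a deferred call rather than the finished string.
outputs = {"content": partial(stream_chunks, "hello world", 4)}


def materialize(value):
    """Resolve a lazy output to a string; pass other values through unchanged."""
    if isinstance(value, partial):
        return "".join(value())
    return value
```

A streaming consumer can iterate over `outputs["content"]()` chunk by chunk, while a consumer that needs the whole value calls `materialize`.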
The Canvas Engine is a sophisticated workflow orchestration system built on:
The engine enables visual workflow creation while maintaining programmatic flexibility through its JSON DSL and dynamic component loading system.
Sources: agent/canvas.py1-831 agent/component/base.py365-585 api/apps/canvas_app.py1-672 api/db/services/canvas_service.py1-367