This document describes RAGFlow's asynchronous task execution system, which handles long-running document processing operations outside of HTTP request-response cycles. The system uses Redis Streams as a distributed task queue, with multiple worker processes consuming and executing tasks concurrently.
Topics covered include:

- Task queue architecture (Redis Streams and consumer groups)
- Worker process lifecycle and configuration
- Supported task types (parsing, RAPTOR, GraphRAG, mind maps, memory)
- Concurrency control with semaphores
- Task lifecycle, progress tracking, and cancellation
- Error handling, retries, and timeouts
- Worker monitoring
For information about specific document processing methods (parsing, chunking, embedding), see Document Processing Pipeline. For dataset and document management APIs, see Dataset and Knowledge Base APIs and Document and File Management APIs.
The task execution system follows a producer-consumer pattern where API endpoints act as producers, Redis Streams serves as the message broker, and dedicated worker processes act as consumers.
Sources: rag/svr/task_executor.py 1-140, api/db/services/task_service.py 1-70, api/db/services/document_service.py 460-550
RAGFlow uses Redis Streams with consumer groups for reliable task distribution. Each task type has a dedicated queue (stream name), and workers belong to a consumer group, which guarantees that each message is delivered to only one consumer (with at-least-once delivery semantics).
| Queue Name | Task Type | Purpose |
|---|---|---|
| `ragflow_dataflow` | Document parsing | Parse documents, generate chunks, create embeddings |
| `ragflow_raptor` | RAPTOR summarization | Build hierarchical summaries for datasets |
| `ragflow_graphrag` | Knowledge graph | Extract entities and relationships |
| `ragflow_mindmap` | Mind map generation | Generate mind maps from documents |
| `ragflow_memory` | Memory storage | Save conversation messages to memory |

Sources: rag/svr/task_executor.py 80, common/constants.py, common/settings.py
Tasks are queued as JSON messages with the following structure:
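The original payload definition is not reproduced here; the following is a hedged sketch of what such a message might look like, with field names inferred from the Task model described later (they are assumptions, not confirmed from the source):

```python
import json

# Illustrative task message; exact field names in RAGFlow's queue payload
# are assumptions based on the Task model, not confirmed from the source.
task_message = {
    "id": "task-uuid",       # Task primary key (UUID)
    "doc_id": "doc-123",     # Associated document ID
    "task_type": "dataflow", # Selects the target queue/handler
    "from_page": 0,          # Starting page for processing
    "to_page": -1,           # -1 = all pages
    "retry_count": 0,
}

# Messages are serialized to JSON before being added to the Redis stream.
payload = json.dumps(task_message)
restored = json.loads(payload)
```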
Sources: rag/svr/task_executor.py 196-237, api/db/services/task_service.py 147-180
The collect() async function retrieves messages from Redis and acknowledges them:
Sources: rag/svr/task_executor.py 176-237
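The fetch-decode-acknowledge pattern can be sketched as follows, with an `asyncio.Queue` standing in for the Redis stream; the real implementation reads via consumer groups (XREADGROUP) and acknowledges with XACK:

```python
import asyncio
import json

acked = []  # records acknowledged message IDs (stand-in for XACK)

async def collect(stream):
    """Fetch one message, decode its JSON body, and acknowledge receipt."""
    msg_id, raw = await stream.get()
    task = json.loads(raw)
    acked.append(msg_id)  # in Redis this would be an XACK call
    return task

async def main():
    stream = asyncio.Queue()  # stand-in for the Redis stream
    await stream.put(("1-0", json.dumps({"doc_id": "doc-123"})))
    return await collect(stream)

task = asyncio.run(main())
```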
Each worker process is identified by CONSUMER_NO (passed as a command-line argument) and registers itself as task_executor_{N}:
| Environment Variable | Default | Description |
|---|---|---|
| `MAX_CONCURRENT_TASKS` | 5 | Maximum parallel tasks per worker |
| `MAX_CONCURRENT_CHUNK_BUILDERS` | 1 | Parallel chunking operations |
| `MAX_CONCURRENT_MINIO` | 10 | Parallel MinIO uploads |
| `WORKER_HEARTBEAT_TIMEOUT` | 120 | Worker heartbeat timeout (seconds) |

Sources: rag/svr/task_executor.py 113-132
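A minimal sketch of reading these settings from the environment with the defaults listed above (the variable names come from the table; the parsing shown is illustrative, not RAGFlow's exact code):

```python
import os

# Worker concurrency settings, read from the environment with defaults
# matching the table above.
MAX_CONCURRENT_TASKS = int(os.environ.get("MAX_CONCURRENT_TASKS", "5"))
MAX_CONCURRENT_CHUNK_BUILDERS = int(os.environ.get("MAX_CONCURRENT_CHUNK_BUILDERS", "1"))
MAX_CONCURRENT_MINIO = int(os.environ.get("MAX_CONCURRENT_MINIO", "10"))
WORKER_HEARTBEAT_TIMEOUT = int(os.environ.get("WORKER_HEARTBEAT_TIMEOUT", "120"))
```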
The worker runs an async event loop that continuously collects and processes tasks:
Sources: rag/svr/task_executor.py 176-237, 1114-1450
Workers gracefully shut down on SIGINT/SIGTERM:
Sources: rag/svr/task_executor.py 135-139
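Cooperative shutdown can be sketched as a signal handler that sets a stop flag which the worker loop checks between tasks (the handler wiring below is illustrative, not RAGFlow's exact code):

```python
import signal
import threading

# Stop flag polled by the worker loop between tasks.
stop_event = threading.Event()

def handle_shutdown(signum, frame):
    """SIGINT/SIGTERM handler: request a graceful stop."""
    stop_event.set()

signal.signal(signal.SIGINT, handle_shutdown)
signal.signal(signal.SIGTERM, handle_shutdown)

# The worker loop would check stop_event.is_set() and exit once the
# current task finishes. Here we simulate receiving SIGTERM:
handle_shutdown(signal.SIGTERM, None)
```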
The TASK_TYPE_TO_PIPELINE_TASK_TYPE dictionary maps queue task types to database enum values:
Sources: rag/svr/task_executor.py 103-109
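Such a mapping might look like the sketch below; the key and enum value strings are assumptions chosen to mirror the queue names above, not confirmed from RAGFlow's schema:

```python
# Illustrative sketch of the queue-task-type to DB-enum mapping;
# the actual values in RAGFlow's database schema are assumptions.
TASK_TYPE_TO_PIPELINE_TASK_TYPE = {
    "dataflow": "PARSE",
    "raptor": "RAPTOR",
    "graphrag": "GRAPH_RAG",
    "mindmap": "MINDMAP",
}

def pipeline_task_type(queue_task_type):
    """Translate a queue task type to its database enum value."""
    return TASK_TYPE_TO_PIPELINE_TASK_TYPE[queue_task_type]
```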
Sources: rag/svr/task_executor.py 624-763
RAPTOR builds hierarchical summaries through recursive clustering and summarization:
Sources: rag/svr/task_executor.py 765-859
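The recursive bottom-up idea can be illustrated with a toy sketch: chunks are grouped, each group is summarized, and the process recurses until one root summary remains. Real RAPTOR clusters by embedding similarity and uses an LLM summarizer; both are stubbed here.

```python
def summarize(texts):
    """Stand-in for an LLM summary of a cluster of texts."""
    return " | ".join(texts)

def build_tree(chunks, group_size=2):
    """Recursively cluster and summarize until a single root remains.

    Returns a list of levels: levels[0] holds the leaf chunks,
    levels[-1] holds the single root summary.
    """
    levels = [chunks]
    while len(levels[-1]) > 1:
        layer = levels[-1]
        groups = [layer[i:i + group_size] for i in range(0, len(layer), group_size)]
        levels.append([summarize(g) for g in groups])
    return levels

levels = build_tree(["c1", "c2", "c3", "c4"])
```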
GraphRAG extracts entities, relationships, and builds community structures:
Sources: rag/svr/task_executor.py 1225-1300, rag/graphrag/general/index.py
The memory storage task saves conversation messages to a memory system for later context retrieval:
Sources: api/db/joint_services/memory_message_service.py, rag/svr/task_executor.py 1359-1400
The system uses asyncio.Semaphore objects to limit concurrent operations and prevent resource exhaustion:
Sources: rag/svr/task_executor.py 126-131, rag/graphrag/utils.py 76
Semaphores are acquired using async context managers:
Sources: rag/svr/task_executor.py 271-283, 597-599
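The pattern can be demonstrated with a minimal, self-contained sketch: an `asyncio.Semaphore` acquired with `async with` caps how many coroutines run the guarded section at once, as the worker does for its chunking/embedding/upload stages.

```python
import asyncio

running = 0  # coroutines currently inside the guarded section
peak = 0     # highest observed concurrency

async def guarded_task(limit):
    global running, peak
    async with limit:  # acquired/released via the async context manager
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # simulate work
        running -= 1

async def main():
    limit = asyncio.Semaphore(2)  # at most 2 tasks inside at once
    await asyncio.gather(*(guarded_task(limit) for _ in range(6)))

asyncio.run(main())
```

Despite six tasks being scheduled concurrently, the semaphore keeps the observed peak at two.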
This hierarchical design ensures:

- A coarse-grained cap on overall task concurrency per worker
- Finer-grained caps on expensive sub-operations (chunking, MinIO uploads)
- Protection against resource exhaustion under load
Sources: rag/svr/task_executor.py 126-131
Tasks are created in the MySQL Task table and then queued to Redis:
Sources: api/db/services/task_service.py 147-250, api/db/services/document_service.py 567-650
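The create-then-queue flow can be sketched with in-memory stand-ins for MySQL and Redis (the helper name `queue_task` and its fields are illustrative):

```python
import json
import uuid

task_table = {}  # stands in for the MySQL Task table
queue = []       # stands in for the Redis stream

def queue_task(doc_id):
    """Insert a Task row, then push a message referencing it to the queue."""
    task_id = str(uuid.uuid4())
    task_table[task_id] = {"id": task_id, "doc_id": doc_id, "progress": 0.0}
    queue.append(json.dumps({"id": task_id, "doc_id": doc_id}))
    return task_id

tid = queue_task("doc-123")
```

Writing the row before enqueueing means a worker that picks up the message can always resolve it back to a Task record.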
Key fields in the Task model:
| Field | Type | Description |
|---|---|---|
| `id` | VARCHAR(128) | Primary key, UUID |
| `doc_id` | VARCHAR(128) | Associated document ID |
| `from_page` | INT | Starting page for processing |
| `to_page` | INT | Ending page (-1 = all) |
| `progress` | FLOAT | 0.0 to 1.0 (negative = error) |
| `progress_msg` | LONGTEXT | Human-readable status messages |
| `retry_count` | INT | Number of retry attempts |

Sources: api/db/db_models.py 900-950
The set_progress() function updates task status and checks for cancellation:
Sources: rag/svr/task_executor.py 142-173
Cancellation uses a Redis flag checked periodically during execution:
The worker checks cancellation at multiple points:
Sources: api/db/services/task_service.py 128-145, rag/svr/task_executor.py 142-173
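The combined update-and-check behavior can be sketched as follows, with an in-memory set standing in for the Redis cancellation flags (the exact signature of RAGFlow's set_progress() is not reproduced here):

```python
class TaskCanceledException(Exception):
    pass

cancel_flags = set()  # stands in for per-task Redis cancellation keys
task = {"id": "t1", "progress": 0.0, "progress_msg": ""}

def set_progress(task, prog, msg=""):
    """Update progress; abort with an exception if cancellation was requested."""
    if task["id"] in cancel_flags:
        task["progress"] = -1.0  # negative progress marks cancellation/failure
        raise TaskCanceledException(msg)
    task["progress"] = prog
    task["progress_msg"] = msg

set_progress(task, 0.5, "Chunking...")

canceled = False
cancel_flags.add("t1")  # user cancels the task
try:
    set_progress(task, 0.6)
except TaskCanceledException:
    canceled = True
```

Because every progress update passes through this check, a cancellation request takes effect at the next reported progress point rather than requiring the worker to poll separately.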
Long-running operations accept a progress_callback function:
The callback is invoked with prog (float 0-1) and msg (string) parameters:
Sources: rag/svr/task_executor.py 1114-1300
Typical progress breakdowns:
| Stage | Progress Range | Description |
|---|---|---|
| Fetch from MinIO | 0.0 - 0.05 | Download file from object storage |
| Chunking | 0.05 - 0.5 | Parse document and split into chunks |
| Keyword extraction | 0.5 - 0.6 | Generate keywords (if enabled) |
| Question generation | 0.6 - 0.7 | Generate questions (if enabled) |
| Embedding | 0.7 - 0.9 | Generate vector embeddings |
| Indexing | 0.9 - 1.0 | Insert chunks into doc store |
Sources: rag/svr/task_executor.py 244-622
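The stage ranges above compose naturally with a small helper that maps a stage-local fraction (0-1) into the stage's slice of overall progress (the helper name and stage keys are illustrative):

```python
# Stage name -> (start, end) slice of overall progress, per the table above.
STAGES = {
    "fetch":     (0.0, 0.05),
    "chunking":  (0.05, 0.5),
    "embedding": (0.7, 0.9),
    "indexing":  (0.9, 1.0),
}

def overall_progress(stage, local):
    """Map a stage-local fraction in [0, 1] into overall progress."""
    lo, hi = STAGES[stage]
    return lo + local * (hi - lo)

# Halfway through chunking corresponds to 27.5% overall.
p = overall_progress("chunking", 0.5)
```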
Tasks with errors are retried up to 3 times:
After 3 failures, the task is marked as failed (progress = -1) and no longer processed.
Sources: api/db/services/task_service.py 129-141
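The retry policy can be sketched as a failure handler that increments retry_count and marks the task permanently failed after the third attempt (the function name `on_failure` is illustrative):

```python
MAX_RETRIES = 3

def on_failure(task):
    """Record a failed attempt; return True if the task may be retried."""
    task["retry_count"] += 1
    if task["retry_count"] >= MAX_RETRIES:
        task["progress"] = -1.0  # marked failed, no longer processed
        return False
    return True

task = {"retry_count": 0, "progress": 0.2}
outcomes = [on_failure(task) for _ in range(3)]  # three consecutive failures
```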
The main task execution is wrapped in comprehensive exception handling:
Sources: rag/svr/task_executor.py 1114-1450
Common errors and their handling:
| Error Type | Handling Strategy | Progress Value |
|---|---|---|
| `TaskCanceledException` | Immediate abort, log cancellation | -1 |
| `TimeoutError` | Log timeout message, mark failed | -1 |
| `DoesNotExist` (Peewee) | Task/document deleted, skip | N/A (no update) |
| File not found in MinIO | Log error, mark failed | -1 |
| Parsing errors | Log exception, mark failed | -1 |
| Database errors | Retry connection, log error | -1 if fatal |
Sources: rag/svr/task_executor.py 142-173, 285-290
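A hedged sketch of a top-level handler implementing the table above: each exception class maps to the progress value recorded for the task, with `None` meaning "skip the update" (the `classify` helper is illustrative; `LookupError` stands in for Peewee's `DoesNotExist`):

```python
class TaskCanceledException(Exception):
    pass

def classify(exc):
    """Return the progress value to record for an exception, or None to skip."""
    if isinstance(exc, TaskCanceledException):
        return -1.0  # canceled: abort immediately
    if isinstance(exc, TimeoutError):
        return -1.0  # timed out: mark failed
    if isinstance(exc, LookupError):  # stands in for Peewee's DoesNotExist
        return None  # task/document deleted: no update
    return -1.0      # generic failure

results = [classify(TaskCanceledException()),
           classify(TimeoutError()),
           classify(LookupError())]
```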
Critical operations have timeouts to prevent indefinite hangs:
Sources: rag/svr/task_executor.py 244, 301, 765
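The standard way to bound an async operation in Python is `asyncio.wait_for`, sketched here with a deliberately slow coroutine; a hung stage then surfaces as a `TimeoutError` the handler above can record instead of blocking the worker forever:

```python
import asyncio

async def slow_stage():
    """Simulates a stage that hangs far past its deadline."""
    await asyncio.sleep(10)

async def main():
    try:
        # Bound the stage to 50 ms; cancellation is raised as TimeoutError.
        await asyncio.wait_for(slow_stage(), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"
    return "finished"

outcome = asyncio.run(main())
```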
Each worker maintains counters for monitoring:
Sources: rag/svr/task_executor.py 113-121
Worker health can be monitored through:
- Failure rate: FAILED_TASKS / (DONE_TASKS + FAILED_TASKS)
- In-flight tasks: the CURRENT_TASKS dictionary

Sources: rag/svr/task_executor.py 113-132
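These health metrics can be derived from the counters with simple arithmetic (the counter values below are examples, not real data):

```python
# Example counter values; in the worker these are module-level counters.
DONE_TASKS, FAILED_TASKS = 95, 5
CURRENT_TASKS = {"t1": {"doc_id": "doc-123"}}  # in-flight tasks by ID

total = DONE_TASKS + FAILED_TASKS
failure_rate = FAILED_TASKS / total if total else 0.0
in_flight = len(CURRENT_TASKS)
```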
Sources: rag/svr/task_executor.py 1-1500, api/db/services/task_service.py 1-300, api/db/services/document_service.py 460-700, rag/utils/redis_conn.py, common/constants.py