This document describes RAGFlow's document ingestion and processing pipeline, which transforms raw documents (PDF, DOCX, Excel, etc.) into searchable, semantically-indexed chunks. The pipeline handles parsing, chunking, enrichment, embedding, and indexing through a task-based execution system.
For information about retrieval and search after documents are indexed, see Retrieval and Search System. For API endpoints to trigger document processing, see Document and File Management APIs.
The document processing pipeline consists of six major stages:
The pipeline is asynchronous and resilient, supporting progress tracking, cancellation, and error recovery.
Sources: rag/svr/task_executor.py 1-1400, api/db/services/document_service.py 1-800, Diagram 2 from high-level architecture
Pipeline Flow: Documents uploaded through API → Binary stored in object storage → Document/File records in MySQL → Task created and queued in Redis → Task executor pulls task → Parses binary using format-specific parser → Chunks content with strategy-specific chunker → Enriches chunks with LLM-generated metadata → Embeds chunks with tenant-specific model → Inserts into document store with vector index.
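The stage ordering above can be sketched as a simple fold over stage functions. This is an illustrative sketch only: the stage implementations here are toy stand-ins, not RAGFlow's actual parser, chunker, or embedder APIs.

```python
# Toy stand-ins for the pipeline stages; each stage feeds the next.
def parse(binary):    return binary.decode().split("\n\n")            # format-specific parser
def chunk(sections):  return [s[:128] for s in sections]              # strategy-specific chunker
def enrich(chunks):   return [{"text": c, "keywords": []} for c in chunks]  # LLM metadata
def embed(chunks):    return [{**c, "vec": [0.0] * 4} for c in chunks]      # vectorization

STAGES = [parse, chunk, enrich, embed]

def process(binary: bytes):
    data = binary
    for stage in STAGES:
        data = stage(data)
    return data   # ready for insertion into the document store
```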
Sources: rag/svr/task_executor.py 176-238, rag/svr/task_executor.py 624-763, api/db/services/document_service.py 45-120, api/apps/document_app.py 53-99
Documents enter the system through three primary routes:
| Endpoint | Purpose | File Limit | Storage Location |
|---|---|---|---|
| POST /api/v1/document/upload | Direct file upload | 256 files, FILE_NAME_LEN_LIMIT bytes | `kb_id/{location}` in MinIO/S3 |
| POST /api/v1/document/web_crawl | HTML → PDF conversion | Single URL | `kb_id/{name}.pdf` |
| POST /api/v1/document/create | Virtual document (no binary) | - | Empty location |
When a document is uploaded, a Document record is created with:
- `id` (UUID), `kb_id`, `name`, `location` (object storage path)
- `type` (PDF/DOCX/Excel/etc.), `parser_id` (chunking method), `size` (bytes)
- `parser_config` (JSON with parser-specific settings)
- `run` (UNSTART/RUNNING/CANCEL/DONE/FAIL), `progress` (0-1), `progress_msg`
- `chunk_num`, `token_num`, `process_duration`

Sources: api/apps/document_app.py 53-99, api/db/services/file_service.py 100-280, api/db/services/document_service.py 352-358, api/db/db_models.py 580-640
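A freshly created record might look like the following. This is an illustrative shape built from the fields listed above; the helper function and the sample values are invented for the example, not RAGFlow's actual constructor.

```python
import uuid

def new_document_record(kb_id, name, location, doc_type, parser_id, size):
    """Illustrative Document record as a dict; field names follow the list above."""
    return {
        "id": uuid.uuid4().hex,
        "kb_id": kb_id,
        "name": name,
        "location": location,          # object storage path
        "type": doc_type,              # PDF/DOCX/Excel/etc.
        "parser_id": parser_id,        # chunking method
        "size": size,                  # bytes
        "parser_config": {"chunk_token_num": 128},
        "run": "UNSTART",              # UNSTART/RUNNING/CANCEL/DONE/FAIL
        "progress": 0.0,               # 0-1
        "progress_msg": "",
        "chunk_num": 0,
        "token_num": 0,
        "process_duration": 0.0,
    }
```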
Document processing tasks are managed through a Redis-based queue system with consumer groups.
Tasks are created in two places:
- `doc_upload_and_parse()` creates a task immediately after the document record

Key environment variables controlling task execution:
| Variable | Default | Purpose |
|---|---|---|
| MAX_CONCURRENT_TASKS | 5 | Total concurrent task limit |
| MAX_CONCURRENT_CHUNK_BUILDERS | 1 | Concurrent parsing/chunking |
| MAX_CONCURRENT_MINIO | 10 | Concurrent MinIO operations |
| WORKER_HEARTBEAT_TIMEOUT | 120 | Seconds before worker considered dead |
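One common way to enforce limits like these is with semaphores, as in the hedged sketch below. The environment variable names come from the table above, but the wiring (one semaphore per limit, nested acquisition) is illustrative, not a copy of RAGFlow's task executor.

```python
import asyncio
import os

MAX_CONCURRENT_TASKS = int(os.environ.get("MAX_CONCURRENT_TASKS", "5"))
MAX_CONCURRENT_CHUNK_BUILDERS = int(os.environ.get("MAX_CONCURRENT_CHUNK_BUILDERS", "1"))

async def handle_task(task_id, task_limiter, chunk_limiter, results):
    async with task_limiter:              # overall task parallelism
        async with chunk_limiter:         # parsing/chunking is memory-heavy
            await asyncio.sleep(0)        # stand-in for parse + chunk work
            results.append(task_id)

async def run_all(n_tasks):
    task_limiter = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
    chunk_limiter = asyncio.Semaphore(MAX_CONCURRENT_CHUNK_BUILDERS)
    results = []
    await asyncio.gather(*(handle_task(i, task_limiter, chunk_limiter, results)
                           for i in range(n_tasks)))
    return results
```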
Sources: rag/svr/task_executor.py 111-132, rag/svr/task_executor.py 176-238, api/db/services/task_service.py 57-220, api/db/services/document_service.py 398-550
The parsing stage converts binary documents into structured text and layout information. RAGFlow supports multiple parsing backends through a dispatch system.
The PARSERS dictionary maps parser names to functions:
The default deepdoc parser uses RAGFlow's native vision pipeline:
OCR Pipeline (deepdoc/parser/pdf_parser.py 547-575):

- `__images__()`: Convert PDF pages to images at `zoomin=3` (default)
- `_layouts_rec()`: Classify regions (text/title/figure/table/caption)
- `_table_transformer_job()`: Detect table structures
- `_text_merge()`: Merge text blocks respecting layout boundaries
- `_concat_downward()`: Use XGBoost model to decide text concatenation

Layout Types: text, title, figure, table, caption, header, footer, reference, equation
Table Detection: Uses TableStructureRecognizer to identify cells and extract tabular data as HTML
Sources: rag/app/naive.py 58-71, deepdoc/parser/pdf_parser.py 544-660, deepdoc/vision/ocr.py 1-300, deepdoc/vision/recognizer.py 1-250
DOCX parsing uses python-docx and mammoth libraries:
Features:
- `__get_nearest_title()`
- `to_markdown()` using mammoth

Sources: rag/app/naive.py 232-542, deepdoc/parser/docx_parser.py 1-200
Excel parsing extracts tables row-by-row or as complete sheets:
Chunking Modes:
- Row-by-row: each row becomes a chunk
- Whole-sheet: the entire sheet is one chunk (`task_page_size <= 0`)

Image Handling: `_extract_images_from_worksheet()` extracts embedded images with position metadata
Sources: rag/app/table.py 39-240, deepdoc/parser/excel_parser.py 1-350
RAGFlow provides 12+ specialized chunking methods optimized for different document types. Each strategy is implemented as a module in rag/app/.
The FACTORY dictionary maps parser_id values to chunking modules:
| Strategy | Use Case | Chunk Unit | Preserves Structure | Typical Size |
|---|---|---|---|---|
| naive | General documents | Token windows | No | 512-2048 tokens |
| table | Spreadsheets | Table rows | Yes (row data) | Variable |
| qa | FAQ documents | Question-answer pairs | Yes (Q&A bullets) | Per Q&A |
| book | Long-form books | Chapters/sections | Yes (TOC hierarchy) | Per section |
| paper | Academic papers | Sections + citations | Yes (section headers) | Per section |
| laws | Legal documents | Articles/clauses | Yes (legal structure) | Per article |
| manual | Technical manuals | Procedures/steps | Yes (hierarchical bullets) | Per procedure |
| presentation | Slides | Per slide | Yes (slide boundaries) | Per slide |
| picture | Images | Single image | Yes | Single chunk |
| one | Small docs | Entire document | Yes | Whole doc |
| email | Email threads | Per email | Yes (headers) | Per email |
| tag | Tagged content | Tagged sections | Yes (tags) | Per tag |
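The FACTORY-style dispatch can be sketched as follows. The real FACTORY maps `parser_id` values to modules in rag/app/; the inline functions here are toy stand-ins for two of those strategies.

```python
# Stand-in chunkers: "naive" splits into fixed-size windows, "one" keeps
# the whole document as a single chunk (mirroring the table above).
def naive_chunk(text):
    return [text[i:i + 8] for i in range(0, len(text), 8)]

def one_chunk(text):
    return [text]

FACTORY = {"naive": naive_chunk, "one": one_chunk}

def chunk(parser_id, text):
    # Unknown parser_ids fall back to the naive strategy in this sketch.
    return FACTORY.get(parser_id, naive_chunk)(text)
```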
Default chunking strategy using sliding token windows:
Configuration (parser_config):
- `chunk_token_num` (default 128): Tokens per chunk
- `delimiter` (default "\n!?。;;!?。"): Sentence delimiters
- `layout_recognize` (bool): Use layout-aware merging

Algorithm (rag/nlp/__init__.py 534-640):
- Merge adjacent sentences until a chunk reaches `chunk_token_num` tokens

Sources: rag/app/naive.py 644-830, rag/nlp/__init__.py 534-780
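The sliding-window merge can be sketched as below. Whitespace splitting stands in for RAGFlow's tokenizer, and the greedy packing is a simplification of the real algorithm in rag/nlp/__init__.py.

```python
import re

def naive_merge(text, chunk_token_num=128, delimiter="\n!?。;;!?。"):
    """Split on delimiter characters, then greedily pack sentences into
    chunks of at most chunk_token_num (crudely counted) tokens."""
    pattern = "[" + re.escape(delimiter) + "]"
    sentences = [s.strip() for s in re.split(pattern, text) if s.strip()]
    chunks, current, count = [], [], 0
    for sent in sentences:
        n = len(sent.split())                 # crude whitespace token count
        if current and count + n > chunk_token_num:
            chunks.append(" ".join(current))  # flush the full window
            current, count = [], 0
        current.append(sent)
        count += n
    if current:
        chunks.append(" ".join(current))
    return chunks
```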
Specialized for tabular data, preserving row relationships:
Row-by-Row Mode:
Chunk Data Structure:
Sources: rag/app/table.py 243-510
Detects question-answer patterns and chunks accordingly:
Detection Patterns (rag/nlp/__init__.py 75-87):
Chunking Logic:
- `has_qbullet()` detects question bullets

Sources: rag/app/qa.py 1-600, rag/nlp/__init__.py 75-167
Uses table of contents to preserve document hierarchy:
Bullet Patterns (rag/nlp/__init__.py 169-201):
Sources: rag/app/book.py 1-280, rag/nlp/__init__.py 169-350
Academic paper chunking preserving sections and citations:
Recognized Sections:
Special Handling:
Sources: rag/app/paper.py 1-400 (if exists, inferred from FACTORY)
After chunking, chunks are enriched with AI-generated metadata to improve retrieval quality.
Configuration: parser_config.auto_keywords (integer, number of keywords to extract)
Implementation:
Chunk Fields Added:
- `important_kwd`: List of keywords
- `important_tks`: Tokenized keywords for BM25 search

LLM Prompt: Uses the `keyword_extraction()` prompt from rag/prompts/generator.py
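A hedged sketch of this enrichment step: ask a chat model for up to N keywords and attach them to the chunk. The `chat` callable, the inline prompt, and the comma-separated answer format are all stand-ins; the real flow goes through `keyword_extraction()` and the tenant's bound LLM.

```python
def extract_keywords(chat, chunk, topn):
    """Attach auto-keywords to a chunk using a chat-model callable (stand-in)."""
    prompt = f"Extract up to {topn} keywords from:\n{chunk['content_with_weight']}"
    answer = chat(prompt)                         # e.g. "rag, pipeline, chunking"
    kwds = [k.strip() for k in answer.split(",") if k.strip()][:topn]
    chunk["important_kwd"] = kwds                 # keyword list
    chunk["important_tks"] = " ".join(kwds)       # tokenized form for BM25
    return chunk
```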
Sources: rag/svr/task_executor.py 342-374, rag/prompts/generator.py 1-500
Configuration: parser_config.auto_questions (integer, number of questions to generate)
Purpose: Generate potential questions this chunk could answer, improving retrieval for question-based queries.
Chunk Fields Added:
- `question_kwd`: List of generated questions
- `question_tks`: Tokenized questions

Sources: rag/svr/task_executor.py 375-405, rag/prompts/generator.py 200-300
Configuration: parser_config.enable_metadata + parser_config.metadata (JSON schema)
Purpose: Extract structured metadata from chunks based on user-defined schema.
JSON Schema Example:
Implementation:
Metadata Storage: Aggregated across all chunks and stored in doc_metadata table via DocMetadataService.update_document_metadata()
Sources: rag/svr/task_executor.py 407-448, api/db/services/doc_metadata_service.py 1-300, common/metadata_utils.py 1-200
Configuration: kb_parser_config.tag_kb_ids (list of KB IDs to source tags from)
Purpose: Automatically tag chunks with relevant tags from existing knowledge bases.
Algorithm:
1. `retriever.all_tags_in_portion(tenant_id, kb_ids, 1000)`
2. `retriever.tag_content()`
3. `content_tagging(chat_mdl, content, all_tags, examples, topn)`
4. Write results to the `tag_fld` field

Chunk Field Added:
- `tag_fld`: Dictionary of tags with weights (e.g., {"python": 1.0, "machine_learning": 0.8})

Sources: rag/svr/task_executor.py 450-513, rag/prompts/generator.py 400-500
Images embedded in chunks are described using vision models (Image2Text):
Image Processing Flow:
Vision Model Integration: Uses LLMBundle(tenant_id, LLMType.IMAGE2TEXT) to generate descriptions
Image Description Prompt: vision_llm_describe_prompt() asks model to describe image relevance to surrounding text
Sources: rag/utils/base64_image.py 1-150, rag/prompts/generator.py 600-700, deepdoc/parser/figure_parser.py 1-400
For book-style documents, a TOC is generated and stored as a special chunk:
TOC Retrieval: Used by retrieval_by_toc() to expand search to related sections
Sources: rag/svr/task_executor.py 518-561, rag/prompts/generator.py 800-900
After enrichment, chunks are vectorized using tenant-specific embedding models.
Model Binding:
Vector Size Consistency: All chunks in a KB must use same embedding model (same vector dimension)
Text Preparation:
- If a chunk has `question_kwd`, use the generated questions as the embedding source
- Otherwise, embed `content_with_weight`

Title Weighting: Default 10% title, 90% content (configurable via `filename_embd_weight`)
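The 10%/90% weighting amounts to a convex combination of the title and content vectors. A minimal sketch with toy vectors, assuming both embeddings share the same dimension:

```python
def weighted_embedding(title_vec, content_vec, filename_embd_weight=0.1):
    """Blend title and content vectors; the title share defaults to 10%."""
    w = filename_embd_weight
    return [w * t + (1 - w) * c for t, c in zip(title_vec, content_vec)]
```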
Token Counting: Tracks embedding token consumption for billing
Sources: rag/svr/task_executor.py 570-622, api/db/services/llm_service.py 200-400
The final stage inserts enriched, embedded chunks into the document store with full-text and vector indexes.
RAGFlow supports three document store backends:
| Backend | Configuration | Vector Index | Full-Text Index | Hybrid Search |
|---|---|---|---|---|
| Elasticsearch | DOC_ENGINE="" | dense_vector + cosineSimilarity | text + match | Yes (fusion) |
| Infinity | DOC_ENGINE="infinity" | vector + HNSW | varchar + match | Yes (fusion) |
| OpenSearch | DOC_ENGINE="opensearch" | knn_vector | text + match | Yes (fusion) |
- Index name: `ragflow_{tenant_id}` (one per tenant)
- Chunks carry a `kb_id` field (list of KB IDs)
- Separate index `ragflow_doc_meta_{tenant_id}` (metadata only)

When the first chunk of a KB is inserted, the index is created if missing:
Index Creation:
- Elasticsearch: `q_N_vec` as `dense_vector`
- Infinity: HNSW index on `q_N_vec`
- OpenSearch: `knn_vector` field

Chunks are inserted in batches of 64:
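The batching loop can be sketched as below; `insert` stands in for the document-store client's bulk call, and a failed batch aborts the task, matching the error handling described in this section.

```python
def insert_chunks(chunks, insert, batch_size=64):
    """Insert chunks in fixed-size batches; stop on the first failure."""
    for i in range(0, len(chunks), batch_size):
        if not insert(chunks[i:i + batch_size]):
            return False          # caller marks the task as failed
    return True
```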
Error Handling: If insertion fails, returns False and task is marked as failed
Progress: Reports 80-100% progress during insertion phase
Sources: rag/svr/task_executor.py 871-940, rag/nlp/search.py 1-100, common/doc_store/doc_store_base.py 1-300
Progress is tracked through set_progress() function, which updates task records:
Progress Scale: 0.0 to 1.0 (displayed as 0% to 100%)
Progress Stages:
Tasks can be canceled via API, setting a Redis flag:
Cancellation Points: Checked at multiple points in pipeline (parsing, chunking, enrichment, embedding)
Graceful Shutdown: TaskCanceledException raised, allowing cleanup before exit
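A hedged sketch of this cooperative cancellation pattern: a flag is polled at stage boundaries and raises `TaskCanceledException` so cleanup can run. The `flags` dict stands in for the Redis client, and the key format is invented for the example.

```python
class TaskCanceledException(Exception):
    pass

def check_canceled(flags, task_id):
    # Stand-in for a Redis GET on a cancellation flag.
    if flags.get(f"cancel:{task_id}"):
        raise TaskCanceledException(task_id)

def run_stages(flags, task_id, stages):
    """Run (name, fn) stages, checking the cancel flag before each one."""
    done = []
    try:
        for name, fn in stages:
            check_canceled(flags, task_id)   # cancellation point
            fn()
            done.append(name)
    except TaskCanceledException:
        pass                                 # graceful cleanup would go here
    return done
```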
Sources: rag/svr/task_executor.py 142-174, api/db/services/task_service.py 300-350
Error Types:
Retry Logic: Tasks are NOT automatically retried (must be manually re-triggered)
Error Visibility: Error messages stored in progress_msg field, visible in UI
Sources: rag/svr/task_executor.py 244-290, common/exceptions.py 1-50
RAPTOR creates multi-level summaries for improved retrieval:
Configuration:
- `max_cluster`: Maximum clusters per level
- `max_token`: Maximum tokens per summary
- `threshold`: Clustering threshold
- `random_seed`: Reproducibility

Output: Creates additional chunks representing summaries at multiple abstraction levels
Sources: rag/svr/task_executor.py 765-859, rag/raptor/__init__.py 1-400
GraphRAG extracts entities and relationships:
Schema: Entities (nodes) + Relationships (edges) stored separately
Use Case: Enables graph-based retrieval for complex queries
Sources: rag/graphrag/general/index.py 1-600, api/db/services/document_service.py 700-800
| Resource | Limit | Purpose |
|---|---|---|
| MAX_CONCURRENT_TASKS | 5 | Overall task parallelism |
| MAX_CONCURRENT_CHUNK_BUILDERS | 1 | Parsing/chunking (memory-intensive) |
| MAX_CONCURRENT_MINIO | 10 | MinIO I/O operations |
| EMBEDDING_BATCH_SIZE | 32 | Embeddings per batch |
LLM Cache: Keyword/question/metadata generation results cached by content hash in Redis
Cache Key: `{llm_name}:{content_hash}:{operation}:{params}`
TTL: No expiration (persists until Redis restart)
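Building such a key is straightforward. This sketch assumes SHA-256 for the content hash and a sorted JSON dump for stable param encoding; both are illustrative choices, not a claim about RAGFlow's exact hashing.

```python
import hashlib
import json

def llm_cache_key(llm_name, content, operation, params):
    """Compose a Redis cache key: llm name, content hash, operation, params."""
    content_hash = hashlib.sha256(content.encode()).hexdigest()
    param_str = json.dumps(params, sort_keys=True)   # stable param encoding
    return f"{llm_name}:{content_hash}:{operation}:{param_str}"
```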
Sources: rag/graphrag/utils.py 1-200
Sources: api/utils/api_utils.py 200-400, api/db/db_models.py 580-640
This document provides a comprehensive overview of RAGFlow's document processing pipeline. For retrieval and search operations on the indexed chunks, see Retrieval and Search System. For LLM integration details, see LLM Integration System.