This document describes RAGFlow's multi-tier storage architecture, which separates structured metadata, unstructured document content with vector embeddings, raw file storage, and caching into distinct layers. The architecture is designed for scalability and supports multiple backend implementations for each storage tier.
For information about how documents are processed before storage, see Document Processing Pipeline. For retrieval and search operations, see Retrieval and Search System.
RAGFlow implements a four-tier storage architecture where each tier serves a specific purpose and can be independently scaled or swapped with alternative implementations:
Storage Tier Responsibilities:
| Tier | Purpose | Data Types | Example Backends |
|---|---|---|---|
| Relational DB | Structured metadata, relationships, configuration | User accounts, dataset configs, task status, conversation history | MySQL, OceanBase |
| Document Store | Full-text search, vector similarity, chunk storage | Document chunks, embeddings, tokenized text, positions | Elasticsearch, Infinity, OpenSearch |
| Object Storage | Raw file storage, binary data | Original documents (PDF, DOCX), chunk images, thumbnails | MinIO, S3, OSS, Azure Blob |
| Cache | Session state, temporary data, task queue | Task queue messages, file cache (12min TTL), OTP tokens | Redis, Valkey |
Sources: rag/svr/task_executor.py871-898 api/db/services/dialog_service.py447-451 common/settings.py docker-compose.yml
RAGFlow uses Peewee ORM with support for MySQL and OceanBase (MySQL-compatible). The schema is defined in api/db/db_models.py with automatic connection retry logic and connection pooling.
Primary Tables:
Sources: api/db/db_models.py420-550 api/db/db_models.py135-236
Document Table - Central table tracking all uploaded files:
- `run` field (UNSTART=0, RUNNING=1, CANCEL=2, DONE=3, FAIL=4)
- `chunk_num` (indexed chunks), `token_num` (token consumption)
- `kb_id` foreign key

Task Table - Manages asynchronous document processing:

- `progress` (0-1 float), `progress_msg` (status string)
- `retry_count` increments on failure (max 3 attempts)
- `doc_id`, used by task executor workers

Knowledgebase Table - Dataset configuration:

- `parser_config` JSON field
- `doc_num`, `chunk_num`, `token_num`
- `embd_id`

Connection Management:
api/db/db_models.py243-280 implements RetryingPooledMySQLDatabase, which automatically retries queries on connection loss:
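The behavior can be sketched as a retry wrapper. This is a simplified stand-in, not the actual implementation: the real class subclasses Peewee's pooled database, and the retry count and delay below are assumptions.

```python
import functools
import time

def retry_on_connection_loss(max_retries=3, base_delay=1.0):
    """Re-run a database call after a lost connection.

    Simplified stand-in for RetryingPooledMySQLDatabase; the real class
    subclasses Peewee's PooledMySQLDatabase and reconnects before retrying.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except ConnectionError:
                    if attempt == max_retries:
                        raise  # give up after the last attempt
                    time.sleep(base_delay * (attempt + 1))
        return wrapper
    return decorator

# Demo: a query that fails twice before succeeding.
calls = {"n": 0}

@retry_on_connection_loss(max_retries=3, base_delay=0.0)
def flaky_query():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("MySQL server has gone away")
    return "ok"

assert flaky_query() == "ok"  # succeeds on the third attempt
```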
Sources: api/db/db_models.py243-280 api/db/services/document_service.py46-77 api/db/services/knowledgebase_service.py48-130
The DocStoreConnection abstract base class in common/doc_store/doc_store_base.py defines a unified interface for all document store implementations. This enables runtime switching between Elasticsearch, Infinity, and OpenSearch.
Core Interface Methods:
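As a sketch of the interface shape (method names are paraphrased from the operations described on this page, not the exact signatures in common/doc_store/doc_store_base.py):

```python
from abc import ABC, abstractmethod

class DocStoreConnection(ABC):
    """Illustrative sketch of the unified document-store interface."""

    @abstractmethod
    def createIdx(self, index_name, kb_id, vector_size):
        """Create an index sized for the embedding dimension."""

    @abstractmethod
    def insert(self, rows, index_name, kb_id):
        """Bulk-insert chunk documents."""

    @abstractmethod
    def search(self, select_fields, condition, match_exprs,
               order_by, offset, limit, index_names, kb_ids):
        """Run a query built from match/fusion/order expressions."""

    @abstractmethod
    def get(self, chunk_id, index_name, kb_ids):
        """Fetch a single chunk by ID."""

    @abstractmethod
    def delete(self, condition, index_name, kb_id):
        """Delete chunks matching a condition."""
```

Because every method is abstract, backends such as ESConnection and InfinityConnection must implement the full set before they can be instantiated.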
Query Expression Types:
| Expression | Purpose | Example |
|---|---|---|
| `MatchTextExpr` | Full-text search with field weights | `fields=["content_ltks^1.0", "title_tks^0.5"]` |
| `MatchDenseExpr` | Vector similarity search | `vector_column="q_768_vec", topk=10` |
| `FusionExpr` | Hybrid search combining text + vector | `method="weighted_sum", weights="0.05,0.95"` |
| `OrderByExpr` | Result ordering | `asc("page_num_int"), desc("create_timestamp_flt")` |
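A hybrid query is assembled by combining these expressions. The dataclasses below are illustrative stand-ins (the real classes and their fields live in common/doc_store and may differ); they show how the examples in the table fit together:

```python
from dataclasses import dataclass

@dataclass
class MatchTextExpr:          # full-text leg
    fields: list
    matching_text: str
    topn: int = 10

@dataclass
class MatchDenseExpr:         # vector leg
    vector_column: str
    embedding: list
    topk: int = 10

@dataclass
class FusionExpr:             # combines the two legs
    method: str
    params: dict

text_q = MatchTextExpr(["content_ltks^1.0", "title_tks^0.5"], "storage tiers")
dense_q = MatchDenseExpr("q_768_vec", [0.0] * 768, topk=10)
fusion = FusionExpr("weighted_sum", {"weights": "0.05,0.95"})

match_exprs = [text_q, dense_q, fusion]  # passed to the search() call
```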
Sources: common/doc_store/doc_store_base.py1-150 rag/nlp/search.py36-172
The ESConnection class rag/utils/es_conn.py34-400 implements the document store interface using Elasticsearch 8.x.
Index Structure:
Each index follows the naming pattern ragflow_{tenant_id} and contains:
- Vector fields (e.g. `q_768_vec` for 768-dimensional embeddings)
- Tokenized text fields: `*_ltks` (coarse), `*_sm_ltks` (fine-grained)
- Keyword fields with `*_kwd` suffixes

Mapping Configuration (dynamic based on vector_size and parser_id):
Search Implementation rag/utils/es_conn.py39-200:
Bulk Operations: The insert() method uses Elasticsearch bulk API for efficient batch insertion of chunks (typically 64 chunks per batch).
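The batching itself is straightforward. The helpers below are illustrative (names and batch handling are assumptions), but the action/source line interleaving is what the Elasticsearch bulk API expects:

```python
def to_bulk_operations(index_name, chunks):
    """Interleave action and source lines for the ES bulk API."""
    ops = []
    for chunk in chunks:
        ops.append({"index": {"_index": index_name, "_id": chunk["id"]}})
        ops.append({k: v for k, v in chunk.items() if k != "id"})
    return ops

def batched(chunks, batch_size=64):
    """Yield fixed-size batches (64 matches the typical chunk batch)."""
    for i in range(0, len(chunks), batch_size):
        yield chunks[i:i + batch_size]

chunks = [{"id": f"c{i}", "content_ltks": "..."} for i in range(130)]
batches = list(batched(chunks))
# 130 chunks -> batches of 64, 64, and 2
```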
Sources: rag/utils/es_conn.py34-400 common/doc_store/es_conn_base.py1-350
The InfinityConnection class rag/utils/infinity_conn.py30-700 implements the interface using Infinity Database, optimized for SQL-like operations with vector support.
Key Differences from Elasticsearch:
| Aspect | Infinity | Elasticsearch |
|---|---|---|
| Field naming | No suffix transformations (uses base field names) | Suffix-based (_ltks, _kwd, _vec) |
| Full-text search | Uses MATCH with specific analyzers like ft_content_rag_coarse | Uses query_string with Lucene syntax |
| Vector search | Native KNN operator with float arrays | script_score or knn with dense_vector |
| Fusion | FUSION('weighted_sum', ...) SQL function | Script-based score combination |
| Result format | Returns (DataFrame, total_count) tuple | Returns DataFrame with hits wrapper |
Field Mapping Strategy rag/utils/infinity_conn.py36-86:
Index Schema (defined in conf/infinity_mapping.json):
Search Query Example rag/utils/infinity_conn.py92-250:
Connection Pooling: Uses InfinityConnectionPool with Thrift protocol (port 23817) or HTTP (port 23820).
Sources: rag/utils/infinity_conn.py30-700 common/doc_store/infinity_conn_base.py1-800 conf/infinity_mapping.json1-30
The OpenSearchConnection class rag/utils/opensearch_conn.py1-800 provides OpenSearch compatibility as an alternative to Elasticsearch.
Implementation Notes:
- Uses the `opensearch-py` client instead of `elasticsearch-py`

Sources: rag/utils/opensearch_conn.py1-800
The document store backend is selected at runtime via the DOC_ENGINE environment variable:
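A sketch of the selection logic (the dispatch table and the `elasticsearch` default are assumptions based on this page; the actual lookup lives in common/settings.py):

```python
import os

# Maps DOC_ENGINE values to the connection classes described above.
_BACKENDS = {
    "elasticsearch": "rag.utils.es_conn.ESConnection",
    "infinity": "rag.utils.infinity_conn.InfinityConnection",
    "opensearch": "rag.utils.opensearch_conn.OpenSearchConnection",
}

def resolve_doc_engine(env=os.environ):
    """Return the dotted path of the configured document-store class."""
    engine = env.get("DOC_ENGINE", "elasticsearch").lower()
    if engine not in _BACKENDS:
        raise ValueError(f"Unsupported DOC_ENGINE: {engine}")
    return _BACKENDS[engine]
```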
Sources: common/settings.py README.md
RAGFlow abstracts object storage through the STORAGE_IMPL singleton, which provides a common interface for MinIO, S3, OSS, and Azure Blob Storage.
Common Operations:
| Method | Purpose | Returns |
|---|---|---|
| `put(bucket, name, binary)` | Upload file | Success boolean |
| `get(bucket, name)` | Download file | Binary data |
| `rm(bucket, name)` | Delete file | Success boolean |
| `obj_exist(bucket, name)` | Check existence | Boolean |
| `get_presigned_url(bucket, name, expires)` | Generate temporary URL | URL string |
Sources: rag/svr/task_executor.py240-242 api/db/services/file_service.py1-500
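An in-memory stand-in makes the interface shape concrete; every real backend (MinIO, S3, OSS, Azure Blob) implements these same methods against its own SDK:

```python
class InMemoryStorage:
    """Dict-backed illustration of the common object-storage interface."""

    def __init__(self):
        self._objects = {}

    def put(self, bucket, name, binary):
        self._objects[(bucket, name)] = binary
        return True

    def get(self, bucket, name):
        return self._objects[(bucket, name)]

    def rm(self, bucket, name):
        return self._objects.pop((bucket, name), None) is not None

    def obj_exist(self, bucket, name):
        return (bucket, name) in self._objects

storage = InMemoryStorage()
storage.put("kb_123", "report.pdf", b"%PDF-1.7 ...")
assert storage.obj_exist("kb_123", "report.pdf")
```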
Files are stored with a two-level addressing scheme:
- Bucket: `kb_id` (knowledge base ID)
- Object name: `doc.location` field from the Document table
- Chunk images stored by `chunk_id` (SHA256 hash)
- Thumbnails stored by `thumbnail_id`

Address Resolution api/db/services/file2document_service.py60-85:
Sources: api/db/services/file2document_service.py60-85 rag/svr/task_executor.py254-256
The MinIOConnection class rag/utils/minio_conn.py1-300 is the default object storage backend.
Configuration:
Key Features:
- `put()` creates the bucket if it does not exist rag/utils/minio_conn.py80-120
- Automatic retry on `S3Error` (5 attempts, 1-second base delay)

Usage Example rag/svr/task_executor.py240-270:
Sources: rag/utils/minio_conn.py1-300 rag/svr/task_executor.py240-270
The S3Connection class rag/utils/s3_conn.py1-250 supports AWS S3 and S3-compatible services.
Configuration:
Implementation Details:
- Uses the `boto3` library with connection pooling
- Retry policy: `botocore.config.Config(retries={'max_attempts': 5})`

Sources: rag/utils/s3_conn.py1-250
The OSSConnection class rag/utils/oss_conn.py1-200 integrates with Alibaba Cloud Object Storage Service.
Configuration:
Features:
- Uses the `oss2` library (Alibaba Cloud SDK)
- Handles `RequestError` and `ServerError`

Sources: rag/utils/oss_conn.py1-200
RAGFlow provides two Azure authentication methods:
SAS Token Authentication rag/utils/azure_sas_conn.py1-120:
Service Principal Authentication rag/utils/azure_spn_conn.py1-150:
Both implementations use the azure-storage-blob SDK and provide identical interface methods.
Sources: rag/utils/azure_sas_conn.py1-120 rag/utils/azure_spn_conn.py1-150
The storage backend is selected at runtime via environment variable:
Sources: common/settings.py
Redis (or its fork, Valkey) serves three primary functions: the task queue, the document file cache, and task-cancellation flags.
Sources: rag/utils/redis_conn.py1-400 api/db/services/task_service.py1-500
The task queue uses Redis Streams for reliable, distributed task processing rag/utils/redis_conn.py100-300:
Queue Structure:
Key Operations rag/svr/task_executor.py176-237:
Retry Logic: Unacknowledged messages are automatically redelivered to other consumers after a timeout (configurable, default 120s).
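The produce/claim/acknowledge cycle can be modeled in a few lines. This is an in-memory illustration of the Redis Streams pattern (XADD / XREADGROUP / XACK); it deliberately omits the real system's timeout-based redelivery via pending-entry lists:

```python
import itertools

class MiniStream:
    """Toy model of a Redis Stream with one consumer group."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.entries = {}   # msg_id -> payload (the stream)
        self.pending = set()  # delivered but not yet acknowledged

    def xadd(self, payload):
        msg_id = next(self._ids)
        self.entries[msg_id] = payload
        return msg_id

    def xreadgroup(self):
        # Deliver the oldest entry not yet claimed by a consumer.
        for msg_id in sorted(self.entries):
            if msg_id not in self.pending:
                self.pending.add(msg_id)
                return msg_id, self.entries[msg_id]
        return None  # nothing new to deliver

    def xack(self, msg_id):
        # Acknowledgement removes the message for good; in real Redis,
        # unacked messages are redelivered after the timeout instead.
        self.pending.discard(msg_id)
        self.entries.pop(msg_id, None)

q = MiniStream()
q.xadd({"task_id": "t1", "doc_id": "d1"})
msg_id, task = q.xreadgroup()  # a worker claims the task
q.xack(msg_id)                 # acknowledged after processing
```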
Sources: rag/utils/redis_conn.py100-300 rag/svr/task_executor.py176-237
The file cache system rag/svr/cache_file_svr.py1-100 stores frequently accessed document binaries in Redis to reduce object storage latency:
Configuration:
Cache Operations:
Use Case: During document processing, chunks from the same document are processed sequentially. Caching the original document for 12 minutes avoids repeated object storage fetches for multi-page documents.
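The read-through pattern looks roughly like this. The class and key format are illustrative; a plain dict stands in for Redis, so the 12-minute TTL is noted in a comment rather than enforced:

```python
CACHE_TTL_SECONDS = 12 * 60  # the 12-minute TTL described above

class FileCache:
    """Read-through cache in front of object storage (sketch)."""

    def __init__(self, storage):
        self.storage = storage  # any object-storage backend
        self._cache = {}        # stand-in for Redis; no real expiry

    def get_file(self, bucket, name):
        key = f"filecache:{bucket}/{name}"
        if key in self._cache:
            return self._cache[key]      # cache hit: skip object storage
        binary = self.storage.get(bucket, name)
        self._cache[key] = binary        # real system: SETEX with TTL
        return binary

class CountingStorage:
    """Test double that counts object-storage fetches."""

    def __init__(self, objects):
        self.objects = objects
        self.fetches = 0

    def get(self, bucket, name):
        self.fetches += 1
        return self.objects[(bucket, name)]

store = CountingStorage({("kb1", "doc.pdf"): b"pdf-bytes"})
cache = FileCache(store)
cache.get_file("kb1", "doc.pdf")
cache.get_file("kb1", "doc.pdf")
assert store.fetches == 1  # second read served from cache
```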
Sources: rag/svr/cache_file_svr.py1-100
Task cancellation uses Redis sets for efficient broadcast api/db/services/task_service.py200-250:
Task executor workers poll this flag periodically during long-running operations rag/svr/task_executor.py350-470.
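The set-based broadcast reduces to two operations (SADD to cancel, SISMEMBER to poll). The sketch below uses a Python set in place of Redis, with an illustrative polling loop:

```python
cancelled = set()  # stands in for a Redis set of cancelled task IDs

def cancel_task(task_id):
    cancelled.add(task_id)       # SADD <cancel-set> task_id

def is_cancelled(task_id):
    return task_id in cancelled  # SISMEMBER <cancel-set> task_id

def process_pages(task_id, pages):
    """Long-running work that polls the flag between units of work."""
    done = []
    for page in pages:
        if is_cancelled(task_id):
            break                # stop promptly once cancellation is seen
        done.append(page)
    return done

cancel_task("t42")
assert process_pages("t42", [1, 2, 3]) == []
assert process_pages("t7", [1, 2]) == [1, 2]
```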
Sources: api/db/services/task_service.py200-250 rag/svr/task_executor.py350-470
The complete flow from upload to searchable chunks involves coordination across all storage tiers:
Key Coordination Points:
- `Document.run` field reflects current processing state (updated by task executor)
- `Task.progress` (0-1) and `progress_msg` fields are updated in real time rag/svr/task_executor.py142-173
- Chunks are written to the document store with `chunk_id` as key rag/svr/task_executor.py301-325
- Metadata is synchronized via `DocMetadataService` api/db/services/doc_metadata_service.py1-200

Sources: api/db/services/file_service.py200-400 rag/svr/task_executor.py142-900
Search and retrieval operations coordinate between relational database (metadata) and document store (content):
Hybrid Search Process rag/nlp/search.py74-172:
- Full-text query built as a `MatchTextExpr` with field weights
- Query embedding wrapped in a `MatchDenseExpr`
- Scores combined via a `FusionExpr` with weighted sum (default 0.05 text, 0.95 vector)

Sources: api/db/services/dialog_service.py411-734 rag/nlp/search.py74-350
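The default weighting can be checked with a one-line worked example (the function name is illustrative; the actual combination happens inside the fusion handling):

```python
def fuse(text_score, vector_score, w_text=0.05, w_vec=0.95):
    """Weighted-sum fusion with the default 0.05/0.95 split."""
    return w_text * text_score + w_vec * vector_score

# A chunk scoring 0.8 on full text and 0.6 on vector similarity:
assert abs(fuse(0.8, 0.6) - 0.61) < 1e-9  # 0.05*0.8 + 0.95*0.6
```

With weights summing to 1, a fused score stays on the same 0-1 scale as its inputs, and the heavy vector weight means lexical matches mostly act as a tie-breaker.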
Document metadata is stored in both MySQL and document store, requiring synchronization:
Metadata Table (MySQL):
Metadata in Document Store:
Synchronization Points api/db/services/doc_metadata_service.py50-150:
Sources: api/db/services/doc_metadata_service.py1-200 rag/svr/task_executor.py407-448
Storage backend selection follows this priority:
Example Configuration:
Sources: common/settings.py service_conf.yaml.template
| Aspect | MinIO | S3 | OSS | Azure Blob | Elasticsearch | Infinity | OpenSearch |
|---|---|---|---|---|---|---|---|
| Setup Complexity | Low | Medium | Medium | Medium | Low | Low | Low |
| Cost | Free (self-hosted) | Pay-per-use | Pay-per-use | Pay-per-use | Free (self-hosted) | Free | Free |
| Latency | Lowest (local) | Medium | Medium | Medium | N/A | N/A | N/A |
| Scalability | Manual | Automatic | Automatic | Automatic | Cluster | Cluster | Cluster |
| Vector Search | N/A | N/A | N/A | N/A | Via script_score | Native KNN | Neural search |
| Full-Text | N/A | N/A | N/A | N/A | Lucene | Custom analyzers | Lucene |
Recommendations:
Sources: README.md docker-compose.yml
Object Storage:
Document Store:
- Tune `number_of_shards` based on cluster size (default: 1)
- `INFINITY_CONNECTION_POOL_SIZE` (default: 20)
- `EMBEDDING_BATCH_SIZE` (default: 64)

Redis:
- Set `maxmemory-policy` to `allkeys-lru` for cache eviction