Проектирование архитектуры RAG-пайплайна
Архитектура RAG-пайплайна определяет качество, масштабируемость и стоимость всей системы. Базовый RAG «работает» за день, но production-ready система с надёжным retrieval, мониторингом и управляемой стоимостью требует тщательного проектирования.
Компоненты современного RAG-пайплайна
┌─────────────────────────────────────────────────────┐
│ INGESTION PIPELINE │
│ Sources → Loaders → Parsers → Chunkers → Embedder │
│ → Metadata Extractor → Vector Store │
└─────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────┐
│ RETRIEVAL PIPELINE │
│ Query → Query Transformer → Multi-Index Search │
│ → Reranker → Context Assembler │
└─────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────┐
│ GENERATION PIPELINE │
│ Context + Query → Prompt Builder → LLM │
│ → Response Validator → User │
└─────────────────────────────────────────────────────┘
Ingestion Pipeline: архитектурные решения
Document Loaders: выбор загрузчиков критичен для качества. PDF с таблицами требует pdfplumber или LlamaParse, а не PyPDF2. Word документы — python-docx, HTML — BeautifulSoup с кастомными правилами очистки.
from llama_parse import LlamaParse
from langchain_community.document_loaders import (
PyPDFLoader, UnstructuredWordDocumentLoader,
ConfluenceLoader, NotionDBLoader
)
# Для сложных PDF (таблицы, колонки, изображения)
parser = LlamaParse(
api_key="...",
result_type="markdown", # Сохраняет структуру таблиц
language="ru",
)
# Конфигурируемый пайплайн загрузки
LOADERS = {
".pdf": lambda path: LlamaParse().load_data(path),
".docx": lambda path: UnstructuredWordDocumentLoader(path).load(),
".html": lambda path: custom_html_loader(path),
}
Metadata enrichment: обогащение чанков метаданными критично для фильтрации и атрибуции:
def enrich_chunk_metadata(chunk, source_doc):
"""Добавляет структурированные метаданные к чанку"""
chunk.metadata.update({
"source": source_doc.metadata.get("source"),
"page": source_doc.metadata.get("page"),
"doc_type": detect_doc_type(source_doc), # "contract", "regulation", "faq"
"department": extract_department(source_doc),
"date": extract_date(source_doc),
"version": extract_version(source_doc),
"chunk_index": chunk.metadata.get("chunk_index"),
"parent_chunk_id": chunk.metadata.get("parent_id"),
})
return chunk
Retrieval Pipeline: стратегии
Sparse + Dense Hybrid Search:
from qdrant_client import QdrantClient
from qdrant_client.models import SparseVector, NamedSparseVector, NamedVector
# Hybrid search в Qdrant: BM25 sparse + embedding dense
def hybrid_search(query: str, top_k: int = 10) -> list:
# Dense embedding
dense_vector = embedder.embed_query(query)
# Sparse (BM25) через SPLADE или FastEmbed
sparse_vector = sparse_encoder.encode(query)
results = client.query_points(
collection_name="docs",
prefetch=[
{"query": dense_vector, "using": "dense", "limit": 30},
{"query": SparseVector(indices=sparse_vector.indices,
values=sparse_vector.values),
"using": "sparse", "limit": 30},
],
query=fusion, # RRF (Reciprocal Rank Fusion)
limit=top_k,
)
return results
Reranking Pipeline:
from flashrank import Ranker, RerankRequest
ranker = Ranker(model_name="ms-marco-MiniLM-L-12-v2")
def rerank_results(query: str, candidates: list[str]) -> list[str]:
rerank_request = RerankRequest(
query=query,
passages=[{"id": i, "text": c} for i, c in enumerate(candidates)]
)
results = ranker.rerank(rerank_request)
# Сортируем по score, берём top-5
top_passages = [candidates[r["id"]] for r in sorted(results, key=lambda x: -x["score"])[:5]]
return top_passages
Query Transformation: улучшение запроса перед поиском
Плохо сформулированный запрос = плохой retrieval. Трансформации запроса:
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Multi-Query: генерируем 3 перефразирования запроса
def multi_query_transform(original_query: str) -> list[str]:
response = llm.invoke(f"""Сгенерируй 3 разных перефразирования следующего вопроса.
Каждый вариант должен искать ту же информацию, но другими словами.
Верни JSON-список строк.
Вопрос: {original_query}""")
queries = json.loads(response.content)
return [original_query] + queries # Оригинал + 3 перефразирования
# Step-back prompting: абстрагируемся к более общему вопросу
def step_back_transform(specific_query: str) -> str:
response = llm.invoke(f"""Сформулируй более общий вопрос, ответ на который
помог бы ответить на конкретный вопрос: "{specific_query}"
Верни только вопрос, без пояснений.""")
return response.content
Архитектура с несколькими индексами
Для систем с разнородными источниками данных эффективнее использовать раздельные индексы:
class MultiIndexRAG:
def __init__(self):
self.indexes = {
"contracts": QdrantRetriever(collection="contracts"),
"regulations": QdrantRetriever(collection="regulations"),
"faq": QdrantRetriever(collection="faq"),
"procedures": QdrantRetriever(collection="procedures"),
}
self.router = QueryRouter() # Классификатор запросов
def retrieve(self, query: str, top_k: int = 5) -> list:
# Определяем релевантные индексы
relevant_indexes = self.router.route(query)
# Параллельный поиск по всем релевантным индексам
all_results = []
for index_name in relevant_indexes:
results = self.indexes[index_name].retrieve(query, k=top_k)
for r in results:
r.metadata["source_index"] = index_name
all_results.extend(results)
# Reranking объединённых результатов
return rerank_results(query, all_results)[:top_k]
Мониторинг качества retrieval
# Трассировка каждого запроса для анализа
import opentelemetry as otel
def traced_retrieval(query: str, span_name: str = "rag_retrieval"):
with otel.trace.get_tracer(__name__).start_as_current_span(span_name) as span:
start_time = time.time()
results = retriever.retrieve(query)
latency = time.time() - start_time
span.set_attributes({
"query.length": len(query),
"results.count": len(results),
"results.top_score": results[0].score if results else 0,
"retrieval.latency_ms": latency * 1000,
})
return results
Сроки проектирования и разработки
- Проектирование архитектуры: 1 неделя
- Базовый ingestion pipeline: 1–2 недели
- Advanced retrieval (hybrid search, reranking): 2–3 недели
- Evaluation framework: 1–2 недели
- Production hardening: 1–2 недели
- Итого: 6–10 недель







