AI Federated Search: единый поиск по множеству источников
Federated Search — поиск, который одновременно опрашивает несколько разнородных источников и возвращает единый ранжированный результат. В корпоративном контексте это означает: один запрос покрывает Confluence, Jira, SharePoint, корпоративную почту, внешние базы данных, файловые серверы — без необходимости переключаться между системами.
Архитектура федеративного поиска
Принципиальный выбор: индексировать данные централизованно или опрашивать источники на лету.
| Подход | Latency | Актуальность данных | Сложность | Подходит для |
|---|---|---|---|---|
| Централизованный индекс | 100–500 мс | Задержка ~15–60 мин | Высокая (ETL) | Большие объёмы, корпоративный поиск |
| Query-time federation | 1–5 сек | Realtime | Средняя | API-источники, малые объёмы |
| Гибридный | 300–800 мс | Критичное — realtime | Высокая | Enterprise с разными требованиями |
В большинстве enterprise-проектов используем гибрид: Confluence/SharePoint/Jira — через централизованный индекс, внешние API и realtime-данные — через query-time federation.
import asyncio
from typing import Protocol, runtime_checkable
from dataclasses import dataclass
@dataclass
class SearchResult:
source: str
doc_id: str
title: str
snippet: str
url: str
score: float
metadata: dict
@runtime_checkable
class SearchConnector(Protocol):
async def search(self, query: str, filters: dict, limit: int) -> list[SearchResult]:
...
class FederatedSearchOrchestrator:
def __init__(self, connectors: dict[str, SearchConnector]):
self.connectors = connectors
self.merger = ResultMerger()
async def search(
self,
query: str,
sources: list[str] = None,
filters: dict = None,
limit: int = 10
) -> list[SearchResult]:
active_connectors = {
name: conn for name, conn in self.connectors.items()
if sources is None or name in sources
}
# Параллельный запрос ко всем источникам с таймаутом
tasks = {
name: asyncio.create_task(
asyncio.wait_for(
conn.search(query, filters or {}, limit * 2),
timeout=3.0 # не ждём медленные источники дольше 3 сек
)
)
for name, conn in active_connectors.items()
}
results_by_source = {}
for name, task in tasks.items():
try:
results_by_source[name] = await task
except asyncio.TimeoutError:
results_by_source[name] = [] # источник не ответил
# логируем но не падаем
except Exception as e:
results_by_source[name] = []
return self.merger.merge_and_rank(results_by_source, limit)
Слияние и переранжирование результатов
Главная техническая проблема федеративного поиска — нормализация скоров из разных источников. BM25-скор из Elasticsearch несопоставим с cosine similarity из Qdrant.
from sentence_transformers import CrossEncoder
import numpy as np
class ResultMerger:
def __init__(self):
self.reranker = CrossEncoder(
"cross-encoder/ms-marco-MiniLM-L-6-v2",
max_length=512
)
def merge_and_rank(
self,
results_by_source: dict[str, list[SearchResult]],
limit: int
) -> list[SearchResult]:
all_results = []
for source, results in results_by_source.items():
all_results.extend(results)
if not all_results:
return []
# Дедупликация по контенту (cosine similarity эмбеддингов)
all_results = self._deduplicate(all_results, threshold=0.92)
# Нормализуем скоры внутри каждого источника (min-max)
for source in results_by_source:
source_results = [r for r in all_results if r.source == source]
if len(source_results) > 1:
scores = [r.score for r in source_results]
min_s, max_s = min(scores), max(scores)
for r in source_results:
r.score = (r.score - min_s) / (max_s - min_s + 1e-9)
return sorted(all_results, key=lambda x: x.score, reverse=True)[:limit]
Коннекторы под конкретные источники
class ConfluenceConnector:
async def search(self, query: str, filters: dict, limit: int) -> list[SearchResult]:
# Гибридный поиск: vector store (Qdrant) для семантики
# + Confluence REST API для актуальности
vector_results = await self.vector_store.asimilarity_search(query, k=limit)
return [self._to_result(r) for r in vector_results]
class JiraConnector:
async def search(self, query: str, filters: dict, limit: int) -> list[SearchResult]:
# JQL с text search
jql = f'text ~ "{query}" ORDER BY updated DESC'
if filters.get("project"):
jql = f'project = {filters["project"]} AND ' + jql
issues = self.jira.search_issues(jql, maxResults=limit)
return [self._issue_to_result(i) for i in issues]
class EmailConnector:
async def search(self, query: str, filters: dict, limit: int) -> list[SearchResult]:
# Поиск через Microsoft Graph API или IMAP
results = await self.graph_client.search_messages(
query=query,
top=limit,
select=["subject", "bodyPreview", "from", "receivedDateTime"]
)
return [self._email_to_result(r) for r in results]
Кейс: страховая компания, 600 сотрудников. 6 источников: SharePoint (180K документов), Jira, Outlook, внутренняя СЭД, база прецедентов, регуляторная база ЦБ (внешний API). До внедрения оператор КЦ переключался между 4–5 вкладками при обработке запроса клиента. После — один поисковый интерфейс с результатами из всех источников. Среднее время обработки запроса: 4,2 мин → 1,8 мин. Источник «Регуляторная база» через query-time federation — latency 800 мс, что приемлемо для данного сценария.
Персонализация источников по роли
Разным ролям показываем релевантные источники по умолчанию:
ROLE_SOURCE_CONFIG = {
"developer": ["jira", "confluence", "gitlab", "stackoverflow-internal"],
"hr": ["confluence", "email", "hr-system", "orgchart"],
"lawyer": ["contracts-db", "sharepoint", "email", "regulations"],
"support": ["confluence", "jira", "email", "crm", "knowledge-base"],
}
Сроки: 2–3 источника, пилот: 4–6 недель; полная федерация 6–8 источников: 3–4 месяца.







