AI-система федеративного поиска по нескольким источникам

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1 услугВсе 1566 услуг
AI-система федеративного поиска по нескольким источникам
Средняя
~2-4 недели
Часто задаваемые вопросы
Направления AI-разработки
Этапы разработки AI-решения
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1218
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    853
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1047
  • image_logo-advance_0.png
    Разработка логотипа компании B2B Advance
    561
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    825

AI Federated Search: единый поиск по множеству источников

Federated Search — поиск, который одновременно опрашивает несколько разнородных источников и возвращает единый ранжированный результат. В корпоративном контексте это означает: один запрос покрывает Confluence, Jira, SharePoint, корпоративную почту, внешние базы данных, файловые серверы — без необходимости переключаться между системами.

Архитектура федеративного поиска

Принципиальный выбор: индексировать данные централизованно или опрашивать источники на лету.

Подход Latency Актуальность данных Сложность Подходит для
Централизованный индекс 100–500 мс Задержка ~15–60 мин Высокая (ETL) Большие объёмы, корпоративный поиск
Query-time federation 1–5 сек Realtime Средняя API-источники, малые объёмы
Гибридный 300–800 мс Критичное — realtime Высокая Enterprise с разными требованиями

В большинстве enterprise-проектов используем гибрид: Confluence/SharePoint/Jira — через централизованный индекс, внешние API и realtime-данные — через query-time federation.

import asyncio
from typing import Protocol, runtime_checkable
from dataclasses import dataclass

@dataclass
class SearchResult:
    source: str
    doc_id: str
    title: str
    snippet: str
    url: str
    score: float
    metadata: dict

@runtime_checkable
class SearchConnector(Protocol):
    async def search(self, query: str, filters: dict, limit: int) -> list[SearchResult]:
        ...

class FederatedSearchOrchestrator:
    def __init__(self, connectors: dict[str, SearchConnector]):
        self.connectors = connectors
        self.merger = ResultMerger()

    async def search(
        self,
        query: str,
        sources: list[str] = None,
        filters: dict = None,
        limit: int = 10
    ) -> list[SearchResult]:
        active_connectors = {
            name: conn for name, conn in self.connectors.items()
            if sources is None or name in sources
        }

        # Параллельный запрос ко всем источникам с таймаутом
        tasks = {
            name: asyncio.create_task(
                asyncio.wait_for(
                    conn.search(query, filters or {}, limit * 2),
                    timeout=3.0  # не ждём медленные источники дольше 3 сек
                )
            )
            for name, conn in active_connectors.items()
        }

        results_by_source = {}
        for name, task in tasks.items():
            try:
                results_by_source[name] = await task
            except asyncio.TimeoutError:
                results_by_source[name] = []  # источник не ответил
                # логируем но не падаем
            except Exception as e:
                results_by_source[name] = []

        return self.merger.merge_and_rank(results_by_source, limit)

Слияние и переранжирование результатов

Главная техническая проблема федеративного поиска — нормализация скоров из разных источников. BM25-скор из Elasticsearch несопоставим с cosine similarity из Qdrant.

from sentence_transformers import CrossEncoder
import numpy as np

class ResultMerger:
    def __init__(self):
        self.reranker = CrossEncoder(
            "cross-encoder/ms-marco-MiniLM-L-6-v2",
            max_length=512
        )

    def merge_and_rank(
        self,
        results_by_source: dict[str, list[SearchResult]],
        limit: int
    ) -> list[SearchResult]:
        all_results = []
        for source, results in results_by_source.items():
            all_results.extend(results)

        if not all_results:
            return []

        # Дедупликация по контенту (cosine similarity эмбеддингов)
        all_results = self._deduplicate(all_results, threshold=0.92)

        # Нормализуем скоры внутри каждого источника (min-max)
        for source in results_by_source:
            source_results = [r for r in all_results if r.source == source]
            if len(source_results) > 1:
                scores = [r.score for r in source_results]
                min_s, max_s = min(scores), max(scores)
                for r in source_results:
                    r.score = (r.score - min_s) / (max_s - min_s + 1e-9)

        return sorted(all_results, key=lambda x: x.score, reverse=True)[:limit]

Коннекторы под конкретные источники

class ConfluenceConnector:
    async def search(self, query: str, filters: dict, limit: int) -> list[SearchResult]:
        # Гибридный поиск: vector store (Qdrant) для семантики
        # + Confluence REST API для актуальности
        vector_results = await self.vector_store.asimilarity_search(query, k=limit)
        return [self._to_result(r) for r in vector_results]

class JiraConnector:
    async def search(self, query: str, filters: dict, limit: int) -> list[SearchResult]:
        # JQL с text search
        jql = f'text ~ "{query}" ORDER BY updated DESC'
        if filters.get("project"):
            jql = f'project = {filters["project"]} AND ' + jql
        issues = self.jira.search_issues(jql, maxResults=limit)
        return [self._issue_to_result(i) for i in issues]

class EmailConnector:
    async def search(self, query: str, filters: dict, limit: int) -> list[SearchResult]:
        # Поиск через Microsoft Graph API или IMAP
        results = await self.graph_client.search_messages(
            query=query,
            top=limit,
            select=["subject", "bodyPreview", "from", "receivedDateTime"]
        )
        return [self._email_to_result(r) for r in results]

Кейс: страховая компания, 600 сотрудников. 6 источников: SharePoint (180K документов), Jira, Outlook, внутренняя СЭД, база прецедентов, регуляторная база ЦБ (внешний API). До внедрения оператор КЦ переключался между 4–5 вкладками при обработке запроса клиента. После — один поисковый интерфейс с результатами из всех источников. Среднее время обработки запроса: 4,2 мин → 1,8 мин. Источник «Регуляторная база» через query-time federation — latency 800 мс, что приемлемо для данного сценария.

Персонализация источников по роли

Разным ролям показываем релевантные источники по умолчанию:

ROLE_SOURCE_CONFIG = {
    "developer": ["jira", "confluence", "gitlab", "stackoverflow-internal"],
    "hr": ["confluence", "email", "hr-system", "orgchart"],
    "lawyer": ["contracts-db", "sharepoint", "email", "regulations"],
    "support": ["confluence", "jira", "email", "crm", "knowledge-base"],
}

Сроки: 2–3 источника, пилот: 4–6 недель; полная федерация 6–8 источников: 3–4 месяца.