Разработка AI-системы обогащения данных о клиенте из открытых источников
AI-система обогащения данных (data enrichment) автоматически дополняет профиль клиента информацией из открытых источников: LinkedIn, Crunchbase, GitHub, новостные сайты, реестры компаний. Это ускоряет onboarding, улучшает качество лид-скоринга и персонализацию.
Архитектура системы
from dataclasses import dataclass
from typing import Optional
@dataclass
class EnrichedProfile:
# Исходные данные
email: str
company_name: str
# Обогащённые данные
linkedin_profile: Optional[dict] = None
company_info: Optional[dict] = None
news_mentions: Optional[list] = None
tech_stack: Optional[list] = None
funding_info: Optional[dict] = None
enrichment_score: float = 0.0 # Уверенность в данных
class DataEnrichmentPipeline:
def __init__(self):
self.enrichers = [
LinkedInEnricher(),
ClearbitEnricher(),
CrunchbaseEnricher(),
NewsEnricher(),
GitHubEnricher(),
]
async def enrich(self, email: str, company: str) -> EnrichedProfile:
profile = EnrichedProfile(email=email, company_name=company)
# Параллельное обогащение из всех источников
tasks = [enricher.enrich(email, company) for enricher in self.enrichers]
results = await asyncio.gather(*tasks, return_exceptions=True)
for enricher, result in zip(self.enrichers, results):
if isinstance(result, Exception):
continue # Один источник не ломает весь пайплайн
profile = enricher.apply_to_profile(profile, result)
profile.enrichment_score = self._compute_score(profile)
return profile
LinkedIn обогащение через ProxyCurl
import httpx
class LinkedInEnricher:
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://nubela.co/proxycurl/api"
async def enrich(self, email: str, company: str) -> dict:
async with httpx.AsyncClient() as client:
# Поиск профиля по email
response = await client.get(
f"{self.base_url}/linkedin/profile/resolve/email",
params={"email": email},
headers={"Authorization": f"Bearer {self.api_key}"}
)
if response.status_code != 200:
return {}
profile_url = response.json().get('linkedin_profile_url')
if not profile_url:
return {}
# Получение полного профиля
profile_response = await client.get(
f"{self.base_url}/v2/linkedin",
params={"url": profile_url, "skills": "include"},
headers={"Authorization": f"Bearer {self.api_key}"}
)
return profile_response.json()
AI-извлечение технологического стека
class TechStackExtractor:
def __init__(self):
self.llm = Anthropic()
async def extract_from_website(self, domain: str) -> list[str]:
"""Извлечение tech stack с сайта компании через AI"""
# Сбор контента с сайта
job_postings = await self._scrape_job_postings(domain)
about_page = await self._scrape_page(f"https://{domain}/about")
combined_text = ' '.join([about_page] + job_postings[:5])
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=300,
messages=[{
"role": "user",
"content": f"""Extract technology stack from this company information.
Return JSON array of technology names (programming languages, frameworks, cloud platforms, databases).
Only include clearly mentioned technologies.
Text: {combined_text[:3000]}"""
}]
)
return json.loads(response.content[0].text)
Качество и дедупликация обогащённых данных
Разные источники дают разные данные для одной компании. Нужна reconciliation логика:
def reconcile_company_info(sources: list[dict]) -> dict:
"""Объединение данных о компании из нескольких источников"""
reconciled = {}
# Приоритет источников: официальные реестры > Clearbit > Web scraping
priority_order = ['company_registry', 'clearbit', 'linkedin', 'web_scraping']
for field in ['employee_count', 'founded_year', 'industry', 'headquarters']:
for source_name in priority_order:
source = next((s for s in sources if s.get('source') == source_name), None)
if source and field in source:
reconciled[field] = source[field]
break
return reconciled
Типичный результат: обогащение 80-90% CRM контактов дополнительными данными за 2-5 секунд на запись. Качество данных: 85-90% accuracy для базовых полей (должность, индустрия, размер компании).







