GNN-анализ социальных графов
Социальный граф — одна из наиболее естественных областей применения GNN. Классические задачи: детекция ботов и спама, предсказание связей (link prediction), обнаружение сообществ, анализ влияния. Структурная информация (кто с кем связан) часто важнее контентной для этих задач.
Community Detection и анализ сообществ
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, GAEConv
from torch_geometric.utils import to_networkx, negative_sampling
import networkx as nx
import numpy as np
import pandas as pd
from community import community_louvain # python-louvain
class SocialGraphAnalyzer:
"""Анализ структуры социального графа"""
def build_graph_from_edges(self, edges: pd.DataFrame,
node_features: pd.DataFrame = None) -> tuple:
"""
edges: source_id, target_id, weight (optional)
node_features: node_id, feature_1, ..., feature_n
"""
# Маппинг строковых ID в числовые индексы
all_nodes = pd.unique(edges[['source_id', 'target_id']].values.ravel())
node_idx = {nid: i for i, nid in enumerate(all_nodes)}
n_nodes = len(node_idx)
src = edges['source_id'].map(node_idx).values
dst = edges['target_id'].map(node_idx).values
# Ненаправленный граф: добавляем обратные рёбра
edge_index = torch.tensor([
np.concatenate([src, dst]),
np.concatenate([dst, src])
], dtype=torch.long)
# Признаки узлов
if node_features is not None:
feat_matrix = node_features.set_index('node_id').reindex(all_nodes).fillna(0).values
x = torch.tensor(feat_matrix, dtype=torch.float)
else:
# Degree как базовый признак
degrees = np.bincount(src, minlength=n_nodes) + np.bincount(dst, minlength=n_nodes)
x = torch.tensor(degrees.reshape(-1, 1), dtype=torch.float)
return edge_index, x, node_idx
def detect_communities_louvain(self, edge_index: torch.Tensor,
n_nodes: int) -> dict:
"""
Алгоритм Лувена для обнаружения сообществ.
Оптимизирует modularity — меру качества разбиения.
"""
# Конвертируем в NetworkX
G = nx.Graph()
G.add_nodes_from(range(n_nodes))
edges = edge_index.T.numpy()
G.add_edges_from(edges)
# Алгоритм Лувена
partition = community_louvain.best_partition(G)
# Modularity quality
modularity = community_louvain.modularity(partition, G)
community_sizes = pd.Series(partition).value_counts().sort_values(ascending=False)
return {
'node_to_community': partition,
'n_communities': len(set(partition.values())),
'modularity': round(modularity, 4),
'largest_community_size': int(community_sizes.iloc[0]),
'community_size_distribution': community_sizes.head(10).to_dict()
}
def compute_node_centrality(self, G: nx.Graph,
top_k: int = 20) -> pd.DataFrame:
"""Метрики центральности узлов"""
# Degree centrality
degree_centrality = nx.degree_centrality(G)
# Betweenness (для небольших графов; для больших — approximation)
if G.number_of_nodes() < 5000:
betweenness = nx.betweenness_centrality(G, normalized=True)
else:
betweenness = nx.betweenness_centrality(G, k=500, normalized=True) # Аппроксимация
# PageRank
pagerank = nx.pagerank(G, alpha=0.85, max_iter=100)
df = pd.DataFrame({
'degree_centrality': degree_centrality,
'betweenness': betweenness,
'pagerank': pagerank,
})
# Нормализованный composite score
df_norm = (df - df.min()) / (df.max() - df.min() + 1e-9)
df['influence_score'] = (
df_norm['degree_centrality'] * 0.30 +
df_norm['betweenness'] * 0.35 +
df_norm['pagerank'] * 0.35
)
return df.nlargest(top_k, 'influence_score')
class BotDetectorGNN(nn.Module):
"""GNN для детекции ботов в социальных сетях"""
def __init__(self, node_features: int, hidden_dim: int = 64):
super().__init__()
# GAT лучше GCN для этой задачи:
# боты часто связаны аномально — attention выявляет это
from torch_geometric.nn import GATConv
self.conv1 = GATConv(node_features, hidden_dim, heads=4, dropout=0.3)
self.conv2 = GATConv(hidden_dim * 4, hidden_dim, heads=1, dropout=0.3)
self.conv3 = GATConv(hidden_dim, 32, heads=1, dropout=0.3)
self.classifier = nn.Sequential(
nn.Linear(32, 16),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(16, 2) # Human vs Bot
)
def forward(self, x, edge_index):
x = F.elu(self.conv1(x, edge_index))
x = F.elu(self.conv2(x, edge_index))
x = self.conv3(x, edge_index)
return self.classifier(x)
def get_bot_probability(self, x: torch.Tensor,
edge_index: torch.Tensor) -> np.ndarray:
self.eval()
with torch.no_grad():
logits = self.forward(x, edge_index)
probs = torch.softmax(logits, dim=-1)[:, 1]
return probs.cpu().numpy()
class LinkPredictor(nn.Module):
"""
Link prediction: предсказываем появление новых связей.
Применения: «Кого вы можете знать?», рекомендации партнёров, fraud rings.
"""
def __init__(self, node_features: int, hidden_dim: int = 64):
super().__init__()
self.encoder = nn.ModuleList([
GCNConv(node_features, hidden_dim),
GCNConv(hidden_dim, hidden_dim // 2),
])
# Декодер: из эмбеддингов двух узлов предсказываем связь
self.decoder = nn.Sequential(
nn.Linear(hidden_dim, 32),
nn.ReLU(),
nn.Linear(32, 1),
nn.Sigmoid()
)
def encode(self, x, edge_index):
for conv in self.encoder:
x = F.relu(conv(x, edge_index))
return x
def decode(self, z, edge_index):
"""Произведение эмбеддингов пар узлов"""
src_emb = z[edge_index[0]]
dst_emb = z[edge_index[1]]
return self.decoder(src_emb * dst_emb).squeeze()
def forward(self, x, edge_index, pos_edge, neg_edge=None):
z = self.encode(x, edge_index)
pos_scores = self.decode(z, pos_edge)
if neg_edge is not None:
neg_scores = self.decode(z, neg_edge)
return pos_scores, neg_scores
return pos_scores
def predict_new_links(self, z: torch.Tensor,
candidate_pairs: torch.Tensor,
threshold: float = 0.7) -> list:
"""Предсказание новых связей из кандидатных пар"""
with torch.no_grad():
scores = self.decode(z, candidate_pairs)
predicted = []
for i, score in enumerate(scores):
if float(score) >= threshold:
predicted.append({
'node_a': int(candidate_pairs[0, i]),
'node_b': int(candidate_pairs[1, i]),
'probability': round(float(score), 3)
})
return sorted(predicted, key=lambda x: -x['probability'])
Детекция мошеннических колец
class FraudRingDetector:
"""Обнаружение организованного мошенничества через анализ подграфов"""
def __init__(self, gnn_model: BotDetectorGNN):
self.model = gnn_model
def find_suspicious_clusters(self, graph_data,
bot_probs: np.ndarray,
min_cluster_bot_ratio: float = 0.6,
min_cluster_size: int = 5) -> list[dict]:
"""
Ищем плотно связанные подграфы с высокой долей ботов.
Признак fraud ring: взаимосвязанная группа аккаунтов.
"""
G = to_networkx(graph_data, to_undirected=True)
# Добавляем вероятности ботов как атрибуты узлов
for node_id in G.nodes():
G.nodes[node_id]['bot_prob'] = float(bot_probs[node_id])
suspicious_clusters = []
# Находим клики и плотные подграфы
for component in nx.connected_components(G):
if len(component) < min_cluster_size:
continue
subgraph = G.subgraph(component)
nodes = list(component)
bot_ratio = np.mean([G.nodes[n]['bot_prob'] for n in nodes])
if bot_ratio < min_cluster_bot_ratio:
continue
# Метрики плотности кластера
density = nx.density(subgraph)
avg_clustering = nx.average_clustering(subgraph)
suspicious_clusters.append({
'cluster_id': len(suspicious_clusters),
'nodes': nodes,
'size': len(nodes),
'bot_probability': round(float(bot_ratio), 3),
'density': round(density, 3),
'avg_clustering': round(avg_clustering, 3),
'risk_score': round(bot_ratio * density * avg_clustering, 3)
})
return sorted(suspicious_clusters, key=lambda x: -x['risk_score'])
GNN для детекции ботов в Twitter/Telegram: AUC 0.90-0.94 (TwiBot-22 датасет). Link prediction: Hits@50 около 0.65-0.75 на OGB-Collab. Ключевое преимущество перед feature-based методами: GNN улавливает паттерны сговора (coordinated inauthentic behavior) через структуру, которую боты не могут скрыть.







