Система обнаружения платёжного мошенничества
Платёжный фрод — одна из самых дорогих проблем для e-commerce: chargeback'и, штрафы платёжных систем, блокировка мерчант-аккаунта. Автоматическая система оценки риска перед каждой транзакцией снижает фрод без ухудшения конверсии для добросовестных покупателей.
Типы мошенничества и атрибуты
Card Testing — проверка украденных карт мелкими транзакциями.
Признаки: много попыток с одного IP/устройства, суммы $0.01–$1, разные номера карт, короткие интервалы.
Account Takeover (ATO) — захват аккаунта и кража сохранённых карт.
Признаки: смена billing адреса перед покупкой, вход с нового устройства, немедленная крупная покупка.
Friendly Fraud — покупатель заказывает товар и подаёт chargeback.
Признаки: история chargeback, VPN/прокси, доставка в freight forwarder.
Risk Score модель
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class PaymentContext:
user_id: Optional[int]
email: str
ip: str
card_bin: str # первые 6 цифр карты
card_last4: str
amount: float
currency: str
billing_country: str
shipping_country: Optional[str]
device_fingerprint: str
user_agent: str
session_age_seconds: int
class FraudScorer:
def __init__(self, redis, db, geoip, maxmind):
self.r = redis
self.db = db
self.geoip = geoip
self.maxmind = maxmind # MaxMind minFraud
def score(self, ctx: PaymentContext) -> dict:
signals = []
total_score = 0
# === Velocity checks ===
v = self._velocity_checks(ctx)
signals.extend(v['signals'])
total_score += v['score']
# === Geolocation checks ===
g = self._geo_checks(ctx)
signals.extend(g['signals'])
total_score += g['score']
# === Card checks ===
c = self._card_checks(ctx)
signals.extend(c['signals'])
total_score += c['score']
# === Account checks ===
if ctx.user_id:
a = self._account_checks(ctx)
signals.extend(a['signals'])
total_score += a['score']
# === Device checks ===
d = self._device_checks(ctx)
signals.extend(d['signals'])
total_score += d['score']
final_score = min(total_score, 100)
return {
'score': final_score,
'signals': signals,
'decision': self._make_decision(final_score, ctx),
'timestamp': time.time()
}
def _velocity_checks(self, ctx: PaymentContext) -> dict:
score = 0
signals = []
# Количество попыток оплаты с IP за 1 час
ip_key = f"payment_attempts:ip:{ctx.ip}"
ip_count = self.r.incr(ip_key)
self.r.expire(ip_key, 3600)
if ip_count > 20:
score += 40
signals.append('ip_velocity_critical')
elif ip_count > 10:
score += 20
signals.append('ip_velocity_high')
# Количество уникальных карт с IP за 24 часа
cards_key = f"cards_tried:ip:{ctx.ip}"
self.r.sadd(cards_key, ctx.card_last4)
self.r.expire(cards_key, 86400)
card_count = self.r.scard(cards_key)
if card_count > 3:
score += 35
signals.append(f'multiple_cards_from_ip:{card_count}')
# Неудачные попытки оплаты за 1 час
failures_key = f"payment_failures:ip:{ctx.ip}"
failures = int(self.r.get(failures_key) or 0)
if failures > 5:
score += 30
signals.append(f'payment_failures:{failures}')
return {'score': score, 'signals': signals}
def _geo_checks(self, ctx: PaymentContext) -> dict:
score = 0
signals = []
ip_location = self.geoip.city(ctx.ip)
ip_country = ip_location.country.iso_code if ip_location else None
# IP страна vs billing страна
if ip_country and ip_country != ctx.billing_country:
score += 20
signals.append(f'country_mismatch:ip={ip_country},billing={ctx.billing_country}')
# Доставка в другую страну
if ctx.shipping_country and ctx.shipping_country != ctx.billing_country:
score += 10
signals.append('shipping_billing_country_mismatch')
# VPN/Tor/прокси (MaxMind Insights)
ip_risk = self.maxmind.insights(ctx.ip)
if ip_risk.ip_address.is_anonymous_vpn:
score += 25
signals.append('vpn_detected')
if ip_risk.ip_address.is_tor_exit_node:
score += 35
signals.append('tor_detected')
if ip_risk.ip_address.is_public_proxy:
score += 20
signals.append('proxy_detected')
return {'score': score, 'signals': signals}
def _card_checks(self, ctx: PaymentContext) -> dict:
score = 0
signals = []
# BIN страна vs billing страна
bin_country = self._get_bin_country(ctx.card_bin)
if bin_country and bin_country != ctx.billing_country:
score += 15
signals.append(f'bin_country_mismatch:{bin_country}')
# Prepaid карта (высокий риск анонимности)
if self._is_prepaid_bin(ctx.card_bin):
score += 15
signals.append('prepaid_card')
# Эта карта была в чарджбэках
card_key = f"card_chargebacks:{ctx.card_bin}:{ctx.card_last4}"
if self.r.exists(card_key):
score += 40
signals.append('card_chargeback_history')
return {'score': score, 'signals': signals}
def _account_checks(self, ctx: PaymentContext) -> dict:
score = 0
signals = []
user = self.db.get_user(ctx.user_id)
# Аккаунт создан недавно
account_age_days = (time.time() - user.created_at.timestamp()) / 86400
if account_age_days < 1:
score += 20
signals.append('new_account_1day')
elif account_age_days < 7:
score += 10
signals.append('new_account_7days')
# История чарджбэков у пользователя
chargebacks = self.db.get_user_chargebacks(ctx.user_id)
if len(chargebacks) > 0:
score += 30 * len(chargebacks)
signals.append(f'user_chargeback_history:{len(chargebacks)}')
# Смена адреса/email перед покупкой
recent_profile_change = self.db.get_recent_profile_change(ctx.user_id, hours=24)
if recent_profile_change:
score += 15
signals.append('recent_profile_change')
# Сессия слишком короткая
if ctx.session_age_seconds < 30:
score += 10
signals.append('very_short_session')
return {'score': score, 'signals': signals}
def _device_checks(self, ctx: PaymentContext) -> dict:
score = 0
signals = []
# Device был замечен в мошенничестве
fp_key = f"fraud_device:{ctx.device_fingerprint}"
if self.r.exists(fp_key):
score += 50
signals.append('known_fraud_device')
# Один fingerprint — много аккаунтов
accounts_key = f"device_accounts:{ctx.device_fingerprint}"
account_count = self.r.scard(accounts_key)
if account_count > 3:
score += 25
signals.append(f'device_multiple_accounts:{account_count}')
self.r.sadd(accounts_key, ctx.user_id or ctx.email)
self.r.expire(accounts_key, 86400 * 30)
return {'score': score, 'signals': signals}
def _make_decision(self, score: int, ctx: PaymentContext) -> str:
# Высокая сумма повышает порог для блока
threshold_block = 70 if ctx.amount > 500 else 60
threshold_review = 40
if score >= threshold_block:
return 'decline'
if score >= threshold_review:
return '3ds_challenge' # потребовать 3D Secure
if score >= 25:
return 'review' # флаг для ручной проверки
return 'approve'
Интеграция со Stripe
@app.route('/api/payment/charge', methods=['POST'])
@login_required
def create_charge():
data = request.json
# Собрать контекст
ctx = PaymentContext(
user_id=current_user.id,
email=current_user.email,
ip=request.remote_addr,
card_bin=data['card_bin'],
card_last4=data['card_last4'],
amount=data['amount'],
currency=data.get('currency', 'USD'),
billing_country=data['billing_country'],
shipping_country=data.get('shipping_country'),
device_fingerprint=data.get('device_fingerprint', ''),
user_agent=request.headers.get('User-Agent', ''),
session_age_seconds=data.get('session_age', 0)
)
result = fraud_scorer.score(ctx)
if result['decision'] == 'decline':
log_fraud_attempt(ctx, result)
return jsonify({'error': 'Payment declined'}), 402
# Stripe metadata для последующего анализа
payment_intent = stripe.PaymentIntent.create(
amount=int(ctx.amount * 100),
currency=ctx.currency,
payment_method=data['payment_method_id'],
metadata={
'fraud_score': result['score'],
'fraud_decision': result['decision'],
'user_id': ctx.user_id,
},
# 3DS если требуется
payment_method_options={
'card': {
'request_three_d_secure': 'any'
if result['decision'] == '3ds_challenge'
else 'automatic'
}
}
)
return jsonify({'client_secret': payment_intent.client_secret})
Обработка chargeback webhook
@app.route('/webhooks/stripe', methods=['POST'])
def stripe_webhook():
event = stripe.Webhook.construct_event(
request.data,
request.headers['Stripe-Signature'],
STRIPE_WEBHOOK_SECRET
)
if event['type'] == 'charge.dispute.created':
charge = event['data']['object']
metadata = charge.get('metadata', {})
# Занести в базу фрод-данных
if metadata.get('user_id'):
fraud_db.mark_user_chargeback(
user_id=metadata['user_id'],
charge_id=charge['id'],
amount=charge['amount']
)
# Занести device fingerprint
if metadata.get('device_fingerprint'):
redis.setex(
f"fraud_device:{metadata['device_fingerprint']}",
86400 * 90, # 90 дней
'1'
)
return jsonify({'status': 'ok'})
Срок выполнения
Реализация системы fraud detection с velocity checks, geo-анализом, HIBP-аналогичной BIN-проверкой и интеграцией в Stripe — 4–7 рабочих дней.







