Защита от credential stuffing атак
Credential stuffing — автоматическая проверка украденных логин/пароль пар из утечек данных против целевого сайта. В отличие от brute force это не перебор паролей, а тестирование конкретных учётных данных с других взломанных сервисов. Конверсия таких атак: 0.1–2% — при базе в 10 млн пар это 10–200K взломанных аккаунтов.
Почему стандартный rate limiting не работает
Современные credential stuffing атаки:
- Распределены по тысячам IP (резидентные прокси, ботнеты)
- Имитируют заголовки браузера
- Добавляют случайные задержки между запросами
- Ротируют User-Agent и cookies
Задача — детектировать аномалии в поведении при логине, не блокируя легитимных пользователей.
Многоуровневая защита
class LoginProtectionService:
def __init__(self, redis, db, device_fp_service):
self.r = redis
self.db = db
self.dfp = device_fp_service
def check_login_attempt(self, request, email: str) -> dict:
"""
Проверить попытку входа до обращения к БД.
Возвращает {'allowed': bool, 'action': str, 'reason': str}
"""
ip = request.remote_addr
checks = [
self._check_ip_reputation(ip),
self._check_ip_velocity(ip),
self._check_email_velocity(email),
self._check_global_failure_rate(),
self._check_device_fingerprint(request),
]
for check in checks:
if not check['allowed']:
return check
return {'allowed': True, 'action': 'proceed'}
def _check_ip_reputation(self, ip: str) -> dict:
"""Проверка по спискам плохих IP"""
# AbuseIPDB, Cloudflare Threat Intelligence, MaxMind
if self.r.sismember('blocked_ips', ip):
return {'allowed': False, 'action': 'block', 'reason': 'blocked_ip'}
risk = self.r.get(f'ip_risk:{ip}')
if risk and int(risk) > 80:
return {'allowed': False, 'action': 'challenge', 'reason': 'high_risk_ip'}
return {'allowed': True}
def _check_ip_velocity(self, ip: str) -> dict:
"""Количество попыток с IP за последние 10 минут"""
key = f'login_attempts:ip:{ip}'
count = self.r.incr(key)
self.r.expire(key, 600)
if count > 20:
return {'allowed': False, 'action': 'block', 'reason': f'ip_velocity:{count}'}
if count > 10:
return {'allowed': False, 'action': 'challenge', 'reason': f'ip_velocity:{count}'}
return {'allowed': True}
def _check_email_velocity(self, email: str) -> dict:
"""Количество попыток к конкретному аккаунту"""
import hashlib
email_hash = hashlib.sha256(email.lower().encode()).hexdigest()[:16]
key = f'login_attempts:email:{email_hash}'
count = self.r.incr(key)
self.r.expire(key, 900) # 15 минут
if count > 5:
# Временная блокировка аккаунта
self.r.setex(f'account_locked:{email_hash}', 900, '1')
return {'allowed': False, 'action': 'lock', 'reason': f'account_lockout:{count}'}
return {'allowed': True}
def _check_global_failure_rate(self) -> dict:
"""Аномальный рост отказов входа по всему сайту"""
key = 'global_login_failures'
failures = int(self.r.get(key) or 0)
total = int(self.r.get('global_login_total') or 1)
failure_rate = failures / total
if failure_rate > 0.5 and total > 100:
# Более 50% отказов — признак массовой атаки
return {'allowed': False, 'action': 'challenge', 'reason': 'global_attack_detected'}
return {'allowed': True}
def _check_device_fingerprint(self, request) -> dict:
"""Device fingerprint из заголовков"""
fp = self.dfp.compute(request)
key = f'fp_failures:{fp}'
failures = int(self.r.get(key) or 0)
if failures > 3:
return {'allowed': False, 'action': 'block', 'reason': f'fp_failures:{failures}'}
return {'allowed': True}
def record_failure(self, request, email: str):
"""Записать неудачную попытку"""
ip = request.remote_addr
import hashlib
email_hash = hashlib.sha256(email.lower().encode()).hexdigest()[:16]
fp = self.dfp.compute(request)
pipe = self.r.pipeline()
pipe.incr(f'fp_failures:{fp}')
pipe.expire(f'fp_failures:{fp}', 3600)
pipe.incr('global_login_failures')
pipe.expire('global_login_failures', 60) # окно 1 минута
pipe.execute()
Проверка Have I Been Pwned
import hashlib
import httpx
async def is_password_compromised(password: str) -> bool:
"""
k-Anonymity: отправляем только первые 5 символов SHA1-хеша.
HIBP не узнает исходный пароль.
"""
sha1 = hashlib.sha1(password.encode()).hexdigest().upper()
prefix = sha1[:5]
suffix = sha1[5:]
async with httpx.AsyncClient() as client:
resp = await client.get(
f'https://api.pwnedpasswords.com/range/{prefix}',
headers={'Add-Padding': 'true'}
)
for line in resp.text.splitlines():
hash_suffix, count = line.split(':')
if hash_suffix == suffix:
return int(count) > 0
return False
# Использование при регистрации или смене пароля
async def validate_new_password(password: str) -> list[str]:
errors = []
if len(password) < 12:
errors.append('Минимум 12 символов')
if await is_password_compromised(password):
errors.append('Этот пароль фигурирует в утечках данных. Выберите другой.')
return errors
Device fingerprinting
import hashlib
class DeviceFingerprintService:
def compute(self, request) -> str:
"""Вычислить fingerprint из заголовков без cookies"""
components = [
request.headers.get('User-Agent', ''),
request.headers.get('Accept-Language', ''),
request.headers.get('Accept-Encoding', ''),
request.headers.get('Accept', ''),
# TLS fingerprint передаётся nginx через заголовок
request.headers.get('X-JA3-Fingerprint', ''),
# Разрешение экрана и timezone из JS (передаётся в теле)
request.json.get('tz', '') if request.is_json else '',
]
raw = '|'.join(components)
return hashlib.sha256(raw.encode()).hexdigest()[:32]
Прогрессивное усиление защиты
@app.route('/api/auth/login', methods=['POST'])
def login():
email = request.json.get('email', '').lower().strip()
password = request.json.get('password', '')
# Проверка перед обращением к БД
check = login_protection.check_login_attempt(request, email)
if check['action'] == 'block':
return jsonify({'error': 'Too many attempts'}), 429
if check['action'] == 'challenge':
# Проверить Turnstile/hCaptcha токен
token = request.json.get('captcha_token')
if not verify_captcha(token):
return jsonify({
'error': 'CAPTCHA required',
'captcha': True,
'site_key': CAPTCHA_SITE_KEY
}), 429
if check['action'] == 'lock':
# Отправить письмо для разблокировки
send_unlock_email(email)
return jsonify({
'error': 'Account temporarily locked. Check your email.'
}), 429
# Обычная аутентификация
user = db.get_user_by_email(email)
if not user or not user.verify_password(password):
login_protection.record_failure(request, email)
# Одинаковое время ответа для существующих и несуществующих пользователей
return jsonify({'error': 'Invalid credentials'}), 401
# Успешный вход — сбросить счётчики
login_protection.record_success(request, email)
return jsonify({
'token': generate_token(user.id),
'user': user.to_dict()
})
Уведомления о подозрительных входах
def notify_suspicious_login(user, request, reason: str):
"""Уведомить пользователя о входе с нового устройства/места"""
ip = request.remote_addr
location = geoip.city(ip)
send_email(
to=user.email,
subject='Новый вход в аккаунт',
template='suspicious_login',
vars={
'ip': ip,
'city': location.city.name if location else 'Неизвестно',
'country': location.country.name if location else 'Неизвестно',
'user_agent': request.headers.get('User-Agent', ''),
'time': datetime.utcnow().strftime('%d.%m.%Y %H:%M UTC'),
'revoke_url': generate_revoke_url(user.id, session_id)
}
)
Срок выполнения
Реализация защиты от credential stuffing с HIBP-проверкой, device fingerprinting и прогрессивными ограничениями — 3–5 рабочих дней.







