Реализация GraphQL Subscriptions (real-time подписки)
GraphQL Subscriptions — это механизм долгоживущего соединения между клиентом и сервером, через которое сервер отправляет данные при наступлении событий. Транспорт — обычно WebSocket (протокол graphql-ws или устаревший subscriptions-transport-ws), реже SSE. Subscriptions — третья операция GraphQL рядом с Query и Mutation.
Когда это нужно
Для сайта с GraphQL API Subscriptions закрывают задачи, где нужно обновлять UI без действий пользователя: real-time чат, уведомления, трекинг статуса заказа, live-статистика, совместное редактирование. Если уже есть GraphQL, добавить Subscriptions дешевле, чем городить отдельный WebSocket-сервер.
Серверная часть: Node.js + graphql-ws
Современный стандарт — пакет graphql-ws, который реализует протокол graphql-transport-ws:
import { createServer } from 'http';
import { WebSocketServer } from 'ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import { makeExecutableSchema } from '@graphql-tools/schema';
import { PubSub } from 'graphql-subscriptions';
const pubsub = new PubSub();
const typeDefs = `
type Message {
id: ID!
roomId: String!
authorId: String!
text: String!
createdAt: String!
}
type OrderStatus {
orderId: ID!
status: String!
updatedAt: String!
}
type Query {
messages(roomId: String!): [Message!]!
}
type Mutation {
sendMessage(roomId: String!, text: String!): Message!
}
type Subscription {
messageAdded(roomId: String!): Message!
orderStatusChanged(orderId: ID!): OrderStatus!
}
`;
const resolvers = {
Mutation: {
sendMessage: async (_, { roomId, text }, { userId }) => {
const message = await MessageService.create({ roomId, text, authorId: userId });
// Публикуем событие
pubsub.publish(`MESSAGE_ADDED:${roomId}`, { messageAdded: message });
return message;
},
},
Subscription: {
messageAdded: {
subscribe: (_, { roomId }, { userId }) => {
// Проверяем доступ пользователя к комнате
if (!ChatRoom.hasAccess(userId, roomId)) {
throw new Error('Forbidden');
}
return pubsub.asyncIterator(`MESSAGE_ADDED:${roomId}`);
},
},
orderStatusChanged: {
subscribe: (_, { orderId }, { userId }) => {
// Пользователь может подписываться только на свои заказы
if (!Order.belongsTo(orderId, userId)) {
throw new Error('Forbidden');
}
return pubsub.asyncIterator(`ORDER_STATUS:${orderId}`);
},
},
},
};
const schema = makeExecutableSchema({ typeDefs, resolvers });
const httpServer = createServer();
const wsServer = new WebSocketServer({ server: httpServer, path: '/graphql' });
useServer(
{
schema,
context: async (ctx) => {
// Аутентификация через параметры соединения
const token = ctx.connectionParams?.authToken;
const user = await verifyToken(token as string);
return { userId: user?.id };
},
onConnect: async (ctx) => {
const token = ctx.connectionParams?.authToken;
if (!token) return false; // отклонить соединение
return true;
},
onDisconnect: (ctx, code, reason) => {
console.log(`Client disconnected: ${code} ${reason}`);
},
},
wsServer
);
httpServer.listen(4000);
Масштабирование: Redis PubSub вместо in-memory
Встроенный PubSub из graphql-subscriptions — in-memory, работает только в рамках одного процесса. При нескольких инстансах нужен graphql-redis-subscriptions:
import { RedisPubSub } from 'graphql-redis-subscriptions';
import Redis from 'ioredis';
const options = {
host: process.env.REDIS_HOST,
port: 6379,
retryStrategy: (times: number) => Math.min(times * 50, 2000),
};
const pubsub = new RedisPubSub({
publisher: new Redis(options),
subscriber: new Redis(options),
});
// Использование идентично — pubsub.publish() и pubsub.asyncIterator() те же
Теперь любой инстанс может опубликовать событие, и все подписчики на всех инстансах его получат через Redis Pub/Sub.
Клиентская часть: Apollo Client
import { ApolloClient, InMemoryCache, split, HttpLink } from '@apollo/client';
import { GraphQLWsLink } from '@apollo/client/link/subscriptions';
import { createClient } from 'graphql-ws';
import { getMainDefinition } from '@apollo/client/utilities';
const httpLink = new HttpLink({ uri: '/graphql' });
const wsLink = new GraphQLWsLink(
createClient({
url: 'wss://example.com/graphql',
connectionParams: () => ({
authToken: localStorage.getItem('token'),
}),
shouldRetry: () => true,
retryAttempts: 10,
on: {
connected: () => console.log('WS connected'),
closed: () => console.log('WS closed'),
},
})
);
// Query/Mutation идут через HTTP, Subscription — через WS
const splitLink = split(
({ query }) => {
const def = getMainDefinition(query);
return def.kind === 'OperationDefinition' && def.operation === 'subscription';
},
wsLink,
httpLink
);
export const client = new ApolloClient({
link: splitLink,
cache: new InMemoryCache(),
});
import { useSubscription, gql } from '@apollo/client';
const MESSAGE_ADDED = gql`
subscription MessageAdded($roomId: String!) {
messageAdded(roomId: $roomId) {
id
text
authorId
createdAt
}
}
`;
function ChatRoom({ roomId }: { roomId: string }) {
const [messages, setMessages] = useState<Message[]>([]);
useSubscription(MESSAGE_ADDED, {
variables: { roomId },
onData: ({ data }) => {
const newMessage = data.data?.messageAdded;
if (newMessage) {
setMessages(prev => [...prev, newMessage]);
}
},
onError: (error) => console.error('Subscription error:', error),
});
return (
<div>
{messages.map(msg => (
<div key={msg.id}>{msg.text}</div>
))}
</div>
);
}
Фильтрация событий на уровне сервера
Иногда нужно фильтровать события прямо в резолвере, чтобы не гонять лишнее по сети:
import { withFilter } from 'graphql-subscriptions';
const resolvers = {
Subscription: {
messageAdded: {
// withFilter оборачивает asyncIterator и фильтрует события
subscribe: withFilter(
(_, { roomId }) => pubsub.asyncIterator(`MESSAGES`),
(payload, variables) => {
// Отправляем только если roomId совпадает
return payload.messageAdded.roomId === variables.roomId;
}
),
},
},
};
Это позволяет использовать один широкий канал MESSAGES вместо канала на каждую комнату, фильтруя на сервере.
Управление соединениями и утечки памяти
При неосторожном обращении Subscriptions могут привести к утечкам: итераторы не закрываются, слушатели накапливаются.
// Правильное завершение итератора
const MESSAGE_ADDED_SUBSCRIPTION = {
subscribe: async function* (_, { roomId }, context) {
const iterator = pubsub.asyncIterator(`MESSAGE_ADDED:${roomId}`);
try {
for await (const value of iterator) {
yield value;
}
} finally {
// Вызывается при отписке клиента
iterator.return?.();
}
},
};
graphql-ws автоматически вызывает return() у итератора при закрытии соединения или явной отписке, но явный try/finally защищает от нестандартных кейсов.
Интеграция с Laravel бэкендом
Если GraphQL API реализован на PHP (Lighthouse), события можно публиковать через Redis из Laravel и принимать в Node.js WebSocket-сервере:
// Laravel публикует событие
Redis::publish('ORDER_STATUS:' . $order->id, json_encode([
'orderStatusChanged' => [
'orderId' => $order->id,
'status' => $order->status,
'updatedAt' => now()->toISOString(),
],
]));
// Node.js WS-сервер слушает Redis и форвардит в pubsub
const subscriber = new Redis({ host: process.env.REDIS_HOST });
subscriber.psubscribe('ORDER_STATUS:*');
subscriber.on('pmessage', (pattern, channel, message) => {
const orderId = channel.split(':')[1];
const data = JSON.parse(message);
pubsub.publish(`ORDER_STATUS:${orderId}`, data);
});
Тестирование
import { createTestClient } from 'apollo-server-testing';
import { execute, subscribe } from 'graphql';
it('должен доставлять сообщения подписчикам', async () => {
const results: any[] = [];
const iterator = await subscribe({
schema,
document: parse(`subscription { messageAdded(roomId: "room1") { id text } }`),
contextValue: { userId: 'user1' },
});
// Собираем первое событие
const firstResult = (await (iterator as AsyncIterator<any>).next()).value;
// Публикуем событие
pubsub.publish('MESSAGE_ADDED:room1', {
messageAdded: { id: '1', text: 'Hello', roomId: 'room1', authorId: 'user2' }
});
expect(firstResult.data.messageAdded.text).toBe('Hello');
});
Сроки
Базовые Subscriptions с одним типом события на существующем GraphQL-сервере — 2–3 дня. Полноценная реализация с Redis PubSub, аутентификацией через connectionParams, несколькими типами подписок и тестами — 1–1.5 недели. Добавление к Laravel/Lighthouse через гибридную схему PHP + Node.js — ещё 2–3 дня.







