TanStack Query · Phần 14 — Realtime: WebSocket, SSE & cache
Ghép dữ liệu realtime với React Query: đẩy sự kiện vào cache bằng setQueryData thay vì invalidate, quyết định patch vs invalidate, gom sự kiện, một query làm kênh subscription, và cập nhật từng phần an toàn type.
React Query là thư viện request/response — bạn hỏi, server trả. Nhưng nhiều app cần chiều ngược lại: server đẩy cập nhật xuống (chat, thông báo, giá realtime, presence). Tin tốt: React Query không cần thay thế cho realtime — nó là cache hoàn hảo để realtime ghi vào. WebSocket/SSE lo phần vận chuyển, React Query lo phần lưu trữ và phân phối tới component.
Phần này chỉ cách hợp nhất hai thế giới: giữ một query làm “nguồn sự thật” trong cache, rồi để stream realtime cập nhật chính cache đó.
1. Hai chiến lược: invalidate vs setQueryData
Khi nhận một sự kiện realtime, bạn có hai cách phản ứng:
| Chiến lược | Làm gì | Ưu | Nhược |
|---|---|---|---|
| Invalidate | invalidateQueries → query refetch | Đơn giản, luôn lấy data đúng từ server | Tốn một request mỗi sự kiện; trễ |
| setQueryData (patch) | Ghi thẳng payload sự kiện vào cache | Tức thì, 0 request | Bạn chịu trách nhiệm shape; payload phải đủ |
Quy tắc chọn:
- Payload sự kiện chứa đủ dữ liệu mới (vd “todo X đổi thành Y”) → patch bằng
setQueryData. - Sự kiện chỉ là tín hiệu “có gì đó đổi” (không kèm data, hoặc data phức tạp cần tính lại ở server) → invalidate.
- Sự kiện đến dồn dập (giá tick mỗi 100ms) → patch (invalidate sẽ tạo bão request).
2. Một hook bắc cầu WebSocket → cache
Pattern gọn nhất: một hook mở WebSocket một lần ở gốc app, rồi route từng loại message vào cache.
import { useEffect } from 'react';
import { useQueryClient } from '@tanstack/react-query';
import { todoKeys } from '@/features/todos/keys';
type WsEvent =
| { type: 'todo.updated'; todo: Todo }
| { type: 'todo.deleted'; id: string }
| { type: 'todos.changed' }; // chỉ tín hiệu
export function useRealtimeSync() {
const qc = useQueryClient();
useEffect(() => {
const ws = new WebSocket(import.meta.env.VITE_WS_URL);
ws.addEventListener('message', (e) => {
const event = JSON.parse(e.data) as WsEvent;
switch (event.type) {
case 'todo.updated':
// Patch: payload đủ data → ghi thẳng, không refetch.
qc.setQueryData<Todo[]>(todoKeys.lists(), (old) =>
old?.map((t) => (t.id === event.todo.id ? event.todo : t)),
);
qc.setQueryData(todoKeys.detail(event.todo.id), event.todo);
break;
case 'todo.deleted':
qc.setQueryData<Todo[]>(todoKeys.lists(), (old) =>
old?.filter((t) => t.id !== event.id),
);
qc.removeQueries({ queryKey: todoKeys.detail(event.id) });
break;
case 'todos.changed':
// Tín hiệu mơ hồ → để server quyết định, refetch.
qc.invalidateQueries({ queryKey: todoKeys.lists() });
break;
}
});
return () => ws.close();
}, [qc]);
}
Component vẫn dùng useQuery(todoKeys.lists()) như thường — chúng không biết data tới từ fetch hay từ WebSocket. Mọi đồng bộ gói gọn trong một hook.
3. Server-Sent Events (SSE) — nhẹ hơn cho một chiều
Nếu chỉ cần server → client (không cần client gửi lên), SSE đơn giản và tự reconnect:
export function useSseSync() {
const qc = useQueryClient();
useEffect(() => {
const es = new EventSource('/api/stream');
es.addEventListener('price', (e) => {
const { symbol, price } = JSON.parse((e as MessageEvent).data) as Price;
// Giá tick dồn dập → luôn patch, không bao giờ invalidate.
qc.setQueryData(['price', symbol], price);
});
return () => es.close();
}, [qc]);
}
EventSource tự kết nối lại khi rớt — đỡ phải tự viết reconnect như WebSocket.
4. Gom sự kiện dồn dập (batching/throttle)
Khi sự kiện tới quá nhanh, mỗi setQueryData kích hoạt một lượt thông báo observer. Với hàng trăm tick/giây, gom lại rồi ghi theo nhịp:
function createBatcher<T>(flush: (items: T[]) => void, ms = 100) {
let buffer: T[] = [];
let timer: ReturnType<typeof setTimeout> | null = null;
return (item: T) => {
buffer.push(item);
timer ??= setTimeout(() => {
const items = buffer;
buffer = [];
timer = null;
flush(items);
}, ms);
};
}
// Dùng: gom tick 100ms rồi ghi một lần
const pushTick = createBatcher<Price>((ticks) => {
qc.setQueryData<Record<string, number>>(['prices'], (old) => {
const next = { ...old };
for (const t of ticks) next[t.symbol] = t.price;
return next;
});
}, 100);
React 18+ tự batch nhiều setState trong cùng một tick, nhưng gom trước khi chạm cache vẫn giảm số lần structural sharing chạy — đáng làm cho stream tần suất cao.
5. refetchOnReconnect & “đồng bộ lại sau gián đoạn”
Realtime có một lỗ hổng: khi kết nối rớt rồi nối lại, bạn bỏ lỡ các sự kiện trong lúc mất kết nối → cache lệch. Cách chữa: khi reconnect, invalidate một lần để lấy lại snapshot đúng từ server, rồi tiếp tục patch theo stream.
ws.addEventListener('open', () => {
// Vừa (re)connect: có thể đã lỡ sự kiện → đồng bộ lại nền tảng.
qc.invalidateQueries({ queryKey: todoKeys.lists() });
});
Kết hợp với refetchOnReconnect: true (mặc định) cho query thường: khi mạng trở lại, query active tự refetch. Realtime stream chỉ cần lo phần “đẩy delta” giữa các lần đồng bộ.
6. Một query làm “kênh subscription”
Đôi khi bạn muốn vòng đời subscription gắn với vòng đời query (mở khi component cần, đóng khi không ai dùng). Trick: dùng queryFn để lấy snapshot đầu, và một hiệu ứng phụ để subscribe — hoặc dùng experimental_streamedQuery/streaming nếu nguồn hỗ trợ. Cách thủ công, dễ hiểu:
function useLiveTodos() {
const qc = useQueryClient();
// Query lo snapshot ban đầu + cache.
const query = useQuery(todoKeys.listsQuery());
// Subscription sống cùng component đang quan tâm tới data này.
useEffect(() => {
const sub = subscribeTodos((todo) => {
qc.setQueryData<Todo[]>(todoKeys.lists(), (old) =>
old?.map((t) => (t.id === todo.id ? todo : t)),
);
});
return () => sub.unsubscribe();
}, [qc]);
return query;
}
Khi nhiều component cùng
useLiveTodos, bạn sẽ mở nhiều subscription. Nếu muốn một subscription dùng chung, nâng nó lên một provider gốc (như mục 2) thay vì trong hook dùng nhiều nơi.
7. An toàn type khi patch từng phần
Payload realtime đến từ JSON — đừng tin mù. Validate biên bằng zod (như Phần 3) trước khi ghi vào cache:
import { z } from 'zod';
import { todoSchema } from '@/features/todos/schema';
const todoUpdatedSchema = z.object({ type: z.literal('todo.updated'), todo: todoSchema });
ws.addEventListener('message', (e) => {
const parsed = todoUpdatedSchema.safeParse(JSON.parse(e.data));
if (!parsed.success) return; // payload lạ → bỏ qua, không làm hỏng cache
qc.setQueryData(todoKeys.detail(parsed.data.todo.id), parsed.data.todo);
});
Validate ở biên giữ cho cache luôn đúng shape — nếu không, một payload server lỗi có thể nhét undefined vào field bắt buộc và làm crash mọi component đang đọc query đó.
8. Bài tập
1. Khi nào nên patch bằng setQueryData và khi nào nên invalidateQueries để phản ứng với một sự kiện realtime?
Lời giải
Patch khi payload sự kiện đủ để dựng giá trị mới (vd object đầy đủ sau khi đổi) hoặc khi sự kiện đến dồn dập (giá tick) — tức thì, không tốn request. Invalidate khi sự kiện chỉ là tín hiệu “có gì đó đổi” không kèm đủ data, hoặc data cần server tính lại — đánh đổi một request lấy tính đúng.
2. Vì sao nên invalidateQueries một lần khi WebSocket (re)connect?
Lời giải
Trong lúc mất kết nối, client bỏ lỡ các sự kiện realtime → cache lệch với server. Khi nối lại, invalidate một lần để lấy snapshot đúng làm nền tảng, rồi tiếp tục patch theo stream từ đó. Không làm vậy thì cache sẽ “trôi” dần khỏi sự thật sau mỗi lần gián đoạn.
3. Vì sao phải validate payload realtime trước khi setQueryData?
Lời giải
Payload đến từ JSON ngoài tầm kiểm soát; một message lỗi/đổi shape có thể ghi data sai vào cache (vd thiếu field bắt buộc), làm crash mọi component đọc query đó. Validate biên bằng zod và bỏ qua payload không hợp lệ giữ cache luôn đúng type — đúng triết lý “validate tại biên” của Phần 3.
Nâng cao: Dựng một stream giá giả (setInterval phát tick), bắc cầu vào cache bằng batcher 100ms. Mở React Profiler và so sánh số lần render khi patch từng tick so với patch gom nhóm.
Tóm tắt
- WebSocket/SSE lo vận chuyển, React Query lo cache + phân phối; component dùng
useQuerykhông cần biết data tới từ đâu. - Phản ứng sự kiện: patch (
setQueryData) khi payload đủ/đến dồn dập; invalidate khi chỉ là tín hiệu mơ hồ. - Bắc cầu trong một hook gốc (route message theo type); SSE tự reconnect, hợp một chiều.
- Gom sự kiện tần suất cao trước khi chạm cache để giảm structural sharing/re-render.
- Khi (re)connect, invalidate một lần để đồng bộ lại nền tảng đã lỡ;
refetchOnReconnectlo phần còn lại. - Validate payload bằng zod trước khi ghi — giữ cache luôn đúng shape.
Phần tiếp theo
Phần 15 — Mutation nâng cao: mutationKey + setMutationDefaults, theo dõi mutation đang chạy bằng useMutationState, chạy tuần tự bằng scope, optimistic update trải trên nhiều query cùng lúc, xử lý lỗi/thành công toàn cục ở MutationCache, và hàng đợi mutation offline.