jvinhit//lab

Search posts

Type to search across journal entries.

navigate open esc close

Node.js Super Senior · Phase 19 — Realtime với WebSockets

Bonus Phase 19: realtime trên Node. WebSocket vs SSE vs polling, server với ws và Socket.IO, room và namespace, scale ngang nhiều instance bằng Redis adapter, authenticate kết nối, heartbeat và reconnect, backpressure.

Đây là Bonus Phase 19. HTTP (Phase 2) là một chiều: client hỏi, server trả. Nhưng chat, notification, dashboard live, presence cần server chủ động đẩy dữ liệu xuống client. Đó là realtime — và lựa chọn công nghệ cùng cách scale nó là kiến thức senior thật.


Chọn đúng công nghệ: WebSocket vs SSE vs polling

Đừng mặc định WebSocket. Mỗi cách có chỗ của nó:

PollingSSE (Server-Sent Events)WebSocket
Hướngclient hỏi lặpserver → client (1 chiều)hai chiều
Giao thứcHTTPHTTP (text/event-stream)ws:// (nâng cấp từ HTTP)
Reconnecttự nhiêntự động (built-in)tự lo
Hợp chođơn giản, tần suất thấpfeed, notification, giá livechat, game, collab hai chiều

Quy tắc: chỉ cần server đẩy một chiều (thông báo, stock ticker) → SSE đơn giản hơn nhiều. Cần hai chiều thực sự (chat, multiplayer) → WebSocket. Đừng kéo WebSocket vào khi SSE đủ.

// SSE — đôi khi là tất cả những gì bạn cần
app.get('/events', (req, res) => {
  res.writeHead(200, { 'Content-Type': 'text/event-stream', Connection: 'keep-alive' });
  const timer = setInterval(() => res.write(`data: ${JSON.stringify({ t: Date.now() })}\n\n`), 1000);
  req.on('close', () => clearInterval(timer));
});

WebSocket thô với ws

ws là thư viện WebSocket nền tảng cho Node:

import { WebSocketServer } from 'ws';

const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', (socket) => {
  socket.on('message', (raw) => {
    const msg = JSON.parse(raw.toString());
    // broadcast cho mọi client
    for (const client of wss.clients) {
      if (client.readyState === client.OPEN) client.send(JSON.stringify(msg));
    }
  });
  socket.send(JSON.stringify({ type: 'welcome' }));
});

ws mạnh nhưng thô — bạn tự lo room, reconnect, fallback, ack. Cho phần lớn app, Socket.IO lo những thứ đó.


Socket.IO — pin đầy đủ

npm install socket.io
import { Server } from 'socket.io';
const io = new Server(httpServer, { cors: { origin: process.env.WEB_ORIGIN } });

io.on('connection', (socket) => {
  socket.join(`user:${socket.data.userId}`);     // room theo user

  socket.on('chat:send', (msg) => {
    io.to(`room:${msg.roomId}`).emit('chat:message', msg); // gửi tới một room
  });

  socket.on('disconnect', () => { /* cleanup presence */ });
});

Roomnamespace là khái niệm tổ chức kết nối: room gom các socket (một phòng chat, một tài liệu), namespace tách các kênh logic (/admin vs /chat). Socket.IO còn cho ack, auto-reconnect, và fallback — đáng dùng hơn ws thô cho app thật.


Authenticate kết nối

WebSocket không gửi header như request thường sau handshake — auth phải làm lúc kết nối. Dùng JWT (Phase 15) qua middleware:

io.use((socket, next) => {
  const token = socket.handshake.auth.token;
  try {
    const claims = jwt.verify(token, process.env.JWT_SECRET!);
    socket.data.userId = (claims as { sub: string }).sub; // gắn vào socket
    next();
  } catch {
    next(new Error('unauthorized'));
  }
});

Đừng tin payload từ client — luôn xác thực danh tính ở handshake và gắn userId server-side để biết ai gửi mỗi event.


Scale ngang — bài toán then chốt

Đây là lý do nhiều dự án realtime sập khi lên production. Khi bạn chạy nhiều instance (Phase 8/10) sau load balancer, client A nối instance 1, client B nối instance 2. A gửi tin → instance 1 broadcast → B không nhận vì ở instance khác.

              ┌── instance 1 (client A)
LB ──────────┤
              └── instance 2 (client B)   ← A emit chỉ tới client trên instance 1

Lời giải: adapter dùng Redis pub/sub (Phase 13) để các instance chuyển tiếp event cho nhau:

import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';

const pub = createClient({ url: process.env.REDIS_URL });
const sub = pub.duplicate();
await Promise.all([pub.connect(), sub.connect()]);
io.adapter(createAdapter(pub, sub)); // mọi instance giờ broadcast được xuyên nhau

Thêm sticky session ở load balancer (Socket.IO cần client luôn về đúng instance trong một phiên) — cấu hình ip_hash ở Nginx hoặc session affinity ở K8s.


Heartbeat, reconnect & backpressure

  • Heartbeat (ping/pong): phát hiện kết nối chết (mạng rớt không gửi FIN). Socket.IO làm sẵn; với ws tự setInterval ping và terminate socket không pong.
  • Reconnect: Socket.IO tự thử lại với backoff; thiết kế event idempotent và gửi lại state khi reconnect.
  • Backpressure: nếu client chậm, buffer gửi phình bộ nhớ. Kiểm socket.bufferedAmount (ws) và bỏ/giảm tần suất message khi quá tải.

Thực hành

  1. Dựng chat với Socket.IO: room theo roomId, presence (ai online) theo room.
  2. Authenticate handshake bằng JWT (Phase 15), gắn userId server-side.
  3. Chạy 2 instance sau Nginx; quan sát tin nhắn không xuyên instance.
  4. Thêm Redis adapter (Phase 13) + sticky session; xác nhận tin xuyên instance.
  5. Thêm heartbeat phát hiện kết nối chết và xử lý reconnect gửi lại state.
  6. So sánh: làm lại tính năng “notification một chiều” bằng SSE và nhận xét độ phức tạp.

Phần tiếp theo

Giờ bạn có hệ thống phân tán, realtime, nhiều service — và không thấy gì đang xảy ra bên trong khi có sự cố. Phase cuối lấp khoảng đó. Phase 20 — Observability đi ba trụ cột: structured logging tương quan theo request, metrics (RED/USE) với Prometheus, và distributed tracing với OpenTelemetry để lần một request xuyên qua mọi service — công cụ debug production tối thượng.