Network Programming · Part 8 — Streams, Backpressure & Message Framing
TCP is a byte stream with no message boundaries — learn delimiter vs length-prefix framing, stateful decoders, and backpressure with socket.write() and drain in Node.js + TypeScript, bilingual.
This is Part 8 of a 10-part series on network programming with Node.js + TypeScript {Đây là Phần 8 của series 10 bài về lập trình mạng với Node.js + TypeScript}. Part 2 promised a hard truth: TCP is a byte stream — it preserves order and reliability, but it does not preserve message boundaries {Phần 2 đã hứa một sự thật khó: TCP là luồng byte — giữ thứ tự và tin cậy, nhưng không giữ ranh giới message}. Today we face that truth head-on, add framing so logical messages survive arbitrary chunking, and handle backpressure so fast senders do not blow up memory {Hôm nay ta đối mặt sự thật đó, thêm framing để message logic sống sót qua việc cắt chunk tùy ý, và xử lý backpressure để bên gửi nhanh không làm tràn bộ nhớ}.
The core truth: no message boundaries {Sự thật cốt lõi: không có ranh giới message}
When you socket.write('HELLO') then socket.write('WORLD'), TCP guarantees the receiver gets the bytes in order — but not how they are grouped into 'data' events {Khi bạn socket.write('HELLO') rồi socket.write('WORLD'), TCP đảm bảo bên nhận nhận byte đúng thứ tự — nhưng không đảm bảo cách chúng được gom thành sự kiện 'data'}.
Two things happen in production all the time {Hai điều này xảy ra liên tục trên production}:
- One message, many chunks — a single JSON payload arrives split across several
'data'callbacks {Một message, nhiều chunk — một payload JSON đến bị cắt qua nhiều callback'data'}. - Many messages, one chunk — two back-to-back writes land in a single
'data'event {Nhiều message, một chunk — hai lầnwriteliên tiếp nằm chung một sự kiện'data'}.
The OS, the NIC, and Nagle’s algorithm decide chunk sizes — not your application {OS, NIC, và thuật toán Nagle quyết định kích thước chunk — không phải ứng dụng của bạn}. Your protocol must reconstruct logical messages from the byte stream {Giao thức của bạn phải dựng lại message logic từ luồng byte}.
Key idea {Ý chính}:
'data'tells you “more bytes arrived” — not “here is one complete message” {'data'báo “thêm byte đã đến” — không phải “đây là một message hoàn chỉnh”}.
The naive server that breaks {Server ngây thơ bị vỡ}
This server assumes one 'data' event = one JSON message {Server này giả định một sự kiện 'data' = một message JSON}. It works on localhost with tiny payloads — then fails under real traffic {Chạy được trên localhost với payload nhỏ — rồi hỏng khi traffic thật}.
import { createServer } from 'node:net';
const server = createServer((socket) => {
socket.on('data', (chunk: Buffer) => {
// ❌ WRONG: treats each chunk as a complete message
const msg = JSON.parse(chunk.toString('utf8'));
console.log('got:', msg);
socket.write(JSON.stringify({ ok: true, echo: msg }) + '\n');
});
});
server.listen(3000);
A client that fires two small messages quickly {Client gửi hai message nhỏ liên tiếp}:
import { connect } from 'node:net';
const socket = connect(3000, () => {
socket.write(JSON.stringify({ id: 1, cmd: 'ping' }));
socket.write(JSON.stringify({ id: 2, cmd: 'ping' }));
});
What breaks {Điều gì hỏng}:
- Coalescing — one
'data'chunk might be{"id":1,...}{"id":2,...}→JSON.parsethrowsSyntaxError{Gộp — một chunk có thể là{"id":1,...}{"id":2,...}→JSON.parsenémSyntaxError}. - Splitting — chunk 1 might be
{"id":1,"cmd":"piand chunk 2ng"}→ parse fails on both halves {Cắt — chunk 1 có thể là{"id":1,"cmd":"pivà chunk 2ng"}→ parse thất bại trên cả hai nửa}.
The fix is never “hope chunks align” — you frame messages on the wire and buffer on the read side until a full frame is available {Cách sửa không bao giờ là “hy vọng chunk khớp” — bạn đóng khung message trên dây và buffer phía đọc cho đến khi có frame đủ}.
Framing strategy A: delimiter-based {Chiến lược framing A: delimiter}
The simplest framing: pick a delimiter bytes cannot appear in the payload (or escape them) {Framing đơn giản nhất: chọn delimiter mà byte payload không thể chứa (hoặc escape chúng)}. Text protocols often use newline — one message per line, aka NDJSON when each line is JSON {Giao thức text thường dùng xuống dòng — một message mỗi dòng, hay NDJSON khi mỗi dòng là JSON}.
// Encode: payload + '\n'
function encodeLine(obj: unknown): Buffer {
return Buffer.from(JSON.stringify(obj) + '\n', 'utf8');
}
// Decode: accumulate bytes, split on '\n', emit complete lines
class LineDecoder {
private buf = Buffer.alloc(0);
push(chunk: Buffer): string[] {
this.buf = Buffer.concat([this.buf, chunk]);
const lines: string[] = [];
let start = 0;
for (let i = 0; i < this.buf.length; i++) {
if (this.buf[i] === 0x0a) {
// byte before newline is one complete message
lines.push(this.buf.subarray(start, i).toString('utf8'));
start = i + 1;
}
}
// keep incomplete tail for next chunk
this.buf = this.buf.subarray(start);
return lines;
}
}
Server using the line decoder {Server dùng line decoder}:
import { createServer } from 'node:net';
const decoder = new LineDecoder();
const server = createServer((socket) => {
const perSocket = new LineDecoder();
socket.on('data', (chunk) => {
for (const line of perSocket.push(chunk)) {
const msg = JSON.parse(line) as { id: number; cmd: string };
console.log('frame:', msg);
socket.write(encodeLine({ ok: true, id: msg.id }));
}
});
});
server.listen(3000);
Delimiter framing is great for logs, chat, and human-debuggable protocols {Framing delimiter tốt cho log, chat, và giao thức debug được bằng mắt}. It struggles when payloads are binary or must contain the delimiter byte {Khó khi payload là binary hoặc phải chứa byte delimiter}.
Framing strategy B: length-prefix {Chiến lược framing B: length-prefix}
Length-prefix framing prepends a fixed-width header with the payload length, then the raw bytes {Length-prefix framing thêm header cố định chứa độ dài payload, rồi byte thô}. A common choice: 4-byte big-endian unsigned integer + payload {Lựa chọn phổ biến: số nguyên 4 byte big-endian + payload}.
on the wire:
┌──────────┬─────────────────────┐
│ len = 5 │ H E L L O │
│ 4 bytes │ 5 bytes payload │
└──────────┴─────────────────────┘
Encoder
const HEADER_SIZE = 4;
function encodeFrame(payload: Buffer): Buffer {
const header = Buffer.alloc(HEADER_SIZE);
header.writeUInt32BE(payload.length, 0);
return Buffer.concat([header, payload]);
}
function encodeJsonFrame(obj: unknown): Buffer {
return encodeFrame(Buffer.from(JSON.stringify(obj), 'utf8'));
}
Stateful decoder (buffers partial reads)
class LengthPrefixDecoder {
private buf = Buffer.alloc(0);
push(chunk: Buffer): Buffer[] {
this.buf = Buffer.concat([this.buf, chunk]);
const frames: Buffer[] = [];
while (this.buf.length >= HEADER_SIZE) {
const len = this.buf.readUInt32BE(0);
if (this.buf.length < HEADER_SIZE + len) {
// header says N bytes, but we haven't received all of them yet
break;
}
const payload = this.buf.subarray(HEADER_SIZE, HEADER_SIZE + len);
frames.push(payload);
this.buf = this.buf.subarray(HEADER_SIZE + len);
}
return frames;
}
}
Server + client with length-prefix
import { connect, createServer } from 'node:net';
// --- server ---
const server = createServer((socket) => {
const decoder = new LengthPrefixDecoder();
socket.on('data', (chunk) => {
for (const payload of decoder.push(chunk)) {
const msg = JSON.parse(payload.toString('utf8')) as {
id: number;
cmd: string;
};
console.log('frame:', msg);
socket.write(encodeJsonFrame({ ok: true, id: msg.id }));
}
});
});
server.listen(3000);
// --- client ---
const client = connect(3000, () => {
client.write(encodeJsonFrame({ id: 1, cmd: 'ping' }));
client.write(encodeJsonFrame({ id: 2, cmd: 'ping' }));
});
client.on('data', (chunk) => {
const decoder = new LengthPrefixDecoder();
for (const payload of decoder.push(chunk)) {
console.log('reply:', JSON.parse(payload.toString('utf8')));
}
});
Note: the client example above creates a fresh decoder per 'data' — in real code, keep one decoder per socket for the connection lifetime, just like the server {Lưu ý: ví dụ client trên tạo decoder mới mỗi 'data' — trong code thật, giữ một decoder mỗi socket suốt đời kết nối, giống server}. The loop while (this.buf.length >= HEADER_SIZE) is the heart of the pattern: never parse until you have header + full payload {Vòng while là cốt lõi: không parse cho đến khi có header + đủ payload}.
Delimiter vs length-prefix {So sánh delimiter và length-prefix}
| Aspect {Khía cạnh} | Delimiter (newline / NDJSON) | Length-prefix (4-byte BE) |
|---|---|---|
| Human-readable {Đọc được bằng mắt} | Yes — nc and logs show one line per message {Có — nc và log thấy một dòng mỗi message} | No — binary header + payload {Không — header binary + payload} |
| Binary payloads {Payload binary} | Awkward — must escape delimiter or use base64 {Khó — phải escape delimiter hoặc base64} | Native — any byte sequence {Tự nhiên — mọi chuỗi byte} |
| Max message size {Kích thước message tối đa} | Implicit (line length) or scan for delimiter {Ngầm (độ dài dòng) hoặc quét delimiter} | Explicit in header (here: 4 GB with uint32) {Rõ trong header (ở đây: 4 GB với uint32)} |
| Parsing cost {Chi phí parse} | Scan every byte for \n {Quét mọi byte tìm \n} | Read 4 bytes, jump len bytes — O(1) per frame {Đọc 4 byte, nhảy len byte — O(1) mỗi frame} |
| Typical use {Dùng khi nào} | Redis-ish text, logs, NDJSON streams {Text kiểu Redis, log, stream NDJSON} | RPC, game protocols, Protobuf/gRPC-style {RPC, game protocol, kiểu Protobuf/gRPC} |
Pick delimiter when humans and nc matter; pick length-prefix when messages can be binary, large, or frequent {Chọn delimiter khi người và nc quan trọng; chọn length-prefix khi message có thể binary, lớn, hoặc dày}.
Backpressure: when write() says “slow down” {Backpressure: khi write() bảo “chậm lại”}
Framing solves read-side chunking. Backpressure solves write-side overload {Framing giải quyết chunking phía đọc. Backpressure giải quyết quá tải phía ghi}.
socket.write(data) copies into the kernel send buffer and returns immediately most of the time {socket.write(data) copy vào buffer gửi kernel và trả về ngay hầu hết thời gian}. When that buffer is full, write() returns false {Khi buffer đầy, write() trả về false}. If you keep calling write() anyway, Node queues data in userland memory — unbounded growth until the process OOMs {Nếu bạn vẫn gọi write(), Node xếp hàng dữ liệu trong bộ nhớ userland — tăng không giới hạn đến khi OOM}.
import type { Socket } from 'node:net';
// Correct: respect backpressure
function writeWithBackpressure(
socket: Socket,
data: Buffer,
): Promise<void> {
return new Promise((resolve, reject) => {
const ok = socket.write(data, (err) => {
if (err) reject(err);
});
if (ok) {
// kernel accepted everything — safe to send more
resolve();
} else {
// buffer full — wait until it drains
socket.once('drain', resolve);
}
});
}
// Send many frames without blowing memory
async function sendMany(
socket: Socket,
frames: Buffer[],
): Promise<void> {
for (const frame of frames) {
await writeWithBackpressure(socket, frame);
}
}
The 'drain' event fires when the kernel buffer has room again {Sự kiện 'drain' bắn khi buffer kernel có chỗ trở lại}. writableHighWaterMark (default 16 KiB on many sockets) hints when write() starts returning false {writableHighWaterMark (mặc định 16 KiB trên nhiều socket) gợi ý khi write() bắt đầu trả false}.
For file-to-socket or socket-to-socket copying, prefer built-ins that handle this for you {Khi copy file→socket hoặc socket→socket, ưu tiên built-in đã xử lý sẵn}:
import { createReadStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import type { Socket } from 'node:net';
async function pumpFileToSocket(
filePath: string,
socket: Socket,
): Promise<void> {
const rs = createReadStream(filePath);
await pipeline(rs, socket);
// pipeline() pauses the source on backpressure and resumes on drain
}
socket.pipe() does the same for readable→writable pairs, but stream.pipeline() also forwards errors and cleans up — prefer pipeline() in new code {socket.pipe() làm tương tự cho cặp readable→writable, nhưng stream.pipeline() còn chuyển lỗi và dọn dẹp — ưu tiên pipeline() trong code mới}.
Putting it together: framed echo with backpressure {Gộp lại: echo có framing và backpressure}
import { createServer, type Socket } from 'node:net';
const PORT = 3000;
function writeWithBackpressure(socket: Socket, data: Buffer): Promise<void> {
return new Promise((resolve, reject) => {
const ok = socket.write(data, (err) => {
if (err) reject(err);
});
if (ok) resolve();
else socket.once('drain', resolve);
});
}
const server = createServer((socket) => {
const decoder = new LengthPrefixDecoder();
socket.on('data', async (chunk) => {
for (const payload of decoder.push(chunk)) {
const text = payload.toString('utf8');
const reply = encodeFrame(Buffer.from(`echo:${text}`, 'utf8'));
await writeWithBackpressure(socket, reply);
}
});
});
server.listen(PORT, () => {
console.log(`framed echo on :${PORT}`);
});
This server survives split, coalesced, and high-volume writes because framing and backpressure are both handled explicitly {Server này chịu được write bị cắt, bị gộp, và khối lượng lớn vì framing và backpressure đều được xử lý rõ ràng}.
Mistakes beginners make {Lỗi người mới hay mắc}
- ❌ Assuming one
'data'event = one message — the #1 TCP bug; always buffer and frame {Giả định một'data'= một message — lỗi TCP số 1; luôn buffer và frame}. - ❌ Ignoring the return value of
write()— queueing forever when it returnsfalsecauses unbounded memory growth {Bỏ qua giá trị trả về củawrite()— xếp hàng mãi khifalsegây tăng bộ nhớ không giới hạn}. - ❌ Forgetting partial-read buffering in the decoder — parsing as soon as bytes arrive guarantees intermittent
JSON.parsefailures {Quên buffer đọc một phần trong decoder — parse ngay khi byte đến chắc chắnJSON.parselỗi lúc này lúc kia}. - ❌ Calling
JSON.parsedirectly on a chunk — works in demos, breaks in production {GọiJSON.parsetrực tiếp trên chunk — demo chạy, production hỏng}. - ❌ Creating a new decoder per
'data'event instead of one per connection — loses state across chunks {Tạo decoder mới mỗi'data'thay vì một decoder mỗi kết nối — mất state giữa các chunk}.
Exercises {Bài tập}
Try each before opening the solution {Thử từng bài trước khi mở lời giải}.
- Run the naive JSON server and a client that sends two
write()calls back-to-back. Log chunk lengths and show whenJSON.parsethrows {Chạy server JSON ngây thơ và client gửi haiwrite()liên tiếp. Log độ dài chunk và chứng minh khiJSON.parseném lỗi}. - Extend
LineDecoderto reject lines longer than 64 KiB (defense against abuse) and emit an error event instead of buffering forever {Mở rộngLineDecodertừ chối dòng dài hơn 64 KiB và emit lỗi thay vì buffer mãi}. - Write a client that sends 10 000 length-prefixed frames as fast as possible without backpressure handling, then repeat with
writeWithBackpressure. Compareprocess.memoryUsage().heapUsed{Viết client gửi 10 000 frame length-prefix nhanh nhất có thể không xử lý backpressure, rồi lặp lại vớiwriteWithBackpressure. So sánhprocess.memoryUsage().heapUsed}.
Solution {Lời giải}
import { connect, createServer } from 'node:net';
// --- Exercise 1: demonstrate coalescing ---
const naive = createServer((socket) => {
socket.on('data', (chunk) => {
console.log('chunk length:', chunk.length);
try {
console.log('parsed:', JSON.parse(chunk.toString('utf8')));
} catch (e) {
console.log('JSON.parse failed:', (e as Error).message);
}
});
});
naive.listen(3001);
const c1 = connect(3001, () => {
c1.write(JSON.stringify({ n: 1 }));
c1.write(JSON.stringify({ n: 2 }));
});
// Often prints one chunk with length ~26 and SyntaxError — two objects, one chunk
// --- Exercise 2: LineDecoder with max line size ---
class SafeLineDecoder extends EventTarget {
private buf = Buffer.alloc(0);
private readonly max: number;
constructor(maxBytes = 64 * 1024) {
super();
this.max = maxBytes;
}
push(chunk: Buffer): string[] {
this.buf = Buffer.concat([this.buf, chunk]);
if (this.buf.length > this.max) {
this.dispatchEvent(new ErrorEvent('error', {
error: new Error(`line exceeds ${this.max} bytes`),
}));
this.buf = Buffer.alloc(0);
return [];
}
const lines: string[] = [];
let start = 0;
for (let i = 0; i < this.buf.length; i++) {
if (this.buf[i] === 0x0a) {
lines.push(this.buf.subarray(start, i).toString('utf8'));
start = i + 1;
}
}
this.buf = this.buf.subarray(start);
return lines;
}
}
// --- Exercise 3: backpressure memory comparison ---
import type { Socket } from 'node:net';
function writeWithBackpressure(socket: Socket, data: Buffer): Promise<void> {
return new Promise((resolve) => {
if (socket.write(data)) resolve();
else socket.once('drain', resolve);
});
}
async function flood(useBackpressure: boolean): Promise<void> {
const socket = connect(3000);
await new Promise<void>((r) => socket.once('connect', r));
const frames = Array.from({ length: 10_000 }, (_, i) =>
encodeJsonFrame({ i }),
);
const before = process.memoryUsage().heapUsed;
if (useBackpressure) {
for (const f of frames) await writeWithBackpressure(socket, f);
} else {
for (const f of frames) socket.write(f);
}
const after = process.memoryUsage().heapUsed;
console.log(useBackpressure ? 'with BP' : 'no BP', {
deltaMB: ((after - before) / 1024 / 1024).toFixed(1),
});
socket.end();
}Exercise 1: coalescing is timing-dependent — run several times or add setImmediate between writes to force separate chunks sometimes {Bài 1: gộp phụ thuộc timing — chạy nhiều lần hoặc thêm setImmediate giữa các write để đôi khi tách chunk}. Exercise 3: without backpressure, heap delta is often tens of MB while the kernel catches up; with backpressure it stays near zero {Bài 3: không backpressure, delta heap thường hàng chục MB khi kernel theo kịp; có backpressure gần không}.
Takeaway {Điều cốt lõi}
TCP gives you an ordered, reliable byte stream — not messages {TCP cho bạn luồng byte có thứ tự, tin cậy — không phải message}. 'data' is a chunk notification, not a message boundary; you frame on the wire (delimiter or length-prefix) and buffer until a full frame arrives {'data' là thông báo chunk, không phải ranh giới message; bạn frame trên dây (delimiter hoặc length-prefix) và buffer đến khi frame đủ}. On the send side, respect write() returning false and wait for 'drain' (or use pipeline()) so fast producers cannot exhaust memory {Phía gửi, tôn trọng write() trả false và chờ 'drain' (hoặc dùng pipeline()) để producer nhanh không làm cạn bộ nhớ}. Every protocol you have used — HTTP, WebSocket, Redis — does this under the hood; now you can build your own {Mọi giao thức bạn từng dùng — HTTP, WebSocket, Redis — đều làm vậy bên trong; giờ bạn có thể tự xây}.