Design Patterns in TypeScript · Part 5 — Observer & Pub/Sub
Decouple "something changed" from "who reacts": the Observer pattern, a fully typed event emitter, the platform EventTarget, Pub/Sub via an event bus, and how signals/reactivity build on the same idea.
Part 5 of 10 in the Design Patterns in TypeScript series {Phần 5/10 trong series Design Patterns in TypeScript}. Previous {Trước}: Part 4 — Strategy · Next {Tiếp}: Part 6 — Decorator & Middleware.
This is Part 5 of a 10-part series on the design patterns every senior web engineer should have in their hands — explained with runnable TypeScript, real frontend/back-of-front use cases, and exercises at the end of each part {Đây là Phần 5 của series 10 bài về các design pattern mà mọi senior web nên nắm — giải thích bằng TypeScript chạy được, use case web thực tế, và bài tập ở cuối mỗi phần}.
In Part 4 you swapped algorithms without rewriting the caller {Trong Phần 4 bạn đổi thuật toán mà không sửa caller}. A related smell is polling or hard-wiring every consumer into the producer: setInterval(() => checkCart(), 500), or onOrderPlaced that directly calls sendEmail(), updateAnalytics(), and refreshUI() in one function {Mùi liên quan là polling hoặc nối cứng mọi consumer vào producer: setInterval(() => checkCart(), 500), hoặc onOrderPlaced gọi thẳng sendEmail(), updateAnalytics(), refreshUI() trong một hàm}. That couples who changed to who must react {Điều đó ghép ai đổi với ai phải phản ứng}. Observer and Pub/Sub let interested parties subscribe and get notified when something happens — without the producer knowing their names {Observer và Pub/Sub cho phép bên quan tâm đăng ký và được báo khi có sự kiện — producer không cần biết tên họ}.
The intent {Ý đồ}
The Observer pattern defines a one-to-many dependency: when the subject changes state, all observers attached to it are notified automatically {Observer định nghĩa quan hệ một-nhiều: khi subject đổi trạng thái, mọi observer gắn vào nó được báo tự động}. The subject keeps a list of observers (or callbacks) and pushes updates {Subject giữ danh sách observer (hoặc callback) và đẩy cập nhật}. You reach for it when many independent reactions must follow the same event — analytics, UI refresh, cache invalidation, audit logs — and you refuse to edit the emitter every time product adds another listener {Bạn dùng khi nhiều phản ứng độc lập phải theo cùng một event — analytics, refresh UI, invalidate cache, audit log — và bạn không muốn sửa emitter mỗi lần product thêm listener}.
Pub/Sub (publish–subscribe) pushes decoupling one step further {Pub/Sub đẩy tách coupling thêm một bước}: publishers and subscribers never reference each other; a broker or event bus routes messages on topics or channels {publisher và subscriber không tham chiếu nhau; broker hoặc event bus định tuyến message theo topic hoặc channel}. Observer is often “in-process, subject knows its listeners”; Pub/Sub is “anyone who knows the bus can publish or listen” {Observer thường là “trong process, subject biết listener”; Pub/Sub là “ai biết bus đều publish hoặc nghe được”}. In TypeScript apps both show up as typed event maps, DOM EventTarget, store subscriptions, and WebSocket dispatch {Trong app TypeScript cả hai hiện dưới dạng event map có kiểu, EventTarget DOM, subscription store, và dispatch WebSocket}.
Polling and hard-wiring vs subscribe {Polling và nối cứng vs đăng ký}
Polling asks “did it change yet?” on a timer — wasteful and laggy {Polling hỏi “đã đổi chưa?” theo timer — tốn tài nguyên và trễ}:
// ❌ couples timing to business logic; burns CPU when idle
setInterval(() => {
if (cart.version !== lastSeen) refreshBadge(cart);
}, 500);
Hard-wiring lists every reaction inside the producer {Nối cứng liệt kê mọi phản ứng trong producer}:
// ❌ every new side effect edits this function
function placeOrder(order: Order) {
persist(order);
sendConfirmationEmail(order);
trackPurchase(order);
invalidateProductCache(order.productId);
bus.publish('order:placed', order); // still better than inline imports everywhere
}
The fix is notify, don’t call: emit once; subscribers register elsewhere {Cách sửa là báo, đừng gọi: emit một lần; subscriber đăng ký ở chỗ khác}.
A typed event emitter {Event emitter có kiểu}
The senior centerpiece is a small Emitter<Events> where Events is a record of event name → payload type {Trọng tâm senior là Emitter<Events> nhỏ với Events là record tên event → kiểu payload}. on, off, and emit stay fully type-safe: wrong event name or payload is a compile error {on, off, emit giữ type-safe: sai tên event hoặc payload là lỗi biên dịch}.
/** Map of event names to payload types — the contract for this emitter. */
type EventMap = Record<string, unknown>;
type Handler<Events extends EventMap, K extends keyof Events> = (
payload: Events[K],
) => void;
export class Emitter<Events extends EventMap> {
private readonly listeners: {
[K in keyof Events]?: Set<Handler<Events, K>>;
} = {};
on<K extends keyof Events>(event: K, handler: Handler<Events, K>): () => void {
let set = this.listeners[event];
if (!set) {
set = new Set();
this.listeners[event] = set;
}
set.add(handler);
return () => this.off(event, handler);
}
off<K extends keyof Events>(event: K, handler: Handler<Events, K>): void {
this.listeners[event]?.delete(handler);
}
emit<K extends keyof Events>(event: K, payload: Events[K]): void {
const set = this.listeners[event];
if (!set) return;
// Copy so a handler that unsubscribes mid-loop does not skip others.
for (const handler of [...set]) {
try {
handler(payload);
} catch (error) {
// Isolate failures — one bad observer must not break the rest.
console.error(`[Emitter] handler failed for "${String(event)}"`, error);
}
}
}
}
Define the contract once, then use it {Định nghĩa contract một lần, rồi dùng}:
type AppEvents = {
'cart:updated': { itemCount: number };
'user:signed-in': { userId: string };
};
const appBus = new Emitter<AppEvents>();
appBus.on('cart:updated', ({ itemCount }) => {
document.querySelector('#badge')!.textContent = String(itemCount);
});
appBus.emit('cart:updated', { itemCount: 3 });
// appBus.emit('cart:updated', { count: 3 }); // ❌ compile error
// appBus.emit('user:signed-in', { itemCount: 1 }); // ❌ wrong payload shape
once and an explicit unsubscribe handle are standard ergonomics {once và handle hủy đăng ký rõ ràng là ergonomics chuẩn}:
export class Emitter<Events extends EventMap> {
// ... on, off, emit as above
once<K extends keyof Events>(event: K, handler: Handler<Events, K>): () => void {
const wrapped: Handler<Events, K> = (payload) => {
unsubscribe();
handler(payload);
};
const unsubscribe = this.on(event, wrapped);
return unsubscribe;
}
}
Return () => void from on so callers can tear down without holding the original function reference {Trả () => void từ on để caller dọn mà không cần giữ reference function gốc}. In React/Vue lifecycles, call that unsubscribe in useEffect cleanup or onUnmounted {Trong lifecycle React/Vue, gọi unsubscribe trong cleanup useEffect hoặc onUnmounted}.
The platform already has one {Nền tảng đã có sẵn}
Browsers and Node (modern versions) ship EventTarget + CustomEvent {Browser và Node (bản mới) có sẵn EventTarget + CustomEvent}. Use it when you want interop with DOM APIs, addEventListener semantics, or bubbling on elements {Dùng khi cần tương thích API DOM, semantics addEventListener, hoặc bubbling trên element}. Use your own Emitter<Events> when you want a typed record of many named events without detail casting gymnastics {Dùng Emitter<Events> riêng khi muốn record có kiểu nhiều event tên mà không xoay detail kiểu}.
const target = new EventTarget();
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null;
}
function onTyped<T extends Record<string, unknown>>(
target: EventTarget,
type: string,
handler: (detail: T) => void,
isDetail: (value: unknown) => value is T,
signal?: AbortSignal,
): void {
const listener = (event: Event) => {
if (!(event instanceof CustomEvent)) return;
if (!isDetail(event.detail)) return;
handler(event.detail);
};
target.addEventListener(type, listener, { signal });
}
function isItemDetail(value: unknown): value is { id: string } {
return isRecord(value) && typeof value.id === 'string';
}
onTyped(
target,
'item:selected',
(detail) => console.log(detail.id),
isItemDetail,
AbortSignal.timeout(60_000), // auto-unsubscribe after 60s
);
target.dispatchEvent(
new CustomEvent('item:selected', { detail: { id: 'sku-42' } }),
);
AbortController / signal is the platform pattern for auto-unsubscribe: pass signal into addEventListener; call abort() (or AbortSignal.timeout) and every listener attached with that signal is removed {AbortController / signal là pattern nền tảng tự hủy đăng ký: truyền signal vào addEventListener; gọi abort() (hoặc AbortSignal.timeout) và mọi listener gắn signal đó bị gỡ}. Prefer this for one-shot UI listeners tied to a component lifetime {Ưu tiên cho listener UI một lần gắn với vòng đời component}.
Pub/Sub via an event bus {Pub/Sub qua event bus}
When modules must not import each other, expose a typed bus module: publishers only publish, subscribers only subscribe {Khi module không được import lẫn nhau, expose module bus có kiểu: publisher chỉ publish, subscriber chỉ subscribe}.
// events.ts — shared contract
export type DomainEvents = {
'order:placed': { orderId: string; totalCents: number };
'inventory:low': { sku: string; remaining: number };
};
// bus.ts — single broker instance
import { Emitter } from './emitter';
import type { DomainEvents } from './events';
const emitter = new Emitter<DomainEvents>();
export function publish<K extends keyof DomainEvents>(
topic: K,
payload: DomainEvents[K],
): void {
emitter.emit(topic, payload);
}
export function subscribe<K extends keyof DomainEvents>(
topic: K,
handler: (payload: DomainEvents[K]) => void,
): () => void {
return emitter.on(topic, handler);
}
// analytics.ts — never imports checkout.ts
import { subscribe } from './bus';
export function registerAnalytics(): () => void {
return subscribe('order:placed', ({ orderId, totalCents }) => {
track('purchase', { orderId, totalCents });
});
}
// checkout.ts — never imports analytics.ts
import { publish } from './bus';
export function placeOrder(orderId: string, totalCents: number): void {
saveOrder(orderId, totalCents);
publish('order:placed', { orderId, totalCents });
}
Topics are the keys of DomainEvents; adding a topic is a type-level change every subscriber sees {Topic là key của DomainEvents; thêm topic là thay đổi cấp kiểu mọi subscriber thấy}. For cross-tab or cross-service messaging you might swap the in-memory bus for Redis, MQTT, or BroadcastChannel — the pattern stays the same {Cho messaging cross-tab hoặc cross-service có thể đổi bus in-memory bằng Redis, MQTT, hoặc BroadcastChannel — pattern giữ nguyên}.
Relation to signals & reactivity {Liên hệ với signal & reactivity}
Framework reactivity (Vue ref, Solid/Preact signals, Angular signals, RxJS observables) is Observer with automatic dependency tracking {Reactivity framework (Vue ref, signal Solid/Preact, signal Angular, RxJS observable) là Observer có theo dõi dependency tự động}. You read a signal inside an effect; the runtime records that edge and re-runs the effect when the signal changes {Bạn đọc signal trong effect; runtime ghi cạnh đó và chạy lại effect khi signal đổi}. You do not manually on('change', …) for every dependency — the graph is built from reads {Bạn không on('change', …) thủ công cho mọi dependency — đồ thị được dựng từ lần đọc}. Conceptually: subject = reactive source, observers = effects / computed / subscribers {Về ý niệm: subject = nguồn reactive, observer = effect / computed / subscriber}. RxJS adds operators (map, filter, debounce) on the same notify stream — still Observer, with a richer protocol {RxJS thêm operator (map, filter, debounce) trên cùng luồng báo — vẫn Observer, protocol phong phú hơn}. You rarely implement signals yourself; you do implement typed emitters and buses in app code {Hiếm khi tự implement signal; bạn có implement emitter và bus có kiểu trong app}.
Real web use cases {Use case web thực tế}
- DOM events — clicks, input,
resize,visibilitychange; capture/bubble is the browser’s Observer graph {DOM event — click, input,resize,visibilitychange; capture/bubble là đồ thị Observer của browser}. - State stores — Zustand
subscribe, Redux listeners, or a home-grown store callingemit('state', slice)afterset{Store state —subscribeZustand, listener Redux, hoặc store tự gọiemit('state', slice)sauset}. - WebSocket / SSE — one connection dispatches message types to registered handlers {WebSocket / SSE — một connection dispatch kiểu message tới handler đã đăng ký}.
- Cross-component communication — micro-frontends or packages that must not import each other’s UI {Giao tiếp cross-component — micro-frontend hoặc package không được import UI lẫn nhau}.
- Domain events —
order:placed,user:deletedfor audit, email, projections (CQRS-flavored frontends) {Domain event —order:placed,user:deletedcho audit, email, projection (frontend kiểu CQRS)}.
Pitfalls {Cạm bẫy}
- Memory leaks (#1) — subscribing without unsubscribing when a component unmounts or a request ends keeps closures alive {Rò bộ nhớ (#1) — subscribe mà không unsubscribe khi component unmount hoặc request kết thúc giữ closure sống}. Always return and call an unsubscribe function, or use
AbortSignal{Luôn trả và gọi hàm unsubscribe, hoặc dùngAbortSignal}. - Emit storms / cascades — handler A emits B, B emits A; debounce or queue if needed {Bão emit / cascade — handler A emit B, B emit A; debounce hoặc queue nếu cần}.
- Ordering assumptions —
Setiteration order is insertion order in modern JS, but do not rely on cross-handler ordering for correctness {Giả định thứ tự — thứ tự duyệtSetlà insertion order trong JS hiện đại, nhưng đừng dựa vào thứ tự giữa handler cho đúng nghiệp vụ}. - One observer throws — without
try/catchper handler, one bug kills the whole notification round {Một observer ném lỗi — khôngtry/catchtừng handler thì một bug giết cả vòng báo}. Isolate (as inemitabove) {Cô lập (nhưemitở trên)}. thisbinding — passingobj.handleUpdatelosesthis; use arrow functions orhandler.bind(obj){Mấtthis— truyềnobj.handleUpdatemấtthis; dùng arrow function hoặchandler.bind(obj)}.
// ❌ leak: subscription outlives the component
function mountWidget() {
appBus.on('cart:updated', updateWidget);
// never unsubscribes on teardown
}
// ✅ cleanup
function mountWidget(): () => void {
return appBus.on('cart:updated', updateWidget);
}
// on teardown: const off = mountWidget(); ... later: off();
Cheat sheet {Bảng tra nhanh}
// Typed in-process emitter
type Events = { 'x': { n: number } };
const bus = new Emitter<Events>();
const off = bus.on('x', (p) => console.log(p.n));
bus.emit('x', { n: 1 });
off();
// Platform EventTarget + auto-unsubscribe
const ac = new AbortController();
target.addEventListener('x', handler, { signal: ac.signal });
ac.abort();
// Pub/Sub: typed bus module, publishers ≠ subscribers
publish('order:placed', { orderId: '1', totalCents: 100 });
subscribe('order:placed', handler);
Decision: same module, few listeners → Emitter on a class; cross-module → typed bus; DOM integration → EventTarget; async streams with operators → RxJS {Quyết định: cùng module, ít listener → Emitter trên class; cross-module → bus có kiểu; tích hợp DOM → EventTarget; luồng async có operator → RxJS}.
Bài tập / Exercises
1. Implement Emitter<Events> with on, off, emit so emit('cart:updated', wrongShape) fails at compile time {Cài Emitter<Events> với on, off, emit sao cho emit('cart:updated', wrongShape) lỗi biên dịch}.
Solution {Lời giải}
type EventMap = Record<string, unknown>;
type Handler<Events extends EventMap, K extends keyof Events> = (
payload: Events[K],
) => void;
class Emitter<Events extends EventMap> {
private readonly listeners: {
[K in keyof Events]?: Set<Handler<Events, K>>;
} = {};
on<K extends keyof Events>(event: K, handler: Handler<Events, K>): () => void {
let set = this.listeners[event];
if (!set) {
set = new Set();
this.listeners[event] = set;
}
set.add(handler);
return () => this.off(event, handler);
}
off<K extends keyof Events>(event: K, handler: Handler<Events, K>): void {
this.listeners[event]?.delete(handler);
}
emit<K extends keyof Events>(event: K, payload: Events[K]): void {
const set = this.listeners[event];
if (!set) return;
for (const h of [...set]) h(payload);
}
}
type ShopEvents = { 'cart:updated': { itemCount: number } };
const shop = new Emitter<ShopEvents>();
shop.on('cart:updated', ({ itemCount }) => console.log(itemCount));
shop.emit('cart:updated', { itemCount: 2 });2. Add once(event, handler) and verify the handler runs exactly one time even if you emit twice {Thêm once(event, handler) và xác minh handler chạy đúng một lần dù emit hai lần}.
Solution {Lời giải}
// Extend Emitter with:
once<K extends keyof Events>(event: K, handler: Handler<Events, K>): () => void {
const wrapped: Handler<Events, K> = (payload) => {
off();
handler(payload);
};
const off = this.on(event, wrapped);
return off;
}
let calls = 0;
const off = shop.once('cart:updated', () => { calls += 1; });
shop.emit('cart:updated', { itemCount: 1 });
shop.emit('cart:updated', { itemCount: 2 });
console.log(calls); // 1
off(); // idempotent cleanup3. Use EventTarget + CustomEvent + AbortController: register a listener, dispatch once, call abort(), dispatch again — prove the second dispatch does not run the handler {Dùng EventTarget + CustomEvent + AbortController: đăng ký listener, dispatch một lần, gọi abort(), dispatch lần hai — chứng minh lần hai không chạy handler}.
Solution {Lời giải}
const target = new EventTarget();
const ac = new AbortController();
let hits = 0;
target.addEventListener(
'ping',
() => { hits += 1; },
{ signal: ac.signal },
);
target.dispatchEvent(new CustomEvent('ping'));
ac.abort();
target.dispatchEvent(new CustomEvent('ping'));
console.log(hits); // 14. Demonstrate a memory leak: a function registers on a global bus every time it is called without unsubscribing. Fix it by returning and invoking an unsubscribe {Minh họa rò bộ nhớ: hàm đăng ký lên bus global mỗi lần gọi mà không unsubscribe. Sửa bằng cách trả và gọi unsubscribe}.
Solution {Lời giải}
const globalBus = new Emitter<{ tick: { n: number } }>();
// ❌ leak
function brokenMount() {
globalBus.on('tick', () => { /* ... */ });
}
brokenMount();
brokenMount();
// two handlers — double work, retained closures
// ✅ idempotent mount
let teardown: (() => void) | undefined;
function safeMount(): void {
teardown?.();
teardown = globalBus.on('tick', () => { /* ... */ });
}
function safeUnmount(): void {
teardown?.();
teardown = undefined;
}
safeMount();
safeMount();
// still one handler after second mount
safeUnmount();5. Sketch a typed subscribe(topic, handler) / publish(topic, payload) pair for DomainEvents with at least two topics and two subscriber modules that do not import each other {Phác subscribe(topic, handler) / publish(topic, payload) có kiểu cho DomainEvents với ít nhất hai topic và hai module subscriber không import lẫn nhau}.
Solution {Lời giải}
type DomainEvents = {
'order:placed': { orderId: string };
'inventory:low': { sku: string };
};
const core = new Emitter<DomainEvents>();
export function publish<K extends keyof DomainEvents>(
topic: K,
payload: DomainEvents[K],
): void {
core.emit(topic, payload);
}
export function subscribe<K extends keyof DomainEvents>(
topic: K,
handler: (payload: DomainEvents[K]) => void,
): () => void {
return core.on(topic, handler);
}
// module A
subscribe('order:placed', ({ orderId }) => console.log('audit', orderId));
// module B
subscribe('inventory:low', ({ sku }) => console.log('restock', sku));
publish('order:placed', { orderId: 'o-1' });Stretch {Nâng cao}: wrap Emitter handlers so each runs inside queueMicrotask and log event name + duration; emit ten events in a row and confirm no handler blocks another (conceptual backpressure) {Bọc handler Emitter để mỗi cái chạy trong queueMicrotask và log tên event + thời gian; emit mười event liên tiếp và xác nhận handler này không chặn handler kia (backpressure khái niệm)}.
Solution {Lời giải}
class Emitter<Events extends EventMap> {
protected readonly listeners: {
[K in keyof Events]?: Set<Handler<Events, K>>;
} = {};
// on, off, emit ...
}
class AsyncEmitter<Events extends EventMap> extends Emitter<Events> {
override emit<K extends keyof Events>(event: K, payload: Events[K]): void {
const set = this.listeners[event];
if (!set) return;
for (const handler of [...set]) {
queueMicrotask(() => {
const t0 = performance.now();
try {
handler(payload);
} finally {
console.log(String(event), performance.now() - t0, 'ms');
}
});
}
}
}emit returns immediately; handlers run on the microtask queue — no long sync work blocks peers {emit trả về ngay; handler chạy trên microtask queue — không có việc sync dài chặn peer}.
Key takeaways {Điểm chính}
- Observer = notify many listeners when state changes; stop polling and stop hard-wiring every reaction into the producer {Observer = báo nhiều listener khi state đổi; bỏ polling và bỏ nối cứng mọi phản ứng vào producer}.
Emitter<Events>with a record of payloads gives compile-time safety foron/emit{Emitter<Events>với record payload cho an toàn compile-time choon/emit}.EventTarget+AbortSignalis the platform unsubscribe story; use it for DOM-aligned lifecycles {EventTarget+AbortSignallà câu chuyện hủy đăng ký nền tảng; dùng cho vòng đời gắn DOM}.- Pub/Sub adds a bus so publishers and subscribers stay ignorant of each other {Pub/Sub thêm bus để publisher và subscriber không biết nhau}.
- Signals/RxJS are Observer with tracking or operators; your job in app code is still subscribe/unsubscribe discipline {Signal/RxJS là Observer có tracking hoặc operator; việc của bạn trong app vẫn là kỷ luật subscribe/unsubscribe}.
- #1 bug: leaks — always pair subscribe with teardown {Bug #1: leak — luôn ghép subscribe với teardown}.
Next up {Tiếp theo}
Part 6 — Decorator & Middleware: wrap behavior around a core function or object without subclass explosion — logging, auth, caching, and Express-style pipelines {Phần 6 — Decorator & Middleware: bọc hành vi quanh hàm hoặc object lõi không cần bùng nổ subclass — logging, auth, cache, pipeline kiểu Express}. Continue to Part 6 — Decorator & Middleware. ← Part 4 — Strategy