jvinhit//lab

Search posts

Type to search across journal entries.

navigate open esc close

Design Patterns in TypeScript · Part 5 — Observer & Pub/Sub

Decouple "something changed" from "who reacts": the Observer pattern, a fully typed event emitter, the platform EventTarget, Pub/Sub via an event bus, and how signals/reactivity build on the same idea.

Part 5 of 10 in the Design Patterns in TypeScript series {Phần 5/10 trong series Design Patterns in TypeScript}. Previous {Trước}: Part 4 — Strategy · Next {Tiếp}: Part 6 — Decorator & Middleware.

This is Part 5 of a 10-part series on the design patterns every senior web engineer should have in their hands — explained with runnable TypeScript, real frontend/back-of-front use cases, and exercises at the end of each part {Đây là Phần 5 của series 10 bài về các design pattern mà mọi senior web nên nắm — giải thích bằng TypeScript chạy được, use case web thực tế, và bài tập ở cuối mỗi phần}.

In Part 4 you swapped algorithms without rewriting the caller {Trong Phần 4 bạn đổi thuật toán mà không sửa caller}. A related smell is polling or hard-wiring every consumer into the producer: setInterval(() => checkCart(), 500), or onOrderPlaced that directly calls sendEmail(), updateAnalytics(), and refreshUI() in one function {Mùi liên quan là polling hoặc nối cứng mọi consumer vào producer: setInterval(() => checkCart(), 500), hoặc onOrderPlaced gọi thẳng sendEmail(), updateAnalytics(), refreshUI() trong một hàm}. That couples who changed to who must react {Điều đó ghép ai đổi với ai phải phản ứng}. Observer and Pub/Sub let interested parties subscribe and get notified when something happens — without the producer knowing their names {ObserverPub/Sub cho phép bên quan tâm đăng ký và được báo khi có sự kiện — producer không cần biết tên họ}.


The intent {Ý đồ}

The Observer pattern defines a one-to-many dependency: when the subject changes state, all observers attached to it are notified automatically {Observer định nghĩa quan hệ một-nhiều: khi subject đổi trạng thái, mọi observer gắn vào nó được báo tự động}. The subject keeps a list of observers (or callbacks) and pushes updates {Subject giữ danh sách observer (hoặc callback) và đẩy cập nhật}. You reach for it when many independent reactions must follow the same event — analytics, UI refresh, cache invalidation, audit logs — and you refuse to edit the emitter every time product adds another listener {Bạn dùng khi nhiều phản ứng độc lập phải theo cùng một event — analytics, refresh UI, invalidate cache, audit log — và bạn không muốn sửa emitter mỗi lần product thêm listener}.

Subject state + notify() Observer 1 Observer 2 Observer 3 notify on change observers subscribe / unsubscribe — Subject doesn't know who
Subject notifies every Observer; subscribers do not need to know each other

Pub/Sub (publish–subscribe) pushes decoupling one step further {Pub/Sub đẩy tách coupling thêm một bước}: publishers and subscribers never reference each other; a broker or event bus routes messages on topics or channels {publisher và subscriber không tham chiếu nhau; broker hoặc event bus định tuyến message theo topic hoặc channel}. Observer is often “in-process, subject knows its listeners”; Pub/Sub is “anyone who knows the bus can publish or listen” {Observer thường là “trong process, subject biết listener”; Pub/Sub là “ai biết bus đều publish hoặc nghe được”}. In TypeScript apps both show up as typed event maps, DOM EventTarget, store subscriptions, and WebSocket dispatch {Trong app TypeScript cả hai hiện dưới dạng event map có kiểu, EventTarget DOM, subscription store, và dispatch WebSocket}.


Polling and hard-wiring vs subscribe {Polling và nối cứng vs đăng ký}

Polling asks “did it change yet?” on a timer — wasteful and laggy {Polling hỏi “đã đổi chưa?” theo timer — tốn tài nguyên và trễ}:

// ❌ couples timing to business logic; burns CPU when idle
setInterval(() => {
  if (cart.version !== lastSeen) refreshBadge(cart);
}, 500);

Hard-wiring lists every reaction inside the producer {Nối cứng liệt kê mọi phản ứng trong producer}:

// ❌ every new side effect edits this function
function placeOrder(order: Order) {
  persist(order);
  sendConfirmationEmail(order);
  trackPurchase(order);
  invalidateProductCache(order.productId);
  bus.publish('order:placed', order); // still better than inline imports everywhere
}

The fix is notify, don’t call: emit once; subscribers register elsewhere {Cách sửa là báo, đừng gọi: emit một lần; subscriber đăng ký ở chỗ khác}.


A typed event emitter {Event emitter có kiểu}

The senior centerpiece is a small Emitter<Events> where Events is a record of event name → payload type {Trọng tâm senior là Emitter<Events> nhỏ với Eventsrecord tên event → kiểu payload}. on, off, and emit stay fully type-safe: wrong event name or payload is a compile error {on, off, emit giữ type-safe: sai tên event hoặc payload là lỗi biên dịch}.

/** Map of event names to payload types — the contract for this emitter. */
type EventMap = Record<string, unknown>;

type Handler<Events extends EventMap, K extends keyof Events> = (
  payload: Events[K],
) => void;

export class Emitter<Events extends EventMap> {
  private readonly listeners: {
    [K in keyof Events]?: Set<Handler<Events, K>>;
  } = {};

  on<K extends keyof Events>(event: K, handler: Handler<Events, K>): () => void {
    let set = this.listeners[event];
    if (!set) {
      set = new Set();
      this.listeners[event] = set;
    }
    set.add(handler);
    return () => this.off(event, handler);
  }

  off<K extends keyof Events>(event: K, handler: Handler<Events, K>): void {
    this.listeners[event]?.delete(handler);
  }

  emit<K extends keyof Events>(event: K, payload: Events[K]): void {
    const set = this.listeners[event];
    if (!set) return;
    // Copy so a handler that unsubscribes mid-loop does not skip others.
    for (const handler of [...set]) {
      try {
        handler(payload);
      } catch (error) {
        // Isolate failures — one bad observer must not break the rest.
        console.error(`[Emitter] handler failed for "${String(event)}"`, error);
      }
    }
  }
}

Define the contract once, then use it {Định nghĩa contract một lần, rồi dùng}:

type AppEvents = {
  'cart:updated': { itemCount: number };
  'user:signed-in': { userId: string };
};

const appBus = new Emitter<AppEvents>();

appBus.on('cart:updated', ({ itemCount }) => {
  document.querySelector('#badge')!.textContent = String(itemCount);
});

appBus.emit('cart:updated', { itemCount: 3 });
// appBus.emit('cart:updated', { count: 3 });     // ❌ compile error
// appBus.emit('user:signed-in', { itemCount: 1 }); // ❌ wrong payload shape

once and an explicit unsubscribe handle are standard ergonomics {oncehandle hủy đăng ký rõ ràng là ergonomics chuẩn}:

export class Emitter<Events extends EventMap> {
  // ... on, off, emit as above

  once<K extends keyof Events>(event: K, handler: Handler<Events, K>): () => void {
    const wrapped: Handler<Events, K> = (payload) => {
      unsubscribe();
      handler(payload);
    };
    const unsubscribe = this.on(event, wrapped);
    return unsubscribe;
  }
}

Return () => void from on so callers can tear down without holding the original function reference {Trả () => void từ on để caller dọn mà không cần giữ reference function gốc}. In React/Vue lifecycles, call that unsubscribe in useEffect cleanup or onUnmounted {Trong lifecycle React/Vue, gọi unsubscribe trong cleanup useEffect hoặc onUnmounted}.


The platform already has one {Nền tảng đã có sẵn}

Browsers and Node (modern versions) ship EventTarget + CustomEvent {Browser và Node (bản mới) có sẵn EventTarget + CustomEvent}. Use it when you want interop with DOM APIs, addEventListener semantics, or bubbling on elements {Dùng khi cần tương thích API DOM, semantics addEventListener, hoặc bubbling trên element}. Use your own Emitter<Events> when you want a typed record of many named events without detail casting gymnastics {Dùng Emitter<Events> riêng khi muốn record có kiểu nhiều event tên mà không xoay detail kiểu}.

const target = new EventTarget();

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}

function onTyped<T extends Record<string, unknown>>(
  target: EventTarget,
  type: string,
  handler: (detail: T) => void,
  isDetail: (value: unknown) => value is T,
  signal?: AbortSignal,
): void {
  const listener = (event: Event) => {
    if (!(event instanceof CustomEvent)) return;
    if (!isDetail(event.detail)) return;
    handler(event.detail);
  };
  target.addEventListener(type, listener, { signal });
}

function isItemDetail(value: unknown): value is { id: string } {
  return isRecord(value) && typeof value.id === 'string';
}

onTyped(
  target,
  'item:selected',
  (detail) => console.log(detail.id),
  isItemDetail,
  AbortSignal.timeout(60_000), // auto-unsubscribe after 60s
);

target.dispatchEvent(
  new CustomEvent('item:selected', { detail: { id: 'sku-42' } }),
);

AbortController / signal is the platform pattern for auto-unsubscribe: pass signal into addEventListener; call abort() (or AbortSignal.timeout) and every listener attached with that signal is removed {AbortController / signal là pattern nền tảng tự hủy đăng ký: truyền signal vào addEventListener; gọi abort() (hoặc AbortSignal.timeout) và mọi listener gắn signal đó bị gỡ}. Prefer this for one-shot UI listeners tied to a component lifetime {Ưu tiên cho listener UI một lần gắn với vòng đời component}.


Pub/Sub via an event bus {Pub/Sub qua event bus}

When modules must not import each other, expose a typed bus module: publishers only publish, subscribers only subscribe {Khi module không được import lẫn nhau, expose module bus có kiểu: publisher chỉ publish, subscriber chỉ subscribe}.

// events.ts — shared contract
export type DomainEvents = {
  'order:placed': { orderId: string; totalCents: number };
  'inventory:low': { sku: string; remaining: number };
};

// bus.ts — single broker instance
import { Emitter } from './emitter';
import type { DomainEvents } from './events';

const emitter = new Emitter<DomainEvents>();

export function publish<K extends keyof DomainEvents>(
  topic: K,
  payload: DomainEvents[K],
): void {
  emitter.emit(topic, payload);
}

export function subscribe<K extends keyof DomainEvents>(
  topic: K,
  handler: (payload: DomainEvents[K]) => void,
): () => void {
  return emitter.on(topic, handler);
}
// analytics.ts — never imports checkout.ts
import { subscribe } from './bus';

export function registerAnalytics(): () => void {
  return subscribe('order:placed', ({ orderId, totalCents }) => {
    track('purchase', { orderId, totalCents });
  });
}
// checkout.ts — never imports analytics.ts
import { publish } from './bus';

export function placeOrder(orderId: string, totalCents: number): void {
  saveOrder(orderId, totalCents);
  publish('order:placed', { orderId, totalCents });
}

Topics are the keys of DomainEvents; adding a topic is a type-level change every subscriber sees {Topic là key của DomainEvents; thêm topic là thay đổi cấp kiểu mọi subscriber thấy}. For cross-tab or cross-service messaging you might swap the in-memory bus for Redis, MQTT, or BroadcastChannel — the pattern stays the same {Cho messaging cross-tab hoặc cross-service có thể đổi bus in-memory bằng Redis, MQTT, hoặc BroadcastChannel — pattern giữ nguyên}.


Relation to signals & reactivity {Liên hệ với signal & reactivity}

Framework reactivity (Vue ref, Solid/Preact signals, Angular signals, RxJS observables) is Observer with automatic dependency tracking {Reactivity framework (Vue ref, signal Solid/Preact, signal Angular, RxJS observable) là Observer có theo dõi dependency tự động}. You read a signal inside an effect; the runtime records that edge and re-runs the effect when the signal changes {Bạn đọc signal trong effect; runtime ghi cạnh đó và chạy lại effect khi signal đổi}. You do not manually on('change', …) for every dependency — the graph is built from reads {Bạn không on('change', …) thủ công cho mọi dependency — đồ thị được dựng từ lần đọc}. Conceptually: subject = reactive source, observers = effects / computed / subscribers {Về ý niệm: subject = nguồn reactive, observer = effect / computed / subscriber}. RxJS adds operators (map, filter, debounce) on the same notify stream — still Observer, with a richer protocol {RxJS thêm operator (map, filter, debounce) trên cùng luồng báo — vẫn Observer, protocol phong phú hơn}. You rarely implement signals yourself; you do implement typed emitters and buses in app code {Hiếm khi tự implement signal; bạn implement emitter và bus có kiểu trong app}.


Real web use cases {Use case web thực tế}

  • DOM events — clicks, input, resize, visibilitychange; capture/bubble is the browser’s Observer graph {DOM event — click, input, resize, visibilitychange; capture/bubble là đồ thị Observer của browser}.
  • State stores — Zustand subscribe, Redux listeners, or a home-grown store calling emit('state', slice) after set {Store statesubscribe Zustand, listener Redux, hoặc store tự gọi emit('state', slice) sau set}.
  • WebSocket / SSE — one connection dispatches message types to registered handlers {WebSocket / SSE — một connection dispatch kiểu message tới handler đã đăng ký}.
  • Cross-component communication — micro-frontends or packages that must not import each other’s UI {Giao tiếp cross-component — micro-frontend hoặc package không được import UI lẫn nhau}.
  • Domain eventsorder:placed, user:deleted for audit, email, projections (CQRS-flavored frontends) {Domain eventorder:placed, user:deleted cho audit, email, projection (frontend kiểu CQRS)}.

Pitfalls {Cạm bẫy}

  1. Memory leaks (#1) — subscribing without unsubscribing when a component unmounts or a request ends keeps closures alive {Rò bộ nhớ (#1) — subscribe mà không unsubscribe khi component unmount hoặc request kết thúc giữ closure sống}. Always return and call an unsubscribe function, or use AbortSignal {Luôn trả và gọi hàm unsubscribe, hoặc dùng AbortSignal}.
  2. Emit storms / cascades — handler A emits B, B emits A; debounce or queue if needed {Bão emit / cascade — handler A emit B, B emit A; debounce hoặc queue nếu cần}.
  3. Ordering assumptionsSet iteration order is insertion order in modern JS, but do not rely on cross-handler ordering for correctness {Giả định thứ tự — thứ tự duyệt Set là insertion order trong JS hiện đại, nhưng đừng dựa vào thứ tự giữa handler cho đúng nghiệp vụ}.
  4. One observer throws — without try/catch per handler, one bug kills the whole notification round {Một observer ném lỗi — không try/catch từng handler thì một bug giết cả vòng báo}. Isolate (as in emit above) {Cô lập (như emit ở trên)}.
  5. this binding — passing obj.handleUpdate loses this; use arrow functions or handler.bind(obj) {Mất this — truyền obj.handleUpdate mất this; dùng arrow function hoặc handler.bind(obj)}.
// ❌ leak: subscription outlives the component
function mountWidget() {
  appBus.on('cart:updated', updateWidget);
  // never unsubscribes on teardown
}

// ✅ cleanup
function mountWidget(): () => void {
  return appBus.on('cart:updated', updateWidget);
}
// on teardown: const off = mountWidget(); ... later: off();

Cheat sheet {Bảng tra nhanh}

// Typed in-process emitter
type Events = { 'x': { n: number } };
const bus = new Emitter<Events>();
const off = bus.on('x', (p) => console.log(p.n));
bus.emit('x', { n: 1 });
off();

// Platform EventTarget + auto-unsubscribe
const ac = new AbortController();
target.addEventListener('x', handler, { signal: ac.signal });
ac.abort();

// Pub/Sub: typed bus module, publishers ≠ subscribers
publish('order:placed', { orderId: '1', totalCents: 100 });
subscribe('order:placed', handler);

Decision: same module, few listeners → Emitter on a class; cross-module → typed bus; DOM integration → EventTarget; async streams with operators → RxJS {Quyết định: cùng module, ít listener → Emitter trên class; cross-module → bus có kiểu; tích hợp DOM → EventTarget; luồng async có operator → RxJS}.


Bài tập / Exercises

1. Implement Emitter<Events> with on, off, emit so emit('cart:updated', wrongShape) fails at compile time {Cài Emitter<Events> với on, off, emit sao cho emit('cart:updated', wrongShape) lỗi biên dịch}.

Solution {Lời giải}
type EventMap = Record<string, unknown>;
type Handler<Events extends EventMap, K extends keyof Events> = (
  payload: Events[K],
) => void;

class Emitter<Events extends EventMap> {
  private readonly listeners: {
    [K in keyof Events]?: Set<Handler<Events, K>>;
  } = {};

  on<K extends keyof Events>(event: K, handler: Handler<Events, K>): () => void {
    let set = this.listeners[event];
    if (!set) {
      set = new Set();
      this.listeners[event] = set;
    }
    set.add(handler);
    return () => this.off(event, handler);
  }

  off<K extends keyof Events>(event: K, handler: Handler<Events, K>): void {
    this.listeners[event]?.delete(handler);
  }

  emit<K extends keyof Events>(event: K, payload: Events[K]): void {
    const set = this.listeners[event];
    if (!set) return;
    for (const h of [...set]) h(payload);
  }
}

type ShopEvents = { 'cart:updated': { itemCount: number } };
const shop = new Emitter<ShopEvents>();
shop.on('cart:updated', ({ itemCount }) => console.log(itemCount));
shop.emit('cart:updated', { itemCount: 2 });

2. Add once(event, handler) and verify the handler runs exactly one time even if you emit twice {Thêm once(event, handler) và xác minh handler chạy đúng một lần dù emit hai lần}.

Solution {Lời giải}
// Extend Emitter with:
once<K extends keyof Events>(event: K, handler: Handler<Events, K>): () => void {
  const wrapped: Handler<Events, K> = (payload) => {
    off();
    handler(payload);
  };
  const off = this.on(event, wrapped);
  return off;
}

let calls = 0;
const off = shop.once('cart:updated', () => { calls += 1; });
shop.emit('cart:updated', { itemCount: 1 });
shop.emit('cart:updated', { itemCount: 2 });
console.log(calls); // 1
off(); // idempotent cleanup

3. Use EventTarget + CustomEvent + AbortController: register a listener, dispatch once, call abort(), dispatch again — prove the second dispatch does not run the handler {Dùng EventTarget + CustomEvent + AbortController: đăng ký listener, dispatch một lần, gọi abort(), dispatch lần hai — chứng minh lần hai không chạy handler}.

Solution {Lời giải}
const target = new EventTarget();
const ac = new AbortController();
let hits = 0;

target.addEventListener(
  'ping',
  () => { hits += 1; },
  { signal: ac.signal },
);

target.dispatchEvent(new CustomEvent('ping'));
ac.abort();
target.dispatchEvent(new CustomEvent('ping'));
console.log(hits); // 1

4. Demonstrate a memory leak: a function registers on a global bus every time it is called without unsubscribing. Fix it by returning and invoking an unsubscribe {Minh họa rò bộ nhớ: hàm đăng ký lên bus global mỗi lần gọi mà không unsubscribe. Sửa bằng cách trả và gọi unsubscribe}.

Solution {Lời giải}
const globalBus = new Emitter<{ tick: { n: number } }>();

// ❌ leak
function brokenMount() {
  globalBus.on('tick', () => { /* ... */ });
}
brokenMount();
brokenMount();
// two handlers — double work, retained closures

// ✅ idempotent mount
let teardown: (() => void) | undefined;

function safeMount(): void {
  teardown?.();
  teardown = globalBus.on('tick', () => { /* ... */ });
}

function safeUnmount(): void {
  teardown?.();
  teardown = undefined;
}

safeMount();
safeMount();
// still one handler after second mount
safeUnmount();

5. Sketch a typed subscribe(topic, handler) / publish(topic, payload) pair for DomainEvents with at least two topics and two subscriber modules that do not import each other {Phác subscribe(topic, handler) / publish(topic, payload) có kiểu cho DomainEvents với ít nhất hai topic và hai module subscriber không import lẫn nhau}.

Solution {Lời giải}
type DomainEvents = {
  'order:placed': { orderId: string };
  'inventory:low': { sku: string };
};

const core = new Emitter<DomainEvents>();

export function publish<K extends keyof DomainEvents>(
  topic: K,
  payload: DomainEvents[K],
): void {
  core.emit(topic, payload);
}

export function subscribe<K extends keyof DomainEvents>(
  topic: K,
  handler: (payload: DomainEvents[K]) => void,
): () => void {
  return core.on(topic, handler);
}

// module A
subscribe('order:placed', ({ orderId }) => console.log('audit', orderId));

// module B
subscribe('inventory:low', ({ sku }) => console.log('restock', sku));

publish('order:placed', { orderId: 'o-1' });

Stretch {Nâng cao}: wrap Emitter handlers so each runs inside queueMicrotask and log event name + duration; emit ten events in a row and confirm no handler blocks another (conceptual backpressure) {Bọc handler Emitter để mỗi cái chạy trong queueMicrotask và log tên event + thời gian; emit mười event liên tiếp và xác nhận handler này không chặn handler kia (backpressure khái niệm)}.

Solution {Lời giải}
class Emitter<Events extends EventMap> {
  protected readonly listeners: {
    [K in keyof Events]?: Set<Handler<Events, K>>;
  } = {};
  // on, off, emit ...
}

class AsyncEmitter<Events extends EventMap> extends Emitter<Events> {
  override emit<K extends keyof Events>(event: K, payload: Events[K]): void {
    const set = this.listeners[event];
    if (!set) return;
    for (const handler of [...set]) {
      queueMicrotask(() => {
        const t0 = performance.now();
        try {
          handler(payload);
        } finally {
          console.log(String(event), performance.now() - t0, 'ms');
        }
      });
    }
  }
}

emit returns immediately; handlers run on the microtask queue — no long sync work blocks peers {emit trả về ngay; handler chạy trên microtask queue — không có việc sync dài chặn peer}.


Key takeaways {Điểm chính}

  • Observer = notify many listeners when state changes; stop polling and stop hard-wiring every reaction into the producer {Observer = báo nhiều listener khi state đổi; bỏ polling và bỏ nối cứng mọi phản ứng vào producer}.
  • Emitter<Events> with a record of payloads gives compile-time safety for on / emit {Emitter<Events> với record payload cho an toàn compile-time cho on / emit}.
  • EventTarget + AbortSignal is the platform unsubscribe story; use it for DOM-aligned lifecycles {EventTarget + AbortSignal là câu chuyện hủy đăng ký nền tảng; dùng cho vòng đời gắn DOM}.
  • Pub/Sub adds a bus so publishers and subscribers stay ignorant of each other {Pub/Sub thêm bus để publisher và subscriber không biết nhau}.
  • Signals/RxJS are Observer with tracking or operators; your job in app code is still subscribe/unsubscribe discipline {Signal/RxJS là Observer có tracking hoặc operator; việc của bạn trong app vẫn là kỷ luật subscribe/unsubscribe}.
  • #1 bug: leaks — always pair subscribe with teardown {Bug #1: leak — luôn ghép subscribe với teardown}.

Next up {Tiếp theo}

Part 6 — Decorator & Middleware: wrap behavior around a core function or object without subclass explosion — logging, auth, caching, and Express-style pipelines {Phần 6 — Decorator & Middleware: bọc hành vi quanh hàm hoặc object lõi không cần bùng nổ subclass — logging, auth, cache, pipeline kiểu Express}. Continue to Part 6 — Decorator & Middleware. ← Part 4 — Strategy