realtime/subscriber_registry.ts

Generic subscriber registry for broadcasting to SSE clients.

Supports channel-based filtering — subscribers connect with optional channel filters, and broadcasts reach only matching subscribers.

Two identity slots enable both targeted disconnection and per-scope cap enforcement:

- scope: a single capped identity (e.g., session hash). Subject to the per-scope cap and matched by close_by_identity. Use for the narrowest identity the subscriber belongs to.
- groups: any number of uncapped identities (e.g., account id). Matched by close_by_identity but not subject to any cap. Use for coarser scopes a stream should be reachable by.

The split keeps "tabs-per-session" cap semantics sane when a stream also carries a broader identity for coarse close — the broader identity doesn't cap across sessions.
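
The difference between the two slots can be illustrated with a small sketch. It does not use the real registry: `IdentitySlots`, `matches_identity`, and `count_in_scope` are hypothetical names introduced here, and the matching rule is only what the description above implies (a key matches the scope or any group, while only the scope counts toward the cap).

```typescript
// Hypothetical stand-in for a registered subscriber's identity slots.
interface IdentitySlots {
	scope: string | null; // capped identity, e.g. a session hash
	groups: Set<string> | null; // uncapped identities, e.g. an account id
}

// An identity key matches when it equals the scope or appears in groups.
const matches_identity = (s: IdentitySlots, key: string): boolean =>
	s.scope === key || (s.groups?.has(key) ?? false);

// Only `scope` counts toward the cap; `groups` is ignored here.
const count_in_scope = (all: IdentitySlots[], scope: string): number =>
	all.filter((s) => s.scope === scope).length;

// Two tabs in session A and one tab in session B, all for the same account.
const subscribers: IdentitySlots[] = [
	{scope: 'session_a', groups: new Set(['account_1'])},
	{scope: 'session_a', groups: new Set(['account_1'])},
	{scope: 'session_b', groups: new Set(['account_1'])},
];

const account_matches = subscribers.filter((s) => matches_identity(s, 'account_1')).length;
const session_a_count = count_in_scope(subscribers, 'session_a');
const session_b_count = count_in_scope(subscribers, 'session_b');
```

In this sketch the account key reaches all three streams for a coarse close, while each session is counted against the cap separately.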

Declarations

4 declarations

SubscribeOptions

SubscribeOptions

Options for SubscriberRegistry.subscribe.

channels

Channels to subscribe to. Empty/absent = all channels.

type ReadonlyArray<string>

scope

Primary (capped) identity — e.g., session hash. Subject to max_per_scope and matched by close_by_identity.

type string

groups

Grouping identities — e.g., account id. Matched by close_by_identity but NOT subject to the cap. Use for coarse-targeted close.

type ReadonlyArray<string>

Subscriber

Subscriber<T>

generics

T

stream

type SseStream<T>

channels

Channels this subscriber listens to. null means all channels.

type Set<string> | null

scope

Primary (capped) identity. null when none.

type string | null

groups

Grouping identities for close_by_identity. null when none.

type Set<string> | null
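
One plausible way to get from SubscribeOptions to the Subscriber fields is sketched below. The interfaces are local mirrors of the documented shapes with the stream field left out, and `to_set_or_null` and `normalize_options` are hypothetical helpers. Collapsing an empty groups array to null is an assumption; the page only states the empty/absent rule for channels.

```typescript
// Hypothetical local mirrors of the documented shapes (stream omitted).
interface SubscribeOptionsSketch {
	channels?: ReadonlyArray<string>;
	scope?: string;
	groups?: ReadonlyArray<string>;
}

interface SubscriberSlots {
	channels: Set<string> | null; // null means all channels
	scope: string | null;
	groups: Set<string> | null;
}

// Assumption: empty or absent arrays collapse to null.
const to_set_or_null = (values?: ReadonlyArray<string>): Set<string> | null =>
	values && values.length > 0 ? new Set(values) : null;

const normalize_options = (options: SubscribeOptionsSketch = {}): SubscriberSlots => ({
	channels: to_set_or_null(options.channels),
	scope: options.scope ?? null,
	groups: to_set_or_null(options.groups),
});

// No options: listens to every channel, carries no identities.
const unfiltered = normalize_options();

// Filtered to one channel, with both identity slots set.
const filtered = normalize_options({
	channels: ['runs'],
	scope: 'session_a',
	groups: ['account_1'],
});
```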

SubscriberRegistry

Generic subscriber registry with channel-based filtering and identity-keyed disconnection.

Subscribers connect with optional channel filters, a capped scope, and uncapped groups. Broadcasts go to a specific channel and reach only matching subscribers. close_by_identity force-closes all subscribers whose scope or groups contain the given key — use for auth revocation.

generics

T

examples

    const registry = new SubscriberRegistry<SseNotification>();

    // subscriber connects (from SSE endpoint)
    const unsubscribe = registry.subscribe(stream, {channels: ['runs']});

    // when a run changes
    registry.broadcast('runs', {method: 'run_created', params: {run}});

    // subscriber disconnects
    unsubscribe();

    // scope = session hash (capped), groups = [account id] (close-only)
    const unsubscribe = registry.subscribe(stream, {
    	channels: ['audit_log'],
    	scope: session_hash,
    	groups: [account_id],
    });

    // coarse: close all of a user's streams on role revocation
    registry.close_by_identity(account_id);

    // fine: close just the stream(s) tied to a specific session
    registry.close_by_identity(session_hash);

constructor

type new <T>(options?: SubscriberRegistryOptions | undefined): SubscriberRegistry<T>

options?
type SubscriberRegistryOptions | undefined
optional

subscribe

Add a subscriber.

type (stream: SseStream<T>, options?: SubscribeOptions | undefined): () => void

stream

SSE stream to send data to

type SseStream<T>
options?

channel filter and identity slots (scope + groups)

type SubscribeOptions | undefined
optional
returns () => void

unsubscribe function

broadcast

Broadcast data to all subscribers on a channel.

Subscribers with no channel filter receive all broadcasts. Subscribers with a channel filter only receive matching broadcasts.

type (channel: string, data: T): void

channel
type string
data
type T
returns void
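
The filtering rule described for broadcast can be sketched as follows. This is not the registry's implementation: `StreamSketch` and its `send` method are assumptions, since the real SseStream<T> interface is not shown on this page, and `broadcast_sketch` returns a delivery count purely for the demonstration (the documented method returns void).

```typescript
// Hypothetical minimal stream; the real SseStream<T> may look different.
interface StreamSketch<T> {
	send: (data: T) => void;
}

interface ChannelSubscriber<T> {
	stream: StreamSketch<T>;
	channels: Set<string> | null; // null means all channels
}

// Deliver to subscribers with no filter or whose filter contains the channel.
const broadcast_sketch = <T>(
	subscribers: Iterable<ChannelSubscriber<T>>,
	channel: string,
	data: T,
): number => {
	let delivered = 0;
	for (const subscriber of subscribers) {
		if (subscriber.channels === null || subscriber.channels.has(channel)) {
			subscriber.stream.send(data);
			delivered++;
		}
	}
	return delivered;
};

// Record what each demo subscriber receives.
const received: Record<string, string[]> = {all: [], runs: [], audit: []};
const recorder = (name: string): StreamSketch<string> => ({
	send: (data) => {
		received[name].push(data);
	},
});

const channel_subscribers: ChannelSubscriber<string>[] = [
	{stream: recorder('all'), channels: null},
	{stream: recorder('runs'), channels: new Set(['runs'])},
	{stream: recorder('audit'), channels: new Set(['audit_log'])},
];

const delivered_count = broadcast_sketch(channel_subscribers, 'runs', 'run_created');
```

Here the unfiltered subscriber and the 'runs' subscriber receive the message, and the 'audit_log' subscriber does not.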

close_by_identity

Force-close all subscribers whose scope or groups match the given key.

Closes each matching stream and removes the subscriber from the registry. Use for auth revocation — when a user's permissions change, close their SSE connections so they must reconnect and re-authenticate.

type (identity: string): number

identity

the identity key to match (checked against scope and groups)

type string
returns number

the number of subscribers closed
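
A minimal sketch of the close-and-remove behaviour described above, under stated assumptions: subscribers are held in a Set, and the stream exposes a `close` method. `ClosableStream`, `IdentifiedSubscriber`, and `close_by_identity_sketch` are hypothetical names, not the library's own.

```typescript
// Hypothetical minimal stream with a close method.
interface ClosableStream {
	close: () => void;
}

interface IdentifiedSubscriber {
	stream: ClosableStream;
	scope: string | null;
	groups: Set<string> | null;
}

// Close and remove every subscriber whose scope or groups match the key.
// Returns the number of subscribers closed.
const close_by_identity_sketch = (
	subscribers: Set<IdentifiedSubscriber>,
	identity: string,
): number => {
	let closed = 0;
	for (const subscriber of subscribers) {
		if (subscriber.scope === identity || subscriber.groups?.has(identity)) {
			subscriber.stream.close();
			// Deleting the current entry while iterating a Set is safe in JavaScript.
			subscribers.delete(subscriber);
			closed++;
		}
	}
	return closed;
};

let close_calls = 0;
const make_subscriber = (scope: string, account: string): IdentifiedSubscriber => ({
	stream: {
		close: () => {
			close_calls++;
		},
	},
	scope,
	groups: new Set([account]),
});

const registry_set = new Set<IdentifiedSubscriber>([
	make_subscriber('session_a', 'account_1'),
	make_subscriber('session_b', 'account_1'),
	make_subscriber('session_c', 'account_2'),
]);

// Fine-grained: only the stream tied to session_a.
const closed_for_session = close_by_identity_sketch(registry_set, 'session_a');
// Coarse: every remaining stream of account_1.
const closed_for_account = close_by_identity_sketch(registry_set, 'account_1');
```

After both calls, only the account_2 subscriber remains in the set.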

SubscriberRegistryOptions

SubscriberRegistryOptions

Options for SubscriberRegistry.

max_per_scope

Max subscribers sharing a single scope. On subscribe, when the count of subscribers with the same scope reaches this limit, the oldest matching subscriber(s) are closed before the new one is added. null (default) disables the cap. groups identities are never capped.

type number | null
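
The eviction rule for max_per_scope might look roughly like the sketch below. It assumes subscribers are stored in insertion order, so the first match is the oldest, and that subscribers without a scope are never capped. `CappedSubscriber`, its `id` label, and `add_with_cap` are hypothetical and exist only for this illustration.

```typescript
interface CappedSubscriber {
	id: string; // hypothetical label for the demo
	scope: string | null;
	close: () => void;
}

// Assumption: `subscribers` is in insertion order, oldest first.
const add_with_cap = (
	subscribers: CappedSubscriber[],
	incoming: CappedSubscriber,
	max_per_scope: number | null,
): void => {
	if (max_per_scope !== null && incoming.scope !== null) {
		const same_scope = subscribers.filter((s) => s.scope === incoming.scope);
		// Evict the oldest matches until there is room for the new subscriber.
		const excess = same_scope.length - max_per_scope + 1;
		for (const victim of same_scope.slice(0, Math.max(0, excess))) {
			victim.close();
			subscribers.splice(subscribers.indexOf(victim), 1);
		}
	}
	subscribers.push(incoming);
};

const closed_ids: string[] = [];
const tab = (id: string, scope: string | null): CappedSubscriber => ({
	id,
	scope,
	close: () => {
		closed_ids.push(id);
	},
});

const capped: CappedSubscriber[] = [];
add_with_cap(capped, tab('a1', 'session_a'), 2);
add_with_cap(capped, tab('a2', 'session_a'), 2);
add_with_cap(capped, tab('b1', 'session_b'), 2);
add_with_cap(capped, tab('a3', 'session_a'), 2); // session_a is at the cap, so a1 is closed first

const remaining_ids = capped.map((s) => s.id);
```

With a cap of 2, the third session_a tab displaces the oldest one, while session_b is unaffected.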

Imported by