realtime/subscriber_registry.ts

Generic subscriber registry for broadcasting to SSE clients.

Supports channel-based filtering: subscribers connect with optional channel filters, and each broadcast reaches only the matching subscribers. An optional identity key allows force-closing subscribers by identity (e.g., closing all streams for a specific account when its permissions change).

Declarations

2 declarations

Subscriber

Subscriber<T>

generics

T

stream

type SseStream<T>

channels

Channels this subscriber listens to. null means all channels.

type Set<string> | null

identity

Optional identity key for targeted disconnection (e.g., account_id).

type string | null
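
The `null` convention for `channels` can be illustrated with a small matching check. This is a sketch only: the `SseStream` shape below (a `send` and a `close` method) and the helper `accepts_channel` are assumptions made for illustration and are not taken from the module's source.

```typescript
// Assumed minimal stream shape; the real SseStream<T> may differ.
interface SseStream<T> {
	send(data: T): void;
	close(): void;
}

// Mirrors the documented Subscriber<T> fields.
interface Subscriber<T> {
	stream: SseStream<T>;
	channels: Set<string> | null; // null means all channels
	identity: string | null;
}

// Hypothetical helper: does this subscriber want messages on `channel`?
const accepts_channel = <T>(subscriber: Subscriber<T>, channel: string): boolean =>
	subscriber.channels === null || subscriber.channels.has(channel);

const noop_stream: SseStream<string> = {send: () => {}, close: () => {}};

const all_channels: Subscriber<string> = {
	stream: noop_stream,
	channels: null,
	identity: null,
};
const runs_only: Subscriber<string> = {
	stream: noop_stream,
	channels: new Set(['runs']),
	identity: 'account_1',
};

console.log(accepts_channel(all_channels, 'audit_log')); // true
console.log(accepts_channel(runs_only, 'runs')); // true
console.log(accepts_channel(runs_only, 'audit_log')); // false
```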

SubscriberRegistry

Generic subscriber registry with channel-based filtering and identity-keyed disconnection.

Subscribers connect with optional channel filters and an optional identity key. Broadcasts go to a specific channel and reach only matching subscribers. close_by_identity force-closes all subscribers with a given identity — use for auth revocation (close streams when a user's permissions change).

generics

T

examples

const registry = new SubscriberRegistry<SseNotification>();

// subscriber connects (from SSE endpoint)
const unsubscribe = registry.subscribe(stream, ['runs']);

// when a run changes
registry.broadcast('runs', {method: 'run_created', params: {run}});

// subscriber disconnects
unsubscribe();

// identity-keyed subscription for auth revocation
const unsubscribe = registry.subscribe(stream, ['audit_log'], account_id);

// when admin revokes the user's role, close their streams
registry.close_by_identity(account_id);

subscribe

Add a subscriber.

type (stream: SseStream<T>, channels?: string[] | undefined, identity?: string | undefined): () => void

stream

SSE stream to send data to

type SseStream<T>

channels?

channels to subscribe to (undefined or empty = all channels)

type string[] | undefined

optional

identity?

optional identity key for targeted disconnection

type string | undefined

optional

returns () => void

unsubscribe function
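
One way the documented `subscribe` contract could be realized is sketched below: an undefined or empty `channels` array is stored as `null` (no filter), and the returned function removes exactly the subscriber it was created for. The class name `SubscribeSketch`, its public `subscribers` set, and the `SseStream` shape are hypothetical and exist only for this illustration; the actual implementation may differ.

```typescript
// Assumed minimal stream shape; the real SseStream<T> may differ.
interface SseStream<T> {
	send(data: T): void;
	close(): void;
}

interface Subscriber<T> {
	stream: SseStream<T>;
	channels: Set<string> | null;
	identity: string | null;
}

// Hypothetical stand-in for the registry's subscribe logic.
class SubscribeSketch<T> {
	readonly subscribers = new Set<Subscriber<T>>();

	subscribe(stream: SseStream<T>, channels?: string[], identity?: string): () => void {
		const subscriber: Subscriber<T> = {
			stream,
			// undefined or empty means no filter, stored as null
			channels: channels && channels.length > 0 ? new Set(channels) : null,
			identity: identity ?? null,
		};
		this.subscribers.add(subscriber);
		// The returned function removes exactly this subscriber.
		return () => {
			this.subscribers.delete(subscriber);
		};
	}
}

const sketch = new SubscribeSketch<string>();
const demo_stream: SseStream<string> = {send: () => {}, close: () => {}};

const unsubscribe_all = sketch.subscribe(demo_stream); // no filter
const unsubscribe_runs = sketch.subscribe(demo_stream, ['runs'], 'account_1');
console.log(sketch.subscribers.size); // 2

unsubscribe_runs();
unsubscribe_runs(); // calling twice is harmless in this sketch
console.log(sketch.subscribers.size); // 1
```

Keeping the subscriber object in the closure means the caller never needs an id or handle to unsubscribe; it only has to hold on to the returned function.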

broadcast

Broadcast data to all subscribers on a channel.

Subscribers with no channel filter receive all broadcasts. Subscribers with a channel filter only receive matching broadcasts.

type (channel: string, data: T): void

channel

the channel to broadcast on

type string

data

the data to send

type T

returns void
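
The filtering rule described for `broadcast` can be sketched as a loop over subscribers. The function `broadcast_sketch`, the recording stream, and the `SseStream` shape with a `send` method are assumptions for illustration; only the rule itself (unfiltered subscribers receive everything, filtered subscribers receive matching channels) comes from the documentation above.

```typescript
// Assumed minimal stream shape; the real SseStream<T> may differ.
interface SseStream<T> {
	send(data: T): void;
	close(): void;
}

interface Subscriber<T> {
	stream: SseStream<T>;
	channels: Set<string> | null;
	identity: string | null;
}

// Hypothetical broadcast over a plain array of subscribers.
const broadcast_sketch = <T>(
	subscribers: readonly Subscriber<T>[],
	channel: string,
	data: T,
): void => {
	for (const subscriber of subscribers) {
		// null filter receives everything; otherwise the channel must match
		if (subscriber.channels === null || subscriber.channels.has(channel)) {
			subscriber.stream.send(data);
		}
	}
};

// Recording stream so deliveries can be inspected.
const recording_stream = (received: string[]): SseStream<string> => ({
	send: (data) => {
		received.push(data);
	},
	close: () => {},
});

const unfiltered_received: string[] = [];
const runs_received: string[] = [];
const demo_subscribers: Subscriber<string>[] = [
	{stream: recording_stream(unfiltered_received), channels: null, identity: null},
	{stream: recording_stream(runs_received), channels: new Set(['runs']), identity: null},
];

broadcast_sketch(demo_subscribers, 'runs', 'run_created');
broadcast_sketch(demo_subscribers, 'audit_log', 'role_revoked');

console.log(unfiltered_received); // ['run_created', 'role_revoked']
console.log(runs_received); // ['run_created']
```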

close_by_identity

Force-close all subscribers with the given identity.

Closes each matching stream and removes the subscriber from the registry. Use for auth revocation — when a user's permissions change, close their SSE connections so they must reconnect and re-authenticate.

type (identity: string): number

identity

the identity key to match

type string

returns number

the number of subscribers closed
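
A minimal sketch of the close-and-remove behavior follows, assuming the stream exposes a `close` method. The class `CloseSketch`, its array-backed storage, and the counting stream are hypothetical; they only illustrate the documented contract that matching streams are closed, removed, and counted.

```typescript
// Assumed minimal stream shape; the real SseStream<T> may differ.
interface SseStream<T> {
	send(data: T): void;
	close(): void;
}

interface Subscriber<T> {
	stream: SseStream<T>;
	channels: Set<string> | null;
	identity: string | null;
}

// Hypothetical stand-in for the registry's close_by_identity logic.
class CloseSketch<T> {
	subscribers: Subscriber<T>[] = [];

	close_by_identity(identity: string): number {
		const matching = this.subscribers.filter((s) => s.identity === identity);
		for (const subscriber of matching) {
			subscriber.stream.close();
		}
		// Drop the closed subscribers so later broadcasts skip them.
		this.subscribers = this.subscribers.filter((s) => s.identity !== identity);
		return matching.length;
	}
}

// Stream that counts how many times close() was called.
let close_calls = 0;
const counting_stream = (): SseStream<string> => ({
	send: () => {},
	close: () => {
		close_calls++;
	},
});

const registry_sketch = new CloseSketch<string>();
registry_sketch.subscribers = [
	{stream: counting_stream(), channels: null, identity: 'account_1'},
	{stream: counting_stream(), channels: new Set(['audit_log']), identity: 'account_1'},
	{stream: counting_stream(), channels: null, identity: 'account_2'},
];

const closed_count = registry_sketch.close_by_identity('account_1');
console.log(closed_count); // 2
console.log(registry_sketch.subscribers.length); // 1

const closed_again = registry_sketch.close_by_identity('account_1');
console.log(closed_again); // 0
```

Returning the count lets the caller log or assert how many connections were actually dropped, and a second call for the same identity is a no-op in this sketch.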
