realtime/subscriber_registry.ts view source
Generic subscriber registry with channel-based filtering and identity-keyed disconnection.
Subscribers connect with optional channel filters and an optional identity key.
Broadcasts go to a specific channel and reach only matching subscribers.
close_by_identity force-closes all subscribers with a given identity —
use for auth revocation (close streams when a user's permissions change).
examples
const registry = new SubscriberRegistry<SseNotification>();
const unsubscribe = registry.subscribe(stream, ['runs']);
registry.broadcast('runs', {method: 'run_created', params: {run}});
unsubscribe();
const unsubscribe = registry.subscribe(stream, ['audit_log'], account_id);
registry.close_by_identity(account_id);
subscribe
type (stream: SseStream<T>, channels?: string[] | undefined, identity?: string | undefined): () => void
stream
SSE stream to send data to
type SseStream<T>
channels?
channels to subscribe to (undefined or empty = all channels)
type string[] | undefined
optional
identity?
optional identity key for targeted disconnection
type string | undefined
optional
returns () => void
broadcast
Broadcast data to all subscribers on a channel.
Subscribers with no channel filter receive all broadcasts.
Subscribers with a channel filter only receive matching broadcasts.
type (channel: string, data: T): void
channel
the channel to broadcast on
type string
returns void
close_by_identity
Force-close all subscribers with the given identity.
Closes each matching stream and removes the subscriber from the registry.
Use for auth revocation — when a user's permissions change, close their
SSE connections so they must reconnect and re-authenticate.
type (identity: string): number
identity
the identity key to match
type string
returns number
the number of subscribers closed