
Downstream Messaging

Downstream messaging lets your Durable Objects (DOs) send messages to connected clients at any time, without waiting for the client to make an RPC call. It is well suited to real-time notifications, progress updates, and other server-initiated events.

Overview

  • Server → Client communication: A DO can send messages to connected clients at any time
  • WebSocket only: Requires the WebSocket transport (not HTTP)
  • Automatic handling: Messages are delivered to the onDownstream callback
  • Connection lifecycle: Detect when the connection closes with the onClose callback

Basic Usage

Client Setup

Register handlers when creating the RPC client:

import { createRpcClient, createWebSocketTransport } from '@lumenize/rpc';

using client = createRpcClient<typeof MyDO>({
  transport: createWebSocketTransport('MY_DO', 'instance-1', {
    // ...
    clientId: 'user-123', // Optional: identify this client
    onDownstream: (message) => {
      // ...
      // Handle real-time updates from the DO
    },
    onClose: (code, reason) => {
      // ...
      // Handle disconnection
    },
  }),
});

Server (Durable Object) Setup

Send messages to connected clients from within your DO:

import { sendDownstream } from '@lumenize/rpc';

export class MyDO {
  ctx: DurableObjectState;
  env: Env;

  constructor(ctx: DurableObjectState, env: Env) {
    this.ctx = ctx;
    this.env = env;
  }

  async processLongTask() {
    // Send progress updates to all connected clients
    const connections = this.ctx.getWebSockets();
    const clientIds = [...new Set(connections.flatMap(ws => this.ctx.getTags(ws)))];
    if (clientIds.length > 0) {
      await sendDownstream(clientIds, this, { type: 'progress', percent: 25 });
    }

    // ... do some work ...

    if (clientIds.length > 0) {
      await sendDownstream(clientIds, this, { type: 'progress', percent: 50 });
    }

    // ... more work ...

    if (clientIds.length > 0) {
      await sendDownstream(clientIds, this, { type: 'progress', percent: 100 });
    }

    return { status: 'complete' };
  }

  async notifyClient(clientId: string, message: any) {
    // Send to a specific client
    await sendDownstream(clientId, this, message);
  }
}
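The repeated "skip when nobody is connected" check in processLongTask could be factored into a small helper. The following is a minimal sketch, not part of @lumenize/rpc: createProgressReporter and the Send type are hypothetical names, and the injected send function stands in for a call such as sendDownstream(clientIds, this, message) so the sketch can run without a Durable Object.

```typescript
// Stand-in for sendDownstream (assumption): in a DO this would wrap
// sendDownstream(clientIds, this, message).
type Send = (clientIds: string[], message: unknown) => Promise<void>;

// Hypothetical helper: returns a function that reports a percentage to the
// given clients, and does nothing when the client list is empty.
function createProgressReporter(clientIds: string[], send: Send) {
  return async (percent: number): Promise<boolean> => {
    if (clientIds.length === 0) return false; // nobody to notify
    await send(clientIds, { type: 'progress', percent });
    return true;
  };
}

// A fake send function that records what would have been sent.
const sent: Array<{ clientIds: string[]; message: unknown }> = [];
const fakeSend: Send = async (clientIds, message) => {
  sent.push({ clientIds, message });
};

// Mirrors the three progress steps of processLongTask.
async function runExample(): Promise<number> {
  const report = createProgressReporter(['user-123'], fakeSend);
  await report(25);
  await report(50);
  await report(100);
  return sent.length;
}
```

Inside a DO, the reporter would be created once per task from the current client IDs, so each progress step becomes a single call.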

Use Cases

Progress Updates

Server side - Send progress updates during long-running operations:

// Durable Object
class VideoDO {
  // ...

  async processVideo(videoId: string) {
    const connections = this.ctx.getWebSockets();
    const clientIds = [...new Set(connections.flatMap(ws => this.ctx.getTags(ws)))];

    await sendDownstream(clientIds, this, { stage: 'uploading', progress: 0 });

    await this.uploadToStorage(videoId);
    await sendDownstream(clientIds, this, { stage: 'encoding', progress: 30 });

    await this.encodeVideo(videoId);
    await sendDownstream(clientIds, this, { stage: 'thumbnails', progress: 70 });

    await this.generateThumbnails(videoId);
    await sendDownstream(clientIds, this, { stage: 'complete', progress: 100 });

    return { success: true };
  }
  // ...
}

Client side - Receive and handle progress updates:

using client = createRpcClient<typeof VideoDO>({
  transport: createWebSocketTransport('VIDEO_DO', videoId, {
    onDownstream: (update) => {
      updateProgressBar(update.stage, update.progress);
    }
  })
});

await client.processVideo(videoId);
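Because onDownstream receives an untyped message, the client may want to check the shape before touching the UI. Below is a minimal sketch of a type guard for the progress messages above. The ProgressUpdate interface, isProgressUpdate, and describeUpdate are illustrative names, and the 0 to 100 range is an assumption based on the values processVideo sends.

```typescript
// Assumed message shape, taken from the fields sent by processVideo.
interface ProgressUpdate {
  stage: string;
  progress: number; // assumed to be in the range 0 to 100
}

// Type guard: narrows an untyped downstream message to ProgressUpdate.
function isProgressUpdate(message: unknown): message is ProgressUpdate {
  if (typeof message !== 'object' || message === null) return false;
  const candidate = message as Record<string, unknown>;
  return (
    typeof candidate.stage === 'string' &&
    typeof candidate.progress === 'number' &&
    candidate.progress >= 0 &&
    candidate.progress <= 100
  );
}

// Hypothetical handler body that could be called from onDownstream.
function describeUpdate(message: unknown): string {
  if (!isProgressUpdate(message)) return 'ignored: unexpected message shape';
  return `${message.stage}: ${message.progress}%`;
}
```

In the client above, the onDownstream callback could call isProgressUpdate(update) before calling updateProgressBar.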

Real-Time Notifications

Server side - Broadcast notifications when events occur:

// Durable Object - Notification Hub
class NotificationHub {
  // ...

  async onUserAction(userId: string, action: string) {
    // Notify all connected clients when something happens
    const connections = this.ctx.getWebSockets();
    const clientIds = [...new Set(connections.flatMap(ws => this.ctx.getTags(ws)))];

    await sendDownstream(clientIds, this, {
      userId,
      action,
      timestamp: Date.now()
    });
  }
}

Client side - Subscribe to and display notifications:

using client = createRpcClient<typeof NotificationHub>({
  transport: createWebSocketTransport('NOTIFICATIONS', 'global', {
    onDownstream: (notification) => {
      showToast(`${notification.userId} performed ${notification.action}`);
    }
  })
});

Multiplayer/Collaborative Features

// Durable Object - Collaborative Document
class DocumentDO {
  // ...

  getConnectedClients(): string[] {
    const connections = this.ctx.getWebSockets();
    return [...new Set(connections.flatMap(ws => this.ctx.getTags(ws)))];
  }

  async updateContent(clientId: string, changes: any) {
    // Apply changes locally
    await this.applyChanges(changes);

    // Broadcast to all OTHER clients (exclude sender)
    const otherClients = this.getConnectedClients().filter(id => id !== clientId);
    for (const otherId of otherClients) {
      await sendDownstream(otherId, this, { changes });
    }
  }
  // ...
}
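The "collect tags, de-duplicate, exclude the sender" step can be isolated as a pure function, which makes it easy to unit test without a live DO. This is a sketch only: recipientsExcluding is a hypothetical helper, and its tagsPerConnection argument stands in for the result of calling this.ctx.getTags(ws) on each connection.

```typescript
// tagsPerConnection stands in for connections.map(ws => this.ctx.getTags(ws)).
// Returns each client ID once, in first-seen order, without the sender.
function recipientsExcluding(tagsPerConnection: string[][], senderId: string): string[] {
  const unique = new Set<string>();
  for (const tags of tagsPerConnection) {
    for (const tag of tags) {
      if (tag !== senderId) unique.add(tag);
    }
  }
  return Array.from(unique);
}

// Example: user-a has two open connections, user-b is the sender.
const recipients = recipientsExcluding(
  [['user-a'], ['user-b'], ['user-a'], ['user-c']],
  'user-b',
);
```

Here recipients contains 'user-a' and 'user-c' once each, so a client with several open connections is not listed twice.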

API Details

sendDownstream(clientIds, doInstance, message)

Send a message from a Durable Object to connected clients.

Parameters:

  • clientIds: string | string[] - A single client ID or a list of client IDs to target
  • doInstance - The Durable Object instance (pass this from inside the DO)
  • message: any - Any StructuredCloneable data

Behavior:

  • If a single client ID is passed: sends only to that client
  • If a list is passed: sends to every listed client; to broadcast, collect the IDs of all connected clients from this.ctx.getWebSockets() and this.ctx.getTags(ws)
  • Silently ignores clients that are not connected
  • Non-blocking: doesn't wait for delivery

onDownstream Callback

Registered on the client when creating the RPC client.

Signature:

onDownstream?: (message: any) => void

Called when:

  • The DO sends a message via sendDownstream()

Receives:

  • The message sent by the DO
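Since one onDownstream callback receives every message, an application that sends several kinds of message may want to route them by a discriminating field. The sketch below assumes messages carry a string type field, as the { type: 'progress', ... } messages in Basic Usage do. createDispatcher, DownstreamMessage, and Handler are hypothetical names, not library exports.

```typescript
// Assumed envelope: an object with a string `type` field.
type DownstreamMessage = { type: string; [key: string]: unknown };
type Handler = (message: DownstreamMessage) => void;

// Narrows an untyped message to the assumed envelope.
function hasStringType(message: unknown): message is DownstreamMessage {
  return (
    typeof message === 'object' &&
    message !== null &&
    typeof (message as { type?: unknown }).type === 'string'
  );
}

// Builds a callback suitable for onDownstream: routes by `type`,
// and passes anything unrecognized to onUnknown.
function createDispatcher(
  handlers: Map<string, Handler>,
  onUnknown: (message: unknown) => void,
): (message: unknown) => void {
  return (message) => {
    if (hasStringType(message)) {
      const handler = handlers.get(message.type);
      if (handler) {
        handler(message);
        return;
      }
    }
    onUnknown(message);
  };
}

// Usage: record what each handler saw.
const seen: string[] = [];
const onDownstream = createDispatcher(
  new Map<string, Handler>([
    ['progress', (m) => { seen.push(`progress ${String(m.percent)}`); }],
    ['notification', (m) => { seen.push(`notification ${String(m.action)}`); }],
  ]),
  () => { seen.push('unknown'); },
);
```

The resulting function can be passed directly as the onDownstream option when creating the WebSocket transport.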

onClose Callback

Registered on the client when creating the RPC client.

Signature:

onClose?: (code: number, reason: string) => void

Called when:

  • The WebSocket connection closes (gracefully or due to an error)

Receives:

  • The standard WebSocket close code and reason

Common close codes:

  • 1000: Normal closure
  • 1001: Going away (e.g., page navigation)
  • 1006: Abnormal closure (no close frame)

Important Notes

WebSocket Requirement

Downstream messaging only works with WebSocket transport. HTTP transport doesn't support server-initiated messages.

// ✅ Works - WebSocket transport
createRpcClient({
  transport: createWebSocketTransport('MY_DO', 'id', {
    onDownstream: (msg) => console.log(msg)
  })
});

// ❌ Won't work - HTTP doesn't support downstream
createRpcClient({
  transport: createHttpTransport('MY_DO', 'id') // An onDownstream handler is never called over HTTP
});

Client IDs

The clientId is optional but recommended for multi-user scenarios. See the full working example in client-ids.test.ts.

// Without clientId - broadcast to all connected clients
const connections = this.ctx.getWebSockets();
const clientIds = [...new Set(connections.flatMap(ws => this.ctx.getTags(ws)))];
await sendDownstream(clientIds, this, message);

// With clientId - target a specific client
await sendDownstream('user-123', this, message);

Message Serialization

Messages use the same serialization as RPC calls (StructuredClone):

  • Primitives, objects, arrays ✅
  • Date, Map, Set, RegExp ✅
  • Circular references ✅
  • Functions, symbols ❌
  • DOM nodes ❌
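A quick way to check whether a payload fits these rules before sending it is to try cloning it. The sketch below uses the platform's global structuredClone and assumes it accepts and rejects the same values as the library's serializer, which this page does not confirm. isCloneable is a hypothetical helper.

```typescript
// Returns true when the value survives a structured clone, false when the
// clone throws (as it does for functions and symbols).
function isCloneable(value: unknown): boolean {
  try {
    structuredClone(value);
    return true;
  } catch {
    return false;
  }
}

// A payload mixing several supported types.
const supported = {
  when: new Date(0),
  lookup: new Map([['key', 1]]),
  tags: new Set(['a', 'b']),
  pattern: /abc/g,
};

// A payload that refers back to itself.
const circular: { self?: unknown } = {};
circular.self = circular;

// Payloads containing unsupported values.
const withFunction = { callback: () => 1 };
const withSymbol = { id: Symbol('id') };
```

With these values, isCloneable returns true for supported and circular, and false for withFunction and withSymbol.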

See Also