Downstream Messaging
Downstream messaging allows your Durable Objects to send messages to connected clients without waiting for an RPC call. This is perfect for real-time notifications, progress updates, or server-initiated events.
Overview
- Server → Client communication: The DO can send messages to connected clients at any time
- WebSocket only: Requires WebSocket transport (not HTTP)
- Automatic handling: Messages are delivered via the onDownstream callback
- Connection lifecycle: Track when a client's connection closes with the onClose callback
Basic Usage
Client Setup
Register handlers when creating the RPC client:
import { createRpcClient, createWebSocketTransport } from '@lumenize/rpc';

using client = createRpcClient<typeof MyDO>({
  transport: createWebSocketTransport('MY_DO', 'instance-1', {
    // ...
    clientId: 'user-123', // Optional: identify this client
    onDownstream: (message) => {
      // ...
      // Handle real-time updates from the DO
    },
    onClose: (code, reason) => {
      // ...
      // Handle disconnection
    },
  }),
});
Server (Durable Object) Setup
Send messages to connected clients from within your DO:
import { sendDownstream } from '@lumenize/rpc';

export class MyDO {
  ctx: DurableObjectState;
  env: Env;

  constructor(ctx: DurableObjectState, env: Env) {
    this.ctx = ctx;
    this.env = env;
  }

  async processLongTask() {
    // Send progress updates to all connected clients
    const connections = this.ctx.getWebSockets();
    const clientIds = [...new Set(connections.flatMap(ws => this.ctx.getTags(ws)))];
    if (clientIds.length > 0) {
      await sendDownstream(clientIds, this, { type: 'progress', percent: 25 });
    }
    // ... do some work ...
    if (clientIds.length > 0) {
      await sendDownstream(clientIds, this, { type: 'progress', percent: 50 });
    }
    // ... more work ...
    if (clientIds.length > 0) {
      await sendDownstream(clientIds, this, { type: 'progress', percent: 100 });
    }
    return { status: 'complete' };
  }

  async notifyClient(clientId: string, message: any) {
    // Send to a specific client
    await sendDownstream(clientId, this, message);
  }
}
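The getWebSockets()/getTags() expression above recurs in every server example, so it can be factored into a small helper. The following is a minimal sketch, assuming (as the examples imply) that the tags attached to each socket are client IDs. The names collectClientIds and TagSource are hypothetical and are not part of @lumenize/rpc.

```typescript
// Hypothetical helper; not part of @lumenize/rpc.
// TagSource mirrors the two DurableObjectState methods used in the examples.
interface TagSource<S> {
  getWebSockets(): S[];
  getTags(ws: S): string[];
}

// Returns each tag once, in first-seen order.
// Assumption: the tags on a socket are client IDs, as the examples imply.
function collectClientIds<S>(ctx: TagSource<S>): string[] {
  return [...new Set(ctx.getWebSockets().flatMap((ws) => ctx.getTags(ws)))];
}
```

Inside a DO this would be called as collectClientIds(this.ctx), and the result passed to sendDownstream as in the examples.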
Use Cases
Progress Updates
Server side - Send progress updates during long-running operations:
// Durable Object
class VideoDO {
  // ...
  async processVideo(videoId: string) {
    const connections = this.ctx.getWebSockets();
    const clientIds = [...new Set(connections.flatMap(ws => this.ctx.getTags(ws)))];

    await sendDownstream(clientIds, this, { stage: 'uploading', progress: 0 });
    await this.uploadToStorage(videoId);

    await sendDownstream(clientIds, this, { stage: 'encoding', progress: 30 });
    await this.encodeVideo(videoId);

    await sendDownstream(clientIds, this, { stage: 'thumbnails', progress: 70 });
    await this.generateThumbnails(videoId);

    await sendDownstream(clientIds, this, { stage: 'complete', progress: 100 });
    return { success: true };
  }
  // ...
}
Client side - Receive and handle progress updates:
using client = createRpcClient<typeof VideoDO>({
  transport: createWebSocketTransport('VIDEO_DO', videoId, {
    onDownstream: (update) => {
      updateProgressBar(update.stage, update.progress);
    }
  })
});

await client.processVideo(videoId);
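Because onDownstream receives an untyped message, it can help to narrow the value before reading its fields. Below is a minimal sketch of a type guard for the { stage, progress } shape sent by the VideoDO example. ProgressUpdate, isProgressUpdate, and progressLabel are hypothetical names introduced for illustration only.

```typescript
// Assumed message shape, taken from the VideoDO example above.
interface ProgressUpdate {
  stage: string;
  progress: number; // percent, 0 to 100
}

// Narrow an untyped downstream message before using it.
function isProgressUpdate(message: unknown): message is ProgressUpdate {
  if (typeof message !== 'object' || message === null) return false;
  const m = message as Record<string, unknown>;
  return typeof m.stage === 'string' && typeof m.progress === 'number';
}

// Build a label for a progress bar; returns null for unrelated messages.
function progressLabel(message: unknown): string | null {
  return isProgressUpdate(message)
    ? `${message.stage}: ${message.progress}%`
    : null;
}
```

An onDownstream handler could call isProgressUpdate(update) first and ignore anything that does not match.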
Real-Time Notifications
Server side - Broadcast notifications when events occur:
// Durable Object - Notification Hub
class NotificationHub {
  // ...
  async onUserAction(userId: string, action: string) {
    // Notify all connected clients when something happens
    const connections = this.ctx.getWebSockets();
    const clientIds = [...new Set(connections.flatMap(ws => this.ctx.getTags(ws)))];
    await sendDownstream(clientIds, this, {
      userId,
      action,
      timestamp: Date.now()
    });
  }
}
Client side - Subscribe to and display notifications:
using client = createRpcClient<typeof NotificationHub>({
  transport: createWebSocketTransport('NOTIFICATIONS', 'global', {
    onDownstream: (notification) => {
      showToast(`${notification.userId} performed ${notification.action}`);
    }
  })
});
Multiplayer/Collaborative Features
// Durable Object - Collaborative Document
class DocumentDO {
  // ...
  getConnectedClients(): string[] {
    const connections = this.ctx.getWebSockets();
    return [...new Set(connections.flatMap(ws => this.ctx.getTags(ws)))];
  }

  async updateContent(clientId: string, changes: any) {
    // Apply changes locally
    await this.applyChanges(changes);

    // Broadcast to all OTHER clients (exclude sender)
    const otherClients = this.getConnectedClients().filter(id => id !== clientId);
    for (const otherId of otherClients) {
      await sendDownstream(otherId, this, { changes });
    }
  }
  // ...
}
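The exclude-the-sender step can be isolated so it is easy to test without a live Durable Object. This is a minimal sketch: the send parameter is a hypothetical stand-in for a call such as (ids, msg) => sendDownstream(ids, this, msg), and broadcastExcept is not a library function. It sends one batched call rather than looping, on the assumption that an array of IDs is accepted as in the earlier examples.

```typescript
// Hypothetical stand-in for a function that delivers a message to client IDs.
type Send<M> = (clientIds: string[], message: M) => Promise<void>;

// Sends `message` to every client except the sender.
// Returns the list of recipients so callers can log or assert on it.
async function broadcastExcept<M>(
  allClients: string[],
  senderId: string,
  message: M,
  send: Send<M>,
): Promise<string[]> {
  const recipients = allClients.filter((id) => id !== senderId);
  // Skip the call entirely when the sender is the only connected client.
  if (recipients.length > 0) {
    await send(recipients, message);
  }
  return recipients;
}
```

Injecting send keeps the filtering logic independent of the transport, which makes it straightforward to unit test.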
API Details
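sendDownstream(clientIds, doInstance, message)
Send a message from a Durable Object to connected clients. The parameter order below follows the calls shown in the examples on this page.
Parameters:
- clientIds: string | string[] - A single client ID, or an array of client IDs, to send to
- doInstance - The Durable Object instance (pass this from inside the DO)
- message: any - Any StructuredCloneable data
Behavior:
- If a single client ID is passed: sends only to that client
- If an array is passed: sends to each listed client. To reach all connected clients, collect their IDs with this.ctx.getWebSockets() and this.ctx.getTags(ws), as in the examples above
- Silently ignores client(s) that are not connected
- Non-blocking: doesn't wait for delivery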
onDownstream Callback
Registered on the client when creating the RPC client.
Signature:
onDownstream?: (message: any) => void
Called when:
- The DO sends a message via sendDownstream()
- The callback receives the exact message sent by the DO
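When a DO sends several kinds of messages over the same connection, a single onDownstream callback can route them by a discriminating field. The sketch below assumes messages carry a string type field, as in the { type: 'progress', percent: 25 } example earlier. createDispatcher is a hypothetical helper, not part of the library.

```typescript
type Handler = (message: Record<string, unknown>) => void;

// Builds one callback, usable as onDownstream, that routes by `type`.
// Messages without a known `type` go to the optional fallback.
function createDispatcher(
  handlers: Record<string, Handler>,
  fallback?: (message: unknown) => void,
): (message: unknown) => void {
  return (message: unknown): void => {
    if (typeof message === 'object' && message !== null) {
      const record = message as Record<string, unknown>;
      const type = record.type;
      // Own-property check avoids matching inherited names like "toString".
      const handler =
        typeof type === 'string' &&
        Object.prototype.hasOwnProperty.call(handlers, type)
          ? handlers[type]
          : undefined;
      if (handler) {
        handler(record);
        return;
      }
    }
    fallback?.(message);
  };
}
```

The returned function could be passed directly as the onDownstream option when creating the transport.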
onClose Callback
Registered on the client when creating the RPC client.
Signature:
onClose?: (code: number, reason: string) => void
Called when:
- WebSocket connection closes (graceful or error)
- Receives standard WebSocket close codes and reasons
Common close codes:
- 1000: Normal closure
- 1001: Going away (e.g., page navigation)
- 1006: Abnormal closure (no close frame)
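An onClose handler often needs to decide whether to reconnect. The sketch below shows one possible policy built on the three codes listed above. The policy itself (reconnect on anything other than 1000 or 1001) and the backoff parameters are assumptions for illustration. Whether the transport reconnects on its own is not covered here, so treat these helpers as hypothetical.

```typescript
// Close codes listed above (standard WebSocket close codes).
const CLOSE_NORMAL = 1000;
const CLOSE_GOING_AWAY = 1001;
const CLOSE_ABNORMAL = 1006;

// Human-readable description for logging.
function describeClose(code: number): string {
  switch (code) {
    case CLOSE_NORMAL:
      return 'normal closure';
    case CLOSE_GOING_AWAY:
      return 'going away';
    case CLOSE_ABNORMAL:
      return 'abnormal closure (no close frame)';
    default:
      return `close code ${code}`;
  }
}

// Assumed policy: do not reconnect after a deliberate close (1000)
// or when the endpoint is going away (1001); reconnect otherwise.
function shouldReconnect(code: number): boolean {
  return code !== CLOSE_NORMAL && code !== CLOSE_GOING_AWAY;
}

// Exponential backoff with a cap; `attempt` starts at 0.
function reconnectDelayMs(attempt: number, baseMs = 500, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}
```

In an onClose: (code, reason) => { ... } handler, shouldReconnect(code) could gate a setTimeout that recreates the client after reconnectDelayMs(attempt).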
Important Notes
WebSocket Requirement
Downstream messaging only works with WebSocket transport. HTTP transport doesn't support server-initiated messages.
// ✅ Works - WebSocket transport
createRpcClient({
  transport: createWebSocketTransport('MY_DO', 'id', {
    onDownstream: (msg) => console.log(msg),
  }),
});

// ❌ Won't work - HTTP doesn't support downstream
createRpcClient({
  transport: createHttpTransport('MY_DO', 'id'), // No downstream channel: onDownstream is never called
});
Client IDs
The clientId is optional but recommended for multi-user scenarios. See the full working example in client-ids.test.ts.
// Without clientId - broadcast to all connected clients
const connections = this.ctx.getWebSockets();
const clientIds = [...new Set(connections.flatMap(ws => this.ctx.getTags(ws)))];
await sendDownstream(clientIds, this, message);
// With clientId - target a specific client
await sendDownstream('user-123', this, message);
Message Serialization
Messages use the same serialization as RPC calls (StructuredClone):
- Primitives, objects, arrays ✅
- Date, Map, Set, RegExp ✅
- Circular references ✅
- Functions, symbols ❌
- DOM nodes ❌
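The rules above can be checked locally with the runtime's global structuredClone (available in modern browsers, Workers, and Node 17+). This is a minimal sketch that assumes the library's serializer follows the same structured clone rules, as the list states. isCloneable is a hypothetical helper for pre-validating a message before sending it.

```typescript
// Assumption: the library's serialization matches the global structuredClone.
const original: { when: Date; tags: Set<string>; self?: unknown } = {
  when: new Date(0),
  tags: new Set(['a', 'b']),
};
original.self = original; // circular reference

// Date and Set keep their types; the cycle is preserved in the copy.
const copy = structuredClone(original);

// Returns false for values that structured clone rejects
// (for example functions and symbols).
function isCloneable(value: unknown): boolean {
  try {
    structuredClone(value);
    return true;
  } catch {
    return false;
  }
}
```

Calling isCloneable(message) before sendDownstream is one way to surface an unsupported value early, at the cost of cloning the message an extra time.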
See Also
- Quick Start - Basic RPC usage
- WebSocket Transport API - Transport configuration
- sendDownstream API - Full API reference