# Queue-Based Proxy Fetch
Route fetch requests through Cloudflare Queues to Workers, where the fetch runs under CPU-time billing instead of DO wall-clock billing.

- Scales automatically to very high request volumes
- High latency penalty (200 ms to 2-3 seconds at p90)
`proxyFetch()` automatically selects the Queue variant when it detects a `PROXY_FETCH_QUEUE` binding in your environment. If you have multiple proxy-fetch variants configured and want to use the Queue variant explicitly, call `proxyFetchQueue()` directly instead.
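Forcing the Queue variant might look like the following sketch; it assumes `proxyFetchQueue()` takes the same arguments as `proxyFetch()`, and the method and URL names are illustrative:

```typescript
import { DurableObject } from 'cloudflare:workers';
import { proxyFetchQueue } from '@lumenize/proxy-fetch';

export class MyDO extends DurableObject<Env> {
  async syncData(): Promise<void> {
    // Explicitly route through the queue, bypassing variant auto-detection.
    // Assumes the same signature as proxyFetch().
    await proxyFetchQueue(
      this,
      'https://api.example.com/data',
      'MY_DO',
      'myResponseHandler'
    );
  }
}
```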
## Quick Start
### 1. Install

```sh
npm install @lumenize/proxy-fetch
```
### 2. Configure Queue

Add queue configuration to your `wrangler.jsonc`:

```jsonc
{
  "name": "lumenize-proxy-fetch-queue-tests",
  "main": "test-worker-and-dos.ts",
  "compatibility_date": "2025-09-12",
  "durable_objects": {
    "bindings": [
      {
        "name": "MY_DO",
        "class_name": "MyDO"
      }
    ]
  },
  "queues": {
    "producers": [
      {
        "queue": "proxy-fetch-queue",
        "binding": "PROXY_FETCH_QUEUE"
      }
    ],
    "consumers": [
      {
        "queue": "proxy-fetch-queue",
        "max_batch_size": 10,
        "max_batch_timeout": 5,
        "max_retries": 3
      }
    ]
  },
  "migrations": [
    {
      "tag": "v1",
      "new_sqlite_classes": ["MyDO"]
    }
  ]
}
```
### 3. Set Up DO

```typescript
import { DurableObject } from 'cloudflare:workers';
import { proxyFetch } from '@lumenize/proxy-fetch';
import type { ProxyFetchHandlerItem } from '@lumenize/proxy-fetch';

export class MyDO extends DurableObject<Env> {
  /**
   * Your business logic that needs to call an external API
   */
  async myBusinessProcess(): Promise<void> {
    // Send to queue - returns reqId; DO wall-clock billing stops
    const reqId = await proxyFetch(
      this,                            // DO instance
      'https://api.example.com/data',  // URL or Request object
      'MY_DO',                         // DO binding name
      'myResponseHandler'              // Handler method name (optional for fire-and-forget)
    );
    // Response will arrive later via myResponseHandler()
  }

  /**
   * Your response handler - called when the response arrives
   */
  async myResponseHandler({
    response,
    error,
    reqId
  }: ProxyFetchHandlerItem): Promise<void> {
    if (error) {
      console.error('Fetch failed:', error);
      return;
    }

    // Process the response
    const data = await response!.json();

    // Store it, process it, whatever your business logic needs
    this.ctx.storage.kv.put('api-data', data);
  }

  // ...
}
```
### 4. Set Up Queue Consumer

In your Worker, handle queue messages:

```typescript
import { proxyFetchQueueConsumer } from '@lumenize/proxy-fetch';

export default {
  // ...

  // Required boilerplate
  async queue(batch: MessageBatch<any>, env: Env): Promise<void> {
    await proxyFetchQueueConsumer(batch, env);
  }
} satisfies ExportedHandler<Env>;
```
That's it! Now when you call `myBusinessProcess()`, the fetch happens in a Worker and DO wall-clock billing stops.
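To exercise the flow end to end, a Worker entry point along these lines could route a request to the DO and trigger the business process over RPC. The `'singleton'` instance name and the plain-text response are illustrative, not part of the library:

```typescript
import { proxyFetchQueueConsumer } from '@lumenize/proxy-fetch';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    // Route to a DO instance and kick off the business process via RPC
    const stub = env.MY_DO.get(env.MY_DO.idFromName('singleton'));
    await stub.myBusinessProcess(); // returns as soon as the request is queued
    return new Response('queued');
  },

  async queue(batch: MessageBatch<any>, env: Env): Promise<void> {
    await proxyFetchQueueConsumer(batch, env);
  }
} satisfies ExportedHandler<Env>;
```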
## Configuration Options

Control retry behavior and timeouts:
```typescript
const reqId = await proxyFetch(
  this,
  'https://api.example.com/data',
  'MY_DO',
  'myResponseHandler',
  {
    timeout: 30000,        // Request timeout in ms (default: 30000)
    maxRetries: 3,         // Max retry attempts (default: 3)
    retryDelay: 1000,      // Initial retry delay in ms (default: 1000)
    maxRetryDelay: 10000,  // Max retry delay in ms (default: 10000)
    retryOn5xx: true       // Retry on 5xx errors (default: true)
  }
);
```
**Retry behavior:**

- Network errors: always retried
- 5xx errors: retried if `retryOn5xx` is true
- 4xx errors: never retried (client errors)
- Exponential backoff: `delay * 2^retryCount`, capped at `maxRetryDelay`
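With the defaults (`retryDelay: 1000`, `maxRetryDelay: 10000`), the schedule works out as follows. This is an illustrative calculation of the documented formula, not library code:

```typescript
// Backoff per the documented formula: delay * 2^retryCount, capped at maxRetryDelay
function backoffDelay(retryCount: number, retryDelay = 1000, maxRetryDelay = 10000): number {
  return Math.min(retryDelay * 2 ** retryCount, maxRetryDelay);
}

// Retries 0..4 wait 1000, 2000, 4000, 8000, then 10000 ms (the cap)
const delays = [0, 1, 2, 3, 4].map((n) => backoffDelay(n));
```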
## Request Objects

Pass full `Request` objects with headers, body, etc.:
```typescript
const request = new Request('https://api.example.com/data', {
  method: 'POST',
  headers: {
    'Authorization': `Bearer ${this.env.API_TOKEN}`,
    'Content-Type': 'application/json'
  },
  body: JSON.stringify({ query: 'something' })
});

await proxyFetch(this, request, 'MY_DO', 'myResponseHandler');
```
## Error Handling

Your handler receives both successful responses and errors:
```typescript
async myResponseHandler({
  response,
  error,
  reqId,
  retryCount,
  duration
}: ProxyFetchHandlerItem): Promise<void> {
  if (error) {
    // Network error, timeout, or max retries exceeded
    console.error('Fetch failed:', error.message);
    this.ctx.storage.kv.put('last-error', error.message);
    return;
  }

  // Check HTTP status
  if (!response!.ok) {
    console.error('HTTP error:', response!.status, response!.statusText);
    return;
  }

  // Success
  const data = await response!.json();
  console.log('%o', { reqId, retryCount, duration }); // duration in ms

  // Process data...
}
```
## Multiple Handlers

Use different handlers for different API calls:
```typescript
export class MyDO extends DurableObject {
  async fetchUsers(): Promise<void> {
    await proxyFetch(this, '/api/users', 'MY_DO', 'handleUsers');
  }

  async fetchPosts(): Promise<void> {
    await proxyFetch(this, '/api/posts', 'MY_DO', 'handlePosts');
  }

  async handleUsers({ response }: ProxyFetchHandlerItem): Promise<void> {
    const users = await response!.json();
    // Store users...
  }

  async handlePosts({ response }: ProxyFetchHandlerItem): Promise<void> {
    const posts = await response!.json();
    // Store posts...
  }
}
```
## Fire-and-Forget Mode

Omit the handler parameter for fire-and-forget requests (no callback). This saves the request-count charge for the additional Workers RPC call, plus a minuscule amount of wall-clock charges for handler processing.
Use cases:
- Webhooks
- Logging and analytics
- Background tasks that don't affect business logic
**Note:** The queue consumer still processes the request but skips the RPC callback when no handler is specified.
```typescript
export class MyDO extends DurableObject {
  async logAnalytics(): Promise<void> {
    // Queue the request but don't wait for a response
    const reqId = await proxyFetch(
      this,
      'https://analytics.example.com/event',
      'MY_DO'
      // No handler - fire and forget!
    );
    // Continue immediately; no callback when the fetch completes
  }
}
```
## How the Pieces Fit Together

The two core functions work together to route requests through the queue:
1. `proxyFetch()`: called from within your DO
   - Validates that the handler exists on the DO instance (if provided)
   - Serializes the request and queues it with the handler name embedded
   - Returns `reqId` immediately (non-blocking)
2. `proxyFetchQueueConsumer()`: placed in your Worker's `queue()` handler
   - Deserializes requests from the queue
   - Makes the external fetch calls (CPU-time billing)
   - Handles retries with exponential backoff
   - Routes responses back to DOs via Workers RPC
   - Calls your handler directly: `stub[handlerName](handlerItem)`
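The consumer side of that flow can be sketched in plain TypeScript. This is a standalone simulation with an injected fetch and stub; the types and names are illustrative, not the library's internals, and the backoff between retries is elided:

```typescript
// Simulated queue item: roughly what proxyFetch() serializes onto the queue
type QueuedItem = { reqId: string; url: string; handlerName?: string };

async function consumeItem(
  item: QueuedItem,
  doFetch: (url: string) => Promise<Response>,           // external fetch
  stub: Record<string, (arg: unknown) => Promise<void>>, // DO stub for the RPC callback
  maxRetries = 3
): Promise<void> {
  let response: Response | undefined;
  let error: Error | undefined;
  for (let retryCount = 0; retryCount <= maxRetries; retryCount++) {
    try {
      response = await doFetch(item.url); // runs in the Worker (CPU-time billing)
      error = undefined;
      break;
    } catch (e) {
      error = e as Error; // network error: retry (exponential backoff elided)
    }
  }
  // Fire-and-forget: skip the RPC callback when no handler was named
  if (item.handlerName) {
    await stub[item.handlerName]({ response, error, reqId: item.reqId });
  }
}
```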
## When To Use
Use proxy-fetch when:
- ✅ External API calls are slow (>100ms)
- ✅ High volume of requests
- ✅ Cost is a concern
Don't use when:
- ❌ Latency-sensitive
- ❌ Low volume (not worth the complexity)
- ❌ External API is fast (<50ms)
## Best Practices

### Error Handling
Always handle both success and error cases in your handlers:
```typescript
async myHandler({ response, error }: ProxyFetchHandlerItem): Promise<void> {
  // Handle errors first
  if (error) {
    // Log, store error state, trigger fallback logic, etc.
    return;
  }

  // Check HTTP status
  if (!response!.ok) {
    // Handle HTTP errors
    return;
  }

  // Process successful response
  const data = await response!.json();
  // ...
}
```
### Storage Patterns

Store context before calling `proxyFetch` if your handler needs it:
```typescript
async processUser(userId: string, sessionData: any): Promise<void> {
  // Store context that the handler will need
  this.ctx.storage.kv.put(`session:${userId}`, sessionData);

  // Include userId in the request URL or body so the handler can retrieve context
  await proxyFetch(
    this,
    `/api/users/${userId}`,
    'MY_DO',
    'handleUserData'
  );
}

async handleUserData({ response, error }: ProxyFetchHandlerItem): Promise<void> {
  if (error) return;

  const data = await response!.json();
  const userId = data.id; // Extract from response

  // Retrieve stored context
  const session = this.ctx.storage.kv.get(`session:${userId}`);

  // Process with both response data and stored context
  this.ctx.storage.kv.put(`user:${userId}`, { data, session });
}
```
### Using reqId for Request Correlation

Use `reqId` to correlate responses with stored context:
```typescript
/**
 * Example: Using reqId to correlate with stored context
 */
async fetchUserWithContext(userId: string): Promise<void> {
  const reqId = await proxyFetch(
    this,
    `https://api.example.com/users/${userId}`,
    'MY_DO',
    'handleUserWithContext'
  );

  // Store context associated with this specific request
  this.ctx.storage.kv.put(`context:${reqId}`, {
    userId,
    requestedAt: Date.now(),
    source: 'user-sync'
  });
}

/**
 * Handler that retrieves context using reqId
 */
async handleUserWithContext({
  response,
  error,
  reqId
}: ProxyFetchHandlerItem): Promise<void> {
  // Retrieve the context we stored using reqId
  const context = this.ctx.storage.kv.get(`context:${reqId}`);
  if (!context) {
    console.error(`No context found for reqId: ${reqId}`);
    return;
  }

  if (error) {
    console.error(`[${reqId}] Fetch failed for user ${context.userId}:`, error);
    // Store error with context
    this.ctx.storage.kv.put(`error:${context.userId}`, {
      error: error.message,
      reqId,
      context
    });
    // Clean up context
    this.ctx.storage.kv.delete(`context:${reqId}`);
    return;
  }

  const userData = await response!.json();
  console.log(`[${reqId}] Processed user ${context.userId} in ${Date.now() - context.requestedAt}ms`);

  // Store result with both API data and our context
  this.ctx.storage.kv.put(`user:${context.userId}`, {
    userData,
    fetchedFrom: context.source,
    reqId
  });

  // Clean up context now that we're done
  this.ctx.storage.kv.delete(`context:${reqId}`);
}
```
**Why this pattern works:**

- Calls to `proxyFetch()` return a unique `reqId`
- Store temporary context using `reqId` as the key
- The payload delivered to your handler includes `reqId`
- The handler retrieves the context using the same `reqId`
- Clean up the context after processing
### Queue Configuration

Adjust queue settings based on your needs:
```jsonc
{
  "consumers": [
    {
      "queue": "proxy-fetch-queue",
      "max_batch_size": 10,    // Higher for throughput
      "max_batch_timeout": 5,  // Lower for latency
      "max_retries": 3         // Queue-level retries
    }
  ]
}
```
- `max_batch_size`: higher = more throughput, but a longer wait for small batches
- `max_batch_timeout`: lower = faster processing, but smaller batches
- `max_retries`: queue-level retries are separate from proxy-fetch retries
## Performance Considerations

### Latency

- Queue delivery: 60 ms to 2-3 seconds (p90)
- External fetch: depends on the API
- RPC callback to the DO: minimal, since the Worker and DO should be in the same location
### Throughput
- Queue can handle millions of messages
- Worker scales automatically
- Batch processing for efficiency (but adds latency)
## Migration from Direct Fetch

**Before** (direct fetch in the DO):
```typescript
export class MyDO extends DurableObject {
  async fetchData(): Promise<void> {
    // DO blocked during fetch (wall-clock billing)
    const response = await fetch('https://api.example.com/data');
    const data = await response.json();
    this.ctx.storage.kv.put('data', data);
  }
}
```
**After** (using proxy-fetch):
```typescript
export class MyDO extends DurableObject {
  async fetchData(): Promise<void> {
    await proxyFetch(this, 'https://api.example.com/data', 'MY_DO', 'handleData');
    // Returns immediately (no blocking)
  }

  async handleData({ response, error }: ProxyFetchHandlerItem): Promise<void> {
    //...
    const data = await response!.json();
    this.ctx.storage.kv.put('data', data);
  }
}
```
**Key differences:**

- Add a handler method that receives the response
- Logic splits into two methods (trigger + handler)
- Set up queue configuration in `wrangler.jsonc`
- Add a queue consumer to the Worker
- `proxyFetch` returns a `reqId` (string) instead of `Promise<Response>`