Skip to content

Commit 010179f

Browse files
authored
Feature/queue dispatch (#48)
* feat: add queue support for dispatcher, fallback to RPC * doc: update CF integration
1 parent 6f92f1b commit 010179f

3 files changed

Lines changed: 87 additions & 39 deletions

File tree

CLOUDFLARE_DEPLOYMENT.md

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -166,37 +166,72 @@ The Worker can fan out transcriptions to a separate Transcription Dispatcher wor
166166
2. Container decodes audio and gets transcriptions from OpenAI
167167
3. Worker intercepts transcriptions and:
168168
- Forwards to media server via WebSocket (low latency path)
169-
- Asynchronously dispatches to Dispatcher worker via RPC (doesn't block)
169+
- Asynchronously dispatches to Dispatcher worker (doesn't block)
170+
171+
**Dispatch Methods (in order of preference):**
172+
173+
1. **Cloudflare Queue** (recommended) - Messages sent to a queue, consumed by Dispatcher
174+
- Doesn't count against the 50 subrequest limit per WebSocket connection
175+
- Better for high-throughput scenarios
176+
177+
2. **Service Binding RPC** (fallback) - Direct RPC call to Dispatcher worker
178+
- Each dispatch counts as a subrequest (50 limit per WebSocket lifetime)
179+
- Use only for low-traffic scenarios
180+
181+
**Setup with Queue (Recommended):**
182+
183+
1. Deploy your Transcription Dispatcher worker with queue consumer configured
184+
2. Update `wrangler.jsonc` to add a queue producer:
185+
```jsonc
186+
"queues": {
187+
"producers": [{
188+
"binding": "TRANSCRIPTION_QUEUE",
189+
"queue": "transcription-dispatch-queue-staging"
190+
}]
191+
}
192+
```
193+
3. Set `USE_DISPATCHER` environment variable:
194+
```jsonc
195+
"vars": {
196+
"USE_DISPATCHER": "true"
197+
}
198+
```
199+
4. Or enable per-connection with query parameter:
200+
```
201+
wss://your-worker.workers.dev/transcribe?sessionId=test&transcribe=true&sendBack=true&useDispatcher=true
202+
```
203+
204+
**Setup with RPC (Fallback):**
170205

171-
**Setup:**
172206
1. Deploy your Transcription Dispatcher worker (must implement the `dispatch()` RPC method)
173-
2. Update `wrangler-container.jsonc` to point to your dispatcher:
207+
2. Update `wrangler.jsonc` to add a service binding:
174208
```jsonc
175209
"services": [{
176210
"binding": "TRANSCRIPTION_DISPATCHER",
177-
"service": "your-dispatcher-worker-name",
178-
"environment": "production"
211+
"service": "transcription-dispatcher"
179212
}]
180213
```
181-
3. Connect with `useDispatcher=true`:
182-
```
183-
wss://your-worker.workers.dev/transcribe?sessionId=test&transcribe=true&sendBack=true&useDispatcher=true
184-
```
214+
3. Enable with `USE_DISPATCHER=true` or `useDispatcher=true` query param
185215

186-
**Dispatcher Interface:**
187-
Your dispatcher worker must implement:
188-
```typescript
189-
export interface TranscriptionDispatcher extends WorkerEntrypoint<Env> {
190-
dispatch(message: DispatcherTranscriptionMessage): Promise<RPCResponse>;
191-
}
216+
**Environment Variables:**
217+
- `USE_DISPATCHER` - Set to `"true"` to enable dispatching by default (can be overridden per-connection via URL param)
192218

219+
**Message Format:**
220+
```typescript
193221
interface DispatcherTranscriptionMessage {
194222
sessionId: string;
195223
endpointId: string; // participant ID
196224
text: string; // full transcript text
197225
timestamp: number;
198226
language?: string;
199227
}
228+
```
229+
230+
**Dispatcher Interface (for RPC fallback):**
231+
```typescript
232+
export interface TranscriptionDispatcher extends WorkerEntrypoint<Env> {
233+
dispatch(message: DispatcherTranscriptionMessage): Promise<RPCResponse>;
234+
}
200235

201236
interface RPCResponse {
202237
success: boolean;
@@ -206,7 +241,7 @@ interface RPCResponse {
206241
}
207242
```
208243

209-
See the original dispatcher implementation on the `main` branch for a reference implementation.
244+
See the [transcription-dispatcher](https://github.com/jitsi/vo_meetings_cf-transcription-dispatcher) repository for the dispatcher implementation.
210245

211246
### Monitoring
212247

@@ -341,7 +376,7 @@ Media Server (WebSocket)
341376
342377
Cloudflare Worker (intercepts & fans out)
343378
↓ ↓
344-
↓ (audio) (transcripts via RPC)
379+
↓ (audio) (transcripts via Queue/RPC)
345380
↓ ↓
346381
Container Instance Transcription Dispatcher
347382
↓ (optional, parallel)
@@ -364,7 +399,7 @@ Media Server (receives transcripts)
364399
3. OpenAI returns transcripts → Container → Worker
365400
4. Worker fans out transcripts to:
366401
- Media server (via WebSocket, low latency)
367-
- Dispatcher (via Service Binding RPC, async, optional)
402+
- Dispatcher (via Queue or RPC, async, optional)
368403

369404
Each container instance:
370405
- Runs the full Node.js application

worker/env.d.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
// Environment types for the Cloudflare Worker
22

3-
import type { TranscriptionDispatcher } from './index';
3+
import type { TranscriptionDispatcher, DispatcherTranscriptionMessage } from './index';
44

55
interface Env {
66
// Durable Object binding for the container
77
TRANSCRIBER: DurableObjectNamespace;
88

9-
// Service bindings
9+
// Service bindings (kept for backwards compatibility, prefer queue)
1010
TRANSCRIPTION_DISPATCHER?: Service<TranscriptionDispatcher>;
1111

12+
// Queue binding for transcription dispatch (preferred)
13+
TRANSCRIPTION_QUEUE?: Queue<DispatcherTranscriptionMessage>;
14+
1215
// Durable Object for auto-scaling
1316
CONTAINER_COORDINATOR: DurableObjectNamespace;
1417

worker/index.ts

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,9 @@ async function handleWebSocketWithDispatcher(
220220
const containerWs = containerResponse.webSocket;
221221
containerWs.accept();
222222

223-
const dispatcher = env.TRANSCRIPTION_DISPATCHER!;
223+
// Prefer queue over RPC (queue doesn't count against subrequest limit)
224+
const queue = env.TRANSCRIPTION_QUEUE;
225+
const dispatcher = env.TRANSCRIPTION_DISPATCHER;
224226

225227
// Pipe: client → container (upstream, no interception needed)
226228
serverWs.addEventListener('message', (event) => {
@@ -249,25 +251,33 @@ async function handleWebSocketWithDispatcher(
249251
language: data.language,
250252
};
251253

252-
// Fire and forget - don't block the client
253-
dispatcher
254-
.dispatch(dispatcherMessage)
255-
.then((response) => {
256-
if (!response.success || response.errors) {
257-
console.error(
258-
'Dispatcher error:',
259-
JSON.stringify({
260-
message: response.message,
261-
errors: response.errors,
262-
}),
263-
);
264-
}
265-
})
266-
.catch((error) => {
254+
// Use queue if available (preferred - no subrequest limit)
255+
if (queue) {
256+
queue.send(dispatcherMessage).catch((error) => {
267257
const msg = error instanceof Error ? error.message : String(error);
268-
const stack = error instanceof Error ? error.stack : undefined;
269-
console.error('Dispatcher RPC failed:', msg, stack || '');
258+
console.error('Queue send failed:', msg);
270259
});
260+
} else if (dispatcher) {
261+
// Fall back to RPC (has subrequest limit)
262+
dispatcher
263+
.dispatch(dispatcherMessage)
264+
.then((response) => {
265+
if (!response.success || response.errors) {
266+
console.error(
267+
'Dispatcher error:',
268+
JSON.stringify({
269+
message: response.message,
270+
errors: response.errors,
271+
}),
272+
);
273+
}
274+
})
275+
.catch((error) => {
276+
const msg = error instanceof Error ? error.message : String(error);
277+
const stack = error instanceof Error ? error.stack : undefined;
278+
console.error('Dispatcher RPC failed:', msg, stack || '');
279+
});
280+
}
271281
}
272282
} catch {
273283
// Not JSON or parse error - ignore, still forwarded to client
@@ -370,7 +380,7 @@ export default {
370380
}
371381

372382
// If dispatcher is enabled and this is a WebSocket upgrade, intercept the connection
373-
if (useDispatcher && upgradeHeader === 'websocket' && env.TRANSCRIPTION_DISPATCHER) {
383+
if (useDispatcher && upgradeHeader === 'websocket' && (env.TRANSCRIPTION_QUEUE || env.TRANSCRIPTION_DISPATCHER)) {
374384
return handleWebSocketWithDispatcher(request, container, env, ctx, sessionId);
375385
}
376386

0 commit comments

Comments
 (0)