Skip to content

Commit 1c23af7

Browse files
authored
Add first-frame audio sniff and session-end summary diagnostic logs (#84)
* feat: add first-frame audio sniff and session-end summary diagnostic logs
* review: sniff per tag, fix head-bytes label, add diagnostic-logging tests
* review: quote JSON log fields, reset sniff on reattach, nits
* diag: log per-transcription text length and preview at DEBUG
* worker: forward LOG_LEVEL to container (derives debug from DEBUG=true)
* review: shallow-copy inputFormat, gate debug log, flip DEBUG default to false
1 parent 6215b13 commit 1c23af7

6 files changed

Lines changed: 174 additions & 2 deletions

File tree

src/OutgoingConnection.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ export class OutgoingConnection {
7272
this.initializeBackend();
7373
}
7474

75+
/**
 * Returns the input audio format currently held by this connection.
 *
 * @returns A shallow copy of the stored format, so that callers assigning to
 *   its top-level fields do not alter the connection's own format state.
 */
getInputFormat(): AudioFormat {
  const formatSnapshot: AudioFormat = { ...this.inputAudioFormat };
  return formatSnapshot;
}
79+
7580
updateInputFormat(inputFormat: AudioFormat): void {
7681
// Validate synchronously so callers get an immediate error rather than
7782
// an async failure deep in reinitializeDecoder -> createAudioDecoder.
@@ -179,11 +184,13 @@ export class OutgoingConnection {
179184
private setupBackendHandlers(backend: TranscriptionBackend): void {
180185
backend.onInterimTranscription = (message) => {
181186
getInstruments().transcriptionsReceivedTotal.add(1, { provider: this.options.provider || 'unknown', is_interim: 'true' });
187+
this.logTranscriptionSummary(message, true);
182188
this.onInterimTranscription?.(message);
183189
};
184190

185191
backend.onCompleteTranscription = (message) => {
186192
getInstruments().transcriptionsReceivedTotal.add(1, { provider: this.options.provider || 'unknown', is_interim: 'false' });
193+
this.logTranscriptionSummary(message, false);
187194
this.clearIdleCommitTimeout();
188195
this.onCompleteTranscription?.(message);
189196
};
@@ -202,6 +209,21 @@ export class OutgoingConnection {
202209
};
203210
}
204211

212+
/**
 * Emits a one-line DEBUG summary of a transcription received from the backend.
 *
 * The line carries the connection tag, language, segment count, total text
 * length and a short preview, so empty end-of-utterance finals can be told
 * apart from real speech without logging the full transcript text.
 *
 * @param message - Transcription delivered by the backend. A missing
 *   `transcript` is treated as zero segments and a missing segment `text` as
 *   the empty string.
 * @param isInterim - `true` labels the line "interim"; `false` labels it "final".
 */
private logTranscriptionSummary(message: TranscriptionMessage, isInterim: boolean): void {
  // Bail out before building any strings when debug logging is disabled, so
  // the join/slice work is not paid on every transcription.
  if (!logger.isLevelEnabled('debug')) return;

  // Preview budget, measured in UTF-16 code units (the unit of String.length).
  const maxPreviewUnits = 40;

  const segments = message.transcript ?? [];
  const text = segments.map((s) => s.text ?? '').join(' ').trim();

  let preview = text;
  if (text.length > maxPreviewUnits) {
    let cut = maxPreviewUnits;
    // If the last retained unit is a high surrogate, its low-surrogate partner
    // sits just past the cut. Drop it too so the preview never ends on half of
    // an astral character (emoji and similar).
    const lastKeptUnit = text.charCodeAt(cut - 1);
    if (lastKeptUnit >= 0xd800 && lastKeptUnit <= 0xdbff) {
      cut -= 1;
    }
    preview = text.slice(0, cut) + '…';
  }

  // The preview is JSON-encoded so embedded quotes and whitespace stay unambiguous.
  logger.debug(
    `Backend ${isInterim ? 'interim' : 'final'} tag=${this.localTag} lang=${message.language ?? 'n/a'} segments=${segments.length} textLen=${text.length} preview=${JSON.stringify(preview)}`,
  );
}
226+
205227
private async reinitializeDecoder(): Promise<void> {
206228
if (!this.backend) {
207229
throw new Error('Cannot initialize decoder without a backend');

src/transcriberproxy.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ export class TranscriberProxy extends EventEmitter {
4242
private sessionId?: string;
4343
private dispatcherConnection?: DispatcherConnection;
4444
private createdAt: number;
45+
private audioPacketCount = 0;
46+
private interimTranscriptionCount = 0;
47+
private finalTranscriptionCount = 0;
48+
private firstFrameLoggedTags = new Set<string>();
4549

4650
constructor(ws: WebSocket, options: TranscriberProxyOptions) {
4751
super({ captureRejections: true });
@@ -158,9 +162,11 @@ export class TranscriberProxy extends EventEmitter {
158162
const newConnection = new OutgoingConnection(tag, mediaFormat, this.options);
159163

160164
newConnection.onInterimTranscription = (message) => {
165+
this.interimTranscriptionCount++;
161166
this.emit('interim_transcription', message);
162167
};
163168
newConnection.onCompleteTranscription = (message) => {
169+
this.finalTranscriptionCount++;
164170
// Dump transcript if enabled
165171
if (this.transcriptDumpStream) {
166172
try {
@@ -273,6 +279,24 @@ export class TranscriberProxy extends EventEmitter {
273279
logger.warn(`Received media event for tag "${tag}" with no prior start event; creating connection with encoding "${encoding}"`);
274280
connection = this.createConnection(tag, mediaFormat);
275281
}
282+
const payloadB64 = parsedMessage.media?.payload;
283+
const hasAudio = typeof payloadB64 === 'string' && payloadB64.length > 0;
284+
if (hasAudio) {
285+
this.audioPacketCount++;
286+
if (!this.firstFrameLoggedTags.has(tag)) {
287+
// 64 base64 chars decode to at most 48 bytes; we only emit the first 16.
288+
const head = Buffer.from(payloadB64.slice(0, 64), 'base64');
289+
const headByteCount = Math.min(16, head.length);
290+
const headHex = head.subarray(0, headByteCount).toString('hex');
291+
const mediaSnapshot = { ...parsedMessage.media, payload: `<b64:${payloadB64.length} chars, first ${headByteCount} decoded bytes=${headHex}>` };
292+
// JSON-valued fields are quoted so that downstream logfmt-style parsers
293+
// don't misinterpret spaces inside the JSON payload (e.g. inside `tag`).
294+
logger.info(
295+
`First client frame sniff: sessionId=${this.sessionId} tag=${tag} provider=${this.options.provider ?? 'default'} urlEncoding=${this.options.encoding ?? 'opus'} startFormat='${JSON.stringify(connection.getInputFormat())}' media='${JSON.stringify(mediaSnapshot)}'`,
296+
);
297+
this.firstFrameLoggedTags.add(tag);
298+
}
299+
}
276300
connection.handleMediaEvent(parsedMessage);
277301
}
278302
}
@@ -348,6 +372,11 @@ export class TranscriberProxy extends EventEmitter {
348372
// Re-setup listeners on new WebSocket
349373
this.setupWebSocketListeners();
350374

375+
// Treat a reattach as a new connection for diagnostic purposes: the client
376+
// may negotiate a different audio format on reconnect, so fire the
377+
// first-frame sniff again on the first real audio packet per tag.
378+
this.firstFrameLoggedTags.clear();
379+
351380
// Reset chunk tracking on all connections so frames from the new client
352381
// aren't discarded as "reordered" (chunk numbers restart from 0)
353382
this.outgoingConnections.forEach((connection, tag) => {
@@ -360,6 +389,9 @@ export class TranscriberProxy extends EventEmitter {
360389
}
361390

362391
close(): void {
392+
logger.info(
393+
`Session ended: sessionId=${this.sessionId} provider=${this.options.provider ?? 'default'} audioPackets=${this.audioPacketCount} interims=${this.interimTranscriptionCount} finals=${this.finalTranscriptionCount} durationSec=${this.getSessionDurationSec().toFixed(1)}`,
394+
);
363395
this.outgoingConnections.forEach((connection) => {
364396
connection.close();
365397
});

test/unit/OutgoingConnection.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ vi.mock('../../src/logger', () => ({
2323
error: vi.fn(),
2424
warn: vi.fn(),
2525
debug: vi.fn(),
26+
isLevelEnabled: vi.fn(() => true),
2627
},
2728
}));
2829

test/unit/TranscriberProxy.test.ts

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,14 @@ vi.mock('../../src/config', () => ({
4646
// to `this` in the implementation do not persist on the created instance. Passing the
4747
// implementation directly to vi.fn() avoids this issue.
4848
vi.mock('../../src/OutgoingConnection', () => ({
49-
OutgoingConnection: vi.fn(function (this: any, tag: string) {
49+
OutgoingConnection: vi.fn(function (this: any, tag: string, inputFormat: unknown) {
5050
this.localTag = tag;
5151
this.participantId = tag.split('-')[0]; // Extract participant ID from tag like "participant1-ssrc123"
5252
this.handleMediaEvent = vi.fn();
5353
this.addTranscriptContext = vi.fn();
5454
this.updateInputFormat = vi.fn();
55+
this.getInputFormat = vi.fn(() => inputFormat ?? { encoding: 'opus' });
56+
this.resetChunkTracking = vi.fn();
5557
this.close = vi.fn();
5658
this.onInterimTranscription = undefined;
5759
this.onCompleteTranscription = undefined;
@@ -578,4 +580,115 @@ describe('TranscriberProxy', () => {
578580
expect(conn.handleMediaEvent).toHaveBeenCalledWith(mediaEvent);
579581
});
580582
});
583+
584+
describe('diagnostic logging', () => {
  // Base64 for the four ASCII bytes 'OggS' (hex 4f676753), the Ogg page capture pattern.
  const OGG_PAYLOAD = 'T2dnUw==';

  // All recorded logger.info calls whose first argument is a first-frame sniff line.
  const sniffLogCalls = () =>
    vi.mocked(logger.info).mock.calls.filter(([first]) => typeof first === 'string' && first.startsWith('First client frame sniff:'));

  // The first recorded logger.info call that is a session-end summary, if any.
  const sessionEndLogCall = () =>
    vi.mocked(logger.info).mock.calls.find(([first]) => typeof first === 'string' && first.startsWith('Session ended:'));

  // A media event for `tag` carrying the Ogg payload at the given chunk number.
  const oggFrame = (tag: string, chunk: number) => ({ event: 'media', media: { tag, payload: OGG_PAYLOAD, chunk, timestamp: 0 } });

  // A transcription-result message for participant 'tag1', shaped like the ones the connection callbacks receive.
  const transcriptionResult = (messageId: string, isInterim: boolean, transcript: Array<{ text: string }>) => ({
    transcript,
    is_interim: isInterim,
    message_id: messageId,
    type: 'transcription-result',
    event: 'transcription-result',
    participant: { id: 'tag1' },
    timestamp: 0,
  });

  it('logs the first client frame sniff exactly once per tag', () => {
    const proxy = new TranscriberProxy(mockWebSocket, options);
    proxy.handleStartEvent({ event: 'start', start: { tag: 'tag1', mediaFormat: { encoding: 'opus' } } });
    vi.mocked(logger.info).mockClear();

    // Deliver the very same frame object three times; only the first delivery may log.
    const repeatedFrame = oggFrame('tag1', 0);
    for (let delivery = 0; delivery < 3; delivery += 1) {
      proxy.handleMediaEvent(repeatedFrame);
    }

    const logged = sniffLogCalls();
    expect(logged).toHaveLength(1);

    const line = logged[0][0] as string;
    const expectedFragments = [
      'tag=tag1',
      'urlEncoding=opus',
      `startFormat='{"encoding":"opus"}'`,
      '4f676753', // 'OggS' in hex
      `<b64:${OGG_PAYLOAD.length} chars, first 4 decoded bytes=4f676753>`,
    ];
    for (const fragment of expectedFragments) {
      expect(line).toContain(fragment);
    }
  });

  it('logs the first client frame sniff once per participant tag', () => {
    const proxy = new TranscriberProxy(mockWebSocket, options);
    proxy.handleStartEvent({ event: 'start', start: { tag: 'tag1', mediaFormat: { encoding: 'opus' } } });
    proxy.handleStartEvent({ event: 'start', start: { tag: 'tag2', mediaFormat: { encoding: 'opus' } } });
    vi.mocked(logger.info).mockClear();

    // Two rounds of frames, alternating tag1 then tag2 within each round.
    for (const chunk of [0, 1]) {
      proxy.handleMediaEvent(oggFrame('tag1', chunk));
      proxy.handleMediaEvent(oggFrame('tag2', chunk));
    }

    const logged = sniffLogCalls();
    expect(logged).toHaveLength(2);
    expect(logged[0][0]).toContain('tag=tag1');
    expect(logged[1][0]).toContain('tag=tag2');
  });

  it('does not sniff or count empty-payload frames, and retries the sniff on the next real frame', () => {
    const proxy = new TranscriberProxy(mockWebSocket, options);
    proxy.handleStartEvent({ event: 'start', start: { tag: 'tag1', mediaFormat: { encoding: 'opus' } } });
    vi.mocked(logger.info).mockClear();

    // A frame with no payload field must neither log nor mark the tag as sniffed.
    proxy.handleMediaEvent({ event: 'media', media: { tag: 'tag1', chunk: 0, timestamp: 0 } });
    // An empty-string payload is handled the same way.
    proxy.handleMediaEvent({ event: 'media', media: { tag: 'tag1', payload: '', chunk: 1, timestamp: 0 } });

    expect(sniffLogCalls()).toHaveLength(0);

    // The first frame that actually carries audio triggers the sniff.
    proxy.handleMediaEvent(oggFrame('tag1', 2));

    expect(sniffLogCalls()).toHaveLength(1);

    // The session-end summary counts only the one real audio frame.
    vi.mocked(logger.info).mockClear();
    proxy.close();
    const sessionEnd = sessionEndLogCall();
    expect(sessionEnd?.[0]).toContain('audioPackets=1');
  });

  it('fires the first-frame sniff again after a WebSocket reattach', () => {
    const proxy = new TranscriberProxy(mockWebSocket, options);
    proxy.handleStartEvent({ event: 'start', start: { tag: 'tag1', mediaFormat: { encoding: 'opus' } } });

    // Two frames before the reattach; the tag has been sniffed by the time they are done.
    proxy.handleMediaEvent(oggFrame('tag1', 0));
    proxy.handleMediaEvent(oggFrame('tag1', 1));

    vi.mocked(logger.info).mockClear();
    const replacementSocket = { addEventListener: vi.fn(), send: vi.fn(), close: vi.fn() } as any;
    proxy.reattachWebSocket(replacementSocket);

    // First frame on the new socket should be sniffed again.
    proxy.handleMediaEvent(oggFrame('tag1', 0));
    expect(sniffLogCalls()).toHaveLength(1);
  });

  it('emits a session-end summary with audioPackets, interims, finals, and provider', () => {
    const proxy = new TranscriberProxy(mockWebSocket, { ...options, provider: 'deepgram' });
    proxy.handleStartEvent({ event: 'start', start: { tag: 'tag1', mediaFormat: { encoding: 'opus' } } });
    const conn = vi.mocked(OutgoingConnection).mock.instances[0] as any;

    // Three audio packets.
    for (const chunk of [0, 1, 2]) {
      proxy.handleMediaEvent(oggFrame('tag1', chunk));
    }

    // Two interims followed by one final, delivered through the connection callbacks.
    conn.onInterimTranscription(transcriptionResult('a', true, []));
    conn.onInterimTranscription(transcriptionResult('b', true, []));
    conn.onCompleteTranscription(transcriptionResult('c', false, [{ text: 'hi' }]));

    vi.mocked(logger.info).mockClear();
    proxy.close();

    const sessionEnd = sessionEndLogCall();
    expect(sessionEnd).toBeDefined();

    const summary = sessionEnd![0] as string;
    for (const fragment of ['provider=deepgram', 'audioPackets=3', 'interims=2', 'finals=1']) {
      expect(summary).toContain(fragment);
    }
    expect(summary).toMatch(/durationSec=\d+\.\d/);
  });
});
581694
});

worker/env.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ export interface Env {
3434
PROVIDERS_PRIORITY?: string;
3535
FORCE_COMMIT_TIMEOUT?: string;
3636
DEBUG?: string;
37+
LOG_LEVEL?: string;
3738
ROUTING_MODE?: string;
3839
CONTAINER_POOL_SIZE?: string;
3940
MAX_CONNECTIONS_PER_CONTAINER?: string;

worker/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,10 @@ function buildContainerEnvVars(env: Env): Record<string, string> {
5656
ENABLE_OPENAI_CUSTOM_PROVIDER: env.ENABLE_OPENAI_CUSTOM_PROVIDER || 'false',
5757
OPENAI_CUSTOM_REQUIRE_WSS: env.OPENAI_CUSTOM_REQUIRE_WSS || 'true',
5858
FORCE_COMMIT_TIMEOUT: env.FORCE_COMMIT_TIMEOUT || '2',
59-
DEBUG: env.DEBUG || 'true',
59+
DEBUG: env.DEBUG || 'false',
60+
// If DEBUG=true is explicitly set on the Worker env, default LOG_LEVEL to
61+
// 'debug' unless an operator overrides it (e.g. LOG_LEVEL=info).
62+
LOG_LEVEL: env.LOG_LEVEL || (env.DEBUG === 'true' ? 'debug' : 'info'),
6063
ROUTING_MODE: env.ROUTING_MODE || 'session',
6164
CONTAINER_POOL_SIZE: env.CONTAINER_POOL_SIZE || '5',
6265
MAX_CONNECTIONS_PER_CONTAINER: env.MAX_CONNECTIONS_PER_CONTAINER || '10',

0 commit comments

Comments
 (0)