Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,9 @@ static class AgentV2VoiceAssistant {
private final AtomicBoolean running = new AtomicBoolean(false);
private final CountDownLatch shutdownLatch = new CountDownLatch(1);

private VoiceLiveSessionAsyncClient session;
private AudioProcessor audioProcessor;
// volatile: written by reactor thread (doOnSuccess), read by shutdown-hook thread
private volatile VoiceLiveSessionAsyncClient session;
private volatile AudioProcessor audioProcessor;

AgentV2VoiceAssistant(String endpoint, AgentSessionConfig agentConfig, String voice) {
this.endpoint = endpoint;
Expand Down Expand Up @@ -272,12 +273,14 @@ void start() {
// Subscribe to events
subscribeToEvents();
})
.doOnError(e -> {
System.err.println("Failed to connect: " + e.getMessage());
running.set(false);
shutdownLatch.countDown();
})
.subscribe();
.subscribe(
v -> {},
e -> {
System.err.println("Failed to connect: " + e.getMessage());
running.set(false);
shutdownLatch.countDown();
}
);

// Wait for shutdown
try {
Expand Down Expand Up @@ -330,23 +333,26 @@ private void configureSession() {
// Send session update
ClientEventSessionUpdate sessionUpdate = new ClientEventSessionUpdate(sessionOptions);
session.sendEvent(sessionUpdate)
.doOnSuccess(v -> System.out.println("Session configuration sent"))
.doOnError(e -> System.err.println("Failed to configure session: " + e.getMessage()))
.subscribe();
.subscribe(
v -> System.out.println("Session configuration sent"),
e -> System.err.println("Failed to configure session: " + e.getMessage())
);
}

private void subscribeToEvents() {
session.receiveEvents()
.doOnNext(this::handleEvent)
.doOnError(e -> {
System.err.println("Error receiving events: " + e.getMessage());
shutdown();
})
.doOnComplete(() -> {
System.out.println("Event stream completed");
shutdown();
})
.subscribe();
.subscribe(
v -> {},
e -> {
System.err.println("Error receiving events: " + e.getMessage());
shutdown();
}
);
}

private void handleEvent(SessionUpdate event) {
Expand Down Expand Up @@ -483,15 +489,16 @@ private void writeConversationLog(String message) {
static class AudioProcessor {
private final VoiceLiveSessionAsyncClient session;
private final AudioFormat format;
private final BlockingQueue<AudioPacket> playbackQueue = new LinkedBlockingQueue<>();
private final BlockingQueue<AudioPacket> playbackQueue = new LinkedBlockingQueue<>(1000);
private final AtomicInteger nextSeqNum = new AtomicInteger(0);
private final AtomicInteger playbackBase = new AtomicInteger(0);
private final AtomicBoolean running = new AtomicBoolean(false);

private TargetDataLine inputLine;
private SourceDataLine outputLine;
private Thread captureThread;
private Thread playbackThread;
// volatile: written by reactor thread (startCapture/Playback), read/closed by shutdown-hook thread
private volatile TargetDataLine inputLine;
private volatile SourceDataLine outputLine;
private volatile Thread captureThread;
private volatile Thread playbackThread;

AudioProcessor(VoiceLiveSessionAsyncClient session) {
this.session = session;
Expand Down Expand Up @@ -532,8 +539,11 @@ private void captureLoop() {
? buffer.clone()
: Arrays.copyOf(buffer, bytesRead);

// Send audio to service (sendInputAudio takes byte[])
session.sendInputAudio(audioData).subscribe();
session.sendInputAudio(audioData)
.subscribe(
v -> {},
error -> System.err.println("Error sending audio: " + error.getMessage())
);
}
}
}
Expand Down Expand Up @@ -583,7 +593,10 @@ private void playbackLoop() {

/**
 * Wraps {@code audioData} in an {@code AudioPacket} tagged with the next sequence number
 * and offers it to the playback queue exactly once.
 *
 * <p>{@code offer} does not block; it returns {@code false} when a bounded queue is full.
 * In that case the packet is dropped and a warning naming its sequence number is printed
 * to {@code System.err}. The sequence number is taken before the offer, so a dropped
 * packet still consumes one.
 *
 * @param audioData audio bytes to enqueue for playback
 */
void queueAudio(byte[] audioData) {
    int seqNum = nextSeqNum.getAndIncrement();
    // Single, checked offer(): an additional unchecked offer() before this one would
    // enqueue every packet twice and hide the "queue full" result of the first attempt.
    if (!playbackQueue.offer(new AudioPacket(seqNum, audioData))) {
        System.err.println("Warning: playback queue full, dropping audio packet seq=" + seqNum);
    }
}

void skipPendingAudio() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.azure.ai.voicelive.models.SessionUpdateResponseAudioDelta;
import com.azure.ai.voicelive.models.UserMessageItem;
import com.azure.ai.voicelive.models.VoiceLiveSessionOptions;
import com.azure.core.credential.KeyCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.core.util.BinaryData;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -59,7 +59,7 @@
* <p><strong>Environment Variables Required:</strong></p>
* <ul>
* <li>AZURE_VOICELIVE_ENDPOINT - The VoiceLive service endpoint URL</li>
* <li>AZURE_VOICELIVE_API_KEY - The API key for authentication</li>
* <li>AZURE_VOICELIVE_API_KEY - (Optional) The API key, if not using DefaultAzureCredential</li>
* </ul>
*
* <p><strong>Audio Requirements:</strong></p>
Expand Down Expand Up @@ -87,12 +87,11 @@ public final class AudioPlaybackSample {
* @param args Unused command line arguments
*/
public static void main(String[] args) {
// Get credentials from environment variables
// Get endpoint from environment variable
String endpoint = System.getenv("AZURE_VOICELIVE_ENDPOINT");
String apiKey = System.getenv("AZURE_VOICELIVE_API_KEY");

if (endpoint == null || apiKey == null) {
System.err.println("Please set AZURE_VOICELIVE_ENDPOINT and AZURE_VOICELIVE_API_KEY environment variables");
if (endpoint == null) {
System.err.println("Please set AZURE_VOICELIVE_ENDPOINT environment variable");
return;
}

Expand All @@ -102,10 +101,12 @@ public static void main(String[] args) {
return;
}

// Create the VoiceLive client
// Create the VoiceLive client using DefaultAzureCredential (recommended).
// To use an API key instead:
// .credential(new KeyCredential(System.getenv("AZURE_VOICELIVE_API_KEY")))
VoiceLiveAsyncClient client = new VoiceLiveClientBuilder()
.endpoint(endpoint)
.credential(new KeyCredential(apiKey))
.credential(new DefaultAzureCredentialBuilder().build())
.buildAsyncClient();

System.out.println("Starting audio playback sample...");
Expand All @@ -123,7 +124,7 @@ public static void main(String[] args) {
.setInputAudioSamplingRate(SAMPLE_RATE);

// Audio playback components
final BlockingQueue<byte[]> audioQueue = new LinkedBlockingQueue<>();
final BlockingQueue<byte[]> audioQueue = new LinkedBlockingQueue<>(1000);
final AtomicBoolean isPlaying = new AtomicBoolean(false);
final SourceDataLine[] speakerRef = new SourceDataLine[1];

Expand All @@ -132,17 +133,16 @@ public static void main(String[] args) {
.flatMap(session -> {
System.out.println("✓ Session started");

// Send session configuration, then listen for events.
// Send session configuration, send text message, trigger response,
// then listen for events. Events are buffered by the SDK's receiveSink,
// so none are lost between sending and subscribing.
Comment thread
xitzhang marked this conversation as resolved.
Outdated
ClientEventSessionUpdate updateEvent = new ClientEventSessionUpdate(sessionOptions);
return session.sendEvent(updateEvent)
.doOnSuccess(v -> {
System.out.println("\u2713 Session configured");
System.out.println(" Session configured");
// Start audio playback system
startPlayback(audioQueue, isPlaying, speakerRef);
})
.thenMany(session.receiveEvents()
.doOnNext(event -> handleEvent(event, audioQueue))
.doOnError(error -> System.err.println("Error: " + error.getMessage())))
.then(Mono.delay(Duration.ofMillis(500))) // Wait for session to be fully ready
.flatMap(v -> {
// Send a user message to trigger an audio response
Expand All @@ -162,7 +162,11 @@ public static void main(String[] args) {
ClientEventResponseCreate responseEvent = new ClientEventResponseCreate();
return session.sendEvent(responseEvent);
})
.then(Mono.delay(Duration.ofSeconds(10))) // Wait for audio response
.thenMany(session.receiveEvents()
.doOnNext(event -> handleEvent(event, audioQueue))
.doOnError(error -> System.err.println("Error: " + error.getMessage()))
.take(Duration.ofSeconds(10))) // Listen for 10 seconds then complete
.then()
.doFinally(signal -> System.out.println("\n✓ Sample completed - audio playback demonstrated"));
})
.doFinally(signalType -> {
Expand Down Expand Up @@ -284,8 +288,11 @@ private static void handleEvent(SessionUpdate event, BlockingQueue<byte[]> audio
SessionUpdateResponseAudioDelta audioEvent = (SessionUpdateResponseAudioDelta) event;
byte[] audioData = audioEvent.getDelta();
if (audioData != null && audioData.length > 0) {
audioQueue.offer(audioData);
System.out.println("🔊 Received audio chunk: " + audioData.length + " bytes");
if (!audioQueue.offer(audioData)) {
System.err.println("Warning: audio queue full, dropping chunk of " + audioData.length + " bytes");
} else {
System.out.println("🔊 Received audio chunk: " + audioData.length + " bytes");
}
}
}
} else if (eventType == ServerEventType.RESPONSE_AUDIO_DONE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import com.azure.ai.voicelive.models.ServerEventType;
import com.azure.ai.voicelive.models.SessionUpdate;
import com.azure.ai.voicelive.models.VoiceLiveSessionOptions;
import com.azure.core.credential.KeyCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.core.util.BinaryData;

import java.util.Arrays;
Expand Down Expand Up @@ -40,7 +40,7 @@
* <p><strong>Environment Variables Required:</strong></p>
* <ul>
* <li>AZURE_VOICELIVE_ENDPOINT - The VoiceLive service endpoint URL</li>
* <li>AZURE_VOICELIVE_API_KEY - The API key for authentication</li>
* <li>AZURE_VOICELIVE_API_KEY - (Optional) The API key, if not using DefaultAzureCredential</li>
* </ul>
*
* <p><strong>How to Run:</strong></p>
Expand All @@ -56,19 +56,20 @@ public final class BasicVoiceConversationSample {
* @param args Unused command line arguments
*/
public static void main(String[] args) {
// Get credentials from environment variables
// Get endpoint from environment variable
String endpoint = System.getenv("AZURE_VOICELIVE_ENDPOINT");
String apiKey = System.getenv("AZURE_VOICELIVE_API_KEY");

if (endpoint == null || apiKey == null) {
System.err.println("Please set AZURE_VOICELIVE_ENDPOINT and AZURE_VOICELIVE_API_KEY environment variables");
if (endpoint == null) {
System.err.println("Please set AZURE_VOICELIVE_ENDPOINT environment variable");
return;
}

// Create the VoiceLive client
// Create the VoiceLive client using DefaultAzureCredential (recommended).
// To use an API key instead:
// .credential(new KeyCredential(System.getenv("AZURE_VOICELIVE_API_KEY")))
VoiceLiveAsyncClient client = new VoiceLiveClientBuilder()
.endpoint(endpoint)
.credential(new KeyCredential(apiKey))
.credential(new DefaultAzureCredentialBuilder().build())
.buildAsyncClient();

System.out.println("Starting basic voice conversation...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import com.azure.ai.voicelive.models.SessionUpdateSessionUpdated;
import com.azure.ai.voicelive.models.VoiceLiveSessionOptions;
import com.azure.ai.voicelive.models.VoiceLiveToolDefinition;
import com.azure.core.credential.KeyCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.core.util.BinaryData;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -70,7 +70,7 @@
* <p><strong>Environment Variables Required:</strong></p>
* <ul>
* <li>AZURE_VOICELIVE_ENDPOINT - The VoiceLive service endpoint URL</li>
* <li>AZURE_VOICELIVE_API_KEY - The API key for authentication</li>
* <li>AZURE_VOICELIVE_API_KEY - (Optional) The API key, if not using DefaultAzureCredential</li>
* </ul>
*
* <p><strong>How to Run:</strong></p>
Expand All @@ -90,7 +90,6 @@ public final class FunctionCallingSample {
// Service configuration
private static final String DEFAULT_MODEL = "gpt-realtime";
private static final String ENV_ENDPOINT = "AZURE_VOICELIVE_ENDPOINT";
private static final String ENV_API_KEY = "AZURE_VOICELIVE_API_KEY";

// Audio format constants
private static final int SAMPLE_RATE = 24000;
Expand All @@ -111,29 +110,24 @@ private FunctionCallingSample() {
public static void main(String[] args) {
// Load configuration
String endpoint = System.getenv(ENV_ENDPOINT);
String apiKey = System.getenv(ENV_API_KEY);

if (endpoint == null || endpoint.isEmpty()) {
System.err.println("Error: AZURE_VOICELIVE_ENDPOINT environment variable is not set.");
System.exit(1);
}

if (apiKey == null || apiKey.isEmpty()) {
System.err.println("Error: AZURE_VOICELIVE_API_KEY environment variable is not set.");
System.exit(1);
}

String separator = new String(new char[70]).replace("\0", "=");
System.out.println(separator);
System.out.println("🎤️ Voice Assistant with Function Calling - Azure VoiceLive SDK");
System.out.println(separator);

try {
// Create client
KeyCredential credential = new KeyCredential(apiKey);
// Create client using DefaultAzureCredential (recommended).
// To use an API key instead:
// .credential(new KeyCredential(System.getenv("AZURE_VOICELIVE_API_KEY")))
VoiceLiveAsyncClient client = new VoiceLiveClientBuilder()
.endpoint(endpoint)
.credential(credential)
.credential(new DefaultAzureCredentialBuilder().build())
.buildAsyncClient();

runFunctionCallingSession(client);
Expand Down Expand Up @@ -379,8 +373,10 @@ private static void handleServerEvent(

session.sendEvent(createItem)
.then(session.sendEvent(new ClientEventResponseCreate()))
.doOnSuccess(v -> System.out.println("🤖 Function result sent"))
.subscribe();
.subscribe(
v -> System.out.println("🤖 Function result sent"),
error -> System.err.println("❌ Failed to send function result: " + error.getMessage())
);
}
}

Expand Down Expand Up @@ -468,11 +464,12 @@ private static class AudioProcessor {
private final VoiceLiveSessionAsyncClient session;
private final AudioFormat audioFormat;

private TargetDataLine microphone;
// volatile: written by reactor thread (startCapture/Playback), read/closed by shutdown-hook thread
private volatile TargetDataLine microphone;
private final AtomicBoolean isCapturing = new AtomicBoolean(false);

private SourceDataLine speaker;
private final BlockingQueue<byte[]> playbackQueue = new LinkedBlockingQueue<>();
private volatile SourceDataLine speaker;
private final BlockingQueue<byte[]> playbackQueue = new LinkedBlockingQueue<>(1000);
private final AtomicBoolean isPlaying = new AtomicBoolean(false);

AudioProcessor(VoiceLiveSessionAsyncClient session) {
Expand All @@ -499,7 +496,11 @@ void startCapture() {
int bytesRead = microphone.read(buffer, 0, buffer.length);
if (bytesRead > 0) {
byte[] audioData = Arrays.copyOf(buffer, bytesRead);
session.sendInputAudio(BinaryData.fromBytes(audioData)).subscribe();
session.sendInputAudio(BinaryData.fromBytes(audioData))
.subscribe(
v -> {},
error -> System.err.println("Error sending audio: " + error.getMessage())
);
}
}
}, "AudioCapture").start();
Expand Down Expand Up @@ -527,12 +528,8 @@ void startPlayback() {
new Thread(() -> {
while (isPlaying.get()) {
try {
byte[] audioData = playbackQueue.poll();
if (audioData != null) {
speaker.write(audioData, 0, audioData.length);
} else {
Thread.sleep(10);
}
byte[] audioData = playbackQueue.take();
speaker.write(audioData, 0, audioData.length);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
Expand All @@ -553,7 +550,10 @@ void skipPendingAudio() {

/**
 * Offers {@code audioData} to the playback queue exactly once, but only while
 * {@code isPlaying} is set; otherwise the chunk is ignored.
 *
 * <p>{@code offer} does not block; it returns {@code false} when a bounded queue is full.
 * In that case the chunk is dropped and a warning with its size is printed to
 * {@code System.err}.
 *
 * @param audioData audio bytes to enqueue for playback
 */
void queueAudio(byte[] audioData) {
    if (!isPlaying.get()) {
        return;
    }
    // Single, checked offer(): an additional unchecked offer() before this one would
    // enqueue every chunk twice and hide the "queue full" result of the first attempt.
    if (!playbackQueue.offer(audioData)) {
        System.err.println("Warning: playback queue full, dropping audio chunk of " + audioData.length + " bytes");
    }
}

Expand Down
Loading