diff --git a/core/src/main/java/com/google/adk/models/chat/ChatCompletionsHttpClient.java b/core/src/main/java/com/google/adk/models/chat/ChatCompletionsHttpClient.java
new file mode 100644
index 000000000..5b2b03a33
--- /dev/null
+++ b/core/src/main/java/com/google/adk/models/chat/ChatCompletionsHttpClient.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.adk.models.chat;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.adk.JsonBaseModel;
+import com.google.adk.models.LlmRequest;
+import com.google.adk.models.LlmResponse;
+import com.google.common.collect.ImmutableMap;
+import com.google.genai.types.HttpOptions;
+import io.reactivex.rxjava3.core.BackpressureStrategy;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.FlowableEmitter;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import okhttp3.Call;
+import okhttp3.Callback;
+import okhttp3.HttpUrl;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An HTTP client for interacting with OpenAI-compatible chat completions endpoints.
+ *
+ *
Supports both non-streaming responses (single {@link LlmResponse} emission) and streaming
+ * Server-Sent Events (SSE) responses (multiple incremental {@link LlmResponse} emissions). See the
+ * OpenAI Chat Completions API
+ * reference for the wire protocol.
+ */
+public class ChatCompletionsHttpClient {
+ private static final Logger logger = LoggerFactory.getLogger(ChatCompletionsHttpClient.class);
+ private static final ObjectMapper objectMapper = JsonBaseModel.getMapper();
+
+ private static final MediaType JSON = MediaType.get("application/json; charset=utf-8");
+
+ /**
+ * Default OkHttp call timeout used when the caller does not supply an {@link HttpOptions}
+ * timeout. Five minutes is long enough for most non-streaming completions and short enough to
+ * prevent indefinite hangs in the common case where the caller does not configure timeouts.
+ * Callers who need infinite (e.g. long batch jobs or open streams) can opt in by passing an
+ * {@link HttpOptions} with {@code timeout() == 0}.
+ */
+ private static final Duration DEFAULT_CALL_TIMEOUT = Duration.ofMinutes(5);
+
+ /**
+ * Shared OkHttpClient instance whose connection pool and thread dispatcher are reused across all
+ * {@link ChatCompletionsHttpClient} instances. Each instance forks this client via {@link
+ * OkHttpClient#newBuilder()} to apply per-instance timeouts without leaking pools.
+ */
+ private static final OkHttpClient SHARED_POOL_CLIENT = new OkHttpClient();
+
+ private final OkHttpClient client;
+ private final HttpUrl completionsUrl;
+ private final ImmutableMap headers;
+
+ /**
+ * Constructs a new {@link ChatCompletionsHttpClient} that facilitates API interaction with the
+ * standard {@code /chat/completions} REST endpoint.
+ *
+ * All configuration is sourced from the supplied {@link HttpOptions}:
+ *
+ *
+ * - {@link HttpOptions#baseUrl()} -- required. The base URL of the chat completions
+ * endpoint. The {@code chat/completions} path segments are appended automatically using
+ * {@link HttpUrl}, which handles trailing slashes and percent-encoding deterministically.
+ * Set via {@code HttpOptions.builder().baseUrl("https://...").build()}.
+ *
- {@link HttpOptions#headers()} -- optional. Extra HTTP headers to include in outgoing
+ * requests. The {@code Content-Type} header is set automatically and cannot be overridden.
+ * Set via {@code HttpOptions.builder().headers(Map.of("Authorization", "Bearer ...")) }.
+ *
- {@link HttpOptions#timeout()} -- optional. Per-call timeout in milliseconds. A missing
+ * timeout defaults to 5 minutes ({@link #DEFAULT_CALL_TIMEOUT}). A timeout of {@code 0} is
+ * respected as the explicit caller opt-in to infinite wait. Set via {@code
+ * HttpOptions.builder().timeout(10_000).build()}.
+ *
+ *
+ * Example:
+ *
+ *
{@code
+ * HttpOptions options =
+ * HttpOptions.builder()
+ * .baseUrl("https://example.com/v1/")
+ * .headers(ImmutableMap.of("Authorization", "Bearer my-token"))
+ * .timeout(30_000)
+ * .build();
+ * ChatCompletionsHttpClient client = new ChatCompletionsHttpClient(options);
+ * }
+ *
+ * @param httpOptions HTTP configuration. Must not be {@code null}, and {@link
+ * HttpOptions#baseUrl()} must be present and parseable as an HTTP(S) URL.
+ * @throws IllegalArgumentException if {@code httpOptions.baseUrl()} is missing or is not a valid
+ * HTTP(S) URL.
+ */
+ public ChatCompletionsHttpClient(HttpOptions httpOptions) {
+ Objects.requireNonNull(httpOptions, "httpOptions cannot be null");
+ String baseUrl =
+ httpOptions
+ .baseUrl()
+ .orElseThrow(() -> new IllegalArgumentException("httpOptions.baseUrl() must be set"));
+ HttpUrl parsedBaseUrl = HttpUrl.parse(baseUrl);
+ if (parsedBaseUrl == null) {
+ throw new IllegalArgumentException(
+ "httpOptions.baseUrl() is not a valid HTTP(S) URL: " + baseUrl);
+ }
+ // Pre-build the completions URL once. HttpUrl.addPathSegment handles trailing slashes,
+ // percent-encoding, and existing path components on baseUrl deterministically.
+ this.completionsUrl =
+ parsedBaseUrl.newBuilder().addPathSegment("chat").addPathSegment("completions").build();
+ // Defensive copy of caller-supplied headers; absent is treated as no extra headers.
+ this.headers =
+ httpOptions
+ .headers()
+ .>map(ImmutableMap::copyOf)
+ .orElse(ImmutableMap.of());
+
+ // Apply custom timeouts per instance. All internal timeouts are bounded by callTimeout.
+ OkHttpClient.Builder builder = SHARED_POOL_CLIENT.newBuilder();
+ builder.connectTimeout(Duration.ZERO);
+ builder.readTimeout(Duration.ZERO);
+ builder.writeTimeout(Duration.ZERO);
+ builder.callTimeout(resolveCallTimeout(httpOptions));
+ this.client = builder.build();
+ }
+
+ /** Resolves the call timeout from HttpOptions. */
+ private static Duration resolveCallTimeout(HttpOptions httpOptions) {
+ if (httpOptions.timeout().isEmpty()) {
+ return DEFAULT_CALL_TIMEOUT;
+ }
+ long timeoutMs = httpOptions.timeout().get();
+ // 0 is treated as no timeout (Duration.ZERO).
+ return timeoutMs == 0L ? Duration.ZERO : Duration.ofMillis(timeoutMs);
+ }
+
+ /**
+ * Generates a conversational response from the chat completions endpoint based on the provided
+ * messages. This encapsulates building the HTTP payload, sending the request to the completions
+ * endpoint, and initiating the handling of complete calls.
+ *
+ * @param llmRequest The request containing the model, configuration, and sequence of messages.
+ * @param stream Whether to request a streaming response.
+ * @return A {@link Flowable} emitting the discrete (or combined) {@link LlmResponse} objects.
+ */
+ public Flowable complete(LlmRequest llmRequest, boolean stream) {
+ return Flowable.defer(
+ () -> {
+ ChatCompletionsRequest dtoRequest =
+ ChatCompletionsRequest.fromLlmRequest(llmRequest, stream);
+ String jsonPayload = objectMapper.writeValueAsString(dtoRequest);
+ logger.trace(
+ "Chat Completion Request: model={}, stream={}, messagesCount={}",
+ dtoRequest.model,
+ dtoRequest.stream,
+ dtoRequest.messages != null ? dtoRequest.messages.size() : 0);
+
+ Request.Builder requestBuilder =
+ new Request.Builder().url(completionsUrl).post(RequestBody.create(jsonPayload, JSON));
+
+ for (Map.Entry entry : headers.entrySet()) {
+ requestBuilder.addHeader(entry.getKey(), entry.getValue());
+ }
+ // Defensively force Content-Type to JSON by replacing instead of appending.
+ requestBuilder.header("Content-Type", JSON.toString());
+
+ Request request = requestBuilder.build();
+ if (stream) {
+ return createStreamingFlowable(request);
+ } else {
+ return createNonStreamingFlowable(request);
+ }
+ });
+ }
+
+ /** Placeholder for streaming responses. Errors with {@link UnsupportedOperationException}. */
+ @SuppressWarnings("UnusedVariable")
+ private Flowable createStreamingFlowable(Request request) {
+ return Flowable.error(
+ new UnsupportedOperationException("Streaming is not yet implemented in this client."));
+ }
+
+ /**
+ * Wraps an OkHttp {@link Callback} in a reactive {@link Flowable} for single-turn, non-streaming
+ * responses.
+ */
+ private Flowable createNonStreamingFlowable(Request request) {
+ return Flowable.create(
+ emitter -> {
+ Call call = client.newCall(request);
+ emitter.setCancellable(call::cancel);
+ call.enqueue(new NonStreamingCallback(emitter));
+ },
+ BackpressureStrategy.BUFFER);
+ }
+
+ /**
+ * Handles OkHttp failure and success callbacks, pushing {@link LlmResponse} results to the given
+ * emitter.
+ */
+ private static final class NonStreamingCallback implements Callback {
+ private final FlowableEmitter emitter;
+
+ NonStreamingCallback(FlowableEmitter emitter) {
+ this.emitter = emitter;
+ }
+
+ @Override
+ public void onFailure(Call call, IOException e) {
+ emitter.tryOnError(e);
+ }
+
+ @Override
+ public void onResponse(Call call, Response response) {
+ try (ResponseBody body = response.body()) {
+ if (!response.isSuccessful()) {
+ String bodyStr = body != null ? body.string() : "";
+ emitter.tryOnError(
+ new IOException("Unexpected code " + response + " - body: " + bodyStr));
+ return;
+ }
+ if (body == null) {
+ emitter.tryOnError(new IOException("Empty response body"));
+ return;
+ }
+
+ String jsonResponse = body.string();
+ ChatCompletionsResponse.ChatCompletion completion =
+ objectMapper.readValue(jsonResponse, ChatCompletionsResponse.ChatCompletion.class);
+ emitter.onNext(completion.toLlmResponse());
+ emitter.onComplete();
+ } catch (Exception e) {
+ emitter.tryOnError(e);
+ }
+ }
+ }
+}
diff --git a/core/src/test/java/com/google/adk/models/chat/ChatCompletionsHttpClientTest.java b/core/src/test/java/com/google/adk/models/chat/ChatCompletionsHttpClientTest.java
new file mode 100644
index 000000000..175ca777e
--- /dev/null
+++ b/core/src/test/java/com/google/adk/models/chat/ChatCompletionsHttpClientTest.java
@@ -0,0 +1,477 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.adk.models.chat;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.adk.JsonBaseModel;
+import com.google.adk.models.LlmRequest;
+import com.google.adk.models.LlmResponse;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.genai.types.Content;
+import com.google.genai.types.FinishReason;
+import com.google.genai.types.HttpOptions;
+import com.google.genai.types.Part;
+import io.reactivex.rxjava3.subscribers.TestSubscriber;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import okhttp3.Call;
+import okhttp3.Callback;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import okio.Buffer;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+@RunWith(JUnit4.class)
+public final class ChatCompletionsHttpClientTest {
+ private static final ObjectMapper objectMapper = JsonBaseModel.getMapper();
+ private static final MediaType JSON = MediaType.get("application/json");
+
+ /**
+ * Bounded wait for {@link TestSubscriber#await} so a buggy callback wiring cannot hang the test
+ * JVM. The mock callbacks fire synchronously in the same thread, so this value is intentionally
+ * short -- on a successful run the await returns in microseconds, and on a hung run we fail fast
+ * instead of stalling the test suite.
+ */
+ private static final Duration AWAIT_TIMEOUT = Duration.ofMillis(500);
+
+ @Rule public final MockitoRule mocks = MockitoJUnit.rule();
+
+ @Mock private OkHttpClient mockHttpClient;
+ @Mock private Call mockCall;
+
+ private ChatCompletionsHttpClient client;
+
+ @Before
+ public void setUp() throws Exception {
+ client =
+ new ChatCompletionsHttpClient(
+ HttpOptions.builder().baseUrl("https://example.com/").build());
+ swapInMockHttpClient(client);
+ }
+
+ /**
+ * Reflectively replaces the production {@link OkHttpClient} on a {@link
+ * ChatCompletionsHttpClient} with the test's mock so callbacks can be captured. Used by both
+ * setUp and tests that construct their own client (e.g. timeout tests, header tests).
+ */
+ private void swapInMockHttpClient(ChatCompletionsHttpClient target) throws Exception {
+ when(mockHttpClient.newCall(any())).thenReturn(mockCall);
+ Field clientField = ChatCompletionsHttpClient.class.getDeclaredField("client");
+ clientField.setAccessible(true);
+ clientField.set(target, mockHttpClient);
+ }
+
+ private Response createMockResponse(String body, MediaType mediaType) {
+ return createMockResponse(body, mediaType, 200, "OK");
+ }
+
+ private Response createMockResponse(String body, MediaType mediaType, int code, String message) {
+ Response.Builder builder =
+ new Response.Builder()
+ .request(new Request.Builder().url("https://example.com/chat/completions").build())
+ .protocol(Protocol.HTTP_1_1)
+ .code(code)
+ .message(message);
+ // OkHttp's Response.Builder rejects a null body via its Kotlin @NotNull contract; omit
+ // the body() call entirely to model an empty/null response body.
+ if (body != null) {
+ builder.body(ResponseBody.create(body, mediaType));
+ }
+ return builder.build();
+ }
+
+ /** Returns a minimal {@link LlmRequest} suitable for tests that don't care about the payload. */
+ private static LlmRequest minimalRequest() {
+ return LlmRequest.builder()
+ .model("gpt-4")
+ .contents(ImmutableList.of(Content.builder().parts(Part.fromText("hello")).build()))
+ .build();
+ }
+
+ @Test
+ public void complete_nonStreaming_sendsCorrectPayload() throws Exception {
+ String responseBody =
+ """
+ {
+ "choices": [
+ {
+ "message": {
+ "role": "assistant",
+ "content": "Hi"
+ },
+ "finish_reason": "stop"
+ }
+ ]
+ }
+ """;
+
+ Response mockResponse = createMockResponse(responseBody, JSON);
+
+ ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+ doNothing().when(mockCall).enqueue(callbackCaptor.capture());
+
+ TestSubscriber testSubscriber = client.complete(minimalRequest(), false).test();
+
+ callbackCaptor.getValue().onResponse(mockCall, mockResponse);
+ testSubscriber.await(AWAIT_TIMEOUT.toMillis(), MILLISECONDS);
+
+ LlmResponse response = testSubscriber.values().get(0);
+
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Request.class);
+ verify(mockHttpClient).newCall(requestCaptor.capture());
+ Request capturedRequest = requestCaptor.getValue();
+ assertThat(capturedRequest.url().encodedPath()).isEqualTo("/chat/completions");
+
+ Buffer buffer = new Buffer();
+ capturedRequest.body().writeTo(buffer);
+ JsonNode requestBodyJson = objectMapper.readTree(buffer.readUtf8());
+ assertThat(requestBodyJson.get("model").asText()).isEqualTo("gpt-4");
+ assertThat(requestBodyJson.get("messages").get(0).get("role").asText()).isEqualTo("user");
+ assertThat(requestBodyJson.get("messages").get(0).get("content").asText()).isEqualTo("hello");
+
+ LlmResponse expectedResponse =
+ LlmResponse.builder()
+ .content(
+ Content.builder()
+ .role("model")
+ .parts(ImmutableList.of(Part.fromText("Hi")))
+ .build())
+ .finishReason(new FinishReason(FinishReason.Known.STOP.toString()))
+ .customMetadata(ImmutableList.of())
+ .build();
+
+ assertThat(response).isEqualTo(expectedResponse);
+ }
+
+ @Test
+ public void complete_nonStreaming_propagateFailure() throws Exception {
+ ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+ doNothing().when(mockCall).enqueue(callbackCaptor.capture());
+
+ TestSubscriber testSubscriber = client.complete(minimalRequest(), false).test();
+
+ callbackCaptor.getValue().onFailure(mockCall, new IOException("Network Error"));
+ testSubscriber.await(AWAIT_TIMEOUT.toMillis(), MILLISECONDS);
+
+ testSubscriber.assertError(IOException.class);
+ }
+
+ // -- Header, error-propagation, and timeout coverage. ----------------------------------
+
+ /**
+ * Verifies that an HTTP error status (e.g. 500) propagates as a stream error and that the error
+ * message includes the response body so callers can debug. Covers the {@code
+ * !response.isSuccessful()} branch of the non-streaming path. The streaming counterpart lives in
+ * the streaming follow-up CL.
+ */
+ @Test
+ public void complete_nonStreaming_propagatesHttpErrorStatus() throws Exception {
+ Response mockResponse =
+ createMockResponse("{\"error\":\"server exploded\"}", JSON, 500, "Internal Server Error");
+
+ ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+ doNothing().when(mockCall).enqueue(callbackCaptor.capture());
+
+ TestSubscriber testSubscriber = client.complete(minimalRequest(), false).test();
+
+ callbackCaptor.getValue().onResponse(mockCall, mockResponse);
+ testSubscriber.await(AWAIT_TIMEOUT.toMillis(), MILLISECONDS);
+
+ testSubscriber.assertError(
+ e ->
+ e instanceof IOException
+ && e.getMessage().contains("Unexpected code")
+ && e.getMessage().contains("server exploded"));
+ }
+
+ /**
+ * Verifies that an empty response body propagates as a stream error rather than silently emitting
+ * an empty value. The exact exception class depends on OkHttp's behavior:
+ *
+ *
+ * - If OkHttp produces a {@code null} body, our code surfaces an {@link IOException} with the
+ * message {@code "Empty response body"}.
+ *
- If OkHttp produces an empty (non-null) body, Jackson surfaces a {@link
+ * com.fasterxml.jackson.databind.exc.MismatchedInputException} ("No content to map").
+ *
+ *
+ * Both outcomes satisfy the contract: empty body must NOT silently produce a successful empty
+ * {@link LlmResponse}.
+ */
+ @Test
+ public void complete_nonStreaming_propagatesEmptyBody() throws Exception {
+ Response mockResponse = createMockResponse(null, JSON);
+
+ ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+ doNothing().when(mockCall).enqueue(callbackCaptor.capture());
+
+ TestSubscriber testSubscriber = client.complete(minimalRequest(), false).test();
+
+ callbackCaptor.getValue().onResponse(mockCall, mockResponse);
+ testSubscriber.await(AWAIT_TIMEOUT.toMillis(), MILLISECONDS);
+
+ testSubscriber.assertNoValues();
+ testSubscriber.assertError(Throwable.class);
+ }
+
+ /**
+ * Verifies that caller-supplied headers reach the wire on the captured {@link Request}. This is
+ * the most common production failure mode (missing or wrong Authorization header), so it gets its
+ * own test rather than being implicit in other tests.
+ */
+ @Test
+ public void complete_sendsCustomHeaders() throws Exception {
+ ChatCompletionsHttpClient clientWithHeaders =
+ new ChatCompletionsHttpClient(
+ HttpOptions.builder()
+ .baseUrl("https://example.com/")
+ .headers(ImmutableMap.of("Authorization", "Bearer test-token", "X-Custom", "value"))
+ .build());
+ swapInMockHttpClient(clientWithHeaders);
+
+ String responseBody =
+ """
+ {"choices":[{"message":{"role":"assistant","content":"Hi"},"finish_reason":"stop"}]}
+ """;
+ Response mockResponse = createMockResponse(responseBody, JSON);
+
+ ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+ doNothing().when(mockCall).enqueue(callbackCaptor.capture());
+
+ TestSubscriber testSubscriber =
+ clientWithHeaders.complete(minimalRequest(), false).test();
+
+ callbackCaptor.getValue().onResponse(mockCall, mockResponse);
+ testSubscriber.await(AWAIT_TIMEOUT.toMillis(), MILLISECONDS);
+
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Request.class);
+ verify(mockHttpClient).newCall(requestCaptor.capture());
+ Request capturedRequest = requestCaptor.getValue();
+ assertThat(capturedRequest.header("Authorization")).isEqualTo("Bearer test-token");
+ assertThat(capturedRequest.header("X-Custom")).isEqualTo("value");
+ // Content-Type is forced to application/json regardless of caller input.
+ assertThat(capturedRequest.header("Content-Type")).contains("application/json");
+ }
+
+ /**
+ * Verifies that even when a caller passes a conflicting {@code Content-Type} header, the client
+ * overrides it with {@code application/json} so the upstream API does not reject the request as a
+ * malformed payload.
+ */
+ @Test
+ public void complete_overridesCallerContentType() throws Exception {
+ ChatCompletionsHttpClient clientWithBadHeader =
+ new ChatCompletionsHttpClient(
+ HttpOptions.builder()
+ .baseUrl("https://example.com/")
+ .headers(ImmutableMap.of("Content-Type", "text/plain"))
+ .build());
+ swapInMockHttpClient(clientWithBadHeader);
+
+ String responseBody =
+ """
+ {"choices":[{"message":{"role":"assistant","content":"Hi"},"finish_reason":"stop"}]}
+ """;
+ Response mockResponse = createMockResponse(responseBody, JSON);
+
+ ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+ doNothing().when(mockCall).enqueue(callbackCaptor.capture());
+
+ TestSubscriber testSubscriber =
+ clientWithBadHeader.complete(minimalRequest(), false).test();
+
+ callbackCaptor.getValue().onResponse(mockCall, mockResponse);
+ testSubscriber.await(AWAIT_TIMEOUT.toMillis(), MILLISECONDS);
+
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Request.class);
+ verify(mockHttpClient).newCall(requestCaptor.capture());
+ Request capturedRequest = requestCaptor.getValue();
+ // Should be exactly one Content-Type header, not two.
+ assertThat(capturedRequest.headers("Content-Type")).hasSize(1);
+ assertThat(capturedRequest.header("Content-Type")).contains("application/json");
+ }
+
+ /**
+ * Verifies that a {@code baseUrl} without a trailing slash still produces the correct {@code
+ * /chat/completions} path. {@link okhttp3.HttpUrl#newBuilder()} normalizes path segments
+ * regardless of the trailing-slash state of the base URL.
+ */
+ @Test
+ public void complete_handlesBaseUrlWithoutTrailingSlash() throws Exception {
+ ChatCompletionsHttpClient clientNoSlash =
+ new ChatCompletionsHttpClient(HttpOptions.builder().baseUrl("https://example.com").build());
+ swapInMockHttpClient(clientNoSlash);
+
+ String responseBody =
+ """
+ {"choices":[{"message":{"role":"assistant","content":"Hi"},"finish_reason":"stop"}]}
+ """;
+ Response mockResponse = createMockResponse(responseBody, JSON);
+
+ ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+ doNothing().when(mockCall).enqueue(callbackCaptor.capture());
+
+ TestSubscriber testSubscriber =
+ clientNoSlash.complete(minimalRequest(), false).test();
+
+ callbackCaptor.getValue().onResponse(mockCall, mockResponse);
+ testSubscriber.await(AWAIT_TIMEOUT.toMillis(), MILLISECONDS);
+
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Request.class);
+ verify(mockHttpClient).newCall(requestCaptor.capture());
+ assertThat(requestCaptor.getValue().url().encodedPath()).isEqualTo("/chat/completions");
+ }
+
+ /**
+ * Verifies that omitting {@code headers} on the supplied {@link HttpOptions} is treated as no
+ * extra headers, not as an NPE.
+ */
+ @Test
+ public void constructor_missingHeaders_isTreatedAsEmpty() throws Exception {
+ ChatCompletionsHttpClient clientWithoutHeaders =
+ new ChatCompletionsHttpClient(
+ HttpOptions.builder().baseUrl("https://example.com/").build());
+ swapInMockHttpClient(clientWithoutHeaders);
+
+ String responseBody =
+ """
+ {"choices":[{"message":{"role":"assistant","content":"Hi"},"finish_reason":"stop"}]}
+ """;
+ Response mockResponse = createMockResponse(responseBody, JSON);
+
+ ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+ doNothing().when(mockCall).enqueue(callbackCaptor.capture());
+
+ TestSubscriber testSubscriber =
+ clientWithoutHeaders.complete(minimalRequest(), false).test();
+
+ callbackCaptor.getValue().onResponse(mockCall, mockResponse);
+ testSubscriber.await(AWAIT_TIMEOUT.toMillis(), MILLISECONDS);
+
+ testSubscriber.assertNoErrors();
+ testSubscriber.assertValueCount(1);
+ }
+
+ /** Verifies that a {@code null} {@link HttpOptions} is rejected at construction time. */
+ @Test
+ public void constructor_nullHttpOptions_throws() {
+ assertThrows(NullPointerException.class, () -> new ChatCompletionsHttpClient(null));
+ }
+
+ /**
+ * Verifies that an {@link HttpOptions} without a {@code baseUrl} is rejected at construction time
+ * as bad configuration. {@link IllegalArgumentException} (not NPE) is the conventional signal for
+ * missing required configuration.
+ */
+ @Test
+ public void constructor_missingBaseUrl_throws() {
+ HttpOptions noBaseUrl = HttpOptions.builder().build();
+ assertThrows(IllegalArgumentException.class, () -> new ChatCompletionsHttpClient(noBaseUrl));
+ }
+
+ /**
+ * Verifies that an {@link HttpOptions} with a malformed (non-HTTP(S)) {@code baseUrl} is rejected
+ * at construction time, rather than failing later at the first {@code complete()} call with a
+ * confusing NPE from {@link okhttp3.HttpUrl#parse}.
+ */
+ @Test
+ public void constructor_malformedBaseUrl_throws() {
+ HttpOptions malformed = HttpOptions.builder().baseUrl("not a url").build();
+ assertThrows(IllegalArgumentException.class, () -> new ChatCompletionsHttpClient(malformed));
+ }
+
+ // -- Tri-state timeout policy. ----------------------------------------------------------
+
+ /**
+ * Verifies that when {@code httpOptions} omits {@code timeout()}, the client applies the 5-minute
+ * default call timeout to prevent indefinite hangs in callers that did not explicitly configure a
+ * timeout.
+ */
+ @Test
+ public void constructor_missingTimeout_appliesDefaultFiveMinuteTimeout() {
+ ChatCompletionsHttpClient defaultClient =
+ new ChatCompletionsHttpClient(
+ HttpOptions.builder().baseUrl("https://example.com/").build());
+
+ OkHttpClient internal = readInternalClient(defaultClient);
+ assertThat(internal.callTimeoutMillis())
+ .isEqualTo((int) Duration.ofMinutes(5).toMillis()); // 300_000
+ }
+
+ /**
+ * Verifies that when the caller explicitly sets {@code httpOptions.timeout() == 0}, the client
+ * respects this as the explicit opt-in to infinite hang. This is the migration path for
+ * long-running streams or batch jobs that need no timeout.
+ */
+ @Test
+ public void constructor_zeroTimeout_respectsInfiniteHang() {
+ HttpOptions zeroTimeout =
+ HttpOptions.builder().baseUrl("https://example.com/").timeout(0).build();
+ ChatCompletionsHttpClient infiniteClient = new ChatCompletionsHttpClient(zeroTimeout);
+
+ OkHttpClient internal = readInternalClient(infiniteClient);
+ assertThat(internal.callTimeoutMillis()).isEqualTo(0); // OkHttp: 0 = no timeout
+ }
+
+ /**
+ * Verifies that when the caller sets a positive timeout, that value (in milliseconds) is used as
+ * the call timeout.
+ */
+ @Test
+ public void constructor_explicitTimeout_appliesIt() {
+ HttpOptions tenSeconds =
+ HttpOptions.builder().baseUrl("https://example.com/").timeout(10_000).build();
+ ChatCompletionsHttpClient timedClient = new ChatCompletionsHttpClient(tenSeconds);
+
+ OkHttpClient internal = readInternalClient(timedClient);
+ assertThat(internal.callTimeoutMillis()).isEqualTo(10_000);
+ }
+
+ /** Reflectively reads the internal {@link OkHttpClient} to inspect the resolved timeout. */
+ private static OkHttpClient readInternalClient(ChatCompletionsHttpClient target) {
+ try {
+ Field clientField = ChatCompletionsHttpClient.class.getDeclaredField("client");
+ clientField.setAccessible(true);
+ return (OkHttpClient) clientField.get(target);
+ } catch (ReflectiveOperationException e) {
+ throw new LinkageError("Failed to read internal client", e);
+ }
+ }
+}