diff --git a/core/src/main/java/com/google/adk/models/chat/ChatCompletionsHttpClient.java b/core/src/main/java/com/google/adk/models/chat/ChatCompletionsHttpClient.java new file mode 100644 index 000000000..5b2b03a33 --- /dev/null +++ b/core/src/main/java/com/google/adk/models/chat/ChatCompletionsHttpClient.java @@ -0,0 +1,256 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.adk.models.chat; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.adk.JsonBaseModel; +import com.google.adk.models.LlmRequest; +import com.google.adk.models.LlmResponse; +import com.google.common.collect.ImmutableMap; +import com.google.genai.types.HttpOptions; +import io.reactivex.rxjava3.core.BackpressureStrategy; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.FlowableEmitter; +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.Objects; +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.HttpUrl; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import okhttp3.ResponseBody; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An HTTP client for interacting with OpenAI-compatible chat completions endpoints. + * + *

Supports both non-streaming responses (single {@link LlmResponse} emission) and streaming + * Server-Sent Events (SSE) responses (multiple incremental {@link LlmResponse} emissions). See the + * OpenAI Chat Completions API + * reference for the wire protocol. + */ +public class ChatCompletionsHttpClient { + private static final Logger logger = LoggerFactory.getLogger(ChatCompletionsHttpClient.class); + private static final ObjectMapper objectMapper = JsonBaseModel.getMapper(); + + private static final MediaType JSON = MediaType.get("application/json; charset=utf-8"); + + /** + * Default OkHttp call timeout used when the caller does not supply an {@link HttpOptions} + * timeout. Five minutes is long enough for most non-streaming completions and short enough to + * prevent indefinite hangs in the common case where the caller does not configure timeouts. + * Callers who need infinite (e.g. long batch jobs or open streams) can opt in by passing an + * {@link HttpOptions} with {@code timeout() == 0}. + */ + private static final Duration DEFAULT_CALL_TIMEOUT = Duration.ofMinutes(5); + + /** + * Shared OkHttpClient instance whose connection pool and thread dispatcher are reused across all + * {@link ChatCompletionsHttpClient} instances. Each instance forks this client via {@link + * OkHttpClient#newBuilder()} to apply per-instance timeouts without leaking pools. + */ + private static final OkHttpClient SHARED_POOL_CLIENT = new OkHttpClient(); + + private final OkHttpClient client; + private final HttpUrl completionsUrl; + private final ImmutableMap headers; + + /** + * Constructs a new {@link ChatCompletionsHttpClient} that facilitates API interaction with the + * standard {@code /chat/completions} REST endpoint. + * + *

All configuration is sourced from the supplied {@link HttpOptions}: + * + *

+ * + *

Example: + * + *

{@code
+   * HttpOptions options =
+   *     HttpOptions.builder()
+   *         .baseUrl("https://example.com/v1/")
+   *         .headers(ImmutableMap.of("Authorization", "Bearer my-token"))
+   *         .timeout(30_000)
+   *         .build();
+   * ChatCompletionsHttpClient client = new ChatCompletionsHttpClient(options);
+   * }
+ * + * @param httpOptions HTTP configuration. Must not be {@code null}, and {@link + * HttpOptions#baseUrl()} must be present and parseable as an HTTP(S) URL. + * @throws IllegalArgumentException if {@code httpOptions.baseUrl()} is missing or is not a valid + * HTTP(S) URL. + */ + public ChatCompletionsHttpClient(HttpOptions httpOptions) { + Objects.requireNonNull(httpOptions, "httpOptions cannot be null"); + String baseUrl = + httpOptions + .baseUrl() + .orElseThrow(() -> new IllegalArgumentException("httpOptions.baseUrl() must be set")); + HttpUrl parsedBaseUrl = HttpUrl.parse(baseUrl); + if (parsedBaseUrl == null) { + throw new IllegalArgumentException( + "httpOptions.baseUrl() is not a valid HTTP(S) URL: " + baseUrl); + } + // Pre-build the completions URL once. HttpUrl.addPathSegment handles trailing slashes, + // percent-encoding, and existing path components on baseUrl deterministically. + this.completionsUrl = + parsedBaseUrl.newBuilder().addPathSegment("chat").addPathSegment("completions").build(); + // Defensive copy of caller-supplied headers; absent is treated as no extra headers. + this.headers = + httpOptions + .headers() + .>map(ImmutableMap::copyOf) + .orElse(ImmutableMap.of()); + + // Apply custom timeouts per instance. All internal timeouts are bounded by callTimeout. + OkHttpClient.Builder builder = SHARED_POOL_CLIENT.newBuilder(); + builder.connectTimeout(Duration.ZERO); + builder.readTimeout(Duration.ZERO); + builder.writeTimeout(Duration.ZERO); + builder.callTimeout(resolveCallTimeout(httpOptions)); + this.client = builder.build(); + } + + /** Resolves the call timeout from HttpOptions. */ + private static Duration resolveCallTimeout(HttpOptions httpOptions) { + if (httpOptions.timeout().isEmpty()) { + return DEFAULT_CALL_TIMEOUT; + } + long timeoutMs = httpOptions.timeout().get(); + // 0 is treated as no timeout (Duration.ZERO). + return timeoutMs == 0L ? Duration.ZERO : Duration.ofMillis(timeoutMs); + } + + /** + * Generates a conversational response from the chat completions endpoint based on the provided + * messages. This encapsulates building the HTTP payload, sending the request to the completions + * endpoint, and initiating the handling of complete calls. + * + * @param llmRequest The request containing the model, configuration, and sequence of messages. + * @param stream Whether to request a streaming response. + * @return A {@link Flowable} emitting the discrete (or combined) {@link LlmResponse} objects. + */ + public Flowable complete(LlmRequest llmRequest, boolean stream) { + return Flowable.defer( + () -> { + ChatCompletionsRequest dtoRequest = + ChatCompletionsRequest.fromLlmRequest(llmRequest, stream); + String jsonPayload = objectMapper.writeValueAsString(dtoRequest); + logger.trace( + "Chat Completion Request: model={}, stream={}, messagesCount={}", + dtoRequest.model, + dtoRequest.stream, + dtoRequest.messages != null ? dtoRequest.messages.size() : 0); + + Request.Builder requestBuilder = + new Request.Builder().url(completionsUrl).post(RequestBody.create(jsonPayload, JSON)); + + for (Map.Entry entry : headers.entrySet()) { + requestBuilder.addHeader(entry.getKey(), entry.getValue()); + } + // Defensively force Content-Type to JSON by replacing instead of appending. + requestBuilder.header("Content-Type", JSON.toString()); + + Request request = requestBuilder.build(); + if (stream) { + return createStreamingFlowable(request); + } else { + return createNonStreamingFlowable(request); + } + }); + } + + /** Placeholder for streaming responses. Errors with {@link UnsupportedOperationException}. */ + @SuppressWarnings("UnusedVariable") + private Flowable createStreamingFlowable(Request request) { + return Flowable.error( + new UnsupportedOperationException("Streaming is not yet implemented in this client.")); + } + + /** + * Wraps an OkHttp {@link Callback} in a reactive {@link Flowable} for single-turn, non-streaming + * responses. + */ + private Flowable createNonStreamingFlowable(Request request) { + return Flowable.create( + emitter -> { + Call call = client.newCall(request); + emitter.setCancellable(call::cancel); + call.enqueue(new NonStreamingCallback(emitter)); + }, + BackpressureStrategy.BUFFER); + } + + /** + * Handles OkHttp failure and success callbacks, pushing {@link LlmResponse} results to the given + * emitter. + */ + private static final class NonStreamingCallback implements Callback { + private final FlowableEmitter emitter; + + NonStreamingCallback(FlowableEmitter emitter) { + this.emitter = emitter; + } + + @Override + public void onFailure(Call call, IOException e) { + emitter.tryOnError(e); + } + + @Override + public void onResponse(Call call, Response response) { + try (ResponseBody body = response.body()) { + if (!response.isSuccessful()) { + String bodyStr = body != null ? body.string() : ""; + emitter.tryOnError( + new IOException("Unexpected code " + response + " - body: " + bodyStr)); + return; + } + if (body == null) { + emitter.tryOnError(new IOException("Empty response body")); + return; + } + + String jsonResponse = body.string(); + ChatCompletionsResponse.ChatCompletion completion = + objectMapper.readValue(jsonResponse, ChatCompletionsResponse.ChatCompletion.class); + emitter.onNext(completion.toLlmResponse()); + emitter.onComplete(); + } catch (Exception e) { + emitter.tryOnError(e); + } + } + } +} diff --git a/core/src/test/java/com/google/adk/models/chat/ChatCompletionsHttpClientTest.java b/core/src/test/java/com/google/adk/models/chat/ChatCompletionsHttpClientTest.java new file mode 100644 index 000000000..175ca777e --- /dev/null +++ b/core/src/test/java/com/google/adk/models/chat/ChatCompletionsHttpClientTest.java @@ -0,0 +1,477 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.adk.models.chat; + +import static com.google.common.truth.Truth.assertThat; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.adk.JsonBaseModel; +import com.google.adk.models.LlmRequest; +import com.google.adk.models.LlmResponse; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.genai.types.Content; +import com.google.genai.types.FinishReason; +import com.google.genai.types.HttpOptions; +import com.google.genai.types.Part; +import io.reactivex.rxjava3.subscribers.TestSubscriber; +import java.io.IOException; +import java.lang.reflect.Field; +import java.time.Duration; +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Protocol; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; +import okio.Buffer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public final class ChatCompletionsHttpClientTest { + private static final ObjectMapper objectMapper = JsonBaseModel.getMapper(); + private static final MediaType JSON = MediaType.get("application/json"); + + /** + * Bounded wait for {@link TestSubscriber#await} so a buggy callback wiring cannot hang the test + * JVM. The mock callbacks fire synchronously in the same thread, so this value is intentionally + * short -- on a successful run the await returns in microseconds, and on a hung run we fail fast + * instead of stalling the test suite. + */ + private static final Duration AWAIT_TIMEOUT = Duration.ofMillis(500); + + @Rule public final MockitoRule mocks = MockitoJUnit.rule(); + + @Mock private OkHttpClient mockHttpClient; + @Mock private Call mockCall; + + private ChatCompletionsHttpClient client; + + @Before + public void setUp() throws Exception { + client = + new ChatCompletionsHttpClient( + HttpOptions.builder().baseUrl("https://example.com/").build()); + swapInMockHttpClient(client); + } + + /** + * Reflectively replaces the production {@link OkHttpClient} on a {@link + * ChatCompletionsHttpClient} with the test's mock so callbacks can be captured. Used by both + * setUp and tests that construct their own client (e.g. timeout tests, header tests). + */ + private void swapInMockHttpClient(ChatCompletionsHttpClient target) throws Exception { + when(mockHttpClient.newCall(any())).thenReturn(mockCall); + Field clientField = ChatCompletionsHttpClient.class.getDeclaredField("client"); + clientField.setAccessible(true); + clientField.set(target, mockHttpClient); + } + + private Response createMockResponse(String body, MediaType mediaType) { + return createMockResponse(body, mediaType, 200, "OK"); + } + + private Response createMockResponse(String body, MediaType mediaType, int code, String message) { + Response.Builder builder = + new Response.Builder() + .request(new Request.Builder().url("https://example.com/chat/completions").build()) + .protocol(Protocol.HTTP_1_1) + .code(code) + .message(message); + // OkHttp's Response.Builder rejects a null body via its Kotlin @NotNull contract; omit + // the body() call entirely to model an empty/null response body. + if (body != null) { + builder.body(ResponseBody.create(body, mediaType)); + } + return builder.build(); + } + + /** Returns a minimal {@link LlmRequest} suitable for tests that don't care about the payload. */ + private static LlmRequest minimalRequest() { + return LlmRequest.builder() + .model("gpt-4") + .contents(ImmutableList.of(Content.builder().parts(Part.fromText("hello")).build())) + .build(); + } + + @Test + public void complete_nonStreaming_sendsCorrectPayload() throws Exception { + String responseBody = + """ + { + "choices": [ + { + "message": { + "role": "assistant", + "content": "Hi" + }, + "finish_reason": "stop" + } + ] + } + """; + + Response mockResponse = createMockResponse(responseBody, JSON); + + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class); + doNothing().when(mockCall).enqueue(callbackCaptor.capture()); + + TestSubscriber testSubscriber = client.complete(minimalRequest(), false).test(); + + callbackCaptor.getValue().onResponse(mockCall, mockResponse); + testSubscriber.await(AWAIT_TIMEOUT.toMillis(), MILLISECONDS); + + LlmResponse response = testSubscriber.values().get(0); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Request.class); + verify(mockHttpClient).newCall(requestCaptor.capture()); + Request capturedRequest = requestCaptor.getValue(); + assertThat(capturedRequest.url().encodedPath()).isEqualTo("/chat/completions"); + + Buffer buffer = new Buffer(); + capturedRequest.body().writeTo(buffer); + JsonNode requestBodyJson = objectMapper.readTree(buffer.readUtf8()); + assertThat(requestBodyJson.get("model").asText()).isEqualTo("gpt-4"); + assertThat(requestBodyJson.get("messages").get(0).get("role").asText()).isEqualTo("user"); + assertThat(requestBodyJson.get("messages").get(0).get("content").asText()).isEqualTo("hello"); + + LlmResponse expectedResponse = + LlmResponse.builder() + .content( + Content.builder() + .role("model") + .parts(ImmutableList.of(Part.fromText("Hi"))) + .build()) + .finishReason(new FinishReason(FinishReason.Known.STOP.toString())) + .customMetadata(ImmutableList.of()) + .build(); + + assertThat(response).isEqualTo(expectedResponse); + } + + @Test + public void complete_nonStreaming_propagateFailure() throws Exception { + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class); + doNothing().when(mockCall).enqueue(callbackCaptor.capture()); + + TestSubscriber testSubscriber = client.complete(minimalRequest(), false).test(); + + callbackCaptor.getValue().onFailure(mockCall, new IOException("Network Error")); + testSubscriber.await(AWAIT_TIMEOUT.toMillis(), MILLISECONDS); + + testSubscriber.assertError(IOException.class); + } + + // -- Header, error-propagation, and timeout coverage. ---------------------------------- + + /** + * Verifies that an HTTP error status (e.g. 500) propagates as a stream error and that the error + * message includes the response body so callers can debug. Covers the {@code + * !response.isSuccessful()} branch of the non-streaming path. The streaming counterpart lives in + * the streaming follow-up CL. + */ + @Test + public void complete_nonStreaming_propagatesHttpErrorStatus() throws Exception { + Response mockResponse = + createMockResponse("{\"error\":\"server exploded\"}", JSON, 500, "Internal Server Error"); + + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class); + doNothing().when(mockCall).enqueue(callbackCaptor.capture()); + + TestSubscriber testSubscriber = client.complete(minimalRequest(), false).test(); + + callbackCaptor.getValue().onResponse(mockCall, mockResponse); + testSubscriber.await(AWAIT_TIMEOUT.toMillis(), MILLISECONDS); + + testSubscriber.assertError( + e -> + e instanceof IOException + && e.getMessage().contains("Unexpected code") + && e.getMessage().contains("server exploded")); + } + + /** + * Verifies that an empty response body propagates as a stream error rather than silently emitting + * an empty value. The exact exception class depends on OkHttp's behavior: + * + *
    + *
  • If OkHttp produces a {@code null} body, our code surfaces an {@link IOException} with the + * message {@code "Empty response body"}. + *
  • If OkHttp produces an empty (non-null) body, Jackson surfaces a {@link + * com.fasterxml.jackson.databind.exc.MismatchedInputException} ("No content to map"). + *
+ * + * Both outcomes satisfy the contract: empty body must NOT silently produce a successful empty + * {@link LlmResponse}. + */ + @Test + public void complete_nonStreaming_propagatesEmptyBody() throws Exception { + Response mockResponse = createMockResponse(null, JSON); + + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class); + doNothing().when(mockCall).enqueue(callbackCaptor.capture()); + + TestSubscriber testSubscriber = client.complete(minimalRequest(), false).test(); + + callbackCaptor.getValue().onResponse(mockCall, mockResponse); + testSubscriber.await(AWAIT_TIMEOUT.toMillis(), MILLISECONDS); + + testSubscriber.assertNoValues(); + testSubscriber.assertError(Throwable.class); + } + + /** + * Verifies that caller-supplied headers reach the wire on the captured {@link Request}. This is + * the most common production failure mode (missing or wrong Authorization header), so it gets its + * own test rather than being implicit in other tests. + */ + @Test + public void complete_sendsCustomHeaders() throws Exception { + ChatCompletionsHttpClient clientWithHeaders = + new ChatCompletionsHttpClient( + HttpOptions.builder() + .baseUrl("https://example.com/") + .headers(ImmutableMap.of("Authorization", "Bearer test-token", "X-Custom", "value")) + .build()); + swapInMockHttpClient(clientWithHeaders); + + String responseBody = + """ + {"choices":[{"message":{"role":"assistant","content":"Hi"},"finish_reason":"stop"}]} + """; + Response mockResponse = createMockResponse(responseBody, JSON); + + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class); + doNothing().when(mockCall).enqueue(callbackCaptor.capture()); + + TestSubscriber testSubscriber = + clientWithHeaders.complete(minimalRequest(), false).test(); + + callbackCaptor.getValue().onResponse(mockCall, mockResponse); + testSubscriber.await(AWAIT_TIMEOUT.toMillis(), MILLISECONDS); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Request.class); + verify(mockHttpClient).newCall(requestCaptor.capture()); + Request capturedRequest = requestCaptor.getValue(); + assertThat(capturedRequest.header("Authorization")).isEqualTo("Bearer test-token"); + assertThat(capturedRequest.header("X-Custom")).isEqualTo("value"); + // Content-Type is forced to application/json regardless of caller input. + assertThat(capturedRequest.header("Content-Type")).contains("application/json"); + } + + /** + * Verifies that even when a caller passes a conflicting {@code Content-Type} header, the client + * overrides it with {@code application/json} so the upstream API does not reject the request as a + * malformed payload. + */ + @Test + public void complete_overridesCallerContentType() throws Exception { + ChatCompletionsHttpClient clientWithBadHeader = + new ChatCompletionsHttpClient( + HttpOptions.builder() + .baseUrl("https://example.com/") + .headers(ImmutableMap.of("Content-Type", "text/plain")) + .build()); + swapInMockHttpClient(clientWithBadHeader); + + String responseBody = + """ + {"choices":[{"message":{"role":"assistant","content":"Hi"},"finish_reason":"stop"}]} + """; + Response mockResponse = createMockResponse(responseBody, JSON); + + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class); + doNothing().when(mockCall).enqueue(callbackCaptor.capture()); + + TestSubscriber testSubscriber = + clientWithBadHeader.complete(minimalRequest(), false).test(); + + callbackCaptor.getValue().onResponse(mockCall, mockResponse); + testSubscriber.await(AWAIT_TIMEOUT.toMillis(), MILLISECONDS); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Request.class); + verify(mockHttpClient).newCall(requestCaptor.capture()); + Request capturedRequest = requestCaptor.getValue(); + // Should be exactly one Content-Type header, not two. + assertThat(capturedRequest.headers("Content-Type")).hasSize(1); + assertThat(capturedRequest.header("Content-Type")).contains("application/json"); + } + + /** + * Verifies that a {@code baseUrl} without a trailing slash still produces the correct {@code + * /chat/completions} path. {@link okhttp3.HttpUrl#newBuilder()} normalizes path segments + * regardless of the trailing-slash state of the base URL. + */ + @Test + public void complete_handlesBaseUrlWithoutTrailingSlash() throws Exception { + ChatCompletionsHttpClient clientNoSlash = + new ChatCompletionsHttpClient(HttpOptions.builder().baseUrl("https://example.com").build()); + swapInMockHttpClient(clientNoSlash); + + String responseBody = + """ + {"choices":[{"message":{"role":"assistant","content":"Hi"},"finish_reason":"stop"}]} + """; + Response mockResponse = createMockResponse(responseBody, JSON); + + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class); + doNothing().when(mockCall).enqueue(callbackCaptor.capture()); + + TestSubscriber testSubscriber = + clientNoSlash.complete(minimalRequest(), false).test(); + + callbackCaptor.getValue().onResponse(mockCall, mockResponse); + testSubscriber.await(AWAIT_TIMEOUT.toMillis(), MILLISECONDS); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Request.class); + verify(mockHttpClient).newCall(requestCaptor.capture()); + assertThat(requestCaptor.getValue().url().encodedPath()).isEqualTo("/chat/completions"); + } + + /** + * Verifies that omitting {@code headers} on the supplied {@link HttpOptions} is treated as no + * extra headers, not as an NPE. + */ + @Test + public void constructor_missingHeaders_isTreatedAsEmpty() throws Exception { + ChatCompletionsHttpClient clientWithoutHeaders = + new ChatCompletionsHttpClient( + HttpOptions.builder().baseUrl("https://example.com/").build()); + swapInMockHttpClient(clientWithoutHeaders); + + String responseBody = + """ + {"choices":[{"message":{"role":"assistant","content":"Hi"},"finish_reason":"stop"}]} + """; + Response mockResponse = createMockResponse(responseBody, JSON); + + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Callback.class); + doNothing().when(mockCall).enqueue(callbackCaptor.capture()); + + TestSubscriber testSubscriber = + clientWithoutHeaders.complete(minimalRequest(), false).test(); + + callbackCaptor.getValue().onResponse(mockCall, mockResponse); + testSubscriber.await(AWAIT_TIMEOUT.toMillis(), MILLISECONDS); + + testSubscriber.assertNoErrors(); + testSubscriber.assertValueCount(1); + } + + /** Verifies that a {@code null} {@link HttpOptions} is rejected at construction time. */ + @Test + public void constructor_nullHttpOptions_throws() { + assertThrows(NullPointerException.class, () -> new ChatCompletionsHttpClient(null)); + } + + /** + * Verifies that an {@link HttpOptions} without a {@code baseUrl} is rejected at construction time + * as bad configuration. {@link IllegalArgumentException} (not NPE) is the conventional signal for + * missing required configuration. + */ + @Test + public void constructor_missingBaseUrl_throws() { + HttpOptions noBaseUrl = HttpOptions.builder().build(); + assertThrows(IllegalArgumentException.class, () -> new ChatCompletionsHttpClient(noBaseUrl)); + } + + /** + * Verifies that an {@link HttpOptions} with a malformed (non-HTTP(S)) {@code baseUrl} is rejected + * at construction time, rather than failing later at the first {@code complete()} call with a + * confusing NPE from {@link okhttp3.HttpUrl#parse}. + */ + @Test + public void constructor_malformedBaseUrl_throws() { + HttpOptions malformed = HttpOptions.builder().baseUrl("not a url").build(); + assertThrows(IllegalArgumentException.class, () -> new ChatCompletionsHttpClient(malformed)); + } + + // -- Tri-state timeout policy. ---------------------------------------------------------- + + /** + * Verifies that when {@code httpOptions} omits {@code timeout()}, the client applies the 5-minute + * default call timeout to prevent indefinite hangs in callers that did not explicitly configure a + * timeout. + */ + @Test + public void constructor_missingTimeout_appliesDefaultFiveMinuteTimeout() { + ChatCompletionsHttpClient defaultClient = + new ChatCompletionsHttpClient( + HttpOptions.builder().baseUrl("https://example.com/").build()); + + OkHttpClient internal = readInternalClient(defaultClient); + assertThat(internal.callTimeoutMillis()) + .isEqualTo((int) Duration.ofMinutes(5).toMillis()); // 300_000 + } + + /** + * Verifies that when the caller explicitly sets {@code httpOptions.timeout() == 0}, the client + * respects this as the explicit opt-in to infinite hang. This is the migration path for + * long-running streams or batch jobs that need no timeout. + */ + @Test + public void constructor_zeroTimeout_respectsInfiniteHang() { + HttpOptions zeroTimeout = + HttpOptions.builder().baseUrl("https://example.com/").timeout(0).build(); + ChatCompletionsHttpClient infiniteClient = new ChatCompletionsHttpClient(zeroTimeout); + + OkHttpClient internal = readInternalClient(infiniteClient); + assertThat(internal.callTimeoutMillis()).isEqualTo(0); // OkHttp: 0 = no timeout + } + + /** + * Verifies that when the caller sets a positive timeout, that value (in milliseconds) is used as + * the call timeout. + */ + @Test + public void constructor_explicitTimeout_appliesIt() { + HttpOptions tenSeconds = + HttpOptions.builder().baseUrl("https://example.com/").timeout(10_000).build(); + ChatCompletionsHttpClient timedClient = new ChatCompletionsHttpClient(tenSeconds); + + OkHttpClient internal = readInternalClient(timedClient); + assertThat(internal.callTimeoutMillis()).isEqualTo(10_000); + } + + /** Reflectively reads the internal {@link OkHttpClient} to inspect the resolved timeout. */ + private static OkHttpClient readInternalClient(ChatCompletionsHttpClient target) { + try { + Field clientField = ChatCompletionsHttpClient.class.getDeclaredField("client"); + clientField.setAccessible(true); + return (OkHttpClient) clientField.get(target); + } catch (ReflectiveOperationException e) { + throw new LinkageError("Failed to read internal client", e); + } + } +}