Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import kotlin.jvm.optionals.getOrNull
/**
 * Settings that control how queued runs are auto-batched for ingest.
 *
 * Every value has a default, so an instance can be created without arguments.
 */
internal data class AutoBatchIngestLimits(
// Batch size limit handed to the auto-batch queue; defaults to the queue's own constant.
// NOTE(review): presumably a count of runs per batch — confirm against AutoBatchQueue.
val batchSizeLimit: Int = AutoBatchQueue.DEFAULT_BATCH_SIZE_LIMIT,
// Batch size limit in bytes handed to the auto-batch queue; defaults to the queue's own constant.
val batchSizeLimitBytes: Int = AutoBatchQueue.DEFAULT_BATCH_SIZE_LIMIT_BYTES,
// When false, the run services send auto-batches through JSON batch ingest without
// attempting the multipart endpoint. Defaults to attempting multipart.
val useMultipartEndpoint: Boolean = true,
)

internal fun BatchIngestConfig?.toAutoBatchIngestLimits(): AutoBatchIngestLimits =
Expand All @@ -16,4 +17,5 @@ internal fun BatchIngestConfig?.toAutoBatchIngestLimits(): AutoBatchIngestLimits
batchSizeLimitBytes =
this?.sizeLimitBytes()?.getOrNull()?.takeIf { it > 0 && it <= Int.MAX_VALUE }?.toInt()
?: AutoBatchQueue.DEFAULT_BATCH_SIZE_LIMIT_BYTES,
useMultipartEndpoint = this?.useMultipartEndpoint()?.getOrNull() ?: true,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.langchain.smith.client

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import com.langchain.smith.core.MultipartField
import com.langchain.smith.core.http.HttpRequestBody
import com.langchain.smith.core.http.multipartFormData
import com.langchain.smith.models.runs.Run
import com.langchain.smith.models.runs.RunIngestBatchParams
import kotlin.jvm.optionals.getOrNull

/**
 * Converts this batch into a multipart form body, one group of parts per queued run.
 *
 * Multipart requires run id, trace_id, and dotted_order to address each part correctly. If a
 * queued run is missing those fields, callers fall back to legacy JSON batch ingest instead of
 * failing the entire auto-batch.
 *
 * Part names are `<operation>.<runId>` and `<operation>.<runId>.<field>`, so two entries for the
 * same run id under the same operation produce identical names. The multipart body is built from
 * a map, which can hold only one value per name; rather than silently dropping the earlier entry,
 * such a batch is reported as not representable so that every entry is still delivered through
 * the JSON fallback.
 *
 * TODO: Add attachment parts if Java tracing starts enqueueing run attachments.
 *
 * @param jsonMapper mapper used to serialize each run into its JSON parts.
 * @return the multipart body, or `null` when the batch has no runs, when any run lacks a required
 *   field, or when two entries would collide on the same part name.
 */
internal fun RunIngestBatchParams.toRunMultipartFormData(jsonMapper: JsonMapper): HttpRequestBody? {
    val postParts =
        multipartFieldsForRuns(
            jsonMapper,
            operation = "post",
            runs = post().orElse(emptyList()),
        ) ?: return null
    val patchParts =
        multipartFieldsForRuns(
            jsonMapper,
            operation = "patch",
            runs = patch().orElse(emptyList()),
        ) ?: return null

    val parts = postParts + patchParts
    // toMap() keeps only the last pair for a repeated key, so a size mismatch means at least one
    // part would have been lost.
    val fields = parts.toMap()
    if (fields.size != parts.size) {
        return null
    }

    return fields.takeIf { it.isNotEmpty() }?.let { multipartFormData(jsonMapper, it) }
}

/**
 * Collects the multipart parts for every run of one operation, in run order.
 *
 * @param jsonMapper mapper used to serialize each run.
 * @param operation prefix used in the part names (the callers pass `"post"` or `"patch"`).
 * @param runs runs to convert; an empty list yields an empty result.
 * @return all parts concatenated, or `null` as soon as one run cannot be converted.
 */
private fun multipartFieldsForRuns(
    jsonMapper: JsonMapper,
    operation: String,
    runs: List<Run>,
): List<Pair<String, MultipartField<*>>>? {
    val collected = mutableListOf<Pair<String, MultipartField<*>>>()
    for (run in runs) {
        // A single unconvertible run makes the whole operation unrepresentable.
        val partsForRun = multipartFieldsForRun(jsonMapper, operation, run) ?: return null
        collected.addAll(partsForRun)
    }
    return collected
}

/**
 * Builds the multipart parts for a single run.
 *
 * The run is serialized to a JSON object. Properties listed in [MULTIPART_SEPARATE_FIELDS] are
 * removed from the main part and, when present and not JSON null, emitted as their own parts named
 * `<operation>.<runId>.<field>`. Everything else goes into the main part named
 * `<operation>.<runId>`, which always comes first in the result.
 *
 * @param jsonMapper mapper used to serialize the run and its parts.
 * @param operation prefix used in the part names.
 * @param run the run to convert.
 * @return the parts for this run, or `null` when the run has no id, trace id, or dotted order.
 */
private fun multipartFieldsForRun(
    jsonMapper: JsonMapper,
    operation: String,
    run: Run,
): List<Pair<String, MultipartField<*>>>? {
    val runId = run.id().getOrNull() ?: return null
    if (run.traceId().getOrNull() == null) return null
    if (run.dottedOrder().getOrNull() == null) return null

    // Split the serialized run in one pass: fields that stay inline vs. fields sent separately.
    val inlineFields = linkedMapOf<String, JsonNode>()
    val detachedFields = hashMapOf<String, JsonNode>()
    for ((name, node) in jsonMapper.valueToTree<ObjectNode>(run).fields()) {
        if (name in MULTIPART_SEPARATE_FIELDS) {
            detachedFields[name] = node
        } else {
            inlineFields[name] = node
        }
    }

    val parts = mutableListOf<Pair<String, MultipartField<*>>>()
    val main = jsonMapper.createObjectNode().setAll<ObjectNode>(inlineFields)
    parts.add("$operation.$runId" to jsonField(jsonMapper, main))

    // Separate parts follow the order of MULTIPART_SEPARATE_FIELDS, not the order in the run JSON.
    for (fieldName in MULTIPART_SEPARATE_FIELDS) {
        val node = detachedFields[fieldName] ?: continue
        if (node.isNull || node.isMissingNode) continue
        parts.add("$operation.$runId.$fieldName" to jsonField(jsonMapper, node))
    }
    return parts
}

/**
 * Wraps a JSON node as a multipart field carrying its serialized bytes.
 *
 * The content type is `application/json` with a `length` parameter set to the byte count of the
 * serialized payload.
 *
 * @param jsonMapper mapper used to serialize [value].
 * @param value JSON node to serialize.
 * @return a field whose value is the serialized byte array.
 */
private fun jsonField(jsonMapper: JsonMapper, value: JsonNode): MultipartField<ByteArray> =
    jsonMapper.writeValueAsBytes(value).let { payload ->
        MultipartField.builder<ByteArray>()
            .value(payload)
            .contentType("application/json; length=${payload.size}")
            .build()
    }

/**
 * Run JSON properties that are sent as their own multipart parts instead of inside the main run
 * part. The list order is the order in which the separate parts are emitted.
 */
private val MULTIPART_SEPARATE_FIELDS =
    listOf(
        "inputs",
        "outputs",
        "events",
        "extra",
        "error",
        "serialized",
    )
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.langchain.smith.services.async
import com.langchain.smith.client.AutoBatchIngestLimits
import com.langchain.smith.client.AutoBatchQueue
import com.langchain.smith.client.toAutoBatchIngestLimits
import com.langchain.smith.client.toRunMultipartFormData
import com.langchain.smith.core.ClientOptions
import com.langchain.smith.core.RequestOptions
import com.langchain.smith.core.checkRequired
Expand All @@ -19,6 +20,7 @@ import com.langchain.smith.core.http.HttpResponseFor
import com.langchain.smith.core.http.json
import com.langchain.smith.core.http.parseable
import com.langchain.smith.core.prepareAsync
import com.langchain.smith.errors.NotFoundException
import com.langchain.smith.models.runs.RunCreateParams
import com.langchain.smith.models.runs.RunCreateResponse
import com.langchain.smith.models.runs.RunIngestBatchParams
Expand All @@ -37,35 +39,77 @@ import com.langchain.smith.services.async.annotationqueues.InfoServiceAsyncImpl
import com.langchain.smith.services.async.runs.RuleServiceAsync
import com.langchain.smith.services.async.runs.RuleServiceAsyncImpl
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionException
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Consumer
import kotlin.jvm.optionals.getOrNull
import org.slf4j.LoggerFactory

class RunServiceAsyncImpl internal constructor(private val clientOptions: ClientOptions) :
RunServiceAsync {

private val withRawResponse: RunServiceAsync.WithRawResponse by lazy {
WithRawResponseImpl(clientOptions)
}
private val withRawResponse: WithRawResponseImpl by lazy { WithRawResponseImpl(clientOptions) }

// Rule sub-service, created on first access with the same client options as this service.
private val rules: RuleServiceAsync by lazy { RuleServiceAsyncImpl(clientOptions) }
// Set to true when a multipart auto-batch fails with NotFoundException; while true, auto-batches
// skip the multipart endpoint and use JSON batch ingest. Nothing in the visible code resets it.
private val multipartDisabled = AtomicBoolean(false)

private val batchQueue: AutoBatchQueue by lazy {
val limits = fetchAutoBatchIngestLimits()
AutoBatchQueue(
sendBatch = { params, requestOptions ->
withRawResponse().ingestBatch(params, requestOptions).thenApply {
it.parse()
null
}
},
sendBatch = { params, requestOptions -> sendAutoBatch(params, requestOptions, limits) },
batchSizeLimit = limits.batchSizeLimit,
batchSizeLimitBytes = limits.batchSizeLimitBytes,
)
}

/** Returns the raw-response view held by this service. */
override fun withRawResponse(): RunServiceAsync.WithRawResponse {
    return withRawResponse
}

/**
 * Sends one auto-batch, preferring the multipart endpoint and falling back to JSON batch ingest.
 *
 * JSON ingest is used directly when multipart is turned off in [limits] or was disabled earlier.
 * Otherwise multipart is attempted, and JSON ingest is used as a fallback when the batch could not
 * be expressed as multipart or when the attempt failed with [NotFoundException]. In the latter
 * case multipart is also disabled for later batches. Any other failure is propagated.
 *
 * @param params the batch to send.
 * @param requestOptions options applied to the outgoing request(s).
 * @param limits ingest limits; only `useMultipartEndpoint` is read here.
 * @return a future that completes with `null` on success or exceptionally on failure.
 */
private fun sendAutoBatch(
    params: RunIngestBatchParams,
    requestOptions: RequestOptions,
    limits: AutoBatchIngestLimits,
): CompletableFuture<Void?> {
    val multipartAllowed = limits.useMultipartEndpoint && !multipartDisabled.get()
    if (!multipartAllowed) {
        return sendJsonBatch(params, requestOptions)
    }

    val multipartAttempt =
        try {
            withRawResponse.ingestMultipartBatch(params, requestOptions)
        } catch (e: Exception) {
            // Building or dispatching the request threw synchronously; surface it as a failed future.
            return failedFuture(e)
        }

    return multipartAttempt
        .handle { delivered, failure ->
            if (failure != null) {
                // Failures from earlier stages may arrive wrapped in CompletionException.
                val rootCause = (failure as? CompletionException)?.cause ?: failure
                if (rootCause !is NotFoundException) {
                    failedFuture<Void?>(rootCause)
                } else {
                    multipartDisabled.set(true)
                    sendJsonBatch(params, requestOptions)
                }
            } else if (delivered) {
                CompletableFuture.completedFuture<Void?>(null)
            } else {
                // No multipart body could be built for this batch; send it as JSON instead.
                sendJsonBatch(params, requestOptions)
            }
        }
        .thenCompose { nextStage -> nextStage }
}

/**
 * Sends the batch through JSON batch ingest and discards the parsed response.
 *
 * @param params the batch to send.
 * @param requestOptions options applied to the outgoing request.
 * @return a future that completes with `null` once the response has been parsed.
 */
private fun sendJsonBatch(
    params: RunIngestBatchParams,
    requestOptions: RequestOptions,
): CompletableFuture<Void?> {
    val pendingResponse = withRawResponse().ingestBatch(params, requestOptions)
    return pendingResponse.thenApply { rawResponse ->
        // Parsing is done only for its effect (it can fail); the value itself is not needed.
        rawResponse.parse()
        null
    }
}

private fun fetchAutoBatchIngestLimits() =
try {
InfoServiceAsyncImpl(clientOptions)
Expand Down Expand Up @@ -177,6 +221,9 @@ class RunServiceAsyncImpl internal constructor(private val clientOptions: Client

private companion object {
private val logger = LoggerFactory.getLogger(RunServiceAsyncImpl::class.java)

/**
 * Creates a future that is already completed exceptionally.
 *
 * @param throwable the failure the returned future completes with.
 * @return a new, already-failed future.
 */
private fun <T> failedFuture(throwable: Throwable): CompletableFuture<T> {
    val failed = CompletableFuture<T>()
    failed.completeExceptionally(throwable)
    return failed
}
}

class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) :
Expand Down Expand Up @@ -327,6 +374,33 @@ class RunServiceAsyncImpl internal constructor(private val clientOptions: Client
}
}

/**
 * Attempts to send the batch as a POST to the `runs/multipart` path.
 *
 * @param params the batch to send.
 * @param requestOptions per-call options; client defaults are applied on top before executing.
 * @return a future completing with `true` once the request was executed and `errorHandler`
 *   accepted the response, or with `false` (no request made) when no multipart body could be
 *   built for this batch.
 */
internal fun ingestMultipartBatch(
    params: RunIngestBatchParams,
    requestOptions: RequestOptions,
): CompletableFuture<Boolean> {
    // Some queued runs do not have the fields required by multipart ingest; report `false` so the
    // caller falls back to legacy JSON batch ingest for this batch only.
    val multipartBody =
        params.toRunMultipartFormData(clientOptions.jsonMapper)
            ?: return CompletableFuture.completedFuture(false)

    val preparedRequest =
        HttpRequest.builder()
            .method(HttpMethod.POST)
            .baseUrl(clientOptions.baseUrl())
            .addPathSegments("runs", "multipart")
            .body(multipartBody)
            .build()
            .prepareAsync(clientOptions, params)
    val effectiveOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions))

    return preparedRequest
        .thenComposeAsync { httpRequest ->
            clientOptions.httpClient.executeAsync(httpRequest, effectiveOptions)
        }
        .thenApply { httpResponse ->
            // The body is not needed; close whatever the error handler hands back.
            errorHandler.handle(httpResponse).use {}
            true
        }
}

private val queryHandler: Handler<RunQueryResponse> =
jsonHandler<RunQueryResponse>(clientOptions.jsonMapper)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.langchain.smith.services.blocking
import com.langchain.smith.client.AutoBatchIngestLimits
import com.langchain.smith.client.AutoBatchQueue
import com.langchain.smith.client.toAutoBatchIngestLimits
import com.langchain.smith.client.toRunMultipartFormData
import com.langchain.smith.core.ClientOptions
import com.langchain.smith.core.RequestOptions
import com.langchain.smith.core.checkRequired
Expand All @@ -19,6 +20,7 @@ import com.langchain.smith.core.http.HttpResponseFor
import com.langchain.smith.core.http.json
import com.langchain.smith.core.http.parseable
import com.langchain.smith.core.prepare
import com.langchain.smith.errors.NotFoundException
import com.langchain.smith.models.runs.RunCreateParams
import com.langchain.smith.models.runs.RunCreateResponse
import com.langchain.smith.models.runs.RunIngestBatchParams
Expand All @@ -37,36 +39,57 @@ import com.langchain.smith.services.blocking.annotationqueues.InfoServiceImpl
import com.langchain.smith.services.blocking.runs.RuleService
import com.langchain.smith.services.blocking.runs.RuleServiceImpl
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Consumer
import kotlin.jvm.optionals.getOrNull
import org.slf4j.LoggerFactory

class RunServiceImpl internal constructor(private val clientOptions: ClientOptions) : RunService {

private val withRawResponse: RunService.WithRawResponse by lazy {
WithRawResponseImpl(clientOptions)
}
private val withRawResponse: WithRawResponseImpl by lazy { WithRawResponseImpl(clientOptions) }

// Rule sub-service, created on first access with the same client options as this service.
private val rules: RuleService by lazy { RuleServiceImpl(clientOptions) }
// Set to true when a multipart auto-batch fails with NotFoundException; while true, auto-batches
// skip the multipart endpoint and use JSON batch ingest. Nothing in the visible code resets it.
private val multipartDisabled = AtomicBoolean(false)

private val batchQueue: AutoBatchQueue by lazy {
val limits = fetchAutoBatchIngestLimits()
AutoBatchQueue(
sendBatch = { params, requestOptions ->
try {
withRawResponse().ingestBatch(params, requestOptions).parse()
CompletableFuture.completedFuture(null)
} catch (e: Exception) {
CompletableFuture<Void?>().also { it.completeExceptionally(e) }
}
},
sendBatch = { params, requestOptions -> sendAutoBatch(params, requestOptions, limits) },
batchSizeLimit = limits.batchSizeLimit,
batchSizeLimitBytes = limits.batchSizeLimitBytes,
)
}

/** Returns the raw-response view held by this service. */
override fun withRawResponse(): RunService.WithRawResponse {
    return withRawResponse
}

/**
 * Sends one auto-batch synchronously, preferring the multipart endpoint and falling back to JSON
 * batch ingest, then reports the outcome as an already-completed future.
 *
 * JSON ingest is used when multipart is turned off in [limits], was disabled earlier, could not
 * represent this batch, or failed with [NotFoundException]. In the last case multipart is also
 * disabled for later batches.
 *
 * @param params the batch to send.
 * @param requestOptions options applied to the outgoing request(s).
 * @param limits ingest limits; only `useMultipartEndpoint` is read here.
 * @return a future completed with `null` on success, or completed exceptionally with whatever
 *   exception the send raised.
 */
private fun sendAutoBatch(
    params: RunIngestBatchParams,
    requestOptions: RequestOptions,
    limits: AutoBatchIngestLimits,
): CompletableFuture<Void?> =
    try {
        val deliveredViaMultipart =
            if (!limits.useMultipartEndpoint || multipartDisabled.get()) {
                false
            } else {
                try {
                    withRawResponse.ingestMultipartBatch(params, requestOptions)
                } catch (e: NotFoundException) {
                    // If the multipart endpoint is unavailable on this server, fall back to
                    // legacy JSON batch ingest and keep using it for future auto-batches.
                    multipartDisabled.set(true)
                    false
                }
            }
        if (!deliveredViaMultipart) {
            withRawResponse().ingestBatch(params, requestOptions).parse()
        }
        CompletableFuture.completedFuture<Void?>(null)
    } catch (e: Exception) {
        CompletableFuture<Void?>().also { it.completeExceptionally(e) }
    }

private fun fetchAutoBatchIngestLimits() =
try {
InfoServiceImpl(clientOptions)
Expand Down Expand Up @@ -291,6 +314,30 @@ class RunServiceImpl internal constructor(private val clientOptions: ClientOptio
}
}

/**
 * Attempts to send the batch as a POST to the `runs/multipart` path.
 *
 * @param params the batch to send.
 * @param requestOptions per-call options; client defaults are applied on top before executing.
 * @return `true` once the request was executed and `errorHandler` accepted the response, or
 *   `false` (no request made) when no multipart body could be built for this batch.
 */
internal fun ingestMultipartBatch(
    params: RunIngestBatchParams,
    requestOptions: RequestOptions,
): Boolean {
    // Some queued runs do not have the fields required by multipart ingest; report `false` so the
    // caller falls back to legacy JSON batch ingest for this batch only.
    val multipartBody =
        params.toRunMultipartFormData(clientOptions.jsonMapper) ?: return false

    val preparedRequest =
        HttpRequest.builder()
            .method(HttpMethod.POST)
            .baseUrl(clientOptions.baseUrl())
            .addPathSegments("runs", "multipart")
            .body(multipartBody)
            .build()
            .prepare(clientOptions, params)
    val effectiveOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions))
    val httpResponse = clientOptions.httpClient.execute(preparedRequest, effectiveOptions)
    // The body is not needed; close whatever the error handler hands back.
    errorHandler.handle(httpResponse).use {}
    return true
}

private val queryHandler: Handler<RunQueryResponse> =
jsonHandler<RunQueryResponse>(clientOptions.jsonMapper)

Expand Down
Loading
Loading