Skip to content

Commit 288221c

Browse files
authored
feat: Add multipart batching (#138)
* Add multipart batching * Handle 404s better
1 parent b30293b commit 288221c

6 files changed

Lines changed: 665 additions & 21 deletions

File tree

langsmith-java-core/src/main/kotlin/com/langchain/smith/client/AutoBatchIngestLimits.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import kotlin.jvm.optionals.getOrNull
66
internal data class AutoBatchIngestLimits(
77
val batchSizeLimit: Int = AutoBatchQueue.DEFAULT_BATCH_SIZE_LIMIT,
88
val batchSizeLimitBytes: Int = AutoBatchQueue.DEFAULT_BATCH_SIZE_LIMIT_BYTES,
9+
val useMultipartEndpoint: Boolean = true,
910
)
1011

1112
internal fun BatchIngestConfig?.toAutoBatchIngestLimits(): AutoBatchIngestLimits =
@@ -16,4 +17,5 @@ internal fun BatchIngestConfig?.toAutoBatchIngestLimits(): AutoBatchIngestLimits
1617
batchSizeLimitBytes =
1718
this?.sizeLimitBytes()?.getOrNull()?.takeIf { it > 0 && it <= Int.MAX_VALUE }?.toInt()
1819
?: AutoBatchQueue.DEFAULT_BATCH_SIZE_LIMIT_BYTES,
20+
useMultipartEndpoint = this?.useMultipartEndpoint()?.getOrNull() ?: true,
1921
)
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package com.langchain.smith.client
2+
3+
import com.fasterxml.jackson.databind.JsonNode
4+
import com.fasterxml.jackson.databind.json.JsonMapper
5+
import com.fasterxml.jackson.databind.node.ObjectNode
6+
import com.langchain.smith.core.MultipartField
7+
import com.langchain.smith.core.http.HttpRequestBody
8+
import com.langchain.smith.core.http.multipartFormData
9+
import com.langchain.smith.models.runs.Run
10+
import com.langchain.smith.models.runs.RunIngestBatchParams
11+
import kotlin.jvm.optionals.getOrNull
12+
13+
internal fun RunIngestBatchParams.toRunMultipartFormData(jsonMapper: JsonMapper): HttpRequestBody? {
14+
// Multipart requires run id, trace_id, and dotted_order to address each part correctly.
15+
// If a queued run is missing those fields, callers fall back to legacy JSON batch ingest
16+
// instead of failing the entire auto-batch.
17+
// TODO: Add attachment parts if Java tracing starts enqueueing run attachments.
18+
val fields =
19+
listOf(
20+
multipartFieldsForRuns(
21+
jsonMapper,
22+
operation = "post",
23+
runs = post().orElse(emptyList()),
24+
) ?: return null,
25+
multipartFieldsForRuns(
26+
jsonMapper,
27+
operation = "patch",
28+
runs = patch().orElse(emptyList()),
29+
) ?: return null,
30+
)
31+
.flatten()
32+
.toMap()
33+
34+
return fields.takeIf { it.isNotEmpty() }?.let { multipartFormData(jsonMapper, it) }
35+
}
36+
37+
private fun multipartFieldsForRuns(
38+
jsonMapper: JsonMapper,
39+
operation: String,
40+
runs: List<Run>,
41+
): List<Pair<String, MultipartField<*>>>? =
42+
runs.flatMap { run -> multipartFieldsForRun(jsonMapper, operation, run) ?: return null }
43+
44+
private fun multipartFieldsForRun(
45+
jsonMapper: JsonMapper,
46+
operation: String,
47+
run: Run,
48+
): List<Pair<String, MultipartField<*>>>? {
49+
val runId = run.id().getOrNull() ?: return null
50+
run.traceId().getOrNull() ?: return null
51+
run.dottedOrder().getOrNull() ?: return null
52+
53+
val runFields =
54+
jsonMapper
55+
.valueToTree<ObjectNode>(run)
56+
.fields()
57+
.asSequence()
58+
.map { it.key to it.value }
59+
.toList()
60+
val main =
61+
jsonMapper
62+
.createObjectNode()
63+
.setAll<ObjectNode>(
64+
runFields
65+
.filterNot { (fieldName, _) -> fieldName in MULTIPART_SEPARATE_FIELDS }
66+
.toMap()
67+
)
68+
val separateFields =
69+
MULTIPART_SEPARATE_FIELDS.mapNotNull { fieldName ->
70+
runFields
71+
.firstOrNull { (name, _) -> name == fieldName }
72+
?.second
73+
?.takeUnless { it.isNull || it.isMissingNode }
74+
?.let { fieldName to it }
75+
}
76+
77+
return listOf("$operation.$runId" to jsonField(jsonMapper, main)) +
78+
separateFields.map { (fieldName, value) ->
79+
"$operation.$runId.$fieldName" to jsonField(jsonMapper, value)
80+
}
81+
}
82+
83+
private fun jsonField(jsonMapper: JsonMapper, value: JsonNode): MultipartField<ByteArray> {
84+
val bytes = jsonMapper.writeValueAsBytes(value)
85+
return MultipartField.builder<ByteArray>()
86+
.value(bytes)
87+
.contentType("application/json; length=${bytes.size}")
88+
.build()
89+
}
90+
91+
private val MULTIPART_SEPARATE_FIELDS =
92+
listOf("inputs", "outputs", "events", "extra", "error", "serialized")

langsmith-java-core/src/main/kotlin/com/langchain/smith/services/async/RunServiceAsyncImpl.kt

Lines changed: 83 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package com.langchain.smith.services.async
55
import com.langchain.smith.client.AutoBatchIngestLimits
66
import com.langchain.smith.client.AutoBatchQueue
77
import com.langchain.smith.client.toAutoBatchIngestLimits
8+
import com.langchain.smith.client.toRunMultipartFormData
89
import com.langchain.smith.core.ClientOptions
910
import com.langchain.smith.core.RequestOptions
1011
import com.langchain.smith.core.checkRequired
@@ -19,6 +20,7 @@ import com.langchain.smith.core.http.HttpResponseFor
1920
import com.langchain.smith.core.http.json
2021
import com.langchain.smith.core.http.parseable
2122
import com.langchain.smith.core.prepareAsync
23+
import com.langchain.smith.errors.NotFoundException
2224
import com.langchain.smith.models.runs.RunCreateParams
2325
import com.langchain.smith.models.runs.RunCreateResponse
2426
import com.langchain.smith.models.runs.RunIngestBatchParams
@@ -37,35 +39,77 @@ import com.langchain.smith.services.async.annotationqueues.InfoServiceAsyncImpl
3739
import com.langchain.smith.services.async.runs.RuleServiceAsync
3840
import com.langchain.smith.services.async.runs.RuleServiceAsyncImpl
3941
import java.util.concurrent.CompletableFuture
42+
import java.util.concurrent.CompletionException
43+
import java.util.concurrent.atomic.AtomicBoolean
4044
import java.util.function.Consumer
4145
import kotlin.jvm.optionals.getOrNull
4246
import org.slf4j.LoggerFactory
4347

4448
class RunServiceAsyncImpl internal constructor(private val clientOptions: ClientOptions) :
4549
RunServiceAsync {
4650

47-
private val withRawResponse: RunServiceAsync.WithRawResponse by lazy {
48-
WithRawResponseImpl(clientOptions)
49-
}
51+
private val withRawResponse: WithRawResponseImpl by lazy { WithRawResponseImpl(clientOptions) }
5052

5153
private val rules: RuleServiceAsync by lazy { RuleServiceAsyncImpl(clientOptions) }
54+
private val multipartDisabled = AtomicBoolean(false)
5255

5356
private val batchQueue: AutoBatchQueue by lazy {
5457
val limits = fetchAutoBatchIngestLimits()
5558
AutoBatchQueue(
56-
sendBatch = { params, requestOptions ->
57-
withRawResponse().ingestBatch(params, requestOptions).thenApply {
58-
it.parse()
59-
null
60-
}
61-
},
59+
sendBatch = { params, requestOptions -> sendAutoBatch(params, requestOptions, limits) },
6260
batchSizeLimit = limits.batchSizeLimit,
6361
batchSizeLimitBytes = limits.batchSizeLimitBytes,
6462
)
6563
}
6664

6765
override fun withRawResponse(): RunServiceAsync.WithRawResponse = withRawResponse
6866

67+
private fun sendAutoBatch(
68+
params: RunIngestBatchParams,
69+
requestOptions: RequestOptions,
70+
limits: AutoBatchIngestLimits,
71+
): CompletableFuture<Void?> {
72+
if (!limits.useMultipartEndpoint || multipartDisabled.get()) {
73+
return sendJsonBatch(params, requestOptions)
74+
}
75+
76+
val multipartFuture =
77+
try {
78+
withRawResponse.ingestMultipartBatch(params, requestOptions)
79+
} catch (e: Exception) {
80+
return failedFuture(e)
81+
}
82+
83+
return multipartFuture
84+
.handle { sentMultipart, throwable ->
85+
if (throwable == null) {
86+
if (sentMultipart) {
87+
CompletableFuture.completedFuture<Void?>(null)
88+
} else {
89+
sendJsonBatch(params, requestOptions)
90+
}
91+
} else {
92+
val cause = (throwable as? CompletionException)?.cause ?: throwable
93+
if (cause is NotFoundException) {
94+
multipartDisabled.set(true)
95+
sendJsonBatch(params, requestOptions)
96+
} else {
97+
failedFuture(cause)
98+
}
99+
}
100+
}
101+
.thenCompose { it }
102+
}
103+
104+
private fun sendJsonBatch(
105+
params: RunIngestBatchParams,
106+
requestOptions: RequestOptions,
107+
): CompletableFuture<Void?> =
108+
withRawResponse().ingestBatch(params, requestOptions).thenApply {
109+
it.parse()
110+
null
111+
}
112+
69113
private fun fetchAutoBatchIngestLimits() =
70114
try {
71115
InfoServiceAsyncImpl(clientOptions)
@@ -177,6 +221,9 @@ class RunServiceAsyncImpl internal constructor(private val clientOptions: Client
177221

178222
private companion object {
179223
private val logger = LoggerFactory.getLogger(RunServiceAsyncImpl::class.java)
224+
225+
private fun <T> failedFuture(throwable: Throwable): CompletableFuture<T> =
226+
CompletableFuture<T>().also { it.completeExceptionally(throwable) }
180227
}
181228

182229
class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) :
@@ -327,6 +374,33 @@ class RunServiceAsyncImpl internal constructor(private val clientOptions: Client
327374
}
328375
}
329376

377+
internal fun ingestMultipartBatch(
378+
params: RunIngestBatchParams,
379+
requestOptions: RequestOptions,
380+
): CompletableFuture<Boolean> {
381+
val body = params.toRunMultipartFormData(clientOptions.jsonMapper)
382+
if (body == null) {
383+
// Some queued runs do not have the fields required by multipart ingest; fall
384+
// back to legacy JSON batch ingest for this batch only.
385+
return CompletableFuture.completedFuture(false)
386+
}
387+
val request =
388+
HttpRequest.builder()
389+
.method(HttpMethod.POST)
390+
.baseUrl(clientOptions.baseUrl())
391+
.addPathSegments("runs", "multipart")
392+
.body(body)
393+
.build()
394+
.prepareAsync(clientOptions, params)
395+
val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions))
396+
return request
397+
.thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) }
398+
.thenApply { response ->
399+
errorHandler.handle(response).use {}
400+
true
401+
}
402+
}
403+
330404
private val queryHandler: Handler<RunQueryResponse> =
331405
jsonHandler<RunQueryResponse>(clientOptions.jsonMapper)
332406

langsmith-java-core/src/main/kotlin/com/langchain/smith/services/blocking/RunServiceImpl.kt

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package com.langchain.smith.services.blocking
55
import com.langchain.smith.client.AutoBatchIngestLimits
66
import com.langchain.smith.client.AutoBatchQueue
77
import com.langchain.smith.client.toAutoBatchIngestLimits
8+
import com.langchain.smith.client.toRunMultipartFormData
89
import com.langchain.smith.core.ClientOptions
910
import com.langchain.smith.core.RequestOptions
1011
import com.langchain.smith.core.checkRequired
@@ -19,6 +20,7 @@ import com.langchain.smith.core.http.HttpResponseFor
1920
import com.langchain.smith.core.http.json
2021
import com.langchain.smith.core.http.parseable
2122
import com.langchain.smith.core.prepare
23+
import com.langchain.smith.errors.NotFoundException
2224
import com.langchain.smith.models.runs.RunCreateParams
2325
import com.langchain.smith.models.runs.RunCreateResponse
2426
import com.langchain.smith.models.runs.RunIngestBatchParams
@@ -37,36 +39,57 @@ import com.langchain.smith.services.blocking.annotationqueues.InfoServiceImpl
3739
import com.langchain.smith.services.blocking.runs.RuleService
3840
import com.langchain.smith.services.blocking.runs.RuleServiceImpl
3941
import java.util.concurrent.CompletableFuture
42+
import java.util.concurrent.atomic.AtomicBoolean
4043
import java.util.function.Consumer
4144
import kotlin.jvm.optionals.getOrNull
4245
import org.slf4j.LoggerFactory
4346

4447
class RunServiceImpl internal constructor(private val clientOptions: ClientOptions) : RunService {
4548

46-
private val withRawResponse: RunService.WithRawResponse by lazy {
47-
WithRawResponseImpl(clientOptions)
48-
}
49+
private val withRawResponse: WithRawResponseImpl by lazy { WithRawResponseImpl(clientOptions) }
4950

5051
private val rules: RuleService by lazy { RuleServiceImpl(clientOptions) }
52+
private val multipartDisabled = AtomicBoolean(false)
5153

5254
private val batchQueue: AutoBatchQueue by lazy {
5355
val limits = fetchAutoBatchIngestLimits()
5456
AutoBatchQueue(
55-
sendBatch = { params, requestOptions ->
56-
try {
57-
withRawResponse().ingestBatch(params, requestOptions).parse()
58-
CompletableFuture.completedFuture(null)
59-
} catch (e: Exception) {
60-
CompletableFuture<Void?>().also { it.completeExceptionally(e) }
61-
}
62-
},
57+
sendBatch = { params, requestOptions -> sendAutoBatch(params, requestOptions, limits) },
6358
batchSizeLimit = limits.batchSizeLimit,
6459
batchSizeLimitBytes = limits.batchSizeLimitBytes,
6560
)
6661
}
6762

6863
override fun withRawResponse(): RunService.WithRawResponse = withRawResponse
6964

65+
private fun sendAutoBatch(
66+
params: RunIngestBatchParams,
67+
requestOptions: RequestOptions,
68+
limits: AutoBatchIngestLimits,
69+
): CompletableFuture<Void?> =
70+
try {
71+
if (limits.useMultipartEndpoint && !multipartDisabled.get()) {
72+
val sentMultipart =
73+
try {
74+
// If the multipart endpoint is unavailable on this server, fall back to
75+
// legacy JSON batch ingest and keep using it for future auto-batches.
76+
withRawResponse.ingestMultipartBatch(params, requestOptions)
77+
} catch (e: NotFoundException) {
78+
multipartDisabled.set(true)
79+
withRawResponse().ingestBatch(params, requestOptions).parse()
80+
true
81+
}
82+
if (!sentMultipart) {
83+
withRawResponse().ingestBatch(params, requestOptions).parse()
84+
}
85+
} else {
86+
withRawResponse().ingestBatch(params, requestOptions).parse()
87+
}
88+
CompletableFuture.completedFuture(null)
89+
} catch (e: Exception) {
90+
CompletableFuture<Void?>().also { it.completeExceptionally(e) }
91+
}
92+
7093
private fun fetchAutoBatchIngestLimits() =
7194
try {
7295
InfoServiceImpl(clientOptions)
@@ -291,6 +314,30 @@ class RunServiceImpl internal constructor(private val clientOptions: ClientOptio
291314
}
292315
}
293316

317+
internal fun ingestMultipartBatch(
318+
params: RunIngestBatchParams,
319+
requestOptions: RequestOptions,
320+
): Boolean {
321+
val body = params.toRunMultipartFormData(clientOptions.jsonMapper)
322+
if (body == null) {
323+
// Some queued runs do not have the fields required by multipart ingest; fall
324+
// back to legacy JSON batch ingest for this batch only.
325+
return false
326+
}
327+
val request =
328+
HttpRequest.builder()
329+
.method(HttpMethod.POST)
330+
.baseUrl(clientOptions.baseUrl())
331+
.addPathSegments("runs", "multipart")
332+
.body(body)
333+
.build()
334+
.prepare(clientOptions, params)
335+
val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions))
336+
val response = clientOptions.httpClient.execute(request, requestOptions)
337+
errorHandler.handle(response).use {}
338+
return true
339+
}
340+
294341
private val queryHandler: Handler<RunQueryResponse> =
295342
jsonHandler<RunQueryResponse>(clientOptions.jsonMapper)
296343

0 commit comments

Comments
 (0)