Skip to content

Commit d65e301

Browse files
authored
feat: Merge enqueued posts and patches to optimize batching (#135)
* Merge enqueued posts and patches to optimize batching * Exclude patched inputs by default * Logs * Fix import order * Feedback
1 parent 7cab9f9 commit d65e301

6 files changed

Lines changed: 179 additions & 11 deletions

File tree

AGENTS.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ fun format(variables: Map<String, Any>): PromptMessages {
4444

4545
## Kotlin idioms
4646

47+
### Prefer immutable collection transformations
48+
49+
Avoid mutable accumulators when `map`, `filter`, `partition`, `associate`, `buildMap`, or `buildList` express the same logic clearly. Use mutation only when it materially improves readability, performance, or is required by an API.
50+
4751
### Use `buildMap` / `buildList` instead of mutable + convert
4852

4953
```kotlin

langsmith-java-core/src/main/kotlin/com/langchain/smith/client/AutoBatchQueue.kt

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package com.langchain.smith.client
22

3+
import com.fasterxml.jackson.databind.node.ObjectNode
34
import com.langchain.smith.core.RequestOptions
45
import com.langchain.smith.core.Timeout
56
import com.langchain.smith.core.http.Headers
67
import com.langchain.smith.core.http.QueryParams
8+
import com.langchain.smith.core.jsonMapper
79
import com.langchain.smith.models.runs.Run
810
import com.langchain.smith.models.runs.RunIngestBatchParams
911
import java.util.concurrent.CompletionException
@@ -19,6 +21,7 @@ import java.util.concurrent.TimeUnit
1921
import java.util.concurrent.atomic.AtomicBoolean
2022
import java.util.concurrent.atomic.AtomicInteger
2123
import java.util.concurrent.locks.ReentrantLock
24+
import kotlin.jvm.optionals.getOrNull
2225
import org.slf4j.LoggerFactory
2326

2427
/**
@@ -238,9 +241,6 @@ class AutoBatchQueue(
238241
* Drains up to [maxItems] queued operations and returns batch params grouped by request
239242
* options.
240243
*
241-
* TODO: Merge create + update for the same run ID before sending (like the JS/Python SDKs).
242-
* This would reduce the number of operations in each batch when a run is created and
243-
* immediately updated (common for short-lived runs).
244244
* TODO: Also flush/split batches based on serialized payload size, not just operation count.
245245
* TODO: Support multipart ingest endpoint for large payloads with attachments.
246246
* TODO: Support gzip compression for batch requests.
@@ -324,15 +324,58 @@ class AutoBatchQueue(
324324
val queryParams: QueryParams.Builder = QueryParams.builder(),
325325
) {
326326
fun toBatch(): Batch {
327+
val mergeResult = mergePostsAndPatches()
327328
val builder = RunIngestBatchParams.builder()
328-
if (posts.isNotEmpty()) builder.post(posts)
329-
if (patches.isNotEmpty()) builder.patch(patches)
329+
if (mergeResult.posts.isNotEmpty()) builder.post(mergeResult.posts)
330+
if (mergeResult.patches.isNotEmpty()) builder.patch(mergeResult.patches)
330331
builder.additionalHeaders(headers.build())
331332
builder.additionalQueryParams(queryParams.build())
332333
return Batch(params = builder.build(), requestOptions = requestOptions)
333334
}
335+
336+
private fun mergePostsAndPatches(): MergeResult {
337+
if (posts.isEmpty() || patches.isEmpty()) {
338+
return MergeResult(posts = posts, patches = patches)
339+
}
340+
341+
val postsById =
342+
posts.mapNotNull { post -> post.id().getOrNull()?.let { it to post } }.toMap()
343+
val postsWithoutId = posts.filter { it.id().getOrNull() == null }
344+
val patchesByPostId =
345+
patches.mapNotNull { patch ->
346+
patch.id().getOrNull()?.takeIf(postsById::containsKey)?.let { it to patch }
347+
}
348+
val patchesByPostIdMap = patchesByPostId.toMap()
349+
val standalonePatches =
350+
patches.filter { patch ->
351+
patch.id().getOrNull()?.let(postsById::containsKey) != true
352+
}
353+
354+
return MergeResult(
355+
posts =
356+
postsWithoutId +
357+
postsById.map { (id, post) ->
358+
patchesByPostIdMap[id]?.let { mergePostAndPatch(post, it) } ?: post
359+
},
360+
patches = standalonePatches,
361+
mergedRunIds = patchesByPostId.map { it.first },
362+
)
363+
}
364+
365+
private fun mergePostAndPatch(post: Run, patch: Run): Run {
366+
val merged = objectMapper.valueToTree<ObjectNode>(post)
367+
val patchFields = objectMapper.valueToTree<ObjectNode>(patch)
368+
patchFields.fields().forEach { (field, value) -> merged.set<ObjectNode>(field, value) }
369+
return objectMapper.treeToValue(merged, Run::class.java)
370+
}
334371
}
335372

373+
private data class MergeResult(
374+
val posts: List<Run>,
375+
val patches: List<Run>,
376+
val mergedRunIds: List<String> = emptyList(),
377+
)
378+
336379
private data class BatchItem(
337380
val op: BatchOp,
338381
val run: Run,
@@ -348,6 +391,7 @@ class AutoBatchQueue(
348391

349392
companion object {
350393
private val logger = LoggerFactory.getLogger(AutoBatchQueue::class.java)
394+
private val objectMapper = jsonMapper()
351395

352396
const val DEFAULT_BATCH_SIZE_LIMIT = 100
353397
const val DEFAULT_AGGREGATION_DELAY_MS = 250L

langsmith-java-core/src/main/kotlin/com/langchain/smith/tracing/RunTree.kt

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,19 @@ class RunTree(
132132
submitSafely(e, name) { c.runs().create(runData) }
133133
}
134134

135+
/**
136+
* Patches this run in LangSmith in the background, batched with other pending operations.
137+
*
138+
* Inputs are omitted by default so the patch does not duplicate or overwrite the create payload
139+
* if the create and patch are merged during batching.
140+
*/
141+
fun patchRun() = patchRun(excludeInputs = true)
142+
135143
/** Patches this run in LangSmith in the background, batched with other pending operations. */
136-
fun patchRun() {
144+
fun patchRun(excludeInputs: Boolean) {
137145
val c = client ?: return
138146
val e = executor ?: DEFAULT_EXECUTOR
139-
val runData = buildRunData()
147+
val runData = buildRunData(includeInputs = !excludeInputs)
140148
submitSafely(e, name) {
141149
c.runs()
142150
.update(
@@ -149,7 +157,9 @@ class RunTree(
149157
}
150158

151159
/** Builds the [Run] data object for posting/patching to the LangSmith API. */
152-
fun buildRunData(): Run {
160+
fun buildRunData(): Run = buildRunData(includeInputs = true)
161+
162+
private fun buildRunData(includeInputs: Boolean): Run {
153163
val builder =
154164
Run.builder()
155165
.id(id)
@@ -160,9 +170,6 @@ class RunTree(
160170
.startTime(startTime)
161171
.apply { this@RunTree.projectName?.let { sessionName(it) } }
162172
.tags(tags)
163-
.inputs(
164-
Run.Inputs.builder().putAllAdditionalProperties(toJsonValueMap(inputs)).build()
165-
)
166173
.extra(
167174
Run.Extra.builder()
168175
.apply {
@@ -177,6 +184,11 @@ class RunTree(
177184
.build()
178185
)
179186

187+
if (includeInputs) {
188+
builder.inputs(
189+
Run.Inputs.builder().putAllAdditionalProperties(toJsonValueMap(inputs)).build()
190+
)
191+
}
180192
endTime?.let { builder.endTime(it) }
181193
error?.let { builder.error(it) }
182194
parentRunId?.let { builder.parentRunId(it) }

langsmith-java-core/src/test/kotlin/com/langchain/smith/client/AutoBatchQueueTest.kt

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.langchain.smith.client
22

3+
import com.langchain.smith.core.JsonValue
34
import com.langchain.smith.core.RequestOptions
45
import com.langchain.smith.core.Timeout
56
import com.langchain.smith.core.http.Headers
@@ -84,6 +85,45 @@ internal class AutoBatchQueueTest {
8485
assertThat(capture.patchedRuns).hasSize(1)
8586
}
8687

88+
@Test
89+
fun `merges post and patch for same run id`() {
90+
val capture = CapturingLangsmithClient()
91+
val queue = testQueue(capture)
92+
val post =
93+
testRun("r1")
94+
.toBuilder()
95+
.inputs(
96+
Run.Inputs.builder()
97+
.putAdditionalProperty("question", JsonValue.from("hello"))
98+
.build()
99+
)
100+
.build()
101+
val patch =
102+
Run.builder()
103+
.id("r1")
104+
.outputs(
105+
Run.Outputs.builder()
106+
.putAdditionalProperty("answer", JsonValue.from("world"))
107+
.build()
108+
)
109+
.endTime("2024-01-01T00:00:01Z")
110+
.build()
111+
112+
queue.post(post)
113+
queue.patch(patch)
114+
queue.flush()
115+
116+
assertThat(capture.postedRuns).hasSize(1)
117+
assertThat(capture.patchedRuns).isEmpty()
118+
val mergedRun = capture.postedRuns.single()
119+
assertThat(mergedRun.id().getOrNull()).isEqualTo("r1")
120+
assertThat(mergedRun.inputs().get()._additionalProperties()["question"])
121+
.isEqualTo(JsonValue.from("hello"))
122+
assertThat(mergedRun.outputs().get()._additionalProperties()["answer"])
123+
.isEqualTo(JsonValue.from("world"))
124+
assertThat(mergedRun.endTime().getOrNull()).isEqualTo("2024-01-01T00:00:01Z")
125+
}
126+
87127
@Test
88128
fun `flush is no-op when queue is empty`() {
89129
val capture = CapturingLangsmithClient()
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.langchain.smith.tracing
2+
3+
import com.langchain.smith.testutils.CapturingLangsmithClient
4+
import java.util.concurrent.Executors
5+
import java.util.concurrent.TimeUnit
6+
import kotlin.jvm.optionals.getOrNull
7+
import org.assertj.core.api.Assertions.assertThat
8+
import org.junit.jupiter.api.Test
9+
10+
internal class RunTreeTest {
11+
12+
@Test
13+
fun `patchRun omits inputs by default`() {
14+
val capture = CapturingLangsmithClient()
15+
val executor = Executors.newSingleThreadExecutor()
16+
val run =
17+
RunTree.builder()
18+
.name("test")
19+
.inputs(mapOf("question" to "hello"))
20+
.outputs(mapOf("answer" to "world"))
21+
.client(capture.client)
22+
.executor(executor)
23+
.build()
24+
25+
run.patchRun()
26+
executor.shutdown()
27+
assertThat(executor.awaitTermination(5, TimeUnit.SECONDS)).isTrue()
28+
29+
assertThat(capture.patchedRuns).hasSize(1)
30+
assertThat(capture.patchedRuns.single().inputs().isPresent).isFalse()
31+
}
32+
33+
@Test
34+
fun `patchRun can include inputs when requested`() {
35+
val capture = CapturingLangsmithClient()
36+
val executor = Executors.newSingleThreadExecutor()
37+
val run =
38+
RunTree.builder()
39+
.name("test")
40+
.inputs(mapOf("question" to "hello"))
41+
.client(capture.client)
42+
.executor(executor)
43+
.build()
44+
45+
run.patchRun(excludeInputs = false)
46+
executor.shutdown()
47+
assertThat(executor.awaitTermination(5, TimeUnit.SECONDS)).isTrue()
48+
49+
assertThat(capture.patchedRuns).hasSize(1)
50+
assertThat(
51+
capture.patchedRuns
52+
.single()
53+
.inputs()
54+
.get()
55+
._additionalProperties()["question"]
56+
?.asString()
57+
?.getOrNull()
58+
)
59+
.isEqualTo("hello")
60+
}
61+
}

langsmith-java-example/build.gradle.kts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ dependencies {
2727
implementation("org.springframework.boot:spring-boot-starter-web")
2828
implementation("org.springframework.boot:spring-boot-starter")
2929

30+
// Simple logging for examples. This lets smoke tests show SDK debug/trace logs.
31+
runtimeOnly("org.slf4j:slf4j-simple:2.0.17")
32+
3033
// Security: constrain vulnerable transitive dependencies from Spring Boot 2.7.18.
3134
// Spring Boot 2.7.x is EOL; these constraints override the managed versions in-place.
3235
// None of these affect published artifacts (this is a non-published example module).
@@ -94,6 +97,10 @@ application {
9497
// Export stdin to examples for readln(); require -Pexample= when running (configuration-cache safe: no project access in doFirst)
9598
tasks.named<JavaExec>("run") {
9699
standardInput = System.`in`
100+
systemProperty("org.slf4j.simpleLogger.defaultLogLevel", "info")
101+
systemProperty("org.slf4j.simpleLogger.log.com.langchain.smith.client.AutoBatchQueue", "trace")
102+
systemProperty("org.slf4j.simpleLogger.showDateTime", "false")
103+
systemProperty("org.slf4j.simpleLogger.showShortLogName", "true")
97104
doFirst {
98105
if (mainClass.get() == "Main") {
99106
throw GradleException(

0 commit comments

Comments
 (0)