Skip to content

Commit 230959e

Browse files
authored
feat: Add autobatch tracing mode (#134)
* Add autobatch mode * Fix * Update signature * Add async impl * Fix optin * Fix * Fix * Fix tests * Add test * Refactor to be more idiomatic * Feedback * Use explicit lock, add to AGENTS.md * Remove test script
1 parent c3ab60b commit 230959e

18 files changed

Lines changed: 946 additions & 377 deletions

File tree

AGENTS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ Tests skip gracefully via `assumeTrue` if keys are missing.
229229

230230
## Code style
231231

232+
- For cross-method concurrency coordination, prefer an explicit named `ReentrantLock` with `lock()` / `try` / `finally { unlock() }` over `synchronized` when review clarity matters. Keep the locked section minimal and do slow/blocking work outside the lock.
232233
- `toString()` should be single-line, following the `ClassName{field=value, field=value}` convention used by the rest of the SDK.
233234
- Avoid `@Suppress("UNCHECKED_CAST")` — restructure code to use safe patterns (`as? String`, `is Map<*, *>` with `entries.associate`, etc). When unavoidable (e.g. generic type erasure after an `is` check), add a comment explaining why the cast is safe.
234235
- Use named arguments for constructor/function calls with 2+ parameters, especially when types could be confused:

langsmith-java-client-okhttp/src/main/kotlin/com/langchain/smith/client/okhttp/LangsmithOkHttpClient.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,16 @@ class LangsmithOkHttpClient private constructor() {
271271
*/
272272
fun maxRetries(maxRetries: Int) = apply { clientOptions.maxRetries(maxRetries) }
273273

274+
/**
 * Whether run create/update calls should be automatically batched for tracing.
 *
 * Defaults to true. Set to false to send run create/update calls synchronously through the
 * single-run endpoints.
 *
 * Forwards the flag to the underlying client options; returns this builder for chaining.
 */
fun autoBatchTracing(autoBatchTracing: Boolean) = apply {
    clientOptions.autoBatchTracing(autoBatchTracing)
}
283+
274284
fun apiKey(apiKey: String?) = apply { clientOptions.apiKey(apiKey) }
275285

276286
/** Alias for calling [Builder.apiKey] with `apiKey.orElse(null)`. */

langsmith-java-client-okhttp/src/main/kotlin/com/langchain/smith/client/okhttp/LangsmithOkHttpClientAsync.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,16 @@ class LangsmithOkHttpClientAsync private constructor() {
271271
*/
272272
fun maxRetries(maxRetries: Int) = apply { clientOptions.maxRetries(maxRetries) }
273273

274+
/**
 * Whether run create/update calls should be automatically batched for tracing.
 *
 * Defaults to true. Set to false to send run create/update calls through the single-run
 * endpoints.
 *
 * Forwards the flag to the underlying client options; returns this builder for chaining.
 */
fun autoBatchTracing(autoBatchTracing: Boolean) = apply {
    clientOptions.autoBatchTracing(autoBatchTracing)
}
283+
274284
fun apiKey(apiKey: String?) = apply { clientOptions.apiKey(apiKey) }
275285

276286
/** Alias for calling [Builder.apiKey] with `apiKey.orElse(null)`. */
Lines changed: 356 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,356 @@
1+
package com.langchain.smith.client
2+
3+
import com.langchain.smith.core.RequestOptions
4+
import com.langchain.smith.core.Timeout
5+
import com.langchain.smith.core.http.Headers
6+
import com.langchain.smith.core.http.QueryParams
7+
import com.langchain.smith.models.runs.Run
8+
import com.langchain.smith.models.runs.RunIngestBatchParams
9+
import java.util.concurrent.CompletionException
10+
import java.util.concurrent.CompletionStage
11+
import java.util.concurrent.ConcurrentLinkedQueue
12+
import java.util.concurrent.ExecutionException
13+
import java.util.concurrent.ExecutorService
14+
import java.util.concurrent.Executors
15+
import java.util.concurrent.Phaser
16+
import java.util.concurrent.RejectedExecutionException
17+
import java.util.concurrent.ScheduledExecutorService
18+
import java.util.concurrent.TimeUnit
19+
import java.util.concurrent.atomic.AtomicBoolean
20+
import java.util.concurrent.atomic.AtomicInteger
21+
import java.util.concurrent.locks.ReentrantLock
22+
import org.slf4j.LoggerFactory
23+
24+
/**
 * Batches run create/update operations and sends them to LangSmith in a single `ingestBatch`
 * request, reducing HTTP overhead.
 *
 * Operations are buffered and flushed either:
 * - When the buffer reaches [batchSizeLimit] operations
 * - After [aggregationDelayMs] milliseconds of inactivity (timer-based drain)
 * - When [flush] is called explicitly
 *
 * Thread-safety: [post], [patch], [flush], and [shutdown] may be called from any thread. All
 * draining is serialized on a single coordinator thread; batch sends run on a bounded pool of
 * [sendParallelism] threads. Send failures are logged and dropped, never rethrown to callers.
 *
 * @param sendBatch sends a batch and completes when the send has finished
 * @param batchSizeLimit max operations before auto-flush (default 100)
 * @param aggregationDelayMs delay before timer-based flush (default 250ms)
 * @param sendParallelism max number of batch requests to send concurrently (default 4)
 */
class AutoBatchQueue(
    private val sendBatch: (RunIngestBatchParams, RequestOptions) -> CompletionStage<Void?>,
    private val batchSizeLimit: Int = DEFAULT_BATCH_SIZE_LIMIT,
    private val aggregationDelayMs: Long = DEFAULT_AGGREGATION_DELAY_MS,
    private val sendParallelism: Int = DEFAULT_SEND_PARALLELISM,
) {
    // FIFO buffer of pending operations; only drained on the coordinator thread.
    private val items = ConcurrentLinkedQueue<BatchItem>()
    // Tracked separately because ConcurrentLinkedQueue.size() is O(n).
    private val queuedCount = AtomicInteger(0)
    private val shutdown = AtomicBoolean(false)
    // Ensures at most one pending timer-based flush is scheduled at a time.
    private val delayedFlushScheduled = AtomicBoolean(false)
    // Serializes enqueue's shutdown-check-and-add against shutdown's flag flip (see shutdown()).
    private val enqueueShutdownLock = ReentrantLock()
    // Counts in-flight batch sends. onAdvance returns false so the phaser never terminates when
    // registered parties drop to zero, allowing it to be reused across flush cycles.
    private val activeSends =
        object : Phaser(0) {
            override fun onAdvance(phase: Int, registeredParties: Int): Boolean = false
        }

    // Single daemon thread that owns all drain work and the delayed-flush timer.
    private val coordinator: ScheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor { r ->
            Thread(r, "langsmith-batch-coordinator").apply { isDaemon = true }
        }

    // Daemon pool that performs the (potentially slow) batch HTTP sends.
    private val sendExecutor: ExecutorService =
        Executors.newFixedThreadPool(sendParallelism) { r ->
            Thread(r, "langsmith-batch-sender").apply { isDaemon = true }
        }

    /** Enqueues a run create operation. */
    fun post(
        run: Run,
        headers: Headers = Headers.builder().build(),
        queryParams: QueryParams = QueryParams.builder().build(),
        requestOptions: RequestOptions = RequestOptions.none(),
    ) {
        enqueue(BatchOp.Post, run, headers, queryParams, requestOptions)
    }

    /** Enqueues a run update (patch) operation. */
    fun patch(
        run: Run,
        headers: Headers = Headers.builder().build(),
        queryParams: QueryParams = QueryParams.builder().build(),
        requestOptions: RequestOptions = RequestOptions.none(),
    ) {
        enqueue(BatchOp.Patch, run, headers, queryParams, requestOptions)
    }

    /**
     * Flushes all queued operations immediately, blocking until batch requests that were queued or
     * already in-flight have completed.
     *
     * Safe to call from any thread. No-op if the queue is empty.
     */
    fun flush() {
        // Loop until both the buffer is empty and no sends are in flight, because new items may
        // be enqueued (or new sends started) while we wait. Returns early if interrupted.
        while (true) {
            // Drain any queued items on the coordinator thread; bail if interrupted.
            if (queuedCount.get() > 0 && !drainOnCoordinator()) {
                return
            }

            // Wait for in-flight sends to finish; bail if interrupted.
            if (!waitForActiveSends()) {
                return
            }

            // Only done once both checks pass with nothing left; otherwise re-loop.
            if (queuedCount.get() == 0 && !hasActiveSends()) {
                return
            }
        }
    }

    /**
     * Flushes remaining operations and shuts down the background executors.
     *
     * After calling this, the queue will no longer accept new operations.
     */
    fun shutdown() {
        enqueueShutdownLock.lock()
        try {
            // Serialize with enqueue's check-and-add so flush cannot miss an item that observed
            // shutdown=false but has not yet been queued.
            if (!shutdown.compareAndSet(false, true)) return
        } finally {
            enqueueShutdownLock.unlock()
        }

        // Flush before stopping executors so already-accepted items still get sent.
        flush()
        coordinator.shutdown()
        sendExecutor.shutdown()

        try {
            if (!coordinator.awaitTermination(5, TimeUnit.SECONDS)) {
                coordinator.shutdownNow()
            }
            if (!sendExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                sendExecutor.shutdownNow()
            }
        } catch (_: InterruptedException) {
            // Interrupted while waiting: force-stop both pools and restore the interrupt flag.
            coordinator.shutdownNow()
            sendExecutor.shutdownNow()
            Thread.currentThread().interrupt()
        }
    }

    /** Returns the number of queued operations (for testing). */
    internal fun size(): Int = queuedCount.get()

    /**
     * Adds one operation to the buffer and triggers/schedules a flush based on the new count.
     *
     * @throws IllegalStateException if the queue has been shut down
     */
    private fun enqueue(
        op: BatchOp,
        run: Run,
        headers: Headers,
        queryParams: QueryParams,
        requestOptions: RequestOptions,
    ) {
        // Note: this `run { }` is the stdlib scope function — the `run` parameter (a Run model)
        // is not callable, so it does not shadow the call. The lambda yields the post-add count.
        val count = run {
            enqueueShutdownLock.lock()
            try {
                check(!shutdown.get()) { "AutoBatchQueue is shut down" }
                items.add(
                    BatchItem(
                        op = op,
                        run = run,
                        headers = headers,
                        queryParams = queryParams,
                        requestOptions = requestOptions,
                    )
                )
                queuedCount.incrementAndGet()
            } finally {
                enqueueShutdownLock.unlock()
            }
        }

        // Flush decisions happen outside the lock to keep the locked section minimal.
        afterEnqueue(count)
    }

    private fun afterEnqueue(count: Int) {
        if (count >= batchSizeLimit) {
            // Buffer is full: drain as soon as the coordinator is free.
            triggerFlush()
        } else {
            // Otherwise (re)arm the inactivity timer.
            scheduleFlush()
        }
    }

    /** Arms a one-shot delayed drain, unless one is already pending. */
    private fun scheduleFlush() {
        // CAS guarantees at most one timer is outstanding at any moment.
        if (!delayedFlushScheduled.compareAndSet(false, true)) return

        try {
            coordinator.schedule(
                {
                    // Clear the flag before draining so a concurrent enqueue can arm a new timer
                    // for items added after this drain starts.
                    delayedFlushScheduled.set(false)
                    drainAndSubmitSends()
                },
                aggregationDelayMs,
                TimeUnit.MILLISECONDS,
            )
        } catch (e: RejectedExecutionException) {
            // Coordinator is shutting down; undo the flag so state stays consistent.
            delayedFlushScheduled.set(false)
            logger.warn("Batch queue coordinator rejected delayed flush", e)
        }
    }

    /** Asks the coordinator to drain immediately (fire-and-forget). */
    private fun triggerFlush() {
        try {
            coordinator.execute { drainAndSubmitSends() }
        } catch (e: RejectedExecutionException) {
            logger.warn("Batch queue coordinator rejected flush", e)
        }
    }

    /**
     * Runs one drain pass on the coordinator thread and blocks until it completes.
     *
     * Returns false if interrupted (with the interrupt flag restored); true otherwise.
     */
    private fun drainOnCoordinator(): Boolean {
        val drainFuture =
            try {
                coordinator.submit { drainAndSubmitSends() }
            } catch (e: RejectedExecutionException) {
                throw IllegalStateException("Batch queue coordinator rejected flush", e)
            }

        try {
            drainFuture.get()
            return true
        } catch (_: InterruptedException) {
            Thread.currentThread().interrupt()
            return false
        } catch (e: ExecutionException) {
            // Unwrap so callers see the drain's own failure, not the future wrapper.
            throw RuntimeException("Failed to flush batch queue", e.cause)
        }
    }

    /**
     * Drains up to one batch worth of items, submits the resulting batches, and re-arms follow-up
     * drains if items remain. Always runs on the coordinator thread.
     */
    private fun drainAndSubmitSends() {
        val batches = drainUpTo(batchSizeLimit)
        if (batches.isEmpty()) return

        batches.forEach(::submitBatch)

        when {
            // Another full batch is already waiting: drain again without delay.
            queuedCount.get() >= batchSizeLimit -> triggerFlush()
            // Partial leftovers during normal operation: wait for more to aggregate.
            queuedCount.get() > 0 && !shutdown.get() -> scheduleFlush()
            // Partial leftovers during shutdown: no point waiting, drain now.
            queuedCount.get() > 0 && shutdown.get() -> triggerFlush()
        }
    }

    /**
     * Drains up to [maxItems] queued operations and returns batch params grouped by request
     * options.
     *
     * TODO: Merge create + update for the same run ID before sending (like the JS/Python SDKs).
     * This would reduce the number of operations in each batch when a run is created and
     * immediately updated (common for short-lived runs).
     * TODO: Also flush/split batches based on serialized payload size, not just operation count.
     * TODO: Support multipart ingest endpoint for large payloads with attachments.
     * TODO: Support gzip compression for batch requests.
     */
    private fun drainUpTo(maxItems: Int): List<Batch> {
        // LinkedHashMap keeps groups in first-seen order, preserving rough FIFO across batches.
        val groups = linkedMapOf<RequestOptionsKey, BatchGroup>()
        var drained = 0

        while (drained < maxItems) {
            val item = items.poll() ?: break
            queuedCount.decrementAndGet()
            drained++

            // Items sharing (responseValidation, timeout) are sent in one batch request; their
            // headers and query params are merged into the group's builders.
            val group =
                groups.getOrPut(item.requestOptions.key()) { BatchGroup(item.requestOptions) }
            when (item.op) {
                BatchOp.Post -> group.posts.add(item.run)
                BatchOp.Patch -> group.patches.add(item.run)
            }
            group.headers.putAll(item.headers)
            group.queryParams.putAll(item.queryParams)
        }

        return groups.values.map { it.toBatch() }
    }

    /**
     * Hands one batch to the send pool, tracking it in [activeSends] for the duration of the send.
     * Failures are logged and swallowed so one bad batch never breaks the queue.
     */
    private fun submitBatch(batch: Batch) {
        // Register BEFORE submitting so flush() can never observe "no active sends" while a
        // just-submitted batch has not started yet.
        activeSends.register()
        try {
            sendExecutor.execute {
                try {
                    sendBatch(batch.params, batch.requestOptions).toCompletableFuture().join()
                } catch (e: CompletionException) {
                    // join() wraps async failures; log the underlying cause when present.
                    logger.warn("Failed to send batch of runs", e.cause ?: e)
                } catch (e: Exception) {
                    logger.warn("Failed to send batch of runs", e)
                } finally {
                    activeSends.arriveAndDeregister()
                }
            }
        } catch (e: RejectedExecutionException) {
            // The task never ran, so balance the register() here instead.
            activeSends.arriveAndDeregister()
            logger.warn(
                "Batch queue sender rejected a batch; dropping {} run operations",
                operationCount(batch.params),
                e,
            )
        }
    }

    /**
     * Blocks until no sends are registered on [activeSends].
     *
     * Returns false if interrupted (with the interrupt flag restored); true otherwise.
     */
    private fun waitForActiveSends(): Boolean {
        while (hasActiveSends()) {
            // Snapshot the phase, then wait for it to advance (i.e. current parties to arrive).
            // If the phase already advanced between the two calls, awaitAdvance returns
            // immediately and the loop re-checks.
            val phase = activeSends.phase
            try {
                activeSends.awaitAdvanceInterruptibly(phase)
            } catch (_: InterruptedException) {
                Thread.currentThread().interrupt()
                return false
            }
        }
        return true
    }

    private fun hasActiveSends(): Boolean = activeSends.registeredParties > 0

    // Total number of run operations (creates + updates) carried by a batch, for logging.
    private fun operationCount(params: RunIngestBatchParams): Int =
        params.post().orElse(emptyList()).size + params.patch().orElse(emptyList()).size

    private enum class BatchOp {
        Post,
        Patch,
    }

    private data class Batch(val params: RunIngestBatchParams, val requestOptions: RequestOptions)

    // Mutable accumulator for one request-options group while draining; see drainUpTo.
    private data class BatchGroup(
        val requestOptions: RequestOptions,
        val posts: MutableList<Run> = mutableListOf(),
        val patches: MutableList<Run> = mutableListOf(),
        val headers: Headers.Builder = Headers.builder(),
        val queryParams: QueryParams.Builder = QueryParams.builder(),
    ) {
        fun toBatch(): Batch {
            val builder = RunIngestBatchParams.builder()
            if (posts.isNotEmpty()) builder.post(posts)
            if (patches.isNotEmpty()) builder.patch(patches)
            builder.additionalHeaders(headers.build())
            builder.additionalQueryParams(queryParams.build())
            return Batch(params = builder.build(), requestOptions = requestOptions)
        }
    }

    private data class BatchItem(
        val op: BatchOp,
        val run: Run,
        val headers: Headers,
        val queryParams: QueryParams,
        val requestOptions: RequestOptions,
    )

    // Grouping key: only these two RequestOptions fields decide batch compatibility.
    private data class RequestOptionsKey(val responseValidation: Boolean?, val timeout: Timeout?)

    private fun RequestOptions.key(): RequestOptionsKey =
        RequestOptionsKey(responseValidation = responseValidation, timeout = timeout)

    companion object {
        private val logger = LoggerFactory.getLogger(AutoBatchQueue::class.java)

        const val DEFAULT_BATCH_SIZE_LIMIT = 100
        const val DEFAULT_AGGREGATION_DELAY_MS = 250L
        const val DEFAULT_SEND_PARALLELISM = 4
    }
}

0 commit comments

Comments
 (0)