Skip to content

Commit ee2991f

Browse files
authored
feat: Adds processInputs and processOutputs to traceable (#113)
* Adds processInputs and processOutputs to traceable * Move generics into TraceProcessIO to avoid having them top level * Fix docstring, nit
1 parent 66b5f27 commit ee2991f

7 files changed

Lines changed: 597 additions & 98 deletions

File tree

langsmith-java-core/src/main/kotlin/com/langchain/smith/tracing/RunTree.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@ private val logger = LoggerFactory.getLogger(RunTree::class.java)
1515
/**
1616
* Represents a run in the trace tree.
1717
*
18+
* **Thread safety:** `RunTree` is mutable — [inputs], [outputs], [error], [endTime], [metadata],
19+
* [tags], and [extra] can be modified after construction. A single `RunTree` instance should only
20+
* be accessed from one thread at a time. Within a [traceable] wrapper this is guaranteed by the
21+
* run-context scoping, but if you hold a reference to a `RunTree` and access it from multiple
22+
* threads, external synchronization is required.
23+
*
1824
* Can be created manually, via [traceable], or via [createChild]:
1925
* ```kotlin
2026
* // Via traceable (most common)
@@ -36,8 +42,7 @@ private val logger = LoggerFactory.getLogger(RunTree::class.java)
3642
* .build();
3743
* ```
3844
*/
39-
class RunTree
40-
constructor(
45+
class RunTree(
4146
/** The display name of this run. */
4247
val name: String = "<lambda>",
4348
/** The type of run (chain, llm, tool, retriever). */
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
@file:JvmName("TraceProcessIOHelper")
2+
3+
package com.langchain.smith.tracing
4+
5+
import java.util.function.Function
6+
7+
/**
8+
* Typed processors for transforming inputs and outputs before they are recorded on a traced run.
9+
*
10+
* The type parameters describe what the processors receive:
11+
* - [PI] — the type passed to [inputs]. For 1-arg traced functions this is the raw input type; for
12+
* 0/2/3-arg functions this is `Map<String, Any?>` (the packed representation).
13+
* - [PO] — the type passed to [outputs]. This is always the raw output type of the traced function.
14+
*
15+
* When [inputs] is set, it replaces the default input serialization entirely. Same for [outputs].
16+
*
17+
* ## Example (Kotlin)
18+
*
19+
* ```kotlin
20+
* val process = TraceProcessIO<String, MyResponse>(
21+
* processInputs = Function { input -> mapOf("query" to input) },
22+
* processOutputs = Function { resp -> mapOf("answer" to resp.text) },
23+
* )
24+
* val config = TraceConfig(name = "my-run", client = client, processTracedIO = process)
25+
* ```
26+
*
27+
* ## Example (Java)
28+
*
29+
* ```java
30+
* TraceProcessIO process = TraceProcessIO.<String, MyResponse>builder()
31+
* .processInputs(input -> Map.of("query", input))
32+
* .processOutputs(resp -> Map.of("answer", resp.getText()))
33+
* .build();
34+
* TraceConfig config = TraceConfig.builder()
35+
* .name("my-run")
36+
* .client(client)
37+
* .processTracedIO(process)
38+
* .build();
39+
* ```
40+
*
41+
* @see TraceConfig.processTracedIO
42+
*/
43+
class TraceProcessIO<PI, PO>(
44+
processInputs: Function<PI, Map<String, Any?>>? = null,
45+
processOutputs: Function<PO, Map<String, Any?>>? = null,
46+
) {
47+
// Store erased wrappers so traceable overloads can call these without casts.
48+
// erase() converts Function<T, R> to Function<Any?, R> — safe because T erases to Object.
49+
internal val inputsFn: Function<Any?, Map<String, Any?>>? = processInputs?.erase()
50+
internal val outputsFn: Function<Any?, Map<String, Any?>>? = processOutputs?.erase()
51+
52+
companion object {
53+
/** Creates a new [Builder]. */
54+
@JvmStatic fun <PI, PO> builder() = Builder<PI, PO>()
55+
56+
/**
57+
* Converts a typed [Function] to an erased `Function<Any?, R>`.
58+
*
59+
* Safe because generic type parameters erase to `Object` at runtime — the JVM never
60+
* actually checks the input type on `Function.apply`.
61+
*/
62+
@JvmSynthetic
63+
@JvmStatic
64+
@Suppress("UNCHECKED_CAST")
65+
private fun <T, R> Function<T, R>.erase(): Function<Any?, R> = this as Function<Any?, R>
66+
}
67+
68+
/**
69+
* A builder for [TraceProcessIO].
70+
*
71+
* ```java
72+
* TraceProcessIO process = TraceProcessIO.<String, String>builder()
73+
* .processInputs(input -> Map.of("query", input))
74+
* .processOutputs(output -> Map.of("answer", output))
75+
* .build();
76+
* ```
77+
*/
78+
class Builder<PI, PO> internal constructor() {
79+
private var processInputs: Function<PI, Map<String, Any?>>? = null
80+
private var processOutputs: Function<PO, Map<String, Any?>>? = null
81+
82+
/** Callback to transform the input before it is recorded on the run. */
83+
fun processInputs(processInputs: Function<PI, Map<String, Any?>>) = apply {
84+
this.processInputs = processInputs
85+
}
86+
87+
/** Callback to transform the output before it is recorded on the run. */
88+
fun processOutputs(processOutputs: Function<PO, Map<String, Any?>>) = apply {
89+
this.processOutputs = processOutputs
90+
}
91+
92+
/** Builds the [TraceProcessIO]. */
93+
fun build() = TraceProcessIO(processInputs = processInputs, processOutputs = processOutputs)
94+
}
95+
96+
override fun toString(): String = "TraceProcessIO{inputs=$inputsFn, outputs=$outputsFn}"
97+
}

langsmith-java-core/src/main/kotlin/com/langchain/smith/tracing/Traceable.kt

Lines changed: 73 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,27 @@ class TraceConfig(
233233
val executor: ExecutorService? = null,
234234
/** Whether tracing is active. Inherited by children. */
235235
val tracingEnabled: Boolean? = null,
236+
/**
237+
* Optional typed processors for transforming inputs/outputs before they are recorded on a run.
238+
*
239+
* When set, the processors in [TraceProcessIO] replace the default input/output serialization.
240+
* The generic type parameters on [TraceProcessIO] control what the processor callbacks receive
241+
* — see [TraceProcessIO] for details.
242+
*
243+
* ```java
244+
* TraceProcessIO process = TraceProcessIO.<String, MyResponse>builder()
245+
* .processInputs(input -> Map.of("query", input))
246+
* .processOutputs(resp -> Map.of("answer", resp.getText()))
247+
* .build();
248+
* TraceConfig config = TraceConfig.builder()
249+
* .name("my-run")
250+
* .processTracedIO(process)
251+
* .build();
252+
* ```
253+
*
254+
* @see TraceProcessIO
255+
*/
256+
val processTracedIO: TraceProcessIO<*, *>? = null,
236257
) {
237258
companion object {
238259
/** Creates a new [Builder] for constructing a [TraceConfig]. */
@@ -250,9 +271,6 @@ class TraceConfig(
250271
* .name("my-run")
251272
* .client(LangsmithOkHttpClient.fromEnv())
252273
* .runType(RunType.LLM)
253-
* .metadata(Map.of("version", "1.0"))
254-
* .tags(List.of("prod"))
255-
* .projectName("my-project")
256274
* .build();
257275
* ```
258276
*/
@@ -265,6 +283,7 @@ class TraceConfig(
265283
private var projectName: String? = null
266284
private var executor: ExecutorService? = null
267285
private var tracingEnabled: Boolean? = null
286+
private var processTracedIO: TraceProcessIO<*, *>? = null
268287

269288
@JvmSynthetic
270289
internal fun from(config: TraceConfig) = apply {
@@ -276,6 +295,7 @@ class TraceConfig(
276295
projectName = config.projectName
277296
executor = config.executor
278297
tracingEnabled = config.tracingEnabled
298+
processTracedIO = config.processTracedIO
279299
}
280300

281301
/** The name of the run, displayed in LangSmith. */
@@ -302,6 +322,16 @@ class TraceConfig(
302322
/** Whether tracing is active. Inherited by children if not set. */
303323
fun tracingEnabled(tracingEnabled: Boolean) = apply { this.tracingEnabled = tracingEnabled }
304324

325+
/**
326+
* Typed processors for transforming inputs/outputs before they are recorded.
327+
*
328+
* @see TraceConfig.processTracedIO
329+
* @see TraceProcessIO
330+
*/
331+
fun processTracedIO(processTracedIO: TraceProcessIO<*, *>) = apply {
332+
this.processTracedIO = processTracedIO
333+
}
334+
305335
/** Builds the [TraceConfig]. */
306336
fun build() =
307337
TraceConfig(
@@ -313,12 +343,21 @@ class TraceConfig(
313343
projectName = projectName,
314344
executor = executor,
315345
tracingEnabled = tracingEnabled,
346+
processTracedIO = processTracedIO,
316347
)
317348
}
318349
}
319350

320351
private const val DEFAULT_RUN_NAME = "<lambda>"
321352

353+
/** Applies processInputs from the config, or returns null if not set. */
354+
private fun applyProcessInputs(config: TraceConfig, value: Any?): Map<String, Any?>? =
355+
config.processTracedIO?.inputsFn?.apply(value)
356+
357+
/** Applies processOutputs from the config, or returns null if not set. */
358+
private fun applyProcessOutputs(config: TraceConfig, value: Any?): Map<String, Any?>? =
359+
config.processTracedIO?.outputsFn?.apply(value)
360+
322361
/** Resolves the run name from the config and the function being wrapped. */
323362
private fun resolveName(config: TraceConfig, block: Any?): String {
324363
config.name?.let {
@@ -328,6 +367,12 @@ private fun resolveName(config: TraceConfig, block: Any?): String {
328367
return DEFAULT_RUN_NAME
329368
}
330369

370+
/** Creates a copy of [config] with the resolved name set. */
371+
private fun resolveConfig(config: TraceConfig, block: Any?): TraceConfig {
372+
val name = resolveName(config, block)
373+
return if (name == config.name) config else config.toBuilder().name(name).build()
374+
}
375+
331376
/**
332377
* Wraps a no-arg function with LangSmith tracing (Kotlin).
333378
*
@@ -339,14 +384,22 @@ private fun resolveName(config: TraceConfig, block: Any?): String {
339384
*/
340385
@JvmSynthetic
341386
fun <O> traceable(block: () -> O, config: TraceConfig): () -> O {
342-
val resolvedConfig = config.toBuilder().name(resolveName(config, block)).build()
343-
return { executeTraced(resolvedConfig, emptyMap()) { block() } }
387+
val resolvedConfig = resolveConfig(config, block)
388+
return {
389+
val packed = emptyMap<String, Any?>()
390+
val serializedInputs = applyProcessInputs(resolvedConfig, packed) ?: packed
391+
executeTraced(resolvedConfig, serializedInputs) { block() }
392+
}
344393
}
345394

346395
/** Wraps a no-arg function with LangSmith tracing (Java [Supplier]). */
347396
fun <O> traceable(block: Supplier<O>, config: TraceConfig): Supplier<O> {
348-
val resolvedConfig = config.toBuilder().name(resolveName(config, block)).build()
349-
return Supplier { executeTraced(resolvedConfig, emptyMap()) { block.get() } }
397+
val resolvedConfig = resolveConfig(config, block)
398+
return Supplier {
399+
val packed = emptyMap<String, Any?>()
400+
val serializedInputs = applyProcessInputs(resolvedConfig, packed) ?: packed
401+
executeTraced(resolvedConfig, serializedInputs) { block.get() }
402+
}
350403
}
351404

352405
/**
@@ -393,10 +446,10 @@ fun <O> traceable(block: Supplier<O>, config: TraceConfig): Supplier<O> {
393446
*/
394447
@JvmSynthetic
395448
fun <I, O> traceable(block: (I) -> O, config: TraceConfig): (I) -> O {
396-
val resolvedConfig = config.toBuilder().name(resolveName(config, block)).build()
449+
val resolvedConfig = resolveConfig(config, block)
397450
return { input ->
398-
val inputs = toInputMap(input)
399-
executeTraced(resolvedConfig, inputs) { block(input) }
451+
val serializedInputs = applyProcessInputs(resolvedConfig, input) ?: toInputMap(input)
452+
executeTraced(resolvedConfig, serializedInputs) { block(input) }
400453
}
401454
}
402455

@@ -409,10 +462,11 @@ fun <I, O> traceable(block: Function<I, O>, config: TraceConfig): Function<I, O>
409462
/** Wraps a 2-arg function with LangSmith tracing (Kotlin). */
410463
@JvmSynthetic
411464
fun <I1, I2, O> traceable(block: (I1, I2) -> O, config: TraceConfig): (I1, I2) -> O {
412-
val resolvedConfig = config.toBuilder().name(resolveName(config, block)).build()
465+
val resolvedConfig = resolveConfig(config, block)
413466
return { i1, i2 ->
414-
val inputs = mapOf("args" to listOf(i1, i2))
415-
executeTraced(resolvedConfig, inputs) { block(i1, i2) }
467+
val packed = mapOf<String, Any?>("args" to listOf(i1, i2))
468+
val serializedInputs = applyProcessInputs(resolvedConfig, packed) ?: packed
469+
executeTraced(resolvedConfig, serializedInputs) { block(i1, i2) }
416470
}
417471
}
418472

@@ -428,10 +482,11 @@ fun <I1, I2, O> traceable(
428482
/** Wraps a 3-arg function with LangSmith tracing (Kotlin). */
429483
@JvmSynthetic
430484
fun <I1, I2, I3, O> traceable(block: (I1, I2, I3) -> O, config: TraceConfig): (I1, I2, I3) -> O {
431-
val resolvedConfig = config.toBuilder().name(resolveName(config, block)).build()
485+
val resolvedConfig = resolveConfig(config, block)
432486
return { i1, i2, i3 ->
433-
val inputs = mapOf("args" to listOf(i1, i2, i3))
434-
executeTraced(resolvedConfig, inputs) { block(i1, i2, i3) }
487+
val packed = mapOf<String, Any?>("args" to listOf(i1, i2, i3))
488+
val serializedInputs = applyProcessInputs(resolvedConfig, packed) ?: packed
489+
executeTraced(resolvedConfig, serializedInputs) { block(i1, i2, i3) }
435490
}
436491
}
437492

@@ -540,7 +595,7 @@ fun <I1, I2, I3, O> traceTriFunction(
540595
*
541596
* For best tracing results, pass [Map] inputs/outputs to [traceable]. Typed SDK objects (e.g.
542597
* `ChatCompletionCreateParams`) should be converted to maps by the caller — use
543-
* `processInputs`/`processOutputs` callbacks (when available) or manual conversion.
598+
* [TraceConfig.processTracedIO] callbacks or manual conversion.
544599
*/
545600
private fun serializeValue(value: Any?): Any? =
546601
when (value) {
@@ -630,7 +685,7 @@ private fun <T> executeTraced(config: TraceConfig, inputs: Map<String, Any?>?, b
630685
try {
631686
val result = block()
632687
run.endTime = nowIso()
633-
run.outputs = toOutputMap(result)
688+
run.outputs = applyProcessOutputs(config, result) ?: toOutputMap(result)
634689
run.patchRun()
635690
result
636691
} catch (e: Throwable) {

langsmith-java-core/src/test/java/com/langchain/smith/tracing/TraceableJavaTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static org.assertj.core.api.Assertions.assertThat;
44

55
import com.langchain.smith.client.LangsmithClient;
6+
import java.util.Collections;
67
import java.util.function.BiFunction;
78
import java.util.function.Function;
89
import java.util.function.Supplier;
@@ -158,4 +159,50 @@ void tracingDisabled_getCurrentRunReturnsNull() {
158159
Function<String, RunTree> traced = Tracing.traceFunction(input -> Tracing.getCurrentRunTree(), disabled);
159160
assertThat(traced.apply("hello")).isNull();
160161
}
162+
163+
// ---- processInputs / processOutputs ----
164+
165+
@Test
166+
void processInputs_viaProcessTracedIO() {
167+
TraceConfig cfg = TraceConfig.builder()
168+
.name("process-inputs-test")
169+
.client(client)
170+
.tracingEnabled(true)
171+
.processTracedIO(TraceProcessIO.<String, RunTree>builder()
172+
.processInputs(input -> Collections.singletonMap("query", input))
173+
.build())
174+
.build();
175+
Function<String, RunTree> traced = Tracing.traceFunction(input -> Tracing.getCurrentRunTree(), cfg);
176+
RunTree run = traced.apply("hello");
177+
assertThat(run.getInputs()).containsEntry("query", "hello");
178+
}
179+
180+
@Test
181+
void processOutputs_viaProcessTracedIO() {
182+
TraceConfig cfg = TraceConfig.builder()
183+
.name("process-outputs-test")
184+
.client(client)
185+
.tracingEnabled(true)
186+
.processTracedIO(TraceProcessIO.<String, String>builder()
187+
.processOutputs(output -> Collections.singletonMap("answer", output))
188+
.build())
189+
.build();
190+
Function<String, String> traced = Tracing.traceFunction(input -> "result", cfg);
191+
traced.apply("hello");
192+
}
193+
194+
@Test
195+
void processInputsAndOutputs_together() {
196+
TraceConfig cfg = TraceConfig.builder()
197+
.name("both-processors")
198+
.client(client)
199+
.tracingEnabled(true)
200+
.processTracedIO(TraceProcessIO.<String, String>builder()
201+
.processInputs(input -> Collections.singletonMap("q", input))
202+
.processOutputs(output -> Collections.singletonMap("a", output))
203+
.build())
204+
.build();
205+
Function<String, String> traced = Tracing.traceFunction(input -> "answer", cfg);
206+
assertThat(traced.apply("question")).isEqualTo("answer");
207+
}
161208
}

0 commit comments

Comments
 (0)