Skip to content

Commit 0217521

Browse files
authored
feat: Adds support for tracing streams with traceable (#117)
* Adds support for tracing streams with traceable * Polish * Make stream tracing opt-in * Rework to use a passthrough instead of a proxy * Record stream cancellations as errors * Feedback * Format and add to AGENTS.md
1 parent 32b3fee commit 0217521

4 files changed

Lines changed: 336 additions & 6 deletions

File tree

AGENTS.md

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,25 @@ override fun toString(): String =
9191
"}"
9292
```
9393

94+
### Prefer extension functions over type casts
95+
96+
When adding behavior to a type you don't own, use a `private` extension function instead of a free-standing wrapper function or a cast to an implementation type:
97+
98+
```kotlin
99+
// Good — extension function, no cast needed
100+
private fun Stream<*>.withErrorTracking(
101+
errorRef: AtomicReference<Throwable>,
102+
exhaustedRef: AtomicBoolean,
103+
): Stream<Any?> { ... }
104+
105+
val instrumented = result.withErrorTracking(iterationError, streamExhausted)
106+
107+
// Bad — free-standing wrapper function that takes the stream as an argument
108+
val instrumented = wrapStreamWithErrorCapture(result, iterationError, streamExhausted)
109+
// or worse:
110+
(runs as? RunServiceImpl)?.flush()
111+
```
112+
94113
### Use `partition` instead of double `filter`
95114

96115
```kotlin
@@ -211,7 +230,7 @@ Tests skip gracefully via `assumeTrue` if keys are missing.
211230
## Code style
212231

213232
- `toString()` should be single-line, following the `ClassName{field=value, field=value}` convention used by the rest of the SDK.
214-
- Avoid `@Suppress("UNCHECKED_CAST")` — restructure code to use safe patterns (`as? String`, `is Map<*, *>` with `entries.associate`, etc).
233+
- Avoid `@Suppress("UNCHECKED_CAST")` — restructure code to use safe patterns (`as? String`, `is Map<*, *>` with `entries.associate`, etc). When unavoidable (e.g. generic type erasure after an `is` check), add a comment explaining why the cast is safe.
215234
- Use named arguments for constructor/function calls with 2+ parameters, especially when types could be confused:
216235
```kotlin
217236
// Good
@@ -224,6 +243,7 @@ Tests skip gracefully via `assumeTrue` if keys are missing.
224243
// Bad — positional args are ambiguous
225244
PromptMessage(PromptMessage.Role.HUMAN, template, templateFormat = templateFormat)
226245
```
246+
- Name functions from the caller's perspective — describe what the caller gets, not what the function does internally. Prefer `stream.withErrorTracking()` over `wrapStreamWithErrorCapture(stream)`.
227247
- When an `Optional` has a fallback default, use `orElse(default)` directly instead of `orElse(null) ?: default`:
228248
```kotlin
229249
// Good — default goes straight into orElse

langsmith-java-core/src/main/kotlin/com/langchain/smith/tracing/TraceProcessIO.kt

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ import java.util.function.Function
1313
* - **0-arg** ([traceSupplier]): `Map<String, Any?>` (empty map)
1414
* - **2-arg** ([traceBiFunction]): `Pair<I1, I2>`
1515
* - **3-arg** ([traceTriFunction]): `Triple<I1, I2, I3>`
16-
* - [PO] — the type passed to `processOutputs`. This is always the raw output type of the traced
17-
* function.
16+
* - [PO] — the type passed to `processOutputs`. For non-streaming functions, this is the raw return
17+
* type. For streaming functions (when [aggregator] is set), this is the **aggregated output
18+
* type** returned by the aggregator — not the stream type itself.
1819
*
19-
* When [inputs] is set, it replaces the default input serialization entirely. Same for [outputs].
20+
* When `processInputs` is set, it replaces the default input serialization entirely. Same for
21+
* `processOutputs`.
2022
*
2123
* ## Example (Kotlin)
2224
*
@@ -47,12 +49,32 @@ import java.util.function.Function
4749
class TraceProcessIO<PI, PO>(
4850
processInputs: Function<PI, Map<String, Any?>>? = null,
4951
processOutputs: Function<PO, Map<String, Any?>>? = null,
52+
aggregator: Function<List<Any?>, Any?>? = null,
5053
) {
5154
// Store erased wrappers so traceable overloads can call these without casts.
5255
// erase() converts Function<T, R> to Function<Any?, R> — safe because T erases to Object.
5356
internal val inputsFn: Function<Any?, Map<String, Any?>>? = processInputs?.erase()
5457
internal val outputsFn: Function<Any?, Map<String, Any?>>? = processOutputs?.erase()
5558

59+
/**
60+
* Aggregator for streaming results. Setting this opts into stream tracing.
61+
*
62+
* When set and the traced function returns a [java.util.stream.Stream], `traceable` will:
63+
* 1. Tee the stream via `peek()` to collect elements as the caller iterates
64+
* 2. Register an `onClose()` handler that calls this aggregator with the collected chunks (the aggregator is skipped when no chunks were collected)
65+
* 3. Pass the aggregated result through [processOutputs] and record it as run output
66+
*
67+
* The caller gets back a real [java.util.stream.Stream] — no proxy, no type changes.
68+
*
69+
* Without an aggregator, `Stream` return values are treated normally — the run is completed
70+
* immediately on return, same as any other value.
71+
*
72+
* **Lifecycle:** The run is finalized when the stream's `onClose` handler runs. Callers must
73+
* close the stream (via `use {}` in Kotlin or try-with-resources in Java) to ensure the run is
74+
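* completed. Abandoned streams will leave runs open, and a stream that is closed before it was fully consumed is recorded with the error "Stream cancelled".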
75+
*/
76+
internal val aggregatorFn: Function<List<Any?>, Any?>? = aggregator
77+
5678
companion object {
5779
/** Creates a new [Builder]. */
5880
@JvmStatic fun <PI, PO> builder() = Builder<PI, PO>()
@@ -82,6 +104,7 @@ class TraceProcessIO<PI, PO>(
82104
class Builder<PI, PO> internal constructor() {
83105
private var processInputs: Function<PI, Map<String, Any?>>? = null
84106
private var processOutputs: Function<PO, Map<String, Any?>>? = null
107+
private var aggregator: Function<List<Any?>, Any?>? = null
85108

86109
/** Callback to transform the input before it is recorded on the run. */
87110
fun processInputs(processInputs: Function<PI, Map<String, Any?>>) = apply {
@@ -93,9 +116,23 @@ class TraceProcessIO<PI, PO>(
93116
this.processOutputs = processOutputs
94117
}
95118

119+
/**
120+
* Aggregator for streaming results. Reduces collected chunks into a single output that is
121+
* then passed through [processOutputs].
122+
*/
123+
fun aggregator(aggregator: Function<List<Any?>, Any?>) = apply {
124+
this.aggregator = aggregator
125+
}
126+
96127
/** Builds the [TraceProcessIO]. */
97-
fun build() = TraceProcessIO(processInputs = processInputs, processOutputs = processOutputs)
128+
fun build() =
129+
TraceProcessIO(
130+
processInputs = processInputs,
131+
processOutputs = processOutputs,
132+
aggregator = aggregator,
133+
)
98134
}
99135

100-
override fun toString(): String = "TraceProcessIO{inputs=$inputsFn, outputs=$outputsFn}"
136+
override fun toString(): String =
137+
"TraceProcessIO{inputs=$inputsFn, outputs=$outputsFn, aggregator=$aggregatorFn}"
101138
}

langsmith-java-core/src/main/kotlin/com/langchain/smith/tracing/Traceable.kt

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,54 @@ private fun <T> executeTraced(config: TraceConfig, inputs: Map<String, Any?>?, b
686686
return CURRENT_RUN.runWith(run) {
687687
try {
688688
val result = block()
689+
690+
// If the result is a java.util.stream.Stream and an aggregator is configured,
691+
// instrument it: collect chunks via peek(), finalize the run via onClose().
692+
val aggregator = config.processTracedIO?.aggregatorFn
693+
if (aggregator != null && result is java.util.stream.Stream<*>) {
694+
val chunks = mutableListOf<Any?>()
695+
val iterationError = java.util.concurrent.atomic.AtomicReference<Throwable>(null)
696+
val streamExhausted = java.util.concurrent.atomic.AtomicBoolean(false)
697+
val instrumented =
698+
result
699+
.withErrorTracking(iterationError, streamExhausted)
700+
.peek { chunks.add(it) }
701+
.onClose {
702+
try {
703+
// Always try to aggregate whatever chunks we have
704+
var aggregationError: Throwable? = null
705+
if (chunks.isNotEmpty()) {
706+
try {
707+
val output = aggregator.apply(chunks)
708+
run.outputs =
709+
applyProcessOutputs(config, output)
710+
?: toOutputMap(output)
711+
} catch (e: Throwable) {
712+
aggregationError = e
713+
}
714+
}
715+
716+
val error = iterationError.get()
717+
run.endTime = nowIso()
718+
if (error != null) {
719+
run.error = error.stackTraceToString()
720+
} else if (!streamExhausted.get()) {
721+
run.error = "Stream cancelled"
722+
} else if (aggregationError != null) {
723+
run.error = aggregationError.stackTraceToString()
724+
}
725+
run.patchRun()
726+
} catch (e: Throwable) {
727+
run.endTime = nowIso()
728+
run.error = e.stackTraceToString()
729+
run.patchRun()
730+
}
731+
}
732+
// Safe: T is Stream<something>, instrumented is Stream<Any?> — same erasure.
733+
@Suppress("UNCHECKED_CAST")
734+
return@runWith instrumented as T
735+
}
736+
689737
run.endTime = nowIso()
690738
run.outputs = applyProcessOutputs(config, result) ?: toOutputMap(result)
691739
run.patchRun()
@@ -698,3 +746,32 @@ private fun <T> executeTraced(config: TraceConfig, inputs: Map<String, Any?>?, b
698746
}
699747
}
700748
}
749+
750+
/**
 * Returns a new sequential [java.util.stream.Stream] over the receiver's elements that records how
 * iteration ended, so that a close handler can tell failure, early termination and normal
 * completion apart.
 * - Any [Throwable] that escapes `tryAdvance` is stored in [errorRef] (only the first one is kept)
 *   and then rethrown unchanged.
 * - [exhaustedRef] is set to `true` once the underlying spliterator reports that it has no more
 *   elements. It stays untouched if the caller stops early or iteration fails.
 *
 * The returned stream never splits, and closing it closes the receiver.
 *
 * @param errorRef receives the first throwable raised while advancing the stream.
 * @param exhaustedRef set to `true` when the source has been fully consumed.
 * @return a stream backed by the receiver's spliterator; the receiver itself must not be used
 *   afterwards because `spliterator()` is a terminal operation on it.
 */
private fun java.util.stream.Stream<*>.withErrorTracking(
    errorRef: java.util.concurrent.atomic.AtomicReference<Throwable>,
    exhaustedRef: java.util.concurrent.atomic.AtomicBoolean,
): java.util.stream.Stream<Any?> {
    val delegate = this.spliterator()
    val capturing =
        object : java.util.Spliterator<Any?> {
            override fun tryAdvance(action: java.util.function.Consumer<in Any?>): Boolean =
                try {
                    delegate.tryAdvance(action).also { if (!it) exhaustedRef.set(true) }
                } catch (e: Throwable) {
                    // Keep only the first failure; later ones are usually consequences of it.
                    errorRef.compareAndSet(null, e)
                    throw e
                }

            // Splitting is disabled so every element flows through tryAdvance above.
            override fun trySplit(): java.util.Spliterator<Any?>? = null

            override fun estimateSize(): Long = delegate.estimateSize()

            // True when the source is SORTED by natural ordering (null comparator). The
            // characteristic is checked first because getComparator() throws on unsorted sources.
            private fun isNaturallySorted(): Boolean =
                delegate.hasCharacteristics(java.util.Spliterator.SORTED) &&
                    delegate.comparator == null

            // SORTED is forwarded only for natural ordering. StreamSupport.stream() calls
            // getComparator() whenever SORTED is reported, and the interface default throws
            // IllegalStateException, so forwarding it blindly made sorted sources unusable.
            // Streams discard SORTED for custom comparators anyway, so nothing is lost.
            // NOTE(review): SIZED is still forwarded, so size-only terminal operations such as
            // count() may skip traversal on Java 9+ and never reach tryAdvance — confirm whether
            // such runs should be reported as cancelled.
            override fun characteristics(): Int {
                val reported = delegate.characteristics()
                return if (isNaturallySorted()) {
                    reported
                } else {
                    reported and java.util.Spliterator.SORTED.inv()
                }
            }

            // Spliterator contract: null for natural ordering, IllegalStateException when the
            // spliterator does not report SORTED.
            override fun getComparator(): java.util.Comparator<in Any?>? {
                check(isNaturallySorted()) { "Spliterator does not report SORTED" }
                return null
            }
        }
    return java.util.stream.StreamSupport.stream(capturing, false).onClose { this.close() }
}

0 commit comments

Comments
 (0)