Skip to content

Commit 76da93f

Browse files
fix: fix streaming in java sdk (#95)
* fix: fix streaming in java sdk * chore: cleanups * chore: setting gen_ai.completion to json text * chore: lint fix * chore: comments * chore: lint * fix: type mismatch * chore: lint * chore: clean up building JSON
1 parent cf4696d commit 76da93f

5 files changed

Lines changed: 475 additions & 209 deletions

File tree

langsmith-java-core/src/main/kotlin/com/langchain/smith/wrappers/openai/OpenTelemetryConfig.kt

Lines changed: 59 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import io.opentelemetry.sdk.OpenTelemetrySdk
66
import io.opentelemetry.sdk.common.CompletableResultCode
77
import io.opentelemetry.sdk.resources.Resource
88
import io.opentelemetry.sdk.trace.SdkTracerProvider
9+
import io.opentelemetry.sdk.trace.SpanProcessor
910
import io.opentelemetry.sdk.trace.data.SpanData
1011
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor
1112
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor
@@ -84,11 +85,13 @@ object OpenTelemetryConfig {
8485
}
8586

8687
private fun buildOtlpEndpoint(baseUrl: String?): String {
87-
var effectiveBaseUrl = baseUrl
88-
if (effectiveBaseUrl.isNullOrEmpty()) effectiveBaseUrl = System.getenv("LANGSMITH_ENDPOINT")
89-
if (effectiveBaseUrl.isNullOrEmpty()) effectiveBaseUrl = DEFAULT_BASE_URL
90-
if (effectiveBaseUrl!!.endsWith("/")) effectiveBaseUrl = effectiveBaseUrl.dropLast(1)
91-
return effectiveBaseUrl + OTLP_TRACES_PATH
88+
val base =
89+
(baseUrl?.takeIf { it.isNotBlank() }
90+
?: System.getenv("LANGSMITH_ENDPOINT")?.takeIf { it.isNotBlank() }
91+
?: DEFAULT_BASE_URL)
92+
.trim()
93+
.removeSuffix("/")
94+
return base + OTLP_TRACES_PATH
9295
}
9396

9497
class Builder {
@@ -113,6 +116,31 @@ object OpenTelemetryConfig {
113116

114117
fun maxBatchSize(maxBatchSize: Int) = apply { this.maxBatchSize = maxBatchSize }
115118

119+
fun buildSpanProcessor(): SpanProcessor {
120+
require(!apiKey.isNullOrEmpty()) {
121+
"LangSmith API key is required. Set it using apiKey() or LANGSMITH_API_KEY environment variable."
122+
}
123+
val endpointUrl = buildOtlpEndpoint(baseUrl)
124+
val exporterBuilder =
125+
OtlpHttpSpanExporter.builder()
126+
.setEndpoint(endpointUrl)
127+
.addHeader("x-api-key", apiKey!!)
128+
if (!projectName.isNullOrEmpty()) {
129+
exporterBuilder.addHeader("Langsmith-Project", projectName!!)
130+
}
131+
val spanExporter = exporterBuilder.build()
132+
val loggingExporter = LoggingSpanExporter(spanExporter)
133+
return when (processorType) {
134+
SpanProcessorType.SIMPLE -> SimpleSpanProcessor.create(loggingExporter)
135+
SpanProcessorType.BATCH ->
136+
BatchSpanProcessor.builder(loggingExporter)
137+
.setScheduleDelay(100, TimeUnit.MILLISECONDS)
138+
.setMaxExportBatchSize(maxBatchSize)
139+
.setExporterTimeout(5, TimeUnit.SECONDS)
140+
.build()
141+
}
142+
}
143+
116144
fun build(): io.opentelemetry.api.OpenTelemetry {
117145
require(!apiKey.isNullOrEmpty()) {
118146
"LangSmith API key is required. Set it using apiKey() or LANGSMITH_API_KEY environment variable."
@@ -169,39 +197,35 @@ object OpenTelemetryConfig {
169197
}
170198
}
171199
val result = delegate.export(spans)
172-
if (DEBUG) {
173-
try {
174-
result.join(5, TimeUnit.SECONDS)
175-
if (!result.isSuccess) {
176-
logger.error(
177-
"[LangSmith ERROR] Failed to export ${spans.size} span(s) to LangSmith"
178-
)
179-
logExportException(result)
180-
for (span in spans) {
181-
logger.error(
182-
" - ${span.name} (traceId=${span.traceId}, spanId=${span.spanId})"
183-
)
184-
}
185-
logger.error(
186-
" This usually indicates a network error, authentication problem, or invalid span data"
187-
)
188-
logger.error(" Check your LANGSMITH_API_KEY and network connectivity")
189-
} else {
190-
logger.debug("[LangSmith] Successfully exported ${spans.size} span(s)")
191-
}
192-
} catch (e: Exception) {
193-
logger.error("[LangSmith ERROR] Exception waiting for export result", e)
194-
}
195-
} else {
196-
result.whenComplete {
197-
if (!result.isSuccess) {
200+
// Always wait for export to complete so flush/shutdown don't run before the HTTP
201+
// request finishes. Without this, shutdown can abort in-flight exports and traces are
202+
// lost.
203+
try {
204+
result.join(5, TimeUnit.SECONDS)
205+
} catch (e: Exception) {
206+
if (DEBUG) logger.error("[LangSmith ERROR] Exception waiting for export result", e)
207+
else logger.error("[LangSmith] Exception waiting for export result", e)
208+
}
209+
if (!result.isSuccess) {
210+
logger.error(
211+
"[LangSmith ERROR] Failed to export ${spans.size} span(s) to LangSmith"
212+
)
213+
logExportException(result)
214+
if (DEBUG) {
215+
for (span in spans) {
198216
logger.error(
199-
"[LangSmith ERROR] Failed to export ${spans.size} span(s) to LangSmith"
217+
" - ${span.name} (traceId=${span.traceId}, spanId=${span.spanId})"
200218
)
201-
logExportException(result)
202-
logger.error(" Set LANGSMITH_DEBUG=true for more details")
203219
}
220+
logger.error(
221+
" This usually indicates a network error, authentication problem, or invalid span data"
222+
)
223+
logger.error(" Check your LANGSMITH_API_KEY and network connectivity")
224+
} else {
225+
logger.error(" Set LANGSMITH_DEBUG=true for more details")
204226
}
227+
} else if (DEBUG) {
228+
logger.debug("[LangSmith] Successfully exported ${spans.size} span(s)")
205229
}
206230
return result
207231
}
@@ -222,6 +246,7 @@ object OpenTelemetryConfig {
222246
}
223247

224248
companion object {
249+
private val logger = LoggerFactory.getLogger(LoggingSpanExporter::class.java)
225250
private val DEBUG =
226251
java.lang.Boolean.getBoolean("langsmith.debug") ||
227252
"true".equals(System.getenv("LANGSMITH_DEBUG"), ignoreCase = true)
Lines changed: 36 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.langchain.smith.wrappers.openai
22

3+
import com.fasterxml.jackson.databind.ObjectMapper
4+
import io.opentelemetry.api.common.AttributeKey
35
import io.opentelemetry.api.trace.Span
46
import io.opentelemetry.api.trace.SpanBuilder
57
import io.opentelemetry.api.trace.SpanKind
@@ -8,26 +10,10 @@ import io.opentelemetry.api.trace.Tracer
810
/** Internal utility for OpenTelemetry span creation and management. */
911
internal object TracingUtils {
1012
private const val INSTRUMENTATION_NAME = "langsmith-java-otel-wrappers"
13+
private val jsonMapper = ObjectMapper()
1114

12-
fun getTracer(): Tracer {
13-
return try {
14-
val tracer =
15-
io.opentelemetry.api.GlobalOpenTelemetry.get().getTracer(INSTRUMENTATION_NAME)
16-
val debug =
17-
java.lang.Boolean.getBoolean("langsmith.debug") ||
18-
"true".equals(System.getenv("LANGSMITH_DEBUG"), ignoreCase = true)
19-
if (debug) {
20-
val otel = io.opentelemetry.api.GlobalOpenTelemetry.get()
21-
val isNoop = otel.javaClass.name.contains("Noop")
22-
println(
23-
"[TracingUtils] Tracer obtained: ${tracer.javaClass.name}, OpenTelemetry isNoop: $isNoop"
24-
)
25-
}
26-
tracer
27-
} catch (e: Exception) {
28-
io.opentelemetry.api.GlobalOpenTelemetry.get().getTracer(INSTRUMENTATION_NAME)
29-
}
30-
}
15+
fun getTracer(): Tracer =
16+
io.opentelemetry.api.GlobalOpenTelemetry.get().getTracer(INSTRUMENTATION_NAME)
3117

3218
fun createSpanBuilder(
3319
model: String?,
@@ -40,29 +26,33 @@ internal object TracingUtils {
4026
tracer
4127
.spanBuilder(spanName)
4228
.setSpanKind(SpanKind.CLIENT)
43-
.setAttribute("gen_ai.system", "openai")
44-
.setAttribute("gen_ai.operation.name", operationType)
45-
.setAttribute("gen_ai.provider.name", "openai")
46-
spanKind?.let { builder.setAttribute("langsmith.span.kind", it) }
29+
.setAttribute(AttributeKey.stringKey("gen_ai.system"), "openai")
30+
.setAttribute(AttributeKey.stringKey("gen_ai.operation.name"), operationType)
31+
.setAttribute(AttributeKey.stringKey("gen_ai.provider.name"), "openai")
32+
spanKind?.let { builder.setAttribute(AttributeKey.stringKey("langsmith.span.kind"), it) }
4733
return builder
4834
}
4935

5036
fun setRequestAttributes(span: Span, model: String?) {
51-
model?.let { span.setAttribute("gen_ai.request.model", it) }
37+
model?.let { span.setAttribute(AttributeKey.stringKey("gen_ai.request.model"), it) }
5238
}
5339

5440
fun setRequestParameters(span: Span, temperature: Double?, topP: Double?, maxTokens: Long?) {
55-
temperature?.let { span.setAttribute("gen_ai.request.temperature", it) }
56-
topP?.let { span.setAttribute("gen_ai.request.top_p", it) }
57-
maxTokens?.let { span.setAttribute("gen_ai.request.max_tokens", it) }
41+
temperature?.let {
42+
span.setAttribute(AttributeKey.doubleKey("gen_ai.request.temperature"), it)
43+
}
44+
topP?.let { span.setAttribute(AttributeKey.doubleKey("gen_ai.request.top_p"), it) }
45+
maxTokens?.let { span.setAttribute(AttributeKey.longKey("gen_ai.request.max_tokens"), it) }
5846
}
5947

6048
fun setInputMessages(span: Span, messagesJson: String?) {
61-
messagesJson?.let { span.setAttribute("gen_ai.input.messages", it) }
49+
messagesJson?.let { span.setAttribute(AttributeKey.stringKey("gen_ai.input.messages"), it) }
6250
}
6351

6452
fun setOutputMessages(span: Span, messagesJson: String?) {
65-
messagesJson?.let { span.setAttribute("gen_ai.output.messages", it) }
53+
messagesJson?.let {
54+
span.setAttribute(AttributeKey.stringKey("gen_ai.output.messages"), it)
55+
}
6656
}
6757

6858
fun setResponseAttributes(
@@ -71,27 +61,30 @@ internal object TracingUtils {
7161
outputTokens: Long?,
7262
totalTokens: Long?,
7363
) {
74-
inputTokens?.let { span.setAttribute("gen_ai.usage.input_tokens", it) }
75-
outputTokens?.let { span.setAttribute("gen_ai.usage.output_tokens", it) }
76-
totalTokens?.let { span.setAttribute("gen_ai.usage.total_tokens", it) }
64+
inputTokens?.let {
65+
span.setAttribute(AttributeKey.longKey("gen_ai.usage.input_tokens"), it)
66+
}
67+
outputTokens?.let {
68+
span.setAttribute(AttributeKey.longKey("gen_ai.usage.output_tokens"), it)
69+
}
70+
totalTokens?.let {
71+
span.setAttribute(AttributeKey.longKey("gen_ai.usage.total_tokens"), it)
72+
}
7773
}
7874

7975
fun setResponseMetadata(span: Span, responseModel: String?, finishReason: String?) {
80-
responseModel?.let { span.setAttribute("gen_ai.response.model", it) }
81-
finishReason?.let { span.setAttribute("gen_ai.response.finish_reason", it) }
76+
responseModel?.let {
77+
span.setAttribute(AttributeKey.stringKey("gen_ai.response.model"), it)
78+
}
79+
finishReason?.let {
80+
span.setAttribute(AttributeKey.stringKey("gen_ai.response.finish_reason"), it)
81+
}
8282
}
8383

8484
fun recordException(span: Span, exception: Throwable) {
8585
span.recordException(exception)
86-
span.setAttribute("error", true)
86+
span.setAttribute(AttributeKey.booleanKey("error"), true)
8787
}
8888

89-
fun escapeJsonString(str: String?): String {
90-
if (str == null) return ""
91-
return str.replace("\\", "\\\\")
92-
.replace("\"", "\\\"")
93-
.replace("\n", "\\n")
94-
.replace("\r", "\\r")
95-
.replace("\t", "\\t")
96-
}
89+
fun writeJson(value: Any): String = jsonMapper.writeValueAsString(value)
9790
}

0 commit comments

Comments
 (0)