Skip to content

Commit e412191

Browse files
authored
feat: Add zstd compression to runs service (#136)
* Add zstd compression * Merge * Fix * nits * Feedback * Fix build
1 parent 1b82bb1 commit e412191

14 files changed

Lines changed: 671 additions & 65 deletions

File tree

langsmith-java-core/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ dependencies {
4343
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.18.6")
4444
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.18.6")
4545
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.18.6")
46+
implementation("com.github.luben:zstd-jni:1.5.7-7")
4647
implementation("org.apache.httpcomponents.core5:httpcore5:5.2.4")
4748
implementation("org.apache.httpcomponents.client5:httpclient5:5.3.1")
4849

langsmith-java-core/src/main/kotlin/com/langchain/smith/client/AutoBatchIngestLimits.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.langchain.smith.client
22

3-
import com.langchain.smith.models.annotationqueues.info.InfoListResponse.BatchIngestConfig
3+
import com.langchain.smith.models.info.InfoListResponse.BatchIngestConfig
44
import kotlin.jvm.optionals.getOrNull
55

66
internal data class AutoBatchIngestLimits(

langsmith-java-core/src/main/kotlin/com/langchain/smith/client/AutoBatchQueue.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,10 @@ class AutoBatchQueue(
290290
* Drains up to [maxItems] queued operations and returns batch params grouped by request
291291
* options.
292292
*
293-
* TODO: Support multipart ingest endpoint for large payloads with attachments.
294-
* TODO: Support gzip compression for batch requests.
293+
* Multipart ingest compression is applied at send time; legacy `/runs/batch` remains
294+
* uncompressed.
295+
*
296+
* TODO: Support attachment parts in multipart ingest.
295297
*/
296298
private fun drainUpTo(maxItems: Int): DrainResult {
297299
val openGroups = linkedMapOf<RequestOptionsKey, BatchGroup>()

langsmith-java-core/src/main/kotlin/com/langchain/smith/core/http/HttpRequestBodies.kt

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package com.langchain.smith.core.http
77
import com.fasterxml.jackson.databind.JsonNode
88
import com.fasterxml.jackson.databind.json.JsonMapper
99
import com.fasterxml.jackson.databind.node.JsonNodeType
10+
import com.github.luben.zstd.ZstdOutputStream
1011
import com.langchain.smith.core.MultipartField
1112
import com.langchain.smith.core.toImmutable
1213
import com.langchain.smith.errors.LangChainInvalidDataException
@@ -32,6 +33,25 @@ internal inline fun <reified T> json(jsonMapper: JsonMapper, value: T): HttpRequ
3233
override fun close() {}
3334
}
3435

36+
/**
 * Wraps [body] so that everything it writes is zstd-compressed on the way out.
 *
 * The returned body delegates [HttpRequestBody.contentType], [HttpRequestBody.repeatable] and
 * [HttpRequestBody.close] to [body] unchanged, and always reports a content length of `-1`
 * (presumably because the compressed size is not known before writing).
 */
@JvmSynthetic
internal fun zstd(body: HttpRequestBody): HttpRequestBody =
    object : HttpRequestBody {

        override fun writeTo(outputStream: OutputStream) {
            // Closing the compressor must finish the zstd stream without closing the caller's
            // stream, so the caller's stream is shielded behind a wrapper whose close() is a no-op.
            val shielded = NonClosingOutputStream(outputStream)
            val compressor = ZstdOutputStream(shielded).setCloseFrameOnFlush(true)
            compressor.use { sink -> body.writeTo(sink) }
        }

        override fun contentType(): String? {
            return body.contentType()
        }

        override fun contentLength(): Long {
            return -1L
        }

        override fun repeatable(): Boolean {
            return body.repeatable()
        }

        override fun close() = body.close()
    }
54+
3555
@JvmSynthetic
3656
internal fun multipartFormData(
3757
jsonMapper: JsonMapper,
@@ -100,6 +120,18 @@ internal fun multipartFormData(
100120
}
101121
.build()
102122

123+
/**
 * An [OutputStream] that forwards every write and flush to [delegate] but ignores [close].
 *
 * This lets a wrapping stream be closed without closing the stream underneath it.
 */
private class NonClosingOutputStream(private val delegate: OutputStream) : OutputStream() {
    override fun write(b: Int) {
        delegate.write(b)
    }

    override fun write(b: ByteArray) {
        delegate.write(b)
    }

    override fun write(b: ByteArray, off: Int, len: Int) {
        delegate.write(b, off, len)
    }

    override fun flush() {
        delegate.flush()
    }

    override fun close() {
        // Intentionally a no-op: the delegate is left open.
    }
}
134+
103135
private fun serializePart(name: String, node: JsonNode): Sequence<Pair<String, InputStream>> =
104136
when (node.nodeType) {
105137
JsonNodeType.MISSING,
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.langchain.smith.services
2+
3+
import com.github.luben.zstd.Zstd
4+
import com.langchain.smith.models.info.InfoListResponse
5+
import kotlin.jvm.optionals.getOrNull
6+
7+
/**
 * Returns `true` when the `LANGSMITH_DISABLE_RUN_COMPRESSION` environment variable is set to a
 * value that [isTruish] accepts.
 */
internal fun isRunCompressionDisabled(): Boolean {
    val rawSetting: String? = System.getenv("LANGSMITH_DISABLE_RUN_COMPRESSION")
    return isTruish(rawSetting)
}
9+
10+
/** Returns the lazily computed [zstdAvailable] probe result. */
internal fun isZstdAvailable(): Boolean {
    return zstdAvailable
}
11+
12+
/**
 * Returns `true` when run compression has not been disabled via [isRunCompressionDisabled] and
 * [isZstdAvailable] reports that zstd can be used.
 */
internal fun shouldDefaultRunCompressionEnabled(): Boolean {
    // The opt-out is checked first, so zstd is not probed when compression is disabled.
    if (isRunCompressionDisabled()) {
        return false
    }
    return isZstdAvailable()
}
14+
15+
/**
 * Result of a one-time probe that compresses an empty byte array with [Zstd].
 *
 * The value is `true` when the call returns a non-empty result and `false` when it throws
 * anything at all (any [Throwable] is treated as "unavailable" — presumably to cover failures to
 * load the native library; confirm against zstd-jni).
 */
private val zstdAvailable: Boolean by lazy {
    runCatching { Zstd.compress(byteArrayOf()).isNotEmpty() }.getOrDefault(false)
}
22+
23+
/**
 * Reads the `zstd_compression_enabled` entry from the additional properties of the instance
 * flags in [info].
 *
 * Returns the flag's boolean value when the lookup chain yields one, and `true` whenever any
 * step of the chain yields no value.
 */
internal fun isZstdCompressionEnabled(info: InfoListResponse): Boolean {
    val instanceFlags = info.instanceFlags().getOrNull()
    val rawFlag = instanceFlags?._additionalProperties()?.get("zstd_compression_enabled")
    val explicitSetting = rawFlag?.asBoolean()?.getOrNull()
    // Default on; the server must explicitly return false to disable compression.
    return explicitSetting ?: true
}
32+
33+
/** Lowercase spellings that [isTruish] treats as "true". */
private val TRUISH_VALUES = setOf("1", "true", "t", "yes", "y", "on")

/**
 * Returns `true` when [value], after trimming surrounding whitespace and lowercasing, is one of
 * [TRUISH_VALUES]. A `null` value is never truish.
 */
private fun isTruish(value: String?): Boolean {
    val normalized = value?.trim()?.lowercase() ?: return false
    return normalized in TRUISH_VALUES
}

langsmith-java-core/src/main/kotlin/com/langchain/smith/services/async/InfoServiceAsync.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ interface InfoServiceAsync {
5252
fun withOptions(modifier: Consumer<ClientOptions.Builder>): InfoServiceAsync.WithRawResponse
5353

5454
/**
55-
* Returns a raw HTTP response for `get /api/v1/info`, but is otherwise the same as
55+
* Returns a raw HTTP response for `get /info`, but is otherwise the same as
5656
* [InfoServiceAsync.list].
5757
*/
5858
fun list(): CompletableFuture<HttpResponseFor<InfoListResponse>> =

langsmith-java-core/src/main/kotlin/com/langchain/smith/services/async/InfoServiceAsyncImpl.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class InfoServiceAsyncImpl internal constructor(private val clientOptions: Clien
3535
params: InfoListParams,
3636
requestOptions: RequestOptions,
3737
): CompletableFuture<InfoListResponse> =
38-
// get /api/v1/info
38+
// get /info
3939
withRawResponse().list(params, requestOptions).thenApply { it.parse() }
4040

4141
class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) :
@@ -62,7 +62,7 @@ class InfoServiceAsyncImpl internal constructor(private val clientOptions: Clien
6262
HttpRequest.builder()
6363
.method(HttpMethod.GET)
6464
.baseUrl(clientOptions.baseUrl())
65-
.addPathSegments("api", "v1", "info")
65+
.addPathSegment("info")
6666
.build()
6767
.prepareAsync(clientOptions, params)
6868
val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions))

langsmith-java-core/src/main/kotlin/com/langchain/smith/services/async/RunServiceAsyncImpl.kt

Lines changed: 81 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ import com.langchain.smith.core.http.HttpResponse.Handler
1919
import com.langchain.smith.core.http.HttpResponseFor
2020
import com.langchain.smith.core.http.json
2121
import com.langchain.smith.core.http.parseable
22+
import com.langchain.smith.core.http.zstd
2223
import com.langchain.smith.core.prepareAsync
2324
import com.langchain.smith.errors.NotFoundException
25+
import com.langchain.smith.models.info.InfoListResponse
2426
import com.langchain.smith.models.runs.RunCreateParams
2527
import com.langchain.smith.models.runs.RunCreateResponse
2628
import com.langchain.smith.models.runs.RunIngestBatchParams
@@ -35,9 +37,10 @@ import com.langchain.smith.models.runs.RunUpdate2Params
3537
import com.langchain.smith.models.runs.RunUpdate2Response
3638
import com.langchain.smith.models.runs.RunUpdateParams
3739
import com.langchain.smith.models.runs.RunUpdateResponse
38-
import com.langchain.smith.services.async.annotationqueues.InfoServiceAsyncImpl
3940
import com.langchain.smith.services.async.runs.RuleServiceAsync
4041
import com.langchain.smith.services.async.runs.RuleServiceAsyncImpl
42+
import com.langchain.smith.services.isZstdCompressionEnabled
43+
import com.langchain.smith.services.shouldDefaultRunCompressionEnabled
4144
import java.util.concurrent.CompletableFuture
4245
import java.util.concurrent.CompletionException
4346
import java.util.concurrent.atomic.AtomicBoolean
@@ -48,7 +51,11 @@ import org.slf4j.LoggerFactory
4851
class RunServiceAsyncImpl internal constructor(private val clientOptions: ClientOptions) :
4952
RunServiceAsync {
5053

51-
private val withRawResponse: WithRawResponseImpl by lazy { WithRawResponseImpl(clientOptions) }
54+
private val serverInfo: CompletableFuture<InfoListResponse?> by lazy { fetchServerInfo() }
55+
56+
private val withRawResponse: WithRawResponseImpl by lazy {
57+
WithRawResponseImpl(clientOptions = clientOptions, getServerInfo = { serverInfo })
58+
}
5259

5360
private val rules: RuleServiceAsync by lazy { RuleServiceAsyncImpl(clientOptions) }
5461
private val multipartDisabled = AtomicBoolean(false)
@@ -110,17 +117,36 @@ class RunServiceAsyncImpl internal constructor(private val clientOptions: Client
110117
null
111118
}
112119

113-
private fun fetchAutoBatchIngestLimits() =
120+
/**
 * Requests server info through [InfoServiceAsyncImpl] and returns a future for the response.
 *
 * Failures are logged as warnings and never propagated: if the request future fails, or if
 * starting the request throws an [Exception], the result is a future completed with `null`.
 */
private fun fetchServerInfo(): CompletableFuture<InfoListResponse?> {
    val failureMessage =
        "Failed to fetch LangSmith server info; using default batch limits and compression settings"
    return try {
        InfoServiceAsyncImpl(clientOptions).list().handle { response, error ->
            if (error == null) {
                response
            } else {
                logger.warn(failureMessage, error)
                null
            }
        }
    } catch (failure: Exception) {
        // Covers exceptions thrown synchronously while building or dispatching the request.
        logger.warn(failureMessage, failure)
        CompletableFuture.completedFuture(null)
    }
}
140+
141+
private fun fetchAutoBatchIngestLimits(): AutoBatchIngestLimits =
114142
try {
115-
InfoServiceAsyncImpl(clientOptions)
116-
.list()
117-
.get()
118-
.batchIngestConfig()
119-
.getOrNull()
120-
.toAutoBatchIngestLimits()
143+
serverInfo.get()?.batchIngestConfig()?.getOrNull().toAutoBatchIngestLimits()
144+
} catch (e: InterruptedException) {
145+
Thread.currentThread().interrupt()
146+
AutoBatchIngestLimits()
121147
} catch (e: Exception) {
122148
logger.warn(
123-
"Failed to fetch LangSmith batch ingest config; using default batch limits",
149+
"Failed to fetch LangSmith server info; using default batch limits and compression settings",
124150
e,
125151
)
126152
AutoBatchIngestLimits()
@@ -226,12 +252,37 @@ class RunServiceAsyncImpl internal constructor(private val clientOptions: Client
226252
CompletableFuture<T>().also { it.completeExceptionally(throwable) }
227253
}
228254

229-
class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) :
230-
RunServiceAsync.WithRawResponse {
255+
class WithRawResponseImpl
256+
internal constructor(
257+
private val clientOptions: ClientOptions,
258+
private val getServerInfo: () -> CompletableFuture<InfoListResponse?> = {
259+
CompletableFuture.completedFuture(null)
260+
},
261+
) : RunServiceAsync.WithRawResponse {
231262

232263
private val errorHandler: Handler<HttpResponse> =
233264
errorHandler(errorBodyHandler(clientOptions.jsonMapper))
234265

266+
/** Lazily computed future that says whether multipart requests should be zstd-compressed. */
private val zstdCompressionEnabled: CompletableFuture<Boolean> by lazy {
    fetchZstdCompressionEnabled()
}

/**
 * Resolves whether zstd compression should be used.
 *
 * When [shouldDefaultRunCompressionEnabled] is `false` the answer is `false` and server info is
 * not consulted. Otherwise the answer comes from [isZstdCompressionEnabled] applied to the
 * server info; a `null` info, a failed future, or an [Exception] thrown while obtaining the
 * future all fall back to [shouldDefaultRunCompressionEnabled].
 */
private fun fetchZstdCompressionEnabled(): CompletableFuture<Boolean> =
    if (shouldDefaultRunCompressionEnabled()) {
        try {
            getServerInfo()
                .thenApply { serverInfo ->
                    if (serverInfo == null) {
                        shouldDefaultRunCompressionEnabled()
                    } else {
                        isZstdCompressionEnabled(serverInfo)
                    }
                }
                .exceptionally { shouldDefaultRunCompressionEnabled() }
        } catch (_: Exception) {
            CompletableFuture.completedFuture(shouldDefaultRunCompressionEnabled())
        }
    } else {
        CompletableFuture.completedFuture(false)
    }
285+
235286
private val rules: RuleServiceAsync.WithRawResponse by lazy {
236287
RuleServiceAsyncImpl.WithRawResponseImpl(clientOptions)
237288
}
@@ -384,16 +435,25 @@ class RunServiceAsyncImpl internal constructor(private val clientOptions: Client
384435
// back to legacy JSON batch ingest for this batch only.
385436
return CompletableFuture.completedFuture(false)
386437
}
387-
val request =
388-
HttpRequest.builder()
389-
.method(HttpMethod.POST)
390-
.baseUrl(clientOptions.baseUrl())
391-
.addPathSegments("runs", "multipart")
392-
.body(body)
393-
.build()
394-
.prepareAsync(clientOptions, params)
395438
val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions))
396-
return request
439+
return zstdCompressionEnabled
440+
.thenCompose { zstdCompressionEnabled ->
441+
val requestBuilder =
442+
HttpRequest.builder()
443+
.method(HttpMethod.POST)
444+
.baseUrl(clientOptions.baseUrl())
445+
.addPathSegments("runs", "multipart")
446+
if (zstdCompressionEnabled) {
447+
requestBuilder.putHeader("Content-Encoding", "zstd").body(zstd(body))
448+
} else {
449+
requestBuilder.body(body)
450+
}
451+
logger.debug(
452+
"Sending LangSmith run batch to multipart ingest endpoint (zstd compression: {})",
453+
if (zstdCompressionEnabled) "enabled" else "disabled",
454+
)
455+
requestBuilder.build().prepareAsync(clientOptions, params)
456+
}
397457
.thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) }
398458
.thenApply { response ->
399459
errorHandler.handle(response).use {}

langsmith-java-core/src/main/kotlin/com/langchain/smith/services/blocking/InfoService.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ interface InfoService {
5252
fun withOptions(modifier: Consumer<ClientOptions.Builder>): InfoService.WithRawResponse
5353

5454
/**
55-
* Returns a raw HTTP response for `get /api/v1/info`, but is otherwise the same as
55+
* Returns a raw HTTP response for `get /info`, but is otherwise the same as
5656
* [InfoService.list].
5757
*/
5858
@MustBeClosed fun list(): HttpResponseFor<InfoListResponse> = list(InfoListParams.none())

langsmith-java-core/src/main/kotlin/com/langchain/smith/services/blocking/InfoServiceImpl.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class InfoServiceImpl internal constructor(private val clientOptions: ClientOpti
3030
InfoServiceImpl(clientOptions.toBuilder().apply(modifier::accept).build())
3131

3232
override fun list(params: InfoListParams, requestOptions: RequestOptions): InfoListResponse =
33-
// get /api/v1/info
33+
// get /info
3434
withRawResponse().list(params, requestOptions).parse()
3535

3636
class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) :
@@ -57,7 +57,7 @@ class InfoServiceImpl internal constructor(private val clientOptions: ClientOpti
5757
HttpRequest.builder()
5858
.method(HttpMethod.GET)
5959
.baseUrl(clientOptions.baseUrl())
60-
.addPathSegments("api", "v1", "info")
60+
.addPathSegment("info")
6161
.build()
6262
.prepare(clientOptions, params)
6363
val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions))

0 commit comments

Comments
 (0)