@@ -17,10 +17,12 @@ import java.util.concurrent.Executors
1717import java.util.concurrent.Phaser
1818import java.util.concurrent.RejectedExecutionException
1919import java.util.concurrent.ScheduledExecutorService
20+ import java.util.concurrent.ScheduledFuture
2021import java.util.concurrent.TimeUnit
2122import java.util.concurrent.atomic.AtomicBoolean
2223import java.util.concurrent.atomic.AtomicInteger
2324import java.util.concurrent.locks.ReentrantLock
25+ import kotlin.concurrent.withLock
2426import kotlin.jvm.optionals.getOrNull
2527import org.slf4j.LoggerFactory
2628
@@ -35,20 +37,28 @@ import org.slf4j.LoggerFactory
3537 *
3638 * @param sendBatch sends a batch and completes when the send has finished
3739 * @param batchSizeLimit max operations before auto-flush (default 100)
38- * @param aggregationDelayMs delay before timer-based flush (default 250ms)
40+ * @param aggregationDelayMs inactivity delay before timer-based flush (default 250ms, matching JS
41+ * auto-batch aggregation and Python's initial queue wait)
3942 * @param sendParallelism max number of batch requests to send concurrently (default 4)
43+ * @param maxAggregationDelayMs max delay before timer-based flush after first item in a burst
44+ * (default 500ms, matching Python's background flush interval)
45+ * @param batchSizeLimitBytes max serialized JSON body size per batch (default 20 MiB)
4046 */
4147class AutoBatchQueue (
4248 private val sendBatch : (RunIngestBatchParams , RequestOptions ) -> CompletionStage <Void ?>,
4349 private val batchSizeLimit : Int = DEFAULT_BATCH_SIZE_LIMIT ,
4450 private val aggregationDelayMs : Long = DEFAULT_AGGREGATION_DELAY_MS ,
4551 private val sendParallelism : Int = DEFAULT_SEND_PARALLELISM ,
52+ private val maxAggregationDelayMs : Long = DEFAULT_MAX_AGGREGATION_DELAY_MS ,
53+ private val batchSizeLimitBytes : Int ,
4654) {
4755 private val items = ConcurrentLinkedQueue <BatchItem >()
4856 private val queuedCount = AtomicInteger (0 )
4957 private val shutdown = AtomicBoolean (false )
50- private val delayedFlushScheduled = AtomicBoolean (false )
5158 private val enqueueShutdownLock = ReentrantLock ()
59+ private val delayedFlushLock = ReentrantLock ()
60+ private var delayedFlushFuture: ScheduledFuture <* >? = null
61+ private var firstQueuedAtNanos: Long? = null
5262 private val activeSends =
5363 object : Phaser (0 ) {
5464 override fun onAdvance (phase : Int , registeredParties : Int ): Boolean = false
@@ -91,6 +101,7 @@ class AutoBatchQueue(
91101 * Safe to call from any thread. No-op if the queue is empty.
92102 */
93103 fun flush () {
104+ cancelDelayedFlush()
94105 while (true ) {
95106 if (queuedCount.get() > 0 && ! drainOnCoordinator()) {
96107 return
@@ -112,14 +123,13 @@ class AutoBatchQueue(
112123 * After calling this, the queue will no longer accept new operations.
113124 */
114125 fun shutdown () {
115- enqueueShutdownLock.lock()
116- try {
117- // Serialize with enqueue's check-and-add so flush cannot miss an item that observed
118- // shutdown=false but has not yet been queued.
119- if (! shutdown.compareAndSet(false , true )) return
120- } finally {
121- enqueueShutdownLock.unlock()
122- }
126+ val startedShutdown =
127+ enqueueShutdownLock.withLock {
128+ // Serialize with enqueue's check-and-add so flush cannot miss an item that observed
129+ // shutdown=false but has not yet been queued.
130+ shutdown.compareAndSet(false , true )
131+ }
132+ if (! startedShutdown) return
123133
124134 flush()
125135 coordinator.shutdown()
@@ -139,6 +149,20 @@ class AutoBatchQueue(
139149 }
140150 }
141151
152+ constructor (
153+ sendBatch: (RunIngestBatchParams , RequestOptions ) -> CompletionStage <Void ?>,
154+ batchSizeLimit: Int = DEFAULT_BATCH_SIZE_LIMIT ,
155+ aggregationDelayMs: Long = DEFAULT_AGGREGATION_DELAY_MS ,
156+ sendParallelism: Int = DEFAULT_SEND_PARALLELISM ,
157+ ) : this (
158+ sendBatch = sendBatch,
159+ batchSizeLimit = batchSizeLimit,
160+ aggregationDelayMs = aggregationDelayMs,
161+ sendParallelism = sendParallelism,
162+ maxAggregationDelayMs = DEFAULT_MAX_AGGREGATION_DELAY_MS ,
163+ batchSizeLimitBytes = DEFAULT_BATCH_SIZE_LIMIT_BYTES ,
164+ )
165+
142166 /* * Returns the number of queued operations (for testing). */
143167 internal fun size (): Int = queuedCount.get()
144168
@@ -149,9 +173,8 @@ class AutoBatchQueue(
149173 queryParams : QueryParams ,
150174 requestOptions : RequestOptions ,
151175 ) {
152- val count = run {
153- enqueueShutdownLock.lock()
154- try {
176+ val count =
177+ enqueueShutdownLock.withLock {
155178 check(! shutdown.get()) { " AutoBatchQueue is shut down" }
156179 items.add(
157180 BatchItem (
@@ -163,10 +186,7 @@ class AutoBatchQueue(
163186 )
164187 )
165188 queuedCount.incrementAndGet()
166- } finally {
167- enqueueShutdownLock.unlock()
168189 }
169- }
170190
171191 afterEnqueue(count)
172192 }
@@ -180,31 +200,56 @@ class AutoBatchQueue(
180200 }
181201
182202 private fun scheduleFlush () {
183- if (! delayedFlushScheduled.compareAndSet(false , true )) return
203+ delayedFlushLock.withLock {
204+ try {
205+ val now = System .nanoTime()
206+ val firstQueuedAt = firstQueuedAtNanos ?: now.also { firstQueuedAtNanos = it }
207+ val elapsedMs = TimeUnit .NANOSECONDS .toMillis(now - firstQueuedAt)
208+ val remainingMaxDelayMs = (maxAggregationDelayMs - elapsedMs).coerceAtLeast(0 )
209+ val delayMs = minOf(aggregationDelayMs, remainingMaxDelayMs)
210+
211+ delayedFlushFuture?.cancel(false )
212+ delayedFlushFuture =
213+ coordinator.schedule(
214+ {
215+ clearDelayedFlushState()
216+ drainAndSubmitSends()
217+ },
218+ delayMs,
219+ TimeUnit .MILLISECONDS ,
220+ )
221+ } catch (e: RejectedExecutionException ) {
222+ delayedFlushFuture = null
223+ firstQueuedAtNanos = null
224+ logger.warn(" Batch queue coordinator rejected delayed flush" , e)
225+ }
226+ }
227+ }
184228
185- try {
186- coordinator.schedule(
187- {
188- delayedFlushScheduled.set(false )
189- drainAndSubmitSends()
190- },
191- aggregationDelayMs,
192- TimeUnit .MILLISECONDS ,
193- )
194- } catch (e: RejectedExecutionException ) {
195- delayedFlushScheduled.set(false )
196- logger.warn(" Batch queue coordinator rejected delayed flush" , e)
229+ private fun clearDelayedFlushState () {
230+ delayedFlushLock.withLock {
231+ delayedFlushFuture = null
232+ firstQueuedAtNanos = null
197233 }
198234 }
199235
200236 private fun triggerFlush () {
237+ cancelDelayedFlush()
201238 try {
202239 coordinator.execute { drainAndSubmitSends() }
203240 } catch (e: RejectedExecutionException ) {
204241 logger.warn(" Batch queue coordinator rejected flush" , e)
205242 }
206243 }
207244
245+ private fun cancelDelayedFlush () {
246+ delayedFlushLock.withLock {
247+ delayedFlushFuture?.cancel(false )
248+ delayedFlushFuture = null
249+ firstQueuedAtNanos = null
250+ }
251+ }
252+
208253 private fun drainOnCoordinator (): Boolean {
209254 val drainFuture =
210255 try {
@@ -225,10 +270,14 @@ class AutoBatchQueue(
225270 }
226271
227272 private fun drainAndSubmitSends () {
228- val batches = drainUpTo(batchSizeLimit)
229- if (batches.isEmpty()) return
273+ var remainingToDrain = queuedCount.get()
274+ while (remainingToDrain > 0 ) {
275+ val drainResult = drainUpTo(minOf(batchSizeLimit, remainingToDrain))
276+ if (drainResult.itemCount == 0 ) break
230277
231- batches.forEach(::submitBatch)
278+ drainResult.batches.forEach(::submitBatch)
279+ remainingToDrain -= drainResult.itemCount
280+ }
232281
233282 when {
234283 queuedCount.get() >= batchSizeLimit -> triggerFlush()
@@ -241,30 +290,36 @@ class AutoBatchQueue(
241290 * Drains up to [maxItems] queued operations and returns batch params grouped by request
242291 * options.
243292 *
244- * TODO: Also flush/split batches based on serialized payload size, not just operation count.
245293 * TODO: Support multipart ingest endpoint for large payloads with attachments.
246294 * TODO: Support gzip compression for batch requests.
247295 */
248- private fun drainUpTo (maxItems : Int ): List < Batch > {
249- val groups = linkedMapOf<RequestOptionsKey , BatchGroup >()
296+ private fun drainUpTo (maxItems : Int ): DrainResult {
297+ val openGroups = linkedMapOf<RequestOptionsKey , BatchGroup >()
250298 var drained = 0
251299
252- while (drained < maxItems) {
253- val item = items.poll() ?: break
254- queuedCount.decrementAndGet()
255- drained++
256-
257- val group =
258- groups.getOrPut(item.requestOptions.key()) { BatchGroup (item.requestOptions) }
259- when (item.op) {
260- BatchOp .Post -> group.posts.add(item.run)
261- BatchOp .Patch -> group.patches.add(item.run)
300+ val batches = buildList {
301+ while (drained < maxItems) {
302+ val item = items.poll() ?: break
303+ queuedCount.decrementAndGet()
304+ drained++
305+
306+ val key = item.requestOptions.key()
307+ val itemSerializedSize = objectMapper.writeValueAsBytes(item.run).size
308+ var group = openGroups.getOrPut(key) { BatchGroup (item.requestOptions) }
309+ if (
310+ group.isNotEmpty() &&
311+ group.serializedSizeWith(item, itemSerializedSize) > batchSizeLimitBytes
312+ ) {
313+ add(group.toBatch())
314+ group = BatchGroup (item.requestOptions)
315+ openGroups[key] = group
316+ }
317+ group.add(item, itemSerializedSize)
262318 }
263- group.headers.putAll(item.headers)
264- group.queryParams.putAll(item.queryParams)
265- }
266319
267- return groups.values.map { it.toBatch() }
320+ addAll(openGroups.values.map { it.toBatch() })
321+ }
322+ return DrainResult (batches = batches, itemCount = drained)
268323 }
269324
270325 private fun submitBatch (batch : Batch ) {
@@ -314,15 +369,71 @@ class AutoBatchQueue(
314369 Patch ,
315370 }
316371
372+ private data class DrainResult (val batches : List <Batch >, val itemCount : Int )
373+
317374 private data class Batch (val params : RunIngestBatchParams , val requestOptions : RequestOptions )
318375
319- private data class BatchGroup (
376+ private class BatchGroup (
320377 val requestOptions : RequestOptions ,
321378 val posts : MutableList <Run > = mutableListOf(),
322379 val patches : MutableList <Run > = mutableListOf(),
323380 val headers : Headers .Builder = Headers .builder(),
324381 val queryParams : QueryParams .Builder = QueryParams .builder(),
325382 ) {
383+ private var postSerializedSizeBytes = 0
384+ private var patchSerializedSizeBytes = 0
385+
386+ fun add (item : BatchItem , serializedSizeBytes : Int ) {
387+ when (item.op) {
388+ BatchOp .Post -> {
389+ posts.add(item.run)
390+ postSerializedSizeBytes += serializedSizeBytes
391+ }
392+ BatchOp .Patch -> {
393+ patches.add(item.run)
394+ patchSerializedSizeBytes += serializedSizeBytes
395+ }
396+ }
397+ headers.putAll(item.headers)
398+ queryParams.putAll(item.queryParams)
399+ }
400+
401+ fun isNotEmpty (): Boolean = posts.isNotEmpty() || patches.isNotEmpty()
402+
403+ fun serializedSizeWith (item : BatchItem , itemSerializedSizeBytes : Int ): Int =
404+ estimatedBodySizeBytes(
405+ postCount = posts.size + if (item.op == BatchOp .Post ) 1 else 0 ,
406+ postItemsSizeBytes =
407+ postSerializedSizeBytes +
408+ if (item.op == BatchOp .Post ) itemSerializedSizeBytes else 0 ,
409+ patchCount = patches.size + if (item.op == BatchOp .Patch ) 1 else 0 ,
410+ patchItemsSizeBytes =
411+ patchSerializedSizeBytes +
412+ if (item.op == BatchOp .Patch ) itemSerializedSizeBytes else 0 ,
413+ )
414+
415+ private fun estimatedBodySizeBytes (
416+ postCount : Int ,
417+ postItemsSizeBytes : Int ,
418+ patchCount : Int ,
419+ patchItemsSizeBytes : Int ,
420+ ): Int {
421+ var size = 2 // Opening and closing braces.
422+ var fieldCount = 0
423+ if (patchCount > 0 ) {
424+ size += fieldSizeBytes("patch", patchCount, patchItemsSizeBytes)
425+ fieldCount++
426+ }
427+ if (postCount > 0 ) {
428+ if (fieldCount > 0 ) size++ // Comma between top-level fields.
429+ size += fieldSizeBytes("post", postCount, postItemsSizeBytes)
430+ }
431+ return size
432+ }
433+
434+ private fun fieldSizeBytes (name : String , count : Int , itemsSizeBytes : Int ): Int =
435+ name.length + 5 + itemsSizeBytes + (count - 1 ) // `"name":[]` + item commas.
436+
326437 fun toBatch (): Batch {
327438 val mergeResult = mergePostsAndPatches()
328439 val builder = RunIngestBatchParams .builder()
@@ -394,7 +505,9 @@ class AutoBatchQueue(
394505 private val objectMapper = jsonMapper()
395506
396507 const val DEFAULT_BATCH_SIZE_LIMIT = 100
508+ const val DEFAULT_BATCH_SIZE_LIMIT_BYTES = 20_971_520
397509 const val DEFAULT_AGGREGATION_DELAY_MS = 250L
510+ const val DEFAULT_MAX_AGGREGATION_DELAY_MS = 500L
398511 const val DEFAULT_SEND_PARALLELISM = 4
399512 }
400513}
0 commit comments