Skip to content

Commit f6e73df

Browse files
committed
fix(bug): queue drain
1 parent ced40bb commit f6e73df

6 files changed

Lines changed: 622 additions & 19 deletions

File tree

src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ async def on_span_start(self, span: Span) -> None:
145145
items=[sgp_span.to_request_params()]
146146
)
147147

148+
# Input has been serialized and sent; clear it on the retained span to
149+
# release memory. on_span_end only needs output/metadata/end_time.
150+
sgp_span.input = None # type: ignore[assignment]
148151
self._spans[span.id] = sgp_span
149152

150153
@override

src/agentex/lib/core/tracing/span_queue.py

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
logger = make_logger(__name__)
1414

15+
_DEFAULT_BATCH_SIZE = 50
16+
1517

1618
class SpanEventType(str, Enum):
1719
START = "start"
@@ -28,15 +30,18 @@ class _SpanQueueItem:
2830
class AsyncSpanQueue:
2931
"""Background FIFO queue for async span processing.
3032
31-
Span events are enqueued synchronously (non-blocking) and processed
32-
sequentially by a background drain task. This keeps tracing HTTP calls
33-
off the critical request path while preserving start-before-end ordering.
33+
Span events are enqueued synchronously (non-blocking) and drained by a
34+
background task. Items are processed in batches: all START events in a
35+
batch are flushed concurrently, then all END events, so that per-span
36+
start-before-end ordering is preserved while HTTP calls for independent
37+
spans execute in parallel.
3438
"""
3539

36-
def __init__(self) -> None:
40+
def __init__(self, batch_size: int = _DEFAULT_BATCH_SIZE) -> None:
3741
self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue()
3842
self._drain_task: asyncio.Task[None] | None = None
3943
self._stopping = False
44+
self._batch_size = batch_size
4045

4146
def enqueue(
4247
self,
@@ -54,9 +59,45 @@ def _ensure_drain_running(self) -> None:
5459
if self._drain_task is None or self._drain_task.done():
5560
self._drain_task = asyncio.create_task(self._drain_loop())
5661

62+
# ------------------------------------------------------------------
63+
# Drain loop
64+
# ------------------------------------------------------------------
65+
5766
async def _drain_loop(self) -> None:
5867
while True:
59-
item = await self._queue.get()
68+
# Block until at least one item is available.
69+
first = await self._queue.get()
70+
batch: list[_SpanQueueItem] = [first]
71+
72+
# Opportunistically grab more ready items (non-blocking).
73+
while len(batch) < self._batch_size:
74+
try:
75+
batch.append(self._queue.get_nowait())
76+
except asyncio.QueueEmpty:
77+
break
78+
79+
try:
80+
# Separate START and END events. Processing all STARTs before
81+
# ENDs ensures that on_span_start completes before on_span_end
82+
# for any span whose both events land in the same batch.
83+
starts = [i for i in batch if i.event_type == SpanEventType.START]
84+
ends = [i for i in batch if i.event_type == SpanEventType.END]
85+
86+
if starts:
87+
await self._process_items(starts)
88+
if ends:
89+
await self._process_items(ends)
90+
finally:
91+
for _ in batch:
92+
self._queue.task_done()
93+
# Release span data for GC.
94+
batch.clear()
95+
96+
@staticmethod
97+
async def _process_items(items: list[_SpanQueueItem]) -> None:
98+
"""Process a list of span events concurrently."""
99+
100+
async def _handle(item: _SpanQueueItem) -> None:
60101
try:
61102
if item.event_type == SpanEventType.START:
62103
coros = [p.on_span_start(item.span) for p in item.processors]
@@ -72,9 +113,15 @@ async def _drain_loop(self) -> None:
72113
exc_info=result,
73114
)
74115
except Exception:
75-
logger.exception("Unexpected error in span queue drain loop for span %s", item.span.id)
76-
finally:
77-
self._queue.task_done()
116+
logger.exception(
117+
"Unexpected error in span queue for span %s", item.span.id
118+
)
119+
120+
await asyncio.gather(*[_handle(item) for item in items])
121+
122+
# ------------------------------------------------------------------
123+
# Shutdown
124+
# ------------------------------------------------------------------
78125

79126
async def shutdown(self, timeout: float = 30.0) -> None:
80127
self._stopping = True

src/agentex/lib/core/tracing/trace.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ def end_span(
109109
if span.end_time is None:
110110
span.end_time = datetime.now(UTC)
111111

112-
span.input = recursive_model_dump(span.input) if span.input else None
112+
# input was already serialized at start_span; skip redundant re-serialization
113113
span.output = recursive_model_dump(span.output) if span.output else None
114114
span.data = recursive_model_dump(span.data) if span.data else None
115115

@@ -252,12 +252,17 @@ async def end_span(
252252
if span.end_time is None:
253253
span.end_time = datetime.now(UTC)
254254

255-
span.input = recursive_model_dump(span.input) if span.input else None
255+
# input was already serialized at start_span; skip redundant re-serialization
256256
span.output = recursive_model_dump(span.output) if span.output else None
257257
span.data = recursive_model_dump(span.data) if span.data else None
258258

259259
if self.processors:
260-
self._span_queue.enqueue(SpanEventType.END, span.model_copy(deep=True), self.processors)
260+
end_copy = span.model_copy(deep=True)
261+
# input was already sent with the START event; drop it from the END
262+
# copy to avoid retaining large payloads (system prompts, full
263+
# conversation histories) in the async queue.
264+
end_copy.input = None
265+
self._span_queue.enqueue(SpanEventType.END, end_copy, self.processors)
261266

262267
return span
263268

tests/lib/core/tracing/processors/test_sgp_tracing_processor.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,3 +162,18 @@ async def test_span_end_for_unknown_span_is_noop(self):
162162
await processor.on_span_end(span)
163163

164164
assert len(processor._spans) == 0
165+
166+
async def test_sgp_span_input_cleared_after_start(self):
167+
"""After on_span_start sends the data, sgp_span.input should be None to release memory."""
168+
processor, _ = self._make_processor()
169+
170+
with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()):
171+
span = _make_span()
172+
span.input = {"system_prompt": "x" * 10_000}
173+
await processor.on_span_start(span)
174+
175+
assert len(processor._spans) == 1
176+
sgp_span = next(iter(processor._spans.values()))
177+
assert sgp_span.input is None, (
178+
"SGP span input should be cleared after upsert to release memory"
179+
)

0 commit comments

Comments (0)