diff --git a/src/agentex/lib/core/observability/__init__.py b/src/agentex/lib/core/observability/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/agentex/lib/core/observability/llm_metrics.py b/src/agentex/lib/core/observability/llm_metrics.py new file mode 100644 index 000000000..b15e83824 --- /dev/null +++ b/src/agentex/lib/core/observability/llm_metrics.py @@ -0,0 +1,121 @@ +"""OTel metrics for LLM calls. + +Single source of truth for LLM-call instrumentation across all agentex code +paths — temporal+openai_agents streaming today, sync ACP and the Claude SDK +plugin in future PRs. Centralizing the instrument definitions here means +those follow-ups don't need to redefine the metric names, units, or +description strings; they import ``get_llm_metrics()`` and record values. + +The meter is no-op when the application hasn't configured a ``MeterProvider``, +so importing this module is safe for runtimes that don't use OTel. Instruments +are created lazily on first ``get_llm_metrics()`` call so a ``MeterProvider`` +configured *after* this module is imported still binds correctly. + +Cardinality is bounded: +- All metrics carry only ``model`` (the LLM model name). +- ``requests`` additionally carries ``status``, drawn from a small fixed set + (see ``classify_status``). + +Resource attributes (``service.name``, ``k8s.*``, etc.) come from the +application's OTel resource configuration and are added to every series +automatically. +""" + +from __future__ import annotations + +from typing import Optional + +from opentelemetry import metrics + + +class LLMMetrics: + """Lazily-created OTel instruments for LLM call telemetry.""" + + def __init__(self) -> None: + meter = metrics.get_meter("agentex.llm") + self.requests = meter.create_counter( + name="agentex.llm.requests", + unit="1", + description=( + "LLM call count tagged with status (success / rate_limit / " + "server_error / client_error / timeout / network_error / " + "other_error). Use to alert on 429s, 5xxs, etc." + ), + ) + self.ttft_ms = meter.create_histogram( + name="agentex.llm.ttft", + unit="ms", + description="Time from request submission to first content token (ms)", + ) + # ttat (time-to-first-answering-token) is distinct from ttft for reasoning + # models: ttft fires on the first reasoning chunk (which arrives quickly), + # while ttat fires on the first user-visible answer token (text or tool + # call). For non-reasoning models the two are equal. + self.ttat_ms = meter.create_histogram( + name="agentex.llm.ttat", + unit="ms", + description="Time from request submission to first answering token (text or tool-call delta) — excludes reasoning chunks", + ) + # Note: TPS denominator is the model-generation window + # (last_token_time - first_token_time), not total stream wall time. + # This isolates raw model throughput from event-loop / tool-call latency. 
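+        # Illustrative arithmetic (hypothetical numbers): if the first and last
+        # output tokens arrive 0.5s and 2.5s after the request and usage reports
+        # 100 output tokens, tps records 100 / (2.5 - 0.5) = 50 tokens/s, however
+        # long any tool calls or event-loop work run after the stream finishes.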
+ self.tps = meter.create_histogram( + name="agentex.llm.tps", + unit="tokens/s", + description="Output tokens per second over the generation window", + ) + self.input_tokens = meter.create_counter( + name="agentex.llm.input_tokens", + unit="tokens", + description="Total input tokens sent to the LLM", + ) + self.output_tokens = meter.create_counter( + name="agentex.llm.output_tokens", + unit="tokens", + description="Total output tokens returned by the LLM", + ) + self.cached_input_tokens = meter.create_counter( + name="agentex.llm.cached_input_tokens", + unit="tokens", + description="Subset of input tokens served from prompt cache", + ) + self.reasoning_tokens = meter.create_counter( + name="agentex.llm.reasoning_tokens", + unit="tokens", + description="Output tokens spent on reasoning (subset of output_tokens)", + ) + + +_llm_metrics: Optional[LLMMetrics] = None + + +def get_llm_metrics() -> LLMMetrics: + """Return the LLM metrics singleton, creating it on first use.""" + global _llm_metrics + if _llm_metrics is None: + _llm_metrics = LLMMetrics() + return _llm_metrics + + +def classify_status(exc: Optional[BaseException]) -> str: + """Categorize an LLM call's outcome into a small fixed set of status labels. + + A successful call returns ``"success"``. Exceptions are mapped by type name + so we don't depend on a specific provider SDK's exception class hierarchy: + OpenAI, Anthropic, and other providers all use names like ``RateLimitError``, + ``APITimeoutError``, ``InternalServerError``, etc. + """ + if exc is None: + return "success" + name = type(exc).__name__ + if "RateLimit" in name: + return "rate_limit" + if "Timeout" in name: + return "timeout" + if any(s in name for s in ("ServerError", "InternalServer", "ServiceUnavailable", "BadGateway")): + return "server_error" + if "Connection" in name: + return "network_error" + if any(s in name for s in ("BadRequest", "Authentication", "Permission", "NotFound", "Conflict", "UnprocessableEntity")): + return "client_error" + return "other_error" diff --git a/src/agentex/lib/core/observability/llm_metrics_hooks.py b/src/agentex/lib/core/observability/llm_metrics_hooks.py new file mode 100644 index 000000000..fce4b29ba --- /dev/null +++ b/src/agentex/lib/core/observability/llm_metrics_hooks.py @@ -0,0 +1,57 @@ +"""``RunHooks`` adapter that emits per-call LLM metrics. + +Used by the sync ACP path and as a base class for ``TemporalStreamingHooks`` +on the async path, so token / request / cache metrics emit consistently +across both. Streaming-only metrics (ttft, ttat, tps) are emitted from the +streaming model itself, not here — hooks don't see individual chunks. +""" + +from __future__ import annotations + +from typing import Any +from typing_extensions import override + +from agents import Agent, RunHooks, ModelResponse, RunContextWrapper + +from agentex.lib.core.observability.llm_metrics import classify_status, get_llm_metrics + + +class LLMMetricsHooks(RunHooks): + """Emits ``agentex.llm.requests`` + token counters on every LLM call.""" + + @override + async def on_llm_end( + self, + context: RunContextWrapper[Any], + agent: Agent[Any], + response: ModelResponse, + ) -> None: + del context # part of the RunHooks contract; unused here + m = get_llm_metrics() + attrs = {"model": str(agent.model) if agent.model else "unknown"} + # Request counter only depends on agent.model, so emit it first and + # outside the usage-extraction try block. 
Token counters reach into + # nested optional fields and are best-effort: a non-OpenAI provider + # (litellm-routed Anthropic, etc.) may return a Usage shape missing + # input_tokens_details / output_tokens_details — we emit zeros where + # we can and skip the rest rather than crash the caller. + try: + m.requests.add(1, {**attrs, "status": "success"}) + except Exception: + pass + try: + usage = response.usage + m.input_tokens.add(usage.input_tokens or 0, attrs) + m.output_tokens.add(usage.output_tokens or 0, attrs) + m.cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, attrs) + m.reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, attrs) + except Exception: + pass + + +def record_llm_failure(model: str, exc: BaseException) -> None: + """Best-effort counter bump for an LLM call that raised before ``on_llm_end``.""" + try: + get_llm_metrics().requests.add(1, {"model": model, "status": classify_status(exc)}) + except Exception: + pass diff --git a/src/agentex/lib/core/observability/tests/__init__.py b/src/agentex/lib/core/observability/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/agentex/lib/core/observability/tests/test_llm_metrics.py b/src/agentex/lib/core/observability/tests/test_llm_metrics.py new file mode 100644 index 000000000..d8ab62eba --- /dev/null +++ b/src/agentex/lib/core/observability/tests/test_llm_metrics.py @@ -0,0 +1,83 @@ +"""Tests for ``agentex.lib.core.observability.llm_metrics``.""" + +from __future__ import annotations + +import agentex.lib.core.observability.llm_metrics as llm_metrics +from agentex.lib.core.observability.llm_metrics import ( + LLMMetrics, + classify_status, + get_llm_metrics, +) + + +class TestClassifyStatus: + def test_none_is_success(self): + assert classify_status(None) == "success" + + def test_rate_limit(self): + class RateLimitError(Exception): + pass + + assert classify_status(RateLimitError()) == "rate_limit" + + def test_timeout(self): + class APITimeoutError(Exception): + pass + + assert classify_status(APITimeoutError()) == "timeout" + + def test_server_error(self): + class InternalServerError(Exception): + pass + + assert classify_status(InternalServerError()) == "server_error" + + class ServiceUnavailable(Exception): + pass + + assert classify_status(ServiceUnavailable()) == "server_error" + + def test_network_error(self): + class APIConnectionError(Exception): + pass + + assert classify_status(APIConnectionError()) == "network_error" + + def test_client_error(self): + for cls_name in ("BadRequestError", "AuthenticationError", "PermissionError"): + cls = type(cls_name, (Exception,), {}) + assert classify_status(cls()) == "client_error" + + def test_unknown_falls_back(self): + class WeirdProviderException(Exception): + pass + + assert classify_status(WeirdProviderException()) == "other_error" + + +class TestGetLLMMetrics: + def test_returns_llm_metrics_instance(self, monkeypatch): + monkeypatch.setattr(llm_metrics, "_llm_metrics", None) + m = get_llm_metrics() + assert isinstance(m, LLMMetrics) + + def test_singleton_returns_same_instance(self, monkeypatch): + monkeypatch.setattr(llm_metrics, "_llm_metrics", None) + first = get_llm_metrics() + second = get_llm_metrics() + assert first is second + + def test_instruments_exist(self, monkeypatch): + monkeypatch.setattr(llm_metrics, "_llm_metrics", None) + m = get_llm_metrics() + for name in ( + "requests", + "ttft_ms", + "ttat_ms", + "tps", + "input_tokens", + "output_tokens", + "cached_input_tokens", + 
"reasoning_tokens", + ): + assert hasattr(m, name), f"missing instrument: {name}" diff --git a/src/agentex/lib/core/observability/tests/test_llm_metrics_hooks.py b/src/agentex/lib/core/observability/tests/test_llm_metrics_hooks.py new file mode 100644 index 000000000..a2cef95b8 --- /dev/null +++ b/src/agentex/lib/core/observability/tests/test_llm_metrics_hooks.py @@ -0,0 +1,215 @@ +"""Tests for ``agentex.lib.core.observability.llm_metrics_hooks``.""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest + +import agentex.lib.core.observability.llm_metrics_hooks as hooks_module +from agentex.lib.core.observability.llm_metrics_hooks import ( + LLMMetricsHooks, + record_llm_failure, +) + + +def _mock_response( + *, + input_tokens: int = 100, + output_tokens: int = 50, + cached_tokens: int = 30, + reasoning_tokens: int = 10, +) -> MagicMock: + response = MagicMock() + response.usage.input_tokens = input_tokens + response.usage.output_tokens = output_tokens + response.usage.input_tokens_details.cached_tokens = cached_tokens + response.usage.output_tokens_details.reasoning_tokens = reasoning_tokens + return response + + +def _mock_agent(model: str = "gpt-5") -> MagicMock: + agent = MagicMock() + agent.model = model + return agent + + +class TestLLMMetricsHooksOnLLMEnd: + @pytest.mark.asyncio + async def test_emits_success_request_counter(self, monkeypatch): + m = MagicMock() + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + await LLMMetricsHooks().on_llm_end( + context=MagicMock(), + agent=_mock_agent("gpt-5"), + response=_mock_response(), + ) + + m.requests.add.assert_called_once_with(1, {"model": "gpt-5", "status": "success"}) + + @pytest.mark.asyncio + async def test_emits_token_counters(self, monkeypatch): + m = MagicMock() + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + await LLMMetricsHooks().on_llm_end( + context=MagicMock(), + agent=_mock_agent("gpt-5"), + response=_mock_response( + input_tokens=200, + output_tokens=75, + cached_tokens=50, + reasoning_tokens=20, + ), + ) + + attrs = {"model": "gpt-5"} + m.input_tokens.add.assert_called_once_with(200, attrs) + m.output_tokens.add.assert_called_once_with(75, attrs) + m.cached_input_tokens.add.assert_called_once_with(50, attrs) + m.reasoning_tokens.add.assert_called_once_with(20, attrs) + + @pytest.mark.asyncio + async def test_zero_tokens_emit_zero_not_skip(self, monkeypatch): + m = MagicMock() + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + await LLMMetricsHooks().on_llm_end( + context=MagicMock(), + agent=_mock_agent(), + response=_mock_response(input_tokens=0, output_tokens=0, cached_tokens=0, reasoning_tokens=0), + ) + + m.input_tokens.add.assert_called_once_with(0, {"model": "gpt-5"}) + m.output_tokens.add.assert_called_once_with(0, {"model": "gpt-5"}) + + @pytest.mark.asyncio + async def test_unknown_model_falls_back(self, monkeypatch): + m = MagicMock() + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + agent = MagicMock() + agent.model = None + + await LLMMetricsHooks().on_llm_end( + context=MagicMock(), + agent=agent, + response=_mock_response(), + ) + + m.requests.add.assert_called_once_with(1, {"model": "unknown", "status": "success"}) + + @pytest.mark.asyncio + async def test_swallows_exporter_failure(self, monkeypatch): + m = MagicMock() + m.requests.add.side_effect = RuntimeError("exporter exploded") + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + # Should not raise — caller's flow 
must not break on metric failure. + await LLMMetricsHooks().on_llm_end( + context=MagicMock(), + agent=_mock_agent(), + response=_mock_response(), + ) + + @pytest.mark.asyncio + async def test_missing_usage_still_emits_request_counter(self, monkeypatch): + """Provider returns a response without `usage` — caller shouldn't crash, + and we should still record the success request counter.""" + m = MagicMock() + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + class _Response: + @property + def usage(self): + raise AttributeError("no usage") + + await LLMMetricsHooks().on_llm_end( + context=MagicMock(), + agent=_mock_agent(), + response=_Response(), # type: ignore[arg-type] + ) + + m.requests.add.assert_called_once_with(1, {"model": "gpt-5", "status": "success"}) + m.input_tokens.add.assert_not_called() + m.output_tokens.add.assert_not_called() + + @pytest.mark.asyncio + async def test_missing_token_details_skips_those_counters(self, monkeypatch): + """Provider returns Usage without input_tokens_details (e.g. some + litellm wrappers / non-OpenAI providers): top-level token counts + still emit; the nested cached/reasoning counters are skipped.""" + m = MagicMock() + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + class _Usage: + input_tokens = 100 + output_tokens = 50 + + @property + def input_tokens_details(self): + raise AttributeError("no details") + + class _Response: + usage = _Usage() + + await LLMMetricsHooks().on_llm_end( + context=MagicMock(), + agent=_mock_agent(), + response=_Response(), # type: ignore[arg-type] + ) + + # Request counter still fires (it's outside the usage-extraction try). + m.requests.add.assert_called_once_with(1, {"model": "gpt-5", "status": "success"}) + # input_tokens.add fires before the nested attribute access. + m.input_tokens.add.assert_called_once_with(100, {"model": "gpt-5"}) + # cached_input_tokens / reasoning_tokens skipped — the AttributeError + # bailed before they could be called. + m.cached_input_tokens.add.assert_not_called() + m.reasoning_tokens.add.assert_not_called() + + @pytest.mark.asyncio + async def test_none_token_values_emit_as_zero(self, monkeypatch): + """Some providers report None instead of 0 for fields they don't track.""" + m = MagicMock() + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + response = MagicMock() + response.usage.input_tokens = None + response.usage.output_tokens = None + response.usage.input_tokens_details.cached_tokens = None + response.usage.output_tokens_details.reasoning_tokens = None + + await LLMMetricsHooks().on_llm_end( + context=MagicMock(), + agent=_mock_agent(), + response=response, + ) + + attrs = {"model": "gpt-5"} + m.input_tokens.add.assert_called_once_with(0, attrs) + m.output_tokens.add.assert_called_once_with(0, attrs) + m.cached_input_tokens.add.assert_called_once_with(0, attrs) + m.reasoning_tokens.add.assert_called_once_with(0, attrs) + + +class TestRecordLLMFailure: + def test_emits_classified_status(self, monkeypatch): + m = MagicMock() + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + class RateLimitError(Exception): + pass + + record_llm_failure("gpt-5", RateLimitError()) + + m.requests.add.assert_called_once_with(1, {"model": "gpt-5", "status": "rate_limit"}) + + def test_swallows_exporter_failure(self, monkeypatch): + m = MagicMock() + m.requests.add.side_effect = RuntimeError("exporter exploded") + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + # Should not raise. 
+ record_llm_failure("gpt-5", Exception("upstream")) diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py b/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py index cc27006fc..758b0db27 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py @@ -8,18 +8,19 @@ from typing import Any, override from datetime import timedelta -from agents import Tool, Agent, RunHooks, RunContextWrapper +from agents import Tool, Agent, RunContextWrapper from temporalio import workflow from agents.tool_context import ToolContext from agentex.types.text_content import TextContent from agentex.types.task_message_content import ToolRequestContent, ToolResponseContent +from agentex.lib.core.observability.llm_metrics_hooks import LLMMetricsHooks from agentex.lib.core.temporal.plugins.openai_agents.hooks.activities import stream_lifecycle_content logger = logging.getLogger(__name__) -class TemporalStreamingHooks(RunHooks): +class TemporalStreamingHooks(LLMMetricsHooks): """Convenience hooks class for streaming OpenAI Agent lifecycle events to the AgentEx UI. This class automatically streams agent lifecycle events (tool calls, handoffs) to the diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py index 4f18ae379..7ccc6627a 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py @@ -1,6 +1,7 @@ """Custom Temporal Model Provider with streaming support for OpenAI agents.""" from __future__ import annotations +import time import uuid from typing import Any, List, Union, Optional, override @@ -31,6 +32,8 @@ # Re-export the canonical StreamingMode literal from the streaming service so # all layers share a single definition. from agentex.lib.core.services.adk.streaming import StreamingMode as StreamingMode +from agentex.lib.core.observability.llm_metrics import get_llm_metrics +from agentex.lib.core.observability.llm_metrics_hooks import record_llm_failure try: from agents.tool import ShellTool # type: ignore[attr-defined] @@ -78,6 +81,11 @@ logger = make_logger("agentex.temporal.streaming") +# LLM metrics live in agentex.lib.core.observability.llm_metrics so other +# code paths (sync ACP, Claude SDK plugin, future provider integrations) +# can share the same instrument definitions without redefining names. + + def _serialize_item(item: Any) -> dict[str, Any]: """ Universal serializer for any item type from OpenAI Agents SDK. @@ -592,7 +600,11 @@ async def get_response( # endpoints recognize this parameter, so we don't auto-inject a default. prompt_cache_key = extra_args.pop("prompt_cache_key", NOT_GIVEN) - # Create the response stream using Responses API + # Create the response stream using Responses API. + # Bookmark request start *before* the await so ttft captures the full + # user-perceived latency (HTTP round-trip + model TTFB), not just the + # post-connect event-loop delay. 
+ stream_start_perf = time.perf_counter() logger.debug(f"[TemporalStreamingModel] Creating response stream with Responses API") stream = await self.client.responses.create( # type: ignore[call-overload] @@ -642,6 +654,16 @@ async def get_response( reasoning_summaries = [] reasoning_contents = [] event_count = 0 + # ttft / ttat / tps instrumentation. ``stream_start_perf`` is set + # above, before the responses.create() await, so it captures the full + # request-to-first-token latency. ``first_token_at`` and + # ``last_token_at`` bracket the model-generation window for tps. + # ``first_answer_at`` is set on the first user-visible answer token + # (text or tool-call delta) and excludes reasoning chunks, so ttat + # measures the latency users actually perceive on reasoning models. + first_token_at: Optional[float] = None + last_token_at: Optional[float] = None + first_answer_at: Optional[float] = None # We expect task_id to always be provided for streaming if not task_id: @@ -656,6 +678,28 @@ async def get_response( # Log event type logger.debug(f"[TemporalStreamingModel] Event {event_count}: {type(event).__name__}") + # Bookmark first/last token-producing events for ttft and tps. + # Includes function-call argument deltas so the generation window + # covers every event type whose tokens land in usage.output_tokens. + if isinstance(event, ( + ResponseTextDeltaEvent, + ResponseReasoningTextDeltaEvent, + ResponseReasoningSummaryTextDeltaEvent, + ResponseFunctionCallArgumentsDeltaEvent, + )): + now_perf = time.perf_counter() + if first_token_at is None: + first_token_at = now_perf + last_token_at = now_perf + # ttat: first user-visible answer token (text or tool call), + # excluding reasoning chunks. Equal to ttft for non-reasoning + # models; differs by reasoning duration for reasoning models. + if first_answer_at is None and isinstance(event, ( + ResponseTextDeltaEvent, + ResponseFunctionCallArgumentsDeltaEvent, + )): + first_answer_at = now_perf + # Handle different event types using isinstance for type safety if isinstance(event, ResponseOutputItemAddedEvent): # New output item (reasoning, function call, or message) @@ -983,6 +1027,25 @@ async def get_response( span.output = output_data + # Streaming-only metrics. Token counters and the success request + # counter are emitted by LLMMetricsHooks.on_llm_end so they fire + # consistently across streaming and non-streaming paths. + m = get_llm_metrics() + metric_attrs = {"model": self.model_name} + if first_token_at is not None: + m.ttft_ms.record((first_token_at - stream_start_perf) * 1000, metric_attrs) + if first_answer_at is not None: + m.ttat_ms.record((first_answer_at - stream_start_perf) * 1000, metric_attrs) + # Single-token responses collapse the generation window to 0; tps + # is undefined and skipped. + if ( + first_token_at is not None + and last_token_at is not None + and last_token_at > first_token_at + and (usage.output_tokens or 0) > 0 + ): + m.tps.record(usage.output_tokens / (last_token_at - first_token_at), metric_attrs) + # Return the response. response_id is the server-issued id from # ResponseCompletedEvent.response.id, or None when the stream ended # without a completed event (error path) — matching the documented @@ -998,6 +1061,10 @@ async def get_response( except Exception as e: logger.error(f"Error using Responses API: {e}") + # LLMMetricsHooks.on_llm_end doesn't fire on error, so emit the + # failure counter here. Best-effort so the typed LLM exception + # always propagates intact for retry / circuit-breaker logic. 
+ record_llm_failure(self.model_name, e) raise # The _get_response_with_responses_api method has been merged into get_response above
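The llm_metrics module docstring above notes that the meter stays a no-op until the application configures a MeterProvider, and that follow-up code paths (sync ACP, the Claude SDK plugin) are expected to import get_llm_metrics() and record values directly. The sketch below illustrates both halves under stated assumptions; it is not part of the patch. The ConsoleMetricExporter choice and the record_call_outcome helper are hypothetical stand-ins, not existing agentex APIs.

# Illustrative sketch, not part of the patch: configure a MeterProvider so the
# instruments defined in llm_metrics actually export, then record per-call
# values from a hypothetical non-streaming call site.
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader

from agentex.lib.core.observability.llm_metrics import classify_status, get_llm_metrics

# Application startup: without a configured MeterProvider, every instrument in
# LLMMetrics is a no-op. ConsoleMetricExporter is for illustration only; a real
# deployment would more likely use an OTLP exporter.
metrics.set_meter_provider(
    MeterProvider(metric_readers=[PeriodicExportingMetricReader(ConsoleMetricExporter())])
)


def record_call_outcome(model: str, usage=None, exc=None) -> None:
    """Hypothetical helper for a non-streaming path; not an existing agentex API."""
    m = get_llm_metrics()
    attrs = {"model": model}
    # classify_status(None) returns "success", so one call covers both outcomes.
    m.requests.add(1, {**attrs, "status": classify_status(exc)})
    if exc is None and usage is not None:
        # Same best-effort posture as LLMMetricsHooks: metrics must never break
        # the caller, so token extraction is wrapped and failures are dropped.
        try:
            m.input_tokens.add(usage.input_tokens or 0, attrs)
            m.output_tokens.add(usage.output_tokens or 0, attrs)
        except Exception:
            pass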