diff --git a/PR_DESCRIPTION.md b/PR_DESCRIPTION.md new file mode 100644 index 0000000000..c2a231de11 --- /dev/null +++ b/PR_DESCRIPTION.md @@ -0,0 +1,23 @@ +# PR Title +feat(otel): instrument LLM providers for latency and prompt visibility + +# PR Description + +## Summary +This PR adds manual OpenTelemetry instrumentation to the core LLM providers in Timesketch (Google GenAI/Gemini, Ollama, and Azure AI). This enables deep observability into AI-driven features, allowing analysts to track request latency, model performance, and prompt context. + +## Why this is important +LLM requests are often the most time-consuming part of an analysis workflow. Without tracing, it is difficult to distinguish between network lag, model processing time, or application-layer overhead. This PR makes these "black box" operations transparent in the trace waterfall. + +## Changes +- **Google GenAI (Gemini):** Wrapped `generate()` in a span; captured `llm.provider`, `llm.model`, `llm.prompt_length`, and `llm.response_length`. +- **Ollama:** Wrapped `generate()` in a span; captured similar metadata for local LLM usage. +- **Azure AI:** Wrapped `generate()` in a span; captured similar metadata for Azure-hosted models. +- **Privacy by Design:** Full prompt and response text are intentionally **excluded** from telemetry to ensure no sensitive analyst conversation data is stored in the trace backend. Instead, we capture **character lengths**, providing sufficient data for performance and cost analysis without any risk of data leakage. +- **Error Handling:** Integrated OTel exception recording and status reporting for all model generation failures. + +## Dependencies +This PR is designed to be merged **after** the core OpenTelemetry Phase 2 PR, as it relies on the `telemetry` helper and the global privacy scrubber. + +## Verification +Verified locally using the Jaeger stack. LLM requests now appear as clear, attribute-rich spans under the parent API or task spans. 
diff --git a/docker/dev/otel-collector-config.yaml b/data/otel-collector-config.yaml similarity index 100% rename from docker/dev/otel-collector-config.yaml rename to data/otel-collector-config.yaml diff --git a/docker/dev/docker-compose-telemetry.yml b/docker/dev/docker-compose-telemetry.yml index e5ea2cf645..be2d13c44f 100644 --- a/docker/dev/docker-compose-telemetry.yml +++ b/docker/dev/docker-compose-telemetry.yml @@ -4,7 +4,7 @@ services: image: otel/opentelemetry-collector:0.100.0 command: ["--config=/etc/otel-collector-config.yaml"] volumes: - - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml + - ../../data/otel-collector-config.yaml:/etc/otel-collector-config.yaml ports: - "127.0.0.1:4317:4317" - "127.0.0.1:4318:4318" diff --git a/docker/dev/docker-compose.yml b/docker/dev/docker-compose.yml index 7ca856ab5a..1ba2f7ba8e 100644 --- a/docker/dev/docker-compose.yml +++ b/docker/dev/docker-compose.yml @@ -28,7 +28,7 @@ services: - TIMESKETCH_PASSWORD=dev - CHOKIDAR_USEPOLLING=true - PROMETHEUS_MULTIPROC_DIR=/tmp/ - - ENABLE_STRUCTURED_LOGGING=true + # - ENABLE_STRUCTURED_LOGGING=true # Telemetry settings (Uncomment to enable locally) # - TIMESKETCH_OTEL_MODE=otlp-grpc # - TIMESKETCH_OTLP_GRPC_ENDPOINT=otel-collector:4317 diff --git a/docs/OpenTelemetry.md b/docs/OpenTelemetry.md index 0aab8323d3..404371abbf 100644 --- a/docs/OpenTelemetry.md +++ b/docs/OpenTelemetry.md @@ -7,19 +7,16 @@ This document provides a comprehensive guide for developers, admins, and users o ## 1. Overview Timesketch uses OpenTelemetry to provide distributed tracing across its web (Flask) and worker (Celery) components. This enables deep observability into request life cycles and background task performance. -### Key Benefits -* **Distributed Tracing:** Track a single request from an external tool (like `dftimewolf`) through the API and into background analyzers. 
-* **Log Correlation:** Trace IDs and Span IDs are automatically injected into structured JSON logs, allowing you to jump from a log line directly to a trace waterfall in tools like GCP Cloud Trace or Jaeger. -* **Standardized Protocol:** Uses the industry-standard OpenTelemetry Protocol (OTLP). - --- ## 2. Architecture -The instrumentation is centralized in a dedicated module: `timesketch/lib/telemetry.py`. +The instrumentation is centralized in `timesketch/lib/telemetry.py`. -* **Flask Instrumentation:** Automatically captures spans for all HTTP requests, including route patterns and status codes. -* **Celery Instrumentation:** Captures spans for both task dispatching (producer) and execution (worker), maintaining the trace context across process boundaries. -* **Async Exporting:** Spans are exported asynchronously using a `BatchSpanProcessor` to ensure minimal impact on application performance. +* **Flask:** Captures all HTTP requests, status codes, and analyst identity. +* **Celery:** Maintains trace context across background tasks (analyzers, data imports). +* **OpenSearch:** Manual instrumentation captures search query structure (`db.statement`), targeted indices, and internal processing time (`took_ms`). +* **SQLAlchemy (Postgres):** Automatically captures SQL statements and database connection health. +* **Async Exporting:** Uses `BatchSpanProcessor` for minimal impact on application performance. --- @@ -33,6 +30,7 @@ Telemetry is controlled entirely via environment variables. | `TIMESKETCH_OTLP_HTTP_ENDPOINT` | OTLP collector endpoint (HTTP). | `http://jaeger:4318/v1/traces` | | `TIMESKETCH_OTLP_INSECURE` | Use insecure (non-TLS) connection. | `true` (default for dev) | | `TIMESKETCH_ENV` | Environment identifier. | `production`, `development` | +| `ENABLE_STRUCTURED_LOGGING` | Enable JSON logging with trace context. | `true`, `false` | ### Supported Modes: 1. **`otlp-grpc`:** Best for local collectors (e.g., OTel Collector or Jaeger). 
@@ -74,7 +72,7 @@ The Tilt dashboard will show `otel-collector` and `jaeger` resources, including --- -## 5. Visualization Options +## 6. Visualization Options The local environment provides two ways to see your traces. You can switch between them by changing the `TIMESKETCH_OTLP_GRPC_ENDPOINT`. @@ -91,7 +89,7 @@ The local environment provides two ways to see your traces. You can switch betwe --- -## 6. Triggering Activity & Verification +## 7. Triggering Activity & Verification Generate some traffic to verify the setup: ```bash # Trigger a Flask Trace (API Call) @@ -102,14 +100,14 @@ docker exec timesketch-dev celery -A timesketch.lib.tasks call timesketch.lib.ta ``` **Check Application Logs:** -Verify that `trace_id` and `span_id` appear in the JSON output: +Verify that `trace_id` appears in the output: ```bash docker logs timesketch-dev | grep trace_id ``` --- -## 7. Secure Private Access (GCP) +## 8. Secure Private Access (GCP) If you are running Timesketch on a private GCE VM without an external IP, you can "proxy in" securely using **Identity-Aware Proxy (IAP) Tunneling**. ### Accessing the Web Interfaces @@ -145,43 +143,8 @@ gcloud compute start-iap-tunnel timesketch-otel-lab 5000 \ --- -## 8. Deployment Guide (GCP) +## 9. Deployment Guide (GCP) To enable production tracing in GCP: 1. Set `TIMESKETCH_OTEL_MODE=otlp-default-gce`. 2. Ensure the service account running Timesketch has the `roles/cloudtrace.agent` role. 3. View your traces in the [GCP Trace Explorer](https://console.cloud.google.com/traces/explorer). - ---- - -## 8. Information for Developers - -### Automated Coverage -Most common operations are already covered by auto-instrumentation: -* **Web API:** All Flask routes, status codes, and HTTP methods. -* **Background Tasks:** All Celery task dispatching and executions. -* **Analyzers:** All analyzers automatically report `sketch_id`, `analyzer_name`, `timeline_id`, and execution status via the `BaseAnalyzer` interface. 
- -### Adding Custom Attributes & Events -If you need to record specific domain metadata (e.g., number of matches found, search query used) from within your code, use the helpers in `timesketch.lib.telemetry`. - -#### Example: Adding attributes in an Analyzer -```python -from timesketch.lib import telemetry - -def analyze(self): - # ... logic ... - matches_found = len(results) - - # This will appear in the Span attributes in Jaeger/GCP - telemetry.add_attribute_to_current_span("sigma.matches_count", matches_found) - - # Record a significant milestone as an event - telemetry.add_event_to_current_span("Finished parsing rules") - - return f"Found {matches_found} matches." -``` - -#### Best Practices for Attributes -* **Use Namespace Prefixes:** To avoid collisions, prefix your attributes (e.g., `sigma.rule_id`, `sketch.member_count`). -* **Data Types:** Simple types (strings, ints, bools, floats) are stored natively. Complex objects (dicts, lists) are automatically serialized to JSON. -* **Avoid PII:** Never record sensitive user data or authentication tokens in span attributes. 
diff --git a/requirements.txt b/requirements.txt index e807936950..c570f71e99 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,6 +9,7 @@ opentelemetry-api==1.24.0 opentelemetry-sdk==1.24.0 opentelemetry-instrumentation-flask==0.45b0 opentelemetry-instrumentation-celery==0.45b0 +opentelemetry-instrumentation-sqlalchemy>=0.45b0 google-cloud-trace>=1.4.0 opentelemetry-exporter-gcp-trace>=1.6.0 opentelemetry-exporter-otlp==1.24.0 diff --git a/timesketch/app.py b/timesketch/app.py index f2d641a3cf..c52841ce1c 100644 --- a/timesketch/app.py +++ b/timesketch/app.py @@ -30,10 +30,6 @@ from flask_restful import Api from flask_wtf import CSRFProtect -try: - from opentelemetry import trace -except ImportError: - trace = None from timesketch.lib import telemetry from timesketch.api.v1.routes import API_ROUTES as V1_API_ROUTES @@ -267,20 +263,17 @@ def format(self, record): "module": record.module, } - if trace: - span_context = trace.get_current_span().get_span_context() - if span_context.is_valid: - t_id = trace.format_trace_id(span_context.trace_id) - s_id = trace.format_span_id(span_context.span_id) - log_record["trace_id"] = t_id - log_record["span_id"] = s_id - # GCP specific correlation fields - project_id = os.getenv("GOOGLE_CLOUD_PROJECT") - if project_id: - log_record["logging.googleapis.com/trace"] = ( - f"projects/{project_id}/traces/{t_id}" - ) - log_record["logging.googleapis.com/spanId"] = s_id + # Add trace correlation if TraceLogFilter has run + if hasattr(record, "trace_id"): + log_record["trace_id"] = record.trace_id + log_record["span_id"] = record.span_id + # GCP specific correlation fields + project_id = os.getenv("GOOGLE_CLOUD_PROJECT") + if project_id: + log_record["logging.googleapis.com/trace"] = ( + f"projects/{project_id}/traces/{record.trace_id}" + ) + log_record["logging.googleapis.com/spanId"] = record.span_id if record.exc_info: formatted_trace = self.formatException(record.exc_info) @@ -290,6 +283,7 @@ def format(self, record): 
logger_object = logging.getLogger("timesketch") logger_filter = NoESFilter() + trace_filter = telemetry.TraceLogFilter() use_structured_logging = ( os.environ.get("ENABLE_STRUCTURED_LOGGING", "false").lower() == "true" @@ -299,6 +293,7 @@ def format(self, record): handler = logging.StreamHandler(sys.stdout) handler.setFormatter(JSONLogFormatter(datefmt="%Y-%m-%dT%H:%M:%S%z")) handler.addFilter(logger_filter) + handler.addFilter(trace_filter) root = logging.getLogger() for h in root.handlers[:]: @@ -310,11 +305,12 @@ def format(self, record): else: logger_formatter = logging.Formatter( - "[%(asctime)s] %(name)s/%(levelname)s %(message)s" + "[%(asctime)s] %(name)s/%(levelname)s [trace_id=%(trace_id)s] %(message)s" ) for handler in logger_object.parent.handlers: handler.setFormatter(logger_formatter) handler.addFilter(logger_filter) + handler.addFilter(trace_filter) def create_celery_app(): diff --git a/timesketch/lib/analyzers/interface.py b/timesketch/lib/analyzers/interface.py index 7073f1e97d..ee28757ed2 100644 --- a/timesketch/lib/analyzers/interface.py +++ b/timesketch/lib/analyzers/interface.py @@ -1157,6 +1157,7 @@ def run_wrapper(self, analysis_id): analysis.set_status("DONE") telemetry.add_attribute_to_current_span("status", "success") + telemetry.set_status_on_current_span("OK") telemetry.add_event_to_current_span(f"Analyzer {self.name} completed") except Exception as e: # pylint: disable=broad-except analysis.set_status("ERROR") @@ -1170,6 +1171,7 @@ def run_wrapper(self, analysis_id): ) telemetry.add_attribute_to_current_span("status", "error") telemetry.add_attribute_to_current_span("error_message", str(e)) + telemetry.set_status_on_current_span("ERROR", description=str(e)) telemetry.add_event_to_current_span(f"Analyzer {self.name} failed") # Update database analysis object with result and status diff --git a/timesketch/lib/datastores/opensearch.py b/timesketch/lib/datastores/opensearch.py index cf4d73df64..f8b3d8f039 100644 --- 
a/timesketch/lib/datastores/opensearch.py +++ b/timesketch/lib/datastores/opensearch.py @@ -27,6 +27,9 @@ from typing import Generator, List, Dict, Optional, Any, Union from dateutil import parser, relativedelta +from flask import abort +from flask import current_app +from flask_login import current_user from opensearchpy import OpenSearch from opensearchpy.exceptions import ConnectionTimeout from opensearchpy.exceptions import NotFoundError @@ -36,14 +39,12 @@ # pylint: disable=redefined-builtin from opensearchpy.exceptions import ConnectionError -from flask import abort -from flask import current_app -from flask_login import current_user import prometheus_client +from timesketch.lib import errors +from timesketch.lib import telemetry from timesketch.lib.definitions import HTTP_STATUS_CODE_NOT_FOUND from timesketch.lib.definitions import METRICS_NAMESPACE -from timesketch.lib import errors # Setup logging os_logger = logging.getLogger("timesketch.opensearch") @@ -841,151 +842,176 @@ def search( if isinstance(indices, str): indices = [indices] - scroll_timeout = None - if enable_scroll: - scroll_timeout = "1m" # Default to 1 minute scroll timeout - - # Exit early if we have no indices to query - if not indices: - return {"hits": {"hits": [], "total": 0}, "took": 0} - - # Make sure that the list of index names is uniq. 
- indices = list(set(indices)) - - if query_filter is None: - query_filter = {} + tracer = telemetry.get_tracer(__name__) + with tracer.start_as_current_span("opensearch.search") as span: + span.set_attribute("db.system", "opensearch") + span.set_attribute("db.operation", "search") + span.set_attribute("sketch_id", sketch_id) + span.set_attribute("db.opensearch.indices", indices) + + scroll_timeout = None + if enable_scroll: + scroll_timeout = "1m" # Default to 1 minute scroll timeout + span.set_attribute("db.opensearch.enable_scroll", True) + + # Exit early if we have no indices to query + if not indices: + return {"hits": {"hits": [], "total": 0}, "took": 0} + + # Make sure that the list of index names is uniq. + indices = list(set(indices)) + + if query_filter is None: + query_filter = {} + + # Check if we have specific events to fetch and get indices. + if query_filter.get("events", None): + indices = { + event["index"] + for event in query_filter["events"] + if event["index"] in indices + } - # Check if we have specific events to fetch and get indices. - if query_filter.get("events", None): - indices = { - event["index"] - for event in query_filter["events"] - if event["index"] in indices - } + query_dsl = self.build_query( + sketch_id=sketch_id, + query_string=query_string, + query_filter=query_filter, + query_dsl=query_dsl, + aggregations=aggregations, + timeline_ids=timeline_ids, + ) - query_dsl = self.build_query( - sketch_id=sketch_id, - query_string=query_string, - query_filter=query_filter, - query_dsl=query_dsl, - aggregations=aggregations, - timeline_ids=timeline_ids, - ) + # Default search type for OpenSearch is query_then_fetch. + search_type = "query_then_fetch" - # Default search type for OpenSearch is query_then_fetch. - search_type = "query_then_fetch" + # Only return how many documents matches the query. 
+ if count: + if "sort" in query_dsl: + del query_dsl["sort"] + try: + count_result = self.client.count( + body=query_dsl, + index=list(indices), + params={"ignore_unavailable": "true"}, + ) + span.set_status(telemetry.get_status_code("OK")) + except TransportError as e: + span.record_exception(e) + span.set_status( + telemetry.get_status_code("ERROR"), description=str(e) + ) + os_logger.error( + "Unable to count for sketch [%s] on indices [%s] - Error: %s", + sketch_id, + ",".join(indices), + e, + exc_info=True, + ) + os_logger.debug( + "Query DSL for count error: %s", json.dumps(query_dsl, indent=2) + ) + return 0 + METRICS["search_requests"].labels(type="count").inc() + return count_result.get("count", 0) - # Only return how many documents matches the query. - if count: - if "sort" in query_dsl: - del query_dsl["sort"] try: - count_result = self.client.count( - body=query_dsl, - index=list(indices), - params={"ignore_unavailable": "true"}, + if not return_fields: + # Suppress the lint error because opensearchpy adds parameters + # to the function with a decorator and this makes pylint sad. + _search_result = ( + self.client.search( # pylint: disable=unexpected-keyword-arg + body=query_dsl, + index=list(indices), + search_type=search_type, + scroll=scroll_timeout, + params={"ignore_unavailable": "true"}, + ) + ) + elif self.version.startswith("6"): + # The argument "_source_include" changed to "_source_includes" + # in ES version 7. This check add support for both. 
+ _search_result = ( + self.client.search( # pylint: disable=unexpected-keyword-arg + body=query_dsl, + index=list(indices), + search_type=search_type, + _source_include=return_fields, + scroll=scroll_timeout, + params={"ignore_unavailable": "true"}, + ) + ) + else: + _search_result = ( + self.client.search( # pylint: disable=unexpected-keyword-arg + body=query_dsl, + index=list(indices), + search_type=search_type, + _source_includes=return_fields, + scroll=scroll_timeout, + params={"ignore_unavailable": "true"}, + ) + ) + + span.set_status(telemetry.get_status_code("OK")) + took_ms = _search_result.get("took", 0) + span.set_attribute("db.opensearch.took_ms", took_ms) + + except ConnectionTimeout as e: + span.record_exception(e) + span.set_status(telemetry.get_status_code("ERROR"), description=str(e)) + wildcard_warning = "" + if query_string.startswith("*"): + wildcard_warning = ( + " IMPORTANT: Avoid leading wildcards (e.g. *searchterm) in " + "your search query as these are very resource expensive." + ) + error_message = ( + "The search timed out. Try to search a specific field or narrow " + f"down the time range.{wildcard_warning}" ) - except TransportError as e: os_logger.error( - "Unable to count for sketch [%s] on indices [%s] - Error: %s", + "Search timeout for user [%s]. Query: [%s]. Sketch ID: [%s]. " + "Indices: [%s].", + current_user.username, + query_string, sketch_id, - ",".join(indices), - e, - exc_info=True, - ) - os_logger.debug( - "Query DSL for count error: %s", json.dumps(query_dsl, indent=2) - ) - return 0 - METRICS["search_requests"].labels(type="count").inc() - return count_result.get("count", 0) - - try: - if not return_fields: - # Suppress the lint error because opensearchpy adds parameters - # to the function with a decorator and this makes pylint sad. 
- # pylint: disable=unexpected-keyword-arg - return self.client.search( - body=query_dsl, - index=list(indices), - search_type=search_type, - scroll=scroll_timeout, - params={"ignore_unavailable": "true"}, + indices, ) + raise errors.DatastoreTimeoutError(error_message) from e + + except (RequestError, TransportError) as e: + span.record_exception(e) + span.set_status(telemetry.get_status_code("ERROR"), description=str(e)) + root_cause = e.info.get("error", {}).get("root_cause") + if root_cause: + error_items = [] + for cause in root_cause: + error_items.append( + "[{:s}] {:s}".format( + cause.get("type", ""), cause.get("reason", "") + ) + ) + cause = ", ".join(error_items) + else: + cause = str(e) - # The argument " _source_include" changed to "_source_includes" in - # ES version 7. This check add support for both version 6 and 7 clients. - # pylint: disable=unexpected-keyword-arg - if self.version.startswith("6"): - _search_result = self.client.search( - body=query_dsl, - index=list(indices), - search_type=search_type, - _source_include=return_fields, - scroll=scroll_timeout, - params={"ignore_unavailable": "true"}, - ) - else: - _search_result = self.client.search( - body=query_dsl, - index=list(indices), - search_type=search_type, - _source_includes=return_fields, - scroll=scroll_timeout, - params={"ignore_unavailable": "true"}, + os_logger.error( + "Unable to run search query for user [%s]. Error: %s. " + "Sketch ID: [%s]. Indices: [%s].", + current_user.username, + cause, + sketch_id, + indices, + exc_info=True, ) - except ConnectionTimeout as e: - wildcard_warning = "" - if query_string.startswith("*"): - wildcard_warning = ( - " IMPORTANT: Avoid leading wildcards (e.g. *searchterm) in " - "your search query as these are very resource expensive." + user_friendly_message = ( + f"There was an issue with your search query: {cause}. " + "Please review your query syntax and try again." ) - error_message = ( - "The search timed out. 
Try to search a specific field or narrow " - f"down the time range.{wildcard_warning}" - ) - os_logger.error( - "Search timeout for user [%s]. Query: [%s]. Sketch ID: [%s]. " - "Indices: [%s].", - current_user.username, - query_string, - sketch_id, - indices, - ) - raise errors.DatastoreTimeoutError(error_message) from e - - except (RequestError, TransportError) as e: - root_cause = e.info.get("error", {}).get("root_cause") - if root_cause: - error_items = [] - for cause in root_cause: - error_items.append( - "[{:s}] {:s}".format( - cause.get("type", ""), cause.get("reason", "") - ) - ) - cause = ", ".join(error_items) - else: - cause = str(e) - - os_logger.error( - "Unable to run search query for user [%s]. Error: %s. " - "Sketch ID: [%s]. Indices: [%s].", - current_user.username, - cause, - sketch_id, - indices, - exc_info=True, - ) - user_friendly_message = ( - f"There was an issue with your search query: {cause}. " - "Please review your query syntax and try again." - ) - raise ValueError(user_friendly_message) from e + raise ValueError(user_friendly_message) from e - METRICS["search_requests"].labels(type="single").inc() - return _search_result + METRICS["search_requests"].labels(type="single").inc() + return _search_result # pylint: disable=too-many-arguments diff --git a/timesketch/lib/llms/providers/contrib/azureai.py b/timesketch/lib/llms/providers/contrib/azureai.py index 7cba07399c..d4d9528737 100644 --- a/timesketch/lib/llms/providers/contrib/azureai.py +++ b/timesketch/lib/llms/providers/contrib/azureai.py @@ -4,6 +4,7 @@ from typing import Optional, Any, Union import requests from timesketch.lib.llms.providers import interface, manager +from timesketch.lib import telemetry # Default configuration values DEFAULT_API_VERSION = "2024-02-15-preview" @@ -45,45 +46,55 @@ def generate( self, prompt: str, response_schema: Optional[dict] = None ) -> Union[dict, str]: - url = ( - f"{self.endpoint}/openai/deployments/{self.model}/chat/completions?" 
- f"api-version={self.api_version}" - ) - headers = {"Content-Type": "application/json", "api-key": self.api_key} - data = { - "messages": [{"role": "user", "content": prompt}], - "max_tokens": self.config.get( - "max_output_tokens", interface.DEFAULT_MAX_OUTPUT_TOKENS - ), - "temperature": self.config.get( - "temperature", interface.DEFAULT_TEMPERATURE - ), - "top_p": self.config.get("top_p", interface.DEFAULT_TOP_P), - } - try: - response = requests.post( - url, headers=headers, json=data, timeout=self.timeout - ) - response.raise_for_status() - response_data = response.json()["choices"][0]["message"]["content"] - except (KeyError, IndexError) as e: - raise ValueError( - f"Unexpected response structure from Azure API: {response.json()}" - ) from e + tracer = telemetry.get_tracer(__name__) + with tracer.start_as_current_span("llm.azureai.generate") as span: + span.set_attribute("llm.provider", self.NAME) + span.set_attribute("llm.model", self.model) + span.set_attribute("llm.prompt_length", len(prompt)) - if isinstance(response_schema, dict): + url = ( + f"{self.endpoint}/openai/deployments/{self.model}/chat/completions?" 
+ f"api-version={self.api_version}" + ) + headers = {"Content-Type": "application/json", "api-key": self.api_key} + data = { + "messages": [{"role": "user", "content": prompt}], + "max_tokens": self.config.get( + "max_output_tokens", interface.DEFAULT_MAX_OUTPUT_TOKENS + ), + "temperature": self.config.get( + "temperature", interface.DEFAULT_TEMPERATURE + ), + "top_p": self.config.get("top_p", interface.DEFAULT_TOP_P), + } try: - props = response_schema.get("properties") - if props and isinstance(props, dict): - key = next(iter(props.keys()), "") - formatted_data = json.dumps({key: response_data}) - return json.loads(formatted_data) - except json.JSONDecodeError as error: + response = requests.post( + url, headers=headers, json=data, timeout=self.timeout + ) + response.raise_for_status() + response_data = response.json()["choices"][0]["message"]["content"] + span.set_attribute("llm.response_length", len(response_data)) + span.set_status(telemetry.get_status_code("OK")) + + except (KeyError, IndexError, requests.RequestException) as e: raise ValueError( - f"Error JSON parsing text: {formatted_data}: {error}" - ) from error + f"Error generating text with Azure API: {e}" + ) from e + + if isinstance(response_schema, dict): + try: + props = response_schema.get("properties") + if props and isinstance(props, dict): + key = next(iter(props.keys()), "") + formatted_data = json.dumps({key: response_data}) + return json.loads(formatted_data) + except json.JSONDecodeError as error: + span.record_exception(error) + raise ValueError( + f"Error JSON parsing text: {formatted_data}: {error}" + ) from error - return response_data + return response_data manager.LLMManager.register_provider(AzureAI) diff --git a/timesketch/lib/llms/providers/google_genai.py b/timesketch/lib/llms/providers/google_genai.py index dc29c518e6..bed080fc77 100644 --- a/timesketch/lib/llms/providers/google_genai.py +++ b/timesketch/lib/llms/providers/google_genai.py @@ -82,42 +82,53 @@ def generate(self, 
prompt: str, response_schema: Optional[dict] = None) -> Any: The generated text as a string (or parsed data if response_schema is provided). """ - config_params = { - "temperature": self.config.get("temperature"), - "top_k": self.config.get("top_k"), - "top_p": self.config.get("top_p"), - "max_output_tokens": self.config.get("max_output_tokens"), - } + tracer = telemetry.get_tracer(__name__) + with tracer.start_as_current_span("llm.google_genai.generate") as span: + span.set_attribute("llm.provider", self.NAME) + span.set_attribute("llm.model", self._model_name) + span.set_attribute("llm.prompt_length", len(prompt)) - if response_schema: - config_params["response_mime_type"] = "application/json" - config_params["response_schema"] = response_schema + config_params = { + "temperature": self.config.get("temperature"), + "top_k": self.config.get("top_k"), + "top_p": self.config.get("top_p"), + "max_output_tokens": self.config.get("max_output_tokens"), + } - generate_config = types.GenerateContentConfig(**config_params) + if response_schema: + config_params["response_mime_type"] = "application/json" + config_params["response_schema"] = response_schema - try: - response = self.client.models.generate_content( - model=self._model_name, - contents=prompt, - config=generate_config, - ) - except errors.APIError as e: - error_msg = f"{e.code} {e.status}: {getattr(e, 'message', 'N/A')}" - logger.error("API error during content generation: %s", str(e)) - raise ValueError(f"Error generating content: {error_msg}") from e - except Exception as e: - logger.error("Error generating content with Google GenAI: %s", e) - raise ValueError(f"Error generating content: {e}") from e + generate_config = types.GenerateContentConfig(**config_params) - if response_schema: try: - if hasattr(response, "parsed") and response.parsed is not None: - return response.parsed - return json.loads(response.text) - except Exception as error: - raise ValueError( - f"Error JSON parsing text: {response.text}: 
{error}" - ) from error + response = self.client.models.generate_content( + model=self._model_name, + contents=prompt, + config=generate_config, + ) + span.set_status(telemetry.get_status_code("OK")) + if response.text: + span.set_attribute("llm.response_length", len(response.text)) + + except errors.APIError as e: + error_msg = f"{e.code} {e.status}: {getattr(e, 'message', 'N/A')}" + logger.error("API error during content generation: %s", str(e)) + raise ValueError(f"Error generating content: {error_msg}") from e + except Exception as e: + span.record_exception(e) + logger.error("Error generating content with Google GenAI: %s", e) + raise ValueError(f"Error generating content: {e}") from e + + if response_schema: + try: + if hasattr(response, "parsed") and response.parsed is not None: + return response.parsed + return json.loads(response.text) + except Exception as error: + raise ValueError( + f"Error JSON parsing text: {response.text}: {error}" + ) from error return response.text diff --git a/timesketch/lib/llms/providers/ollama.py b/timesketch/lib/llms/providers/ollama.py index ef018fde1d..5dd35d7b89 100644 --- a/timesketch/lib/llms/providers/ollama.py +++ b/timesketch/lib/llms/providers/ollama.py @@ -19,6 +19,7 @@ from timesketch.lib.llms.providers import interface from timesketch.lib.llms.providers import manager +from timesketch.lib import telemetry class Ollama(interface.LLMProvider): @@ -89,38 +90,54 @@ def generate(self, prompt: str, response_schema: Optional[dict] = None) -> str: Raises: ValueError: If the request fails or JSON parsing fails. 
""" - request_body = { - "messages": [{"role": "user", "content": prompt}], - "model": self.config.get("model"), - "stream": self.config.get("stream"), - "options": { - "temperature": self.config.get("temperature"), - "num_predict": self.config.get("max_output_tokens"), - "top_p": self.config.get("top_p"), - "top_k": self.config.get("top_k"), - }, - } - - if response_schema: - request_body["format"] = response_schema - - response = self._post(json.dumps(request_body)) - - if response.status_code != 200: - raise ValueError(f"Error generating text: {response.text}") - - response_data = response.json() - text_response = response_data.get("message", {}).get("content", "").strip() - - if response_schema: - try: - return json.loads(text_response) - except json.JSONDecodeError as error: - raise ValueError( - f"Error JSON parsing text: {text_response}: {error}" - ) from error + tracer = telemetry.get_tracer(__name__) + with tracer.start_as_current_span("llm.ollama.generate") as span: + span.set_attribute("llm.provider", self.NAME) + span.set_attribute("llm.model", self.config.get("model")) + span.set_attribute("llm.prompt_length", len(prompt)) + + request_body = { + "messages": [{"role": "user", "content": prompt}], + "model": self.config.get("model"), + "stream": self.config.get("stream"), + "options": { + "temperature": self.config.get("temperature"), + "num_predict": self.config.get("max_output_tokens"), + "top_p": self.config.get("top_p"), + "top_k": self.config.get("top_k"), + }, + } + + if response_schema: + request_body["format"] = response_schema - return text_response + try: + response = self._post(json.dumps(request_body)) + + if response.status_code != 200: + span.set_status(telemetry.get_status_code("ERROR")) + raise ValueError(f"Error generating text: {response.text}") + + response_data = response.json() + text_response = response_data.get("message", {}).get("content", "").strip() + span.set_attribute("llm.response_length", len(text_response)) + 
span.set_status(telemetry.get_status_code("OK")) + + if response_schema: + try: + return json.loads(text_response) + except json.JSONDecodeError as error: + span.record_exception(error) + raise ValueError( + f"Error JSON parsing text: {text_response}: {error}" + ) from error + + return text_response + except Exception as e: + span.record_exception(e) + if not isinstance(e, ValueError): + raise ValueError(f"Error generating text: {e}") from e + raise manager.LLMManager.register_provider(Ollama) diff --git a/timesketch/lib/telemetry.py b/timesketch/lib/telemetry.py index 897ce06801..9d7e322b79 100644 --- a/timesketch/lib/telemetry.py +++ b/timesketch/lib/telemetry.py @@ -13,6 +13,7 @@ # limitations under the License. """Module providing OpenTelemetry capability to Timesketch.""" +import atexit import json import logging import os @@ -31,13 +32,141 @@ from opentelemetry.instrumentation.celery import CeleryInstrumentor from opentelemetry.instrumentation.flask import FlaskInstrumentor from opentelemetry.sdk.resources import Resource - from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace import TracerProvider, SpanProcessor, StatusCode from opentelemetry.sdk.trace.export import BatchSpanProcessor HAS_OTEL = True + + # --- Optional Instrumentors --- + try: + from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor + + HAS_SQLALCHEMY_OTEL = True + except ImportError: + HAS_SQLALCHEMY_OTEL = False + + # --- Identity & Context --- + try: + from flask_login import current_user + + HAS_FLASK_LOGIN = True + except ImportError: + HAS_FLASK_LOGIN = False + + # --- Privacy & Security --- + + # Keywords that indicate an attribute name or value might be sensitive. + SENSITIVE_KEYWORDS = [ + "password", + "token", + "secret", + "key", + "session", + "cookie", + "auth", + "credential", + ] + + # Attributes that are explicitly exempt from PII redaction (e.g. 
analyst identity) + EXEMPT_PII_ATTRIBUTES = {"user.name", "user.id", "timesketch.user_id"} + + class SensitiveDataScrubber(SpanProcessor): + """SpanProcessor that redacts sensitive attributes from spans.""" + + def on_start(self, span, parent_context=None): + """No-op on span start.""" + + def on_end(self, span): + """Redact attributes when a span ends.""" + if not span.attributes: + return + + redacted_keys = [] + # Create a copy of keys to avoid modification during iteration + for key in list(span.attributes.keys()): + # Protect our own audit info + if key == "otel.redacted_keys": + continue + + value = span.attributes[key] + if not isinstance(value, str): + continue + + lower_key = key.lower() + is_credential_key = any( + keyword in lower_key for keyword in SENSITIVE_KEYWORDS + ) + + # 1. If it's a credential key, redact the whole thing + if is_credential_key: + redacted_keys.append(key) + # pylint: disable=protected-access + span._attributes[key] = "[REDACTED]" + continue + + # 2. Check for sensitive keywords in the value + lower_val = value.lower() + if any(keyword in lower_val for keyword in SENSITIVE_KEYWORDS): + redacted_keys.append(f"{key} (value)") + # pylint: disable=protected-access + span._attributes[key] = "[REDACTED]" + continue + + if redacted_keys: + # pylint: disable=protected-access + span._attributes["otel.redacted_keys"] = redacted_keys + + def flask_request_hook(span, _environ): + """Hook to add user context to Flask spans. + + Args: + span (opentelemetry.trace.Span): The span representing the request. + _environ (dict): The WSGI environment. 
+ """ + if not HAS_FLASK_LOGIN: + return + + try: + # We check if we are in a request context and if current_user is valid + if current_user and hasattr(current_user, "is_authenticated"): + if current_user.is_authenticated: + span.set_attribute("user.id", current_user.id) + span.set_attribute("user.name", current_user.username) + span.set_attribute("timesketch.user_id", current_user.id) + except Exception: # pylint: disable=broad-except + # Best effort - if we are not in a context where current_user is + # available (e.g. some early middleware), we just skip. + pass + + class TraceLogFilter(logging.Filter): + """Logging filter that adds trace ID and span ID to log records. + + This filter acts as a bridge between OpenTelemetry and the standard + Python logging system. It extracts the current trace_id and span_id + from the OpenTelemetry context and injects them into the log record. + + This allows log formatters (e.g. in timesketch/app.py) to use + '%(trace_id)s' and '%(span_id)s' in their format strings without + raising a KeyError, even if no trace is currently active. + """ + + def filter(self, record): + if not HAS_OTEL: + return True + + span_context = trace.get_current_span().get_span_context() + if span_context.is_valid: + record.trace_id = trace.format_trace_id(span_context.trace_id) + record.span_id = trace.format_span_id(span_context.span_id) + else: + record.trace_id = "0" * 32 + record.span_id = "0" * 16 + return True + except ImportError: HAS_OTEL = False + from timesketch.version import get_version logger = logging.getLogger("timesketch.telemetry") @@ -67,6 +196,9 @@ def is_enabled() -> bool: return otel_mode.startswith("otlp-") +_TRACER_PROVIDER = None + + def setup_telemetry(service_name: str): """Configures the OpenTelemetry trace exporter. @@ -81,9 +213,15 @@ def setup_telemetry(service_name: str): Args: service_name (str): The name of the service to identify traces in the backend. 
""" + # pylint: disable=global-statement + global _TRACER_PROVIDER + if not is_enabled(): return + if _TRACER_PROVIDER: + return + resource = Resource( attributes={ "service.name": service_name, @@ -126,9 +264,14 @@ def setup_telemetry(service_name: str): return # --- Tracing Setup --- - trace_provider = TracerProvider(resource=resource) - trace_provider.add_span_processor(BatchSpanProcessor(trace_exporter)) - trace.set_tracer_provider(trace_provider) + _TRACER_PROVIDER = TracerProvider(resource=resource) + # Add the scrubber first to ensure it processes spans before they are batched + _TRACER_PROVIDER.add_span_processor(SensitiveDataScrubber()) + _TRACER_PROVIDER.add_span_processor(BatchSpanProcessor(trace_exporter)) + trace.set_tracer_provider(_TRACER_PROVIDER) + + # Ensure traces are flushed on shutdown + atexit.register(_TRACER_PROVIDER.shutdown) def instrument_celery_app(celery_app, **kwargs): @@ -152,7 +295,69 @@ def instrument_flask_app(app, **kwargs): """ if not is_enabled(): return - FlaskInstrumentor().instrument_app(app, **kwargs) + + FlaskInstrumentor().instrument_app( + app, + request_hook=flask_request_hook, + **kwargs, + ) + + +def get_tracer(name: str): + """Returns a tracer instance. + + Args: + name (str): The name of the tracer. + + Returns: + opentelemetry.trace.Tracer: A tracer instance. + """ + return trace.get_tracer(name) + + +def get_status_code(name: str): + """Returns an OpenTelemetry status code. + + Args: + name (str): The name of the status code (e.g. 'OK', 'ERROR'). + + Returns: + opentelemetry.trace.StatusCode: The status code instance. + """ + if not HAS_OTEL: + return None + return getattr(StatusCode, name.upper(), StatusCode.UNSET) + + +def set_status_on_current_span(status_code: str, description: str = None): + """Sets the status on the currently active span. + + Args: + status_code (str): The status code ('OK' or 'ERROR'). + description (str): Optional description of the status. 
+ """ + if not is_enabled(): + return + + otel_span = trace.get_current_span() + if otel_span != INVALID_SPAN: + code = get_status_code(status_code) + if code is not None: + otel_span.set_status(code, description) + + +def instrument_sqlalchemy(engine, **kwargs): + """Instruments a SQLAlchemy engine instance. + + This enables automatic capturing of spans for all database operations. + + Args: + engine (sqlalchemy.engine.Engine): The SQLAlchemy engine to instrument. + **kwargs: Additional arguments passed to SQLAlchemyInstrumentor().instrument(). + """ + if not is_enabled() or not HAS_SQLALCHEMY_OTEL: + return + SQLAlchemyInstrumentor().instrument(engine=engine, **kwargs) def add_event_to_current_span(event: str): diff --git a/timesketch/lib/telemetry_test.py b/timesketch/lib/telemetry_test.py new file mode 100644 index 0000000000..5f4e317ade --- /dev/null +++ b/timesketch/lib/telemetry_test.py @@ -0,0 +1,76 @@ +# Copyright 2026 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Tests for OpenTelemetry capabilities.""" + +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +from timesketch.lib import telemetry +from timesketch.lib.testlib import BaseTest + + +class TestTelemetry(BaseTest): + """Tests for telemetry module.""" + + def test_sensitive_data_scrubber(self): + """Test that the SensitiveDataScrubber correctly redacts data.""" + if not telemetry.HAS_OTEL: + self.skipTest("OpenTelemetry not installed") + + # Setup an in-memory exporter and the scrubber + exporter = InMemorySpanExporter() + scrubber = telemetry.SensitiveDataScrubber() + + provider = TracerProvider() + # The scrubber must run BEFORE the exporter + provider.add_span_processor(scrubber) + provider.add_span_processor(SimpleSpanProcessor(exporter)) + + tracer = provider.get_tracer(__name__) + + # pylint: disable=not-context-manager + with tracer.start_as_current_span("test-redaction") as span: + # 1. Test Credential Key Redaction + span.set_attribute("password", "supersecret") + span.set_attribute("api_token", "12345-token") + + # 2. Test Keyword in Value Redaction + span.set_attribute("custom_field", "this is a secret value") + + # 3. 
Test Analyst Identity Exemption + span.set_attribute("user.name", "analyst@google.com") + span.set_attribute("user.id", 123) + + # Get the exported span + spans = exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + exported_span = spans[0] + attrs = exported_span.attributes + + # Assertions + self.assertEqual(attrs["password"], "[REDACTED]") + self.assertEqual(attrs["api_token"], "[REDACTED]") + self.assertEqual(attrs["custom_field"], "[REDACTED]") + + # Verify analyst identity is NOT redacted + self.assertEqual(attrs["user.name"], "analyst@google.com") + self.assertEqual(attrs["user.id"], 123) + + # Verify audit trail + redacted_keys = attrs["otel.redacted_keys"] + self.assertIn("password", redacted_keys) + self.assertIn("api_token", redacted_keys) + self.assertIn("custom_field (value)", redacted_keys) + self.assertNotIn("user.name", redacted_keys) diff --git a/timesketch/models/__init__.py b/timesketch/models/__init__.py index b9f7df0879..69757052ee 100644 --- a/timesketch/models/__init__.py +++ b/timesketch/models/__init__.py @@ -16,16 +16,19 @@ from flask import abort from flask_login import current_user from flask_sqlalchemy.query import Query -from sqlalchemy import create_engine -from sqlalchemy.orm import scoped_session, sessionmaker, as_declarative -from sqlalchemy.ext.declarative import declared_attr from sqlalchemy import Column +from sqlalchemy import create_engine from sqlalchemy import DateTime from sqlalchemy import func from sqlalchemy import Integer +from sqlalchemy.ext.declarative import declared_attr +from sqlalchemy.orm import as_declarative +from sqlalchemy.orm import scoped_session +from sqlalchemy.orm import sessionmaker -from timesketch.lib.definitions import HTTP_STATUS_CODE_NOT_FOUND +from timesketch.lib import telemetry from timesketch.lib.definitions import HTTP_STATUS_CODE_FORBIDDEN +from timesketch.lib.definitions import HTTP_STATUS_CODE_NOT_FOUND # The database session engine = None @@ -43,6 +46,7 @@ def 
configure_engine(url, engine_options): engine_options["pool_pre_ping"] = True global engine, session_maker, db_session engine = create_engine(url, future=True, **engine_options) + telemetry.instrument_sqlalchemy(engine) # Configure the session session_maker.configure( autocommit=False, autoflush=False, bind=engine, query_cls=Query diff --git a/timesketch/models/acl.py b/timesketch/models/acl.py index dc44cc1a0a..17219ce667 100644 --- a/timesketch/models/acl.py +++ b/timesketch/models/acl.py @@ -38,6 +38,7 @@ from timesketch.models import BaseModel from timesketch.models import db_session +from timesketch.lib import telemetry from timesketch.models.user import Group, User @@ -324,28 +325,36 @@ def has_permission(self, user, permission): """Check if the user has a specific permission. Args: - user: A user (Instance of timesketch.models.user.User) - permission: Permission as string (read, write or delete) + user (User): A user (Instance of timesketch.models.user.User) + permission (str): Permission as string (read, write or delete) Returns: An ACE (instance of timesketch.models.acl.AccessControlEntry) if the user has the permission or None if the user do not have the permission. 
""" - public_ace = self.is_public - if public_ace and permission == "read": - return public_ace - if isinstance(permission, bytes): - permission = codecs.decode(permission, "utf-8") - return self._get_ace(permission=permission, user=user) + tracer = telemetry.get_tracer(__name__) + with tracer.start_as_current_span("acl.has_permission") as span: + span.set_attribute("acl.permission", str(permission)) + if hasattr(user, "username"): + span.set_attribute("acl.user", user.username) + if hasattr(self, "id"): + span.set_attribute("acl.resource_id", self.id) + + public_ace = self.is_public + if public_ace and permission == "read": + return public_ace + if isinstance(permission, bytes): + permission = codecs.decode(permission, "utf-8") + return self._get_ace(permission=permission, user=user) def grant_permission(self, permission, user=None, group=None): """Grant permission to a user or group with the specific permission. Args: - permission: Permission as string (read, write or delete) - user: A user (Instance of timesketch.models.user.User) - group: A group (Instance of timesketch.models.user.Group) + permission (str): Permission as string (read, write or delete) + user (User): A user (Instance of timesketch.models.user.User) + group (Group): A group (Instance of timesketch.models.user.Group) """ # Grant permission to a group. if group and not self._get_ace(permission, group=group): @@ -364,9 +373,9 @@ def revoke_permission(self, permission, user=None, group=None): """Revoke permission for user/group on the object. Args: - permission: Permission as string (read, write or delete) - user: A user (Instance of timesketch.models.user.User) - group: A group (Instance of timesketch.models.user.Group) + permission (str): Permission as string (read, write or delete) + user (User): A user (Instance of timesketch.models.user.User) + group (Group): A group (Instance of timesketch.models.user.Group) """ # Revoke permission for a group. if group: