Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions datadog_lambda/durable.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ def extract_durable_function_tags(event):
execution_name, execution_id = parsed
# Use the number of operations to determine if it's the first invocation. This is
# what the durable execution SDK does to determine the replay status.
operations = event.get("InitialExecutionState", {}).get("Operations", [])
is_first_invocation = len(operations) == 1
operations = event.get("InitialExecutionState", {}).get("Operations")
operation_count = (
len(operations) if isinstance(operations, (list, dict)) else 0
)
is_first_invocation = operation_count == 1
return {
"aws_lambda.durable_function.execution_name": execution_name,
"aws_lambda.durable_function.execution_id": execution_id,
Expand Down
81 changes: 81 additions & 0 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -919,12 +919,93 @@ def set_dd_trace_py_root(trace_context_source, merge_xray_traces):
)


def _durable_execution_start_ns(event):
operations = _durable_operations(event)
if not operations:
return None

first_operation = operations[0]
if not isinstance(first_operation, dict):
return None

start_timestamp = first_operation.get("StartTimestamp")
if isinstance(start_timestamp, str):
start_timestamp = start_timestamp.strip()

try:
start_ms = int(start_timestamp)
except (TypeError, ValueError):
try:
start_ms = int(float(start_timestamp))
except (TypeError, ValueError):
return None

return start_ms * 1000000


def create_inferred_span_from_durable_execution_event(
event, context, durable_function_tags
):
if not durable_function_tags:
return None
if durable_function_tags.get("aws_lambda.durable_function.first_invocation") != "true":
return None

inferred_span_start_ns = _durable_execution_start_ns(event)
if inferred_span_start_ns is None:
return None

service_name = os.environ.get(
"DD_DURABLE_EXECUTION_SERVICE", "aws.durable-execution"
)
execution_name = durable_function_tags.get(
"aws_lambda.durable_function.execution_name"
)
execution_id = durable_function_tags.get("aws_lambda.durable_function.execution_id")
durable_execution_arn = event.get("DurableExecutionArn")

tags = {
"operation_name": "aws.durable.execution_init",
"resource_names": execution_name,
"request_id": context.aws_request_id if context else None,
"service": service_name,
"service.name": service_name,
"span.type": "serverless",
"resource.name": execution_name,
"span.kind": "server",
"durable.execution_arn": durable_execution_arn,
"durable.execution_name": execution_name,
"durable.execution_id": execution_id,
}
InferredSpanInfo.set_tags(tags, tag_source="self", synchronicity="async")

tracer.set_tags(_dd_origin)
span = tracer.trace(
"aws.durable.execution_init",
service=service_name,
resource=execution_name,
span_type="serverless",
)
if span:
span.set_tags(tags)
span.set_metric(InferredSpanInfo.METRIC, 1.0)
span.start_ns = inferred_span_start_ns
return span


def create_inferred_span(
event,
context,
event_source: _EventSource = None,
decode_authorizer_context: bool = True,
durable_function_tags=None,
):
if durable_function_tags:
logger.debug("Durable execution event detected. Inferring a span")
return create_inferred_span_from_durable_execution_event(
event, context, durable_function_tags
)

if event_source is None:
event_source = parse_event_source(event)
try:
Expand Down
6 changes: 5 additions & 1 deletion datadog_lambda/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,11 @@ def _before(self, event, context):
set_dd_trace_py_root(trace_context_source, config.merge_xray_traces)
if config.make_inferred_span:
self.inferred_span = create_inferred_span(
event, context, event_source, config.decode_authorizer_context
event,
context,
event_source,
config.decode_authorizer_context,
self.durable_function_tags,
)

if config.appsec_enabled:
Expand Down
21 changes: 21 additions & 0 deletions tests/test_durable.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,27 @@ def test_sets_first_invocation_false_when_multiple_operations(self):
},
)

def test_sets_first_invocation_false_when_operations_is_a_map(self):
event = {
"DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004",
"CheckpointToken": "some-token",
"InitialExecutionState": {
"Operations": {
"0": {"Type": "EXECUTION"},
"1": {"Type": "STEP"},
}
},
}
result = extract_durable_function_tags(event)
self.assertEqual(
result,
{
"aws_lambda.durable_function.execution_name": "my-execution",
"aws_lambda.durable_function.execution_id": "550e8400-e29b-41d4-a716-446655440004",
"aws_lambda.durable_function.first_invocation": "false",
},
)

def test_returns_empty_dict_for_regular_lambda_event(self):
event = {
"body": '{"key": "value"}',
Expand Down
92 changes: 92 additions & 0 deletions tests/test_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2536,6 +2536,98 @@ def test_authorizer_span_no_negative_duration_when_clock_skew(mock_span_finish):
)


class TestDurableExecutionInferredSpan(unittest.TestCase):
def test_creates_execution_init_span_for_first_invocation(self):
event = {
"DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004",
"InitialExecutionState": {
"Operations": [{"StartTimestamp": "1778088546775"}]
},
}
durable_function_tags = {
"aws_lambda.durable_function.execution_name": "my-execution",
"aws_lambda.durable_function.execution_id": "550e8400-e29b-41d4-a716-446655440004",
"aws_lambda.durable_function.first_invocation": "true",
}
ctx = get_mock_context(aws_request_id="abc-123")

with patch.dict(
os.environ, {"DD_DURABLE_EXECUTION_SERVICE": "durable-svc"}, clear=False
):
span = create_inferred_span(
event, ctx, durable_function_tags=durable_function_tags
)

self.assertIsNotNone(span)
self.assertEqual(span.name, "aws.durable.execution_init")
self.assertEqual(span.service, "durable-svc")
self.assertEqual(span.resource, "my-execution")
self.assertEqual(span.start_ns, 1778088546775000000)
self.assertEqual(
span.get_tag("operation_name"), "aws.durable.execution_init"
)
self.assertEqual(
span.get_tag("durable.execution_arn"), event["DurableExecutionArn"]
)
self.assertEqual(span.get_tag("durable.execution_name"), "my-execution")
self.assertEqual(
span.get_tag("durable.execution_id"),
"550e8400-e29b-41d4-a716-446655440004",
)
span.finish()

def test_does_not_create_execution_init_span_for_replay_invocation(self):
event = {
"DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004",
"InitialExecutionState": {
"Operations": [{"StartTimestamp": "1778088546775"}]
},
}
durable_function_tags = {
"aws_lambda.durable_function.execution_name": "my-execution",
"aws_lambda.durable_function.execution_id": "550e8400-e29b-41d4-a716-446655440004",
"aws_lambda.durable_function.first_invocation": "false",
}
ctx = get_mock_context()

span = create_inferred_span(event, ctx, durable_function_tags=durable_function_tags)
self.assertIsNone(span)

def test_parents_lambda_span_to_execution_init_span(self):
event = {
"DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004",
"InitialExecutionState": {
"Operations": [{"StartTimestamp": "1778088546775"}]
},
}
durable_function_tags = {
"aws_lambda.durable_function.execution_name": "my-execution",
"aws_lambda.durable_function.execution_id": "550e8400-e29b-41d4-a716-446655440004",
"aws_lambda.durable_function.first_invocation": "true",
}
ctx = get_mock_context()

inferred_span = create_inferred_span(
event, ctx, durable_function_tags=durable_function_tags
)
lambda_span = create_function_execution_span(
context=ctx,
function_name="Function",
is_cold_start=False,
is_proactive_init=False,
trace_context_source={"source": ""},
merge_xray_traces=False,
trigger_tags={},
durable_function_tags=durable_function_tags,
parent_span=inferred_span,
span_pointers=None,
)

self.assertEqual(lambda_span.parent_id, inferred_span.span_id)
lambda_span.finish()
inferred_span.finish()


class TestInferredSpans(unittest.TestCase):
@patch("datadog_lambda.tracing.submit_errors_metric")
def test_mark_trace_as_error_for_5xx_responses_getting_400_response_code(
Expand Down
Loading