@@ -68,6 +68,94 @@ def list_subscriptions_in_project(project_id: str) -> None:
6868 # [END pubsub_list_subscriptions]
6969
7070
71+ def pubsub_subscribe_otel_tracing (
72+ subscription_project_id : str ,
73+ cloud_trace_project_id : str ,
74+ subscription_id : str ,
75+ timeout : Optional [float ] = None ,
76+ ) -> None :
77+ """
78+ Subscribe to `subscription_id` in `subscription_project_id` with OpenTelemetry enabled.
79+ Export the OpenTelemetry traces to Google Cloud Trace in project
80+ `trace_project_id`
81+ Args:
82+ subscription_project_id: project ID of the subscription.
83+ cloud_trace_project_id: project ID to export Cloud Trace to.
84+ subscription_id: subscription ID to subscribe from.
85+ timeout: time until which to subscribe to.
86+ Returns:
87+ None
88+ """
89+ # [START pubsub_subscribe_otel_tracing]
90+ from opentelemetry import trace
91+ from opentelemetry .sdk .trace import TracerProvider
92+ from opentelemetry .sdk .trace .export import (
93+ BatchSpanProcessor ,
94+ )
95+ from opentelemetry .exporter .cloud_trace import CloudTraceSpanExporter
96+ from opentelemetry .sdk .trace .sampling import TraceIdRatioBased , ParentBased
97+
98+ from google .cloud import pubsub_v1
99+ from google .cloud .pubsub_v1 import SubscriberClient
100+ from google .cloud .pubsub_v1 .types import SubscriberOptions
101+
102+ # TODO(developer)
103+ # subscription_project_id = "your-subscription-project-id"
104+ # subscription_id = "your-subscription-id"
105+ # cloud_trace_project_id = "your-cloud-trace-project-id"
106+ # timeout = 300.0
107+
108+ # In this sample, we use a Google Cloud Trace to export the OpenTelemetry
109+ # traces: https://cloud.google.com/trace/docs/setup/python-ot
110+ # Choose and configure the exporter for your set up accordingly.
111+
112+ sampler = ParentBased (root = TraceIdRatioBased (1 ))
113+ trace .set_tracer_provider (TracerProvider (sampler = sampler ))
114+
115+ # Export to Google Trace
116+ cloud_trace_exporter = CloudTraceSpanExporter (
117+ project_id = cloud_trace_project_id ,
118+ )
119+ trace .get_tracer_provider ().add_span_processor (
120+ BatchSpanProcessor (cloud_trace_exporter )
121+ )
122+ # Set the `enable_open_telemetry_tracing` option to True when creating
123+ # the subscriber client. This in itself is necessary and sufficient for
124+ # the library to export OpenTelemetry traces. However, where the traces
125+ # must be exported to needs to be configured based on your OpenTelemetry
126+ # set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
127+ subscriber = SubscriberClient (
128+ subscriber_options = SubscriberOptions (enable_open_telemetry_tracing = True )
129+ )
130+
131+ # The `subscription_path` method creates a fully qualified identifier
132+ # in the form `projects/{project_id}/subscriptions/{subscription_id}`
133+ subscription_path = subscriber .subscription_path (
134+ subscription_project_id , subscription_id
135+ )
136+
137+ # Define callback to be called when a message is received.
138+ def callback (message : pubsub_v1 .subscriber .message .Message ) -> None :
139+ # Ack message after processing it.
140+ print (message .data )
141+ message .ack ()
142+
143+ # Wrap subscriber in a 'with' block to automatically call close() when done.
144+ with subscriber :
145+ try :
146+ # Optimistically subscribe to messages on the subscription.
147+ streaming_pull_future = subscriber .subscribe (
148+ subscription_path , callback = callback
149+ )
150+ streaming_pull_future .result (timeout = timeout )
151+ except TimeoutError :
152+ print ("Successfully subscribed until the timeout passed." )
153+ streaming_pull_future .cancel () # Trigger the shutdown.
154+ streaming_pull_future .result () # Block until the shutdown is complete.
155+
156+ # [END pubsub_subscribe_otel_tracing]
157+
158+
71159def create_subscription (project_id : str , topic_id : str , subscription_id : str ) -> None :
72160 """Create a new pull subscription on the given topic."""
73161 # [START pubsub_create_pull_subscription]
0 commit comments