Skip to content

Commit 885ec87

Browse files
committed
feat: end to end tracing
1 parent c9d5307 commit 885ec87

File tree

11 files changed

+135
-25
lines changed

11 files changed

+135
-25
lines changed

google/cloud/spanner_v1/_helpers.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
from google.cloud.spanner_v1 import JsonObject
3535
from google.cloud.spanner_v1.request_id_header import with_request_id
3636
from google.rpc.error_details_pb2 import RetryInfo
37-
37+
from opentelemetry.propagate import inject
38+
from opentelemetry.propagators.textmap import Setter, Getter
39+
from typing import List, Tuple
3840
import random
3941

4042
# Validation error messages
@@ -47,6 +49,27 @@
4749
)
4850

4951

52+
class OpenTelemetryContextSetter(Setter):
53+
"""
54+
Used by Open Telemetry for context propagation.
55+
"""
56+
57+
def set(self, carrier: List[Tuple[str, str]], key: str, value: str) -> None:
58+
"""
59+
Injects trace context into Spanner metadata
60+
61+
Args:
62+
carrier(PubsubMessage): The Pub/Sub message which is the carrier of Open Telemetry
63+
data.
64+
key(str): The key for which the Open Telemetry context data needs to be set.
65+
value(str): The Open Telemetry context value to be set.
66+
67+
Returns:
68+
None
69+
"""
70+
carrier.append((key, value))
71+
72+
5073
def _try_to_coerce_bytes(bytestring):
5174
"""Try to coerce a byte string into the right thing based on Python
5275
version and whether or not it is base64 encoded.
@@ -550,6 +573,20 @@ def _metadata_with_leader_aware_routing(value, **kw):
550573
return ("x-goog-spanner-route-to-leader", str(value).lower())
551574

552575

576+
def _metadata_with_span_context(metadata: List[Tuple[str, str]], **kw) -> None:
577+
"""
578+
Appends metadata with end to end tracing header and OpenTelemetry span context .
579+
580+
Args:
581+
metadata (list[tuple[str, str]]): The metadata carrier where the OpenTelemetry context
582+
should be injected.
583+
Returns:
584+
None
585+
"""
586+
metadata.append(("x-goog-spanner-end-to-end-tracing", "true"))
587+
inject(setter=OpenTelemetryContextSetter(), carrier=metadata)
588+
589+
553590
def _delay_until_retry(exc, deadline, attempts):
554591
"""Helper for :meth:`Session.run_in_transaction`.
555592

google/cloud/spanner_v1/_opentelemetry_tracing.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020

2121
from google.cloud.spanner_v1 import SpannerClient
2222
from google.cloud.spanner_v1 import gapic_version
23+
from google.cloud.spanner_v1._helpers import (
24+
_metadata_with_span_context,
25+
)
2326

2427
try:
2528
from opentelemetry import trace
@@ -38,6 +41,9 @@
3841
extended_tracing_globally_disabled = (
3942
os.getenv("SPANNER_ENABLE_EXTENDED_TRACING", "").lower() == "false"
4043
)
44+
end_to_end_tracing_globally_enabled = (
45+
os.getenv("SPANNER_ENABLE_END_TO_END_TRACING", "").lower() == "true"
46+
)
4147

4248

4349
def get_tracer(tracer_provider=None):
@@ -56,7 +62,9 @@ def get_tracer(tracer_provider=None):
5662

5763

5864
@contextmanager
59-
def trace_call(name, session=None, extra_attributes=None, observability_options=None):
65+
def trace_call(
66+
name, session=None, extra_attributes=None, observability_options=None, metadata=None
67+
):
6068
if session:
6169
session._last_use_time = datetime.now()
6270

@@ -72,6 +80,8 @@ def trace_call(name, session=None, extra_attributes=None, observability_options=
7280
# on by default.
7381
enable_extended_tracing = True
7482

83+
enable_end_to_end_tracing = False
84+
7585
db_name = ""
7686
if session and getattr(session, "_database", None):
7787
db_name = session._database.name
@@ -81,6 +91,9 @@ def trace_call(name, session=None, extra_attributes=None, observability_options=
8191
enable_extended_tracing = observability_options.get(
8292
"enable_extended_tracing", enable_extended_tracing
8393
)
94+
enable_end_to_end_tracing = observability_options.get(
95+
"enable_end_to_end_tracing", enable_end_to_end_tracing
96+
)
8497
db_name = observability_options.get("db_name", db_name)
8598

8699
tracer = get_tracer(tracer_provider)
@@ -104,10 +117,15 @@ def trace_call(name, session=None, extra_attributes=None, observability_options=
104117
if not enable_extended_tracing:
105118
attributes.pop("db.statement", False)
106119

120+
if end_to_end_tracing_globally_enabled:
121+
enable_end_to_end_tracing = True
122+
107123
with tracer.start_as_current_span(
108124
name, kind=trace.SpanKind.CLIENT, attributes=attributes
109125
) as span:
110126
try:
127+
if enable_end_to_end_tracing:
128+
_metadata_with_span_context(metadata)
111129
yield span
112130
except Exception as error:
113131
span.set_status(Status(StatusCode.ERROR, str(error)))

google/cloud/spanner_v1/batch.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ def commit(
226226
self._session,
227227
trace_attributes,
228228
observability_options=observability_options,
229+
metadata=metadata,
229230
):
230231
method = functools.partial(
231232
api.commit,
@@ -348,6 +349,7 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
348349
self._session,
349350
trace_attributes,
350351
observability_options=observability_options,
352+
metadata=metadata,
351353
):
352354
method = functools.partial(
353355
api.batch_write,

google/cloud/spanner_v1/client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,10 @@ class Client(ClientWithProject):
135135
Default `True`, please set it to `False` to turn it off
136136
or you can use the environment variable `SPANNER_ENABLE_EXTENDED_TRACING=<boolean>`
137137
to control it.
138+
enable_end_to_end_tracing: :type:boolean when set to true will allow for spans from Spanner server side.
139+
Default `False`, please set it to `True` to turn it on
140+
or you can use the environment variable `SPANNER_ENABLE_END_TO_END_TRACING=<boolean>`
141+
to control it.
138142
139143
:raises: :class:`ValueError <exceptions.ValueError>` if both ``read_only``
140144
and ``admin`` are :data:`True`

google/cloud/spanner_v1/database.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,7 @@ def execute_pdml():
728728
method=method,
729729
trace_name="CloudSpanner.ExecuteStreamingSql",
730730
request=request,
731+
metadata=metadata,
731732
transaction_selector=txn_selector,
732733
observability_options=self.observability_options,
733734
)

google/cloud/spanner_v1/pool.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ def bind(self, database):
242242
with trace_call(
243243
"CloudSpanner.FixedPool.BatchCreateSessions",
244244
observability_options=observability_options,
245+
metadata=metadata,
245246
) as span:
246247
returned_session_count = 0
247248
while not self._sessions.full():
@@ -552,6 +553,7 @@ def bind(self, database):
552553
with trace_call(
553554
"CloudSpanner.PingingPool.BatchCreateSessions",
554555
observability_options=observability_options,
556+
metadata=metadata,
555557
) as span:
556558
returned_session_count = 0
557559
while returned_session_count < self.size:

google/cloud/spanner_v1/session.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ def create(self):
165165
self,
166166
self._labels,
167167
observability_options=observability_options,
168+
metadata=metadata,
168169
):
169170
session_pb = api.create_session(
170171
request=request,
@@ -204,7 +205,10 @@ def exists(self):
204205

205206
observability_options = getattr(self._database, "observability_options", None)
206207
with trace_call(
207-
"CloudSpanner.GetSession", self, observability_options=observability_options
208+
"CloudSpanner.GetSession",
209+
self,
210+
observability_options=observability_options,
211+
metadata=metadata,
208212
) as span:
209213
try:
210214
api.get_session(name=self.name, metadata=metadata)
@@ -248,6 +252,7 @@ def delete(self):
248252
"session.name": self.name,
249253
},
250254
observability_options=observability_options,
255+
metadata=metadata,
251256
):
252257
api.delete_session(name=self.name, metadata=metadata)
253258

google/cloud/spanner_v1/snapshot.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
def _restart_on_unavailable(
5353
method,
5454
request,
55+
metadata=None,
5556
trace_name=None,
5657
session=None,
5758
attributes=None,
@@ -96,8 +97,9 @@ def _restart_on_unavailable(
9697
session,
9798
attributes,
9899
observability_options=observability_options,
100+
metadata=metadata,
99101
):
100-
iterator = method(request=request)
102+
iterator = method(request=request, metadata=metadata)
101103
for item in iterator:
102104
item_buffer.append(item)
103105
# Setting the transaction id because the transaction begin was inlined for first rpc.
@@ -119,6 +121,7 @@ def _restart_on_unavailable(
119121
session,
120122
attributes,
121123
observability_options=observability_options,
124+
metadata=metadata,
122125
):
123126
request.resume_token = resume_token
124127
if transaction is not None:
@@ -139,6 +142,7 @@ def _restart_on_unavailable(
139142
session,
140143
attributes,
141144
observability_options=observability_options,
145+
metadata=metadata,
142146
):
143147
request.resume_token = resume_token
144148
if transaction is not None:
@@ -340,6 +344,7 @@ def read(
340344
iterator = _restart_on_unavailable(
341345
restart,
342346
request,
347+
metadata,
343348
f"CloudSpanner.{type(self).__name__}.read",
344349
self._session,
345350
trace_attributes,
@@ -362,6 +367,7 @@ def read(
362367
iterator = _restart_on_unavailable(
363368
restart,
364369
request,
370+
metadata,
365371
f"CloudSpanner.{type(self).__name__}.read",
366372
self._session,
367373
trace_attributes,
@@ -556,6 +562,7 @@ def execute_sql(
556562
return self._get_streamed_result_set(
557563
restart,
558564
request,
565+
metadata,
559566
trace_attributes,
560567
column_info,
561568
observability_options,
@@ -565,6 +572,7 @@ def execute_sql(
565572
return self._get_streamed_result_set(
566573
restart,
567574
request,
575+
metadata,
568576
trace_attributes,
569577
column_info,
570578
observability_options,
@@ -575,6 +583,7 @@ def _get_streamed_result_set(
575583
self,
576584
restart,
577585
request,
586+
metadata,
578587
trace_attributes,
579588
column_info,
580589
observability_options=None,
@@ -583,6 +592,7 @@ def _get_streamed_result_set(
583592
iterator = _restart_on_unavailable(
584593
restart,
585594
request,
595+
metadata,
586596
f"CloudSpanner.{type(self).__name__}.execute_sql",
587597
self._session,
588598
trace_attributes,
@@ -689,6 +699,7 @@ def partition_read(
689699
self._session,
690700
extra_attributes=trace_attributes,
691701
observability_options=getattr(database, "observability_options", None),
702+
metadata=metadata,
692703
):
693704
method = functools.partial(
694705
api.partition_read,
@@ -792,6 +803,7 @@ def partition_query(
792803
self._session,
793804
trace_attributes,
794805
observability_options=getattr(database, "observability_options", None),
806+
metadata=metadata,
795807
):
796808
method = functools.partial(
797809
api.partition_query,
@@ -938,6 +950,7 @@ def begin(self):
938950
f"CloudSpanner.{type(self).__name__}.begin",
939951
self._session,
940952
observability_options=getattr(database, "observability_options", None),
953+
metadata=metadata,
941954
):
942955
method = functools.partial(
943956
api.begin_transaction,

0 commit comments

Comments
 (0)