clearcut_span_exporter.py 12 KB


  1. # Copyright 2024 The Chromium Authors
  2. # Use of this source code is governed by a BSD-style license that can be
  3. # found in the LICENSE file.
  4. """Defines the telemetry exporter for exporting to ClearCut."""
  5. import datetime
  6. import logging
  7. import time
  8. import urllib.error
  9. import urllib.request
  10. from typing import Callable, Dict, Optional, Pattern, Sequence, Tuple
  11. from google.protobuf import (
  12. json_format,
  13. message as proto_msg,
  14. struct_pb2,
  15. )
  16. from opentelemetry import trace as otel_trace_api
  17. from opentelemetry.sdk import (
  18. trace as otel_trace_sdk,
  19. resources as otel_resources,
  20. )
  21. from opentelemetry.sdk.trace import export as otel_export
  22. from opentelemetry.util import types as otel_types
  23. from . import anonymization
  24. from . import detector
  25. from .proto import trace_span_pb2
  26. from .proto import clientanalytics_pb2
  # HTTP endpoint that receives the serialized LogRequest.
  _DEFAULT_ENDPOINT = 'https://play.googleapis.com/log'
  # Default per-request HTTP timeout handed to urlopen(), in seconds.
  _DEFAULT_TIMEOUT = 15
  # Default timeout accepted by force_flush(), in milliseconds.
  _DEFAULT_FLUSH_TIMEOUT_MILLIS = 30000
  # Longest back-off, in seconds, the exporter is willing to sleep before
  # sending a batch; batches facing a longer wait are dropped.
  _DEFAULT_MAX_WAIT_SECS = 60
  # Misspelled original name, kept as an alias so existing references keep
  # working.
  _DEAULT_MAX_WAIT_SECS = _DEFAULT_MAX_WAIT_SECS
  # Preallocated in Clearcut proto to cros Build.
  _LOG_SOURCE = 2044
  # Preallocated in Clearcut proto to Python clients.
  _CLIENT_TYPE = 33
  # Queue length at which export() sends a batch; also the largest number of
  # spans put into a single request.
  _DEFAULT_MAX_QUEUE_SIZE = 1000
  class ClearcutSpanExporter(otel_export.SpanExporter):
      """Exports the spans to google http endpoint.

      Spans handed to export() are translated into TraceSpan protos, passed
      through the prefilter and buffered in an in-memory queue. A batch is
      POSTed to the endpoint as a serialized LogRequest once the queue holds
      at least `max_queue_size` spans, or when force_flush()/shutdown() is
      called. The `next_request_wait_millis` of every LogResponse is honoured
      before the following request is sent.
      """

      def __init__(
          self,
          endpoint: str = _DEFAULT_ENDPOINT,
          timeout: int = _DEFAULT_TIMEOUT,
          # The constant name is misspelled at module level; it is referenced
          # here exactly as it is defined there.
          max_wait_secs: int = _DEAULT_MAX_WAIT_SECS,
          max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE,
          prefilter: Optional[Callable[[trace_span_pb2.TraceSpan],
                                       trace_span_pb2.TraceSpan]] = None,
      ) -> None:
          """Initializes the exporter.

          Args:
              endpoint: URL the LogRequest is POSTed to.
              timeout: Default HTTP timeout in seconds for one request.
              max_wait_secs: Batches that would have to wait longer than this
                  (because of the server-requested back-off) are dropped.
              max_queue_size: Number of queued spans that triggers a send;
                  also the maximum number of spans per request.
              prefilter: Callable applied to every translated span before it
                  is queued. Defaults to an AnonymizingFilter.
          """
          self._endpoint = endpoint
          self._timeout = timeout
          self._prefilter = prefilter or anonymization.AnonymizingFilter(
              anonymization.Anonymizer())
          self._log_source = _LOG_SOURCE
          # Earliest moment the next request may be sent.
          self._next_request_dt = datetime.datetime.now()
          self._max_wait_secs = max_wait_secs
          self._queue = []
          self._max_queue_size = max_queue_size

      def export(
          self, spans: Sequence[otel_trace_sdk.ReadableSpan]
      ) -> otel_export.SpanExportResult:
          """Queues the spans and sends one batch once the queue is full.

          Args:
              spans: Finished spans to export.

          Returns:
              SUCCESS when the spans were only queued or the batch was
              handled, FAILURE when sending the batch failed.
          """
          self._queue.extend(
              self._prefilter(self._translate_span(s)) for s in spans)

          if len(self._queue) < self._max_queue_size:
              return otel_export.SpanExportResult.SUCCESS

          if self._export_batch():
              return otel_export.SpanExportResult.SUCCESS
          return otel_export.SpanExportResult.FAILURE

      def shutdown(self) -> None:
          """Flushes whatever is still queued."""
          self.force_flush()

      def force_flush(self,
                      timeout_millis: int = _DEFAULT_FLUSH_TIMEOUT_MILLIS
                      ) -> bool:
          """Sends all queued spans, one batch at a time.

          Args:
              timeout_millis: HTTP timeout in milliseconds applied to each
                  request that is sent.

          Returns:
              True if the queue was empty or every batch was handled; False
              as soon as a batch fails or the queue cannot be drained (the
              remaining spans stay queued).
          """
          timeout_secs = timeout_millis / 1000
          while self._queue:
              pending = len(self._queue)
              if not self._export_batch(timeout=timeout_secs):
                  return False
              if len(self._queue) >= pending:
                  # A non-positive max_queue_size never shrinks the queue;
                  # stop instead of spinning forever.
                  return False
          return True

      def _translate_context(
          self, data: otel_trace_api.SpanContext
      ) -> trace_span_pb2.TraceSpan.Context:
          """Converts a span context into its proto form.

          Trace and span ids are rendered as '0x'-prefixed hex strings; the
          trace state is stored as its repr().
          """
          ctx = trace_span_pb2.TraceSpan.Context()
          ctx.trace_id = f'0x{otel_trace_api.format_trace_id(data.trace_id)}'
          ctx.span_id = f'0x{otel_trace_api.format_span_id(data.span_id)}'
          ctx.trace_state = repr(data.trace_state)
          return ctx

      def _translate_attributes(self,
                                data: otel_types.Attributes
                                ) -> struct_pb2.Struct:
          """Converts an attribute mapping into a protobuf Struct.

          Args:
              data: Attribute mapping; may be None or empty.

          Returns:
              A Struct holding every attribute that could be converted.
              Values that cannot be stored are logged at debug level and
              skipped without affecting the other attributes.
          """
          struct = struct_pb2.Struct()
          if not data:
              return struct

          for key, value in data.items():
              # Tuples are handed to Struct as lists.
              if isinstance(value, tuple):
                  value = list(value)
              try:
                  struct.update({key: value})
              except Exception as exception:  # pylint: disable=broad-except
                  # Best effort: telemetry must never break the caller.
                  logging.debug('Set attribute failed: %s', exception)
          return struct

      def _translate_span_attributes(
              self, data: otel_trace_sdk.ReadableSpan) -> struct_pb2.Struct:
          """Converts the span's own attributes into a Struct."""
          return self._translate_attributes(data.attributes)

      def _translate_links(
          self, data: otel_trace_sdk.ReadableSpan
      ) -> Sequence[trace_span_pb2.TraceSpan.Link]:
          """Converts every link of the span into a Link proto."""
          links = []
          for link_data in data.links:
              link = trace_span_pb2.TraceSpan.Link()
              link.context.MergeFrom(
                  self._translate_context(link_data.context))
              link.attributes.MergeFrom(
                  self._translate_attributes(link_data.attributes))
              links.append(link)
          return links

      def _translate_events(
          self, data: otel_trace_sdk.ReadableSpan
      ) -> Sequence[trace_span_pb2.TraceSpan.Event]:
          """Converts every event of the span into an Event proto."""
          events = []
          for event_data in data.events:
              event = trace_span_pb2.TraceSpan.Event()
              # The timestamp is divided by 1e6 to obtain milliseconds.
              event.event_time_millis = int(event_data.timestamp / 1e6)
              event.name = event_data.name
              event.attributes.MergeFrom(
                  self._translate_attributes(event_data.attributes))
              events.append(event)
          return events

      def _translate_instrumentation_scope(
          self, data: otel_trace_sdk.ReadableSpan
      ) -> trace_span_pb2.TraceSpan.InstrumentationScope:
          """Converts the span's instrumentation scope into its proto form.

          A missing scope, name or version yields empty proto fields instead
          of assigning None to a proto string field.
          """
          scope = trace_span_pb2.TraceSpan.InstrumentationScope()
          instrumentation_scope = data.instrumentation_scope
          if instrumentation_scope is None:
              return scope
          scope.name = instrumentation_scope.name or ''
          scope.version = instrumentation_scope.version or ''
          return scope

      def _translate_env(self, data: Dict[str, str]) -> Dict[str, str]:
          """Extracts the 'process.env.*' entries with the prefix removed.

          Args:
              data: Resource attributes.

          Returns:
              A dict mapping the variable name (the key without its
              'process.env.' prefix) to its value.
          """
          prefix = 'process.env.'
          # Slice rather than split so that a name which itself contains the
          # marker is kept intact.
          return {
              key[len(prefix):]: value
              for key, value in data.items()
              if key.startswith(prefix)
          }

      def _translate_resource(
          self, data: otel_trace_sdk.ReadableSpan
      ) -> trace_span_pb2.TraceSpan.Resource:
          """Converts the span's resource attributes into a Resource proto.

          Well-known system and process attributes are moved into dedicated
          fields. Everything that was not consumed that way (the
          'process.env.*' entries included) is also copied into
          `resource.attributes`.
          """
          # Work on a copy: consumed keys are popped.
          attrs = dict(data.resource.attributes)
          resource = trace_span_pb2.TraceSpan.Resource()

          system = resource.system
          system.cpu = attrs.pop(detector.CPU_NAME, '')
          system.host_architecture = attrs.pop(detector.CPU_ARCHITECTURE, '')
          system.os_name = attrs.pop(detector.OS_NAME, '')
          system.os_version = attrs.pop(otel_resources.OS_DESCRIPTION, '')
          system.os_type = attrs.pop(otel_resources.OS_TYPE, '')

          process = resource.process
          process.pid = str(attrs.pop(otel_resources.PROCESS_PID, ''))
          process.executable_name = attrs.pop(
              otel_resources.PROCESS_EXECUTABLE_NAME, '')
          process.executable_path = attrs.pop(
              otel_resources.PROCESS_EXECUTABLE_PATH, '')
          process.command = attrs.pop(otel_resources.PROCESS_COMMAND, '')
          process.command_args.extend(
              attrs.pop(otel_resources.PROCESS_COMMAND_ARGS, []))
          # Only whether the owner value equals 0 is recorded; 9999 is the
          # placeholder used when the attribute is absent.
          process.owner_is_root = (attrs.pop(otel_resources.PROCESS_OWNER,
                                             9999) == 0)
          process.runtime_name = attrs.pop(
              otel_resources.PROCESS_RUNTIME_NAME, '')
          process.runtime_version = attrs.pop(
              otel_resources.PROCESS_RUNTIME_VERSION, '')
          process.runtime_description = attrs.pop(
              otel_resources.PROCESS_RUNTIME_DESCRIPTION, '')
          process.api_version = str(
              attrs.pop(detector.PROCESS_RUNTIME_API_VERSION, ''))
          process.env.update(self._translate_env(attrs))

          resource.attributes.MergeFrom(self._translate_attributes(attrs))
          return resource

      def _translate_status(
          self, data: otel_trace_sdk.ReadableSpan
      ) -> trace_span_pb2.TraceSpan.Status:
          """Converts the span status; anything but ERROR is reported as OK."""
          status = trace_span_pb2.TraceSpan.Status()
          if data.status.status_code == otel_trace_sdk.StatusCode.ERROR:
              status.status_code = (
                  trace_span_pb2.TraceSpan.Status.StatusCode.STATUS_CODE_ERROR)
          else:
              status.status_code = (
                  trace_span_pb2.TraceSpan.Status.StatusCode.STATUS_CODE_OK)
          if data.status.description:
              status.message = data.status.description
          return status

      def _translate_sdk(
          self, data: otel_trace_sdk.ReadableSpan
      ) -> trace_span_pb2.TraceSpan.TelemetrySdk:
          """Converts the telemetry SDK resource attributes into a proto.

          Missing attributes become empty strings, as in
          _translate_resource(), because None cannot be assigned to a proto
          string field.
          """
          attrs = data.resource.attributes
          sdk = trace_span_pb2.TraceSpan.TelemetrySdk()
          sdk.name = attrs.get(otel_resources.TELEMETRY_SDK_NAME, '')
          sdk.version = attrs.get(otel_resources.TELEMETRY_SDK_VERSION, '')
          sdk.language = attrs.get(otel_resources.TELEMETRY_SDK_LANGUAGE, '')
          return sdk

      def _translate_kind(
          self, data: otel_trace_api.SpanKind
      ) -> trace_span_pb2.TraceSpan.SpanKind:
          """Maps the span kind to the proto enum.

          INTERNAL, CLIENT and SERVER are mapped one to one; every other kind
          is reported as SPAN_KIND_UNSPECIFIED.
          """
          proto_kind = trace_span_pb2.TraceSpan.SpanKind
          if data == otel_trace_api.SpanKind.INTERNAL:
              return proto_kind.SPAN_KIND_INTERNAL
          if data == otel_trace_api.SpanKind.CLIENT:
              return proto_kind.SPAN_KIND_CLIENT
          if data == otel_trace_api.SpanKind.SERVER:
              return proto_kind.SPAN_KIND_SERVER
          return proto_kind.SPAN_KIND_UNSPECIFIED

      def _translate_span(
              self,
              data: otel_trace_sdk.ReadableSpan) -> trace_span_pb2.TraceSpan:
          """Converts a finished span into a TraceSpan proto."""
          span = trace_span_pb2.TraceSpan()
          span.name = data.name
          span.context.MergeFrom(
              self._translate_context(data.get_span_context()))

          # The parent may be a Span or a SpanContext. get_span_context() is
          # part of the API Span interface, so it works for every Span
          # implementation.
          parent = data.parent
          if isinstance(parent, otel_trace_api.Span):
              parent = parent.get_span_context()
          if isinstance(parent, otel_trace_api.SpanContext):
              span.parent_span_id = (
                  f'0x{otel_trace_api.format_span_id(parent.span_id)}')

          # Start/end times are divided by 1e6 to obtain milliseconds.
          span.start_time_millis = int(data.start_time / 1e6)
          span.end_time_millis = int(data.end_time / 1e6)
          span.span_kind = self._translate_kind(data.kind)
          span.instrumentation_scope.MergeFrom(
              self._translate_instrumentation_scope(data))
          span.events.extend(self._translate_events(data))
          span.links.extend(self._translate_links(data))
          span.attributes.MergeFrom(self._translate_span_attributes(data))
          span.status.MergeFrom(self._translate_status(data))
          span.resource.MergeFrom(self._translate_resource(data))
          span.telemetry_sdk.MergeFrom(self._translate_sdk(data))
          return span

      def _export_batch(self, timeout: Optional[float] = None) -> bool:
          """Export the spans to clearcut via http api.

          Removes up to `max_queue_size` spans from the queue and POSTs them.
          The removed spans are not put back when the request fails.

          Args:
              timeout: HTTP timeout in seconds; the exporter's default
                  timeout is used when None or 0.

          Returns:
              True if the batch was sent, or deliberately dropped because the
              required back-off exceeds `max_wait_secs`; False if the request
              failed or the response could not be decoded.
          """
          spans = self._queue[:self._max_queue_size]
          self._queue = self._queue[self._max_queue_size:]

          wait_delta = self._next_request_dt - datetime.datetime.now()
          wait_time = wait_delta.total_seconds()

          # Drop the packets if wait time is more than threshold.
          if wait_time > self._max_wait_secs:
              logging.warning(
                  'dropping %d spans for large wait: %d',
                  len(spans),
                  wait_time,
              )
              return True

          if wait_time > 0:
              time.sleep(wait_time)

          logrequest = self._prepare_request_body(spans)
          req = urllib.request.Request(
              self._endpoint,
              data=logrequest.SerializeToString(),
              method='POST',
          )

          logresponse = clientanalytics_pb2.LogResponse()
          try:
              with urllib.request.urlopen(req,
                                          timeout=timeout
                                          or self._timeout) as f:
                  logresponse.ParseFromString(f.read())
          except OSError as os_error:
              # URLError is a subclass of OSError. Catching OSError also
              # covers socket timeouts and connection resets raised while
              # the response body is being read, which are not wrapped in
              # URLError.
              logging.warning(os_error)
              return False
          except proto_msg.DecodeError as decode_error:
              logging.warning('could not decode data into proto: %s',
                              decode_error)
              return False

          # Remember how long the server asked us to wait before the next
          # request.
          delta = datetime.timedelta(
              milliseconds=logresponse.next_request_wait_millis)
          self._next_request_dt = datetime.datetime.now() + delta
          return True

      def _prepare_request_body(
          self, spans: Sequence[trace_span_pb2.TraceSpan]
      ) -> clientanalytics_pb2.LogRequest:
          """Wraps the spans into a LogRequest.

          Each span becomes one log event carrying the serialized span in
          `source_extension`. The request and all of its events share one
          timestamp taken when the request is built.
          """
          now_ms = int(time.time() * 1000)

          log_request = clientanalytics_pb2.LogRequest()
          log_request.request_time_ms = now_ms
          log_request.client_info.client_type = _CLIENT_TYPE
          log_request.log_source = self._log_source

          for span in spans:
              log_event = log_request.log_event.add()
              log_event.event_time_ms = now_ms
              log_event.source_extension = span.SerializeToString()
          return log_request