Procházet zdrojové kódy

Add exporter to the telemetry lib

The exporter translates the spans into protos and does the actual
publishing to the Clearcut endpoint. It uses cros build's _LOG_SOURCE so
that the events are routed to their Sawmill. The service_name on the trace
is what will be used to store our traces in an F1 table that we can access.

Bug: 326277821
Change-Id: Id1af3b7d5cffa6d9bfb86c34185e4247e85f9619
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/5888844
Reviewed-by: Terrence Reilly <treilly@google.com>
Commit-Queue: Struan Shrimpton <sshrimp@google.com>
Struan Shrimpton před 10 měsíci
rodič
revize
5bd66c3869

+ 13 - 0
infra_lib/telemetry/anonymization_unittest.py

@@ -7,9 +7,22 @@ import getpass
 import re
 import pytest
 
+from .proto import trace_span_pb2
 from . import anonymization
 
 
+def test_anonymizing_filter_to_redact_info_from_msg() -> None:
+    """Test AnonymizingFilter to apply the passed anonymizer to msg."""
+    msg = trace_span_pb2.TraceSpan()
+    msg.name = "log-user-user1234"
+
+    anonymizer = anonymization.Anonymizer([(re.escape("user1234"), "<user>")])
+    f = anonymization.AnonymizingFilter(anonymizer)
+
+    filtered_msg = f(msg)
+    assert filtered_msg.name == "log-user-<user>"
+
+
 def test_default_anonymizer_to_remove_username_from_path(monkeypatch) -> None:
     """Test that default Anonymizer redacts username."""
     monkeypatch.setattr(getpass, "getuser", lambda: "user")

+ 320 - 0
infra_lib/telemetry/clearcut_span_exporter.py

@@ -0,0 +1,320 @@
+# Copyright 2024 The Chromium Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Defines the telemetry exporter for exporting to ClearCut."""
+
+import datetime
+import logging
+import time
+import urllib.error
+import urllib.request
+
+from typing import Callable, Dict, Optional, Pattern, Sequence, Tuple
+from google.protobuf import (
+    json_format,
+    message as proto_msg,
+    struct_pb2,
+)
+from opentelemetry import trace as otel_trace_api
+from opentelemetry.sdk import (
+    trace as otel_trace_sdk,
+    resources as otel_resources,
+)
+from opentelemetry.sdk.trace import export as otel_export
+from opentelemetry.util import types as otel_types
+
+from . import anonymization
+from . import detector
+from .proto import trace_span_pb2
+from .proto import clientanalytics_pb2
+
+# Endpoint that log requests are POSTed to unless the caller overrides it.
+_DEFAULT_ENDPOINT = 'https://play.googleapis.com/log'
+# Default timeout, in seconds, passed to urlopen() for each upload.
+_DEFAULT_TIMEOUT = 15
+# Default force_flush() timeout in milliseconds (converted to seconds there).
+_DEFAULT_FLUSH_TIMEOUT_MILLIS = 30000
+# Longest throttling delay, in seconds, that an upload will sleep through;
+# a batch facing a longer wait is dropped instead.
+# NOTE(review): the name is misspelled ("DEAULT"); renaming it also requires
+# updating the default of ClearcutSpanExporter.__init__.
+_DEAULT_MAX_WAIT_SECS = 60
+# Preallocated in Clearcut proto to cros Build.
+_LOG_SOURCE = 2044
+# Preallocated in Clearcut proto to Python clients.
+_CLIENT_TYPE = 33
+# Queue length at which export() uploads a batch; also the largest number of
+# spans sent in one request.
+_DEFAULT_MAX_QUEUE_SIZE = 1000
+
+
+class ClearcutSpanExporter(otel_export.SpanExporter):
+    """Exports the spans to google http endpoint."""
+
+    def __init__(
+        self,
+        endpoint: str = _DEFAULT_ENDPOINT,
+        timeout: int = _DEFAULT_TIMEOUT,
+        max_wait_secs: int = _DEAULT_MAX_WAIT_SECS,
+        max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE,
+        prefilter: Optional[Callable[[trace_span_pb2.TraceSpan],
+                                     trace_span_pb2.TraceSpan]] = None,
+    ) -> None:
+        self._endpoint = endpoint
+        self._timeout = timeout
+        self._prefilter = prefilter or anonymization.AnonymizingFilter(
+            anonymization.Anonymizer())
+        self._log_source = _LOG_SOURCE
+        self._next_request_dt = datetime.datetime.now()
+        self._max_wait_secs = max_wait_secs
+        self._queue = []
+        self._max_queue_size = max_queue_size
+
+    def export(
+        self, spans: Sequence[otel_trace_sdk.ReadableSpan]
+    ) -> otel_export.SpanExportResult:
+        spans = (self._prefilter(self._translate_span(s)) for s in spans)
+        self._queue.extend(spans)
+
+        if len(self._queue) >= self._max_queue_size:
+            return (otel_export.SpanExportResult.SUCCESS
+                    if self._export_batch() else
+                    otel_export.SpanExportResult.FAILURE)
+
+        return otel_export.SpanExportResult.SUCCESS
+
+    def shutdown(self) -> None:
+        """Flush any queued spans by delegating to force_flush()."""
+        self.force_flush()
+
+    def force_flush(self,
+                    timeout_millis: int = _DEFAULT_FLUSH_TIMEOUT_MILLIS
+                    ) -> bool:
+        if self._queue:
+            return self._export_batch(timeout=timeout_millis / 1000)
+
+        return True
+
+    def _translate_context(
+            self, data: otel_trace_api.SpanContext
+    ) -> trace_span_pb2.TraceSpan.Context:
+        ctx = trace_span_pb2.TraceSpan.Context()
+        ctx.trace_id = f'0x{otel_trace_api.format_trace_id(data.trace_id)}'
+        ctx.span_id = f'0x{otel_trace_api.format_span_id(data.span_id)}'
+        ctx.trace_state = repr(data.trace_state)
+        return ctx
+
+    def _translate_attributes(self,
+                              data: otel_types.Attributes) -> struct_pb2.Struct:
+        patch = {}
+        for key, value in data.items():
+            if isinstance(value, tuple):
+                value = list(value)
+            patch[key] = value
+
+        struct = struct_pb2.Struct()
+        try:
+            struct.update(patch)
+        except Exception as exception:
+            logging.debug('Set attribute failed: %s', exception)
+        return struct
+
+    def _translate_span_attributes(
+            self, data: otel_trace_sdk.ReadableSpan) -> struct_pb2.Struct:
+        """Return the span's own attributes as a protobuf Struct."""
+        return self._translate_attributes(data.attributes)
+
+    def _translate_links(
+            self,
+            data: otel_trace_sdk.ReadableSpan) -> trace_span_pb2.TraceSpan.Link:
+        links = []
+
+        for link_data in data.links:
+            link = trace_span_pb2.TraceSpan.Link()
+            link.context.MergeFrom(self._translate_context(link_data.context))
+            link.attributes.MergeFrom(
+                self._translate_attributes(link_data.attributes))
+            links.append(link)
+
+        return links
+
+    def _translate_events(
+            self, data: otel_trace_sdk.ReadableSpan
+    ) -> trace_span_pb2.TraceSpan.Event:
+        events = []
+        for event_data in data.events:
+            event = trace_span_pb2.TraceSpan.Event()
+            event.event_time_millis = int(event_data.timestamp / 1e6)
+            event.name = event_data.name
+            event.attributes.MergeFrom(
+                self._translate_attributes(event_data.attributes))
+            events.append(event)
+        return events
+
+    def _translate_instrumentation_scope(
+        self, data: otel_trace_sdk.ReadableSpan
+    ) -> trace_span_pb2.TraceSpan.InstrumentationScope:
+        instrumentation_scope = data.instrumentation_scope
+        scope = trace_span_pb2.TraceSpan.InstrumentationScope()
+        scope.name = instrumentation_scope.name
+        scope.version = instrumentation_scope.version
+        return scope
+
+    def _translate_env(self, data: Dict[str, str]) -> Dict[str, str]:
+        environ = {}
+        for key, value in data.items():
+            if key.startswith('process.env.'):
+                key = key.split('process.env.')[1]
+                environ[key] = value
+        return environ
+
+    def _translate_resource(
+            self, data: otel_trace_sdk.ReadableSpan
+    ) -> trace_span_pb2.TraceSpan.Resource:
+        """Build the Resource proto from the span's resource attributes.
+
+        Known keys are popped from a copy of the attributes and written to
+        dedicated system/process fields, falling back to an empty value when
+        absent. Whatever is left is then used twice: keys starting with
+        'process.env.' fill process.env, and all remaining keys (those
+        environment entries included) fill the generic attributes Struct.
+        """
+        # Work on a copy so that pop() does not alter the span's resource.
+        attrs = dict(data.resource.attributes)
+        resource = trace_span_pb2.TraceSpan.Resource()
+        resource.system.cpu = attrs.pop(detector.CPU_NAME, '')
+        resource.system.host_architecture = attrs.pop(detector.CPU_ARCHITECTURE,
+                                                      '')
+        resource.system.os_name = attrs.pop(detector.OS_NAME, '')
+        resource.system.os_version = attrs.pop(otel_resources.OS_DESCRIPTION,
+                                               '')
+        resource.system.os_type = attrs.pop(otel_resources.OS_TYPE, '')
+        # The pid is stored as text.
+        resource.process.pid = str(attrs.pop(otel_resources.PROCESS_PID, ''))
+        resource.process.executable_name = attrs.pop(
+            otel_resources.PROCESS_EXECUTABLE_NAME, '')
+        resource.process.executable_path = attrs.pop(
+            otel_resources.PROCESS_EXECUTABLE_PATH, '')
+        resource.process.command = attrs.pop(otel_resources.PROCESS_COMMAND, '')
+        resource.process.command_args.extend(
+            attrs.pop(otel_resources.PROCESS_COMMAND_ARGS, []))
+        # Only an owner value equal to 0 counts as root; 9999 is the
+        # placeholder used when the attribute is missing.
+        # NOTE(review): presumably the owner is recorded as a numeric uid --
+        # confirm against the detector that sets it.
+        resource.process.owner_is_root = (attrs.pop(
+            otel_resources.PROCESS_OWNER, 9999) == 0)
+        resource.process.runtime_name = attrs.pop(
+            otel_resources.PROCESS_RUNTIME_NAME, '')
+        resource.process.runtime_version = attrs.pop(
+            otel_resources.PROCESS_RUNTIME_VERSION, '')
+        resource.process.runtime_description = attrs.pop(
+            otel_resources.PROCESS_RUNTIME_DESCRIPTION, '')
+        resource.process.api_version = str(
+            attrs.pop(detector.PROCESS_RUNTIME_API_VERSION, ''))
+        # These two must come last: they consume what the pops left behind.
+        resource.process.env.update(self._translate_env(attrs))
+        resource.attributes.MergeFrom(self._translate_attributes(attrs))
+        return resource
+
+    def _translate_status(
+            self, data: otel_trace_sdk.ReadableSpan
+    ) -> trace_span_pb2.TraceSpan.Status:
+        status = trace_span_pb2.TraceSpan.Status()
+
+        if data.status.status_code == otel_trace_sdk.StatusCode.ERROR:
+            status.status_code = (
+                trace_span_pb2.TraceSpan.Status.StatusCode.STATUS_CODE_ERROR)
+        else:
+            status.status_code = (
+                trace_span_pb2.TraceSpan.Status.StatusCode.STATUS_CODE_OK)
+
+        if data.status.description:
+            status.message = data.status.description
+
+        return status
+
+    def _translate_sdk(
+        self, data: otel_trace_sdk.ReadableSpan
+    ) -> trace_span_pb2.TraceSpan.TelemetrySdk:
+        attrs = data.resource.attributes
+        sdk = trace_span_pb2.TraceSpan.TelemetrySdk()
+        sdk.name = attrs.get(otel_resources.TELEMETRY_SDK_NAME)
+        sdk.version = attrs.get(otel_resources.TELEMETRY_SDK_VERSION)
+        sdk.language = attrs.get(otel_resources.TELEMETRY_SDK_LANGUAGE)
+        return sdk
+
+    def _translate_kind(
+            self,
+            data: otel_trace_api.SpanKind) -> trace_span_pb2.TraceSpan.SpanKind:
+        if data == otel_trace_api.SpanKind.INTERNAL:
+            return trace_span_pb2.TraceSpan.SpanKind.SPAN_KIND_INTERNAL
+        elif data == otel_trace_api.SpanKind.CLIENT:
+            return trace_span_pb2.TraceSpan.SpanKind.SPAN_KIND_CLIENT
+        elif data == otel_trace_api.SpanKind.SERVER:
+            return trace_span_pb2.TraceSpan.SpanKind.SPAN_KIND_SERVER
+        return trace_span_pb2.TraceSpan.SpanKind.SPAN_KIND_UNSPECIFIED
+
+    def _translate_span(
+            self,
+            data: otel_trace_sdk.ReadableSpan) -> trace_span_pb2.TraceSpan:
+        span = trace_span_pb2.TraceSpan()
+        span.name = data.name
+        span.context.MergeFrom(self._translate_context(data.get_span_context()))
+
+        if isinstance(data.parent, otel_trace_api.Span):
+            ctx = data.parent.context
+            span.parent_span_id = (
+                f'0x{otel_trace_api.format_span_id(ctx.span_id)}')
+        elif isinstance(data.parent, otel_trace_api.SpanContext):
+            span.parent_span_id = (
+                f'0x{otel_trace_api.format_span_id(data.parent.span_id)}')
+
+        span.start_time_millis = int(data.start_time / 1e6)
+        span.end_time_millis = int(data.end_time / 1e6)
+        span.span_kind = self._translate_kind(data.kind)
+        span.instrumentation_scope.MergeFrom(
+            self._translate_instrumentation_scope(data))
+        span.events.extend(self._translate_events(data))
+        span.links.extend(self._translate_links(data))
+        span.attributes.MergeFrom(self._translate_span_attributes(data))
+        span.status.MergeFrom(self._translate_status(data))
+        span.resource.MergeFrom(self._translate_resource(data))
+        span.telemetry_sdk.MergeFrom(self._translate_sdk(data))
+
+        return span
+
+    def _export_batch(self, timeout: Optional[int] = None) -> bool:
+        """Export the spans to clearcut via http api."""
+
+        spans = self._queue[:self._max_queue_size]
+        self._queue = self._queue[self._max_queue_size:]
+
+        wait_delta = self._next_request_dt - datetime.datetime.now()
+        wait_time = wait_delta.total_seconds()
+
+        # Drop the packets if wait time is more than threshold.
+        if wait_time > self._max_wait_secs:
+            logging.warning(
+                'dropping %d spans for large wait: %d',
+                len(spans),
+                wait_time,
+            )
+            return True
+
+        if wait_time > 0:
+            time.sleep(wait_time)
+
+        logrequest = self._prepare_request_body(spans)
+
+        req = urllib.request.Request(
+            self._endpoint,
+            data=logrequest.SerializeToString(),
+            method='POST',
+        )
+        logresponse = clientanalytics_pb2.LogResponse()
+
+        try:
+            with urllib.request.urlopen(req, timeout=timeout
+                                        or self._timeout) as f:
+                logresponse.ParseFromString(f.read())
+        except urllib.error.URLError as url_exception:
+            logging.warning(url_exception)
+            return False
+        except proto_msg.DecodeError as decode_error:
+            logging.warning('could not decode data into proto: %s',
+                            decode_error)
+            return False
+
+        now = datetime.datetime.now()
+        delta = datetime.timedelta(
+            milliseconds=logresponse.next_request_wait_millis)
+        self._next_request_dt = now + delta
+        return True
+
+    def _prepare_request_body(self, spans) -> clientanalytics_pb2.LogRequest:
+        log_request = clientanalytics_pb2.LogRequest()
+        log_request.request_time_ms = int(time.time() * 1000)
+        log_request.client_info.client_type = _CLIENT_TYPE
+        log_request.log_source = self._log_source
+
+        for span in spans:
+            log_event = log_request.log_event.add()
+            log_event.event_time_ms = int(time.time() * 1000)
+            log_event.source_extension = span.SerializeToString()
+
+        return log_request

+ 250 - 0
infra_lib/telemetry/clearcut_span_exporter_unittest.py

@@ -0,0 +1,250 @@
+# Copyright 2024 The Chromium Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Unittests for SpanExporter classes."""
+
+import datetime
+import re
+import time
+import urllib.request
+
+from opentelemetry.sdk import trace
+from opentelemetry.sdk.trace import export
+
+from .proto import clientanalytics_pb2
+from .proto import trace_span_pb2
+from . import anonymization
+from . import clearcut_span_exporter
+
+
+class MockResponse:
+    """Stand-in for the object returned by urllib.request.urlopen().
+
+    It can be used as a context manager and offers read(), which returns the
+    body passed to the constructor. The status is stored but not read by
+    anything in this file.
+    """
+
+    def __init__(self, status, text) -> None:
+        self._status = status
+        self._text = text
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args) -> None:
+        # Nothing to release; exceptions are not suppressed.
+        pass
+
+    def read(self):
+        """Return the body given at construction time."""
+        return self._text
+
+
+# Module-level tracer that the tests below use to create spans.
+tracer = trace.TracerProvider().get_tracer(__name__)
+
+
+def test_otel_span_translation(monkeypatch) -> None:
+    """Test ClearcutSpanExporter to translate otel spans to TraceSpan."""
+    requests = []
+
+    def mock_urlopen(request, timeout=0):
+        requests.append((request, timeout))
+        resp = clientanalytics_pb2.LogResponse()
+        resp.next_request_wait_millis = 1
+        body = resp.SerializeToString()
+        return MockResponse(200, body)
+
+    monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen)
+
+    span = tracer.start_span("name")
+    span.end()
+
+    e = clearcut_span_exporter.ClearcutSpanExporter(max_queue_size=1)
+
+    assert e.export([span]) == export.SpanExportResult.SUCCESS
+    req, _ = requests[0]
+    log_request = clientanalytics_pb2.LogRequest()
+    log_request.ParseFromString(req.data)
+
+    assert log_request.request_time_ms <= int(time.time() * 1000)
+    assert len(log_request.log_event) == 1
+
+    # The following constants are defined in clearcut_span_exporter
+    # as _CLIENT_TYPE and _LOG_SOURCE respectively.
+    assert log_request.client_info.client_type == 33
+    assert log_request.log_source == 2044
+
+    tspan = trace_span_pb2.TraceSpan()
+    tspan.ParseFromString(log_request.log_event[0].source_extension)
+
+    assert tspan.name == span.name
+    assert tspan.start_time_millis == int(span.start_time / 1e6)
+    assert tspan.end_time_millis == int(span.end_time / 1e6)
+
+
+def test_otel_span_translation_with_anonymization(monkeypatch) -> None:
+    """Test ClearcutSpanExporter to anonymize spans to before export."""
+    requests = []
+
+    def mock_urlopen(request, timeout=0):
+        requests.append((request, timeout))
+        resp = clientanalytics_pb2.LogResponse()
+        resp.next_request_wait_millis = 1
+        body = resp.SerializeToString()
+        return MockResponse(200, body)
+
+    monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen)
+
+    span = tracer.start_span("span-user4321")
+    span.set_attributes({"username": "user4321"})
+    span.add_event("event-for-user4321")
+    span.end()
+
+    anonymizer = anonymization.Anonymizer([(re.escape("user4321"), "<user>")])
+    f = anonymization.AnonymizingFilter(anonymizer)
+    e = clearcut_span_exporter.ClearcutSpanExporter(prefilter=f,
+                                                    max_queue_size=1)
+
+    assert e.export([span]) == export.SpanExportResult.SUCCESS
+    req, _ = requests[0]
+    log_request = clientanalytics_pb2.LogRequest()
+    log_request.ParseFromString(req.data)
+
+    tspan = trace_span_pb2.TraceSpan()
+    tspan.ParseFromString(log_request.log_event[0].source_extension)
+
+    assert tspan.name == "span-<user>"
+    assert tspan.events[0].name == "event-for-<user>"
+    assert tspan.attributes["username"] == "<user>"
+
+
+def test_export_to_http_api(monkeypatch) -> None:
+    """Test ClearcutSpanExporter to export spans over http."""
+    requests = []
+
+    def mock_urlopen(request, timeout=0):
+        requests.append((request, timeout))
+        resp = clientanalytics_pb2.LogResponse()
+        resp.next_request_wait_millis = 1
+        body = resp.SerializeToString()
+        return MockResponse(200, body)
+
+    monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen)
+
+    span = tracer.start_span("name")
+    span.end()
+    endpoint = "http://domain.com/path"
+
+    e = clearcut_span_exporter.ClearcutSpanExporter(endpoint=endpoint,
+                                                    timeout=7,
+                                                    max_queue_size=1)
+
+    assert e.export([span])
+    req, timeout = requests[0]
+    assert req.full_url == endpoint
+    assert timeout == 7
+
+
+def test_export_to_http_api_throttle(monkeypatch) -> None:
+    """Test ClearcutSpanExporter to throttle based on prev response."""
+    mock_open_times = []
+
+    def mock_urlopen(request, timeout=0):
+        mock_open_times.append(datetime.datetime.now())
+        resp = clientanalytics_pb2.LogResponse()
+        resp.next_request_wait_millis = 1000
+        body = resp.SerializeToString()
+        return MockResponse(200, body)
+
+    monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen)
+
+    span = tracer.start_span("name")
+    span.end()
+
+    e = clearcut_span_exporter.ClearcutSpanExporter(max_queue_size=1)
+
+    assert e.export([span])
+    assert e.export([span])
+
+    # We've called export() on the same exporter instance twice, so we expect
+    # the following things to be true:
+    #   1. The request.urlopen() function has been called exactly twice, and
+    #   2. The calls to urlopen() are more than 1000 ms apart (due to the
+    #      value in the mock_urlopen response).
+    # The mock_open_times list is a proxy for observing this behavior directly.
+    assert len(mock_open_times) == 2
+    assert (mock_open_times[1] - mock_open_times[0]).total_seconds() > 1
+
+
+def test_export_to_drop_spans_if_wait_more_than_threshold(monkeypatch) -> None:
+    """Test ClearcutSpanExporter to drop span if wait is more than threshold."""
+    mock_open_times = []
+
+    def mock_urlopen(request, timeout=0):
+        nonlocal mock_open_times
+        mock_open_times.append(datetime.datetime.now())
+        resp = clientanalytics_pb2.LogResponse()
+        resp.next_request_wait_millis = 900000
+        body = resp.SerializeToString()
+        return MockResponse(200, body)
+
+    monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen)
+
+    span = tracer.start_span("name")
+    span.end()
+
+    e = clearcut_span_exporter.ClearcutSpanExporter(max_queue_size=1)
+
+    assert e.export([span])
+    assert e.export([span])
+
+    # We've called export() on the same exporter instance twice, so we expect
+    # the following things to be true:
+    #   1. The request.urlopen() function has been called exactly once
+    assert len(mock_open_times) == 1
+
+
+def test_flush_to_clear_export_queue_to_http_api(monkeypatch) -> None:
+    """Test ClearcutSpanExporter to export spans on flush."""
+    requests = []
+
+    def mock_urlopen(request, timeout=0):
+        requests.append((request, timeout))
+        resp = clientanalytics_pb2.LogResponse()
+        resp.next_request_wait_millis = 1
+        body = resp.SerializeToString()
+        return MockResponse(200, body)
+
+    monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen)
+
+    span = tracer.start_span("name")
+    span.end()
+
+    e = clearcut_span_exporter.ClearcutSpanExporter(max_queue_size=3)
+
+    assert e.export([span])
+    assert e.export([span])
+    assert len(requests) == 0
+
+    assert e.force_flush()
+    assert len(requests) == 1
+
+
+def test_shutdown_to_clear_export_queue_to_http_api(monkeypatch) -> None:
+    """Test ClearcutSpanExporter to export spans on shutdown."""
+    requests = []
+
+    def mock_urlopen(request, timeout=0):
+        requests.append((request, timeout))
+        resp = clientanalytics_pb2.LogResponse()
+        resp.next_request_wait_millis = 1
+        body = resp.SerializeToString()
+        return MockResponse(200, body)
+
+    monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen)
+
+    span = tracer.start_span("name")
+    span.end()
+
+    e = clearcut_span_exporter.ClearcutSpanExporter(max_queue_size=3)
+
+    assert e.export([span])
+    assert e.export([span])
+    assert len(requests) == 0
+
+    e.shutdown()
+    assert len(requests) == 1