anonymization.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. # Copyright 2024 The Chromium Authors
  2. # Use of this source code is governed by a BSD-style license that can be
  3. # found in the LICENSE file.
  4. """Util for anonymizing telemetry spans."""
  5. import getpass
  6. import re
  7. from typing import Optional, Pattern, Sequence, Tuple
  8. from google.protobuf import json_format
  9. from .proto import trace_span_pb2
  10. class Anonymizer:
  11. """Redact the personally identifiable information."""
  12. def __init__(
  13. self,
  14. replacements: Optional[Sequence[Tuple[Pattern[str],
  15. str]]] = None) -> None:
  16. self._replacements = list(replacements or [])
  17. if getpass.getuser() != "root":
  18. # Substituting the root user doesn't actually anonymize anything.
  19. self._replacements.append(
  20. (re.compile(re.escape(getpass.getuser())), "<user>"))
  21. def __call__(self, *args, **kwargs):
  22. return self.apply(*args, **kwargs)
  23. def apply(self, data: str) -> str:
  24. """Applies the replacement rules to data text."""
  25. if not data:
  26. return data
  27. for repl_from, repl_to in self._replacements:
  28. data = re.sub(repl_from, repl_to, data)
  29. return data
  30. class AnonymizingFilter:
  31. """Applies the anonymizer to TraceSpan messages."""
  32. def __init__(self, anonymizer: Anonymizer) -> None:
  33. self._anonymizer = anonymizer
  34. def __call__(self,
  35. msg: trace_span_pb2.TraceSpan) -> trace_span_pb2.TraceSpan:
  36. """Applies the anonymizer to TraceSpan message."""
  37. raw = json_format.MessageToJson(msg)
  38. json_msg = self._anonymizer.apply(raw)
  39. output = trace_span_pb2.TraceSpan()
  40. json_format.Parse(json_msg, output)
  41. return output