watchlists.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. #!/usr/bin/env python3
  2. # Copyright (c) 2011 The Chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Watchlists
  6. Watchlists is a mechanism that allows a developer (a "watcher") to watch over
  7. portions of code that they are interested in. A "watcher" will be cc-ed to
  8. changes that modify that portion of code, thereby giving them an opportunity
  9. to make comments on chromium-review.googlesource.com even before the change is
  10. committed.
  11. Refer:
  12. https://chromium.googlesource.com/chromium/src/+/HEAD/docs/infra/watchlists.md
  13. When invoked directly from the base of a repository, this script lists out
  14. the watchers for files given on the command line. This is useful to verify
  15. changes to WATCHLISTS files.
  16. """
  17. import logging
  18. import os
  19. import re
  20. import sys
  21. class Watchlists(object):
  22. """Manage Watchlists.
  23. This class provides mechanism to load watchlists for a repo and identify
  24. watchers.
  25. Usage:
  26. wl = Watchlists("/path/to/repo/root")
  27. watchers = wl.GetWatchersForPaths(["/path/to/file1",
  28. "/path/to/file2",])
  29. """
  30. _RULES = "WATCHLISTS"
  31. _RULES_FILENAME = _RULES
  32. _repo_root = None
  33. _defns = {} # Definitions
  34. _path_regexps = {} # Name -> Regular expression mapping
  35. _watchlists = {} # name to email mapping
  36. def __init__(self, repo_root):
  37. self._repo_root = repo_root
  38. self._LoadWatchlistRules()
  39. def _GetRulesFilePath(self):
  40. """Returns path to WATCHLISTS file."""
  41. return os.path.join(self._repo_root, self._RULES_FILENAME)
  42. def _HasWatchlistsFile(self):
  43. """Determine if watchlists are available for this repo."""
  44. return os.path.exists(self._GetRulesFilePath())
  45. def _ContentsOfWatchlistsFile(self):
  46. """Read the WATCHLISTS file and return its contents."""
  47. try:
  48. watchlists_file = open(self._GetRulesFilePath())
  49. contents = watchlists_file.read()
  50. watchlists_file.close()
  51. return contents
  52. except IOError as e:
  53. logging.error("Cannot read %s: %s" % (self._GetRulesFilePath(), e))
  54. return ''
  55. def _LoadWatchlistRules(self):
  56. """Load watchlists from WATCHLISTS file. Does nothing if not present."""
  57. if not self._HasWatchlistsFile():
  58. return
  59. contents = self._ContentsOfWatchlistsFile()
  60. watchlists_data = None
  61. try:
  62. watchlists_data = eval(contents, {'__builtins__': None}, None)
  63. except SyntaxError as e:
  64. logging.error("Cannot parse %s. %s" % (self._GetRulesFilePath(), e))
  65. return
  66. defns = watchlists_data.get("WATCHLIST_DEFINITIONS")
  67. if not defns:
  68. logging.error("WATCHLIST_DEFINITIONS not defined in %s" %
  69. self._GetRulesFilePath())
  70. return
  71. watchlists = watchlists_data.get("WATCHLISTS")
  72. if not watchlists:
  73. logging.error("WATCHLISTS not defined in %s" %
  74. self._GetRulesFilePath())
  75. return
  76. self._defns = defns
  77. self._watchlists = watchlists
  78. # Compile the regular expressions ahead of time to avoid creating them
  79. # on-the-fly multiple times per file.
  80. self._path_regexps = {}
  81. for name, rule in defns.items():
  82. filepath = rule.get('filepath')
  83. if not filepath:
  84. continue
  85. self._path_regexps[name] = re.compile(filepath)
  86. # Verify that all watchlist names are defined
  87. for name in watchlists:
  88. if name not in defns:
  89. logging.error("%s not defined in %s" %
  90. (name, self._GetRulesFilePath()))
  91. def GetWatchersForPaths(self, paths):
  92. """Fetch the list of watchers for |paths|
  93. Args:
  94. paths: [path1, path2, ...]
  95. Returns:
  96. [u1@chromium.org, u2@gmail.com, ...]
  97. """
  98. watchers = set() # A set, to avoid duplicates
  99. for path in paths:
  100. path = path.replace(os.sep, '/')
  101. for name, rule in self._path_regexps.items():
  102. if name not in self._watchlists:
  103. continue
  104. if rule.search(path):
  105. for watchlist in self._watchlists[name]:
  106. watchers.add(watchlist)
  107. return sorted(watchers)
  108. def main(argv):
  109. # Confirm that watchlists can be parsed and spew out the watchers
  110. if len(argv) < 2:
  111. print("Usage (from the base of repo):")
  112. print(" %s [file-1] [file-2] ...." % argv[0])
  113. return 1
  114. wl = Watchlists(os.getcwd())
  115. watchers = wl.GetWatchersForPaths(argv[1:])
  116. print(watchers)
  117. if __name__ == '__main__':
  118. main(sys.argv)