watchlists.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. #!/usr/bin/env python3
  2. # Copyright (c) 2011 The Chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Watchlists
  6. Watchlists is a mechanism that allows a developer (a "watcher") to watch over
  7. portions of code that they are interested in. A "watcher" will be cc-ed to
  8. changes that modify that portion of code, thereby giving them an opportunity
  9. to make comments on chromium-review.googlesource.com even before the change is
  10. committed.
  11. Refer:
  12. https://chromium.googlesource.com/chromium/src/+/HEAD/docs/infra/watchlists.md
  13. When invoked directly from the base of a repository, this script lists out
  14. the watchers for files given on the command line. This is useful to verify
  15. changes to WATCHLISTS files.
  16. """
  17. from __future__ import print_function
  18. import logging
  19. import os
  20. import re
  21. import sys
  22. class Watchlists(object):
  23. """Manage Watchlists.
  24. This class provides mechanism to load watchlists for a repo and identify
  25. watchers.
  26. Usage:
  27. wl = Watchlists("/path/to/repo/root")
  28. watchers = wl.GetWatchersForPaths(["/path/to/file1",
  29. "/path/to/file2",])
  30. """
  31. _RULES = "WATCHLISTS"
  32. _RULES_FILENAME = _RULES
  33. _repo_root = None
  34. _defns = {} # Definitions
  35. _path_regexps = {} # Name -> Regular expression mapping
  36. _watchlists = {} # name to email mapping
  37. def __init__(self, repo_root):
  38. self._repo_root = repo_root
  39. self._LoadWatchlistRules()
  40. def _GetRulesFilePath(self):
  41. """Returns path to WATCHLISTS file."""
  42. return os.path.join(self._repo_root, self._RULES_FILENAME)
  43. def _HasWatchlistsFile(self):
  44. """Determine if watchlists are available for this repo."""
  45. return os.path.exists(self._GetRulesFilePath())
  46. def _ContentsOfWatchlistsFile(self):
  47. """Read the WATCHLISTS file and return its contents."""
  48. try:
  49. watchlists_file = open(self._GetRulesFilePath())
  50. contents = watchlists_file.read()
  51. watchlists_file.close()
  52. return contents
  53. except IOError as e:
  54. logging.error("Cannot read %s: %s" % (self._GetRulesFilePath(), e))
  55. return ''
  56. def _LoadWatchlistRules(self):
  57. """Load watchlists from WATCHLISTS file. Does nothing if not present."""
  58. if not self._HasWatchlistsFile():
  59. return
  60. contents = self._ContentsOfWatchlistsFile()
  61. watchlists_data = None
  62. try:
  63. watchlists_data = eval(contents, {'__builtins__': None}, None)
  64. except SyntaxError as e:
  65. logging.error("Cannot parse %s. %s" % (self._GetRulesFilePath(), e))
  66. return
  67. defns = watchlists_data.get("WATCHLIST_DEFINITIONS")
  68. if not defns:
  69. logging.error("WATCHLIST_DEFINITIONS not defined in %s" %
  70. self._GetRulesFilePath())
  71. return
  72. watchlists = watchlists_data.get("WATCHLISTS")
  73. if not watchlists:
  74. logging.error("WATCHLISTS not defined in %s" % self._GetRulesFilePath())
  75. return
  76. self._defns = defns
  77. self._watchlists = watchlists
  78. # Compile the regular expressions ahead of time to avoid creating them
  79. # on-the-fly multiple times per file.
  80. self._path_regexps = {}
  81. for name, rule in defns.items():
  82. filepath = rule.get('filepath')
  83. if not filepath:
  84. continue
  85. self._path_regexps[name] = re.compile(filepath)
  86. # Verify that all watchlist names are defined
  87. for name in watchlists:
  88. if name not in defns:
  89. logging.error("%s not defined in %s" % (name, self._GetRulesFilePath()))
  90. def GetWatchersForPaths(self, paths):
  91. """Fetch the list of watchers for |paths|
  92. Args:
  93. paths: [path1, path2, ...]
  94. Returns:
  95. [u1@chromium.org, u2@gmail.com, ...]
  96. """
  97. watchers = set() # A set, to avoid duplicates
  98. for path in paths:
  99. path = path.replace(os.sep, '/')
  100. for name, rule in self._path_regexps.items():
  101. if name not in self._watchlists:
  102. continue
  103. if rule.search(path):
  104. for watchlist in self._watchlists[name]:
  105. watchers.add(watchlist)
  106. return sorted(watchers)
  107. def main(argv):
  108. # Confirm that watchlists can be parsed and spew out the watchers
  109. if len(argv) < 2:
  110. print("Usage (from the base of repo):")
  111. print(" %s [file-1] [file-2] ...." % argv[0])
  112. return 1
  113. wl = Watchlists(os.getcwd())
  114. watchers = wl.GetWatchersForPaths(argv[1:])
  115. print(watchers)
  116. if __name__ == '__main__':
  117. main(sys.argv)