watchlists.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. #!/usr/bin/env python
  2. # Copyright (c) 2011 The Chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Watchlists
  6. Watchlists is a mechanism that allow a developer (a "watcher") to watch over
  7. portions of code that they are interested in. A "watcher" will be cc-ed to
  8. changes that modify that portion of code, thereby giving them an opportunity
  9. to make comments on codereview.chromium.org even before the change is
  10. committed.
  11. Refer: http://dev.chromium.org/developers/contributing-code/watchlists
  12. When invoked directly from the base of a repository, this script lists out
  13. the watchers for files given on the command line. This is useful to verify
  14. changes to WATCHLISTS files.
  15. """
  16. from __future__ import print_function
  17. import logging
  18. import os
  19. import re
  20. import sys
  21. class Watchlists(object):
  22. """Manage Watchlists.
  23. This class provides mechanism to load watchlists for a repo and identify
  24. watchers.
  25. Usage:
  26. wl = Watchlists("/path/to/repo/root")
  27. watchers = wl.GetWatchersForPaths(["/path/to/file1",
  28. "/path/to/file2",])
  29. """
  30. _RULES = "WATCHLISTS"
  31. _RULES_FILENAME = _RULES
  32. _repo_root = None
  33. _defns = {} # Definitions
  34. _path_regexps = {} # Name -> Regular expression mapping
  35. _watchlists = {} # name to email mapping
  36. def __init__(self, repo_root):
  37. self._repo_root = repo_root
  38. self._LoadWatchlistRules()
  39. def _GetRulesFilePath(self):
  40. """Returns path to WATCHLISTS file."""
  41. return os.path.join(self._repo_root, self._RULES_FILENAME)
  42. def _HasWatchlistsFile(self):
  43. """Determine if watchlists are available for this repo."""
  44. return os.path.exists(self._GetRulesFilePath())
  45. def _ContentsOfWatchlistsFile(self):
  46. """Read the WATCHLISTS file and return its contents."""
  47. try:
  48. watchlists_file = open(self._GetRulesFilePath())
  49. contents = watchlists_file.read()
  50. watchlists_file.close()
  51. return contents
  52. except IOError as e:
  53. logging.error("Cannot read %s: %s" % (self._GetRulesFilePath(), e))
  54. return ''
  55. def _LoadWatchlistRules(self):
  56. """Load watchlists from WATCHLISTS file. Does nothing if not present."""
  57. if not self._HasWatchlistsFile():
  58. return
  59. contents = self._ContentsOfWatchlistsFile()
  60. watchlists_data = None
  61. try:
  62. watchlists_data = eval(contents, {'__builtins__': None}, None)
  63. except SyntaxError as e:
  64. logging.error("Cannot parse %s. %s" % (self._GetRulesFilePath(), e))
  65. return
  66. defns = watchlists_data.get("WATCHLIST_DEFINITIONS")
  67. if not defns:
  68. logging.error("WATCHLIST_DEFINITIONS not defined in %s" %
  69. self._GetRulesFilePath())
  70. return
  71. watchlists = watchlists_data.get("WATCHLISTS")
  72. if not watchlists:
  73. logging.error("WATCHLISTS not defined in %s" % self._GetRulesFilePath())
  74. return
  75. self._defns = defns
  76. self._watchlists = watchlists
  77. # Compile the regular expressions ahead of time to avoid creating them
  78. # on-the-fly multiple times per file.
  79. self._path_regexps = {}
  80. for name, rule in defns.items():
  81. filepath = rule.get('filepath')
  82. if not filepath:
  83. continue
  84. self._path_regexps[name] = re.compile(filepath)
  85. # Verify that all watchlist names are defined
  86. for name in watchlists:
  87. if name not in defns:
  88. logging.error("%s not defined in %s" % (name, self._GetRulesFilePath()))
  89. def GetWatchersForPaths(self, paths):
  90. """Fetch the list of watchers for |paths|
  91. Args:
  92. paths: [path1, path2, ...]
  93. Returns:
  94. [u1@chromium.org, u2@gmail.com, ...]
  95. """
  96. watchers = set() # A set, to avoid duplicates
  97. for path in paths:
  98. path = path.replace(os.sep, '/')
  99. for name, rule in self._path_regexps.items():
  100. if name not in self._watchlists:
  101. continue
  102. if rule.search(path):
  103. for watchlist in self._watchlists[name]:
  104. watchers.add(watchlist)
  105. return sorted(watchers)
  106. def main(argv):
  107. # Confirm that watchlists can be parsed and spew out the watchers
  108. if len(argv) < 2:
  109. print("Usage (from the base of repo):")
  110. print(" %s [file-1] [file-2] ...." % argv[0])
  111. return 1
  112. wl = Watchlists(os.getcwd())
  113. watchers = wl.GetWatchersForPaths(argv[1:])
  114. print(watchers)
  115. if __name__ == '__main__':
  116. main(sys.argv)