fake_cipd.py 6.1 KB


  1. #!/usr/bin/env python3
  2. # Copyright (c) 2018 The Chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. from string import Template
  6. import argparse
  7. import io
  8. import json
  9. import os
  10. import re
  11. import shutil
  12. import sys
# Variable names that expand_package_name_cmd looks for as a trailing
# ``${name}`` placeholder in a package name.
ARCH_VAR = 'arch'
OS_VAR = 'os'
PLATFORM_VAR = 'platform'
# Pattern passed to re.match to recognise an ``@Subdir <path>`` line of an
# ensure file; group 1 captures the path text.
CIPD_SUBDIR_RE = '@Subdir (.*)'
# Sub-command names accepted by main() as the first command-line argument.
CIPD_DESCRIBE = 'describe'
CIPD_ENSURE = 'ensure'
CIPD_ENSURE_FILE_RESOLVE = 'ensure-file-resolve'
CIPD_EXPAND_PKG = 'expand-package-name'
CIPD_EXPORT = 'export'
# string.Template text written to stdout by describe_cmd on success.
# ``${package}`` is substituted with the requested package name.
DESCRIBE_STDOUT_TEMPLATE = """\
Package: ${package}
Instance ID: ${package}-fake-instance-id
Registered by: user:fake-testing-support
Registered at: 2023-05-10 18:53:55.078574 +0000 UTC
Refs:
${package}-latest
Tags:
git_revision:${package}-fake-git-revision
${package}-fake-tag:1.0
${package}-fake-tag:2.0
"""
# string.Template text that describe_cmd substitutes and then parses with
# json.loads. The "refs" and "tags" entries are the only versions that
# describe_cmd accepts for a package.
DESCRIBE_JSON_TEMPLATE = """{
"result": {
"pin": {
"package": "${package}",
"instance_id": "${package}-fake-instance-id"
},
"registered_by": "user:fake-testing-support",
"registered_ts": 1683744835,
"refs": [
{
"ref": "${package}-latest",
"instance_id": "${package}-fake-instance-id",
"modified_by": "user:fake-testing-support",
"modified_ts": 1683744835
}
],
"tags": [
{
"tag": "git_revision:${package}-fake-git-revision",
"registered_by": "user:fake-testing-support",
"registered_ts": 1683744835
},
{
"tag": "${package}-fake-tag:1.0",
"registered_by": "user:fake-testing-support",
"registered_ts": 1683744835
},
{
"tag": "${package}-fake-tag:2.0",
"registered_by": "user:fake-testing-support",
"registered_ts": 1683744835
}
]
}
}"""
  69. def parse_cipd(root, contents):
  70. tree = {}
  71. current_subdir = None
  72. for line in contents:
  73. line = line.strip()
  74. match = re.match(CIPD_SUBDIR_RE, line)
  75. if match:
  76. print('match')
  77. current_subdir = os.path.join(root, *match.group(1).split('/'))
  78. if not root:
  79. current_subdir = match.group(1)
  80. elif line and current_subdir:
  81. print('no match')
  82. tree.setdefault(current_subdir, []).append(line)
  83. return tree
  84. def expand_package_name_cmd(package_name):
  85. package_split = package_name.split("/")
  86. suffix = package_split[-1]
  87. # Any use of var equality should return empty for testing.
  88. if "=" in suffix:
  89. if suffix != "${platform=fake-platform-ok}":
  90. return ""
  91. package_name = "/".join(package_split[:-1] + ["${platform}"])
  92. for v in [ARCH_VAR, OS_VAR, PLATFORM_VAR]:
  93. var = "${%s}" % v
  94. if package_name.endswith(var):
  95. package_name = package_name.replace(var,
  96. "%s-expanded-test-only" % v)
  97. return package_name
  98. def ensure_file_resolve():
  99. resolved = {"result": {}}
  100. parser = argparse.ArgumentParser()
  101. parser.add_argument('-ensure-file', required=True)
  102. parser.add_argument('-json-output')
  103. args, _ = parser.parse_known_args()
  104. with io.open(args.ensure_file, 'r', encoding='utf-8') as f:
  105. new_content = parse_cipd("", f.readlines())
  106. for path, packages in new_content.items():
  107. resolved_packages = []
  108. for package in packages:
  109. package_name = expand_package_name_cmd(package.split(" ")[0])
  110. resolved_packages.append({
  111. "package": package_name,
  112. "pin": {
  113. "package": package_name,
  114. "instance_id": package_name + "-fake-resolved-id",
  115. }
  116. })
  117. resolved["result"][path] = resolved_packages
  118. with io.open(args.json_output, 'w', encoding='utf-8') as f:
  119. f.write(json.dumps(resolved, indent=4))
  120. def describe_cmd(package_name):
  121. parser = argparse.ArgumentParser()
  122. parser.add_argument('-json-output')
  123. parser.add_argument('-version', required=True)
  124. args, _ = parser.parse_known_args()
  125. json_template = Template(DESCRIBE_JSON_TEMPLATE).substitute(
  126. package=package_name)
  127. cli_out = Template(DESCRIBE_STDOUT_TEMPLATE).substitute(
  128. package=package_name)
  129. json_out = json.loads(json_template)
  130. found = False
  131. for tag in json_out['result']['tags']:
  132. if tag['tag'] == args.version:
  133. found = True
  134. break
  135. for tag in json_out['result']['refs']:
  136. if tag['ref'] == args.version:
  137. found = True
  138. break
  139. if found:
  140. if args.json_output:
  141. with io.open(args.json_output, 'w', encoding='utf-8') as f:
  142. f.write(json.dumps(json_out, indent=4))
  143. sys.stdout.write(cli_out)
  144. return 0
  145. sys.stdout.write('Error: no such ref.\n')
  146. return 1
  147. def main():
  148. cmd = sys.argv[1]
  149. assert cmd in [
  150. CIPD_DESCRIBE, CIPD_ENSURE, CIPD_ENSURE_FILE_RESOLVE, CIPD_EXPAND_PKG,
  151. CIPD_EXPORT
  152. ]
  153. # Handle cipd expand-package-name
  154. if cmd == CIPD_EXPAND_PKG:
  155. # Expecting argument after cmd
  156. assert len(sys.argv) == 3
  157. # Write result to stdout
  158. sys.stdout.write(expand_package_name_cmd(sys.argv[2]))
  159. return 0
  160. if cmd == CIPD_DESCRIBE:
  161. # Expecting argument after cmd
  162. assert len(sys.argv) >= 3
  163. return describe_cmd(sys.argv[2])
  164. if cmd == CIPD_ENSURE_FILE_RESOLVE:
  165. return ensure_file_resolve()
  166. parser = argparse.ArgumentParser()
  167. parser.add_argument('-ensure-file')
  168. parser.add_argument('-root')
  169. args, _ = parser.parse_known_args()
  170. with io.open(args.ensure_file, 'r', encoding='utf-8') as f:
  171. new_content = parse_cipd(args.root, f.readlines())
  172. # Install new packages
  173. for path, packages in new_content.items():
  174. if not os.path.exists(path):
  175. os.makedirs(path)
  176. with io.open(os.path.join(path, '_cipd'), 'w', encoding='utf-8') as f:
  177. f.write('\n'.join(packages))
  178. # Save the ensure file that we got
  179. shutil.copy(args.ensure_file, os.path.join(args.root, '_cipd'))
  180. return 0
  181. if __name__ == '__main__':
  182. sys.exit(main())