#!/usr/bin/env python3
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Uploads files to Google Storage content addressed."""

from __future__ import print_function

import hashlib
import optparse
import os
import queue
import re
import stat
import sys
import tarfile
import threading
import time

from download_from_google_storage import get_sha1
from download_from_google_storage import Gsutil
from download_from_google_storage import PrinterThread
from download_from_google_storage import GSUTIL_DEFAULT_PATH

USAGE_STRING = """%prog [options] target [target2 ...].
Target is the file intended to be uploaded to Google Storage.
If target is "-", then a list of files will be taken from standard input.

This script will generate a file (original filename).sha1 containing the
sha1 sum of the uploaded file.
It is recommended that the .sha1 file is checked into the repository,
the original file removed from the repository, and a hook added to the
DEPS file to call download_from_google_storage.py.

Example usages
--------------

Scan the current directory and upload all files larger than 1MB:
find . -name .svn -prune -o -size +1000k -type f -print0 | %prog -0 -b bkt -
(Replace "bkt" with the name of a writable bucket.)
"""


def get_md5(filename):
    """Returns the hex MD5 digest of |filename|, read in 1 MiB chunks."""
    md5_calculator = hashlib.md5()
    with open(filename, 'rb') as f:
        while True:
            chunk = f.read(1024 * 1024)
            if not chunk:
                break
            md5_calculator.update(chunk)
    return md5_calculator.hexdigest()


def get_md5_cached(filename):
    """Don't calculate the MD5 if we can find a .md5 file."""
    # See if we can find an existing MD5 sum stored in a file.
    if os.path.exists('%s.md5' % filename):
        with open('%s.md5' % filename, 'rb') as f:
            md5_match = re.search('([a-z0-9]{32})', f.read().decode())
            if md5_match:
                return md5_match.group(1)
    # No cache file, or it did not contain a valid MD5: compute the sum
    # and (re)write the cache file.
    md5_hash = get_md5(filename)
    with open('%s.md5' % filename, 'wb') as f:
        f.write(md5_hash.encode())
    return md5_hash
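
# Best-effort note: the MD5-vs-ETag comparison in _upload_worker() relies
# on Google Storage's ETag being the object's MD5, which (as far as we
# know) holds only for non-composite objects; for composite or gzipped
# (-z) uploads the values won't match, and the file is simply re-uploaded.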


def _upload_worker(thread_num, upload_queue, base_url, gsutil, md5_lock, force,
                   use_md5, stdout_queue, ret_codes, gzip):
    while True:
        filename, sha1_sum = upload_queue.get()
        if not filename:
            # A (None, None) entry marks the end of the work queue.
            break
        file_url = '%s/%s' % (base_url, sha1_sum)
        if gsutil.check_call('ls', file_url)[0] == 0 and not force:
            # File exists, check MD5 hash.
            _, out, _ = gsutil.check_call_with_retries('ls', '-L', file_url)
            etag_match = re.search(r'ETag:\s+\S+', out)
            if etag_match:
                stdout_queue.put('%d> File with url %s already exists' %
                                 (thread_num, file_url))
                remote_md5 = etag_match.group(0).split()[1]
                # Calculate the MD5 checksum to match it to Google Storage's
                # ETag.
                with md5_lock:
                    if use_md5:
                        local_md5 = get_md5_cached(filename)
                    else:
                        local_md5 = get_md5(filename)
                if local_md5 == remote_md5:
                    stdout_queue.put(
                        '%d> File %s already exists and MD5 matches, upload '
                        'skipped' % (thread_num, filename))
                    continue
        stdout_queue.put('%d> Uploading %s...' % (thread_num, filename))
        gsutil_args = ['-h', 'Cache-Control:public, max-age=31536000', 'cp']
        if gzip:
            gsutil_args.extend(['-z', gzip])
        gsutil_args.extend([filename, file_url])
        code, _, err = gsutil.check_call_with_retries(*gsutil_args)
        if code != 0:
            ret_codes.put(
                (code, 'Encountered error on uploading %s to %s\n%s' %
                 (filename, file_url, err)))
            continue

        # Mark executable files with the header "x-goog-meta-executable: 1"
        # which the download script will check for to preserve the
        # executable bit.
        if not sys.platform.startswith('win'):
            if os.stat(filename).st_mode & stat.S_IEXEC:
                code, _, err = gsutil.check_call_with_retries(
                    'setmeta', '-h', 'x-goog-meta-executable:1', file_url)
                if code != 0:
                    ret_codes.put(
                        (code,
                         'Encountered error on setting metadata on %s\n%s' %
                         (file_url, err)))


def get_targets(args, parser, use_null_terminator):
    if not args:
        parser.error('Missing target.')
    if len(args) == 1 and args[0] == '-':
        # Take stdin as a newline or null separated list of files.
        if use_null_terminator:
            # Drop the empty entry that a trailing '\0' from
            # "find ... -print0" would otherwise produce.
            return [f for f in sys.stdin.read().split('\0') if f]
        return sys.stdin.read().splitlines()
    return args
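
# Illustrative get_targets() inputs (filenames are placeholders):
#   newline-separated stdin:   'a.bin\nb.bin\n'  -> ['a.bin', 'b.bin']
#   null-separated stdin (-0): 'a.bin\0b.bin\0'  -> ['a.bin', 'b.bin']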


def upload_to_google_storage(input_filenames, base_url, gsutil, force, use_md5,
                             num_threads, skip_hashing, gzip):
    # We only want one MD5 calculation happening at a time to avoid HD
    # thrashing.
    md5_lock = threading.Lock()

    # Start up all the worker threads plus the printer thread.
    all_threads = []
    ret_codes = queue.Queue()
    ret_codes.put((0, None))
    upload_queue = queue.Queue()
    upload_timer = time.time()
    stdout_queue = queue.Queue()
    printer_thread = PrinterThread(stdout_queue)
    printer_thread.daemon = True
    printer_thread.start()
    for thread_num in range(num_threads):
        t = threading.Thread(target=_upload_worker,
                             args=[
                                 thread_num, upload_queue, base_url, gsutil,
                                 md5_lock, force, use_md5, stdout_queue,
                                 ret_codes, gzip
                             ])
        t.daemon = True
        t.start()
        all_threads.append(t)

    # We want to hash everything in a single thread since it's faster.
    # The bottleneck is in disk IO, not CPU.
    hashing_start = time.time()
    has_missing_files = False
    for filename in input_filenames:
        if not os.path.exists(filename):
            stdout_queue.put('Main> Error: %s not found, skipping.' % filename)
            has_missing_files = True
            continue
        if os.path.exists('%s.sha1' % filename) and skip_hashing:
            stdout_queue.put(
                'Main> Found hash for %s, sha1 calculation skipped.' %
                filename)
            with open(filename + '.sha1', 'rb') as f:
                sha1_file = f.read(1024)
            if not re.match('^([a-z0-9]{40})$', sha1_file.decode()):
                print('Invalid sha1 hash file %s.sha1' % filename,
                      file=sys.stderr)
                return 1
            upload_queue.put((filename, sha1_file.decode()))
            continue
        stdout_queue.put('Main> Calculating hash for %s...' % filename)
        sha1_sum = get_sha1(filename)
        with open(filename + '.sha1', 'wb') as f:
            f.write(sha1_sum.encode())
        stdout_queue.put('Main> Done calculating hash for %s.' % filename)
        upload_queue.put((filename, sha1_sum))
    hashing_duration = time.time() - hashing_start

    # Wait for everything to finish.
    for _ in all_threads:
        upload_queue.put((None, None))  # To mark the end of the work queue.
    for t in all_threads:
        t.join()
    stdout_queue.put(None)
    printer_thread.join()

    # Print timing information.
    print('Hashing %s files took %.1f seconds' %
          (len(input_filenames), hashing_duration))
    print('Uploading took %.1f seconds' % (time.time() - upload_timer))

    # See if we ran into any errors.
    max_ret_code = 0
    for ret_code, message in ret_codes.queue:
        max_ret_code = max(ret_code, max_ret_code)
        if message:
            print(message, file=sys.stderr)
    if has_missing_files:
        print('One or more input files missing', file=sys.stderr)
        max_ret_code = max(1, max_ret_code)

    if not max_ret_code:
        print('Success!')
    return max_ret_code


def create_archives(dirs):
    """Tars each directory into <name>.tar.gz in cwd; returns archive names."""
    archive_names = []
    for name in dirs:
        tarname = '%s.tar.gz' % name
        with tarfile.open(tarname, 'w:gz') as tar:
            tar.add(name)
        archive_names.append(tarname)
    return archive_names
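
# Note: tarfile.TarFile.add() is recursive by default, so each archive
# contains the directory's whole tree; the resulting .tar.gz files then go
# through the normal hashing-and-upload path like any other target.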


def validate_archive_dirs(dirs):
    for d in dirs:
        # We don't allow .. in paths in our archives.
        if d == '..':
            return False
        # We only allow dirs.
        if not os.path.isdir(d):
            return False
        # We don't allow sym links in our archives.
        if os.path.islink(d):
            return False
        # We require that the subdirectories we are archiving are all just
        # below cwd.
        if d not in next(os.walk('.'))[1]:
            return False
    return True
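
# Illustrative validate_archive_dirs() outcomes (names are placeholders):
#   'data'        -> True, if ./data is a real directory just below cwd
#   '..'          -> False (parent references are rejected)
#   'a/b'         -> False (not directly below cwd)
#   'link_to_dir' -> False, if it is a symlink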


def main():
    parser = optparse.OptionParser(USAGE_STRING)
    parser.add_option('-b',
                      '--bucket',
                      help='Google Storage bucket to upload to.')
    parser.add_option('-e', '--boto', help='Specify a custom boto file.')
    parser.add_option('-a',
                      '--archive',
                      action='store_true',
                      help='Archive directory as a tar.gz file')
    parser.add_option('-f',
                      '--force',
                      action='store_true',
                      help='Force upload even if remote file exists.')
    parser.add_option('-g',
                      '--gsutil_path',
                      default=GSUTIL_DEFAULT_PATH,
                      help='Path to the gsutil script.')
    parser.add_option('-m',
                      '--use_md5',
                      action='store_true',
                      help='Generate MD5 files when scanning, and don\'t check '
                      'the MD5 checksum if a .md5 file is found.')
    parser.add_option('-t',
                      '--num_threads',
                      default=1,
                      type='int',
                      help='Number of uploader threads to run.')
    parser.add_option('-s',
                      '--skip_hashing',
                      action='store_true',
                      help='Skip hashing if .sha1 file exists.')
    parser.add_option('-0',
                      '--use_null_terminator',
                      action='store_true',
                      help='Use \\0 instead of \\n when parsing '
                      'the file list from stdin. This is useful if the input '
                      'is coming from "find ... -print0".')
    parser.add_option('-z',
                      '--gzip',
                      metavar='ext',
                      help='Gzip files which end in ext. '
                      'ext is a comma-separated list')
    (options, args) = parser.parse_args()

    # Enumerate our inputs.
    input_filenames = get_targets(args, parser, options.use_null_terminator)

    if options.archive:
        if not validate_archive_dirs(input_filenames):
            parser.error(
                'Only directories just below cwd are valid entries when '
                'using the --archive argument. Entries cannot contain '
                '".." and cannot be symlinks. Entries were %s' %
                input_filenames)
            return 1
        input_filenames = create_archives(input_filenames)

    # Make sure we can find a working instance of gsutil.
    if os.path.exists(GSUTIL_DEFAULT_PATH):
        gsutil = Gsutil(GSUTIL_DEFAULT_PATH, boto_path=options.boto)
    else:
        gsutil = None
        for path in os.environ["PATH"].split(os.pathsep):
            if os.path.exists(path) and 'gsutil' in os.listdir(path):
                gsutil = Gsutil(os.path.join(path, 'gsutil'),
                                boto_path=options.boto)
        if not gsutil:
            parser.error('gsutil not found in %s, bad depot_tools checkout?' %
                         GSUTIL_DEFAULT_PATH)

    base_url = 'gs://%s' % options.bucket
    return upload_to_google_storage(input_filenames, base_url, gsutil,
                                    options.force, options.use_md5,
                                    options.num_threads, options.skip_hashing,
                                    options.gzip)


if __name__ == '__main__':
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        sys.stderr.write('interrupted\n')
        sys.exit(1)
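
# Illustrative end-to-end usage (bucket and filename are placeholders):
#   $ ./upload_to_google_storage.py -b my-bucket big_asset.bin
#   $ cat big_asset.bin.sha1   # 40-hex-digit sha1; check this file in
#   $ git rm big_asset.bin     # the original no longer lives in the repo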