gsutil_test.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. #!/usr/bin/env vpython3
  2. # Copyright 2014 The Chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Test gsutil.py."""
  6. from __future__ import unicode_literals
  7. import base64
  8. import hashlib
  9. import io
  10. import json
  11. import os
  12. import shutil
  13. import subprocess
  14. import sys
  15. import tempfile
  16. import unittest
  17. import zipfile
  18. import urllib.request
  19. # Add depot_tools to path
  20. THIS_DIR = os.path.dirname(os.path.abspath(__file__))
  21. DEPOT_TOOLS_DIR = os.path.dirname(THIS_DIR)
  22. sys.path.append(DEPOT_TOOLS_DIR)
  23. import gsutil
  24. class TestError(Exception):
  25. pass
  26. class FakeCall(object):
  27. def __init__(self):
  28. self.expectations = []
  29. def add_expectation(self, *args, **kwargs):
  30. returns = kwargs.pop('_returns', None)
  31. self.expectations.append((args, kwargs, returns))
  32. def __call__(self, *args, **kwargs):
  33. if not self.expectations:
  34. raise TestError('Got unexpected\n%s\n%s' % (args, kwargs))
  35. exp_args, exp_kwargs, exp_returns = self.expectations.pop(0)
  36. if args != exp_args or kwargs != exp_kwargs:
  37. message = 'Expected:\n args: %s\n kwargs: %s\n' % (exp_args,
  38. exp_kwargs)
  39. message += 'Got:\n args: %s\n kwargs: %s\n' % (args, kwargs)
  40. raise TestError(message)
  41. return exp_returns
  42. class GsutilUnitTests(unittest.TestCase):
  43. def setUp(self):
  44. self.fake = FakeCall()
  45. self.tempdir = tempfile.mkdtemp()
  46. self.old_urlopen = getattr(urllib.request, 'urlopen')
  47. self.old_call = getattr(subprocess, 'call')
  48. setattr(urllib.request, 'urlopen', self.fake)
  49. setattr(subprocess, 'call', self.fake)
  50. def tearDown(self):
  51. self.assertEqual(self.fake.expectations, [])
  52. shutil.rmtree(self.tempdir)
  53. setattr(urllib.request, 'urlopen', self.old_urlopen)
  54. setattr(subprocess, 'call', self.old_call)
  55. def test_download_gsutil(self):
  56. version = gsutil.VERSION
  57. filename = 'gsutil_%s.zip' % version
  58. full_filename = os.path.join(self.tempdir, filename)
  59. fake_file = b'This is gsutil.zip'
  60. fake_file2 = b'This is other gsutil.zip'
  61. url = '%s%s' % (gsutil.GSUTIL_URL, filename)
  62. self.fake.add_expectation(url, _returns=io.BytesIO(fake_file))
  63. self.assertEqual(gsutil.download_gsutil(version, self.tempdir),
  64. full_filename)
  65. with open(full_filename, 'rb') as f:
  66. self.assertEqual(fake_file, f.read())
  67. metadata_url = gsutil.API_URL + filename
  68. md5_calc = hashlib.md5()
  69. md5_calc.update(fake_file)
  70. b64_md5 = base64.b64encode(md5_calc.hexdigest().encode('utf-8'))
  71. self.fake.add_expectation(metadata_url,
  72. _returns=io.BytesIO(
  73. json.dumps({
  74. 'md5Hash':
  75. b64_md5.decode('utf-8')
  76. }).encode('utf-8')))
  77. self.assertEqual(gsutil.download_gsutil(version, self.tempdir),
  78. full_filename)
  79. with open(full_filename, 'rb') as f:
  80. self.assertEqual(fake_file, f.read())
  81. self.assertEqual(self.fake.expectations, [])
  82. self.fake.add_expectation(
  83. metadata_url,
  84. _returns=io.BytesIO(
  85. json.dumps({
  86. 'md5Hash':
  87. base64.b64encode(b'aaaaaaa').decode('utf-8') # Bad MD5
  88. }).encode('utf-8')))
  89. self.fake.add_expectation(url, _returns=io.BytesIO(fake_file2))
  90. self.assertEqual(gsutil.download_gsutil(version, self.tempdir),
  91. full_filename)
  92. with open(full_filename, 'rb') as f:
  93. self.assertEqual(fake_file2, f.read())
  94. self.assertEqual(self.fake.expectations, [])
  95. def test_ensure_gsutil_full(self):
  96. version = gsutil.VERSION
  97. gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil')
  98. gsutil_bin = os.path.join(gsutil_dir, 'gsutil')
  99. gsutil_flag = os.path.join(gsutil_dir, 'install.flag')
  100. os.makedirs(gsutil_dir)
  101. zip_filename = 'gsutil_%s.zip' % version
  102. url = '%s%s' % (gsutil.GSUTIL_URL, zip_filename)
  103. _, tempzip = tempfile.mkstemp()
  104. fake_gsutil = 'Fake gsutil'
  105. with zipfile.ZipFile(tempzip, 'w') as zf:
  106. zf.writestr('gsutil/gsutil', fake_gsutil)
  107. with open(tempzip, 'rb') as f:
  108. self.fake.add_expectation(url, _returns=io.BytesIO(f.read()))
  109. # This should write the gsutil_bin with 'Fake gsutil'
  110. gsutil.ensure_gsutil(version, self.tempdir, False)
  111. self.assertTrue(os.path.exists(gsutil_bin))
  112. with open(gsutil_bin, 'r') as f:
  113. self.assertEqual(f.read(), fake_gsutil)
  114. self.assertTrue(os.path.exists(gsutil_flag))
  115. self.assertEqual(self.fake.expectations, [])
  116. def test_ensure_gsutil_short(self):
  117. version = gsutil.VERSION
  118. gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil')
  119. gsutil_bin = os.path.join(gsutil_dir, 'gsutil')
  120. gsutil_flag = os.path.join(gsutil_dir, 'install.flag')
  121. os.makedirs(gsutil_dir)
  122. with open(gsutil_bin, 'w') as f:
  123. f.write('Foobar')
  124. with open(gsutil_flag, 'w') as f:
  125. f.write('Barbaz')
  126. self.assertEqual(gsutil.ensure_gsutil(version, self.tempdir, False),
  127. gsutil_bin)
  128. if __name__ == '__main__':
  129. unittest.main()