gsutil_test.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. #!/usr/bin/env vpython3
  2. # Copyright 2014 The Chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Test gsutil.py."""
  6. import base64
  7. import hashlib
  8. import io
  9. import json
  10. import os
  11. import shutil
  12. import subprocess
  13. import sys
  14. import tempfile
  15. import unittest
  16. import zipfile
  17. import urllib.request
  18. # Add depot_tools to path
  19. THIS_DIR = os.path.dirname(os.path.abspath(__file__))
  20. DEPOT_TOOLS_DIR = os.path.dirname(THIS_DIR)
  21. sys.path.append(DEPOT_TOOLS_DIR)
  22. import gsutil
  class TestError(Exception):
      """Raised by FakeCall when a call does not match what was expected."""
  class FakeCall:
      """Callable test double that replays a scripted queue of calls.

      Each expectation stores the exact positional and keyword arguments of
      one anticipated call plus the value to hand back. Calls are matched
      against expectations in the order the expectations were added.
      """

      def __init__(self):
          # Pending (args, kwargs, returns) triples, oldest first.
          self.expectations = []

      def add_expectation(self, *args, **kwargs):
          """Queue one expected call.

          The keyword `_returns` (default None) is removed from kwargs and
          stored as the value the matching call will return; everything else
          is recorded as the expected arguments.
          """
          result = kwargs.pop('_returns', None)
          self.expectations.append((args, kwargs, result))

      def __call__(self, *args, **kwargs):
          """Consume the oldest expectation and return its stored value.

          Raises:
              TestError: if no expectation is queued, or if the arguments
                  differ from those of the oldest queued expectation.
          """
          if not self.expectations:
              raise TestError('Got unexpected\n%s\n%s' % (args, kwargs))

          # The expectation is consumed even when it turns out not to match.
          want_args, want_kwargs, result = self.expectations.pop(0)
          mismatch = args != want_args or kwargs != want_kwargs
          if not mismatch:
              return result

          expected_part = 'Expected:\n args: %s\n kwargs: %s\n' % (
              want_args, want_kwargs)
          actual_part = 'Got:\n args: %s\n kwargs: %s\n' % (args, kwargs)
          raise TestError(expected_part + actual_part)
  class GsutilUnitTests(unittest.TestCase):
      """Tests for gsutil.download_gsutil and gsutil.ensure_gsutil.

      Every test runs with urllib.request.urlopen and subprocess.call replaced
      by one shared FakeCall, and with a fresh scratch directory available as
      self.tempdir.
      """

      def setUp(self):
          """Create the scratch directory and install the FakeCall hooks.

          All undo work is registered through addCleanup so it still runs when
          the test body or the tearDown assertion fails; otherwise a failing
          test would leak the directory and leave the stdlib functions patched
          for every later test.
          """
          self.fake = FakeCall()
          self.tempdir = tempfile.mkdtemp()
          self.addCleanup(shutil.rmtree, self.tempdir)

          # Keep the originals reachable as attributes while the test runs.
          self.old_urlopen = urllib.request.urlopen
          self.old_call = subprocess.call

          urllib.request.urlopen = self.fake
          self.addCleanup(
              setattr, urllib.request, 'urlopen', self.old_urlopen)
          subprocess.call = self.fake
          self.addCleanup(setattr, subprocess, 'call', self.old_call)

      def tearDown(self):
          """Check that the test consumed every queued expectation."""
          self.assertEqual(self.fake.expectations, [])

      @staticmethod
      def _metadata_response(md5_hash):
          """Return a readable byte stream holding JSON {'md5Hash': md5_hash}."""
          payload = json.dumps({'md5Hash': md5_hash})
          return io.BytesIO(payload.encode('utf-8'))

      def test_download_gsutil(self):
          """Exercise download_gsutil for a fresh, a valid and a stale zip."""
          version = gsutil.VERSION
          filename = 'gsutil_%s.zip' % version
          full_filename = os.path.join(self.tempdir, filename)
          fake_file = b'This is gsutil.zip'
          fake_file2 = b'This is other gsutil.zip'
          url = '%s%s' % (gsutil.GSUTIL_URL, filename)
          metadata_url = gsutil.API_URL + filename

          # The scratch directory is empty, so the only request expected is
          # the zip download itself.
          self.fake.add_expectation(url, _returns=io.BytesIO(fake_file))
          self.assertEqual(
              gsutil.download_gsutil(version, self.tempdir), full_filename)
          with open(full_filename, 'rb') as f:
              self.assertEqual(fake_file, f.read())

          # The zip now exists. The metadata reports the base64 of the hex
          # MD5 of the file on disk, so only the metadata request is expected
          # and the file content must be unchanged.
          hex_md5 = hashlib.md5(fake_file).hexdigest()
          b64_md5 = base64.b64encode(hex_md5.encode('utf-8'))
          self.fake.add_expectation(
              metadata_url,
              _returns=self._metadata_response(b64_md5.decode('utf-8')))
          self.assertEqual(
              gsutil.download_gsutil(version, self.tempdir), full_filename)
          with open(full_filename, 'rb') as f:
              self.assertEqual(fake_file, f.read())
          self.assertEqual(self.fake.expectations, [])

          # The metadata reports a different MD5, so a metadata request
          # followed by a fresh download is expected, and the file must now
          # hold the new content.
          bad_md5 = base64.b64encode(b'aaaaaaa').decode('utf-8')  # Bad MD5
          self.fake.add_expectation(
              metadata_url, _returns=self._metadata_response(bad_md5))
          self.fake.add_expectation(url, _returns=io.BytesIO(fake_file2))
          self.assertEqual(
              gsutil.download_gsutil(version, self.tempdir), full_filename)
          with open(full_filename, 'rb') as f:
              self.assertEqual(fake_file2, f.read())
          self.assertEqual(self.fake.expectations, [])

      def test_ensure_gsutil_full(self):
          """ensure_gsutil downloads the zip and writes the binary and flag."""
          version = gsutil.VERSION
          gsutil_dir = os.path.join(
              self.tempdir, 'gsutil_%s' % version, 'gsutil')
          gsutil_bin = os.path.join(gsutil_dir, 'gsutil')
          gsutil_flag = os.path.join(gsutil_dir, 'install.flag')
          os.makedirs(gsutil_dir)

          zip_filename = 'gsutil_%s.zip' % version
          url = '%s%s' % (gsutil.GSUTIL_URL, zip_filename)
          fake_gsutil = 'Fake gsutil'

          # Build the archive in memory. A mkstemp() file would leave an open
          # descriptor and a stray file outside self.tempdir behind.
          zip_buffer = io.BytesIO()
          with zipfile.ZipFile(zip_buffer, 'w') as zf:
              zf.writestr('gsutil/gsutil', fake_gsutil)
          self.fake.add_expectation(
              url, _returns=io.BytesIO(zip_buffer.getvalue()))

          # This should write the gsutil_bin with 'Fake gsutil'
          gsutil.ensure_gsutil(version, self.tempdir, False)
          self.assertTrue(os.path.exists(gsutil_bin))
          with open(gsutil_bin, 'r') as f:
              self.assertEqual(f.read(), fake_gsutil)
          self.assertTrue(os.path.exists(gsutil_flag))
          self.assertEqual(self.fake.expectations, [])

      def test_ensure_gsutil_short(self):
          """ensure_gsutil returns the binary path when both files exist.

          No expectation is queued, so any urlopen or subprocess.call made
          through the fake would raise and fail the test.
          """
          version = gsutil.VERSION
          gsutil_dir = os.path.join(
              self.tempdir, 'gsutil_%s' % version, 'gsutil')
          gsutil_bin = os.path.join(gsutil_dir, 'gsutil')
          gsutil_flag = os.path.join(gsutil_dir, 'install.flag')
          os.makedirs(gsutil_dir)

          with open(gsutil_bin, 'w') as f:
              f.write('Foobar')
          with open(gsutil_flag, 'w') as f:
              f.write('Barbaz')
          self.assertEqual(
              gsutil.ensure_gsutil(version, self.tempdir, False), gsutil_bin)
  if __name__ == '__main__':
      # Run as a script rather than imported: hand control to unittest's
      # command-line entry point.
      unittest.main()