gsutil_test.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. #!/usr/bin/env vpython3
  2. # Copyright 2014 The Chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Test gsutil.py."""
  6. import base64
  7. import hashlib
  8. import io
  9. import json
  10. import os
  11. import shutil
  12. import subprocess
  13. import sys
  14. import tempfile
  15. import unittest
  16. import zipfile
  17. import urllib.request
  18. from unittest import mock
  19. # Add depot_tools to path
  20. THIS_DIR = os.path.dirname(os.path.abspath(__file__))
  21. DEPOT_TOOLS_DIR = os.path.dirname(THIS_DIR)
  22. sys.path.append(DEPOT_TOOLS_DIR)
  23. import gsutil
  24. class TestError(Exception):
  25. pass
  26. class FakeCall(object):
  27. def __init__(self):
  28. self.expectations = []
  29. def add_expectation(self, *args, **kwargs):
  30. returns = kwargs.pop('_returns', None)
  31. self.expectations.append((args, kwargs, returns))
  32. def __call__(self, *args, **kwargs):
  33. if not self.expectations:
  34. raise TestError('Got unexpected\n%s\n%s' % (args, kwargs))
  35. exp_args, exp_kwargs, exp_returns = self.expectations.pop(0)
  36. if args != exp_args or kwargs != exp_kwargs:
  37. message = 'Expected:\n args: %s\n kwargs: %s\n' % (exp_args,
  38. exp_kwargs)
  39. message += 'Got:\n args: %s\n kwargs: %s\n' % (args, kwargs)
  40. raise TestError(message)
  41. return exp_returns
  42. class MockResponse(io.BytesIO):
  43. def info(self):
  44. # urlretrieve expects info() to return a dictionary.
  45. return {}
  46. class GsutilUnitTests(unittest.TestCase):
  47. def setUp(self):
  48. self.fake = FakeCall()
  49. self.tempdir = tempfile.mkdtemp()
  50. self.old_urlopen = getattr(urllib.request, 'urlopen')
  51. self.old_call = getattr(subprocess, 'call')
  52. setattr(urllib.request, 'urlopen', self.fake)
  53. setattr(subprocess, 'call', self.fake)
  54. def tearDown(self):
  55. self.assertEqual(self.fake.expectations, [])
  56. shutil.rmtree(self.tempdir)
  57. setattr(urllib.request, 'urlopen', self.old_urlopen)
  58. setattr(subprocess, 'call', self.old_call)
  59. def add_md5_expectation(self, url, data):
  60. md5_calc = hashlib.md5()
  61. md5_calc.update(data)
  62. b64_md5 = base64.b64encode(md5_calc.digest()).decode('utf-8')
  63. response_data = json.dumps({'md5Hash': b64_md5}).encode('utf-8')
  64. self.fake.add_expectation(url, _returns=MockResponse(response_data))
  65. def add_file_expectation(self, url, data):
  66. self.fake.add_expectation(url, None, _returns=MockResponse(data))
  67. def test_download_gsutil(self):
  68. version = gsutil.VERSION
  69. filename = 'gsutil_%s.zip' % version
  70. full_filename = os.path.join(self.tempdir, filename)
  71. fake_file = b'This is gsutil.zip'
  72. fake_file2 = b'This is other gsutil.zip'
  73. metadata_url = gsutil.API_URL + filename
  74. url = gsutil.GSUTIL_URL + filename
  75. # The md5 is valid, so download_gsutil should download the file.
  76. self.add_md5_expectation(metadata_url, fake_file)
  77. self.add_file_expectation(url, fake_file)
  78. self.assertEqual(gsutil.download_gsutil(version, self.tempdir),
  79. full_filename)
  80. with open(full_filename, 'rb') as f:
  81. self.assertEqual(fake_file, f.read())
  82. self.assertEqual(self.fake.expectations, [])
  83. # The md5 is valid, so download_gsutil should use the existing file.
  84. self.add_md5_expectation(metadata_url, fake_file)
  85. self.assertEqual(gsutil.download_gsutil(version, self.tempdir),
  86. full_filename)
  87. with open(full_filename, 'rb') as f:
  88. self.assertEqual(fake_file, f.read())
  89. self.assertEqual(self.fake.expectations, [])
  90. # The md5 is invalid for a new file, so download_gsutil should raise an
  91. # error.
  92. self.add_md5_expectation(metadata_url, b'aaaaaaa')
  93. self.add_file_expectation(url, fake_file2)
  94. self.assertRaises(gsutil.InvalidGsutilError, gsutil.download_gsutil,
  95. version, self.tempdir)
  96. self.assertEqual(self.fake.expectations, [])
  97. # The md5 is valid and the new file is already downloaded, so it should
  98. # be used without downloading again.
  99. self.add_md5_expectation(metadata_url, fake_file2)
  100. self.assertEqual(gsutil.download_gsutil(version, self.tempdir),
  101. full_filename)
  102. with open(full_filename, 'rb') as f:
  103. self.assertEqual(fake_file2, f.read())
  104. self.assertEqual(self.fake.expectations, [])
  105. def test_ensure_gsutil_full(self):
  106. version = gsutil.VERSION
  107. gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil')
  108. gsutil_bin = os.path.join(gsutil_dir, 'gsutil')
  109. gsutil_flag = os.path.join(gsutil_dir, 'install.flag')
  110. os.makedirs(gsutil_dir)
  111. zip_filename = 'gsutil_%s.zip' % version
  112. metadata_url = gsutil.API_URL + zip_filename
  113. url = gsutil.GSUTIL_URL + zip_filename
  114. _, tempzip = tempfile.mkstemp()
  115. fake_gsutil = 'Fake gsutil'
  116. with zipfile.ZipFile(tempzip, 'w') as zf:
  117. zf.writestr('gsutil/gsutil', fake_gsutil)
  118. with open(tempzip, 'rb') as f:
  119. fake_file = f.read()
  120. self.add_md5_expectation(metadata_url, fake_file)
  121. self.add_file_expectation(url, fake_file)
  122. # This should write the gsutil_bin with 'Fake gsutil'
  123. gsutil.ensure_gsutil(version, self.tempdir, False)
  124. self.assertTrue(os.path.exists(gsutil_bin))
  125. with open(gsutil_bin, 'r') as f:
  126. self.assertEqual(f.read(), fake_gsutil)
  127. self.assertTrue(os.path.exists(gsutil_flag))
  128. self.assertEqual(self.fake.expectations, [])
  129. def test_ensure_gsutil_short(self):
  130. version = gsutil.VERSION
  131. gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil')
  132. gsutil_bin = os.path.join(gsutil_dir, 'gsutil')
  133. gsutil_flag = os.path.join(gsutil_dir, 'install.flag')
  134. os.makedirs(gsutil_dir)
  135. with open(gsutil_bin, 'w') as f:
  136. f.write('Foobar')
  137. with open(gsutil_flag, 'w') as f:
  138. f.write('Barbaz')
  139. self.assertEqual(gsutil.ensure_gsutil(version, self.tempdir, False),
  140. gsutil_bin)
  141. @mock.patch('sys.platform', 'linux')
  142. def test__is_supported_platform_returns_true_for_supported_platform(self):
  143. self.assertTrue(gsutil._is_luci_auth_supported_platform())
  144. @mock.patch('sys.platform', 'aix')
  145. def test__is_supported_platform_returns_false_for_unsupported_platform(
  146. self):
  147. self.assertFalse(gsutil._is_luci_auth_supported_platform())
  148. if __name__ == '__main__':
  149. unittest.main()