gsutil_test.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. #!/usr/bin/env vpython3
  2. # Copyright 2014 The Chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Test gsutil.py."""
  6. from __future__ import unicode_literals
  7. import base64
  8. import hashlib
  9. import io
  10. import json
  11. import os
  12. import shutil
  13. import subprocess
  14. import sys
  15. import tempfile
  16. import unittest
  17. import zipfile
  18. try:
  19. import urllib2 as urllib
  20. except ImportError: # For Py3 compatibility
  21. import urllib.request as urllib
  22. # Add depot_tools to path
  23. THIS_DIR = os.path.dirname(os.path.abspath(__file__))
  24. DEPOT_TOOLS_DIR = os.path.dirname(THIS_DIR)
  25. sys.path.append(DEPOT_TOOLS_DIR)
  26. import gsutil
  27. class TestError(Exception):
  28. pass
  29. class FakeCall(object):
  30. def __init__(self):
  31. self.expectations = []
  32. def add_expectation(self, *args, **kwargs):
  33. returns = kwargs.pop('_returns', None)
  34. self.expectations.append((args, kwargs, returns))
  35. def __call__(self, *args, **kwargs):
  36. if not self.expectations:
  37. raise TestError('Got unexpected\n%s\n%s' % (args, kwargs))
  38. exp_args, exp_kwargs, exp_returns = self.expectations.pop(0)
  39. if args != exp_args or kwargs != exp_kwargs:
  40. message = 'Expected:\n args: %s\n kwargs: %s\n' % (exp_args, exp_kwargs)
  41. message += 'Got:\n args: %s\n kwargs: %s\n' % (args, kwargs)
  42. raise TestError(message)
  43. return exp_returns
  44. class GsutilUnitTests(unittest.TestCase):
  45. def setUp(self):
  46. self.fake = FakeCall()
  47. self.tempdir = tempfile.mkdtemp()
  48. self.old_urlopen = getattr(urllib, 'urlopen')
  49. self.old_call = getattr(subprocess, 'call')
  50. setattr(urllib, 'urlopen', self.fake)
  51. setattr(subprocess, 'call', self.fake)
  52. def tearDown(self):
  53. self.assertEqual(self.fake.expectations, [])
  54. shutil.rmtree(self.tempdir)
  55. setattr(urllib, 'urlopen', self.old_urlopen)
  56. setattr(subprocess, 'call', self.old_call)
  57. def test_download_gsutil(self):
  58. version = '4.2'
  59. filename = 'gsutil_%s.zip' % version
  60. full_filename = os.path.join(self.tempdir, filename)
  61. fake_file = b'This is gsutil.zip'
  62. fake_file2 = b'This is other gsutil.zip'
  63. url = '%s%s' % (gsutil.GSUTIL_URL, filename)
  64. self.fake.add_expectation(url, _returns=io.BytesIO(fake_file))
  65. self.assertEqual(
  66. gsutil.download_gsutil(version, self.tempdir), full_filename)
  67. with open(full_filename, 'rb') as f:
  68. self.assertEqual(fake_file, f.read())
  69. metadata_url = gsutil.API_URL + filename
  70. md5_calc = hashlib.md5()
  71. md5_calc.update(fake_file)
  72. b64_md5 = base64.b64encode(md5_calc.hexdigest().encode('utf-8'))
  73. self.fake.add_expectation(
  74. metadata_url,
  75. _returns=io.BytesIO(
  76. json.dumps({'md5Hash': b64_md5.decode('utf-8')}).encode('utf-8')))
  77. self.assertEqual(
  78. gsutil.download_gsutil(version, self.tempdir), full_filename)
  79. with open(full_filename, 'rb') as f:
  80. self.assertEqual(fake_file, f.read())
  81. self.assertEqual(self.fake.expectations, [])
  82. self.fake.add_expectation(
  83. metadata_url,
  84. _returns=io.BytesIO(
  85. json.dumps({
  86. 'md5Hash': base64.b64encode(b'aaaaaaa').decode('utf-8') # Bad MD5
  87. }).encode('utf-8')))
  88. self.fake.add_expectation(url, _returns=io.BytesIO(fake_file2))
  89. self.assertEqual(
  90. gsutil.download_gsutil(version, self.tempdir), full_filename)
  91. with open(full_filename, 'rb') as f:
  92. self.assertEqual(fake_file2, f.read())
  93. self.assertEqual(self.fake.expectations, [])
  94. def test_ensure_gsutil_full(self):
  95. version = '4.2'
  96. gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil')
  97. gsutil_bin = os.path.join(gsutil_dir, 'gsutil')
  98. gsutil_flag = os.path.join(gsutil_dir, 'install.flag')
  99. os.makedirs(gsutil_dir)
  100. zip_filename = 'gsutil_%s.zip' % version
  101. url = '%s%s' % (gsutil.GSUTIL_URL, zip_filename)
  102. _, tempzip = tempfile.mkstemp()
  103. fake_gsutil = 'Fake gsutil'
  104. with zipfile.ZipFile(tempzip, 'w') as zf:
  105. zf.writestr('gsutil/gsutil', fake_gsutil)
  106. with open(tempzip, 'rb') as f:
  107. self.fake.add_expectation(url, _returns=io.BytesIO(f.read()))
  108. # This should write the gsutil_bin with 'Fake gsutil'
  109. gsutil.ensure_gsutil(version, self.tempdir, False)
  110. self.assertTrue(os.path.exists(gsutil_bin))
  111. with open(gsutil_bin, 'r') as f:
  112. self.assertEqual(f.read(), fake_gsutil)
  113. self.assertTrue(os.path.exists(gsutil_flag))
  114. self.assertEqual(self.fake.expectations, [])
  115. def test_ensure_gsutil_short(self):
  116. version = '4.2'
  117. gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil')
  118. gsutil_bin = os.path.join(gsutil_dir, 'gsutil')
  119. gsutil_flag = os.path.join(gsutil_dir, 'install.flag')
  120. os.makedirs(gsutil_dir)
  121. with open(gsutil_bin, 'w') as f:
  122. f.write('Foobar')
  123. with open(gsutil_flag, 'w') as f:
  124. f.write('Barbaz')
  125. self.assertEqual(
  126. gsutil.ensure_gsutil(version, self.tempdir, False), gsutil_bin)
  127. if __name__ == '__main__':
  128. unittest.main()