upload_to_google_storage_unittest.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. #!/usr/bin/env vpython3
  2. # Copyright (c) 2012 The Chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Unit tests for upload_to_google_storage.py."""
  6. from io import StringIO
  7. import optparse
  8. import os
  9. import posixpath
  10. import queue
  11. import shutil
  12. import sys
  13. import tarfile
  14. import tempfile
  15. import threading
  16. import unittest
  17. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  18. import upload_to_google_storage
  19. from download_from_google_storage_unittest import GsutilMock
  20. from download_from_google_storage_unittest import ChangedWorkingDirectory
  21. # ../third_party/gsutil/gsutil
  22. GSUTIL_DEFAULT_PATH = os.path.join(
  23. os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'third_party',
  24. 'gsutil', 'gsutil')
  25. TEST_DIR = os.path.dirname(os.path.abspath(__file__))
  26. class UploadTests(unittest.TestCase):
  27. def setUp(self):
  28. self.gsutil = GsutilMock(GSUTIL_DEFAULT_PATH, None)
  29. self.temp_dir = tempfile.mkdtemp(prefix='gstools_test')
  30. self.base_path = os.path.join(self.temp_dir, 'gstools')
  31. shutil.copytree(os.path.join(TEST_DIR, 'gstools'), self.base_path)
  32. self.base_url = 'gs://sometesturl'
  33. self.parser = optparse.OptionParser()
  34. self.ret_codes = queue.Queue()
  35. self.stdout_queue = queue.Queue()
  36. self.lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt')
  37. self.lorem_ipsum_sha1 = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
  38. def tearDown(self):
  39. shutil.rmtree(self.temp_dir)
  40. sys.stdin = sys.__stdin__
  41. def test_upload_single_file(self):
  42. filenames = [self.lorem_ipsum]
  43. output_filename = '%s.sha1' % self.lorem_ipsum
  44. code = upload_to_google_storage.upload_to_google_storage(
  45. filenames, self.base_url, self.gsutil, True, False, 1, False, 'txt')
  46. self.assertEqual(self.gsutil.history, [
  47. ('check_call',
  48. ('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))),
  49. ('check_call',
  50. ('-h', 'Cache-Control:public, max-age=31536000', 'cp', '-z', 'txt',
  51. filenames[0], '%s/%s' % (self.base_url, self.lorem_ipsum_sha1)))
  52. ])
  53. self.assertTrue(os.path.exists(output_filename))
  54. self.assertEqual(
  55. open(output_filename, 'rb').read().decode(),
  56. '7871c8e24da15bad8b0be2c36edc9dc77e37727f')
  57. os.remove(output_filename)
  58. self.assertEqual(code, 0)
  59. def test_create_archive(self):
  60. work_dir = os.path.join(self.base_path, 'download_test_data')
  61. with ChangedWorkingDirectory(work_dir):
  62. dirname = 'subfolder'
  63. dirs = [dirname]
  64. tar_gz_file = '%s.tar.gz' % dirname
  65. self.assertTrue(
  66. upload_to_google_storage.validate_archive_dirs(dirs))
  67. upload_to_google_storage.create_archives(dirs)
  68. self.assertTrue(os.path.exists(tar_gz_file))
  69. with tarfile.open(tar_gz_file, 'r:gz') as tar:
  70. content = map(lambda x: x.name, tar.getmembers())
  71. self.assertIn(dirname, content)
  72. self.assertIn(posixpath.join(dirname, 'subfolder_text.txt'),
  73. content)
  74. self.assertIn(
  75. posixpath.join(dirname, 'subfolder_text.txt.sha1'), content)
  76. @unittest.skipIf(sys.platform == 'win32',
  77. 'os.symlink does not exist on win')
  78. def test_validate_archive_dirs_fails(self):
  79. work_dir = os.path.join(self.base_path, 'download_test_data')
  80. with ChangedWorkingDirectory(work_dir):
  81. symlink = 'link'
  82. os.symlink(os.path.join(self.base_path, 'subfolder'), symlink)
  83. self.assertFalse(
  84. upload_to_google_storage.validate_archive_dirs([symlink]))
  85. self.assertFalse(
  86. upload_to_google_storage.validate_archive_dirs(['foobar']))
  87. def test_upload_single_file_remote_exists(self):
  88. filenames = [self.lorem_ipsum]
  89. output_filename = '%s.sha1' % self.lorem_ipsum
  90. etag_string = b'ETag: 634d7c1ed3545383837428f031840a1e'
  91. self.gsutil.add_expected(0, b'', b'')
  92. self.gsutil.add_expected(0, etag_string, b'')
  93. code = upload_to_google_storage.upload_to_google_storage(
  94. filenames, self.base_url, self.gsutil, False, False, 1, False, None)
  95. self.assertEqual(
  96. self.gsutil.history,
  97. [('check_call',
  98. ('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))),
  99. ('check_call',
  100. ('ls', '-L', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1)))])
  101. self.assertTrue(os.path.exists(output_filename))
  102. self.assertEqual(
  103. open(output_filename, 'rb').read().decode(),
  104. '7871c8e24da15bad8b0be2c36edc9dc77e37727f')
  105. os.remove(output_filename)
  106. self.assertEqual(code, 0)
  107. def test_upload_worker_errors(self):
  108. work_queue = queue.Queue()
  109. work_queue.put((self.lorem_ipsum, self.lorem_ipsum_sha1))
  110. work_queue.put((None, None))
  111. self.gsutil.add_expected(1, '', '') # For the first ls call.
  112. self.gsutil.add_expected(20, '', 'Expected error message')
  113. # pylint: disable=protected-access
  114. upload_to_google_storage._upload_worker(0, work_queue,
  115. self.base_url, self.gsutil,
  116. threading.Lock(), False, False,
  117. self.stdout_queue,
  118. self.ret_codes, None)
  119. expected_ret_codes = [(
  120. 20,
  121. 'Encountered error on uploading %s to %s/%s\nExpected error message'
  122. % (self.lorem_ipsum, self.base_url, self.lorem_ipsum_sha1))]
  123. self.assertEqual(list(self.ret_codes.queue), expected_ret_codes)
  124. def test_skip_hashing(self):
  125. filenames = [self.lorem_ipsum]
  126. output_filename = '%s.sha1' % self.lorem_ipsum
  127. fake_hash = '6871c8e24da15bad8b0be2c36edc9dc77e37727f'
  128. with open(output_filename, 'wb') as f:
  129. f.write(fake_hash.encode()) # Fake hash.
  130. code = upload_to_google_storage.upload_to_google_storage(
  131. filenames, self.base_url, self.gsutil, False, False, 1, True, None)
  132. self.assertEqual(
  133. self.gsutil.history,
  134. [('check_call', ('ls', '%s/%s' % (self.base_url, fake_hash))),
  135. ('check_call', ('ls', '-L', '%s/%s' % (self.base_url, fake_hash))),
  136. ('check_call',
  137. ('-h', 'Cache-Control:public, max-age=31536000', 'cp',
  138. filenames[0], '%s/%s' % (self.base_url, fake_hash)))])
  139. self.assertEqual(open(output_filename, 'rb').read().decode(), fake_hash)
  140. os.remove(output_filename)
  141. self.assertEqual(code, 0)
  142. def test_get_targets_no_args(self):
  143. try:
  144. upload_to_google_storage.get_targets([], self.parser, False)
  145. self.fail()
  146. except SystemExit as e:
  147. self.assertEqual(e.code, 2)
  148. def test_get_targets_passthrough(self):
  149. result = upload_to_google_storage.get_targets(['a', 'b', 'c', 'd', 'e'],
  150. self.parser, False)
  151. self.assertEqual(result, ['a', 'b', 'c', 'd', 'e'])
  152. def test_get_targets_multiple_stdin(self):
  153. inputs = ['a', 'b', 'c', 'd', 'e']
  154. sys.stdin = StringIO(os.linesep.join(inputs))
  155. result = upload_to_google_storage.get_targets(['-'], self.parser, False)
  156. self.assertEqual(result, inputs)
  157. def test_get_targets_multiple_stdin_null(self):
  158. inputs = ['a', 'b', 'c', 'd', 'e']
  159. sys.stdin = StringIO('\0'.join(inputs))
  160. result = upload_to_google_storage.get_targets(['-'], self.parser, True)
  161. self.assertEqual(result, inputs)
  162. if __name__ == '__main__':
  163. unittest.main()