123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496 |
- #!/usr/bin/env vpython3
- # Copyright (c) 2012 The Chromium Authors. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- # pylint: disable=protected-access
- """Unit tests for download_from_google_storage.py."""
- import optparse
- import os
- import queue
- import shutil
- import sys
- import tarfile
- import tempfile
- import threading
- import unittest
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- import upload_to_google_storage
- import download_from_google_storage
# Directory that contains this test file; the gstools fixtures live below it.
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
# gsutil.py sits one level above this directory, i.e. at ../gsutil.py.
GSUTIL_DEFAULT_PATH = os.path.join(os.path.dirname(TEST_DIR), 'gsutil.py')
class GsutilMock:
    """Scripted stand-in for a gsutil wrapper that records every invocation.

    Responses are queued with add_expected() and consumed first-in-first-out
    by call() / check_call(). When nothing is queued, calls succeed with a
    zero return code and empty output. Every invocation is appended to
    `history` as a (method_name, args_tuple) pair.
    """

    def __init__(self, path, boto_path, timeout=None):
        self.path = path
        self.boto_path = boto_path
        self.timeout = timeout
        # Pending (return_code, out, err, fn) responses, oldest first.
        self.expected = []
        # Log of (method_name, args) for every call made on the mock.
        self.history = []
        # Serialises history/expected updates across worker threads.
        self.lock = threading.Lock()

    def add_expected(self, return_code, out, err, fn=None):
        """Queue one response; `fn`, if given, runs when it is consumed."""
        response = (return_code, out, err, fn)
        self.expected.append(response)

    def append_history(self, method, args):
        """Record that `method` was invoked with positional `args`."""
        entry = (method, args)
        self.history.append(entry)

    def call(self, *args):
        """Log the call and return just the scripted return code (or 0)."""
        with self.lock:
            self.append_history('call', args)
            if not self.expected:
                return 0
            return_code, _out, _err, side_effect = self.expected.pop(0)
            if side_effect:
                side_effect()
            return return_code

    def check_call(self, *args):
        """Log the call and return the scripted (code, out, err) triple."""
        with self.lock:
            self.append_history('check_call', args)
            if not self.expected:
                return (0, '', '')
            return_code, stdout, stderr, side_effect = self.expected.pop(0)
            if side_effect:
                side_effect()
            return return_code, stdout, stderr

    def check_call_with_retries(self, *args):
        """Behave exactly like check_call(); the mock never retries."""
        return self.check_call(*args)
class ChangedWorkingDirectory:
    """Context manager that temporarily switches the process working directory.

    On entry the current directory is remembered and the process changes to
    `working_directory`; on exit (normal or via exception) it changes back.
    Both transitions are announced on stdout. `__enter__` returns None, so
    the manager is meant to be used without an `as` target.
    """

    def __init__(self, working_directory):
        self._working_directory = working_directory
        # Filled in by __enter__ with the directory to return to.
        self._old_cwd = ''

    def __enter__(self):
        target = self._working_directory
        self._old_cwd = os.getcwd()
        print("Enter directory = ", target)
        os.chdir(target)

    def __exit__(self, *_):
        previous = self._old_cwd
        print("Enter directory = ", previous)
        os.chdir(previous)
- class GstoolsUnitTests(unittest.TestCase):
- def setUp(self):
- self.temp_dir = tempfile.mkdtemp(prefix='gstools_test')
- self.base_path = os.path.join(self.temp_dir, 'test_files')
- shutil.copytree(os.path.join(TEST_DIR, 'gstools'), self.base_path)
- def tearDown(self):
- shutil.rmtree(self.temp_dir)
- def test_validate_tar_file(self):
- lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt')
- with ChangedWorkingDirectory(self.base_path):
- # Sanity ok check.
- tar_dir = 'ok_dir'
- os.makedirs(os.path.join(self.base_path, tar_dir))
- tar = 'good.tar.gz'
- lorem_ipsum_copy = os.path.join(tar_dir, 'lorem_ipsum.txt')
- shutil.copyfile(lorem_ipsum, lorem_ipsum_copy)
- with tarfile.open(tar, 'w:gz') as tar:
- tar.add(lorem_ipsum_copy)
- self.assertTrue(
- download_from_google_storage._validate_tar_file(
- tar, tar_dir))
- # os.symlink doesn't exist on Windows.
- if sys.platform != 'win32':
- # Test no links.
- tar_dir_link = 'for_tar_link'
- os.makedirs(tar_dir_link)
- link = os.path.join(tar_dir_link, 'link')
- os.symlink(lorem_ipsum, link)
- tar_with_links = 'with_links.tar.gz'
- with tarfile.open(tar_with_links, 'w:gz') as tar:
- tar.add(link)
- self.assertFalse(
- download_from_google_storage._validate_tar_file(
- tar, tar_dir_link))
- # Test not outside.
- tar_dir_outside = 'outside_tar'
- os.makedirs(tar_dir_outside)
- tar_with_outside = 'with_outside.tar.gz'
- with tarfile.open(tar_with_outside, 'w:gz') as tar:
- tar.add(lorem_ipsum)
- self.assertFalse(
- download_from_google_storage._validate_tar_file(
- tar, tar_dir_outside))
- # Test no ../
- tar_with_dotdot = 'with_dotdot.tar.gz'
- dotdot_file = os.path.join(tar_dir, '..', tar_dir,
- 'lorem_ipsum.txt')
- with tarfile.open(tar_with_dotdot, 'w:gz') as tar:
- tar.add(dotdot_file)
- self.assertFalse(
- download_from_google_storage._validate_tar_file(
- tar, tar_dir))
- # Test normal file with .. in name okay
- tar_with_hidden = 'with_normal_dotdot.tar.gz'
- hidden_file = os.path.join(tar_dir, '..hidden_file.txt')
- shutil.copyfile(lorem_ipsum, hidden_file)
- with tarfile.open(tar_with_hidden, 'w:gz') as tar:
- tar.add(hidden_file)
- self.assertTrue(
- download_from_google_storage._validate_tar_file(
- tar, tar_dir))
- def test_gsutil(self):
- # This will download a real gsutil package from Google Storage.
- gsutil = download_from_google_storage.Gsutil(GSUTIL_DEFAULT_PATH, None)
- self.assertEqual(gsutil.path, GSUTIL_DEFAULT_PATH)
- code, _, err = gsutil.check_call()
- self.assertEqual(code, 0, err)
- self.assertEqual(err, '')
- def test_get_sha1(self):
- lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt')
- self.assertEqual(download_from_google_storage.get_sha1(lorem_ipsum),
- '7871c8e24da15bad8b0be2c36edc9dc77e37727f')
- def test_get_md5(self):
- lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt')
- self.assertEqual(upload_to_google_storage.get_md5(lorem_ipsum),
- '634d7c1ed3545383837428f031840a1e')
- def test_get_md5_cached_read(self):
- lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt')
- # Use a fake 'stale' MD5 sum. Expected behavior is to return stale sum.
- self.assertEqual(upload_to_google_storage.get_md5_cached(lorem_ipsum),
- '734d7c1ed3545383837428f031840a1e')
- def test_get_md5_cached_write(self):
- lorem_ipsum2 = os.path.join(self.base_path, 'lorem_ipsum2.txt')
- lorem_ipsum2_md5 = os.path.join(self.base_path, 'lorem_ipsum2.txt.md5')
- if os.path.exists(lorem_ipsum2_md5):
- os.remove(lorem_ipsum2_md5)
- # Use a fake 'stale' MD5 sum. Expected behavior is to return stale sum.
- self.assertEqual(upload_to_google_storage.get_md5_cached(lorem_ipsum2),
- '4c02d1eb455a0f22c575265d17b84b6d')
- self.assertTrue(os.path.exists(lorem_ipsum2_md5))
- self.assertEqual(
- open(lorem_ipsum2_md5, 'rb').read().decode(),
- '4c02d1eb455a0f22c575265d17b84b6d')
- os.remove(lorem_ipsum2_md5) # Clean up.
- self.assertFalse(os.path.exists(lorem_ipsum2_md5))
- class DownloadTests(unittest.TestCase):
- def setUp(self):
- self.gsutil = GsutilMock(GSUTIL_DEFAULT_PATH, None)
- self.temp_dir = tempfile.mkdtemp(prefix='gstools_test')
- self.checkout_test_files = os.path.join(TEST_DIR, 'gstools',
- 'download_test_data')
- self.base_path = os.path.join(self.temp_dir, 'download_test_data')
- shutil.copytree(self.checkout_test_files, self.base_path)
- self.base_url = 'gs://sometesturl'
- self.parser = optparse.OptionParser()
- self.queue = queue.Queue()
- self.ret_codes = queue.Queue()
- self.lorem_ipsum = os.path.join(TEST_DIR, 'gstools', 'lorem_ipsum.txt')
- self.lorem_ipsum_sha1 = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
- self.maxDiff = None
- def tearDown(self):
- shutil.rmtree(self.temp_dir)
- def test_enumerate_files_non_recursive(self):
- for item in download_from_google_storage.enumerate_input(
- self.base_path, True, False, False, None, False, False):
- self.queue.put(item)
- expected_queue = [('e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe',
- os.path.join(self.base_path, 'rootfolder_text.txt')),
- ('7871c8e24da15bad8b0be2c36edc9dc77e37727f',
- os.path.join(self.base_path,
- 'uploaded_lorem_ipsum.txt'))]
- self.assertEqual(sorted(expected_queue), sorted(self.queue.queue))
- def test_enumerate_files_recursive(self):
- for item in download_from_google_storage.enumerate_input(
- self.base_path, True, True, False, None, False, False):
- self.queue.put(item)
- expected_queue = [
- ('e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe',
- os.path.join(self.base_path, 'rootfolder_text.txt')),
- ('7871c8e24da15bad8b0be2c36edc9dc77e37727f',
- os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt')),
- ('b5415aa0b64006a95c0c409182e628881d6d6463',
- os.path.join(self.base_path, 'subfolder', 'subfolder_text.txt')),
- ('b5415aa0b64006a95c0c409182e628881d6d6463',
- os.path.join(self.base_path, 'subfolder2', 'subfolder_text.txt')),
- ]
- self.assertEqual(sorted(expected_queue), sorted(self.queue.queue))
- def test_download_worker_single_file(self):
- sha1_hash = self.lorem_ipsum_sha1
- input_filename = '%s/%s' % (self.base_url, sha1_hash)
- output_filename = os.path.join(self.base_path,
- 'uploaded_lorem_ipsum.txt')
- self.gsutil.add_expected(
- 0, '', '',
- lambda: shutil.copyfile(self.lorem_ipsum, output_filename)) # cp
- self.queue.put((sha1_hash, output_filename))
- self.queue.put((None, None))
- stdout_queue = queue.Queue()
- download_from_google_storage._downloader_worker_thread(
- 0, self.queue, False, self.base_url, self.gsutil, stdout_queue,
- self.ret_codes, True, False)
- expected_calls = [('check_call', ('cp', input_filename,
- output_filename))]
- sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
- if sys.platform != 'win32':
- expected_calls.append(
- ('check_call', ('stat', 'gs://sometesturl/' + sha1_hash)))
- expected_output = [
- '0> Downloading %s@%s...' % (output_filename, sha1_hash)
- ]
- expected_ret_codes = []
- self.assertEqual(list(stdout_queue.queue), expected_output)
- self.assertEqual(self.gsutil.history, expected_calls)
- self.assertEqual(list(self.ret_codes.queue), expected_ret_codes)
- def test_download_worker_skips_file(self):
- sha1_hash = 'e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe'
- output_filename = os.path.join(self.base_path, 'rootfolder_text.txt')
- self.queue.put((sha1_hash, output_filename))
- self.queue.put((None, None))
- stdout_queue = queue.Queue()
- download_from_google_storage._downloader_worker_thread(
- 0, self.queue, False, self.base_url, self.gsutil, stdout_queue,
- self.ret_codes, True, False)
- # dfgs does not output anything in the no-op case.
- self.assertEqual(list(stdout_queue.queue), [])
- self.assertEqual(self.gsutil.history, [])
- def test_download_extract_archive(self):
- # Generate a gzipped tarfile
- output_filename = os.path.join(self.base_path, 'subfolder.tar.gz')
- output_dirname = os.path.join(self.base_path, 'subfolder')
- extracted_filename = os.path.join(output_dirname, 'subfolder_text.txt')
- with tarfile.open(output_filename, 'w:gz') as tar:
- tar.add(output_dirname, arcname='subfolder')
- shutil.rmtree(output_dirname)
- sha1_hash = download_from_google_storage.get_sha1(output_filename)
- input_filename = '%s/%s' % (self.base_url, sha1_hash)
- # Initial download
- self.queue.put((sha1_hash, output_filename))
- self.queue.put((None, None))
- stdout_queue = queue.Queue()
- download_from_google_storage._downloader_worker_thread(0,
- self.queue,
- True,
- self.base_url,
- self.gsutil,
- stdout_queue,
- self.ret_codes,
- True,
- True,
- delete=False)
- expected_calls = [('check_call', ('cp', input_filename,
- output_filename))]
- if sys.platform != 'win32':
- expected_calls.append(
- ('check_call', ('stat', 'gs://sometesturl/%s' % sha1_hash)))
- expected_output = [
- '0> Downloading %s@%s...' % (output_filename, sha1_hash)
- ]
- expected_output.extend([
- '0> Extracting 3 entries from %s to %s' %
- (output_filename, output_dirname)
- ])
- expected_ret_codes = []
- self.assertEqual(list(stdout_queue.queue), expected_output)
- self.assertEqual(self.gsutil.history, expected_calls)
- self.assertEqual(list(self.ret_codes.queue), expected_ret_codes)
- self.assertTrue(os.path.exists(output_dirname))
- self.assertTrue(os.path.exists(extracted_filename))
- # Test noop download
- self.queue.put((sha1_hash, output_filename))
- self.queue.put((None, None))
- stdout_queue = queue.Queue()
- download_from_google_storage._downloader_worker_thread(0,
- self.queue,
- False,
- self.base_url,
- self.gsutil,
- stdout_queue,
- self.ret_codes,
- True,
- True,
- delete=False)
- self.assertEqual(list(stdout_queue.queue), [])
- self.assertEqual(self.gsutil.history, expected_calls)
- self.assertEqual(list(self.ret_codes.queue), [])
- self.assertTrue(os.path.exists(output_dirname))
- self.assertTrue(os.path.exists(extracted_filename))
- # With dirty flag file, previous extraction wasn't complete
- with open(os.path.join(self.base_path, 'subfolder.tmp'), 'a'):
- pass
- self.queue.put((sha1_hash, output_filename))
- self.queue.put((None, None))
- stdout_queue = queue.Queue()
- download_from_google_storage._downloader_worker_thread(0,
- self.queue,
- False,
- self.base_url,
- self.gsutil,
- stdout_queue,
- self.ret_codes,
- True,
- True,
- delete=False)
- expected_calls += [('check_call', ('cp', input_filename,
- output_filename))]
- if sys.platform != 'win32':
- expected_calls.append(
- ('check_call', ('stat', 'gs://sometesturl/%s' % sha1_hash)))
- expected_output = [
- '0> Detected tmp flag file for %s, re-downloading...' %
- (output_filename),
- '0> Downloading %s@%s...' % (output_filename, sha1_hash),
- '0> Removed %s...' % (output_dirname),
- '0> Extracting 3 entries from %s to %s' %
- (output_filename, output_dirname),
- ]
- expected_ret_codes = []
- self.assertEqual(list(stdout_queue.queue), expected_output)
- self.assertEqual(self.gsutil.history, expected_calls)
- self.assertEqual(list(self.ret_codes.queue), expected_ret_codes)
- self.assertTrue(os.path.exists(output_dirname))
- self.assertTrue(os.path.exists(extracted_filename))
- def test_download_worker_skips_not_found_file(self):
- sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
- input_filename = '%s/%s' % (self.base_url, sha1_hash)
- output_filename = os.path.join(self.base_path,
- 'uploaded_lorem_ipsum.txt')
- self.queue.put((sha1_hash, output_filename))
- self.queue.put((None, None))
- stdout_queue = queue.Queue()
- self.gsutil.add_expected(1, '', '') # Return error when 'cp' is called.
- download_from_google_storage._downloader_worker_thread(
- 0, self.queue, False, self.base_url, self.gsutil, stdout_queue,
- self.ret_codes, True, False)
- expected_output = [
- '0> Downloading %s@%s...' % (output_filename, sha1_hash),
- '0> Failed to fetch file %s for %s, skipping. [Err: ]' %
- (input_filename, output_filename),
- ]
- expected_calls = [('check_call', ('cp', input_filename,
- output_filename))]
- expected_ret_codes = [(1, 'Failed to fetch file %s for %s. [Err: ]' %
- (input_filename, output_filename))]
- self.assertEqual(list(stdout_queue.queue), expected_output)
- self.assertEqual(self.gsutil.history, expected_calls)
- self.assertEqual(list(self.ret_codes.queue), expected_ret_codes)
- def test_download_cp_fails(self):
- sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
- input_filename = '%s/%s' % (self.base_url, sha1_hash)
- output_filename = os.path.join(self.base_path,
- 'uploaded_lorem_ipsum.txt')
- self.gsutil.add_expected(101, '', 'Test error message.') # cp
- code = download_from_google_storage.download_from_google_storage(
- input_filename=sha1_hash,
- base_url=self.base_url,
- gsutil=self.gsutil,
- num_threads=1,
- directory=False,
- recursive=False,
- force=True,
- output=output_filename,
- ignore_errors=False,
- sha1_file=False,
- verbose=True,
- auto_platform=False,
- extract=False)
- expected_calls = [('check_call', ('cp', input_filename,
- output_filename))]
- self.assertEqual(self.gsutil.history, expected_calls)
- self.assertEqual(code, 101)
- def test_corrupt_download(self):
- q = queue.Queue()
- out_q = queue.Queue()
- ret_codes = queue.Queue()
- tmp_dir = tempfile.mkdtemp()
- sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
- output_filename = os.path.join(tmp_dir, 'lorem_ipsum.txt')
- q.put(('7871c8e24da15bad8b0be2c36edc9dc77e37727f', output_filename))
- q.put((None, None))
- def _write_bad_file():
- with open(output_filename, 'w') as f:
- f.write('foobar')
- self.gsutil.add_expected(0, '', '', _write_bad_file) # cp
- download_from_google_storage._downloader_worker_thread(
- 1, q, True, self.base_url, self.gsutil, out_q, ret_codes, True,
- False)
- self.assertTrue(q.empty())
- msg = ('1> ERROR remote sha1 (%s) does not match expected sha1 (%s).' %
- ('8843d7f92416211de9ebb963ff4ce28125932878', sha1_hash))
- self.assertEqual(
- out_q.get(),
- '1> Downloading %s@%s...' % (output_filename, sha1_hash))
- self.assertEqual(out_q.get(), msg)
- self.assertEqual(ret_codes.get(), (20, msg))
- self.assertTrue(out_q.empty())
- self.assertTrue(ret_codes.empty())
- def test_download_directory_no_recursive_non_force(self):
- sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
- input_filename = '%s/%s' % (self.base_url, sha1_hash)
- output_filename = os.path.join(self.base_path,
- 'uploaded_lorem_ipsum.txt')
- self.gsutil.add_expected(0, '', '') # version
- self.gsutil.add_expected(
- 0, '', '',
- lambda: shutil.copyfile(self.lorem_ipsum, output_filename)) # cp
- code = download_from_google_storage.download_from_google_storage(
- input_filename=self.base_path,
- base_url=self.base_url,
- gsutil=self.gsutil,
- num_threads=1,
- directory=True,
- recursive=False,
- force=False,
- output=None,
- ignore_errors=False,
- sha1_file=False,
- verbose=True,
- auto_platform=False,
- extract=False)
- expected_calls = [('check_call', ('version', )),
- ('check_call', ('cp', input_filename,
- output_filename))]
- if sys.platform != 'win32':
- expected_calls.append(
- ('check_call',
- ('stat',
- 'gs://sometesturl/7871c8e24da15bad8b0be2c36edc9dc77e37727f')))
- self.assertEqual(self.gsutil.history, expected_calls)
- self.assertEqual(code, 0)
# Run every test case defined in this module when executed as a script.
if __name__ == '__main__':
    unittest.main()
|