123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- #!/usr/bin/env python
- # Copyright (c) 2015 The Chromium Authors. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- import codecs
- import copy
- import json
- import os
- import sys
- import unittest
- #import test_env # pylint: disable=relative-import,unused-import
- sys.path.insert(0, os.path.join(
- os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
- 'recipe_modules', 'bot_update', 'resources'))
- import bot_update
# Baseline keyword arguments for bot_update.ensure_checkout(). Tests work on a
# deep copy of this dict so that per-test tweaks never leak between tests.
DEFAULT_PARAMS = {
  # Checkout description.
  'solutions': [
    {'name': 'somename', 'url': 'https://fake.com'},
  ],
  'revisions': [],
  'first_sln': 'somename',
  'target_os': None,
  'target_os_only': None,

  # Patch application.
  'patch_root': None,
  'issue': None,
  'patchset': None,
  'rietveld_server': None,
  'gerrit_repo': None,
  'gerrit_ref': None,
  'gerrit_rebase_patch_ref': None,
  'revision_mapping': {},
  'apply_issue_email_file': None,
  'apply_issue_key_file': None,
  'apply_issue_oauth2_file': None,

  # Fetch behaviour.
  'shallow': False,
  'refs': [],
  'git_cache_dir': '',
  'gerrit_reset': None,
}
class MockedPopen(object):
  """A fake instance of a called subprocess.

  This is meant to be used in conjunction with MockedCall, which asks each
  instance whether it matches a call (check) and, if so, invokes it (__call__).
  """

  def __init__(self, args=None, kwargs=None):
    """Record the command prefix this fake answers to.

    Args:
      args: Sequence of leading command arguments to match. Defaults to [].
      kwargs: Keyword arguments recorded with the command. Defaults to {}.
          They are stored but never consulted when matching.
    """
    self.args = args or []
    self.kwargs = kwargs or {}
    self.return_value = None
    # Not read anywhere in this class; kept for any external user.
    self.fails = False

  def returns(self, rv):
    """Set the return value when this popen is called.

    rv can be a string, or a callable (eg function).

    Returns:
      This instance, so the call can be chained onto the constructor.
    """
    self.return_value = rv
    return self

  def check(self, args, kwargs):
    """Check to see if the given args/kwargs call match this instance.

    This does a partial match, so that a call to "git clone foo" will match
    this instance if this instance was recorded as "git clone". The reverse
    does not hold: a call that is shorter than the recorded command (eg "git"
    against a recorded "git clone") is not a match.

    Args:
      args: Positional arguments of the call being examined.
      kwargs: Keyword arguments of the call; accepted but not compared.

    Returns:
      False if the call does not match. Otherwise self.return_value, which
      means a matching call only counts as a match for the caller when a
      truthy return value has been configured via returns().
    """
    expected_args = self.args
    # zip() stops at the shorter sequence, so without this guard a call with
    # fewer arguments than expected would be accepted as a match.
    if len(args) < len(expected_args):
      return False
    if any(input_arg != expected_arg
           for (input_arg, expected_arg) in zip(args, expected_args)):
      return False
    return self.return_value

  def __call__(self, args, kwargs):
    """Actually call this popen instance.

    Args:
      args: Positional arguments of the intercepted call (a sequence).
      kwargs: Keyword arguments of the intercepted call (a dict).

    Returns:
      The configured return value, or, if that value is callable, the result
      of calling it with the unpacked args and kwargs.
    """
    if callable(self.return_value):
      return self.return_value(*args, **kwargs)
    return self.return_value
class MockedCall(object):
  """Stand-in for bot_update.call() that replays canned answers.

  Answers are queued in self.expectations. Each one is a MockedPopen, or any
  other object offering check(args, kwargs) and __call__(args, kwargs). When
  the mock is invoked, check() decides which queued answer applies; matching
  may be partial (a "git clone" answer also serves a "git clone foo" call).

  A call that no queued answer claims succeeds with an empty string. Every
  invocation is logged in self.records as an (args, kwargs) pair.
  """

  def __init__(self, fake_filesystem):
    # fake_filesystem is accepted for signature compatibility and is unused.
    self.records = []
    self.expectations = []

  def expect(self, args=None, kwargs=None):
    """Queue a new MockedPopen for the given command and return it."""
    expectation = MockedPopen(args or [], kwargs or {})
    self.expectations.append(expectation)
    return expectation

  def __call__(self, *args, **kwargs):
    """Log the call, then answer it with the first matching expectation.

    The chosen expectation is consumed (removed from the queue) before it is
    invoked. Returns '' when nothing matches.
    """
    self.records.append((args, kwargs))
    chosen = next(
        (candidate for candidate in self.expectations
         if candidate.check(args, kwargs)),
        None)
    if chosen is None:
      return ''
    self.expectations.remove(chosen)
    return chosen(args, kwargs)
class MockedGclientSync():
  """Callable that plays the part of a "gclient sync" invocation.

  bot_update expects gclient sync to leave behind an output json file, so this
  object locates the path that follows the --output-json flag and writes a
  (fake) json document there for bot_update to consume.
  """

  def __init__(self, fake_filesystem):
    # Filesystem object whose open() is used to create the json file.
    self.fake_filesystem = fake_filesystem
    # Payload serialized to the json file; tests may replace or mutate it.
    self.output = {}

  def __call__(self, *args, **_):
    """Write self.output as json to the path given after --output-json.

    Keyword arguments are ignored. Raises ValueError when the flag is absent
    from args.
    """
    flag_position = args.index('--output-json')
    json_path = args[flag_position + 1]
    with self.fake_filesystem.open(json_path, 'w') as json_file:
      json.dump(self.output, json_file)
class FakeFile():
  """Minimal in-memory file object usable in a "with" statement.

  Everything written is accumulated in self.contents; read() always returns
  the full accumulated text.
  """

  def __init__(self):
    self.contents = ''

  def write(self, buf):
    """Append buf to the stored contents."""
    self.contents = self.contents + buf

  def read(self):
    """Return everything written so far."""
    return self.contents

  def __enter__(self):
    return self

  def __exit__(self, _, __, ___):
    # Nothing to release; returning None lets exceptions propagate.
    return None
class FakeFilesystem():
  """Dictionary-backed replacement for open(), keyed by target path."""

  def __init__(self):
    # Maps each opened-for-write target to its FakeFile.
    self.files = {}

  def open(self, target, mode='r', encoding=None):
    """Return the fake file stored for target.

    Any mode containing 'w' replaces the entry with a fresh, empty FakeFile.
    Every other mode returns the existing entry and raises KeyError if the
    target was never written. The encoding argument is accepted and ignored.
    """
    if 'w' not in mode:
      return self.files[target]
    fresh_file = self.files[target] = FakeFile()
    return fresh_file
def fake_git(*args, **kwargs):
  """Forward a git invocation to bot_update.call().

  'git' is prepended to the positional arguments. bot_update.call is looked up
  on each invocation, so whatever is installed there at call time is used.
  """
  git_command = ('git',) + args
  return bot_update.call(*git_command, **kwargs)
class BotUpdateUnittests(unittest.TestCase):
  """Tests for bot_update.ensure_checkout() with processes and files faked.

  setUp swaps several globals (sys.platform, os.getcwd, codecs.open and the
  call/git/open attributes of bot_update) for fakes; tearDown puts every one
  of them back so that nothing leaks into other tests in the same process.
  """

  def setUp(self):
    # Remember every global that is about to be patched.
    self.old_platform = sys.platform
    self.old_call = bot_update.call
    self.old_git = getattr(bot_update, 'git', None)
    self.old_os_cwd = os.getcwd
    self.old_codecs_open = codecs.open

    sys.platform = 'linux2'  # For consistency, ya know?
    self.filesystem = FakeFilesystem()
    self.call = MockedCall(self.filesystem)
    self.gclient = MockedGclientSync(self.filesystem)
    # The first "gclient sync" call is answered by the fake that writes the
    # output json file.
    self.call.expect(
        (sys.executable, '-u', bot_update.GCLIENT_PATH, 'sync')
    ).returns(self.gclient)
    self.params = copy.deepcopy(DEFAULT_PARAMS)

    bot_update.call = self.call
    bot_update.git = fake_git
    os.getcwd = lambda: '/b/build/slave/foo/build'
    # A module-level "open" attribute shadows the builtin for code that lives
    # inside bot_update.
    bot_update.open = self.filesystem.open
    codecs.open = self.filesystem.open

  def tearDown(self):
    bot_update.call = self.old_call
    if self.old_git is None:
      # Nothing usable was there before setUp installed fake_git.
      del bot_update.git
    else:
      bot_update.git = self.old_git
    os.getcwd = self.old_os_cwd
    del bot_update.open
    codecs.open = self.old_codecs_open
    sys.platform = self.old_platform

  def overrideSetupForWindows(self):
    """Switch sys.platform to 'win' and queue one more gclient sync answer."""
    sys.platform = 'win'
    self.call.expect(
        (sys.executable, '-u', bot_update.GCLIENT_PATH, 'sync')
    ).returns(self.gclient)

  def testBasic(self):
    """ensure_checkout() runs to completion with the default parameters."""
    bot_update.ensure_checkout(**self.params)
    # NOTE(review): the recorded calls are returned rather than asserted on,
    # presumably for an external expectation harness - confirm before
    # removing. Newer unittest versions warn about non-None returns.
    return self.call.records

  def testBasicShallow(self):
    """ensure_checkout() runs to completion with shallow=True."""
    self.params['shallow'] = True
    bot_update.ensure_checkout(**self.params)
    # NOTE(review): see testBasic regarding the returned records.
    return self.call.records

  def testBreakLocks(self):
    """With platform 'win', gclient sync is passed --break_repo_locks."""
    self.overrideSetupForWindows()
    bot_update.ensure_checkout(**self.params)
    sync_prefix = (sys.executable, '-u', bot_update.GCLIENT_PATH, 'sync')
    gclient_sync_cmd = None
    # Keep the last recorded gclient sync invocation.
    for args, _ in self.call.records:
      if args[:4] == sync_prefix:
        gclient_sync_cmd = args
    # Fail with a readable message instead of a TypeError on "in None".
    self.assertIsNotNone(gclient_sync_cmd, 'gclient sync was never called')
    self.assertIn('--break_repo_locks', gclient_sync_cmd)

  def testGitCheckoutBreaksLocks(self):
    """With platform 'win', a lock file found by os.walk gets removed."""
    self.overrideSetupForWindows()
    path = '/b/build/slave/foo/build/.git'
    lockfile = 'index.lock'
    removed = []
    old_os_walk = os.walk
    old_os_remove = os.remove
    os.walk = lambda _: [(path, None, [lockfile])]
    os.remove = removed.append
    try:
      bot_update.ensure_checkout(**self.params)
    finally:
      # Restore even if ensure_checkout raises, so later tests see the real
      # os.walk and os.remove.
      os.walk = old_os_walk
      os.remove = old_os_remove
    self.assertIn(os.path.join(path, lockfile), removed)
if __name__ == '__main__':
  # Run the test suite when this file is executed as a script.
  unittest.main()
|