123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680 |
- #!/usr/bin/env python
- # Copyright (c) 2012 The Chromium Authors. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- """Unit tests for subprocess2.py."""
- import logging
- import optparse
- import os
- import sys
- import time
- import unittest
- try:
- import fcntl
- except ImportError:
- fcntl = None
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- import subprocess
- import subprocess2
- from testing_support import auto_stub
- # Method could be a function
- # pylint: disable=R0201
- # Create aliases for subprocess2 specific tests. They shouldn't be used for
- # regression tests.
- TIMED_OUT = subprocess2.TIMED_OUT
- VOID = subprocess2.VOID
- PIPE = subprocess2.PIPE
- STDOUT = subprocess2.STDOUT
- def convert_to_crlf(string):
- """Unconditionally convert LF to CRLF."""
- return string.replace('\n', '\r\n')
- def convert_to_cr(string):
- """Unconditionally convert LF to CR."""
- return string.replace('\n', '\r')
- def convert_win(string):
- """Converts string to CRLF on Windows only."""
- if sys.platform == 'win32':
- return string.replace('\n', '\r\n')
- return string
- class DefaultsTest(auto_stub.TestCase):
- # TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they
- # can be trapped in the child process for better coverage.
- def _fake_communicate(self):
- """Mocks subprocess2.communicate()."""
- results = {}
- def fake_communicate(args, **kwargs):
- assert not results
- results.update(kwargs)
- results['args'] = args
- return ('stdout', 'stderr'), 0
- self.mock(subprocess2, 'communicate', fake_communicate)
- return results
- def _fake_Popen(self):
- """Mocks the whole subprocess2.Popen class."""
- results = {}
- class fake_Popen(object):
- returncode = -8
- def __init__(self, args, **kwargs):
- assert not results
- results.update(kwargs)
- results['args'] = args
- @staticmethod
- # pylint: disable=W0622
- def communicate(input=None, timeout=None, nag_timer=None):
- return None, None
- self.mock(subprocess2, 'Popen', fake_Popen)
- return results
- def _fake_subprocess_Popen(self):
- """Mocks the base class subprocess.Popen only."""
- results = {}
- def __init__(self, args, **kwargs):
- assert not results
- results.update(kwargs)
- results['args'] = args
- def communicate():
- return None, None
- self.mock(subprocess.Popen, '__init__', __init__)
- self.mock(subprocess.Popen, 'communicate', communicate)
- return results
- def test_check_call_defaults(self):
- results = self._fake_communicate()
- self.assertEquals(
- ('stdout', 'stderr'), subprocess2.check_call_out(['foo'], a=True))
- expected = {
- 'args': ['foo'],
- 'a':True,
- }
- self.assertEquals(expected, results)
- def test_capture_defaults(self):
- results = self._fake_communicate()
- self.assertEquals(
- 'stdout', subprocess2.capture(['foo'], a=True))
- expected = {
- 'args': ['foo'],
- 'a':True,
- 'stdin': subprocess2.VOID,
- 'stdout': subprocess2.PIPE,
- }
- self.assertEquals(expected, results)
- def test_communicate_defaults(self):
- results = self._fake_Popen()
- self.assertEquals(
- ((None, None), -8), subprocess2.communicate(['foo'], a=True))
- expected = {
- 'args': ['foo'],
- 'a': True,
- }
- self.assertEquals(expected, results)
- def test_Popen_defaults(self):
- results = self._fake_subprocess_Popen()
- proc = subprocess2.Popen(['foo'], a=True)
- # Cleanup code in subprocess.py needs this member to be set.
- # pylint: disable=W0201
- proc._child_created = None
- expected = {
- 'args': ['foo'],
- 'a': True,
- 'shell': bool(sys.platform=='win32'),
- }
- if sys.platform != 'win32':
- env = os.environ.copy()
- is_english = lambda name: env.get(name, 'en').startswith('en')
- if not is_english('LANG'):
- env['LANG'] = 'en_US.UTF-8'
- expected['env'] = env
- if not is_english('LANGUAGE'):
- env['LANGUAGE'] = 'en_US.UTF-8'
- expected['env'] = env
- self.assertEquals(expected, results)
- self.assertTrue(time.time() >= proc.start)
- def test_check_output_defaults(self):
- results = self._fake_communicate()
- # It's discarding 'stderr' because it assumes stderr=subprocess2.STDOUT but
- # fake_communicate() doesn't 'implement' that.
- self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True))
- expected = {
- 'args': ['foo'],
- 'a':True,
- 'stdin': subprocess2.VOID,
- 'stdout': subprocess2.PIPE,
- }
- self.assertEquals(expected, results)
- class BaseTestCase(unittest.TestCase):
- def setUp(self):
- super(BaseTestCase, self).setUp()
- self.exe_path = __file__
- self.exe = [sys.executable, self.exe_path, '--child']
- self.states = {}
- if fcntl:
- for v in (sys.stdin, sys.stdout, sys.stderr):
- fileno = v.fileno()
- self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL)
- def tearDown(self):
- for fileno, fl in self.states.iteritems():
- self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL))
- super(BaseTestCase, self).tearDown()
- def _check_res(self, res, stdout, stderr, returncode):
- (out, err), code = res
- self.assertEquals(stdout, out)
- self.assertEquals(stderr, err)
- self.assertEquals(returncode, code)
- class RegressionTest(BaseTestCase):
- # Regression tests to ensure that subprocess and subprocess2 have the same
- # behavior.
- def _run_test(self, function):
- """Runs tests in 12 combinations:
- - LF output with universal_newlines=False
- - CR output with universal_newlines=False
- - CRLF output with universal_newlines=False
- - LF output with universal_newlines=True
- - CR output with universal_newlines=True
- - CRLF output with universal_newlines=True
- Once with subprocess, once with subprocess2.
- First |function| argument is the conversion for the original expected LF
- string to the right EOL.
- Second |function| argument is the executable and initial flag to run, to
- control what EOL is used by the child process.
- Third |function| argument is universal_newlines value.
- """
- noop = lambda x: x
- for subp in (subprocess, subprocess2):
- function(noop, self.exe, False, subp)
- function(convert_to_cr, self.exe + ['--cr'], False, subp)
- function(convert_to_crlf, self.exe + ['--crlf'], False, subp)
- function(noop, self.exe, True, subp)
- function(noop, self.exe + ['--cr'], True, subp)
- function(noop, self.exe + ['--crlf'], True, subp)
- def _check_exception(self, subp, e, stdout, stderr, returncode):
- """On exception, look if the exception members are set correctly."""
- self.assertEquals(returncode, e.returncode)
- if subp is subprocess:
- # subprocess never save the output.
- self.assertFalse(hasattr(e, 'stdout'))
- self.assertFalse(hasattr(e, 'stderr'))
- elif subp is subprocess2:
- self.assertEquals(stdout, e.stdout)
- self.assertEquals(stderr, e.stderr)
- else:
- self.fail()
- def test_check_output_no_stdout(self):
- try:
- subprocess2.check_output(self.exe, stdout=subprocess2.PIPE)
- self.fail()
- except ValueError:
- pass
-
- if (sys.version_info[0] * 10 + sys.version_info[1]) >= 27:
- # python 2.7+
- try:
- # pylint: disable=E1101
- subprocess.check_output(self.exe, stdout=subprocess.PIPE)
- self.fail()
- except ValueError:
- pass
- def test_check_output_throw_stdout(self):
- def fn(c, e, un, subp):
- if not hasattr(subp, 'check_output'):
- return
- try:
- subp.check_output(
- e + ['--fail', '--stdout'], universal_newlines=un)
- self.fail()
- except subp.CalledProcessError, exception:
- self._check_exception(subp, exception, c('A\nBB\nCCC\n'), None, 64)
- self._run_test(fn)
- def test_check_output_throw_no_stderr(self):
- def fn(c, e, un, subp):
- if not hasattr(subp, 'check_output'):
- return
- try:
- subp.check_output(
- e + ['--fail', '--stderr'], universal_newlines=un)
- self.fail()
- except subp.CalledProcessError, exception:
- self._check_exception(subp, exception, c(''), None, 64)
- self._run_test(fn)
- def test_check_output_throw_stderr(self):
- def fn(c, e, un, subp):
- if not hasattr(subp, 'check_output'):
- return
- try:
- subp.check_output(
- e + ['--fail', '--stderr'],
- stderr=subp.PIPE,
- universal_newlines=un)
- self.fail()
- except subp.CalledProcessError, exception:
- self._check_exception(subp, exception, '', c('a\nbb\nccc\n'), 64)
- self._run_test(fn)
- def test_check_output_throw_stderr_stdout(self):
- def fn(c, e, un, subp):
- if not hasattr(subp, 'check_output'):
- return
- try:
- subp.check_output(
- e + ['--fail', '--stderr'],
- stderr=subp.STDOUT,
- universal_newlines=un)
- self.fail()
- except subp.CalledProcessError, exception:
- self._check_exception(subp, exception, c('a\nbb\nccc\n'), None, 64)
- self._run_test(fn)
- def test_check_call_throw(self):
- for subp in (subprocess, subprocess2):
- try:
- subp.check_call(self.exe + ['--fail', '--stderr'])
- self.fail()
- except subp.CalledProcessError, exception:
- self._check_exception(subp, exception, None, None, 64)
- def test_redirect_stderr_to_stdout_pipe(self):
- def fn(c, e, un, subp):
- # stderr output into stdout.
- proc = subp.Popen(
- e + ['--stderr'],
- stdout=subp.PIPE,
- stderr=subp.STDOUT,
- universal_newlines=un)
- res = proc.communicate(), proc.returncode
- self._check_res(res, c('a\nbb\nccc\n'), None, 0)
- self._run_test(fn)
- def test_redirect_stderr_to_stdout(self):
- def fn(c, e, un, subp):
- # stderr output into stdout but stdout is not piped.
- proc = subp.Popen(
- e + ['--stderr'], stderr=STDOUT, universal_newlines=un)
- res = proc.communicate(), proc.returncode
- self._check_res(res, None, None, 0)
- self._run_test(fn)
- def test_stderr(self):
- cmd = ['expr', '1', '/', '0']
- p1 = subprocess.Popen(cmd, stderr=subprocess.PIPE)
- p2 = subprocess2.Popen(cmd, stderr=subprocess.PIPE)
- r1 = p1.communicate()
- r2 = p2.communicate(timeout=100)
- self.assertEquals(r1, r2)
- class S2Test(BaseTestCase):
- # Tests that can only run in subprocess2, e.g. new functionalities.
- # In particular, subprocess2.communicate() doesn't exist in subprocess.
- def _run_test(self, function):
- """Runs tests in 6 combinations:
- - LF output with universal_newlines=False
- - CR output with universal_newlines=False
- - CRLF output with universal_newlines=False
- - LF output with universal_newlines=True
- - CR output with universal_newlines=True
- - CRLF output with universal_newlines=True
- First |function| argument is the convertion for the origianl expected LF
- string to the right EOL.
- Second |function| argument is the executable and initial flag to run, to
- control what EOL is used by the child process.
- Third |function| argument is universal_newlines value.
- """
- noop = lambda x: x
- function(noop, self.exe, False)
- function(convert_to_cr, self.exe + ['--cr'], False)
- function(convert_to_crlf, self.exe + ['--crlf'], False)
- function(noop, self.exe, True)
- function(noop, self.exe + ['--cr'], True)
- function(noop, self.exe + ['--crlf'], True)
- def _check_exception(self, e, stdout, stderr, returncode):
- """On exception, look if the exception members are set correctly."""
- self.assertEquals(returncode, e.returncode)
- self.assertEquals(stdout, e.stdout)
- self.assertEquals(stderr, e.stderr)
- def test_timeout(self):
- # timeout doesn't exist in subprocess.
- def fn(c, e, un):
- res = subprocess2.communicate(
- self.exe + ['--sleep_first', '--stdout'],
- timeout=0.01,
- stdout=PIPE,
- shell=False)
- self._check_res(res, '', None, TIMED_OUT)
- self._run_test(fn)
- def test_timeout_shell_throws(self):
- def fn(c, e, un):
- try:
- # With shell=True, it needs a string.
- subprocess2.communicate(' '.join(self.exe), timeout=0.01, shell=True)
- self.fail()
- except TypeError:
- pass
- self._run_test(fn)
- def test_stdin(self):
- def fn(c, e, un):
- stdin = '0123456789'
- res = subprocess2.communicate(
- e + ['--read'],
- stdin=stdin,
- universal_newlines=un)
- self._check_res(res, None, None, 10)
- self._run_test(fn)
- def test_stdin_unicode(self):
- def fn(c, e, un):
- stdin = u'0123456789'
- res = subprocess2.communicate(
- e + ['--read'],
- stdin=stdin,
- universal_newlines=un)
- self._check_res(res, None, None, 10)
- self._run_test(fn)
- def test_stdin_empty(self):
- def fn(c, e, un):
- stdin = ''
- res = subprocess2.communicate(
- e + ['--read'],
- stdin=stdin,
- universal_newlines=un)
- self._check_res(res, None, None, 0)
- self._run_test(fn)
- def test_stdin_void(self):
- res = subprocess2.communicate(self.exe + ['--read'], stdin=VOID)
- self._check_res(res, None, None, 0)
- def test_stdin_void_stdout_timeout(self):
- # Make sure a mix of VOID, PIPE and timeout works.
- def fn(c, e, un):
- res = subprocess2.communicate(
- e + ['--stdout', '--read'],
- stdin=VOID,
- stdout=PIPE,
- timeout=10,
- universal_newlines=un)
- self._check_res(res, c('A\nBB\nCCC\n'), None, 0)
- self._run_test(fn)
- def test_stdout_void(self):
- def fn(c, e, un):
- res = subprocess2.communicate(
- e + ['--stdout', '--stderr'],
- stdout=VOID,
- stderr=PIPE,
- universal_newlines=un)
- self._check_res(res, None, c('a\nbb\nccc\n'), 0)
- self._run_test(fn)
- def test_stderr_void(self):
- def fn(c, e, un):
- res = subprocess2.communicate(
- e + ['--stdout', '--stderr'],
- stdout=PIPE,
- stderr=VOID,
- universal_newlines=un)
- self._check_res(res, c('A\nBB\nCCC\n'), None, 0)
- self._run_test(fn)
- def test_stdout_void_stderr_redirect(self):
- def fn(c, e, un):
- res = subprocess2.communicate(
- e + ['--stdout', '--stderr'],
- stdout=VOID,
- stderr=STDOUT,
- universal_newlines=un)
- self._check_res(res, None, None, 0)
- self._run_test(fn)
- def test_tee_stderr(self):
- def fn(c, e, un):
- stderr = []
- res = subprocess2.communicate(
- e + ['--stderr'], stderr=stderr.append, universal_newlines=un)
- self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
- self._check_res(res, None, None, 0)
- self._run_test(fn)
- def test_tee_stdout_stderr(self):
- def fn(c, e, un):
- stdout = []
- stderr = []
- res = subprocess2.communicate(
- e + ['--stdout', '--stderr'],
- stdout=stdout.append,
- stderr=stderr.append,
- universal_newlines=un)
- self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
- self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
- self._check_res(res, None, None, 0)
- self._run_test(fn)
- def test_tee_stdin(self):
- def fn(c, e, un):
- # Mix of stdin input and stdout callback.
- stdout = []
- stdin = '0123456789'
- res = subprocess2.communicate(
- e + ['--stdout', '--read'],
- stdin=stdin,
- stdout=stdout.append,
- universal_newlines=un)
- self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
- self._check_res(res, None, None, 10)
- self._run_test(fn)
- def test_tee_throw(self):
- def fn(c, e, un):
- # Make sure failure still returns stderr completely.
- stderr = []
- try:
- subprocess2.check_output(
- e + ['--stderr', '--fail'],
- stderr=stderr.append,
- universal_newlines=un)
- self.fail()
- except subprocess2.CalledProcessError, exception:
- self._check_exception(exception, '', None, 64)
- self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
- self._run_test(fn)
- def test_tee_timeout_stdout_void(self):
- def fn(c, e, un):
- stderr = []
- res = subprocess2.communicate(
- e + ['--stdout', '--stderr', '--fail'],
- stdout=VOID,
- stderr=stderr.append,
- shell=False,
- timeout=10,
- universal_newlines=un)
- self._check_res(res, None, None, 64)
- self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
- self._run_test(fn)
- def test_tee_timeout_stderr_void(self):
- def fn(c, e, un):
- stdout = []
- res = subprocess2.communicate(
- e + ['--stdout', '--stderr', '--fail'],
- stdout=stdout.append,
- stderr=VOID,
- shell=False,
- timeout=10,
- universal_newlines=un)
- self._check_res(res, None, None, 64)
- self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
- self._run_test(fn)
- def test_tee_timeout_stderr_stdout(self):
- def fn(c, e, un):
- stdout = []
- res = subprocess2.communicate(
- e + ['--stdout', '--stderr', '--fail'],
- stdout=stdout.append,
- stderr=STDOUT,
- shell=False,
- timeout=10,
- universal_newlines=un)
- self._check_res(res, None, None, 64)
- # Ordering is random due to buffering.
- self.assertEquals(
- set(c('a\nbb\nccc\nA\nBB\nCCC\n').splitlines(True)),
- set(''.join(stdout).splitlines(True)))
- self._run_test(fn)
- def test_tee_large(self):
- stdout = []
- # Read 128kb. On my workstation it takes >2s. Welcome to 2011.
- res = subprocess2.communicate(self.exe + ['--large'], stdout=stdout.append)
- self.assertEquals(128*1024, len(''.join(stdout)))
- self._check_res(res, None, None, 0)
- def test_tee_large_stdin(self):
- stdout = []
- # Write 128kb.
- stdin = '0123456789abcdef' * (8*1024)
- res = subprocess2.communicate(
- self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append)
- self.assertEquals(128*1024, len(''.join(stdout)))
- self._check_res(res, None, None, 0)
- def test_tee_cb_throw(self):
- # Having a callback throwing up should not cause side-effects. It's a bit
- # hard to measure.
- class Blow(Exception):
- pass
- def blow(_):
- raise Blow()
- proc = subprocess2.Popen(self.exe + ['--stdout'], stdout=blow)
- try:
- proc.communicate()
- self.fail()
- except Blow:
- self.assertNotEquals(0, proc.returncode)
- def test_nag_timer(self):
- w = []
- l = logging.getLogger()
- class _Filter(logging.Filter):
- def filter(self, record):
- if record.levelno == logging.WARNING:
- w.append(record.getMessage().lstrip())
- return 0
- f = _Filter()
- l.addFilter(f)
- proc = subprocess2.Popen(
- self.exe + ['--stdout', '--sleep_first'], stdout=PIPE)
- res = proc.communicate(nag_timer=3), proc.returncode
- l.removeFilter(f)
- self._check_res(res, 'A\nBB\nCCC\n', None, 0)
- expected = ['No output for 3 seconds from command:', proc.cmd_str,
- 'No output for 6 seconds from command:', proc.cmd_str,
- 'No output for 9 seconds from command:', proc.cmd_str]
- self.assertEquals(w, expected)
-
- def child_main(args):
- if sys.platform == 'win32':
- # Annoying, make sure the output is not translated on Windows.
- # pylint: disable=E1101,F0401
- import msvcrt
- msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
- msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)
- parser = optparse.OptionParser()
- parser.add_option(
- '--fail',
- dest='return_value',
- action='store_const',
- default=0,
- const=64)
- parser.add_option(
- '--crlf', action='store_const', const='\r\n', dest='eol', default='\n')
- parser.add_option(
- '--cr', action='store_const', const='\r', dest='eol')
- parser.add_option('--stdout', action='store_true')
- parser.add_option('--stderr', action='store_true')
- parser.add_option('--sleep_first', action='store_true')
- parser.add_option('--sleep_last', action='store_true')
- parser.add_option('--large', action='store_true')
- parser.add_option('--read', action='store_true')
- options, args = parser.parse_args(args)
- if args:
- parser.error('Internal error')
- if options.sleep_first:
- time.sleep(10)
- def do(string):
- if options.stdout:
- sys.stdout.write(string.upper())
- sys.stdout.write(options.eol)
- if options.stderr:
- sys.stderr.write(string.lower())
- sys.stderr.write(options.eol)
- do('A')
- do('BB')
- do('CCC')
- if options.large:
- # Print 128kb.
- string = '0123456789abcdef' * (8*1024)
- sys.stdout.write(string)
- if options.read:
- assert options.return_value is 0
- try:
- while sys.stdin.read(1):
- options.return_value += 1
- except OSError:
- pass
- if options.sleep_last:
- time.sleep(10)
- return options.return_value
- if __name__ == '__main__':
- logging.basicConfig(level=
- [logging.WARNING, logging.INFO, logging.DEBUG][
- min(2, sys.argv.count('-v'))])
- if len(sys.argv) > 1 and sys.argv[1] == '--child':
- sys.exit(child_main(sys.argv[2:]))
- unittest.main()
|