subprocess2_test.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. #!/usr/bin/env python
  2. # Copyright (c) 2011 The Chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Unit tests for subprocess2.py."""
  6. import logging
  7. import optparse
  8. import os
  9. import sys
  10. import time
  11. import unittest
  12. try:
  13. import fcntl
  14. except ImportError:
  15. fcntl = None
  16. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  17. import subprocess2
  18. from testing_support import auto_stub
  19. # Method could be a function
  20. # pylint: disable=R0201
  21. def convert_to_crlf(string):
  22. """Unconditionally convert LF to CRLF."""
  23. return string.replace('\n', '\r\n')
  24. def convert_to_cr(string):
  25. """Unconditionally convert LF to CR."""
  26. return string.replace('\n', '\r')
  27. def convert_win(string):
  28. """Converts string to CRLF on Windows only."""
  29. if sys.platform == 'win32':
  30. return string.replace('\n', '\r\n')
  31. return string
  32. class DefaultsTest(auto_stub.TestCase):
  33. # TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they
  34. # can be trapped in the child process for better coverage.
  35. def _fake_communicate(self):
  36. """Mocks subprocess2.communicate()."""
  37. results = {}
  38. def fake_communicate(args, **kwargs):
  39. assert not results
  40. results.update(kwargs)
  41. results['args'] = args
  42. return ('stdout', 'stderr'), 0
  43. self.mock(subprocess2, 'communicate', fake_communicate)
  44. return results
  45. def _fake_Popen(self):
  46. """Mocks the whole subprocess2.Popen class."""
  47. results = {}
  48. class fake_Popen(object):
  49. returncode = -8
  50. def __init__(self, args, **kwargs):
  51. assert not results
  52. results.update(kwargs)
  53. results['args'] = args
  54. @staticmethod
  55. def communicate():
  56. return None, None
  57. self.mock(subprocess2, 'Popen', fake_Popen)
  58. return results
  59. def _fake_subprocess_Popen(self):
  60. """Mocks the base class subprocess.Popen only."""
  61. results = {}
  62. def __init__(self, args, **kwargs):
  63. assert not results
  64. results.update(kwargs)
  65. results['args'] = args
  66. def communicate():
  67. return None, None
  68. self.mock(subprocess2.subprocess.Popen, '__init__', __init__)
  69. self.mock(subprocess2.subprocess.Popen, 'communicate', communicate)
  70. return results
  71. def test_check_call_defaults(self):
  72. results = self._fake_communicate()
  73. self.assertEquals(
  74. ('stdout', 'stderr'), subprocess2.check_call_out(['foo'], a=True))
  75. expected = {
  76. 'args': ['foo'],
  77. 'a':True,
  78. }
  79. self.assertEquals(expected, results)
  80. def test_capture_defaults(self):
  81. results = self._fake_communicate()
  82. self.assertEquals(
  83. 'stdout', subprocess2.capture(['foo'], a=True))
  84. expected = {
  85. 'args': ['foo'],
  86. 'a':True,
  87. 'stdin': subprocess2.VOID,
  88. 'stdout': subprocess2.PIPE,
  89. }
  90. self.assertEquals(expected, results)
  91. def test_communicate_defaults(self):
  92. results = self._fake_Popen()
  93. self.assertEquals(
  94. ((None, None), -8), subprocess2.communicate(['foo'], a=True))
  95. expected = {
  96. 'args': ['foo'],
  97. 'a': True,
  98. }
  99. self.assertEquals(expected, results)
  100. def test_Popen_defaults(self):
  101. results = self._fake_subprocess_Popen()
  102. proc = subprocess2.Popen(['foo'], a=True)
  103. # Cleanup code in subprocess.py needs this member to be set.
  104. # pylint: disable=W0201
  105. proc._child_created = None
  106. # Since subprocess.Popen.__init__() is not called, proc.returncode shouldn't
  107. # be present.
  108. self.assertFalse(hasattr(proc, 'returncode'))
  109. expected = {
  110. 'args': ['foo'],
  111. 'a': True,
  112. 'shell': bool(sys.platform=='win32'),
  113. }
  114. if sys.platform != 'win32':
  115. env = os.environ.copy()
  116. is_english = lambda name: env.get(name, 'en').startswith('en')
  117. if not is_english('LANG'):
  118. env['LANG'] = 'en_US.UTF-8'
  119. expected['env'] = env
  120. if not is_english('LANGUAGE'):
  121. env['LANGUAGE'] = 'en_US.UTF-8'
  122. expected['env'] = env
  123. self.assertEquals(expected, results)
  124. def test_check_output_defaults(self):
  125. results = self._fake_communicate()
  126. # It's discarding 'stderr' because it assumes stderr=subprocess2.STDOUT but
  127. # fake_communicate() doesn't 'implement' that.
  128. self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True))
  129. expected = {
  130. 'args': ['foo'],
  131. 'a':True,
  132. 'stdin': subprocess2.VOID,
  133. 'stdout': subprocess2.PIPE,
  134. }
  135. self.assertEquals(expected, results)
  136. class S2Test(unittest.TestCase):
  137. def setUp(self):
  138. super(S2Test, self).setUp()
  139. self.exe_path = __file__
  140. self.exe = [sys.executable, self.exe_path, '--child']
  141. self.states = {}
  142. if fcntl:
  143. for v in (sys.stdin, sys.stdout, sys.stderr):
  144. fileno = v.fileno()
  145. self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL)
  146. def tearDown(self):
  147. for fileno, fl in self.states.iteritems():
  148. self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL))
  149. super(S2Test, self).tearDown()
  150. def _run_test(self, function):
  151. """Runs tests in 6 combinations:
  152. - LF output with universal_newlines=False
  153. - CR output with universal_newlines=False
  154. - CRLF output with universal_newlines=False
  155. - LF output with universal_newlines=True
  156. - CR output with universal_newlines=True
  157. - CRLF output with universal_newlines=True
  158. First |function| argument is the convertion for the origianl expected LF
  159. string to the right EOL.
  160. Second |function| argument is the executable and initial flag to run, to
  161. control what EOL is used by the child process.
  162. Third |function| argument is universal_newlines value.
  163. """
  164. noop = lambda x: x
  165. function(noop, self.exe, False)
  166. function(convert_to_cr, self.exe + ['--cr'], False)
  167. function(convert_to_crlf, self.exe + ['--crlf'], False)
  168. function(noop, self.exe, True)
  169. function(noop, self.exe + ['--cr'], True)
  170. function(noop, self.exe + ['--crlf'], True)
  171. def test_timeout(self):
  172. out, returncode = subprocess2.communicate(
  173. self.exe + ['--sleep_first', '--stdout'],
  174. timeout=0.01,
  175. stdout=subprocess2.PIPE,
  176. shell=False)
  177. self.assertEquals(subprocess2.TIMED_OUT, returncode)
  178. self.assertEquals(('', None), out)
  179. def test_check_output_no_stdout(self):
  180. try:
  181. subprocess2.check_output(self.exe, stdout=subprocess2.PIPE)
  182. self.fail()
  183. except TypeError:
  184. pass
  185. def test_stdout_void(self):
  186. def fn(c, e, un):
  187. (out, err), code = subprocess2.communicate(
  188. e + ['--stdout', '--stderr'],
  189. stdout=subprocess2.VOID,
  190. stderr=subprocess2.PIPE,
  191. universal_newlines=un)
  192. self.assertEquals(None, out)
  193. self.assertEquals(c('a\nbb\nccc\n'), err)
  194. self.assertEquals(0, code)
  195. self._run_test(fn)
  196. def test_stderr_void(self):
  197. def fn(c, e, un):
  198. (out, err), code = subprocess2.communicate(
  199. e + ['--stdout', '--stderr'],
  200. stdout=subprocess2.PIPE,
  201. stderr=subprocess2.VOID,
  202. universal_newlines=un)
  203. self.assertEquals(c('A\nBB\nCCC\n'), out)
  204. self.assertEquals(None, err)
  205. self.assertEquals(0, code)
  206. self._run_test(fn)
  207. def test_check_output_redirect_stderr_to_stdout_pipe(self):
  208. def fn(c, e, un):
  209. (out, err), code = subprocess2.communicate(
  210. e + ['--stderr'],
  211. stdout=subprocess2.PIPE,
  212. stderr=subprocess2.STDOUT,
  213. universal_newlines=un)
  214. # stderr output into stdout.
  215. self.assertEquals(c('a\nbb\nccc\n'), out)
  216. self.assertEquals(None, err)
  217. self.assertEquals(0, code)
  218. self._run_test(fn)
  219. def test_check_output_redirect_stderr_to_stdout(self):
  220. def fn(c, e, un):
  221. (out, err), code = subprocess2.communicate(
  222. e + ['--stderr'],
  223. stderr=subprocess2.STDOUT,
  224. universal_newlines=un)
  225. # stderr output into stdout but stdout is not piped.
  226. self.assertEquals(None, out)
  227. self.assertEquals(None, err)
  228. self.assertEquals(0, code)
  229. self._run_test(fn)
  230. def test_check_output_throw_stdout(self):
  231. def fn(c, e, un):
  232. try:
  233. subprocess2.check_output(
  234. e + ['--fail', '--stdout'], universal_newlines=un)
  235. self.fail()
  236. except subprocess2.CalledProcessError, e:
  237. self.assertEquals(c('A\nBB\nCCC\n'), e.stdout)
  238. self.assertEquals(None, e.stderr)
  239. self.assertEquals(64, e.returncode)
  240. self._run_test(fn)
  241. def test_check_output_throw_no_stderr(self):
  242. def fn(c, e, un):
  243. try:
  244. subprocess2.check_output(
  245. e + ['--fail', '--stderr'], universal_newlines=un)
  246. self.fail()
  247. except subprocess2.CalledProcessError, e:
  248. self.assertEquals(c(''), e.stdout)
  249. self.assertEquals(None, e.stderr)
  250. self.assertEquals(64, e.returncode)
  251. self._run_test(fn)
  252. def test_check_output_throw_stderr(self):
  253. def fn(c, e, un):
  254. try:
  255. subprocess2.check_output(
  256. e + ['--fail', '--stderr'], stderr=subprocess2.PIPE,
  257. universal_newlines=un)
  258. self.fail()
  259. except subprocess2.CalledProcessError, e:
  260. self.assertEquals('', e.stdout)
  261. self.assertEquals(c('a\nbb\nccc\n'), e.stderr)
  262. self.assertEquals(64, e.returncode)
  263. self._run_test(fn)
  264. def test_check_output_throw_stderr_stdout(self):
  265. def fn(c, e, un):
  266. try:
  267. subprocess2.check_output(
  268. e + ['--fail', '--stderr'], stderr=subprocess2.STDOUT,
  269. universal_newlines=un)
  270. self.fail()
  271. except subprocess2.CalledProcessError, e:
  272. self.assertEquals(c('a\nbb\nccc\n'), e.stdout)
  273. self.assertEquals(None, e.stderr)
  274. self.assertEquals(64, e.returncode)
  275. self._run_test(fn)
  276. def test_check_call_throw(self):
  277. try:
  278. subprocess2.check_call(self.exe + ['--fail', '--stderr'])
  279. self.fail()
  280. except subprocess2.CalledProcessError, e:
  281. self.assertEquals(None, e.stdout)
  282. self.assertEquals(None, e.stderr)
  283. self.assertEquals(64, e.returncode)
  284. def child_main(args):
  285. if sys.platform == 'win32':
  286. # Annoying, make sure the output is not translated on Windows.
  287. # pylint: disable=E1101,F0401
  288. import msvcrt
  289. msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
  290. msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)
  291. parser = optparse.OptionParser()
  292. parser.add_option(
  293. '--fail',
  294. dest='return_value',
  295. action='store_const',
  296. default=0,
  297. const=64)
  298. parser.add_option(
  299. '--crlf', action='store_const', const='\r\n', dest='eol', default='\n')
  300. parser.add_option(
  301. '--cr', action='store_const', const='\r', dest='eol')
  302. parser.add_option('--stdout', action='store_true')
  303. parser.add_option('--stderr', action='store_true')
  304. parser.add_option('--sleep_first', action='store_true')
  305. parser.add_option('--sleep_last', action='store_true')
  306. parser.add_option('--large', action='store_true')
  307. parser.add_option('--read', action='store_true')
  308. options, args = parser.parse_args(args)
  309. if args:
  310. parser.error('Internal error')
  311. if options.sleep_first:
  312. time.sleep(10)
  313. def do(string):
  314. if options.stdout:
  315. sys.stdout.write(string.upper())
  316. sys.stdout.write(options.eol)
  317. if options.stderr:
  318. sys.stderr.write(string.lower())
  319. sys.stderr.write(options.eol)
  320. do('A')
  321. do('BB')
  322. do('CCC')
  323. if options.large:
  324. # Print 128kb.
  325. string = '0123456789abcdef' * (8*1024)
  326. sys.stdout.write(string)
  327. if options.read:
  328. try:
  329. while sys.stdin.read():
  330. pass
  331. except OSError:
  332. pass
  333. if options.sleep_last:
  334. time.sleep(10)
  335. return options.return_value
  336. if __name__ == '__main__':
  337. logging.basicConfig(level=
  338. [logging.WARNING, logging.INFO, logging.DEBUG][
  339. min(2, sys.argv.count('-v'))])
  340. if len(sys.argv) > 1 and sys.argv[1] == '--child':
  341. sys.exit(child_main(sys.argv[2:]))
  342. unittest.main()