subprocess2_test.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680
  1. #!/usr/bin/env python
  2. # Copyright (c) 2012 The Chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Unit tests for subprocess2.py."""
  6. import logging
  7. import optparse
  8. import os
  9. import sys
  10. import time
  11. import unittest
  12. try:
  13. import fcntl
  14. except ImportError:
  15. fcntl = None
  16. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  17. import subprocess
  18. import subprocess2
  19. from testing_support import auto_stub
  20. # Method could be a function
  21. # pylint: disable=R0201
  22. # Create aliases for subprocess2 specific tests. They shouldn't be used for
  23. # regression tests.
  24. TIMED_OUT = subprocess2.TIMED_OUT
  25. VOID = subprocess2.VOID
  26. PIPE = subprocess2.PIPE
  27. STDOUT = subprocess2.STDOUT
  28. def convert_to_crlf(string):
  29. """Unconditionally convert LF to CRLF."""
  30. return string.replace('\n', '\r\n')
  31. def convert_to_cr(string):
  32. """Unconditionally convert LF to CR."""
  33. return string.replace('\n', '\r')
  34. def convert_win(string):
  35. """Converts string to CRLF on Windows only."""
  36. if sys.platform == 'win32':
  37. return string.replace('\n', '\r\n')
  38. return string
  39. class DefaultsTest(auto_stub.TestCase):
  40. # TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they
  41. # can be trapped in the child process for better coverage.
  42. def _fake_communicate(self):
  43. """Mocks subprocess2.communicate()."""
  44. results = {}
  45. def fake_communicate(args, **kwargs):
  46. assert not results
  47. results.update(kwargs)
  48. results['args'] = args
  49. return ('stdout', 'stderr'), 0
  50. self.mock(subprocess2, 'communicate', fake_communicate)
  51. return results
  52. def _fake_Popen(self):
  53. """Mocks the whole subprocess2.Popen class."""
  54. results = {}
  55. class fake_Popen(object):
  56. returncode = -8
  57. def __init__(self, args, **kwargs):
  58. assert not results
  59. results.update(kwargs)
  60. results['args'] = args
  61. @staticmethod
  62. # pylint: disable=W0622
  63. def communicate(input=None, timeout=None, nag_timer=None):
  64. return None, None
  65. self.mock(subprocess2, 'Popen', fake_Popen)
  66. return results
  67. def _fake_subprocess_Popen(self):
  68. """Mocks the base class subprocess.Popen only."""
  69. results = {}
  70. def __init__(self, args, **kwargs):
  71. assert not results
  72. results.update(kwargs)
  73. results['args'] = args
  74. def communicate():
  75. return None, None
  76. self.mock(subprocess.Popen, '__init__', __init__)
  77. self.mock(subprocess.Popen, 'communicate', communicate)
  78. return results
  79. def test_check_call_defaults(self):
  80. results = self._fake_communicate()
  81. self.assertEquals(
  82. ('stdout', 'stderr'), subprocess2.check_call_out(['foo'], a=True))
  83. expected = {
  84. 'args': ['foo'],
  85. 'a':True,
  86. }
  87. self.assertEquals(expected, results)
  88. def test_capture_defaults(self):
  89. results = self._fake_communicate()
  90. self.assertEquals(
  91. 'stdout', subprocess2.capture(['foo'], a=True))
  92. expected = {
  93. 'args': ['foo'],
  94. 'a':True,
  95. 'stdin': subprocess2.VOID,
  96. 'stdout': subprocess2.PIPE,
  97. }
  98. self.assertEquals(expected, results)
  99. def test_communicate_defaults(self):
  100. results = self._fake_Popen()
  101. self.assertEquals(
  102. ((None, None), -8), subprocess2.communicate(['foo'], a=True))
  103. expected = {
  104. 'args': ['foo'],
  105. 'a': True,
  106. }
  107. self.assertEquals(expected, results)
  108. def test_Popen_defaults(self):
  109. results = self._fake_subprocess_Popen()
  110. proc = subprocess2.Popen(['foo'], a=True)
  111. # Cleanup code in subprocess.py needs this member to be set.
  112. # pylint: disable=W0201
  113. proc._child_created = None
  114. expected = {
  115. 'args': ['foo'],
  116. 'a': True,
  117. 'shell': bool(sys.platform=='win32'),
  118. }
  119. if sys.platform != 'win32':
  120. env = os.environ.copy()
  121. is_english = lambda name: env.get(name, 'en').startswith('en')
  122. if not is_english('LANG'):
  123. env['LANG'] = 'en_US.UTF-8'
  124. expected['env'] = env
  125. if not is_english('LANGUAGE'):
  126. env['LANGUAGE'] = 'en_US.UTF-8'
  127. expected['env'] = env
  128. self.assertEquals(expected, results)
  129. self.assertTrue(time.time() >= proc.start)
  130. def test_check_output_defaults(self):
  131. results = self._fake_communicate()
  132. # It's discarding 'stderr' because it assumes stderr=subprocess2.STDOUT but
  133. # fake_communicate() doesn't 'implement' that.
  134. self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True))
  135. expected = {
  136. 'args': ['foo'],
  137. 'a':True,
  138. 'stdin': subprocess2.VOID,
  139. 'stdout': subprocess2.PIPE,
  140. }
  141. self.assertEquals(expected, results)
  142. class BaseTestCase(unittest.TestCase):
  143. def setUp(self):
  144. super(BaseTestCase, self).setUp()
  145. self.exe_path = __file__
  146. self.exe = [sys.executable, self.exe_path, '--child']
  147. self.states = {}
  148. if fcntl:
  149. for v in (sys.stdin, sys.stdout, sys.stderr):
  150. fileno = v.fileno()
  151. self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL)
  152. def tearDown(self):
  153. for fileno, fl in self.states.iteritems():
  154. self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL))
  155. super(BaseTestCase, self).tearDown()
  156. def _check_res(self, res, stdout, stderr, returncode):
  157. (out, err), code = res
  158. self.assertEquals(stdout, out)
  159. self.assertEquals(stderr, err)
  160. self.assertEquals(returncode, code)
  161. class RegressionTest(BaseTestCase):
  162. # Regression tests to ensure that subprocess and subprocess2 have the same
  163. # behavior.
  164. def _run_test(self, function):
  165. """Runs tests in 12 combinations:
  166. - LF output with universal_newlines=False
  167. - CR output with universal_newlines=False
  168. - CRLF output with universal_newlines=False
  169. - LF output with universal_newlines=True
  170. - CR output with universal_newlines=True
  171. - CRLF output with universal_newlines=True
  172. Once with subprocess, once with subprocess2.
  173. First |function| argument is the conversion for the original expected LF
  174. string to the right EOL.
  175. Second |function| argument is the executable and initial flag to run, to
  176. control what EOL is used by the child process.
  177. Third |function| argument is universal_newlines value.
  178. """
  179. noop = lambda x: x
  180. for subp in (subprocess, subprocess2):
  181. function(noop, self.exe, False, subp)
  182. function(convert_to_cr, self.exe + ['--cr'], False, subp)
  183. function(convert_to_crlf, self.exe + ['--crlf'], False, subp)
  184. function(noop, self.exe, True, subp)
  185. function(noop, self.exe + ['--cr'], True, subp)
  186. function(noop, self.exe + ['--crlf'], True, subp)
  187. def _check_exception(self, subp, e, stdout, stderr, returncode):
  188. """On exception, look if the exception members are set correctly."""
  189. self.assertEquals(returncode, e.returncode)
  190. if subp is subprocess:
  191. # subprocess never save the output.
  192. self.assertFalse(hasattr(e, 'stdout'))
  193. self.assertFalse(hasattr(e, 'stderr'))
  194. elif subp is subprocess2:
  195. self.assertEquals(stdout, e.stdout)
  196. self.assertEquals(stderr, e.stderr)
  197. else:
  198. self.fail()
  199. def test_check_output_no_stdout(self):
  200. try:
  201. subprocess2.check_output(self.exe, stdout=subprocess2.PIPE)
  202. self.fail()
  203. except ValueError:
  204. pass
  205. if (sys.version_info[0] * 10 + sys.version_info[1]) >= 27:
  206. # python 2.7+
  207. try:
  208. # pylint: disable=E1101
  209. subprocess.check_output(self.exe, stdout=subprocess.PIPE)
  210. self.fail()
  211. except ValueError:
  212. pass
  213. def test_check_output_throw_stdout(self):
  214. def fn(c, e, un, subp):
  215. if not hasattr(subp, 'check_output'):
  216. return
  217. try:
  218. subp.check_output(
  219. e + ['--fail', '--stdout'], universal_newlines=un)
  220. self.fail()
  221. except subp.CalledProcessError, exception:
  222. self._check_exception(subp, exception, c('A\nBB\nCCC\n'), None, 64)
  223. self._run_test(fn)
  224. def test_check_output_throw_no_stderr(self):
  225. def fn(c, e, un, subp):
  226. if not hasattr(subp, 'check_output'):
  227. return
  228. try:
  229. subp.check_output(
  230. e + ['--fail', '--stderr'], universal_newlines=un)
  231. self.fail()
  232. except subp.CalledProcessError, exception:
  233. self._check_exception(subp, exception, c(''), None, 64)
  234. self._run_test(fn)
  235. def test_check_output_throw_stderr(self):
  236. def fn(c, e, un, subp):
  237. if not hasattr(subp, 'check_output'):
  238. return
  239. try:
  240. subp.check_output(
  241. e + ['--fail', '--stderr'],
  242. stderr=subp.PIPE,
  243. universal_newlines=un)
  244. self.fail()
  245. except subp.CalledProcessError, exception:
  246. self._check_exception(subp, exception, '', c('a\nbb\nccc\n'), 64)
  247. self._run_test(fn)
  248. def test_check_output_throw_stderr_stdout(self):
  249. def fn(c, e, un, subp):
  250. if not hasattr(subp, 'check_output'):
  251. return
  252. try:
  253. subp.check_output(
  254. e + ['--fail', '--stderr'],
  255. stderr=subp.STDOUT,
  256. universal_newlines=un)
  257. self.fail()
  258. except subp.CalledProcessError, exception:
  259. self._check_exception(subp, exception, c('a\nbb\nccc\n'), None, 64)
  260. self._run_test(fn)
  261. def test_check_call_throw(self):
  262. for subp in (subprocess, subprocess2):
  263. try:
  264. subp.check_call(self.exe + ['--fail', '--stderr'])
  265. self.fail()
  266. except subp.CalledProcessError, exception:
  267. self._check_exception(subp, exception, None, None, 64)
  268. def test_redirect_stderr_to_stdout_pipe(self):
  269. def fn(c, e, un, subp):
  270. # stderr output into stdout.
  271. proc = subp.Popen(
  272. e + ['--stderr'],
  273. stdout=subp.PIPE,
  274. stderr=subp.STDOUT,
  275. universal_newlines=un)
  276. res = proc.communicate(), proc.returncode
  277. self._check_res(res, c('a\nbb\nccc\n'), None, 0)
  278. self._run_test(fn)
  279. def test_redirect_stderr_to_stdout(self):
  280. def fn(c, e, un, subp):
  281. # stderr output into stdout but stdout is not piped.
  282. proc = subp.Popen(
  283. e + ['--stderr'], stderr=STDOUT, universal_newlines=un)
  284. res = proc.communicate(), proc.returncode
  285. self._check_res(res, None, None, 0)
  286. self._run_test(fn)
  287. def test_stderr(self):
  288. cmd = ['expr', '1', '/', '0']
  289. p1 = subprocess.Popen(cmd, stderr=subprocess.PIPE)
  290. p2 = subprocess2.Popen(cmd, stderr=subprocess.PIPE)
  291. r1 = p1.communicate()
  292. r2 = p2.communicate(timeout=100)
  293. self.assertEquals(r1, r2)
  294. class S2Test(BaseTestCase):
  295. # Tests that can only run in subprocess2, e.g. new functionalities.
  296. # In particular, subprocess2.communicate() doesn't exist in subprocess.
  297. def _run_test(self, function):
  298. """Runs tests in 6 combinations:
  299. - LF output with universal_newlines=False
  300. - CR output with universal_newlines=False
  301. - CRLF output with universal_newlines=False
  302. - LF output with universal_newlines=True
  303. - CR output with universal_newlines=True
  304. - CRLF output with universal_newlines=True
  305. First |function| argument is the convertion for the origianl expected LF
  306. string to the right EOL.
  307. Second |function| argument is the executable and initial flag to run, to
  308. control what EOL is used by the child process.
  309. Third |function| argument is universal_newlines value.
  310. """
  311. noop = lambda x: x
  312. function(noop, self.exe, False)
  313. function(convert_to_cr, self.exe + ['--cr'], False)
  314. function(convert_to_crlf, self.exe + ['--crlf'], False)
  315. function(noop, self.exe, True)
  316. function(noop, self.exe + ['--cr'], True)
  317. function(noop, self.exe + ['--crlf'], True)
  318. def _check_exception(self, e, stdout, stderr, returncode):
  319. """On exception, look if the exception members are set correctly."""
  320. self.assertEquals(returncode, e.returncode)
  321. self.assertEquals(stdout, e.stdout)
  322. self.assertEquals(stderr, e.stderr)
  323. def test_timeout(self):
  324. # timeout doesn't exist in subprocess.
  325. def fn(c, e, un):
  326. res = subprocess2.communicate(
  327. self.exe + ['--sleep_first', '--stdout'],
  328. timeout=0.01,
  329. stdout=PIPE,
  330. shell=False)
  331. self._check_res(res, '', None, TIMED_OUT)
  332. self._run_test(fn)
  333. def test_timeout_shell_throws(self):
  334. def fn(c, e, un):
  335. try:
  336. # With shell=True, it needs a string.
  337. subprocess2.communicate(' '.join(self.exe), timeout=0.01, shell=True)
  338. self.fail()
  339. except TypeError:
  340. pass
  341. self._run_test(fn)
  342. def test_stdin(self):
  343. def fn(c, e, un):
  344. stdin = '0123456789'
  345. res = subprocess2.communicate(
  346. e + ['--read'],
  347. stdin=stdin,
  348. universal_newlines=un)
  349. self._check_res(res, None, None, 10)
  350. self._run_test(fn)
  351. def test_stdin_unicode(self):
  352. def fn(c, e, un):
  353. stdin = u'0123456789'
  354. res = subprocess2.communicate(
  355. e + ['--read'],
  356. stdin=stdin,
  357. universal_newlines=un)
  358. self._check_res(res, None, None, 10)
  359. self._run_test(fn)
  360. def test_stdin_empty(self):
  361. def fn(c, e, un):
  362. stdin = ''
  363. res = subprocess2.communicate(
  364. e + ['--read'],
  365. stdin=stdin,
  366. universal_newlines=un)
  367. self._check_res(res, None, None, 0)
  368. self._run_test(fn)
  369. def test_stdin_void(self):
  370. res = subprocess2.communicate(self.exe + ['--read'], stdin=VOID)
  371. self._check_res(res, None, None, 0)
  372. def test_stdin_void_stdout_timeout(self):
  373. # Make sure a mix of VOID, PIPE and timeout works.
  374. def fn(c, e, un):
  375. res = subprocess2.communicate(
  376. e + ['--stdout', '--read'],
  377. stdin=VOID,
  378. stdout=PIPE,
  379. timeout=10,
  380. universal_newlines=un)
  381. self._check_res(res, c('A\nBB\nCCC\n'), None, 0)
  382. self._run_test(fn)
  383. def test_stdout_void(self):
  384. def fn(c, e, un):
  385. res = subprocess2.communicate(
  386. e + ['--stdout', '--stderr'],
  387. stdout=VOID,
  388. stderr=PIPE,
  389. universal_newlines=un)
  390. self._check_res(res, None, c('a\nbb\nccc\n'), 0)
  391. self._run_test(fn)
  392. def test_stderr_void(self):
  393. def fn(c, e, un):
  394. res = subprocess2.communicate(
  395. e + ['--stdout', '--stderr'],
  396. stdout=PIPE,
  397. stderr=VOID,
  398. universal_newlines=un)
  399. self._check_res(res, c('A\nBB\nCCC\n'), None, 0)
  400. self._run_test(fn)
  401. def test_stdout_void_stderr_redirect(self):
  402. def fn(c, e, un):
  403. res = subprocess2.communicate(
  404. e + ['--stdout', '--stderr'],
  405. stdout=VOID,
  406. stderr=STDOUT,
  407. universal_newlines=un)
  408. self._check_res(res, None, None, 0)
  409. self._run_test(fn)
  410. def test_tee_stderr(self):
  411. def fn(c, e, un):
  412. stderr = []
  413. res = subprocess2.communicate(
  414. e + ['--stderr'], stderr=stderr.append, universal_newlines=un)
  415. self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
  416. self._check_res(res, None, None, 0)
  417. self._run_test(fn)
  418. def test_tee_stdout_stderr(self):
  419. def fn(c, e, un):
  420. stdout = []
  421. stderr = []
  422. res = subprocess2.communicate(
  423. e + ['--stdout', '--stderr'],
  424. stdout=stdout.append,
  425. stderr=stderr.append,
  426. universal_newlines=un)
  427. self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
  428. self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
  429. self._check_res(res, None, None, 0)
  430. self._run_test(fn)
  431. def test_tee_stdin(self):
  432. def fn(c, e, un):
  433. # Mix of stdin input and stdout callback.
  434. stdout = []
  435. stdin = '0123456789'
  436. res = subprocess2.communicate(
  437. e + ['--stdout', '--read'],
  438. stdin=stdin,
  439. stdout=stdout.append,
  440. universal_newlines=un)
  441. self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
  442. self._check_res(res, None, None, 10)
  443. self._run_test(fn)
  444. def test_tee_throw(self):
  445. def fn(c, e, un):
  446. # Make sure failure still returns stderr completely.
  447. stderr = []
  448. try:
  449. subprocess2.check_output(
  450. e + ['--stderr', '--fail'],
  451. stderr=stderr.append,
  452. universal_newlines=un)
  453. self.fail()
  454. except subprocess2.CalledProcessError, exception:
  455. self._check_exception(exception, '', None, 64)
  456. self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
  457. self._run_test(fn)
  458. def test_tee_timeout_stdout_void(self):
  459. def fn(c, e, un):
  460. stderr = []
  461. res = subprocess2.communicate(
  462. e + ['--stdout', '--stderr', '--fail'],
  463. stdout=VOID,
  464. stderr=stderr.append,
  465. shell=False,
  466. timeout=10,
  467. universal_newlines=un)
  468. self._check_res(res, None, None, 64)
  469. self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
  470. self._run_test(fn)
  471. def test_tee_timeout_stderr_void(self):
  472. def fn(c, e, un):
  473. stdout = []
  474. res = subprocess2.communicate(
  475. e + ['--stdout', '--stderr', '--fail'],
  476. stdout=stdout.append,
  477. stderr=VOID,
  478. shell=False,
  479. timeout=10,
  480. universal_newlines=un)
  481. self._check_res(res, None, None, 64)
  482. self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
  483. self._run_test(fn)
  484. def test_tee_timeout_stderr_stdout(self):
  485. def fn(c, e, un):
  486. stdout = []
  487. res = subprocess2.communicate(
  488. e + ['--stdout', '--stderr', '--fail'],
  489. stdout=stdout.append,
  490. stderr=STDOUT,
  491. shell=False,
  492. timeout=10,
  493. universal_newlines=un)
  494. self._check_res(res, None, None, 64)
  495. # Ordering is random due to buffering.
  496. self.assertEquals(
  497. set(c('a\nbb\nccc\nA\nBB\nCCC\n').splitlines(True)),
  498. set(''.join(stdout).splitlines(True)))
  499. self._run_test(fn)
  500. def test_tee_large(self):
  501. stdout = []
  502. # Read 128kb. On my workstation it takes >2s. Welcome to 2011.
  503. res = subprocess2.communicate(self.exe + ['--large'], stdout=stdout.append)
  504. self.assertEquals(128*1024, len(''.join(stdout)))
  505. self._check_res(res, None, None, 0)
  506. def test_tee_large_stdin(self):
  507. stdout = []
  508. # Write 128kb.
  509. stdin = '0123456789abcdef' * (8*1024)
  510. res = subprocess2.communicate(
  511. self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append)
  512. self.assertEquals(128*1024, len(''.join(stdout)))
  513. self._check_res(res, None, None, 0)
  514. def test_tee_cb_throw(self):
  515. # Having a callback throwing up should not cause side-effects. It's a bit
  516. # hard to measure.
  517. class Blow(Exception):
  518. pass
  519. def blow(_):
  520. raise Blow()
  521. proc = subprocess2.Popen(self.exe + ['--stdout'], stdout=blow)
  522. try:
  523. proc.communicate()
  524. self.fail()
  525. except Blow:
  526. self.assertNotEquals(0, proc.returncode)
  527. def test_nag_timer(self):
  528. w = []
  529. l = logging.getLogger()
  530. class _Filter(logging.Filter):
  531. def filter(self, record):
  532. if record.levelno == logging.WARNING:
  533. w.append(record.getMessage().lstrip())
  534. return 0
  535. f = _Filter()
  536. l.addFilter(f)
  537. proc = subprocess2.Popen(
  538. self.exe + ['--stdout', '--sleep_first'], stdout=PIPE)
  539. res = proc.communicate(nag_timer=3), proc.returncode
  540. l.removeFilter(f)
  541. self._check_res(res, 'A\nBB\nCCC\n', None, 0)
  542. expected = ['No output for 3 seconds from command:', proc.cmd_str,
  543. 'No output for 6 seconds from command:', proc.cmd_str,
  544. 'No output for 9 seconds from command:', proc.cmd_str]
  545. self.assertEquals(w, expected)
  546. def child_main(args):
  547. if sys.platform == 'win32':
  548. # Annoying, make sure the output is not translated on Windows.
  549. # pylint: disable=E1101,F0401
  550. import msvcrt
  551. msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
  552. msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)
  553. parser = optparse.OptionParser()
  554. parser.add_option(
  555. '--fail',
  556. dest='return_value',
  557. action='store_const',
  558. default=0,
  559. const=64)
  560. parser.add_option(
  561. '--crlf', action='store_const', const='\r\n', dest='eol', default='\n')
  562. parser.add_option(
  563. '--cr', action='store_const', const='\r', dest='eol')
  564. parser.add_option('--stdout', action='store_true')
  565. parser.add_option('--stderr', action='store_true')
  566. parser.add_option('--sleep_first', action='store_true')
  567. parser.add_option('--sleep_last', action='store_true')
  568. parser.add_option('--large', action='store_true')
  569. parser.add_option('--read', action='store_true')
  570. options, args = parser.parse_args(args)
  571. if args:
  572. parser.error('Internal error')
  573. if options.sleep_first:
  574. time.sleep(10)
  575. def do(string):
  576. if options.stdout:
  577. sys.stdout.write(string.upper())
  578. sys.stdout.write(options.eol)
  579. if options.stderr:
  580. sys.stderr.write(string.lower())
  581. sys.stderr.write(options.eol)
  582. do('A')
  583. do('BB')
  584. do('CCC')
  585. if options.large:
  586. # Print 128kb.
  587. string = '0123456789abcdef' * (8*1024)
  588. sys.stdout.write(string)
  589. if options.read:
  590. assert options.return_value is 0
  591. try:
  592. while sys.stdin.read(1):
  593. options.return_value += 1
  594. except OSError:
  595. pass
  596. if options.sleep_last:
  597. time.sleep(10)
  598. return options.return_value
  599. if __name__ == '__main__':
  600. logging.basicConfig(level=
  601. [logging.WARNING, logging.INFO, logging.DEBUG][
  602. min(2, sys.argv.count('-v'))])
  603. if len(sys.argv) > 1 and sys.argv[1] == '--child':
  604. sys.exit(child_main(sys.argv[2:]))
  605. unittest.main()