subprocess2.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. # coding=utf8
  2. # Copyright (c) 2012 The Chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Collection of subprocess wrapper functions.
  6. In theory you shouldn't need anything else in subprocess, or this module failed.
  7. """
  8. import cStringIO
  9. import errno
  10. import logging
  11. import os
  12. import Queue
  13. import subprocess
  14. import sys
  15. import time
  16. import threading
# Constants forwarded from subprocess so callers only need this module.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
# Sentinel: pass it as stdin, stdout or stderr to Popen() to connect that
# stream to os.devnull instead of a pipe.
VOID = object()
# Value stored in Popen.returncode when the process was killed because it
# timed out.
TIMED_OUT = -2001

# Globals.
# Records whether hack_subprocess() already replaced subprocess._cleanup.
# Set to True if you somehow need to disable this hack.
SUBPROCESS_CLEANUP_HACKED = False
  27. class CalledProcessError(subprocess.CalledProcessError):
  28. """Augment the standard exception with more data."""
  29. def __init__(self, returncode, cmd, cwd, stdout, stderr):
  30. super(CalledProcessError, self).__init__(returncode, cmd)
  31. self.stdout = stdout
  32. self.stderr = stderr
  33. self.cwd = cwd
  34. def __str__(self):
  35. out = 'Command %s returned non-zero exit status %s' % (
  36. ' '.join(self.cmd), self.returncode)
  37. if self.cwd:
  38. out += ' in ' + self.cwd
  39. return '\n'.join(filter(None, (out, self.stdout, self.stderr)))
class CygwinRebaseError(CalledProcessError):
    """Occurs when cygwin's fork() emulation fails due to rebased dll.

    Popen.__init__() raises it in place of an OSError whose errno is EAGAIN
    when running on cygwin.
    """
  42. ## Utility functions
  43. def kill_pid(pid):
  44. """Kills a process by its process id."""
  45. try:
  46. # Unable to import 'module'
  47. # pylint: disable=E1101,F0401
  48. import signal
  49. return os.kill(pid, signal.SIGKILL)
  50. except ImportError:
  51. pass
  52. def kill_win(process):
  53. """Kills a process with its windows handle.
  54. Has no effect on other platforms.
  55. """
  56. try:
  57. # Unable to import 'module'
  58. # pylint: disable=F0401
  59. import win32process
  60. # Access to a protected member _handle of a client class
  61. # pylint: disable=W0212
  62. return win32process.TerminateProcess(process._handle, -1)
  63. except ImportError:
  64. pass
  65. def add_kill():
  66. """Adds kill() method to subprocess.Popen for python <2.6"""
  67. if hasattr(subprocess.Popen, 'kill'):
  68. return
  69. if sys.platform == 'win32':
  70. subprocess.Popen.kill = kill_win
  71. else:
  72. subprocess.Popen.kill = lambda process: kill_pid(process.pid)
  73. def hack_subprocess():
  74. """subprocess functions may throw exceptions when used in multiple threads.
  75. See http://bugs.python.org/issue1731717 for more information.
  76. """
  77. global SUBPROCESS_CLEANUP_HACKED
  78. if not SUBPROCESS_CLEANUP_HACKED and threading.activeCount() != 1:
  79. # Only hack if there is ever multiple threads.
  80. # There is no point to leak with only one thread.
  81. subprocess._cleanup = lambda: None
  82. SUBPROCESS_CLEANUP_HACKED = True
  83. def get_english_env(env):
  84. """Forces LANG and/or LANGUAGE to be English.
  85. Forces encoding to utf-8 for subprocesses.
  86. Returns None if it is unnecessary.
  87. """
  88. if sys.platform == 'win32':
  89. return None
  90. env = env or os.environ
  91. # Test if it is necessary at all.
  92. is_english = lambda name: env.get(name, 'en').startswith('en')
  93. if is_english('LANG') and is_english('LANGUAGE'):
  94. return None
  95. # Requires modifications.
  96. env = env.copy()
  97. def fix_lang(name):
  98. if not is_english(name):
  99. env[name] = 'en_US.UTF-8'
  100. fix_lang('LANG')
  101. fix_lang('LANGUAGE')
  102. return env
  103. class Popen(subprocess.Popen):
  104. """Wraps subprocess.Popen() with various workarounds.
  105. - Forces English output since it's easier to parse the stdout if it is always
  106. in English.
  107. - Sets shell=True on windows by default. You can override this by forcing
  108. shell parameter to a value.
  109. - Adds support for VOID to not buffer when not needed.
  110. - Adds self.start property.
  111. Note: Popen() can throw OSError when cwd or args[0] doesn't exist. Translate
  112. exceptions generated by cygwin when it fails trying to emulate fork().
  113. """
  114. def __init__(self, args, **kwargs):
  115. # Make sure we hack subprocess if necessary.
  116. hack_subprocess()
  117. add_kill()
  118. env = get_english_env(kwargs.get('env'))
  119. if env:
  120. kwargs['env'] = env
  121. if kwargs.get('shell') is None:
  122. # *Sigh*: Windows needs shell=True, or else it won't search %PATH% for
  123. # the executable, but shell=True makes subprocess on Linux fail when it's
  124. # called with a list because it only tries to execute the first item in
  125. # the list.
  126. kwargs['shell'] = bool(sys.platform=='win32')
  127. if isinstance(args, basestring):
  128. tmp_str = args
  129. elif isinstance(args, (list, tuple)):
  130. tmp_str = ' '.join(args)
  131. else:
  132. raise CalledProcessError(None, args, kwargs.get('cwd'), None, None)
  133. if kwargs.get('cwd', None):
  134. tmp_str += '; cwd=%s' % kwargs['cwd']
  135. logging.debug(tmp_str)
  136. self.stdout_cb = None
  137. self.stderr_cb = None
  138. self.stdin_is_void = False
  139. self.stdout_is_void = False
  140. self.stderr_is_void = False
  141. self.cmd_str = tmp_str
  142. if kwargs.get('stdin') is VOID:
  143. kwargs['stdin'] = open(os.devnull, 'r')
  144. self.stdin_is_void = True
  145. for stream in ('stdout', 'stderr'):
  146. if kwargs.get(stream) in (VOID, os.devnull):
  147. kwargs[stream] = open(os.devnull, 'w')
  148. setattr(self, stream + '_is_void', True)
  149. if callable(kwargs.get(stream)):
  150. setattr(self, stream + '_cb', kwargs[stream])
  151. kwargs[stream] = PIPE
  152. self.start = time.time()
  153. self.timeout = None
  154. self.nag_timer = None
  155. self.shell = kwargs.get('shell', None)
  156. # Silence pylint on MacOSX
  157. self.returncode = None
  158. try:
  159. super(Popen, self).__init__(args, **kwargs)
  160. except OSError, e:
  161. if e.errno == errno.EAGAIN and sys.platform == 'cygwin':
  162. # Convert fork() emulation failure into a CygwinRebaseError().
  163. raise CygwinRebaseError(
  164. e.errno,
  165. args,
  166. kwargs.get('cwd'),
  167. None,
  168. 'Visit '
  169. 'http://code.google.com/p/chromium/wiki/CygwinDllRemappingFailure '
  170. 'to learn how to fix this error; you need to rebase your cygwin '
  171. 'dlls')
  172. # Popen() can throw OSError when cwd or args[0] doesn't exist. Let it go
  173. # through
  174. raise
  175. def _tee_threads(self, input): # pylint: disable=W0622
  176. """Does I/O for a process's pipes using threads.
  177. It's the simplest and slowest implementation. Expect very slow behavior.
  178. If there is a callback and it doesn't keep up with the calls, the timeout
  179. effectiveness will be delayed accordingly.
  180. """
  181. # Queue of either of <threadname> when done or (<threadname>, data). In
  182. # theory we would like to limit to ~64kb items to not cause large memory
  183. # usage when the callback blocks. It is not done because it slows down
  184. # processing on OSX10.6 by a factor of 2x, making it even slower than
  185. # Windows! Revisit this decision if it becomes a problem, e.g. crash
  186. # because of memory exhaustion.
  187. queue = Queue.Queue()
  188. done = threading.Event()
  189. timer = []
  190. last_output = [time.time()] * 2
  191. def write_stdin():
  192. try:
  193. stdin_io = cStringIO.StringIO(input)
  194. while True:
  195. data = stdin_io.read(1024)
  196. if data:
  197. self.stdin.write(data)
  198. else:
  199. self.stdin.close()
  200. break
  201. finally:
  202. queue.put('stdin')
  203. def _queue_pipe_read(pipe, name):
  204. """Queues characters read from a pipe into a queue."""
  205. try:
  206. while True:
  207. data = pipe.read(1)
  208. if not data:
  209. break
  210. last_output[0] = time.time()
  211. queue.put((name, data))
  212. finally:
  213. queue.put(name)
  214. def nag_fn():
  215. now = time.time()
  216. if done.is_set():
  217. return
  218. if last_output[0] == last_output[1]:
  219. logging.warn(' No output for %.0f seconds from command:' % (
  220. now - last_output[1]))
  221. logging.warn(' %s' % self.cmd_str)
  222. # Use 0.1 fudge factor in case:
  223. # now ~= last_output[0] + self.nag_timer
  224. sleep_time = self.nag_timer + last_output[0] - now - 0.1
  225. while sleep_time < 0:
  226. sleep_time += self.nag_timer
  227. last_output[1] = last_output[0]
  228. timer[0] = threading.Timer(sleep_time, nag_fn)
  229. timer[0].start()
  230. def timeout_fn():
  231. try:
  232. done.wait(self.timeout)
  233. finally:
  234. queue.put('timeout')
  235. def wait_fn():
  236. try:
  237. self.wait()
  238. finally:
  239. queue.put('wait')
  240. # Starts up to 5 threads:
  241. # Wait for the process to quit
  242. # Read stdout
  243. # Read stderr
  244. # Write stdin
  245. # Timeout
  246. threads = {
  247. 'wait': threading.Thread(target=wait_fn),
  248. }
  249. if self.timeout is not None:
  250. threads['timeout'] = threading.Thread(target=timeout_fn)
  251. if self.stdout_cb:
  252. threads['stdout'] = threading.Thread(
  253. target=_queue_pipe_read, args=(self.stdout, 'stdout'))
  254. if self.stderr_cb:
  255. threads['stderr'] = threading.Thread(
  256. target=_queue_pipe_read, args=(self.stderr, 'stderr'))
  257. if input:
  258. threads['stdin'] = threading.Thread(target=write_stdin)
  259. elif self.stdin:
  260. # Pipe but no input, make sure it's closed.
  261. self.stdin.close()
  262. for t in threads.itervalues():
  263. t.start()
  264. if self.nag_timer:
  265. timer.append(threading.Timer(self.nag_timer, nag_fn))
  266. timer[0].start()
  267. timed_out = False
  268. try:
  269. # This thread needs to be optimized for speed.
  270. while threads:
  271. item = queue.get()
  272. if item[0] == 'stdout':
  273. self.stdout_cb(item[1])
  274. elif item[0] == 'stderr':
  275. self.stderr_cb(item[1])
  276. else:
  277. # A thread terminated.
  278. threads[item].join()
  279. del threads[item]
  280. if item == 'wait':
  281. # Terminate the timeout thread if necessary.
  282. done.set()
  283. elif item == 'timeout' and not timed_out and self.poll() is None:
  284. logging.debug('Timed out after %fs: killing' % self.timeout)
  285. self.kill()
  286. timed_out = True
  287. finally:
  288. # Stop the threads.
  289. done.set()
  290. if timer:
  291. timer[0].cancel()
  292. if 'wait' in threads:
  293. # Accelerate things, otherwise it would hang until the child process is
  294. # done.
  295. logging.debug('Killing child because of an exception')
  296. self.kill()
  297. # Join threads.
  298. for thread in threads.itervalues():
  299. thread.join()
  300. if timed_out:
  301. self.returncode = TIMED_OUT
  302. # pylint: disable=W0221,W0622
  303. def communicate(self, input=None, timeout=None, nag_timer=None):
  304. """Adds timeout and callbacks support.
  305. Returns (stdout, stderr) like subprocess.Popen().communicate().
  306. - The process will be killed after |timeout| seconds and returncode set to
  307. TIMED_OUT.
  308. - If the subprocess runs for |nag_timer| seconds without producing terminal
  309. output, print a warning to stderr.
  310. """
  311. self.timeout = timeout
  312. self.nag_timer = nag_timer
  313. if (not self.timeout and not self.nag_timer and
  314. not self.stdout_cb and not self.stderr_cb):
  315. return super(Popen, self).communicate(input)
  316. if self.timeout and self.shell:
  317. raise TypeError(
  318. 'Using timeout and shell simultaneously will cause a process leak '
  319. 'since the shell will be killed instead of the child process.')
  320. stdout = None
  321. stderr = None
  322. # Convert to a lambda to workaround python's deadlock.
  323. # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
  324. # When the pipe fills up, it would deadlock this process.
  325. if self.stdout and not self.stdout_cb and not self.stdout_is_void:
  326. stdout = []
  327. self.stdout_cb = stdout.append
  328. if self.stderr and not self.stderr_cb and not self.stderr_is_void:
  329. stderr = []
  330. self.stderr_cb = stderr.append
  331. self._tee_threads(input)
  332. if stdout is not None:
  333. stdout = ''.join(stdout)
  334. if stderr is not None:
  335. stderr = ''.join(stderr)
  336. return (stdout, stderr)
  337. def communicate(args, timeout=None, nag_timer=None, **kwargs):
  338. """Wraps subprocess.Popen().communicate() and add timeout support.
  339. Returns ((stdout, stderr), returncode).
  340. - The process will be killed after |timeout| seconds and returncode set to
  341. TIMED_OUT.
  342. - If the subprocess runs for |nag_timer| seconds without producing terminal
  343. output, print a warning to stderr.
  344. - Automatically passes stdin content as input so do not specify stdin=PIPE.
  345. """
  346. stdin = kwargs.pop('stdin', None)
  347. if stdin is not None:
  348. if isinstance(stdin, basestring):
  349. # When stdin is passed as an argument, use it as the actual input data and
  350. # set the Popen() parameter accordingly.
  351. kwargs['stdin'] = PIPE
  352. else:
  353. kwargs['stdin'] = stdin
  354. stdin = None
  355. proc = Popen(args, **kwargs)
  356. if stdin:
  357. return proc.communicate(stdin, timeout, nag_timer), proc.returncode
  358. else:
  359. return proc.communicate(None, timeout, nag_timer), proc.returncode
  360. def call(args, **kwargs):
  361. """Emulates subprocess.call().
  362. Automatically convert stdout=PIPE or stderr=PIPE to VOID.
  363. In no case they can be returned since no code path raises
  364. subprocess2.CalledProcessError.
  365. """
  366. if kwargs.get('stdout') == PIPE:
  367. kwargs['stdout'] = VOID
  368. if kwargs.get('stderr') == PIPE:
  369. kwargs['stderr'] = VOID
  370. return communicate(args, **kwargs)[1]
  371. def check_call_out(args, **kwargs):
  372. """Improved version of subprocess.check_call().
  373. Returns (stdout, stderr), unlike subprocess.check_call().
  374. """
  375. out, returncode = communicate(args, **kwargs)
  376. if returncode:
  377. raise CalledProcessError(
  378. returncode, args, kwargs.get('cwd'), out[0], out[1])
  379. return out
def check_call(args, **kwargs):
    """Emulate subprocess.check_call().

    Runs the command through check_call_out() and discards its output.

    Returns 0; a non-zero return code makes check_call_out() raise
    CalledProcessError instead.
    """
    check_call_out(args, **kwargs)
    return 0
  384. def capture(args, **kwargs):
  385. """Captures stdout of a process call and returns it.
  386. Returns stdout.
  387. - Discards returncode.
  388. - Blocks stdin by default if not specified since no output will be visible.
  389. """
  390. kwargs.setdefault('stdin', VOID)
  391. # Like check_output, deny the caller from using stdout arg.
  392. return communicate(args, stdout=PIPE, **kwargs)[0][0]
  393. def check_output(args, **kwargs):
  394. """Emulates subprocess.check_output().
  395. Captures stdout of a process call and returns stdout only.
  396. - Throws if return code is not 0.
  397. - Works even prior to python 2.7.
  398. - Blocks stdin by default if not specified since no output will be visible.
  399. - As per doc, "The stdout argument is not allowed as it is used internally."
  400. """
  401. kwargs.setdefault('stdin', VOID)
  402. if 'stdout' in kwargs:
  403. raise ValueError('stdout argument not allowed, it will be overridden.')
  404. return check_call_out(args, stdout=PIPE, **kwargs)[0]