gclient_utils_test.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  #!/usr/bin/env vpython3
  # coding=utf-8
  # Copyright (c) 2012 The Chromium Authors. All rights reserved.
  # Use of this source code is governed by a BSD-style license that can be
  # found in the LICENSE file.
  6. from __future__ import print_function
  7. from __future__ import unicode_literals
  8. import io
  9. import os
  10. import sys
  11. import tempfile
  12. import time
  13. import unittest
  14. if sys.version_info.major == 2:
  15. from StringIO import StringIO
  16. import mock
  17. else:
  18. from io import StringIO
  19. from unittest import mock
  20. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  21. from testing_support import trial_dir
  22. import gclient_utils
  23. import subprocess2
  class CheckCallAndFilterTestCase(unittest.TestCase):
    """Exercises gclient_utils.CheckCallAndFilter with subprocess2.Popen mocked.

    setUp redirects sys.stdout and print() into in-memory buffers so that each
    test can assert on exactly what was emitted.
    """

    class ProcessIdMock(object):
      """Stand-in for the object the mocked subprocess2.Popen returns."""

      def __init__(self, test_string, return_code=0):
        # The fake process "outputs" test_string as UTF-8 encoded bytes.
        self.stdout = io.BytesIO(test_string.encode('utf-8'))
        self.pid = 9284
        self.return_code = return_code

      def wait(self):
        """Returns the exit code chosen at construction time."""
        return self.return_code

    def setUp(self):
      """Replaces sys.stdout and print() with in-memory buffers."""
      super(CheckCallAndFilterTestCase, self).setUp()
      self.printfn = io.StringIO()
      self.stdout = io.BytesIO()
      if sys.version_info.major == 2:
        mock.patch('sys.stdout', self.stdout).start()
        mock.patch('__builtin__.print', self.printfn.write).start()
      else:
        # On Python 3 the raw bytes are captured through sys.stdout.buffer.
        mock.patch('sys.stdout', mock.Mock()).start()
        mock.patch('sys.stdout.buffer', self.stdout).start()
        mock.patch('builtins.print', self.printfn.write).start()
      # NOTE(review): the flattened source does not show whether this patch
      # belonged to the else branch only; it is applied on both paths here.
      mock.patch('sys.stdout.flush', lambda: None).start()
      # Undo every patch started above once the test finishes.
      self.addCleanup(mock.patch.stopall)

    @mock.patch('subprocess2.Popen')
    def testCheckCallAndFilter(self, mockPopen):
      """filter_fn gets the header and every output line; stdout stays empty."""
      cwd = 'bleh'
      args = ['boo', 'foo', 'bar']
      test_string = 'ahah\naccb\nallo\naddb\n✔'

      mockPopen.return_value = self.ProcessIdMock(test_string)

      line_list = []
      result = gclient_utils.CheckCallAndFilter(
          args, cwd=cwd, show_header=True, always_show_header=True,
          filter_fn=line_list.append)

      self.assertEqual(result, test_string.encode('utf-8'))
      self.assertEqual(line_list, [
          '________ running \'boo foo bar\' in \'bleh\'\n',
          'ahah',
          'accb',
          'allo',
          'addb',
          '✔'])
      self.assertEqual(self.stdout.getvalue(), b'')

      mockPopen.assert_called_with(
          args, cwd=cwd, stdout=subprocess2.PIPE, stderr=subprocess2.STDOUT,
          bufsize=0)

    @mock.patch('time.sleep')
    @mock.patch('subprocess2.Popen')
    def testCheckCallAndFilter_RetryOnce(self, mockPopen, mockSleep):
      """A failing first attempt is retried once after sleeping."""
      cwd = 'bleh'
      args = ['boo', 'foo', 'bar']
      test_string = 'ahah\naccb\nallo\naddb\n✔'

      # First attempt exits with 1, the second one with 0.
      mockPopen.side_effect = [
          self.ProcessIdMock(test_string, 1),
          self.ProcessIdMock(test_string, 0),
      ]

      line_list = []
      result = gclient_utils.CheckCallAndFilter(
          args, cwd=cwd, show_header=True, always_show_header=True,
          filter_fn=line_list.append, retry=True)

      self.assertEqual(result, test_string.encode('utf-8'))

      # The output of both attempts is filtered, each preceded by its header.
      self.assertEqual(line_list, [
          '________ running \'boo foo bar\' in \'bleh\'\n',
          'ahah',
          'accb',
          'allo',
          'addb',
          '✔',
          '________ running \'boo foo bar\' in \'bleh\' attempt 2 / 4\n',
          'ahah',
          'accb',
          'allo',
          'addb',
          '✔',
      ])

      mockSleep.assert_called_with(gclient_utils.RETRY_INITIAL_SLEEP)

      self.assertEqual(
          mockPopen.mock_calls,
          [
              mock.call(
                  args, cwd=cwd, stdout=subprocess2.PIPE,
                  stderr=subprocess2.STDOUT, bufsize=0),
              mock.call(
                  args, cwd=cwd, stdout=subprocess2.PIPE,
                  stderr=subprocess2.STDOUT, bufsize=0),
          ])

      self.assertEqual(self.stdout.getvalue(), b'')
      self.assertEqual(
          self.printfn.getvalue(),
          'WARNING: subprocess \'"boo" "foo" "bar"\' in bleh failed; will '
          'retry after a short nap...')

    @mock.patch('subprocess2.Popen')
    def testCheckCallAndFilter_PrintStdout(self, mockPopen):
      """With print_stdout=True the header and output reach stdout as bytes."""
      cwd = 'bleh'
      args = ['boo', 'foo', 'bar']
      test_string = 'ahah\naccb\nallo\naddb\n✔'

      mockPopen.return_value = self.ProcessIdMock(test_string)

      result = gclient_utils.CheckCallAndFilter(
          args, cwd=cwd, show_header=True, always_show_header=True,
          print_stdout=True)

      self.assertEqual(result, test_string.encode('utf-8'))
      self.assertEqual(self.stdout.getvalue().splitlines(), [
          b"________ running 'boo foo bar' in 'bleh'",
          b'ahah',
          b'accb',
          b'allo',
          b'addb',
          b'\xe2\x9c\x94',
      ])
  class AnnotatedTestCase(unittest.TestCase):
    """Tests the file wrapper returned by gclient_utils.MakeFileAnnotated."""

    def setUp(self):
      """Creates one plain wrapper and one built with include_zero=True."""
      super(AnnotatedTestCase, self).setUp()
      self.out = gclient_utils.MakeFileAnnotated(io.BytesIO())
      self.annotated = gclient_utils.MakeFileAnnotated(
          io.BytesIO(), include_zero=True)

    def _assertWrites(self, test_cases, **kwargs):
      """Writes each input to a fresh wrapper and checks the resulting bytes.

      Args:
        test_cases: iterable of (input, expected_bytes) pairs; the inputs used
            by the callers are both text and bytes.
        **kwargs: forwarded unchanged to gclient_utils.MakeFileAnnotated.
      """
      for test_input, expected_output in test_cases:
        out = gclient_utils.MakeFileAnnotated(io.BytesIO(), **kwargs)
        out.write(test_input)
        self.assertEqual(out.getvalue(), expected_output)

    def testWrite(self):
      """Text and bytes inputs come out as the same UTF-8 bytes."""
      self._assertWrites([
          ('test string\n', b'test string\n'),
          (b'test string\n', b'test string\n'),
          ('✔\n', b'\xe2\x9c\x94\n'),
          (b'\xe2\x9c\x94\n', b'\xe2\x9c\x94\n'),
          ('first line\nsecondline\n', b'first line\nsecondline\n'),
          (b'first line\nsecondline\n', b'first line\nsecondline\n'),
      ])

    def testWrite_Annotated(self):
      """With include_zero=True every output line is prefixed with b'0>'."""
      self._assertWrites([
          ('test string\n', b'0>test string\n'),
          (b'test string\n', b'0>test string\n'),
          ('✔\n', b'0>\xe2\x9c\x94\n'),
          (b'\xe2\x9c\x94\n', b'0>\xe2\x9c\x94\n'),
          ('first line\nsecondline\n', b'0>first line\n0>secondline\n'),
          (b'first line\nsecondline\n', b'0>first line\n0>secondline\n'),
      ], include_zero=True)

    def testByteByByteInput(self):
      """Single-byte writes, including a lone lead byte, pass through intact."""
      self.out.write(b'\xe2')
      self.out.write(b'\x9c')
      self.out.write(b'\x94')
      self.out.write(b'\n')
      self.out.write(b'\xe2')
      self.out.write(b'\n')
      self.assertEqual(self.out.getvalue(), b'\xe2\x9c\x94\n\xe2\n')

    def testByteByByteInput_Annotated(self):
      """Same as testByteByByteInput, with the b'0>' prefix on each line."""
      self.annotated.write(b'\xe2')
      self.annotated.write(b'\x9c')
      self.annotated.write(b'\x94')
      self.annotated.write(b'\n')
      self.annotated.write(b'\xe2')
      self.annotated.write(b'\n')
      self.assertEqual(
          self.annotated.getvalue(), b'0>\xe2\x9c\x94\n0>\xe2\n')

    def testFlush_Annotated(self):
      """An unterminated line is held back until flush() is called."""
      self.annotated.write(b'first line\nsecond line')
      self.assertEqual(self.annotated.getvalue(), b'0>first line\n')
      self.annotated.flush()
      self.assertEqual(
          self.annotated.getvalue(), b'0>first line\n0>second line\n')
  class SplitUrlRevisionTestCase(unittest.TestCase):
    """Tests gclient_utils.SplitUrlRevision on SSH, scp-style and SVN URLs."""

    def _assertSplit(self, url, rev=None):
      """Checks that SplitUrlRevision separates url from an optional revision.

      Args:
        url: the URL expected back as the first element of the result.
        rev: when given, '<url>@<rev>' is split and rev is expected back;
            when None, the bare url is split and a None revision is expected.
      """
      target = url if rev is None else '%s@%s' % (url, rev)
      out_url, out_rev = gclient_utils.SplitUrlRevision(target)
      self.assertEqual(out_rev, rev)
      self.assertEqual(out_url, url)

    def _assertSplitCases(self, cases):
      """Checks each URL bare first, then with every revision listed for it."""
      for url, revs in cases:
        self._assertSplit(url)
        for rev in revs:
          self._assertSplit(url, rev)

    def testSSHUrl(self):
      """URLs that already contain '@' or '~' still split at the revision."""
      self._assertSplitCases([
          ('ssh://test@example.com/test.git', ['ac345e52dc']),
          ('ssh://example.com/test.git', ['ac345e52dc']),
          ('ssh://example.com/git/test.git', ['ac345e52dc', 'test-stable']),
          ('ssh://user-name@example.com/~/test.git', ['test-stable']),
          ('ssh://user-name@example.com/~username/test.git', ['test-stable']),
          ('git@github.com:dart-lang/spark.git', ['test-stable']),
      ])

    def testSVNUrl(self):
      """An svn:// URL splits the same way."""
      self._assertSplitCases([
          ('svn://example.com/test', ['ac345e52dc']),
      ])
  class GClientUtilsTest(trial_dir.TestCase):
    """Tests for assorted gclient_utils helpers.

    self.root_dir is not defined in this file; presumably it is provided by
    trial_dir.TestCase -- confirm.
    """

    def testHardToDelete(self):
      """Leaves a permission-less directory tree behind for tearDown."""
      # Use the fact that tearDown will delete the directory to make it hard to
      # do so.
      l1 = os.path.join(self.root_dir, 'l1')
      l2 = os.path.join(l1, 'l2')
      l3 = os.path.join(l2, 'l3')
      f3 = os.path.join(l3, 'f3')
      os.mkdir(l1)
      os.mkdir(l2)
      os.mkdir(l3)
      gclient_utils.FileWrite(f3, 'foo')
      # Strip permissions from the innermost entry outwards.
      os.chmod(f3, 0)
      os.chmod(l3, 0)
      os.chmod(l2, 0)
      os.chmod(l1, 0)

    def testUpgradeToHttps(self):
      """Checks UpgradeToHttps against a table of (input, expected) pairs."""
      values = [
          ['', ''],
          [None, None],
          ['foo', 'https://foo'],
          ['http://foo', 'https://foo'],
          ['foo/', 'https://foo/'],
          ['ssh-svn://foo', 'ssh-svn://foo'],
          ['ssh-svn://foo/bar/', 'ssh-svn://foo/bar/'],
          ['codereview.chromium.org', 'https://codereview.chromium.org'],
          ['codereview.chromium.org/', 'https://codereview.chromium.org/'],
          ['http://foo:10000', 'http://foo:10000'],
          ['http://foo:10000/bar', 'http://foo:10000/bar'],
          ['foo:10000', 'http://foo:10000'],
          ['foo:', 'https://foo:'],
      ]
      for content, expected in values:
        self.assertEqual(
            expected, gclient_utils.UpgradeToHttps(content))

    def testParseCodereviewSettingsContent(self):
      """Checks ParseCodereviewSettingsContent against (text, dict) pairs."""
      values = [
          ['# bleh\n', {}],
          ['\t# foo : bar\n', {}],
          ['Foo:bar', {'Foo': 'bar'}],
          ['Foo:bar:baz\n', {'Foo': 'bar:baz'}],
          [' Foo : bar ', {'Foo': 'bar'}],
          [' Foo : bar \n', {'Foo': 'bar'}],
          ['a:b\n\rc:d\re:f', {'a': 'b', 'c': 'd', 'e': 'f'}],
          ['an_url:http://value/', {'an_url': 'http://value/'}],
          [
              'CODE_REVIEW_SERVER : http://r/s',
              {'CODE_REVIEW_SERVER': 'https://r/s'}
          ],
          ['VIEW_VC:http://r/s', {'VIEW_VC': 'https://r/s'}],
      ]
      for content, expected in values:
        self.assertEqual(
            expected, gclient_utils.ParseCodereviewSettingsContent(content))

    def _assertFileRead(self, raw_bytes, expected_text):
      """Writes raw_bytes to a temp file and checks what FileRead returns.

      The file is created with delete=False and removed explicitly, so it is
      cleaned up even when the write or the assertion fails.

      Args:
        raw_bytes: bytes written verbatim to the temporary file.
        expected_text: the text gclient_utils.FileRead is expected to return.
      """
      tmp = tempfile.NamedTemporaryFile(delete=False)
      try:
        try:
          tmp.write(raw_bytes)
        finally:
          # NamedTemporaryFiles must be closed on Windows before being opened
          # again.
          tmp.close()
        self.assertEqual(expected_text, gclient_utils.FileRead(tmp.name))
      finally:
        os.remove(tmp.name)

    def testFileRead_Bytes(self):
      """A truncated UTF-8 sequence is read back as U+FFFD."""
      self._assertFileRead(b'foo \xe2\x9c bar', 'foo \ufffd bar')

    def testFileRead_Unicode(self):
      """Well-formed UTF-8 is decoded to the matching text."""
      self._assertFileRead(b'foo \xe2\x9c\x94 bar', 'foo ✔ bar')
  # Run every test case in this module when the file is executed directly.
  if __name__ == '__main__':
    unittest.main()

  # vim: ts=2:sw=2:tw=80:et: