gclient_utils_test.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. #!/usr/bin/env vpython3
  2. # coding=utf-8
  3. # Copyright (c) 2012 The Chromium Authors. All rights reserved.
  4. # Use of this source code is governed by a BSD-style license that can be
  5. # found in the LICENSE file.
  6. import io
  7. import os
  8. import sys
  9. import unittest
  10. from unittest import mock
  11. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  12. import gclient_utils
  13. import subprocess2
  14. from testing_support import trial_dir
  15. # TODO: Should fix these warnings.
  16. # pylint: disable=line-too-long
  17. class CheckCallAndFilterTestCase(unittest.TestCase):
  18. class ProcessIdMock(object):
  19. def __init__(self, test_string, return_code=0):
  20. self.stdout = test_string.encode('utf-8')
  21. self.pid = 9284
  22. self.return_code = return_code
  23. def wait(self):
  24. return self.return_code
  25. def PopenMock(self, *args, **kwargs):
  26. kid = self.kids.pop(0)
  27. stdout = kwargs.get('stdout')
  28. os.write(stdout, kid.stdout)
  29. return kid
  30. def setUp(self):
  31. super(CheckCallAndFilterTestCase, self).setUp()
  32. self.printfn = io.StringIO()
  33. self.stdout = io.BytesIO()
  34. self.kids = []
  35. mock.patch('sys.stdout', mock.Mock()).start()
  36. mock.patch('sys.stdout.buffer', self.stdout).start()
  37. mock.patch('sys.stdout.isatty', return_value=False).start()
  38. mock.patch('builtins.print', self.printfn.write).start()
  39. mock.patch('sys.stdout.flush', lambda: None).start()
  40. self.addCleanup(mock.patch.stopall)
  41. @mock.patch('subprocess2.Popen')
  42. def testCheckCallAndFilter(self, mockPopen):
  43. cwd = 'bleh'
  44. args = ['boo', 'foo', 'bar']
  45. test_string = 'ahah\naccb\nallo\naddb\n✔'
  46. self.kids = [self.ProcessIdMock(test_string)]
  47. mockPopen.side_effect = self.PopenMock
  48. line_list = []
  49. result = gclient_utils.CheckCallAndFilter(args,
  50. cwd=cwd,
  51. show_header=True,
  52. always_show_header=True,
  53. filter_fn=line_list.append)
  54. self.assertEqual(result, test_string.encode('utf-8'))
  55. self.assertEqual(line_list, [
  56. '________ running \'boo foo bar\' in \'bleh\'\n', 'ahah', 'accb',
  57. 'allo', 'addb', '✔'
  58. ])
  59. self.assertEqual(self.stdout.getvalue(), b'')
  60. kall = mockPopen.call_args
  61. self.assertEqual(kall.args, (args, ))
  62. self.assertEqual(kall.kwargs['cwd'], cwd)
  63. self.assertEqual(kall.kwargs['stdout'], mock.ANY)
  64. self.assertEqual(kall.kwargs['stderr'], subprocess2.STDOUT)
  65. self.assertEqual(kall.kwargs['bufsize'], 0)
  66. self.assertIn('env', kall.kwargs)
  67. self.assertIn('COLUMNS', kall.kwargs['env'])
  68. self.assertIn('LINES', kall.kwargs['env'])
  69. @mock.patch('time.sleep')
  70. @mock.patch('subprocess2.Popen')
  71. def testCheckCallAndFilter_RetryOnce(self, mockPopen, mockTime):
  72. cwd = 'bleh'
  73. args = ['boo', 'foo', 'bar']
  74. test_string = 'ahah\naccb\nallo\naddb\n✔'
  75. self.kids = [
  76. self.ProcessIdMock(test_string, 1),
  77. self.ProcessIdMock(test_string, 0)
  78. ]
  79. mockPopen.side_effect = self.PopenMock
  80. line_list = []
  81. result = gclient_utils.CheckCallAndFilter(args,
  82. cwd=cwd,
  83. show_header=True,
  84. always_show_header=True,
  85. filter_fn=line_list.append,
  86. retry=True)
  87. self.assertEqual(result, test_string.encode('utf-8'))
  88. self.assertEqual(line_list, [
  89. '________ running \'boo foo bar\' in \'bleh\'\n',
  90. 'ahah',
  91. 'accb',
  92. 'allo',
  93. 'addb',
  94. '✔',
  95. '________ running \'boo foo bar\' in \'bleh\' attempt 2 / 2\n',
  96. 'ahah',
  97. 'accb',
  98. 'allo',
  99. 'addb',
  100. '✔',
  101. ])
  102. mockTime.assert_called_with(gclient_utils.RETRY_INITIAL_SLEEP)
  103. for i in range(2):
  104. kall = mockPopen.mock_calls[i]
  105. self.assertEqual(kall.args, (args, ))
  106. self.assertEqual(kall.kwargs['cwd'], cwd)
  107. self.assertEqual(kall.kwargs['stdout'], mock.ANY)
  108. self.assertEqual(kall.kwargs['stderr'], subprocess2.STDOUT)
  109. self.assertEqual(kall.kwargs['bufsize'], 0)
  110. self.assertIn('env', kall.kwargs)
  111. self.assertIn('COLUMNS', kall.kwargs['env'])
  112. self.assertIn('LINES', kall.kwargs['env'])
  113. self.assertEqual(self.stdout.getvalue(), b'')
  114. self.assertEqual(
  115. self.printfn.getvalue(),
  116. 'WARNING: subprocess \'"boo" "foo" "bar"\' in bleh failed; will retry '
  117. 'after a short nap...')
  118. @mock.patch('subprocess2.Popen')
  119. def testCheckCallAndFilter_PrintStdout(self, mockPopen):
  120. cwd = 'bleh'
  121. args = ['boo', 'foo', 'bar']
  122. test_string = 'ahah\naccb\nallo\naddb\n✔'
  123. self.kids = [self.ProcessIdMock(test_string)]
  124. mockPopen.side_effect = self.PopenMock
  125. result = gclient_utils.CheckCallAndFilter(args,
  126. cwd=cwd,
  127. show_header=True,
  128. always_show_header=True,
  129. print_stdout=True)
  130. self.assertEqual(result, test_string.encode('utf-8'))
  131. self.assertEqual(self.stdout.getvalue().splitlines(), [
  132. b"________ running 'boo foo bar' in 'bleh'",
  133. b'ahah',
  134. b'accb',
  135. b'allo',
  136. b'addb',
  137. b'\xe2\x9c\x94',
  138. ])
  139. class AnnotatedTestCase(unittest.TestCase):
  140. def setUp(self):
  141. self.out = gclient_utils.MakeFileAnnotated(io.BytesIO())
  142. self.annotated = gclient_utils.MakeFileAnnotated(io.BytesIO(),
  143. include_zero=True)
  144. def testWrite(self):
  145. test_cases = [
  146. ('test string\n', b'test string\n'),
  147. (b'test string\n', b'test string\n'),
  148. ('✔\n', b'\xe2\x9c\x94\n'),
  149. (b'\xe2\x9c\x94\n', b'\xe2\x9c\x94\n'),
  150. ('first line\nsecondline\n', b'first line\nsecondline\n'),
  151. (b'first line\nsecondline\n', b'first line\nsecondline\n'),
  152. ]
  153. for test_input, expected_output in test_cases:
  154. out = gclient_utils.MakeFileAnnotated(io.BytesIO())
  155. out.write(test_input)
  156. self.assertEqual(out.getvalue(), expected_output)
  157. def testWrite_Annotated(self):
  158. test_cases = [
  159. ('test string\n', b'0>test string\n'),
  160. (b'test string\n', b'0>test string\n'),
  161. ('✔\n', b'0>\xe2\x9c\x94\n'),
  162. (b'\xe2\x9c\x94\n', b'0>\xe2\x9c\x94\n'),
  163. ('first line\nsecondline\n', b'0>first line\n0>secondline\n'),
  164. (b'first line\nsecondline\n', b'0>first line\n0>secondline\n'),
  165. ]
  166. for test_input, expected_output in test_cases:
  167. out = gclient_utils.MakeFileAnnotated(io.BytesIO(),
  168. include_zero=True)
  169. out.write(test_input)
  170. self.assertEqual(out.getvalue(), expected_output)
  171. def testByteByByteInput(self):
  172. self.out.write(b'\xe2')
  173. self.out.write(b'\x9c')
  174. self.out.write(b'\x94')
  175. self.out.write(b'\n')
  176. self.out.write(b'\xe2')
  177. self.out.write(b'\n')
  178. self.assertEqual(self.out.getvalue(), b'\xe2\x9c\x94\n\xe2\n')
  179. def testByteByByteInput_Annotated(self):
  180. self.annotated.write(b'\xe2')
  181. self.annotated.write(b'\x9c')
  182. self.annotated.write(b'\x94')
  183. self.annotated.write(b'\n')
  184. self.annotated.write(b'\xe2')
  185. self.annotated.write(b'\n')
  186. self.assertEqual(self.annotated.getvalue(), b'0>\xe2\x9c\x94\n0>\xe2\n')
  187. def testFlush_Annotated(self):
  188. self.annotated.write(b'first line\nsecond line')
  189. self.assertEqual(self.annotated.getvalue(), b'0>first line\n')
  190. self.annotated.flush()
  191. self.assertEqual(self.annotated.getvalue(),
  192. b'0>first line\n0>second line\n')
  193. class SplitUrlRevisionTestCase(unittest.TestCase):
  194. def testSSHUrl(self):
  195. url = "ssh://test@example.com/test.git"
  196. rev = "ac345e52dc"
  197. out_url, out_rev = gclient_utils.SplitUrlRevision(url)
  198. self.assertEqual(out_rev, None)
  199. self.assertEqual(out_url, url)
  200. out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
  201. self.assertEqual(out_rev, rev)
  202. self.assertEqual(out_url, url)
  203. url = "ssh://example.com/test.git"
  204. out_url, out_rev = gclient_utils.SplitUrlRevision(url)
  205. self.assertEqual(out_rev, None)
  206. self.assertEqual(out_url, url)
  207. out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
  208. self.assertEqual(out_rev, rev)
  209. self.assertEqual(out_url, url)
  210. url = "ssh://example.com/git/test.git"
  211. out_url, out_rev = gclient_utils.SplitUrlRevision(url)
  212. self.assertEqual(out_rev, None)
  213. self.assertEqual(out_url, url)
  214. out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
  215. self.assertEqual(out_rev, rev)
  216. self.assertEqual(out_url, url)
  217. rev = "test-stable"
  218. out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
  219. self.assertEqual(out_rev, rev)
  220. self.assertEqual(out_url, url)
  221. url = "ssh://user-name@example.com/~/test.git"
  222. out_url, out_rev = gclient_utils.SplitUrlRevision(url)
  223. self.assertEqual(out_rev, None)
  224. self.assertEqual(out_url, url)
  225. out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
  226. self.assertEqual(out_rev, rev)
  227. self.assertEqual(out_url, url)
  228. url = "ssh://user-name@example.com/~username/test.git"
  229. out_url, out_rev = gclient_utils.SplitUrlRevision(url)
  230. self.assertEqual(out_rev, None)
  231. self.assertEqual(out_url, url)
  232. out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
  233. self.assertEqual(out_rev, rev)
  234. self.assertEqual(out_url, url)
  235. url = "git@github.com:dart-lang/spark.git"
  236. out_url, out_rev = gclient_utils.SplitUrlRevision(url)
  237. self.assertEqual(out_rev, None)
  238. self.assertEqual(out_url, url)
  239. out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
  240. self.assertEqual(out_rev, rev)
  241. self.assertEqual(out_url, url)
  242. def testSVNUrl(self):
  243. url = "svn://example.com/test"
  244. rev = "ac345e52dc"
  245. out_url, out_rev = gclient_utils.SplitUrlRevision(url)
  246. self.assertEqual(out_rev, None)
  247. self.assertEqual(out_url, url)
  248. out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
  249. self.assertEqual(out_rev, rev)
  250. self.assertEqual(out_url, url)
  251. class ExtracRefNameTest(unittest.TestCase):
  252. def testMatchFound(self):
  253. self.assertEqual(
  254. 'main',
  255. gclient_utils.ExtractRefName('origin', 'refs/remote/origin/main'))
  256. self.assertEqual(
  257. '1234', gclient_utils.ExtractRefName('origin', 'refs/tags/1234'))
  258. self.assertEqual(
  259. 'chicken',
  260. gclient_utils.ExtractRefName('origin', 'refs/heads/chicken'))
  261. def testNoMatch(self):
  262. self.assertIsNone(gclient_utils.ExtractRefName('origin', 'abcbbb1234'))
  263. class GClientUtilsTest(trial_dir.TestCase):
  264. def testHardToDelete(self):
  265. # Use the fact that tearDown will delete the directory to make it hard
  266. # to do so.
  267. l1 = os.path.join(self.root_dir, 'l1')
  268. l2 = os.path.join(l1, 'l2')
  269. l3 = os.path.join(l2, 'l3')
  270. f3 = os.path.join(l3, 'f3')
  271. os.mkdir(l1)
  272. os.mkdir(l2)
  273. os.mkdir(l3)
  274. gclient_utils.FileWrite(f3, 'foo')
  275. os.chmod(f3, 0)
  276. os.chmod(l3, 0)
  277. os.chmod(l2, 0)
  278. os.chmod(l1, 0)
  279. def testUpgradeToHttps(self):
  280. values = [
  281. ['', ''],
  282. [None, None],
  283. ['foo', 'https://foo'],
  284. ['http://foo', 'https://foo'],
  285. ['foo/', 'https://foo/'],
  286. ['ssh-svn://foo', 'ssh-svn://foo'],
  287. ['ssh-svn://foo/bar/', 'ssh-svn://foo/bar/'],
  288. ['codereview.chromium.org', 'https://codereview.chromium.org'],
  289. ['codereview.chromium.org/', 'https://codereview.chromium.org/'],
  290. [
  291. 'chromium-review.googlesource.com',
  292. 'https://chromium-review.googlesource.com'
  293. ],
  294. [
  295. 'chromium-review.googlesource.com/',
  296. 'https://chromium-review.googlesource.com/'
  297. ],
  298. ['http://foo:10000', 'http://foo:10000'],
  299. ['http://foo:10000/bar', 'http://foo:10000/bar'],
  300. ['foo:10000', 'http://foo:10000'],
  301. ['foo:', 'https://foo:'],
  302. ]
  303. for content, expected in values:
  304. self.assertEqual(expected, gclient_utils.UpgradeToHttps(content))
  305. def testParseCodereviewSettingsContent(self):
  306. values = [
  307. ['# bleh\n', {}],
  308. ['\t# foo : bar\n', {}],
  309. ['Foo:bar', {
  310. 'Foo': 'bar'
  311. }],
  312. ['Foo:bar:baz\n', {
  313. 'Foo': 'bar:baz'
  314. }],
  315. [' Foo : bar ', {
  316. 'Foo': 'bar'
  317. }],
  318. [' Foo : bar \n', {
  319. 'Foo': 'bar'
  320. }],
  321. ['a:b\n\rc:d\re:f', {
  322. 'a': 'b',
  323. 'c': 'd',
  324. 'e': 'f'
  325. }],
  326. ['an_url:http://value/', {
  327. 'an_url': 'http://value/'
  328. }],
  329. [
  330. 'CODE_REVIEW_SERVER : http://r/s', {
  331. 'CODE_REVIEW_SERVER': 'https://r/s'
  332. }
  333. ],
  334. ['VIEW_VC:http://r/s', {
  335. 'VIEW_VC': 'https://r/s'
  336. }],
  337. ]
  338. for content, expected in values:
  339. self.assertEqual(
  340. expected, gclient_utils.ParseCodereviewSettingsContent(content))
  341. def testFileRead_Bytes(self):
  342. with gclient_utils.temporary_file() as tmp:
  343. gclient_utils.FileWrite(tmp,
  344. b'foo \xe2\x9c bar',
  345. mode='wb',
  346. encoding=None)
  347. self.assertEqual('foo \ufffd bar', gclient_utils.FileRead(tmp))
  348. def testFileRead_Unicode(self):
  349. with gclient_utils.temporary_file() as tmp:
  350. gclient_utils.FileWrite(tmp, 'foo ✔ bar')
  351. self.assertEqual('foo ✔ bar', gclient_utils.FileRead(tmp))
  352. def testTemporaryFile(self):
  353. with gclient_utils.temporary_file() as tmp:
  354. gclient_utils.FileWrite(tmp, 'test')
  355. self.assertEqual('test', gclient_utils.FileRead(tmp))
  356. self.assertFalse(os.path.exists(tmp))
  357. def testMergeConditions(self):
  358. self.assertEqual(None, gclient_utils.merge_conditions(None, None))
  359. self.assertEqual('foo', gclient_utils.merge_conditions('foo', None))
  360. self.assertEqual('foo', gclient_utils.merge_conditions(None, 'foo'))
  361. self.assertEqual('(foo) and (bar)',
  362. gclient_utils.merge_conditions('foo', 'bar'))
  363. self.assertEqual('(foo or bar) and (baz)',
  364. gclient_utils.merge_conditions('foo or bar', 'baz'))
if __name__ == '__main__':
    # Run the test cases in this module when it is executed as a script.
    unittest.main()
  367. # vim: ts=2:sw=2:tw=80:et: