gclient_utils_test.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. #!/usr/bin/env vpython3
  2. # coding=utf-8
  3. # Copyright (c) 2012 The Chromium Authors. All rights reserved.
  4. # Use of this source code is governed by a BSD-style license that can be
  5. # found in the LICENSE file.
  6. import io
  7. import os
  8. import sys
  9. import unittest
  10. from unittest import mock
  11. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  12. import gclient_utils
  13. import subprocess2
  14. from testing_support import trial_dir
  15. # TODO: Should fix these warnings.
  16. # pylint: disable=line-too-long
  17. class CheckCallAndFilterTestCase(unittest.TestCase):
  18. class ProcessIdMock(object):
  19. def __init__(self, test_string, return_code=0):
  20. self.stdout = test_string.encode('utf-8')
  21. self.pid = 9284
  22. self.return_code = return_code
  23. def wait(self):
  24. return self.return_code
  25. def PopenMock(self, *args, **kwargs):
  26. kid = self.kids.pop(0)
  27. stdout = kwargs.get('stdout')
  28. os.write(stdout, kid.stdout)
  29. return kid
  30. def setUp(self):
  31. super(CheckCallAndFilterTestCase, self).setUp()
  32. self.printfn = io.StringIO()
  33. self.stdout = io.BytesIO()
  34. self.kids = []
  35. mock.patch('sys.stdout', mock.Mock()).start()
  36. mock.patch('sys.stdout.buffer', self.stdout).start()
  37. mock.patch('sys.stdout.isatty', return_value=False).start()
  38. mock.patch('builtins.print', self.printfn.write).start()
  39. mock.patch('sys.stdout.flush', lambda: None).start()
  40. self.addCleanup(mock.patch.stopall)
  41. @mock.patch('subprocess2.Popen')
  42. def testCheckCallAndFilter(self, mockPopen):
  43. cwd = 'bleh'
  44. args = ['boo', 'foo', 'bar']
  45. test_string = 'ahah\naccb\nallo\naddb\n✔'
  46. self.kids = [self.ProcessIdMock(test_string)]
  47. mockPopen.side_effect = self.PopenMock
  48. line_list = []
  49. result = gclient_utils.CheckCallAndFilter(args,
  50. cwd=cwd,
  51. show_header=True,
  52. always_show_header=True,
  53. filter_fn=line_list.append)
  54. self.assertEqual(result, test_string.encode('utf-8'))
  55. self.assertEqual(line_list, [
  56. '________ running \'boo foo bar\' in \'bleh\'\n', 'ahah', 'accb',
  57. 'allo', 'addb', '✔'
  58. ])
  59. self.assertEqual(self.stdout.getvalue(), b'')
  60. mockPopen.assert_called_with(args,
  61. cwd=cwd,
  62. stdout=mock.ANY,
  63. stderr=subprocess2.STDOUT,
  64. bufsize=0)
  65. @mock.patch('time.sleep')
  66. @mock.patch('subprocess2.Popen')
  67. def testCheckCallAndFilter_RetryOnce(self, mockPopen, mockTime):
  68. cwd = 'bleh'
  69. args = ['boo', 'foo', 'bar']
  70. test_string = 'ahah\naccb\nallo\naddb\n✔'
  71. self.kids = [
  72. self.ProcessIdMock(test_string, 1),
  73. self.ProcessIdMock(test_string, 0)
  74. ]
  75. mockPopen.side_effect = self.PopenMock
  76. line_list = []
  77. result = gclient_utils.CheckCallAndFilter(args,
  78. cwd=cwd,
  79. show_header=True,
  80. always_show_header=True,
  81. filter_fn=line_list.append,
  82. retry=True)
  83. self.assertEqual(result, test_string.encode('utf-8'))
  84. self.assertEqual(line_list, [
  85. '________ running \'boo foo bar\' in \'bleh\'\n',
  86. 'ahah',
  87. 'accb',
  88. 'allo',
  89. 'addb',
  90. '✔',
  91. '________ running \'boo foo bar\' in \'bleh\' attempt 2 / 2\n',
  92. 'ahah',
  93. 'accb',
  94. 'allo',
  95. 'addb',
  96. '✔',
  97. ])
  98. mockTime.assert_called_with(gclient_utils.RETRY_INITIAL_SLEEP)
  99. self.assertEqual(mockPopen.mock_calls, [
  100. mock.call(args,
  101. cwd=cwd,
  102. stdout=mock.ANY,
  103. stderr=subprocess2.STDOUT,
  104. bufsize=0),
  105. mock.call(args,
  106. cwd=cwd,
  107. stdout=mock.ANY,
  108. stderr=subprocess2.STDOUT,
  109. bufsize=0),
  110. ])
  111. self.assertEqual(self.stdout.getvalue(), b'')
  112. self.assertEqual(
  113. self.printfn.getvalue(),
  114. 'WARNING: subprocess \'"boo" "foo" "bar"\' in bleh failed; will retry '
  115. 'after a short nap...')
  116. @mock.patch('subprocess2.Popen')
  117. def testCheckCallAndFilter_PrintStdout(self, mockPopen):
  118. cwd = 'bleh'
  119. args = ['boo', 'foo', 'bar']
  120. test_string = 'ahah\naccb\nallo\naddb\n✔'
  121. self.kids = [self.ProcessIdMock(test_string)]
  122. mockPopen.side_effect = self.PopenMock
  123. result = gclient_utils.CheckCallAndFilter(args,
  124. cwd=cwd,
  125. show_header=True,
  126. always_show_header=True,
  127. print_stdout=True)
  128. self.assertEqual(result, test_string.encode('utf-8'))
  129. self.assertEqual(self.stdout.getvalue().splitlines(), [
  130. b"________ running 'boo foo bar' in 'bleh'",
  131. b'ahah',
  132. b'accb',
  133. b'allo',
  134. b'addb',
  135. b'\xe2\x9c\x94',
  136. ])
  137. class AnnotatedTestCase(unittest.TestCase):
  138. def setUp(self):
  139. self.out = gclient_utils.MakeFileAnnotated(io.BytesIO())
  140. self.annotated = gclient_utils.MakeFileAnnotated(io.BytesIO(),
  141. include_zero=True)
  142. def testWrite(self):
  143. test_cases = [
  144. ('test string\n', b'test string\n'),
  145. (b'test string\n', b'test string\n'),
  146. ('✔\n', b'\xe2\x9c\x94\n'),
  147. (b'\xe2\x9c\x94\n', b'\xe2\x9c\x94\n'),
  148. ('first line\nsecondline\n', b'first line\nsecondline\n'),
  149. (b'first line\nsecondline\n', b'first line\nsecondline\n'),
  150. ]
  151. for test_input, expected_output in test_cases:
  152. out = gclient_utils.MakeFileAnnotated(io.BytesIO())
  153. out.write(test_input)
  154. self.assertEqual(out.getvalue(), expected_output)
  155. def testWrite_Annotated(self):
  156. test_cases = [
  157. ('test string\n', b'0>test string\n'),
  158. (b'test string\n', b'0>test string\n'),
  159. ('✔\n', b'0>\xe2\x9c\x94\n'),
  160. (b'\xe2\x9c\x94\n', b'0>\xe2\x9c\x94\n'),
  161. ('first line\nsecondline\n', b'0>first line\n0>secondline\n'),
  162. (b'first line\nsecondline\n', b'0>first line\n0>secondline\n'),
  163. ]
  164. for test_input, expected_output in test_cases:
  165. out = gclient_utils.MakeFileAnnotated(io.BytesIO(),
  166. include_zero=True)
  167. out.write(test_input)
  168. self.assertEqual(out.getvalue(), expected_output)
  169. def testByteByByteInput(self):
  170. self.out.write(b'\xe2')
  171. self.out.write(b'\x9c')
  172. self.out.write(b'\x94')
  173. self.out.write(b'\n')
  174. self.out.write(b'\xe2')
  175. self.out.write(b'\n')
  176. self.assertEqual(self.out.getvalue(), b'\xe2\x9c\x94\n\xe2\n')
  177. def testByteByByteInput_Annotated(self):
  178. self.annotated.write(b'\xe2')
  179. self.annotated.write(b'\x9c')
  180. self.annotated.write(b'\x94')
  181. self.annotated.write(b'\n')
  182. self.annotated.write(b'\xe2')
  183. self.annotated.write(b'\n')
  184. self.assertEqual(self.annotated.getvalue(), b'0>\xe2\x9c\x94\n0>\xe2\n')
  185. def testFlush_Annotated(self):
  186. self.annotated.write(b'first line\nsecond line')
  187. self.assertEqual(self.annotated.getvalue(), b'0>first line\n')
  188. self.annotated.flush()
  189. self.assertEqual(self.annotated.getvalue(),
  190. b'0>first line\n0>second line\n')
  191. class SplitUrlRevisionTestCase(unittest.TestCase):
  192. def testSSHUrl(self):
  193. url = "ssh://test@example.com/test.git"
  194. rev = "ac345e52dc"
  195. out_url, out_rev = gclient_utils.SplitUrlRevision(url)
  196. self.assertEqual(out_rev, None)
  197. self.assertEqual(out_url, url)
  198. out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
  199. self.assertEqual(out_rev, rev)
  200. self.assertEqual(out_url, url)
  201. url = "ssh://example.com/test.git"
  202. out_url, out_rev = gclient_utils.SplitUrlRevision(url)
  203. self.assertEqual(out_rev, None)
  204. self.assertEqual(out_url, url)
  205. out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
  206. self.assertEqual(out_rev, rev)
  207. self.assertEqual(out_url, url)
  208. url = "ssh://example.com/git/test.git"
  209. out_url, out_rev = gclient_utils.SplitUrlRevision(url)
  210. self.assertEqual(out_rev, None)
  211. self.assertEqual(out_url, url)
  212. out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
  213. self.assertEqual(out_rev, rev)
  214. self.assertEqual(out_url, url)
  215. rev = "test-stable"
  216. out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
  217. self.assertEqual(out_rev, rev)
  218. self.assertEqual(out_url, url)
  219. url = "ssh://user-name@example.com/~/test.git"
  220. out_url, out_rev = gclient_utils.SplitUrlRevision(url)
  221. self.assertEqual(out_rev, None)
  222. self.assertEqual(out_url, url)
  223. out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
  224. self.assertEqual(out_rev, rev)
  225. self.assertEqual(out_url, url)
  226. url = "ssh://user-name@example.com/~username/test.git"
  227. out_url, out_rev = gclient_utils.SplitUrlRevision(url)
  228. self.assertEqual(out_rev, None)
  229. self.assertEqual(out_url, url)
  230. out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
  231. self.assertEqual(out_rev, rev)
  232. self.assertEqual(out_url, url)
  233. url = "git@github.com:dart-lang/spark.git"
  234. out_url, out_rev = gclient_utils.SplitUrlRevision(url)
  235. self.assertEqual(out_rev, None)
  236. self.assertEqual(out_url, url)
  237. out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
  238. self.assertEqual(out_rev, rev)
  239. self.assertEqual(out_url, url)
  240. def testSVNUrl(self):
  241. url = "svn://example.com/test"
  242. rev = "ac345e52dc"
  243. out_url, out_rev = gclient_utils.SplitUrlRevision(url)
  244. self.assertEqual(out_rev, None)
  245. self.assertEqual(out_url, url)
  246. out_url, out_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
  247. self.assertEqual(out_rev, rev)
  248. self.assertEqual(out_url, url)
  249. class ExtracRefNameTest(unittest.TestCase):
  250. def testMatchFound(self):
  251. self.assertEqual(
  252. 'main',
  253. gclient_utils.ExtractRefName('origin', 'refs/remote/origin/main'))
  254. self.assertEqual(
  255. '1234', gclient_utils.ExtractRefName('origin', 'refs/tags/1234'))
  256. self.assertEqual(
  257. 'chicken',
  258. gclient_utils.ExtractRefName('origin', 'refs/heads/chicken'))
  259. def testNoMatch(self):
  260. self.assertIsNone(gclient_utils.ExtractRefName('origin', 'abcbbb1234'))
  261. class GClientUtilsTest(trial_dir.TestCase):
  262. def testHardToDelete(self):
  263. # Use the fact that tearDown will delete the directory to make it hard
  264. # to do so.
  265. l1 = os.path.join(self.root_dir, 'l1')
  266. l2 = os.path.join(l1, 'l2')
  267. l3 = os.path.join(l2, 'l3')
  268. f3 = os.path.join(l3, 'f3')
  269. os.mkdir(l1)
  270. os.mkdir(l2)
  271. os.mkdir(l3)
  272. gclient_utils.FileWrite(f3, 'foo')
  273. os.chmod(f3, 0)
  274. os.chmod(l3, 0)
  275. os.chmod(l2, 0)
  276. os.chmod(l1, 0)
  277. def testUpgradeToHttps(self):
  278. values = [
  279. ['', ''],
  280. [None, None],
  281. ['foo', 'https://foo'],
  282. ['http://foo', 'https://foo'],
  283. ['foo/', 'https://foo/'],
  284. ['ssh-svn://foo', 'ssh-svn://foo'],
  285. ['ssh-svn://foo/bar/', 'ssh-svn://foo/bar/'],
  286. ['codereview.chromium.org', 'https://codereview.chromium.org'],
  287. ['codereview.chromium.org/', 'https://codereview.chromium.org/'],
  288. [
  289. 'chromium-review.googlesource.com',
  290. 'https://chromium-review.googlesource.com'
  291. ],
  292. [
  293. 'chromium-review.googlesource.com/',
  294. 'https://chromium-review.googlesource.com/'
  295. ],
  296. ['http://foo:10000', 'http://foo:10000'],
  297. ['http://foo:10000/bar', 'http://foo:10000/bar'],
  298. ['foo:10000', 'http://foo:10000'],
  299. ['foo:', 'https://foo:'],
  300. ]
  301. for content, expected in values:
  302. self.assertEqual(expected, gclient_utils.UpgradeToHttps(content))
  303. def testParseCodereviewSettingsContent(self):
  304. values = [
  305. ['# bleh\n', {}],
  306. ['\t# foo : bar\n', {}],
  307. ['Foo:bar', {
  308. 'Foo': 'bar'
  309. }],
  310. ['Foo:bar:baz\n', {
  311. 'Foo': 'bar:baz'
  312. }],
  313. [' Foo : bar ', {
  314. 'Foo': 'bar'
  315. }],
  316. [' Foo : bar \n', {
  317. 'Foo': 'bar'
  318. }],
  319. ['a:b\n\rc:d\re:f', {
  320. 'a': 'b',
  321. 'c': 'd',
  322. 'e': 'f'
  323. }],
  324. ['an_url:http://value/', {
  325. 'an_url': 'http://value/'
  326. }],
  327. [
  328. 'CODE_REVIEW_SERVER : http://r/s', {
  329. 'CODE_REVIEW_SERVER': 'https://r/s'
  330. }
  331. ],
  332. ['VIEW_VC:http://r/s', {
  333. 'VIEW_VC': 'https://r/s'
  334. }],
  335. ]
  336. for content, expected in values:
  337. self.assertEqual(
  338. expected, gclient_utils.ParseCodereviewSettingsContent(content))
  339. def testFileRead_Bytes(self):
  340. with gclient_utils.temporary_file() as tmp:
  341. gclient_utils.FileWrite(tmp,
  342. b'foo \xe2\x9c bar',
  343. mode='wb',
  344. encoding=None)
  345. self.assertEqual('foo \ufffd bar', gclient_utils.FileRead(tmp))
  346. def testFileRead_Unicode(self):
  347. with gclient_utils.temporary_file() as tmp:
  348. gclient_utils.FileWrite(tmp, 'foo ✔ bar')
  349. self.assertEqual('foo ✔ bar', gclient_utils.FileRead(tmp))
  350. def testTemporaryFile(self):
  351. with gclient_utils.temporary_file() as tmp:
  352. gclient_utils.FileWrite(tmp, 'test')
  353. self.assertEqual('test', gclient_utils.FileRead(tmp))
  354. self.assertFalse(os.path.exists(tmp))
  355. if __name__ == '__main__':
  356. unittest.main()
  357. # vim: ts=2:sw=2:tw=80:et: