12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807 |
- #!/usr/bin/env vpython3
- # coding=utf-8
- # Copyright (c) 2019 The Chromium Authors. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- import json
- import os
- import socket
- import subprocess
- import sys
- import textwrap
- import unittest
- from io import StringIO
- from pathlib import Path
- from typing import Optional
- from unittest import mock
- import httplib2
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- import scm_mock
- import gerrit_util
- import metrics
- import scm
- import subprocess2
- RUN_SUBPROC_TESTS = 'RUN_SUBPROC_TESTS' in os.environ
def makeConn(host: str) -> gerrit_util.HttpConn:
    """Builds a placeholder GET gerrit_util.HttpConn aimed at `host`.

    The URI is a dummy value, the header dict starts out empty and there is no
    request body.
    """
    conn_kwargs = dict(
        req_uri='???',
        req_method='GET',
        req_host=host,
        req_headers={},
        req_body=None,
    )
    return gerrit_util.HttpConn(**conn_kwargs)
class CookiesAuthenticatorTest(unittest.TestCase):
    """Exercises gerrit_util.CookiesAuthenticator against fake cookie data."""

    # Tab-separated rows served to the code under test through the patched
    # gclient_utils.FileRead. testGitcookies expects only the first three rows
    # to show up in `gitcookies`; the '/foo' row, the 'not-o' row, the row
    # starting with '#' and the free-text row are expected to be left out.
    _GITCOOKIES = '\n'.join('\t'.join(fields) for fields in (
        (
            'chromium.googlesource.com',
            'FALSE',
            '/',
            'TRUE',
            '2147483647',
            'o',
            'git-user.chromium.org=1/chromium-secret',
        ),
        (
            'chromium-review.googlesource.com',
            'FALSE',
            '/',
            'TRUE',
            '2147483647',
            'o',
            'git-user.chromium.org=1/chromium-secret',
        ),
        (
            '.example.com',
            'FALSE',
            '/',
            'TRUE',
            '2147483647',
            'o',
            'example-bearer-token',
        ),
        (
            'another-path.example.com',
            'FALSE',
            '/foo',
            'TRUE',
            '2147483647',
            'o',
            'git-example.com=1/another-path-secret',
        ),
        (
            'another-key.example.com',
            'FALSE',
            '/',
            'TRUE',
            '2147483647',
            'not-o',
            'git-example.com=1/another-key-secret',
        ),
        (
            '#chromium-review.googlesource.com',
            'FALSE',
            '/',
            'TRUE',
            '2147483647',
            'o',
            'git-invalid-user.chromium.org=1/invalid-chromium-secret',
        ),
        (
            'Some unrelated line',
            ' that should not be here',
        ),
    ))

    def setUp(self):
        """Replaces file, environment and git access with canned values."""

        def start_patch(target, *args, **kwargs):
            # Create and immediately activate one patch; all of them are
            # undone together by the stopall cleanup registered below.
            return mock.patch(target, *args, **kwargs).start()

        start_patch('gclient_utils.FileRead', return_value=self._GITCOOKIES)
        start_patch('os.getenv', return_value={})
        start_patch('os.environ', {'HOME': '$HOME'})
        start_patch('os.getcwd', return_value='/fame/cwd')
        start_patch('os.path.exists', return_value=True)
        start_patch(
            'git_common.run',
            side_effect=[
                subprocess2.CalledProcessError(1, ['cmd'], 'cwd', 'out', 'err')
            ],
        )
        scm_mock.GIT(self)
        self.addCleanup(mock.patch.stopall)
        self.maxDiff = None

    def assertAuthenticatedConnAuth(self,
                                    auth: gerrit_util.CookiesAuthenticator,
                                    host: str, expected: str):
        """Authenticates a fresh connection to `host` and checks its header.

        Args:
            auth: authenticator under test.
            host: host the placeholder connection is created for.
            expected: value the 'Authorization' header must end up with.
        """
        request = makeConn(host)
        auth.authenticate(request)
        actual = request.req_headers['Authorization']
        self.assertEqual(actual, expected)

    def testGetNewPasswordUrl(self):
        auth = gerrit_util.CookiesAuthenticator()
        cases = (
            ('https://chromium.googlesource.com/new-password',
             'chromium.googlesource.com'),
            ('https://chrome-internal.googlesource.com/new-password',
             'chrome-internal-review.googlesource.com'),
        )
        for expected_url, host in cases:
            self.assertEqual(expected_url, auth.get_new_password_url(host))

    def testGetNewPasswordMessage(self):
        auth = gerrit_util.CookiesAuthenticator()
        cases = (
            ('https://chromium.googlesource.com/new-password',
             'chromium-review.googlesource.com'),
            ('https://chrome-internal.googlesource.com/new-password',
             'chrome-internal.googlesource.com'),
        )
        for expected_url, host in cases:
            self.assertIn(expected_url, auth._get_new_password_message(host))

    def testGetGitcookiesPath(self):

        def current_path():
            # A new authenticator per lookup, as each step changes the inputs.
            return gerrit_util.CookiesAuthenticator().get_gitcookies_path()

        default_path = os.path.expanduser(os.path.join('~', '.gitcookies'))
        self.assertEqual(default_path, current_path())

        # A configured http.cookiefile is expected to replace the default.
        scm.GIT.SetConfig(os.getcwd(), 'http.cookiefile', '/some/path')
        self.assertEqual('/some/path', current_path())

        # A value from os.getenv is expected to win over the git config.
        os.getenv.return_value = 'git-cookies-path'
        self.assertEqual('git-cookies-path', current_path())
        os.getenv.assert_called_with('GIT_COOKIES_PATH')

    def testGitcookies(self):
        chromium_login = ('git-user.chromium.org', '1/chromium-secret')
        expected = {
            'chromium.googlesource.com': chromium_login,
            'chromium-review.googlesource.com': chromium_login,
            '.example.com': ('', 'example-bearer-token'),
        }
        auth = gerrit_util.CookiesAuthenticator()
        self.assertEqual(auth.gitcookies, expected)

    def testGetAuthHeader(self):
        basic_header = (
            'Basic Z2l0LXVzZXIuY2hyb21pdW0ub3JnOjEvY2hyb21pdW0tc2VjcmV0')
        auth = gerrit_util.CookiesAuthenticator()
        cases = (
            ('chromium.googlesource.com', basic_header),
            ('chromium-review.googlesource.com', basic_header),
            ('some-review.example.com', 'Bearer example-bearer-token'),
        )
        for host, header in cases:
            self.assertAuthenticatedConnAuth(auth, host, header)

    def testGetAuthEmail(self):
        auth = gerrit_util.CookiesAuthenticator()
        for host in ('chromium.googlesource.com',
                     'chromium-review.googlesource.com'):
            self.assertEqual('user@chromium.org', auth.get_auth_email(host))
        self.assertIsNone(auth.get_auth_email('some-review.example.com'))
class GceAuthenticatorTest(unittest.TestCase):
    """Exercises gerrit_util.GceAuthenticator with httplib2.Http mocked out."""

    # Body returned by the mocked request in the token tests.
    _TOKEN_RESPONSE = ('{"expires_in": 125, "token_type": "TYPE", '
                       '"access_token": "TOKEN"}')

    def setUp(self):
        super().setUp()
        for target, patch_kwargs in (
            ('httplib2.Http', {}),
            ('os.getenv', {'return_value': None}),
            ('gerrit_util.time_sleep', {}),
            ('gerrit_util.time_time', {}),
        ):
            mock.patch(target, **patch_kwargs).start()
        self.addCleanup(mock.patch.stopall)

        # Results are cached in class variables of GceAuthenticator, so each
        # test gets its own throwaway subclass to keep tests independent.
        class GceAuthenticator(gerrit_util.GceAuthenticator):
            pass

        self.GceAuthenticator = GceAuthenticator

    @staticmethod
    def _request():
        """Returns the `request` mock reached through the patched Http()."""
        return httplib2.Http().request

    def assertAuthenticatedToken(self, token: Optional[str]):
        """Authenticates a placeholder connection and checks its header.

        Args:
            token: expected 'Authorization' value, or None when the header
                must be absent.
        """
        conn = makeConn('some.example.com')
        self.GceAuthenticator().authenticate(conn)
        if token is not None:
            self.assertEqual(conn.req_headers['Authorization'], token)
            return
        self.assertNotIn('Authorization', conn.req_headers)

    def testIsGce_EnvVarSkip(self, *_mocks):
        os.getenv.return_value = '1'
        applicable = self.GceAuthenticator.is_applicable()
        self.assertFalse(applicable)
        os.getenv.assert_called_once_with('SKIP_GCE_AUTH_FOR_GIT')

    def testIsGce_Error(self):
        self._request().side_effect = httplib2.HttpLib2Error
        self.assertFalse(self.GceAuthenticator.is_applicable())

    def testIsGce_500(self):
        self._request().return_value = (mock.Mock(status=500), None)
        self.assertFalse(self.GceAuthenticator.is_applicable())
        # The final recorded sleep call must not compare above call(43.0).
        self.assertLessEqual(gerrit_util.time_sleep.mock_calls[-1],
                             mock.call(43.0))

    def testIsGce_FailsThenSucceeds(self):
        ok_response = mock.Mock(status=200)
        ok_response.get.return_value = 'Google'
        self._request().side_effect = [
            (mock.Mock(status=500), None),
            (ok_response, 'who cares'),
        ]
        self.assertTrue(self.GceAuthenticator.is_applicable())

    def testIsGce_MetadataFlavorIsNotGoogle(self):
        ok_response = mock.Mock(status=200)
        ok_response.get.return_value = None
        self._request().return_value = (ok_response, 'who cares')
        self.assertFalse(self.GceAuthenticator.is_applicable())
        ok_response.get.assert_called_once_with('metadata-flavor')

    def testIsGce_ResultIsCached(self):
        ok_response = mock.Mock(status=200)
        ok_response.get.return_value = 'Google'
        # Only one response is queued; a second request would exhaust it.
        self._request().side_effect = [(ok_response, 'who cares')]
        for _ in range(2):
            self.assertTrue(self.GceAuthenticator.is_applicable())
        self._request().assert_called_once()

    def testGetAuthHeader_Error(self):
        self._request().side_effect = httplib2.HttpLib2Error
        self.assertAuthenticatedToken(None)

    def testGetAuthHeader_500(self):
        self._request().return_value = (mock.Mock(status=500), None)
        self.assertAuthenticatedToken(None)

    def testGetAuthHeader_Non200(self):
        self._request().return_value = (mock.Mock(status=403), None)
        self.assertAuthenticatedToken(None)

    def testGetAuthHeader_OK(self):
        self._request().return_value = (mock.Mock(status=200),
                                        self._TOKEN_RESPONSE)
        gerrit_util.time_time.return_value = 0
        self.assertAuthenticatedToken('TYPE TOKEN')

    def testGetAuthHeader_Cache(self):
        self._request().return_value = (mock.Mock(status=200),
                                        self._TOKEN_RESPONSE)
        gerrit_util.time_time.return_value = 0
        for _ in range(2):
            self.assertAuthenticatedToken('TYPE TOKEN')
        self._request().assert_called_once()

    def testGetAuthHeader_CacheOld(self):
        self._request().return_value = (mock.Mock(status=200),
                                        self._TOKEN_RESPONSE)
        # The clock advances between reads, and two requests are expected.
        gerrit_util.time_time.side_effect = [0, 100, 200]
        for _ in range(2):
            self.assertAuthenticatedToken('TYPE TOKEN')
        self.assertEqual(2, len(self._request().mock_calls))
class GerritUtilTest(unittest.TestCase):
    """Tests for the query, connection and response helpers in gerrit_util."""

    def setUp(self):
        super().setUp()
        # Logging, sleeping and metrics collection are replaced with mocks so
        # the tests can assert on them; stopall undoes every patch.
        mock.patch('gerrit_util.LOGGER').start()
        mock.patch('gerrit_util.time_sleep').start()
        mock.patch('metrics.collector').start()
        mock.patch('metrics_utils.extract_http_metrics',
                   return_value='http_metrics').start()
        self.addCleanup(mock.patch.stopall)

    def testQueryString(self):
        self.assertEqual('', gerrit_util._QueryString([]))
        self.assertEqual('first%20param%2B',
                         gerrit_util._QueryString([], 'first param+'))
        self.assertEqual(
            'key:val+foo:bar',
            gerrit_util._QueryString([('key', 'val'), ('foo', 'bar')]))
        self.assertEqual(
            'first%20param%2B+key:val+foo:bar',
            gerrit_util._QueryString([('key', 'val'), ('foo', 'bar')],
                                     'first param+'))

    @mock.patch('gerrit_util.CookiesAuthenticator._get_auth_for_host')
    @mock.patch('gerrit_util._Authenticator.get')
    def testCreateHttpConn_Basic(self, mockAuth, cookieAuth):
        mockAuth.return_value = gerrit_util.CookiesAuthenticator()
        cookieAuth.return_value = None

        conn = gerrit_util.CreateHttpConn('host.example.com', 'foo/bar')
        self.assertEqual('host.example.com', conn.req_host)
        self.assertEqual(
            {
                'uri': 'https://host.example.com/a/foo/bar',
                'method': 'GET',
                'headers': {},
                'body': None,
            }, conn.req_params)

    @mock.patch('gerrit_util.CookiesAuthenticator._get_auth_for_host')
    @mock.patch('gerrit_util._Authenticator.get')
    def testCreateHttpConn_Authenticated(self, mockAuth, cookieAuth):
        mockAuth.return_value = gerrit_util.CookiesAuthenticator()
        cookieAuth.return_value = (None, 'token')

        conn = gerrit_util.CreateHttpConn('host.example.com',
                                          'foo/bar',
                                          headers={'header': 'value'})
        self.assertEqual('host.example.com', conn.req_host)
        self.assertEqual(
            {
                'uri': 'https://host.example.com/a/foo/bar',
                'method': 'GET',
                'headers': {
                    'Authorization': 'Bearer token',
                    'header': 'value'
                },
                'body': None,
            }, conn.req_params)

    # Patch `_Authenticator.get` (not the whole `_Authenticator` class) so the
    # `return_value` set below is what gets used, as in the two tests above.
    @mock.patch('gerrit_util.CookiesAuthenticator._get_auth_for_host')
    @mock.patch('gerrit_util._Authenticator.get')
    def testCreateHttpConn_Body(self, mockAuth, cookieAuth):
        mockAuth.return_value = gerrit_util.CookiesAuthenticator()
        cookieAuth.return_value = None

        conn = gerrit_util.CreateHttpConn('host.example.com',
                                          'foo/bar',
                                          body={
                                              'l': [1, 2, 3],
                                              'd': {
                                                  'k': 'v'
                                              }
                                          })
        self.assertEqual('host.example.com', conn.req_host)
        self.assertEqual(
            {
                'uri': 'https://host.example.com/a/foo/bar',
                'method': 'GET',
                'headers': {
                    'Content-Type': 'application/json'
                },
                'body': '{"d": {"k": "v"}, "l": [1, 2, 3]}',
            }, conn.req_params)

    def testReadHttpResponse_200(self):
        conn = mock.Mock()
        conn.req_params = {'uri': 'uri', 'method': 'method'}
        conn.request.return_value = (mock.Mock(status=200),
                                     b'content\xe2\x9c\x94')

        content = gerrit_util.ReadHttpResponse(conn)
        self.assertEqual('content✔', content.getvalue())
        metrics.collector.add_repeated.assert_called_once_with(
            'http_requests', 'http_metrics')

    def testReadHttpResponse_AuthenticationIssue(self):
        for status in (302, 401, 403):
            # subTest reports each status separately instead of stopping at
            # the first failing one.
            with self.subTest(status=status):
                response = mock.Mock(status=status)
                response.get.return_value = None
                conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'})
                conn.request.return_value = (response, b'')

                with mock.patch('sys.stdout', StringIO()):
                    with self.assertRaises(gerrit_util.GerritError) as cm:
                        gerrit_util.ReadHttpResponse(conn)

                    self.assertEqual(status, cm.exception.http_status)
                    self.assertIn(
                        'Your Gerrit credentials might be misconfigured',
                        sys.stdout.getvalue())

    def testReadHttpResponse_ClientError(self):
        conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'})
        conn.request.return_value = (mock.Mock(status=404), b'')

        with self.assertRaises(gerrit_util.GerritError) as cm:
            gerrit_util.ReadHttpResponse(conn)

        self.assertEqual(404, cm.exception.http_status)

    def readHttpResponse_ServerErrorHelper(self, status):
        """Checks that `status` is retried TRY_LIMIT times before raising.

        Args:
            status: HTTP status every mocked response carries.
        """
        conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'})
        conn.request.return_value = (mock.Mock(status=status), b'')

        with self.assertRaises(gerrit_util.GerritError) as cm:
            gerrit_util.ReadHttpResponse(conn)

        self.assertEqual(status, cm.exception.http_status)
        self.assertEqual(gerrit_util.TRY_LIMIT, len(conn.request.mock_calls))
        # The final recorded sleep call must not compare above call(422.0).
        last_call = gerrit_util.time_sleep.mock_calls[-1]
        self.assertLessEqual(last_call, mock.call(422.0))

    def testReadHttpResponse_ServerError(self):
        for status in (404, 409, 429, 500):
            with self.subTest(status=status):
                self.readHttpResponse_ServerErrorHelper(status=status)

    def testReadHttpResponse_ServerErrorAndSuccess(self):
        conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'})
        conn.request.side_effect = [
            (mock.Mock(status=500), b''),
            (mock.Mock(status=200), b'content\xe2\x9c\x94'),
        ]

        self.assertEqual('content✔',
                         gerrit_util.ReadHttpResponse(conn).getvalue())
        self.assertEqual(2, len(conn.request.mock_calls))
        gerrit_util.time_sleep.assert_called_once_with(12.0)

    def testReadHttpResponse_TimeoutAndSuccess(self):
        conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'})
        conn.request.side_effect = [
            socket.timeout('timeout'),
            (mock.Mock(status=200), b'content\xe2\x9c\x94'),
        ]

        self.assertEqual('content✔',
                         gerrit_util.ReadHttpResponse(conn).getvalue())
        self.assertEqual(2, len(conn.request.mock_calls))
        gerrit_util.time_sleep.assert_called_once_with(12.0)

    def testReadHttpResponse_SetMaxTries(self):
        conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'})
        # Three failures are queued, but max_tries=2 must stop after two.
        conn.request.side_effect = [
            (mock.Mock(status=409), b'error!'),
            (mock.Mock(status=409), b'error!'),
            (mock.Mock(status=409), b'error!'),
        ]

        self.assertRaises(gerrit_util.GerritError,
                          gerrit_util.ReadHttpResponse,
                          conn,
                          max_tries=2)
        self.assertEqual(2, len(conn.request.mock_calls))
        gerrit_util.time_sleep.assert_called_once_with(12.0)

    def testReadHttpResponse_Expected404(self):
        conn = mock.Mock()
        conn.req_params = {'uri': 'uri', 'method': 'method'}
        conn.request.return_value = (mock.Mock(status=404),
                                     b'content\xe2\x9c\x94')

        content = gerrit_util.ReadHttpResponse(conn, (404, ))
        self.assertEqual('', content.getvalue())

    @mock.patch('gerrit_util.ReadHttpResponse')
    def testReadHttpJsonResponse_NotJSON(self, mockReadHttpResponse):
        mockReadHttpResponse.return_value = StringIO('not json')
        with self.assertRaises(gerrit_util.GerritError) as cm:
            gerrit_util.ReadHttpJsonResponse(None)
        self.assertEqual(cm.exception.http_status, 200)
        self.assertEqual(cm.exception.message,
                         '(200) Unexpected json output: not json')

    @mock.patch('gerrit_util.ReadHttpResponse')
    def testReadHttpJsonResponse_EmptyValue(self, mockReadHttpResponse):
        mockReadHttpResponse.return_value = StringIO(')]}\'')
        self.assertEqual(gerrit_util.ReadHttpJsonResponse(None), {})

    @mock.patch('gerrit_util.ReadHttpResponse')
    def testReadHttpJsonResponse_JSON(self, mockReadHttpResponse):
        expected_value = {'foo': 'bar', 'baz': [1, '2', 3]}
        mockReadHttpResponse.return_value = StringIO(')]}\'\n' +
                                                     json.dumps(expected_value))
        self.assertEqual(expected_value, gerrit_util.ReadHttpJsonResponse(None))

    @mock.patch('gerrit_util.CreateHttpConn')
    @mock.patch('gerrit_util.ReadHttpJsonResponse')
    def testQueryChanges(self, mockJsonResponse, mockCreateHttpConn):
        gerrit_util.QueryChanges('host', [('key', 'val'), ('foo', 'bar baz')],
                                 'first param',
                                 limit=500,
                                 o_params=['PARAM_A', 'PARAM_B'],
                                 start='start')
        mockCreateHttpConn.assert_called_once_with(
            'host', ('changes/?q=first%20param+key:val+foo:bar+baz'
                     '&start=start'
                     '&n=500'
                     '&o=PARAM_A'
                     '&o=PARAM_B'),
            timeout=30.0)

    def testQueryChanges_NoParams(self):
        self.assertRaises(RuntimeError, gerrit_util.QueryChanges, 'host', [])

    @mock.patch('gerrit_util.QueryChanges')
    def testGenerateAllChanges(self, mockQueryChanges):
        mockQueryChanges.side_effect = [
            # First results page
            [
                {'_number': '4'},
                {'_number': '3'},
                {'_number': '2', '_more_changes': True},
            ],
            # Second results page, there are new changes, so second page
            # includes some results from the first page.
            [
                {'_number': '2'},
                {'_number': '1'},
            ],
            # GenerateAllChanges queries again from the start to get any new
            # changes (5 in this case).
            [
                {'_number': '5'},
                {'_number': '4'},
                {'_number': '3', '_more_changes': True},
            ],
        ]

        changes = list(gerrit_util.GenerateAllChanges('host', 'params'))
        self.assertEqual([
            {'_number': '4'},
            {'_number': '3'},
            {'_number': '2', '_more_changes': True},
            {'_number': '1'},
            {'_number': '5'},
        ], changes)
        self.assertEqual([
            mock.call('host', 'params', None, 500, None, 0),
            mock.call('host', 'params', None, 500, None, 3),
            mock.call('host', 'params', None, 500, None, 0),
        ], mockQueryChanges.mock_calls)

    @mock.patch('gerrit_util.CreateHttpConn')
    @mock.patch('gerrit_util.ReadHttpJsonResponse')
    def testIsCodeOwnersEnabledOnRepo_Disabled(self, mockJsonResponse,
                                               mockCreateHttpConn):
        mockJsonResponse.return_value = {'status': {'disabled': True}}
        self.assertFalse(gerrit_util.IsCodeOwnersEnabledOnRepo('host', 'repo'))

    @mock.patch('gerrit_util.CreateHttpConn')
    @mock.patch('gerrit_util.ReadHttpJsonResponse')
    def testIsCodeOwnersEnabledOnRepo_Enabled(self, mockJsonResponse,
                                              mockCreateHttpConn):
        mockJsonResponse.return_value = {'status': {}}
        self.assertTrue(gerrit_util.IsCodeOwnersEnabledOnRepo('host', 'repo'))
class SSOAuthenticatorTest(unittest.TestCase):
    """Tests for gerrit_util.SSOAuthenticator.

    Class-level state on SSOAuthenticator that the tests modify is put back
    after every test so that no test depends on another.
    """

    @classmethod
    def setUpClass(cls) -> None:
        # Remembered so tests that shorten the timeout cannot affect others.
        cls._original_timeout_secs = gerrit_util.SSOAuthenticator._timeout_secs
        return super().setUpClass()

    def setUp(self) -> None:
        gerrit_util.SSOAuthenticator._sso_info = None
        gerrit_util.SSOAuthenticator._testing_load_expired_cookies = True
        gerrit_util.SSOAuthenticator._timeout_secs = self._original_timeout_secs
        self.sso = gerrit_util.SSOAuthenticator()
        return super().setUp()

    def tearDown(self) -> None:
        gerrit_util.SSOAuthenticator._sso_info = None
        gerrit_util.SSOAuthenticator._testing_load_expired_cookies = False
        gerrit_util.SSOAuthenticator._timeout_secs = self._original_timeout_secs
        return super().tearDown()

    @property
    def _input_dir(self) -> Path:
        """Directory holding the input files for the running test method."""
        base = Path(__file__).absolute().with_suffix('.inputs')
        # Here _testMethodName would be a string like "testCmdAssemblyFound"
        return base / self._testMethodName

    def _useFakeHelper(self) -> None:
        """Points SSOAuthenticator._sso_cmd at this test's fake helper script.

        The attribute is patched rather than assigned so the previous value
        (or its absence, hence create=True) is restored when the test ends.
        """
        patcher = mock.patch.object(
            gerrit_util.SSOAuthenticator,
            '_sso_cmd',
            ('python3', str(self._input_dir / 'git-remote-sso.py')),
            create=True)
        patcher.start()
        self.addCleanup(patcher.stop)

    @mock.patch('gerrit_util.ssoHelper.find_cmd',
                return_value='/fake/git-remote-sso')
    def testCmdAssemblyFound(self, _):
        self.assertEqual(self.sso._resolve_sso_cmd(),
                         ('/fake/git-remote-sso', '-print_config',
                          'sso://*.git.corp.google.com'))
        with mock.patch('scm.GIT.GetConfig') as p:
            p.side_effect = ['firefly@google.com']
            self.assertTrue(self.sso.is_applicable())

    @mock.patch('gerrit_util.ssoHelper.find_cmd', return_value=None)
    def testCmdAssemblyNotFound(self, _):
        self.assertEqual(self.sso._resolve_sso_cmd(), ())
        self.assertFalse(self.sso.is_applicable())

    def testParseConfigOK(self):
        test_config = {
            'somekey': 'a value with = in it',
            'novalue': '',
            'http.proxy': 'localhost:12345',
            'http.cookiefile': str(self._input_dir / 'cookiefile.txt'),
            'include.path': str(self._input_dir / 'gitconfig'),
        }
        parsed = self.sso._parse_config(test_config)
        self.assertDictEqual(parsed.headers, {
            'Authorization': 'Basic REALLY_COOL_TOKEN',
        })
        self.assertEqual(parsed.proxy.proxy_host, b'localhost')
        self.assertEqual(parsed.proxy.proxy_port, 12345)

        c = parsed.cookies._cookies
        self.assertEqual(c['login.example.com']['/']['SSO'].value,
                         'TUVFUE1PUlAK')
        self.assertEqual(c['.example.com']['/']['__CoolProxy'].value,
                         'QkxFRVBCTE9SUAo=')

    @unittest.skipUnless(RUN_SUBPROC_TESTS, 'subprocess tests are flakey')
    def testLaunchHelperOK(self):
        self._useFakeHelper()

        info = self.sso._get_sso_info()
        self.assertDictEqual(info.headers, {
            'Authorization': 'Basic REALLY_COOL_TOKEN',
        })
        self.assertEqual(info.proxy.proxy_host, b'localhost')
        self.assertEqual(info.proxy.proxy_port, 12345)
        c = info.cookies._cookies
        self.assertEqual(c['login.example.com']['/']['SSO'].value,
                         'TUVFUE1PUlAK')
        self.assertEqual(c['.example.com']['/']['__CoolProxy'].value,
                         'QkxFRVBCTE9SUAo=')

    @unittest.skipUnless(RUN_SUBPROC_TESTS, 'subprocess tests are flakey')
    def testLaunchHelperFailQuick(self):
        self._useFakeHelper()

        with self.assertRaisesRegex(SystemExit, "SSO Failure Message!!!"):
            self.sso._get_sso_info()

    @unittest.skipUnless(RUN_SUBPROC_TESTS, 'subprocess tests are flakey')
    def testLaunchHelperFailSlow(self):
        # Restored to the original value by setUp/tearDown.
        gerrit_util.SSOAuthenticator._timeout_secs = 0.2
        self._useFakeHelper()

        with self.assertRaises(subprocess.TimeoutExpired):
            self.sso._get_sso_info()
class SSOHelperTest(unittest.TestCase):
    """Tests for gerrit_util.SSOHelper.find_cmd with shutil.which mocked."""

    def setUp(self) -> None:
        # A new helper per test, so each test starts from its own instance.
        self.sso = gerrit_util.SSOHelper()
        return super().setUp()

    @mock.patch('shutil.which', return_value='/fake/git-remote-sso')
    def testFindCmd(self, _):
        self.assertEqual(self.sso.find_cmd(), '/fake/git-remote-sso')

    @mock.patch('shutil.which', return_value=None)
    def testFindCmdMissing(self, _):
        self.assertEqual(self.sso.find_cmd(), '')

    @mock.patch('shutil.which', return_value='/fake/git-remote-sso')
    def testFindCmdCached(self, which):
        self.sso.find_cmd()
        self.sso.find_cmd()
        # `which.called` is a bool and equals 1 however often the mock ran;
        # `call_count` is what shows the second lookup did not call it again.
        self.assertEqual(which.call_count, 1)
class ShouldUseSSOTest(unittest.TestCase):
    """Tests for gerrit_util.ShouldUseSSO across hosts and email addresses."""

    def setUp(self) -> None:
        # Baseline for every test: newauth enabled, a fake working directory
        # and an SSO helper command that is found.
        self.newauth = mock.patch('newauth.Enabled', return_value=True)
        self.cwd = mock.patch('os.getcwd', return_value='/fake/cwd')
        self.sso = mock.patch('gerrit_util.ssoHelper.find_cmd',
                              return_value='/fake/git-remote-sso')
        for patcher in (self.newauth, self.cwd, self.sso):
            patcher.start()
        scm_mock.GIT(self)
        self.addCleanup(mock.patch.stopall)
        # Cleared so an answer from an earlier test is not reused.
        gerrit_util.ShouldUseSSO.cache_clear()
        return super().setUp()

    def tearDown(self) -> None:
        super().tearDown()
        for patcher in (self.sso, self.newauth):
            patcher.stop()

    @mock.patch('newauth.Enabled', return_value=False)
    def testDisabled(self, _):
        verdict = gerrit_util.ShouldUseSSO('fake-host.googlesource.com',
                                           'firefly@google.com')
        self.assertFalse(verdict)

    @mock.patch('gerrit_util.ssoHelper.find_cmd', return_value='')
    def testMissingCommand(self, _):
        verdict = gerrit_util.ShouldUseSSO('fake-host.googlesource.com',
                                           'firefly@google.com')
        self.assertFalse(verdict)

    def testBadHost(self):
        verdict = gerrit_util.ShouldUseSSO('fake-host.coreboot.org',
                                           'firefly@google.com')
        self.assertFalse(verdict)

    def testEmptyEmail(self):
        verdict = gerrit_util.ShouldUseSSO('fake-host.googlesource.com', '')
        self.assertTrue(verdict)

    def testGoogleEmail(self):
        verdict = gerrit_util.ShouldUseSSO('fake-host.googlesource.com',
                                           'firefly@google.com')
        self.assertTrue(verdict)

    def testGmail(self):
        verdict = gerrit_util.ShouldUseSSO('fake-host.googlesource.com',
                                           'firefly@gmail.com')
        self.assertFalse(verdict)

    @mock.patch('gerrit_util.GetAccountEmails',
                return_value=[{
                    'email': 'firefly@chromium.org'
                }])
    def testLinkedChromium(self, email):
        verdict = gerrit_util.ShouldUseSSO('fake-host.googlesource.com',
                                           'firefly@chromium.org')
        self.assertTrue(verdict)
        email.assert_called_with('fake-host.googlesource.com',
                                 'self',
                                 authenticator=mock.ANY)

    @mock.patch('gerrit_util.GetAccountEmails',
                return_value=[{
                    'email': 'firefly@google.com'
                }])
    def testUnlinkedChromium(self, email):
        verdict = gerrit_util.ShouldUseSSO('fake-host.googlesource.com',
                                           'firefly@chromium.org')
        self.assertFalse(verdict)
        email.assert_called_with('fake-host.googlesource.com',
                                 'self',
                                 authenticator=mock.ANY)
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
|