123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
- #!/usr/bin/env vpython3
- # Copyright (c) 2017 The Chromium Authors. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- """Unit Tests for auth.py"""
- import calendar
- import datetime
- import json
- import os
- import unittest
- import sys
- from unittest import mock
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- import auth
- import subprocess2
- NOW = datetime.datetime(2019, 10, 17, 12, 30, 59, 0)
- VALID_EXPIRY = NOW + datetime.timedelta(seconds=31)
- class AuthenticatorTest(unittest.TestCase):
- def setUp(self):
- mock.patch('subprocess2.check_call').start()
- mock.patch('subprocess2.check_call_out').start()
- mock.patch('auth.datetime_now', return_value=NOW).start()
- self.addCleanup(mock.patch.stopall)
- def testHasCachedCredentials_NotLoggedIn(self):
- subprocess2.check_call_out.side_effect = [
- subprocess2.CalledProcessError(1, ['cmd'], 'cwd', 'stdout',
- 'stderr')
- ]
- self.assertFalse(auth.Authenticator().has_cached_credentials())
- def testHasCachedCredentials_LoggedIn(self):
- subprocess2.check_call_out.return_value = (json.dumps({
- 'token': 'token',
- 'expiry': 12345678
- }), '')
- self.assertTrue(auth.Authenticator().has_cached_credentials())
- def testGetAccessToken_NotLoggedIn(self):
- subprocess2.check_call_out.side_effect = [
- subprocess2.CalledProcessError(1, ['cmd'], 'cwd', 'stdout',
- 'stderr')
- ]
- self.assertRaises(auth.LoginRequiredError,
- auth.Authenticator().get_access_token)
- def testGetAccessToken_CachedToken(self):
- authenticator = auth.Authenticator()
- authenticator._access_token = auth.Token('token', None)
- self.assertEqual(auth.Token('token', None),
- authenticator.get_access_token())
- subprocess2.check_call_out.assert_not_called()
- def testGetAccesstoken_LoggedIn(self):
- expiry = calendar.timegm(VALID_EXPIRY.timetuple())
- subprocess2.check_call_out.return_value = (json.dumps({
- 'token': 'token',
- 'expiry': expiry
- }), '')
- self.assertEqual(auth.Token('token', VALID_EXPIRY),
- auth.Authenticator().get_access_token())
- subprocess2.check_call_out.assert_called_with([
- 'luci-auth', 'token', '-scopes', auth.OAUTH_SCOPE_EMAIL,
- '-json-output', '-'
- ],
- stdout=subprocess2.PIPE,
- stderr=subprocess2.PIPE)
- def testGetAccessToken_DifferentScope(self):
- expiry = calendar.timegm(VALID_EXPIRY.timetuple())
- subprocess2.check_call_out.return_value = (json.dumps({
- 'token': 'token',
- 'expiry': expiry
- }), '')
- self.assertEqual(auth.Token('token', VALID_EXPIRY),
- auth.Authenticator('custom scopes').get_access_token())
- subprocess2.check_call_out.assert_called_with([
- 'luci-auth', 'token', '-scopes', 'custom scopes', '-json-output',
- '-'
- ],
- stdout=subprocess2.PIPE,
- stderr=subprocess2.PIPE)
- def testAuthorize_AccessToken(self):
- http = mock.Mock()
- http_request = http.request
- http_request.__name__ = '__name__'
- authenticator = auth.Authenticator()
- authenticator._access_token = auth.Token('access_token', None)
- authenticator._id_token = auth.Token('id_token', None)
- authorized = authenticator.authorize(http)
- authorized.request('https://example.com',
- method='POST',
- body='body',
- headers={'header': 'value'})
- http_request.assert_called_once_with(
- 'https://example.com', 'POST', 'body', {
- 'header': 'value',
- 'Authorization': 'Bearer access_token'
- }, mock.ANY, mock.ANY)
- def testGetIdToken_NotLoggedIn(self):
- subprocess2.check_call_out.side_effect = [
- subprocess2.CalledProcessError(1, ['cmd'], 'cwd', 'stdout',
- 'stderr')
- ]
- self.assertRaises(auth.LoginRequiredError,
- auth.Authenticator().get_id_token)
- def testGetIdToken_CachedToken(self):
- authenticator = auth.Authenticator()
- authenticator._id_token = auth.Token('token', None)
- self.assertEqual(auth.Token('token', None),
- authenticator.get_id_token())
- subprocess2.check_call_out.assert_not_called()
- def testGetIdToken_LoggedIn(self):
- expiry = calendar.timegm(VALID_EXPIRY.timetuple())
- subprocess2.check_call_out.return_value = (json.dumps({
- 'token': 'token',
- 'expiry': expiry
- }), '')
- self.assertEqual(
- auth.Token('token', VALID_EXPIRY),
- auth.Authenticator(audience='https://test.com').get_id_token())
- subprocess2.check_call_out.assert_called_with([
- 'luci-auth', 'token', '-use-id-token', '-audience',
- 'https://test.com', '-json-output', '-'
- ],
- stdout=subprocess2.PIPE,
- stderr=subprocess2.PIPE)
- def testAuthorize_IdToken(self):
- http = mock.Mock()
- http_request = http.request
- http_request.__name__ = '__name__'
- authenticator = auth.Authenticator()
- authenticator._access_token = auth.Token('access_token', None)
- authenticator._id_token = auth.Token('id_token', None)
- authorized = authenticator.authorize(http, use_id_token=True)
- authorized.request('https://example.com',
- method='POST',
- body='body',
- headers={'header': 'value'})
- http_request.assert_called_once_with(
- 'https://example.com', 'POST', 'body', {
- 'header': 'value',
- 'Authorization': 'Bearer id_token'
- }, mock.ANY, mock.ANY)
- class TokenTest(unittest.TestCase):
- def setUp(self):
- mock.patch('auth.datetime_now', return_value=NOW).start()
- self.addCleanup(mock.patch.stopall)
- def testNeedsRefresh_NoExpiry(self):
- self.assertFalse(auth.Token('token', None).needs_refresh())
- def testNeedsRefresh_Expired(self):
- expired = NOW + datetime.timedelta(seconds=30)
- self.assertTrue(auth.Token('token', expired).needs_refresh())
- def testNeedsRefresh_Valid(self):
- self.assertFalse(auth.Token('token', VALID_EXPIRY).needs_refresh())
- class HasLuciContextLocalAuthTest(unittest.TestCase):
- def setUp(self):
- mock.patch('os.environ').start()
- mock.patch('builtins.open', mock.mock_open()).start()
- self.addCleanup(mock.patch.stopall)
- def testNoLuciContextEnvVar(self):
- os.environ = {}
- self.assertFalse(auth.has_luci_context_local_auth())
- def testNonexistentPath(self):
- os.environ = {'LUCI_CONTEXT': 'path'}
- open.side_effect = OSError
- self.assertFalse(auth.has_luci_context_local_auth())
- open.assert_called_with('path')
- def testInvalidJsonFile(self):
- os.environ = {'LUCI_CONTEXT': 'path'}
- open().read.return_value = 'not-a-json-file'
- self.assertFalse(auth.has_luci_context_local_auth())
- open.assert_called_with('path')
- def testNoLocalAuth(self):
- os.environ = {'LUCI_CONTEXT': 'path'}
- open().read.return_value = '{}'
- self.assertFalse(auth.has_luci_context_local_auth())
- open.assert_called_with('path')
- def testNoDefaultAccountId(self):
- os.environ = {'LUCI_CONTEXT': 'path'}
- open().read.return_value = json.dumps({
- 'local_auth': {
- 'secret':
- 'secret',
- 'accounts': [{
- 'email': 'bots@account.iam.gserviceaccount.com',
- 'id': 'system',
- }],
- 'rpc_port':
- 1234,
- }
- })
- self.assertFalse(auth.has_luci_context_local_auth())
- open.assert_called_with('path')
- def testHasLocalAuth(self):
- os.environ = {'LUCI_CONTEXT': 'path'}
- open().read.return_value = json.dumps({
- 'local_auth': {
- 'secret':
- 'secret',
- 'accounts': [
- {
- 'email': 'bots@account.iam.gserviceaccount.com',
- 'id': 'system',
- },
- {
- 'email': 'builder@account.iam.gserviceaccount.com',
- 'id': 'task',
- },
- ],
- 'rpc_port':
- 1234,
- 'default_account_id':
- 'task',
- },
- })
- self.assertTrue(auth.has_luci_context_local_auth())
- open.assert_called_with('path')
- if __name__ == '__main__':
- if '-v' in sys.argv:
- logging.basicConfig(level=logging.DEBUG)
- unittest.main()
|