auth_test.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. #!/usr/bin/env vpython3
  2. # Copyright (c) 2017 The Chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Unit Tests for auth.py"""
  6. import calendar
  7. import datetime
  8. import json
  9. import os
  10. import unittest
  11. import sys
  12. from unittest import mock
  13. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  14. import auth
  15. import subprocess2
  16. NOW = datetime.datetime(2019, 10, 17, 12, 30, 59, 0)
  17. VALID_EXPIRY = NOW + datetime.timedelta(seconds=31)
  18. class AuthenticatorTest(unittest.TestCase):
  19. def setUp(self):
  20. mock.patch('subprocess2.check_call').start()
  21. mock.patch('subprocess2.check_call_out').start()
  22. mock.patch('auth.datetime_now', return_value=NOW).start()
  23. self.addCleanup(mock.patch.stopall)
  24. def testHasCachedCredentials_NotLoggedIn(self):
  25. subprocess2.check_call_out.side_effect = [
  26. subprocess2.CalledProcessError(1, ['cmd'], 'cwd', 'stdout',
  27. 'stderr')
  28. ]
  29. self.assertFalse(auth.Authenticator().has_cached_credentials())
  30. def testHasCachedCredentials_LoggedIn(self):
  31. subprocess2.check_call_out.return_value = (json.dumps({
  32. 'token': 'token',
  33. 'expiry': 12345678
  34. }), '')
  35. self.assertTrue(auth.Authenticator().has_cached_credentials())
  36. def testGetAccessToken_NotLoggedIn(self):
  37. subprocess2.check_call_out.side_effect = [
  38. subprocess2.CalledProcessError(1, ['cmd'], 'cwd', 'stdout',
  39. 'stderr')
  40. ]
  41. self.assertRaises(auth.LoginRequiredError,
  42. auth.Authenticator().get_access_token)
  43. def testGetAccessToken_CachedToken(self):
  44. authenticator = auth.Authenticator()
  45. authenticator._access_token = auth.Token('token', None)
  46. self.assertEqual(auth.Token('token', None),
  47. authenticator.get_access_token())
  48. subprocess2.check_call_out.assert_not_called()
  49. def testGetAccesstoken_LoggedIn(self):
  50. expiry = calendar.timegm(VALID_EXPIRY.timetuple())
  51. subprocess2.check_call_out.return_value = (json.dumps({
  52. 'token': 'token',
  53. 'expiry': expiry
  54. }), '')
  55. self.assertEqual(auth.Token('token', VALID_EXPIRY),
  56. auth.Authenticator().get_access_token())
  57. subprocess2.check_call_out.assert_called_with([
  58. 'luci-auth', 'token', '-scopes', auth.OAUTH_SCOPE_EMAIL,
  59. '-json-output', '-'
  60. ],
  61. stdout=subprocess2.PIPE,
  62. stderr=subprocess2.PIPE)
  63. def testGetAccessToken_DifferentScope(self):
  64. expiry = calendar.timegm(VALID_EXPIRY.timetuple())
  65. subprocess2.check_call_out.return_value = (json.dumps({
  66. 'token': 'token',
  67. 'expiry': expiry
  68. }), '')
  69. self.assertEqual(auth.Token('token', VALID_EXPIRY),
  70. auth.Authenticator('custom scopes').get_access_token())
  71. subprocess2.check_call_out.assert_called_with([
  72. 'luci-auth', 'token', '-scopes', 'custom scopes', '-json-output',
  73. '-'
  74. ],
  75. stdout=subprocess2.PIPE,
  76. stderr=subprocess2.PIPE)
  77. def testAuthorize_AccessToken(self):
  78. http = mock.Mock()
  79. http_request = http.request
  80. http_request.__name__ = '__name__'
  81. authenticator = auth.Authenticator()
  82. authenticator._access_token = auth.Token('access_token', None)
  83. authenticator._id_token = auth.Token('id_token', None)
  84. authorized = authenticator.authorize(http)
  85. authorized.request('https://example.com',
  86. method='POST',
  87. body='body',
  88. headers={'header': 'value'})
  89. http_request.assert_called_once_with(
  90. 'https://example.com', 'POST', 'body', {
  91. 'header': 'value',
  92. 'Authorization': 'Bearer access_token'
  93. }, mock.ANY, mock.ANY)
  94. def testGetIdToken_NotLoggedIn(self):
  95. subprocess2.check_call_out.side_effect = [
  96. subprocess2.CalledProcessError(1, ['cmd'], 'cwd', 'stdout',
  97. 'stderr')
  98. ]
  99. self.assertRaises(auth.LoginRequiredError,
  100. auth.Authenticator().get_id_token)
  101. def testGetIdToken_CachedToken(self):
  102. authenticator = auth.Authenticator()
  103. authenticator._id_token = auth.Token('token', None)
  104. self.assertEqual(auth.Token('token', None),
  105. authenticator.get_id_token())
  106. subprocess2.check_call_out.assert_not_called()
  107. def testGetIdToken_LoggedIn(self):
  108. expiry = calendar.timegm(VALID_EXPIRY.timetuple())
  109. subprocess2.check_call_out.return_value = (json.dumps({
  110. 'token': 'token',
  111. 'expiry': expiry
  112. }), '')
  113. self.assertEqual(
  114. auth.Token('token', VALID_EXPIRY),
  115. auth.Authenticator(audience='https://test.com').get_id_token())
  116. subprocess2.check_call_out.assert_called_with([
  117. 'luci-auth', 'token', '-use-id-token', '-audience',
  118. 'https://test.com', '-json-output', '-'
  119. ],
  120. stdout=subprocess2.PIPE,
  121. stderr=subprocess2.PIPE)
  122. def testAuthorize_IdToken(self):
  123. http = mock.Mock()
  124. http_request = http.request
  125. http_request.__name__ = '__name__'
  126. authenticator = auth.Authenticator()
  127. authenticator._access_token = auth.Token('access_token', None)
  128. authenticator._id_token = auth.Token('id_token', None)
  129. authorized = authenticator.authorize(http, use_id_token=True)
  130. authorized.request('https://example.com',
  131. method='POST',
  132. body='body',
  133. headers={'header': 'value'})
  134. http_request.assert_called_once_with(
  135. 'https://example.com', 'POST', 'body', {
  136. 'header': 'value',
  137. 'Authorization': 'Bearer id_token'
  138. }, mock.ANY, mock.ANY)
  139. class TokenTest(unittest.TestCase):
  140. def setUp(self):
  141. mock.patch('auth.datetime_now', return_value=NOW).start()
  142. self.addCleanup(mock.patch.stopall)
  143. def testNeedsRefresh_NoExpiry(self):
  144. self.assertFalse(auth.Token('token', None).needs_refresh())
  145. def testNeedsRefresh_Expired(self):
  146. expired = NOW + datetime.timedelta(seconds=30)
  147. self.assertTrue(auth.Token('token', expired).needs_refresh())
  148. def testNeedsRefresh_Valid(self):
  149. self.assertFalse(auth.Token('token', VALID_EXPIRY).needs_refresh())
  150. class HasLuciContextLocalAuthTest(unittest.TestCase):
  151. def setUp(self):
  152. mock.patch('os.environ').start()
  153. mock.patch('builtins.open', mock.mock_open()).start()
  154. self.addCleanup(mock.patch.stopall)
  155. def testNoLuciContextEnvVar(self):
  156. os.environ = {}
  157. self.assertFalse(auth.has_luci_context_local_auth())
  158. def testNonexistentPath(self):
  159. os.environ = {'LUCI_CONTEXT': 'path'}
  160. open.side_effect = OSError
  161. self.assertFalse(auth.has_luci_context_local_auth())
  162. open.assert_called_with('path')
  163. def testInvalidJsonFile(self):
  164. os.environ = {'LUCI_CONTEXT': 'path'}
  165. open().read.return_value = 'not-a-json-file'
  166. self.assertFalse(auth.has_luci_context_local_auth())
  167. open.assert_called_with('path')
  168. def testNoLocalAuth(self):
  169. os.environ = {'LUCI_CONTEXT': 'path'}
  170. open().read.return_value = '{}'
  171. self.assertFalse(auth.has_luci_context_local_auth())
  172. open.assert_called_with('path')
  173. def testNoDefaultAccountId(self):
  174. os.environ = {'LUCI_CONTEXT': 'path'}
  175. open().read.return_value = json.dumps({
  176. 'local_auth': {
  177. 'secret':
  178. 'secret',
  179. 'accounts': [{
  180. 'email': 'bots@account.iam.gserviceaccount.com',
  181. 'id': 'system',
  182. }],
  183. 'rpc_port':
  184. 1234,
  185. }
  186. })
  187. self.assertFalse(auth.has_luci_context_local_auth())
  188. open.assert_called_with('path')
  189. def testHasLocalAuth(self):
  190. os.environ = {'LUCI_CONTEXT': 'path'}
  191. open().read.return_value = json.dumps({
  192. 'local_auth': {
  193. 'secret':
  194. 'secret',
  195. 'accounts': [
  196. {
  197. 'email': 'bots@account.iam.gserviceaccount.com',
  198. 'id': 'system',
  199. },
  200. {
  201. 'email': 'builder@account.iam.gserviceaccount.com',
  202. 'id': 'task',
  203. },
  204. ],
  205. 'rpc_port':
  206. 1234,
  207. 'default_account_id':
  208. 'task',
  209. },
  210. })
  211. self.assertTrue(auth.has_luci_context_local_auth())
  212. open.assert_called_with('path')
  213. if __name__ == '__main__':
  214. if '-v' in sys.argv:
  215. logging.basicConfig(level=logging.DEBUG)
  216. unittest.main()