auth_test.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. #!/usr/bin/env vpython3
  2. # Copyright (c) 2017 The Chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Unit Tests for auth.py"""
  6. import calendar
  7. import datetime
  8. import json
  9. import os
  10. import unittest
  11. import sys
  12. from unittest import mock
  13. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  14. import auth
  15. import subprocess2
  16. NOW = datetime.datetime(2019, 10, 17, 12, 30, 59, 0)
  17. VALID_EXPIRY = NOW + datetime.timedelta(seconds=31)
  18. class AuthenticatorTest(unittest.TestCase):
  19. def setUp(self):
  20. mock.patch('subprocess2.check_call').start()
  21. mock.patch('subprocess2.check_call_out').start()
  22. mock.patch('auth.datetime_now', return_value=NOW).start()
  23. self.addCleanup(mock.patch.stopall)
  24. def testHasCachedCredentials_NotLoggedIn(self):
  25. subprocess2.check_call_out.side_effect = [
  26. subprocess2.CalledProcessError(1, ['cmd'], 'cwd', 'stdout',
  27. 'stderr')
  28. ]
  29. self.assertFalse(auth.Authenticator().has_cached_credentials())
  30. def testHasCachedCredentials_LoggedIn(self):
  31. subprocess2.check_call_out.return_value = (json.dumps({
  32. 'token': 'token',
  33. 'expiry': 12345678
  34. }), '')
  35. self.assertTrue(auth.Authenticator().has_cached_credentials())
  36. def testGetAccessToken_NotLoggedIn(self):
  37. subprocess2.check_call_out.side_effect = [
  38. subprocess2.CalledProcessError(1, ['cmd'], 'cwd', 'stdout',
  39. 'stderr')
  40. ]
  41. self.assertRaises(auth.LoginRequiredError,
  42. auth.Authenticator().get_access_token)
  43. def testGetAccessToken_CachedToken(self):
  44. authenticator = auth.Authenticator()
  45. authenticator._access_token = auth.AccessToken('token', None)
  46. self.assertEqual(auth.AccessToken('token', None),
  47. authenticator.get_access_token())
  48. subprocess2.check_call_out.assert_not_called()
  49. def testGetAccesstoken_LoggedIn(self):
  50. expiry = calendar.timegm(VALID_EXPIRY.timetuple())
  51. subprocess2.check_call_out.return_value = (json.dumps({
  52. 'token': 'token',
  53. 'expiry': expiry
  54. }), '')
  55. self.assertEqual(auth.AccessToken('token', VALID_EXPIRY),
  56. auth.Authenticator().get_access_token())
  57. subprocess2.check_call_out.assert_called_with([
  58. 'luci-auth', 'token', '-scopes', auth.OAUTH_SCOPE_EMAIL,
  59. '-json-output', '-'
  60. ],
  61. stdout=subprocess2.PIPE,
  62. stderr=subprocess2.PIPE)
  63. def testGetAccessToken_DifferentScope(self):
  64. expiry = calendar.timegm(VALID_EXPIRY.timetuple())
  65. subprocess2.check_call_out.return_value = (json.dumps({
  66. 'token': 'token',
  67. 'expiry': expiry
  68. }), '')
  69. self.assertEqual(auth.AccessToken('token', VALID_EXPIRY),
  70. auth.Authenticator('custom scopes').get_access_token())
  71. subprocess2.check_call_out.assert_called_with([
  72. 'luci-auth', 'token', '-scopes', 'custom scopes', '-json-output',
  73. '-'
  74. ],
  75. stdout=subprocess2.PIPE,
  76. stderr=subprocess2.PIPE)
  77. def testAuthorize(self):
  78. http = mock.Mock()
  79. http_request = http.request
  80. http_request.__name__ = '__name__'
  81. authenticator = auth.Authenticator()
  82. authenticator._access_token = auth.AccessToken('token', None)
  83. authorized = authenticator.authorize(http)
  84. authorized.request('https://example.com',
  85. method='POST',
  86. body='body',
  87. headers={'header': 'value'})
  88. http_request.assert_called_once_with('https://example.com', 'POST',
  89. 'body', {
  90. 'header': 'value',
  91. 'Authorization': 'Bearer token'
  92. }, mock.ANY, mock.ANY)
  93. class AccessTokenTest(unittest.TestCase):
  94. def setUp(self):
  95. mock.patch('auth.datetime_now', return_value=NOW).start()
  96. self.addCleanup(mock.patch.stopall)
  97. def testNeedsRefresh_NoExpiry(self):
  98. self.assertFalse(auth.AccessToken('token', None).needs_refresh())
  99. def testNeedsRefresh_Expired(self):
  100. expired = NOW + datetime.timedelta(seconds=30)
  101. self.assertTrue(auth.AccessToken('token', expired).needs_refresh())
  102. def testNeedsRefresh_Valid(self):
  103. self.assertFalse(
  104. auth.AccessToken('token', VALID_EXPIRY).needs_refresh())
  105. class HasLuciContextLocalAuthTest(unittest.TestCase):
  106. def setUp(self):
  107. mock.patch('os.environ').start()
  108. mock.patch('builtins.open', mock.mock_open()).start()
  109. self.addCleanup(mock.patch.stopall)
  110. def testNoLuciContextEnvVar(self):
  111. os.environ = {}
  112. self.assertFalse(auth.has_luci_context_local_auth())
  113. def testNonexistentPath(self):
  114. os.environ = {'LUCI_CONTEXT': 'path'}
  115. open.side_effect = OSError
  116. self.assertFalse(auth.has_luci_context_local_auth())
  117. open.assert_called_with('path')
  118. def testInvalidJsonFile(self):
  119. os.environ = {'LUCI_CONTEXT': 'path'}
  120. open().read.return_value = 'not-a-json-file'
  121. self.assertFalse(auth.has_luci_context_local_auth())
  122. open.assert_called_with('path')
  123. def testNoLocalAuth(self):
  124. os.environ = {'LUCI_CONTEXT': 'path'}
  125. open().read.return_value = '{}'
  126. self.assertFalse(auth.has_luci_context_local_auth())
  127. open.assert_called_with('path')
  128. def testNoDefaultAccountId(self):
  129. os.environ = {'LUCI_CONTEXT': 'path'}
  130. open().read.return_value = json.dumps({
  131. 'local_auth': {
  132. 'secret':
  133. 'secret',
  134. 'accounts': [{
  135. 'email': 'bots@account.iam.gserviceaccount.com',
  136. 'id': 'system',
  137. }],
  138. 'rpc_port':
  139. 1234,
  140. }
  141. })
  142. self.assertFalse(auth.has_luci_context_local_auth())
  143. open.assert_called_with('path')
  144. def testHasLocalAuth(self):
  145. os.environ = {'LUCI_CONTEXT': 'path'}
  146. open().read.return_value = json.dumps({
  147. 'local_auth': {
  148. 'secret':
  149. 'secret',
  150. 'accounts': [
  151. {
  152. 'email': 'bots@account.iam.gserviceaccount.com',
  153. 'id': 'system',
  154. },
  155. {
  156. 'email': 'builder@account.iam.gserviceaccount.com',
  157. 'id': 'task',
  158. },
  159. ],
  160. 'rpc_port':
  161. 1234,
  162. 'default_account_id':
  163. 'task',
  164. },
  165. })
  166. self.assertTrue(auth.has_luci_context_local_auth())
  167. open.assert_called_with('path')
  168. if __name__ == '__main__':
  169. if '-v' in sys.argv:
  170. logging.basicConfig(level=logging.DEBUG)
  171. unittest.main()