|
@@ -30,7 +30,8 @@ import urllib.parse
|
|
from dataclasses import dataclass
|
|
from dataclasses import dataclass
|
|
from io import StringIO
|
|
from io import StringIO
|
|
from multiprocessing.pool import ThreadPool
|
|
from multiprocessing.pool import ThreadPool
|
|
-from typing import Any, Container, Dict, List, Optional
|
|
|
|
|
|
+from typing import Any, Container, Dict, Mapping
|
|
|
|
+from typing import NamedTuple, List, Optional
|
|
from typing import Tuple, TypedDict, cast
|
|
from typing import Tuple, TypedDict, cast
|
|
|
|
|
|
import httplib2
|
|
import httplib2
|
|
@@ -174,49 +175,52 @@ class SSOHelper(object):
|
|
ssoHelper = SSOHelper()
|
|
ssoHelper = SSOHelper()
|
|
|
|
|
|
|
|
|
|
-@functools.lru_cache(maxsize=None)
|
|
|
|
def ShouldUseSSO(host: str, email: str) -> bool:
|
|
def ShouldUseSSO(host: str, email: str) -> bool:
|
|
"""Return True if we should use SSO for the given Gerrit host and user."""
|
|
"""Return True if we should use SSO for the given Gerrit host and user."""
|
|
LOGGER.debug("Determining whether we should use SSO...")
|
|
LOGGER.debug("Determining whether we should use SSO...")
|
|
|
|
+ result = CheckShouldUseSSO(host, email)
|
|
|
|
+ LOGGER.debug("SSO=%r: %s", result.status, result.reason)
|
|
|
|
+ return result.status
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class SSOCheckResult(NamedTuple):
|
|
|
|
+ status: bool
|
|
|
|
+ reason: str
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+@functools.lru_cache(maxsize=None)
|
|
|
|
+def CheckShouldUseSSO(host: str, email: str) -> SSOCheckResult:
|
|
|
|
+ """Checks if we should use SSO for the given Gerrit host and user."""
|
|
if not newauth.Enabled():
|
|
if not newauth.Enabled():
|
|
- LOGGER.debug("SSO=False: not opted in")
|
|
|
|
- return False
|
|
|
|
|
|
+ return SSOCheckResult(False, 'not opted in')
|
|
if not host.endswith('.googlesource.com'):
|
|
if not host.endswith('.googlesource.com'):
|
|
- LOGGER.debug("SSO=False: non-googlesource host %r", host)
|
|
|
|
- return False
|
|
|
|
|
|
+ return SSOCheckResult(False, f'non-googlesource host {host}')
|
|
if newauth.SkipSSO():
|
|
if newauth.SkipSSO():
|
|
- LOGGER.debug("SSO=False: set skip SSO config")
|
|
|
|
- return False
|
|
|
|
|
|
+ return SSOCheckResult(False, 'skip SSO is set in config')
|
|
if not ssoHelper.find_cmd():
|
|
if not ssoHelper.find_cmd():
|
|
- LOGGER.debug("SSO=False: no SSO command")
|
|
|
|
- return False
|
|
|
|
|
|
+ return SSOCheckResult(False, 'no SSO command')
|
|
if gclient_utils.IsEnvCog():
|
|
if gclient_utils.IsEnvCog():
|
|
- LOGGER.debug("SSO=True: in Cog")
|
|
|
|
- return True
|
|
|
|
|
|
+ return SSOCheckResult(True, 'in Cog')
|
|
if not email:
|
|
if not email:
|
|
- LOGGER.debug(
|
|
|
|
- "SSO=True: email is empty or missing (and SSO command available)")
|
|
|
|
- return True
|
|
|
|
|
|
+ return SSOCheckResult(
|
|
|
|
+ True, 'email is empty or missing (and SSO command available)')
|
|
if email.endswith('@google.com'):
|
|
if email.endswith('@google.com'):
|
|
- LOGGER.debug("SSO=True: email is google.com")
|
|
|
|
- return True
|
|
|
|
|
|
+ return SSOCheckResult(True, 'email is @google.com')
|
|
if not email.endswith('@chromium.org'):
|
|
if not email.endswith('@chromium.org'):
|
|
- LOGGER.debug("SSO=False: not chromium.org")
|
|
|
|
- return False
|
|
|
|
|
|
+ return SSOCheckResult(False, 'email is not @chromium.org')
|
|
authenticator = SSOAuthenticator()
|
|
authenticator = SSOAuthenticator()
|
|
|
|
+ records: list[EmailRecord] = []
|
|
try:
|
|
try:
|
|
- records = GetAccountEmails(host, 'self', authenticator=authenticator)
|
|
|
|
|
|
+ records = GetAccountEmails(host, 'self',
|
|
|
|
+ authenticator=authenticator) or []
|
|
except GerritError as e:
|
|
except GerritError as e:
|
|
if e.http_status == 400:
|
|
if e.http_status == 400:
|
|
# This is likely because the user doesn't have an account on the Gerrit host.
|
|
# This is likely because the user doesn't have an account on the Gerrit host.
|
|
- LOGGER.debug("SSO=False: get account emails returned 400")
|
|
|
|
- return False
|
|
|
|
|
|
+ return SSOCheckResult(False, 'account missing onn Gerrit host')
|
|
raise
|
|
raise
|
|
if any(email == r['email'] for r in records):
|
|
if any(email == r['email'] for r in records):
|
|
- LOGGER.debug("SSO=True: email is linked to google.com")
|
|
|
|
- return True
|
|
|
|
- LOGGER.debug("SSO=False: unlinked chromium.org")
|
|
|
|
- return False
|
|
|
|
|
|
+ return SSOCheckResult(True, 'email is linked to @google.com email')
|
|
|
|
+ return SSOCheckResult(False, 'email is not linked to @google.com email')
|
|
|
|
|
|
|
|
|
|
class _Authenticator(object):
|
|
class _Authenticator(object):
|