"""test OAuth Source""" from os.path import abspath from sys import platform from time import sleep from typing import Any, Optional from unittest.case import skipUnless from unittest.mock import Mock, patch from django.test import override_settings from docker.models.containers import Container from docker.types import Healthcheck from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.support import expected_conditions as ec from selenium.webdriver.support.wait import WebDriverWait from structlog.stdlib import get_logger from yaml import safe_dump from authentik.core.models import User from authentik.flows.models import Flow from authentik.providers.oauth2.generators import generate_client_id, generate_client_secret from authentik.sources.oauth.models import OAuthSource from authentik.sources.oauth.types.manager import SourceType from authentik.sources.oauth.types.twitter import TwitterOAuthCallback from authentik.stages.identification.models import IdentificationStage from tests.e2e.utils import SeleniumTestCase, apply_migration, object_manager, retry CONFIG_PATH = "/tmp/dex.yml" # nosec LOGGER = get_logger() class OAUth1Type(SourceType): """Twitter Type definition""" callback_view = TwitterOAuthCallback name = "Twitter" slug = "twitter" request_token_url = "http://localhost:5000/oauth/request_token" # nosec access_token_url = "http://localhost:5000/oauth/access_token" # nosec authorization_url = "http://localhost:5000/oauth/authorize" profile_url = "http://localhost:5000/api/me" urls_customizable = False SOURCE_TYPE_MOCK = Mock(return_value=OAUth1Type()) @skipUnless(platform.startswith("linux"), "requires local docker") class TestSourceOAuth2(SeleniumTestCase): """test OAuth Source flow""" container: Container def setUp(self): self.client_secret = generate_client_secret() self.prepare_dex_config() super().setUp() def prepare_dex_config(self): """Since Dex does not document which environment variables can be used to configure clients""" config = { "enablePasswordDB": True, "issuer": "http://127.0.0.1:5556/dex", "logger": {"level": "debug"}, "staticClients": [ { "id": "example-app", "name": "Example App", "redirectURIs": [ self.url( "authentik_sources_oauth:oauth-client-callback", source_slug="dex", ) ], "secret": self.client_secret, } ], "staticPasswords": [ { "email": "admin@example.com", # hash for password "hash": "$2a$10$2b2cU8CPhOTaGrs1HRQuAueS7JTT5ZHsHSzYiFPm1leZck7Mc8T4W", "userID": "08a8684b-db88-4b73-90a9-3cd1661f5466", "username": "admin", } ], "storage": {"config": {"file": "/tmp/dex.db"}, "type": "sqlite3"}, # nosec "web": {"http": "0.0.0.0:5556"}, } with open(CONFIG_PATH, "w+") as _file: safe_dump(config, _file) def get_container_specs(self) -> Optional[dict[str, Any]]: return { "image": "ghcr.io/dexidp/dex:v2.28.1", "detach": True, "network_mode": "host", "auto_remove": True, "command": "dex serve /config.yml", "healthcheck": Healthcheck( test=["CMD", "wget", "--spider", "http://localhost:5556/dex/healthz"], interval=5 * 100 * 1000000, start_period=1 * 100 * 1000000, ), "volumes": {abspath(CONFIG_PATH): {"bind": "/config.yml", "mode": "ro"}}, } def create_objects(self): """Create required objects""" # Bootstrap all needed objects authentication_flow = Flow.objects.get(slug="default-source-authentication") enrollment_flow = Flow.objects.get(slug="default-source-enrollment") source = OAuthSource.objects.create( # nosec name="dex", slug="dex", authentication_flow=authentication_flow, enrollment_flow=enrollment_flow, provider_type="openidconnect", authorization_url="http://127.0.0.1:5556/dex/auth", access_token_url="http://127.0.0.1:5556/dex/token", profile_url="http://127.0.0.1:5556/dex/userinfo", consumer_key="example-app", consumer_secret=self.client_secret, ) ident_stage = IdentificationStage.objects.first() ident_stage.sources.set([source]) ident_stage.save() @retry() @apply_migration("authentik_core", "0003_default_user") @apply_migration("authentik_flows", "0008_default_flows") @apply_migration("authentik_flows", "0011_flow_title") @apply_migration("authentik_flows", "0009_source_flows") @apply_migration("authentik_crypto", "0002_create_self_signed_kp") @object_manager def test_oauth_enroll(self): """test OAuth Source With With OIDC""" self.create_objects() self.driver.get(self.live_server_url) flow_executor = self.get_shadow_root("ak-flow-executor") identification_stage = self.get_shadow_root("ak-stage-identification", flow_executor) wait = WebDriverWait(identification_stage, self.wait_timeout) wait.until( ec.presence_of_element_located( (By.CSS_SELECTOR, ".pf-c-login__main-footer-links-item > button") ) ) identification_stage.find_element( By.CSS_SELECTOR, ".pf-c-login__main-footer-links-item > button" ).click() # Now we should be at the IDP, wait for the login field self.wait.until(ec.presence_of_element_located((By.ID, "login"))) self.driver.find_element(By.ID, "login").send_keys("admin@example.com") self.driver.find_element(By.ID, "password").send_keys("password") self.driver.find_element(By.ID, "password").send_keys(Keys.ENTER) # Wait until we're logged in self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "button[type=submit]"))) self.driver.find_element(By.CSS_SELECTOR, "button[type=submit]").click() # At this point we've been redirected back # and we're asked for the username flow_executor = self.get_shadow_root("ak-flow-executor") prompt_stage = self.get_shadow_root("ak-stage-prompt", flow_executor) prompt_stage.find_element(By.CSS_SELECTOR, "input[name=username]").click() prompt_stage.find_element(By.CSS_SELECTOR, "input[name=username]").send_keys("foo") prompt_stage.find_element(By.CSS_SELECTOR, "input[name=username]").send_keys(Keys.ENTER) # Wait until we've logged in self.wait_for_url(self.if_admin_url("/library")) self.driver.get(self.if_admin_url("/user")) self.assert_user(User(username="foo", name="admin", email="admin@example.com")) @retry() @apply_migration("authentik_core", "0003_default_user") @apply_migration("authentik_flows", "0008_default_flows") @apply_migration("authentik_flows", "0011_flow_title") @apply_migration("authentik_flows", "0009_source_flows") @apply_migration("authentik_crypto", "0002_create_self_signed_kp") @object_manager @override_settings(SESSION_COOKIE_SAMESITE="strict") def test_oauth_samesite_strict(self): """test OAuth Source With SameSite set to strict (=will fail because session is not carried over)""" self.create_objects() self.driver.get(self.live_server_url) flow_executor = self.get_shadow_root("ak-flow-executor") identification_stage = self.get_shadow_root("ak-stage-identification", flow_executor) wait = WebDriverWait(identification_stage, self.wait_timeout) wait.until( ec.presence_of_element_located( (By.CSS_SELECTOR, ".pf-c-login__main-footer-links-item > button") ) ) identification_stage.find_element( By.CSS_SELECTOR, ".pf-c-login__main-footer-links-item > button" ).click() # Now we should be at the IDP, wait for the login field self.wait.until(ec.presence_of_element_located((By.ID, "login"))) self.driver.find_element(By.ID, "login").send_keys("admin@example.com") self.driver.find_element(By.ID, "password").send_keys("password") self.driver.find_element(By.ID, "password").send_keys(Keys.ENTER) # Wait until we're logged in self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "button[type=submit]"))) self.driver.find_element(By.CSS_SELECTOR, "button[type=submit]").click() @retry() @apply_migration("authentik_core", "0003_default_user") @apply_migration("authentik_flows", "0008_default_flows") @apply_migration("authentik_flows", "0011_flow_title") @apply_migration("authentik_flows", "0009_source_flows") @apply_migration("authentik_crypto", "0002_create_self_signed_kp") @object_manager def test_oauth_enroll_auth(self): """test OAuth Source With With OIDC (enroll and authenticate again)""" self.test_oauth_enroll() # We're logged in at the end of this, log out and re-login self.driver.get(self.url("authentik_flows:default-invalidation")) sleep(1) flow_executor = self.get_shadow_root("ak-flow-executor") identification_stage = self.get_shadow_root("ak-stage-identification", flow_executor) wait = WebDriverWait(identification_stage, self.wait_timeout) wait.until( ec.presence_of_element_located( (By.CSS_SELECTOR, ".pf-c-login__main-footer-links-item > button") ) ) identification_stage.find_element( By.CSS_SELECTOR, ".pf-c-login__main-footer-links-item > button" ).click() # Now we should be at the IDP, wait for the login field self.wait.until(ec.presence_of_element_located((By.ID, "login"))) self.driver.find_element(By.ID, "login").send_keys("admin@example.com") self.driver.find_element(By.ID, "password").send_keys("password") self.driver.find_element(By.ID, "password").send_keys(Keys.ENTER) # Wait until we're logged in self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "button[type=submit]"))) self.driver.find_element(By.CSS_SELECTOR, "button[type=submit]").click() # Wait until we've logged in self.wait_for_url(self.if_admin_url("/library")) self.driver.get(self.if_admin_url("/user")) self.assert_user(User(username="foo", name="admin", email="admin@example.com")) @skipUnless(platform.startswith("linux"), "requires local docker") class TestSourceOAuth1(SeleniumTestCase): """Test OAuth1 Source""" def setUp(self) -> None: self.client_id = generate_client_id() self.client_secret = generate_client_secret() self.source_slug = "oauth1-test" super().setUp() def get_container_specs(self) -> Optional[dict[str, Any]]: return { "image": "ghcr.io/beryju/oauth1-test-server:latest", "detach": True, "network_mode": "host", "auto_remove": True, "environment": { "OAUTH1_CLIENT_ID": self.client_id, "OAUTH1_CLIENT_SECRET": self.client_secret, "OAUTH1_REDIRECT_URI": ( self.url( "authentik_sources_oauth:oauth-client-callback", source_slug=self.source_slug, ) ), }, } def create_objects(self): """Create required objects""" # Bootstrap all needed objects authentication_flow = Flow.objects.get(slug="default-source-authentication") enrollment_flow = Flow.objects.get(slug="default-source-enrollment") source = OAuthSource.objects.create( # nosec name="oauth1", slug=self.source_slug, authentication_flow=authentication_flow, enrollment_flow=enrollment_flow, provider_type="twitter", consumer_key=self.client_id, consumer_secret=self.client_secret, ) ident_stage = IdentificationStage.objects.first() ident_stage.sources.set([source]) ident_stage.save() @retry() @apply_migration("authentik_core", "0003_default_user") @apply_migration("authentik_flows", "0008_default_flows") @apply_migration("authentik_flows", "0011_flow_title") @apply_migration("authentik_flows", "0009_source_flows") @apply_migration("authentik_crypto", "0002_create_self_signed_kp") @patch( "authentik.sources.oauth.types.manager.SourceTypeManager.find_type", SOURCE_TYPE_MOCK, ) @object_manager def test_oauth_enroll(self): """test OAuth Source With With OIDC""" self.create_objects() self.driver.get(self.live_server_url) flow_executor = self.get_shadow_root("ak-flow-executor") identification_stage = self.get_shadow_root("ak-stage-identification", flow_executor) wait = WebDriverWait(identification_stage, self.wait_timeout) wait.until( ec.presence_of_element_located( (By.CSS_SELECTOR, ".pf-c-login__main-footer-links-item > button") ) ) identification_stage.find_element( By.CSS_SELECTOR, ".pf-c-login__main-footer-links-item > button" ).click() # Now we should be at the IDP, wait for the login field self.wait.until(ec.presence_of_element_located((By.NAME, "username"))) self.driver.find_element(By.NAME, "username").send_keys("example-user") self.driver.find_element(By.NAME, "username").send_keys(Keys.ENTER) sleep(2) # Wait until we're logged in self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "[name='confirm']"))) self.driver.find_element(By.CSS_SELECTOR, "[name='confirm']").click() # Wait until we've loaded the user info page sleep(2) # Wait until we've logged in self.wait_for_url(self.if_admin_url("/library")) self.driver.get(self.if_admin_url("/user")) self.assert_user(User(username="example-user", name="test name", email="foo@example.com"))