diff --git a/tests/e2e/test_provider_oauth2_oidc_implicit.py b/tests/e2e/test_provider_oauth2_oidc_implicit.py new file mode 100644 index 000000000..52bc5664b --- /dev/null +++ b/tests/e2e/test_provider_oauth2_oidc_implicit.py @@ -0,0 +1,273 @@ +"""test OAuth2 OpenID Provider flow""" +from json import loads +from sys import platform +from time import sleep +from unittest.case import skipUnless + +from docker import DockerClient, from_env +from docker.models.containers import Container +from docker.types import Healthcheck +from selenium.webdriver.common.by import By +from selenium.webdriver.support import expected_conditions as ec +from structlog.stdlib import get_logger + +from authentik.core.models import Application +from authentik.crypto.models import CertificateKeyPair +from authentik.flows.models import Flow +from authentik.policies.expression.models import ExpressionPolicy +from authentik.policies.models import PolicyBinding +from authentik.providers.oauth2.constants import ( + SCOPE_OPENID, + SCOPE_OPENID_EMAIL, + SCOPE_OPENID_PROFILE, +) +from authentik.providers.oauth2.generators import ( + generate_client_id, + generate_client_secret, +) +from authentik.providers.oauth2.models import ClientTypes, OAuth2Provider, ScopeMapping +from tests.e2e.utils import ( + USER, + SeleniumTestCase, + apply_migration, + object_manager, + retry, +) + +LOGGER = get_logger() + + +@skipUnless(platform.startswith("linux"), "requires local docker") +class TestProviderOAuth2OIDCImplicit(SeleniumTestCase): + """test OAuth with OpenID Provider flow""" + + def setUp(self): + self.client_id = generate_client_id() + self.client_secret = generate_client_secret() + self.application_slug = "test" + super().setUp() + + def setup_client(self) -> Container: + """Setup client saml-sp container which we test SAML against""" + sleep(1) + client: DockerClient = from_env() + container = client.containers.run( + image="beryju/oidc-test-client", + detach=True, + network_mode="host", + auto_remove=True, + healthcheck=Healthcheck( + test=["CMD", "wget", "--spider", "http://localhost:9009/health"], + interval=5 * 100 * 1000000, + start_period=1 * 100 * 1000000, + ), + environment={ + "OIDC_CLIENT_ID": self.client_id, + "OIDC_CLIENT_SECRET": self.client_secret, + "OIDC_PROVIDER": f"{self.live_server_url}/application/o/{self.application_slug}/", + }, + ) + while True: + container.reload() + status = container.attrs.get("State", {}).get("Health", {}).get("Status") + if status == "healthy": + return container + LOGGER.info("Container failed healthcheck") + sleep(1) + + @retry() + @apply_migration("authentik_core", "0003_default_user") + @apply_migration("authentik_flows", "0008_default_flows") + @apply_migration("authentik_flows", "0010_provider_flows") + @apply_migration("authentik_crypto", "0002_create_self_signed_kp") + def test_redirect_uri_error(self): + """test OpenID Provider flow (invalid redirect URI, check error message)""" + sleep(1) + # Bootstrap all needed objects + authorization_flow = Flow.objects.get( + slug="default-provider-authorization-implicit-consent" + ) + provider = OAuth2Provider.objects.create( + name=self.application_slug, + client_type=ClientTypes.CONFIDENTIAL, + client_id=self.client_id, + client_secret=self.client_secret, + rsa_key=CertificateKeyPair.objects.first(), + redirect_uris="http://localhost:9009/", + authorization_flow=authorization_flow, + ) + provider.property_mappings.set( + ScopeMapping.objects.filter( + scope_name__in=[SCOPE_OPENID, SCOPE_OPENID_EMAIL, SCOPE_OPENID_PROFILE] + ) + ) + provider.save() + Application.objects.create( + name=self.application_slug, + slug=self.application_slug, + provider=provider, + ) + self.container = self.setup_client() + + self.driver.get("http://localhost:9009/implicit/") + sleep(2) + self.assertEqual( + self.driver.find_element(By.CLASS_NAME, "pf-c-title").text, + "Redirect URI Error", + ) + + @retry() + @apply_migration("authentik_core", "0003_default_user") + @apply_migration("authentik_flows", "0008_default_flows") + @apply_migration("authentik_flows", "0010_provider_flows") + @apply_migration("authentik_crypto", "0002_create_self_signed_kp") + @object_manager + def test_authorization_consent_implied(self): + """test OpenID Provider flow (default authorization flow with implied consent)""" + sleep(1) + # Bootstrap all needed objects + authorization_flow = Flow.objects.get( + slug="default-provider-authorization-implicit-consent" + ) + provider = OAuth2Provider.objects.create( + name=self.application_slug, + client_type=ClientTypes.CONFIDENTIAL, + client_id=self.client_id, + client_secret=self.client_secret, + rsa_key=CertificateKeyPair.objects.first(), + redirect_uris="http://localhost:9009/implicit/", + authorization_flow=authorization_flow, + ) + provider.property_mappings.set( + ScopeMapping.objects.filter( + scope_name__in=[SCOPE_OPENID, SCOPE_OPENID_EMAIL, SCOPE_OPENID_PROFILE] + ) + ) + provider.save() + Application.objects.create( + name=self.application_slug, + slug=self.application_slug, + provider=provider, + ) + self.container = self.setup_client() + + self.driver.get("http://localhost:9009/implicit/") + self.login() + self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "pre"))) + sleep(1) + body = loads(self.driver.find_element(By.CSS_SELECTOR, "pre").text) + print(body) + self.assertEqual(body["profile"]["nickname"], USER().username) + self.assertEqual(body["profile"]["name"], USER().name) + self.assertEqual(body["profile"]["email"], USER().email) + + @retry() + @apply_migration("authentik_core", "0003_default_user") + @apply_migration("authentik_flows", "0008_default_flows") + @apply_migration("authentik_flows", "0010_provider_flows") + @apply_migration("authentik_crypto", "0002_create_self_signed_kp") + @object_manager + def test_authorization_consent_explicit(self): + """test OpenID Provider flow (default authorization flow with explicit consent)""" + sleep(1) + # Bootstrap all needed objects + authorization_flow = Flow.objects.get( + slug="default-provider-authorization-explicit-consent" + ) + provider = OAuth2Provider.objects.create( + name=self.application_slug, + authorization_flow=authorization_flow, + client_type=ClientTypes.CONFIDENTIAL, + client_id=self.client_id, + client_secret=self.client_secret, + rsa_key=CertificateKeyPair.objects.first(), + redirect_uris="http://localhost:9009/implicit/", + ) + provider.property_mappings.set( + ScopeMapping.objects.filter( + scope_name__in=[SCOPE_OPENID, SCOPE_OPENID_EMAIL, SCOPE_OPENID_PROFILE] + ) + ) + provider.save() + app = Application.objects.create( + name=self.application_slug, + slug=self.application_slug, + provider=provider, + ) + self.container = self.setup_client() + + self.driver.get("http://localhost:9009/implicit/") + self.login() + + self.wait.until( + ec.presence_of_element_located((By.CSS_SELECTOR, "ak-flow-executor")) + ) + + flow_executor = self.get_shadow_root("ak-flow-executor") + consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor) + + self.assertIn( + app.name, + consent_stage.find_element(By.CSS_SELECTOR, "#header-text").text, + ) + consent_stage.find_element( + By.CSS_SELECTOR, + ("[type=submit]"), + ).click() + + self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "pre"))) + sleep(1) + body = loads(self.driver.find_element(By.CSS_SELECTOR, "pre").text) + + self.assertEqual(body["profile"]["nickname"], USER().username) + self.assertEqual(body["profile"]["name"], USER().name) + self.assertEqual(body["profile"]["email"], USER().email) + + @retry() + @apply_migration("authentik_core", "0003_default_user") + @apply_migration("authentik_flows", "0008_default_flows") + @apply_migration("authentik_flows", "0010_provider_flows") + @apply_migration("authentik_crypto", "0002_create_self_signed_kp") + def test_authorization_denied(self): + """test OpenID Provider flow (default authorization with access deny)""" + sleep(1) + # Bootstrap all needed objects + authorization_flow = Flow.objects.get( + slug="default-provider-authorization-explicit-consent" + ) + provider = OAuth2Provider.objects.create( + name=self.application_slug, + authorization_flow=authorization_flow, + client_type=ClientTypes.CONFIDENTIAL, + client_id=self.client_id, + client_secret=self.client_secret, + rsa_key=CertificateKeyPair.objects.first(), + redirect_uris="http://localhost:9009/implicit/", + ) + provider.property_mappings.set( + ScopeMapping.objects.filter( + scope_name__in=[SCOPE_OPENID, SCOPE_OPENID_EMAIL, SCOPE_OPENID_PROFILE] + ) + ) + provider.save() + app = Application.objects.create( + name=self.application_slug, + slug=self.application_slug, + provider=provider, + ) + + negative_policy = ExpressionPolicy.objects.create( + name="negative-static", expression="return False" + ) + PolicyBinding.objects.create(target=app, policy=negative_policy, order=0) + + self.container = self.setup_client() + self.driver.get("http://localhost:9009/implicit/") + self.login() + self.wait.until( + ec.presence_of_element_located((By.CSS_SELECTOR, "header > h1")) + ) + self.assertEqual( + self.driver.find_element(By.CSS_SELECTOR, "header > h1").text, + "Permission denied", + )