From 66f4a31b4c4256eab0d071dafc6d66b084a0261b Mon Sep 17 00:00:00 2001 From: Jens L Date: Wed, 8 Jun 2022 20:50:48 +0200 Subject: [PATCH] stages/authenticator_validate: add webauthn tests (#3069) Signed-off-by: Jens Langhammer --- authentik/root/test_runner.py | 1 + .../authenticator_validate/challenge.py | 2 +- .../tests/test_webauthn.py | 203 +++++++++++++++++- .../stages/authenticator_webauthn/stage.py | 2 +- .../stages/authenticator_webauthn/tests.py | 87 +++++++- 5 files changed, 282 insertions(+), 13 deletions(-) diff --git a/authentik/root/test_runner.py b/authentik/root/test_runner.py index 806c3c0b6..9cbbf7bb3 100644 --- a/authentik/root/test_runner.py +++ b/authentik/root/test_runner.py @@ -43,6 +43,7 @@ class PytestTestRunner: # pragma: no cover def add_arguments(cls, parser: ArgumentParser): """Add more pytest-specific arguments""" parser.add_argument("--randomly-seed", type=int) + parser.add_argument("--keepdb", action="store_true") def run_tests(self, test_labels): """Run pytest and return the exitcode. diff --git a/authentik/stages/authenticator_validate/challenge.py b/authentik/stages/authenticator_validate/challenge.py index c747be7a4..2ee28d96e 100644 --- a/authentik/stages/authenticator_validate/challenge.py +++ b/authentik/stages/authenticator_validate/challenge.py @@ -120,7 +120,7 @@ def validate_challenge_webauthn(data: dict, stage_view: StageView, user: User) - device = WebAuthnDevice.objects.filter(credential_id=credential_id).first() if not device: - raise Http404() + raise ValidationError("Invalid device") try: authentication_verification = verify_authentication_response( diff --git a/authentik/stages/authenticator_validate/tests/test_webauthn.py b/authentik/stages/authenticator_validate/tests/test_webauthn.py index d13c04035..ad1ac7fce 100644 --- a/authentik/stages/authenticator_validate/tests/test_webauthn.py +++ b/authentik/stages/authenticator_validate/tests/test_webauthn.py @@ -1,12 +1,12 @@ """Test validator stage""" from time import sleep -from django.http import Http404 from django.test.client import RequestFactory from django.urls.base import reverse -from webauthn.helpers import bytes_to_base64url +from rest_framework.serializers import ValidationError +from webauthn.helpers import base64url_to_bytes, bytes_to_base64url -from authentik.core.tests.utils import create_test_admin_user +from authentik.core.tests.utils import create_test_admin_user, create_test_flow from authentik.flows.models import Flow, FlowStageBinding, NotConfiguredAction from authentik.flows.stage import StageView from authentik.flows.tests import FlowTestCase @@ -15,10 +15,13 @@ from authentik.lib.generators import generate_id from authentik.lib.tests.utils import get_request from authentik.stages.authenticator_validate.challenge import ( get_challenge_for_device, + get_webauthn_challenge_without_user, validate_challenge_webauthn, ) from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses +from authentik.stages.authenticator_validate.stage import AuthenticatorValidateStageView from authentik.stages.authenticator_webauthn.models import WebAuthnDevice +from authentik.stages.authenticator_webauthn.stage import SESSION_KEY_WEBAUTHN_CHALLENGE from authentik.stages.identification.models import IdentificationStage, UserFields @@ -104,7 +107,199 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase): }, ) - with self.assertRaises(Http404): + with self.assertRaises(ValidationError): validate_challenge_webauthn( {}, StageView(FlowExecutorView(current_stage=stage), request=request), self.user ) + + def test_get_challenge(self): + """Test webauthn""" + request = get_request("/") + request.user = self.user + + webauthn_device = WebAuthnDevice.objects.create( + user=self.user, + public_key=( + "pQECAyYgASFYIGsBLkklToCQkT7qJT_bJYN1sEc1oJdbnmoOc43i0J" + "H6IlggLTXytuhzFVYYAK4PQNj8_coGrbbzSfUxdiPAcZTQCyU" + ), + credential_id="QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU", + sign_count=0, + rp_id=generate_id(), + ) + challenge = get_challenge_for_device(request, webauthn_device) + webauthn_challenge = request.session[SESSION_KEY_WEBAUTHN_CHALLENGE] + self.assertEqual( + challenge, + { + "allowCredentials": [ + { + "id": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU", + "type": "public-key", + } + ], + "challenge": bytes_to_base64url(webauthn_challenge), + "rpId": "testserver", + "timeout": 60000, + "userVerification": "preferred", + }, + ) + + def test_get_challenge_userless(self): + """Test webauthn (userless)""" + request = get_request("/") + + WebAuthnDevice.objects.create( + user=self.user, + public_key=( + "pQECAyYgASFYIGsBLkklToCQkT7qJT_bJYN1sEc1oJdbnmoOc43i0J" + "H6IlggLTXytuhzFVYYAK4PQNj8_coGrbbzSfUxdiPAcZTQCyU" + ), + credential_id="QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU", + sign_count=0, + rp_id=generate_id(), + ) + challenge = get_webauthn_challenge_without_user(request) + webauthn_challenge = request.session[SESSION_KEY_WEBAUTHN_CHALLENGE] + self.assertEqual( + challenge, + { + "allowCredentials": [], + "challenge": bytes_to_base64url(webauthn_challenge), + "rpId": "testserver", + "timeout": 60000, + "userVerification": "preferred", + }, + ) + + def test_validate_challenge(self): + """Test webauthn""" + request = get_request("/") + request.user = self.user + + WebAuthnDevice.objects.create( + user=self.user, + public_key=( + "pQECAyYgASFYIGsBLkklToCQkT7qJT_bJYN1sEc1oJdbnmoOc43i0J" + "H6IlggLTXytuhzFVYYAK4PQNj8_coGrbbzSfUxdiPAcZTQCyU" + ), + credential_id="QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU", + sign_count=4, + rp_id=generate_id(), + ) + flow = create_test_flow() + stage = AuthenticatorValidateStage.objects.create( + name=generate_id(), + not_configured_action=NotConfiguredAction.CONFIGURE, + device_classes=[DeviceClasses.WEBAUTHN], + ) + stage_view = AuthenticatorValidateStageView( + FlowExecutorView(flow=flow, current_stage=stage), request=request + ) + request = get_request("/") + request.session[SESSION_KEY_WEBAUTHN_CHALLENGE] = base64url_to_bytes( + ( + "g98I51mQvZXo5lxLfhrD2zfolhZbLRyCgqkkYap1" + "jwSaJ13BguoJWCF9_Lg3AgO4Wh-Bqa556JE20oKsYbl6RA" + ) + ) + request.session.save() + + stage_view = AuthenticatorValidateStageView( + FlowExecutorView(flow=flow, current_stage=stage), request=request + ) + request.META["SERVER_NAME"] = "localhost" + request.META["SERVER_PORT"] = "9000" + validate_challenge_webauthn( + { + "id": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU", + "rawId": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU", + "type": "public-key", + "assertionClientExtensions": "{}", + "response": { + "clientDataJSON": ( + "eyJ0eXBlIjoid2ViYXV0aG4uZ2V0IiwiY2hhbGxlbmdlIjoiZzk4STUxbVF2WlhvNWx4TGZo" + "ckQyemZvbGhaYkxSeUNncWtrWWFwMWp3U2FKMTNCZ3VvSldDRjlfTGczQWdPNFdoLUJxYTU1" + "NkpFMjBvS3NZYmw2UkEiLCJvcmlnaW4iOiJodHRwOi8vbG9jYWxob3N0OjkwMDAiLCJjcm9z" + "c09yaWdpbiI6ZmFsc2UsIm90aGVyX2tleXNfY2FuX2JlX2FkZGVkX2hlcmUiOiJkbyBub3Qg" + "Y29tcGFyZSBjbGllbnREYXRhSlNPTiBhZ2FpbnN0IGEgdGVtcGxhdGUuIFNlZSBodHRwczov" + "L2dvby5nbC95YWJQZXgifQ==", + ), + "signature": ( + "MEQCIFNlrHf9ablJAalXLWkrqvHB8oIu8kwvRpH3X3rbJVpI" + "AiAqtOK6mIZPk62kZN0OzFsHfuvu_RlOl7zlqSNzDdz_Ag==" + ), + "authenticatorData": "SZYN5YgOjGh0NBcPZHZgW4_krrmihjLHmVzzuoMdl2MFAAAABQ==", + "userHandle": None, + }, + }, + stage_view, + self.user, + ) + + def test_validate_challenge_invalid(self): + """Test webauthn""" + request = get_request("/") + request.user = self.user + + WebAuthnDevice.objects.create( + user=self.user, + public_key=( + "pQECAyYgASFYIGsBLkklToCQkT7qJT_bJYN1sEc1oJdbnmoOc4" + "3i0JH6IlggLTXytuhzFVYYAK4PQNj8_coGrbbzSfUxdiPAcZTQCyU" + ), + credential_id="QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU", + # One more sign count than above, make it invalid + sign_count=5, + rp_id=generate_id(), + ) + flow = create_test_flow() + stage = AuthenticatorValidateStage.objects.create( + name=generate_id(), + not_configured_action=NotConfiguredAction.CONFIGURE, + device_classes=[DeviceClasses.WEBAUTHN], + ) + stage_view = AuthenticatorValidateStageView( + FlowExecutorView(flow=flow, current_stage=stage), request=request + ) + request = get_request("/") + request.session[SESSION_KEY_WEBAUTHN_CHALLENGE] = base64url_to_bytes( + ( + "g98I51mQvZXo5lxLfhrD2zfolhZbLRyCgqkkYap1j" + "wSaJ13BguoJWCF9_Lg3AgO4Wh-Bqa556JE20oKsYbl6RA" + ) + ) + request.session.save() + + stage_view = AuthenticatorValidateStageView( + FlowExecutorView(flow=flow, current_stage=stage), request=request + ) + request.META["SERVER_NAME"] = "localhost" + request.META["SERVER_PORT"] = "9000" + with self.assertRaises(ValidationError): + validate_challenge_webauthn( + { + "id": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU", + "rawId": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU", + "type": "public-key", + "assertionClientExtensions": "{}", + "response": { + "clientDataJSON": ( + "eyJ0eXBlIjoid2ViYXV0aG4uZ2V0IiwiY2hhbGxlbmdlIjoiZzk4STUxbVF2WlhvNWx4" + "TGZockQyemZvbGhaYkxSeUNncWtrWWFwMWp3U2FKMTNCZ3VvSldDRjlfTGczQWdPNFdo" + "LUJxYTU1NkpFMjBvS3NZYmw2UkEiLCJvcmlnaW4iOiJodHRwOi8vbG9jYWxob3N0Ojkw" + "MDAiLCJjcm9zc09yaWdpbiI6ZmFsc2UsIm90aGVyX2tleXNfY2FuX2JlX2FkZGVkX2hl" + "cmUiOiJkbyBub3QgY29tcGFyZSBjbGllbnREYXRhSlNPTiBhZ2FpbnN0IGEgdGVtcGxh" + "dGUuIFNlZSBodHRwczovL2dvby5nbC95YWJQZXgifQ==" + ), + "signature": ( + "MEQCIFNlrHf9ablJAalXLWkrqvHB8oIu8kwvRpH3X3rbJVpI" + "AiAqtOK6mIZPk62kZN0OzFsHfuvu_RlOl7zlqSNzDdz_Ag==" + ), + "authenticatorData": "SZYN5YgOjGh0NBcPZHZgW4_krrmihjLHmVzzuoMdl2MFAAAABQ==", + "userHandle": None, + }, + }, + stage_view, + self.user, + ) diff --git a/authentik/stages/authenticator_webauthn/stage.py b/authentik/stages/authenticator_webauthn/stage.py index f8fba9ffe..dfac992df 100644 --- a/authentik/stages/authenticator_webauthn/stage.py +++ b/authentik/stages/authenticator_webauthn/stage.py @@ -144,4 +144,4 @@ class AuthenticatorWebAuthnStageView(ChallengeStageView): return self.executor.stage_ok() def cleanup(self): - self.request.session.pop(SESSION_KEY_WEBAUTHN_CHALLENGE) + self.request.session.pop(SESSION_KEY_WEBAUTHN_CHALLENGE, None) diff --git a/authentik/stages/authenticator_webauthn/tests.py b/authentik/stages/authenticator_webauthn/tests.py index 9d4441bba..652a2b471 100644 --- a/authentik/stages/authenticator_webauthn/tests.py +++ b/authentik/stages/authenticator_webauthn/tests.py @@ -1,20 +1,93 @@ """Test WebAuthn API""" +from base64 import b64decode + from django.urls import reverse -from rest_framework.test import APITestCase +from webauthn.helpers import bytes_to_base64url -from authentik.core.models import User -from authentik.stages.authenticator_webauthn.models import WebAuthnDevice +from authentik.core.tests.utils import create_test_admin_user, create_test_flow +from authentik.flows.markers import StageMarker +from authentik.flows.models import FlowStageBinding +from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan +from authentik.flows.tests import FlowTestCase +from authentik.flows.views.executor import SESSION_KEY_PLAN +from authentik.lib.generators import generate_id +from authentik.stages.authenticator_webauthn.models import AuthenticateWebAuthnStage, WebAuthnDevice +from authentik.stages.authenticator_webauthn.stage import SESSION_KEY_WEBAUTHN_CHALLENGE -class AuthenticatorWebAuthnStage(APITestCase): +class TestAuthenticatorWebAuthnStage(FlowTestCase): """Test WebAuthn API""" + def setUp(self) -> None: + self.stage = AuthenticateWebAuthnStage.objects.create( + name=generate_id(), + ) + self.flow = create_test_flow() + self.binding = FlowStageBinding.objects.create( + target=self.flow, + stage=self.stage, + order=0, + ) + self.user = create_test_admin_user() + def test_api_delete(self): """Test api delete""" - user = User.objects.create(username="foo") - self.client.force_login(user) - dev = WebAuthnDevice.objects.create(user=user) + self.client.force_login(self.user) + dev = WebAuthnDevice.objects.create(user=self.user) response = self.client.delete( reverse("authentik_api:webauthndevice-detail", kwargs={"pk": dev.pk}) ) self.assertEqual(response.status_code, 204) + + def test_registration_options(self): + """Test registration options""" + plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) + plan.context[PLAN_CONTEXT_PENDING_USER] = self.user + session = self.client.session + session[SESSION_KEY_PLAN] = plan + session[SESSION_KEY_WEBAUTHN_CHALLENGE] = b64decode( + ( + "o90Yh1osqW3mjGift+6WclWOya5lcdff/G0mqueN3hChacMUz" + "V4mxiDafuQ0x0e1d/fcPai0fx/jMBZ8/nG2qQ==" + ).encode() + ) + session.save() + + response = self.client.get( + reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), + ) + self.assertEqual(response.status_code, 200) + session = self.client.session + self.assertStageResponse( + response, + self.flow, + self.user, + registration={ + "rp": {"name": "authentik", "id": "testserver"}, + "user": { + "id": bytes_to_base64url(self.user.uid.encode("utf-8")), + "name": self.user.username, + "displayName": self.user.name, + }, + "challenge": bytes_to_base64url(session[SESSION_KEY_WEBAUTHN_CHALLENGE]), + "pubKeyCredParams": [ + {"type": "public-key", "alg": -7}, + {"type": "public-key", "alg": -8}, + {"type": "public-key", "alg": -36}, + {"type": "public-key", "alg": -37}, + {"type": "public-key", "alg": -38}, + {"type": "public-key", "alg": -39}, + {"type": "public-key", "alg": -257}, + {"type": "public-key", "alg": -258}, + {"type": "public-key", "alg": -259}, + ], + "timeout": 60000, + "excludeCredentials": [], + "authenticatorSelection": { + "residentKey": "preferred", + "requireResidentKey": False, + "userVerification": "preferred", + }, + "attestation": "none", + }, + )