stages/authenticator_validate: add webauthn tests (#3069)

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
Jens L 2022-06-08 20:50:48 +02:00 committed by GitHub
parent beddd6a460
commit 66f4a31b4c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 282 additions and 13 deletions

View file

@ -43,6 +43,7 @@ class PytestTestRunner: # pragma: no cover
def add_arguments(cls, parser: ArgumentParser): def add_arguments(cls, parser: ArgumentParser):
"""Add more pytest-specific arguments""" """Add more pytest-specific arguments"""
parser.add_argument("--randomly-seed", type=int) parser.add_argument("--randomly-seed", type=int)
parser.add_argument("--keepdb", action="store_true")
def run_tests(self, test_labels): def run_tests(self, test_labels):
"""Run pytest and return the exitcode. """Run pytest and return the exitcode.

View file

@ -120,7 +120,7 @@ def validate_challenge_webauthn(data: dict, stage_view: StageView, user: User) -
device = WebAuthnDevice.objects.filter(credential_id=credential_id).first() device = WebAuthnDevice.objects.filter(credential_id=credential_id).first()
if not device: if not device:
raise Http404() raise ValidationError("Invalid device")
try: try:
authentication_verification = verify_authentication_response( authentication_verification = verify_authentication_response(

View file

@ -1,12 +1,12 @@
"""Test validator stage""" """Test validator stage"""
from time import sleep from time import sleep
from django.http import Http404
from django.test.client import RequestFactory from django.test.client import RequestFactory
from django.urls.base import reverse from django.urls.base import reverse
from webauthn.helpers import bytes_to_base64url from rest_framework.serializers import ValidationError
from webauthn.helpers import base64url_to_bytes, bytes_to_base64url
from authentik.core.tests.utils import create_test_admin_user from authentik.core.tests.utils import create_test_admin_user, create_test_flow
from authentik.flows.models import Flow, FlowStageBinding, NotConfiguredAction from authentik.flows.models import Flow, FlowStageBinding, NotConfiguredAction
from authentik.flows.stage import StageView from authentik.flows.stage import StageView
from authentik.flows.tests import FlowTestCase from authentik.flows.tests import FlowTestCase
@ -15,10 +15,13 @@ from authentik.lib.generators import generate_id
from authentik.lib.tests.utils import get_request from authentik.lib.tests.utils import get_request
from authentik.stages.authenticator_validate.challenge import ( from authentik.stages.authenticator_validate.challenge import (
get_challenge_for_device, get_challenge_for_device,
get_webauthn_challenge_without_user,
validate_challenge_webauthn, validate_challenge_webauthn,
) )
from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses
from authentik.stages.authenticator_validate.stage import AuthenticatorValidateStageView
from authentik.stages.authenticator_webauthn.models import WebAuthnDevice from authentik.stages.authenticator_webauthn.models import WebAuthnDevice
from authentik.stages.authenticator_webauthn.stage import SESSION_KEY_WEBAUTHN_CHALLENGE
from authentik.stages.identification.models import IdentificationStage, UserFields from authentik.stages.identification.models import IdentificationStage, UserFields
@ -104,7 +107,199 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
}, },
) )
with self.assertRaises(Http404): with self.assertRaises(ValidationError):
validate_challenge_webauthn( validate_challenge_webauthn(
{}, StageView(FlowExecutorView(current_stage=stage), request=request), self.user {}, StageView(FlowExecutorView(current_stage=stage), request=request), self.user
) )
def test_get_challenge(self):
"""Test webauthn"""
request = get_request("/")
request.user = self.user
webauthn_device = WebAuthnDevice.objects.create(
user=self.user,
public_key=(
"pQECAyYgASFYIGsBLkklToCQkT7qJT_bJYN1sEc1oJdbnmoOc43i0J"
"H6IlggLTXytuhzFVYYAK4PQNj8_coGrbbzSfUxdiPAcZTQCyU"
),
credential_id="QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
sign_count=0,
rp_id=generate_id(),
)
challenge = get_challenge_for_device(request, webauthn_device)
webauthn_challenge = request.session[SESSION_KEY_WEBAUTHN_CHALLENGE]
self.assertEqual(
challenge,
{
"allowCredentials": [
{
"id": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
"type": "public-key",
}
],
"challenge": bytes_to_base64url(webauthn_challenge),
"rpId": "testserver",
"timeout": 60000,
"userVerification": "preferred",
},
)
def test_get_challenge_userless(self):
"""Test webauthn (userless)"""
request = get_request("/")
WebAuthnDevice.objects.create(
user=self.user,
public_key=(
"pQECAyYgASFYIGsBLkklToCQkT7qJT_bJYN1sEc1oJdbnmoOc43i0J"
"H6IlggLTXytuhzFVYYAK4PQNj8_coGrbbzSfUxdiPAcZTQCyU"
),
credential_id="QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
sign_count=0,
rp_id=generate_id(),
)
challenge = get_webauthn_challenge_without_user(request)
webauthn_challenge = request.session[SESSION_KEY_WEBAUTHN_CHALLENGE]
self.assertEqual(
challenge,
{
"allowCredentials": [],
"challenge": bytes_to_base64url(webauthn_challenge),
"rpId": "testserver",
"timeout": 60000,
"userVerification": "preferred",
},
)
def test_validate_challenge(self):
"""Test webauthn"""
request = get_request("/")
request.user = self.user
WebAuthnDevice.objects.create(
user=self.user,
public_key=(
"pQECAyYgASFYIGsBLkklToCQkT7qJT_bJYN1sEc1oJdbnmoOc43i0J"
"H6IlggLTXytuhzFVYYAK4PQNj8_coGrbbzSfUxdiPAcZTQCyU"
),
credential_id="QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
sign_count=4,
rp_id=generate_id(),
)
flow = create_test_flow()
stage = AuthenticatorValidateStage.objects.create(
name=generate_id(),
not_configured_action=NotConfiguredAction.CONFIGURE,
device_classes=[DeviceClasses.WEBAUTHN],
)
stage_view = AuthenticatorValidateStageView(
FlowExecutorView(flow=flow, current_stage=stage), request=request
)
request = get_request("/")
request.session[SESSION_KEY_WEBAUTHN_CHALLENGE] = base64url_to_bytes(
(
"g98I51mQvZXo5lxLfhrD2zfolhZbLRyCgqkkYap1"
"jwSaJ13BguoJWCF9_Lg3AgO4Wh-Bqa556JE20oKsYbl6RA"
)
)
request.session.save()
stage_view = AuthenticatorValidateStageView(
FlowExecutorView(flow=flow, current_stage=stage), request=request
)
request.META["SERVER_NAME"] = "localhost"
request.META["SERVER_PORT"] = "9000"
validate_challenge_webauthn(
{
"id": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
"rawId": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
"type": "public-key",
"assertionClientExtensions": "{}",
"response": {
"clientDataJSON": (
"eyJ0eXBlIjoid2ViYXV0aG4uZ2V0IiwiY2hhbGxlbmdlIjoiZzk4STUxbVF2WlhvNWx4TGZo"
"ckQyemZvbGhaYkxSeUNncWtrWWFwMWp3U2FKMTNCZ3VvSldDRjlfTGczQWdPNFdoLUJxYTU1"
"NkpFMjBvS3NZYmw2UkEiLCJvcmlnaW4iOiJodHRwOi8vbG9jYWxob3N0OjkwMDAiLCJjcm9z"
"c09yaWdpbiI6ZmFsc2UsIm90aGVyX2tleXNfY2FuX2JlX2FkZGVkX2hlcmUiOiJkbyBub3Qg"
"Y29tcGFyZSBjbGllbnREYXRhSlNPTiBhZ2FpbnN0IGEgdGVtcGxhdGUuIFNlZSBodHRwczov"
"L2dvby5nbC95YWJQZXgifQ==",
),
"signature": (
"MEQCIFNlrHf9ablJAalXLWkrqvHB8oIu8kwvRpH3X3rbJVpI"
"AiAqtOK6mIZPk62kZN0OzFsHfuvu_RlOl7zlqSNzDdz_Ag=="
),
"authenticatorData": "SZYN5YgOjGh0NBcPZHZgW4_krrmihjLHmVzzuoMdl2MFAAAABQ==",
"userHandle": None,
},
},
stage_view,
self.user,
)
def test_validate_challenge_invalid(self):
"""Test webauthn"""
request = get_request("/")
request.user = self.user
WebAuthnDevice.objects.create(
user=self.user,
public_key=(
"pQECAyYgASFYIGsBLkklToCQkT7qJT_bJYN1sEc1oJdbnmoOc4"
"3i0JH6IlggLTXytuhzFVYYAK4PQNj8_coGrbbzSfUxdiPAcZTQCyU"
),
credential_id="QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
# One more sign count than above, make it invalid
sign_count=5,
rp_id=generate_id(),
)
flow = create_test_flow()
stage = AuthenticatorValidateStage.objects.create(
name=generate_id(),
not_configured_action=NotConfiguredAction.CONFIGURE,
device_classes=[DeviceClasses.WEBAUTHN],
)
stage_view = AuthenticatorValidateStageView(
FlowExecutorView(flow=flow, current_stage=stage), request=request
)
request = get_request("/")
request.session[SESSION_KEY_WEBAUTHN_CHALLENGE] = base64url_to_bytes(
(
"g98I51mQvZXo5lxLfhrD2zfolhZbLRyCgqkkYap1j"
"wSaJ13BguoJWCF9_Lg3AgO4Wh-Bqa556JE20oKsYbl6RA"
)
)
request.session.save()
stage_view = AuthenticatorValidateStageView(
FlowExecutorView(flow=flow, current_stage=stage), request=request
)
request.META["SERVER_NAME"] = "localhost"
request.META["SERVER_PORT"] = "9000"
with self.assertRaises(ValidationError):
validate_challenge_webauthn(
{
"id": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
"rawId": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
"type": "public-key",
"assertionClientExtensions": "{}",
"response": {
"clientDataJSON": (
"eyJ0eXBlIjoid2ViYXV0aG4uZ2V0IiwiY2hhbGxlbmdlIjoiZzk4STUxbVF2WlhvNWx4"
"TGZockQyemZvbGhaYkxSeUNncWtrWWFwMWp3U2FKMTNCZ3VvSldDRjlfTGczQWdPNFdo"
"LUJxYTU1NkpFMjBvS3NZYmw2UkEiLCJvcmlnaW4iOiJodHRwOi8vbG9jYWxob3N0Ojkw"
"MDAiLCJjcm9zc09yaWdpbiI6ZmFsc2UsIm90aGVyX2tleXNfY2FuX2JlX2FkZGVkX2hl"
"cmUiOiJkbyBub3QgY29tcGFyZSBjbGllbnREYXRhSlNPTiBhZ2FpbnN0IGEgdGVtcGxh"
"dGUuIFNlZSBodHRwczovL2dvby5nbC95YWJQZXgifQ=="
),
"signature": (
"MEQCIFNlrHf9ablJAalXLWkrqvHB8oIu8kwvRpH3X3rbJVpI"
"AiAqtOK6mIZPk62kZN0OzFsHfuvu_RlOl7zlqSNzDdz_Ag=="
),
"authenticatorData": "SZYN5YgOjGh0NBcPZHZgW4_krrmihjLHmVzzuoMdl2MFAAAABQ==",
"userHandle": None,
},
},
stage_view,
self.user,
)

View file

@ -144,4 +144,4 @@ class AuthenticatorWebAuthnStageView(ChallengeStageView):
return self.executor.stage_ok() return self.executor.stage_ok()
def cleanup(self): def cleanup(self):
self.request.session.pop(SESSION_KEY_WEBAUTHN_CHALLENGE) self.request.session.pop(SESSION_KEY_WEBAUTHN_CHALLENGE, None)

View file

@ -1,20 +1,93 @@
"""Test WebAuthn API""" """Test WebAuthn API"""
from base64 import b64decode
from django.urls import reverse from django.urls import reverse
from rest_framework.test import APITestCase from webauthn.helpers import bytes_to_base64url
from authentik.core.models import User from authentik.core.tests.utils import create_test_admin_user, create_test_flow
from authentik.stages.authenticator_webauthn.models import WebAuthnDevice from authentik.flows.markers import StageMarker
from authentik.flows.models import FlowStageBinding
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
from authentik.flows.tests import FlowTestCase
from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.lib.generators import generate_id
from authentik.stages.authenticator_webauthn.models import AuthenticateWebAuthnStage, WebAuthnDevice
from authentik.stages.authenticator_webauthn.stage import SESSION_KEY_WEBAUTHN_CHALLENGE
class AuthenticatorWebAuthnStage(APITestCase): class TestAuthenticatorWebAuthnStage(FlowTestCase):
"""Test WebAuthn API""" """Test WebAuthn API"""
def setUp(self) -> None:
self.stage = AuthenticateWebAuthnStage.objects.create(
name=generate_id(),
)
self.flow = create_test_flow()
self.binding = FlowStageBinding.objects.create(
target=self.flow,
stage=self.stage,
order=0,
)
self.user = create_test_admin_user()
def test_api_delete(self): def test_api_delete(self):
"""Test api delete""" """Test api delete"""
user = User.objects.create(username="foo") self.client.force_login(self.user)
self.client.force_login(user) dev = WebAuthnDevice.objects.create(user=self.user)
dev = WebAuthnDevice.objects.create(user=user)
response = self.client.delete( response = self.client.delete(
reverse("authentik_api:webauthndevice-detail", kwargs={"pk": dev.pk}) reverse("authentik_api:webauthndevice-detail", kwargs={"pk": dev.pk})
) )
self.assertEqual(response.status_code, 204) self.assertEqual(response.status_code, 204)
def test_registration_options(self):
"""Test registration options"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session[SESSION_KEY_WEBAUTHN_CHALLENGE] = b64decode(
(
"o90Yh1osqW3mjGift+6WclWOya5lcdff/G0mqueN3hChacMUz"
"V4mxiDafuQ0x0e1d/fcPai0fx/jMBZ8/nG2qQ=="
).encode()
)
session.save()
response = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
)
self.assertEqual(response.status_code, 200)
session = self.client.session
self.assertStageResponse(
response,
self.flow,
self.user,
registration={
"rp": {"name": "authentik", "id": "testserver"},
"user": {
"id": bytes_to_base64url(self.user.uid.encode("utf-8")),
"name": self.user.username,
"displayName": self.user.name,
},
"challenge": bytes_to_base64url(session[SESSION_KEY_WEBAUTHN_CHALLENGE]),
"pubKeyCredParams": [
{"type": "public-key", "alg": -7},
{"type": "public-key", "alg": -8},
{"type": "public-key", "alg": -36},
{"type": "public-key", "alg": -37},
{"type": "public-key", "alg": -38},
{"type": "public-key", "alg": -39},
{"type": "public-key", "alg": -257},
{"type": "public-key", "alg": -258},
{"type": "public-key", "alg": -259},
],
"timeout": 60000,
"excludeCredentials": [],
"authenticatorSelection": {
"residentKey": "preferred",
"requireResidentKey": False,
"userVerification": "preferred",
},
"attestation": "none",
},
)