providers/proxy: add tests for proxy basic auth (#4357)

* add tests for proxy basic auth

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* stop bandit from complaining

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* add API tests

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* more tests

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
Jens L 2023-01-04 22:04:16 +01:00 committed by Jens Langhammer
parent 1e01e9813d
commit 24e8915e0a
No known key found for this signature in database
4 changed files with 212 additions and 4 deletions

View file

@ -1,6 +1,7 @@
"""ProxyProvider API Views""" """ProxyProvider API Views"""
from typing import Any, Optional from typing import Any, Optional
from django.utils.translation import gettext_lazy as _
from drf_spectacular.utils import extend_schema_field from drf_spectacular.utils import extend_schema_field
from rest_framework.exceptions import ValidationError from rest_framework.exceptions import ValidationError
from rest_framework.fields import CharField, ListField, ReadOnlyField, SerializerMethodField from rest_framework.fields import CharField, ListField, ReadOnlyField, SerializerMethodField
@ -39,22 +40,34 @@ class ProxyProviderSerializer(ProviderSerializer):
redirect_uris = CharField(read_only=True) redirect_uris = CharField(read_only=True)
outpost_set = ListField(child=CharField(), read_only=True, source="outpost_set.all") outpost_set = ListField(child=CharField(), read_only=True, source="outpost_set.all")
def validate_basic_auth_enabled(self, value: bool) -> bool:
"""Ensure user and password attributes are set"""
if value:
if (
self.initial_data.get("basic_auth_password_attribute", "") == ""
or self.initial_data.get("basic_auth_user_attribute", "") == ""
):
raise ValidationError(
_("User and password attributes must be set when basic auth is enabled.")
)
return value
def validate(self, attrs) -> dict[Any, str]: def validate(self, attrs) -> dict[Any, str]:
"""Check that internal_host is set when mode is Proxy""" """Check that internal_host is set when mode is Proxy"""
if ( if (
attrs.get("mode", ProxyMode.PROXY) == ProxyMode.PROXY attrs.get("mode", ProxyMode.PROXY) == ProxyMode.PROXY
and attrs.get("internal_host", "") == "" and attrs.get("internal_host", "") == ""
): ):
raise ValidationError("Internal host cannot be empty when forward auth is disabled.") raise ValidationError(_("Internal host cannot be empty when forward auth is disabled."))
return attrs return attrs
def create(self, validated_data): def create(self, validated_data: dict):
instance: ProxyProvider = super().create(validated_data) instance: ProxyProvider = super().create(validated_data)
instance.set_oauth_defaults() instance.set_oauth_defaults()
instance.save() instance.save()
return instance return instance
def update(self, instance: ProxyProvider, validated_data): def update(self, instance: ProxyProvider, validated_data: dict):
instance = super().update(instance, validated_data) instance = super().update(instance, validated_data)
instance.set_oauth_defaults() instance.set_oauth_defaults()
instance.save() instance.save()

View file

@ -0,0 +1,122 @@
"""proxy provider tests"""
from django.urls import reverse
from rest_framework.test import APITestCase
from authentik.core.tests.utils import create_test_admin_user, create_test_flow
from authentik.lib.generators import generate_id
from authentik.providers.oauth2.models import ClientTypes
from authentik.providers.proxy.models import ProxyMode, ProxyProvider
class ProxyProviderTests(APITestCase):
"""proxy provider tests"""
def setUp(self) -> None:
self.user = create_test_admin_user()
self.client.force_login(self.user)
def test_basic_auth(self):
"""Test basic_auth_enabled"""
response = self.client.post(
reverse("authentik_api:proxyprovider-list"),
{
"name": generate_id(),
"mode": ProxyMode.PROXY,
"authorization_flow": create_test_flow().pk.hex,
"external_host": "http://localhost",
"internal_host": "http://localhost",
"basic_auth_enabled": True,
"basic_auth_user_attribute": generate_id(),
"basic_auth_password_attribute": generate_id(),
},
)
self.assertEqual(response.status_code, 201)
def test_basic_auth_invalid(self):
"""Test basic_auth_enabled"""
response = self.client.post(
reverse("authentik_api:proxyprovider-list"),
{
"name": generate_id(),
"mode": ProxyMode.PROXY,
"authorization_flow": create_test_flow().pk.hex,
"external_host": "http://localhost",
"internal_host": "http://localhost",
"basic_auth_enabled": True,
},
)
self.assertEqual(response.status_code, 400)
self.assertJSONEqual(
response.content.decode(),
{
"basic_auth_enabled": [
"User and password attributes must be set when basic auth is enabled."
]
},
)
def test_validate(self):
"""Test validate"""
response = self.client.post(
reverse("authentik_api:proxyprovider-list"),
{
"name": generate_id(),
"mode": ProxyMode.PROXY,
"authorization_flow": create_test_flow().pk.hex,
"external_host": "http://localhost",
},
)
self.assertEqual(response.status_code, 400)
self.assertJSONEqual(
response.content.decode(),
{"non_field_errors": ["Internal host cannot be empty when forward auth is disabled."]},
)
def test_create_defaults(self):
"""Test create"""
name = generate_id()
response = self.client.post(
reverse("authentik_api:proxyprovider-list"),
{
"name": name,
"mode": ProxyMode.PROXY,
"authorization_flow": create_test_flow().pk.hex,
"external_host": "http://localhost",
"internal_host": "http://localhost",
},
)
self.assertEqual(response.status_code, 201)
provider: ProxyProvider = ProxyProvider.objects.get(name=name)
self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)
def test_update_defaults(self):
"""Test create"""
name = generate_id()
response = self.client.post(
reverse("authentik_api:proxyprovider-list"),
{
"name": name,
"mode": ProxyMode.PROXY,
"authorization_flow": create_test_flow().pk.hex,
"external_host": "http://localhost",
"internal_host": "http://localhost",
},
)
self.assertEqual(response.status_code, 201)
provider: ProxyProvider = ProxyProvider.objects.get(name=name)
self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)
provider.client_type = ClientTypes.PUBLIC
provider.save()
response = self.client.put(
reverse("authentik_api:proxyprovider-detail", kwargs={"pk": provider.pk}),
{
"name": name,
"mode": ProxyMode.PROXY,
"authorization_flow": create_test_flow().pk.hex,
"external_host": "http://localhost",
"internal_host": "http://localhost",
},
)
self.assertEqual(response.status_code, 200)
provider: ProxyProvider = ProxyProvider.objects.get(name=name)
self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)

View file

@ -47,7 +47,6 @@ class TestProviderLDAP(SeleniumTestCase):
def _prepare(self) -> User: def _prepare(self) -> User:
"""prepare user, provider, app and container""" """prepare user, provider, app and container"""
# set additionalHeaders to test later
self.user.attributes["extraAttribute"] = "bar" self.user.attributes["extraAttribute"] = "bar"
self.user.save() self.user.save()

View file

@ -1,4 +1,5 @@
"""Proxy and Outpost e2e tests""" """Proxy and Outpost e2e tests"""
from base64 import b64encode
from dataclasses import asdict from dataclasses import asdict
from sys import platform from sys import platform
from time import sleep from time import sleep
@ -14,6 +15,7 @@ from authentik import __version__
from authentik.blueprints.tests import apply_blueprint, reconcile_app from authentik.blueprints.tests import apply_blueprint, reconcile_app
from authentik.core.models import Application from authentik.core.models import Application
from authentik.flows.models import Flow from authentik.flows.models import Flow
from authentik.lib.generators import generate_id
from authentik.outposts.models import DockerServiceConnection, Outpost, OutpostConfig, OutpostType from authentik.outposts.models import DockerServiceConnection, Outpost, OutpostConfig, OutpostType
from authentik.outposts.tasks import outpost_local_connection from authentik.outposts.tasks import outpost_local_connection
from authentik.providers.proxy.models import ProxyProvider from authentik.providers.proxy.models import ProxyProvider
@ -119,6 +121,78 @@ class TestProviderProxy(SeleniumTestCase):
full_body_text = self.driver.find_element(By.CSS_SELECTOR, ".pf-c-title.pf-m-3xl").text full_body_text = self.driver.find_element(By.CSS_SELECTOR, ".pf-c-title.pf-m-3xl").text
self.assertIn("You've logged out of proxy.", full_body_text) self.assertIn("You've logged out of proxy.", full_body_text)
@retry()
@apply_blueprint(
"default/10-flow-default-authentication-flow.yaml",
"default/10-flow-default-invalidation-flow.yaml",
)
@apply_blueprint(
"default/20-flow-default-provider-authorization-explicit-consent.yaml",
"default/20-flow-default-provider-authorization-implicit-consent.yaml",
)
@apply_blueprint(
"system/providers-oauth2.yaml",
"system/providers-proxy.yaml",
)
@reconcile_app("authentik_crypto")
def test_proxy_basic_auth(self):
"""Test simple outpost setup with single provider"""
cred = generate_id()
attr = "basic-password" # nosec
self.user.attributes["basic-username"] = cred
self.user.attributes[attr] = cred
self.user.save()
proxy: ProxyProvider = ProxyProvider.objects.create(
name="proxy_provider",
authorization_flow=Flow.objects.get(
slug="default-provider-authorization-implicit-consent"
),
internal_host="http://localhost",
external_host="http://localhost:9000",
basic_auth_enabled=True,
basic_auth_user_attribute="basic-username",
basic_auth_password_attribute=attr,
)
# Ensure OAuth2 Params are set
proxy.set_oauth_defaults()
proxy.save()
# we need to create an application to actually access the proxy
Application.objects.create(name="proxy", slug="proxy", provider=proxy)
outpost: Outpost = Outpost.objects.create(
name="proxy_outpost",
type=OutpostType.PROXY,
)
outpost.providers.add(proxy)
outpost.build_user_permissions(outpost.user)
self.proxy_container = self.start_proxy(outpost)
# Wait until outpost healthcheck succeeds
healthcheck_retries = 0
while healthcheck_retries < 50:
if len(outpost.state) > 0:
state = outpost.state[0]
if state.last_seen:
break
healthcheck_retries += 1
sleep(0.5)
sleep(5)
self.driver.get("http://localhost:9000")
self.login()
sleep(1)
full_body_text = self.driver.find_element(By.CSS_SELECTOR, "pre").text
self.assertIn(f"X-Authentik-Username: {self.user.username}", full_body_text)
auth_header = b64encode(f"{cred}:{cred}".encode()).decode()
self.assertIn(f"Authorization: Basic {auth_header}", full_body_text)
self.driver.get("http://localhost:9000/outpost.goauthentik.io/sign_out")
sleep(2)
full_body_text = self.driver.find_element(By.CSS_SELECTOR, ".pf-c-title.pf-m-3xl").text
self.assertIn("You've logged out of proxy.", full_body_text)
@skipUnless(platform.startswith("linux"), "requires local docker") @skipUnless(platform.startswith("linux"), "requires local docker")
class TestProviderProxyConnect(ChannelsLiveServerTestCase): class TestProviderProxyConnect(ChannelsLiveServerTestCase):