From a2578ffaade7c35641b1c0f277026b0e9d155d06 Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Mon, 23 Aug 2021 20:00:27 +0200 Subject: [PATCH] core: add token tests for invalid intent and token auth Signed-off-by: Jens Langhammer --- .../core/tests/test_source_flow_manager.py | 37 +++++----------- authentik/core/tests/test_token_api.py | 8 ++++ authentik/core/tests/test_token_auth.py | 40 +++++++++++++++++ authentik/flows/tests/test_planner.py | 7 +-- authentik/lib/tests/utils.py | 27 ++++++++++++ .../saml/tests/test_auth_n_request.py | 44 +++---------------- authentik/providers/saml/tests/test_schema.py | 17 ++----- .../stages/authenticator_validate/tests.py | 8 +--- 8 files changed, 99 insertions(+), 89 deletions(-) create mode 100644 authentik/core/tests/test_token_auth.py create mode 100644 authentik/lib/tests/utils.py diff --git a/authentik/core/tests/test_source_flow_manager.py b/authentik/core/tests/test_source_flow_manager.py index a869b403b..02f3cf8d4 100644 --- a/authentik/core/tests/test_source_flow_manager.py +++ b/authentik/core/tests/test_source_flow_manager.py @@ -1,15 +1,12 @@ """Test Source flow_manager""" from django.contrib.auth.models import AnonymousUser -from django.contrib.messages.middleware import MessageMiddleware -from django.contrib.sessions.middleware import SessionMiddleware -from django.http.request import HttpRequest from django.test import TestCase from django.test.client import RequestFactory from guardian.utils import get_anonymous_user from authentik.core.models import SourceUserMatchingModes, User from authentik.core.sources.flow_manager import Action -from authentik.flows.tests.test_planner import dummy_get_response +from authentik.lib.tests.utils import get_request from authentik.providers.oauth2.generators import generate_client_id from authentik.sources.oauth.models import OAuthSource, UserOAuthSourceConnection from authentik.sources.oauth.views.callback import OAuthSourceFlowManager @@ -24,22 +21,10 @@ class TestSourceFlowManager(TestCase): self.factory = RequestFactory() self.identifier = generate_client_id() - def get_request(self, user: User) -> HttpRequest: - """Helper to create a get request with session and message middleware""" - request = self.factory.get("/") - request.user = user - middleware = SessionMiddleware(dummy_get_response) - middleware.process_request(request) - request.session.save() - middleware = MessageMiddleware(dummy_get_response) - middleware.process_request(request) - request.session.save() - return request - def test_unauthenticated_enroll(self): """Test un-authenticated user enrolling""" flow_manager = OAuthSourceFlowManager( - self.source, self.get_request(AnonymousUser()), self.identifier, {} + self.source, get_request("/", user=AnonymousUser()), self.identifier, {} ) action, _ = flow_manager.get_action() self.assertEqual(action, Action.ENROLL) @@ -52,7 +37,7 @@ class TestSourceFlowManager(TestCase): ) flow_manager = OAuthSourceFlowManager( - self.source, self.get_request(AnonymousUser()), self.identifier, {} + self.source, get_request("/", user=AnonymousUser()), self.identifier, {} ) action, _ = flow_manager.get_action() self.assertEqual(action, Action.AUTH) @@ -65,7 +50,7 @@ class TestSourceFlowManager(TestCase): ) user = User.objects.create(username="foo", email="foo@bar.baz") flow_manager = OAuthSourceFlowManager( - self.source, self.get_request(user), self.identifier, {} + self.source, get_request("/", user=user), self.identifier, {} ) action, _ = flow_manager.get_action() self.assertEqual(action, Action.LINK) @@ -78,7 +63,7 @@ class TestSourceFlowManager(TestCase): # Without email, deny flow_manager = OAuthSourceFlowManager( - self.source, self.get_request(AnonymousUser()), self.identifier, {} + self.source, get_request("/", user=AnonymousUser()), self.identifier, {} ) action, _ = flow_manager.get_action() self.assertEqual(action, Action.DENY) @@ -86,7 +71,7 @@ class TestSourceFlowManager(TestCase): # With email flow_manager = OAuthSourceFlowManager( self.source, - self.get_request(AnonymousUser()), + get_request("/", user=AnonymousUser()), self.identifier, {"email": "foo@bar.baz"}, ) @@ -101,7 +86,7 @@ class TestSourceFlowManager(TestCase): # Without username, deny flow_manager = OAuthSourceFlowManager( - self.source, self.get_request(AnonymousUser()), self.identifier, {} + self.source, get_request("/", user=AnonymousUser()), self.identifier, {} ) action, _ = flow_manager.get_action() self.assertEqual(action, Action.DENY) @@ -109,7 +94,7 @@ class TestSourceFlowManager(TestCase): # With username flow_manager = OAuthSourceFlowManager( self.source, - self.get_request(AnonymousUser()), + get_request("/", user=AnonymousUser()), self.identifier, {"username": "foo"}, ) @@ -125,7 +110,7 @@ class TestSourceFlowManager(TestCase): # With non-existent username, enroll flow_manager = OAuthSourceFlowManager( self.source, - self.get_request(AnonymousUser()), + get_request("/", user=AnonymousUser()), self.identifier, { "username": "bar", @@ -137,7 +122,7 @@ class TestSourceFlowManager(TestCase): # With username flow_manager = OAuthSourceFlowManager( self.source, - self.get_request(AnonymousUser()), + get_request("/", user=AnonymousUser()), self.identifier, {"username": "foo"}, ) @@ -151,7 +136,7 @@ class TestSourceFlowManager(TestCase): flow_manager = OAuthSourceFlowManager( self.source, - self.get_request(AnonymousUser()), + get_request("/", user=AnonymousUser()), self.identifier, {"username": "foo"}, ) diff --git a/authentik/core/tests/test_token_api.py b/authentik/core/tests/test_token_api.py index 0e19a9a98..4a34f6bca 100644 --- a/authentik/core/tests/test_token_api.py +++ b/authentik/core/tests/test_token_api.py @@ -27,6 +27,14 @@ class TestTokenAPI(APITestCase): self.assertEqual(token.intent, TokenIntents.INTENT_API) self.assertEqual(token.expiring, True) + def test_token_create_invalid(self): + """Test token creation endpoint (invalid data)""" + response = self.client.post( + reverse("authentik_api:token-list"), + {"identifier": "test-token", "intent": TokenIntents.INTENT_RECOVERY}, + ) + self.assertEqual(response.status_code, 400) + def test_token_create_non_expiring(self): """Test token creation endpoint""" self.user.attributes[USER_ATTRIBUTE_TOKEN_EXPIRING] = False diff --git a/authentik/core/tests/test_token_auth.py b/authentik/core/tests/test_token_auth.py new file mode 100644 index 000000000..f7b285bbc --- /dev/null +++ b/authentik/core/tests/test_token_auth.py @@ -0,0 +1,40 @@ +"""Test token auth""" +from django.test import TestCase + +from authentik.core.auth import TokenBackend +from authentik.core.models import Token, TokenIntents, User +from authentik.flows.planner import FlowPlan +from authentik.flows.views import SESSION_KEY_PLAN +from authentik.lib.tests.utils import get_request + + +class TestTokenAuth(TestCase): + """Test token auth""" + + def setUp(self) -> None: + self.user = User.objects.create(username="test-user") + self.token = Token.objects.create( + expiring=False, user=self.user, intent=TokenIntents.INTENT_APP_PASSWORD + ) + # To test with session we need to create a request and pass it through all middlewares + self.request = get_request("/") + self.request.session[SESSION_KEY_PLAN] = FlowPlan("test") + + def test_token_auth(self): + """Test auth with token""" + self.assertEqual( + TokenBackend().authenticate(self.request, "test-user", self.token.key), self.user + ) + + def test_token_auth_none(self): + """Test auth with token (non-existent user)""" + self.assertIsNone( + TokenBackend().authenticate(self.request, "test-user-foo", self.token.key), self.user + ) + + def test_token_auth_invalid(self): + """Test auth with token (invalid token)""" + self.assertIsNone( + TokenBackend().authenticate(self.request, "test-user", self.token.key + "foo"), + self.user, + ) diff --git a/authentik/flows/tests/test_planner.py b/authentik/flows/tests/test_planner.py index 63c4c5193..cb7bf44f7 100644 --- a/authentik/flows/tests/test_planner.py +++ b/authentik/flows/tests/test_planner.py @@ -3,7 +3,6 @@ from unittest.mock import MagicMock, Mock, PropertyMock, patch from django.contrib.sessions.middleware import SessionMiddleware from django.core.cache import cache -from django.http import HttpRequest from django.test import RequestFactory, TestCase from django.urls import reverse from guardian.shortcuts import get_anonymous_user @@ -13,6 +12,7 @@ from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableExce from authentik.flows.markers import ReevaluateMarker, StageMarker from authentik.flows.models import Flow, FlowDesignation, FlowStageBinding from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner, cache_key +from authentik.lib.tests.utils import dummy_get_response from authentik.policies.dummy.models import DummyPolicy from authentik.policies.models import PolicyBinding from authentik.policies.types import PolicyResult @@ -24,11 +24,6 @@ CACHE_MOCK = Mock(wraps=cache) POLICY_RETURN_TRUE = MagicMock(return_value=PolicyResult(True)) -def dummy_get_response(request: HttpRequest): # pragma: no cover - """Dummy get_response for SessionMiddleware""" - return None - - class TestFlowPlanner(TestCase): """Test planner logic""" diff --git a/authentik/lib/tests/utils.py b/authentik/lib/tests/utils.py new file mode 100644 index 000000000..35ec8391d --- /dev/null +++ b/authentik/lib/tests/utils.py @@ -0,0 +1,27 @@ +"""Test utils""" +from django.contrib.messages.middleware import MessageMiddleware +from django.contrib.sessions.middleware import SessionMiddleware +from django.http import HttpRequest +from django.test.client import RequestFactory +from guardian.utils import get_anonymous_user + + +def dummy_get_response(request: HttpRequest): # pragma: no cover + """Dummy get_response for SessionMiddleware""" + return None + + +def get_request(*args, user=None, **kwargs): + """Get a request with usable session""" + request = RequestFactory().get(*args, **kwargs) + if user: + request.user = user + else: + request.user = get_anonymous_user() + middleware = SessionMiddleware(dummy_get_response) + middleware.process_request(request) + request.session.save() + middleware = MessageMiddleware(dummy_get_response) + middleware.process_request(request) + request.session.save() + return request diff --git a/authentik/providers/saml/tests/test_auth_n_request.py b/authentik/providers/saml/tests/test_auth_n_request.py index ae8986956..a78373fd8 100644 --- a/authentik/providers/saml/tests/test_auth_n_request.py +++ b/authentik/providers/saml/tests/test_auth_n_request.py @@ -1,16 +1,14 @@ """Test AuthN Request generator and parser""" from base64 import b64encode -from django.contrib.sessions.middleware import SessionMiddleware from django.http.request import QueryDict from django.test import RequestFactory, TestCase -from guardian.utils import get_anonymous_user from authentik.core.models import User from authentik.crypto.models import CertificateKeyPair from authentik.events.models import Event, EventAction from authentik.flows.models import Flow -from authentik.flows.tests.test_planner import dummy_get_response +from authentik.lib.tests.utils import get_request from authentik.managed.manager import ObjectManager from authentik.providers.saml.models import SAMLPropertyMapping, SAMLProvider from authentik.providers.saml.processors.assertion import AssertionProcessor @@ -99,11 +97,7 @@ class TestAuthNRequest(TestCase): def test_signed_valid(self): """Test generated AuthNRequest with valid signature""" - http_request = self.factory.get("/") - - middleware = SessionMiddleware(dummy_get_response) - middleware.process_request(http_request) - http_request.session.save() + http_request = get_request("/") # First create an AuthNRequest request_proc = RequestProcessor(self.source, http_request, "test_state") @@ -117,12 +111,7 @@ class TestAuthNRequest(TestCase): def test_request_full_signed(self): """Test full SAML Request/Response flow, fully signed""" - http_request = self.factory.get("/") - http_request.user = get_anonymous_user() - - middleware = SessionMiddleware(dummy_get_response) - middleware.process_request(http_request) - http_request.session.save() + http_request = get_request("/") # First create an AuthNRequest request_proc = RequestProcessor(self.source, http_request, "test_state") @@ -145,12 +134,7 @@ class TestAuthNRequest(TestCase): def test_request_id_invalid(self): """Test generated AuthNRequest with invalid request ID""" - http_request = self.factory.get("/") - http_request.user = get_anonymous_user() - - middleware = SessionMiddleware(dummy_get_response) - middleware.process_request(http_request) - http_request.session.save() + http_request = get_request("/") # First create an AuthNRequest request_proc = RequestProcessor(self.source, http_request, "test_state") @@ -179,11 +163,7 @@ class TestAuthNRequest(TestCase): def test_signed_valid_detached(self): """Test generated AuthNRequest with valid signature (detached)""" - http_request = self.factory.get("/") - - middleware = SessionMiddleware(dummy_get_response) - middleware.process_request(http_request) - http_request.session.save() + http_request = get_request("/") # First create an AuthNRequest request_proc = RequestProcessor(self.source, http_request, "test_state") @@ -243,12 +223,7 @@ class TestAuthNRequest(TestCase): def test_request_attributes(self): """Test full SAML Request/Response flow, fully signed""" - http_request = self.factory.get("/") - http_request.user = User.objects.get(username="akadmin") - - middleware = SessionMiddleware(dummy_get_response) - middleware.process_request(http_request) - http_request.session.save() + http_request = get_request("/", user=User.objects.get(username="akadmin")) # First create an AuthNRequest request_proc = RequestProcessor(self.source, http_request, "test_state") @@ -264,12 +239,7 @@ class TestAuthNRequest(TestCase): def test_request_attributes_invalid(self): """Test full SAML Request/Response flow, fully signed""" - http_request = self.factory.get("/") - http_request.user = User.objects.get(username="akadmin") - - middleware = SessionMiddleware(dummy_get_response) - middleware.process_request(http_request) - http_request.session.save() + http_request = get_request("/", user=User.objects.get(username="akadmin")) # First create an AuthNRequest request_proc = RequestProcessor(self.source, http_request, "test_state") diff --git a/authentik/providers/saml/tests/test_schema.py b/authentik/providers/saml/tests/test_schema.py index b5f2d62e3..800df49d9 100644 --- a/authentik/providers/saml/tests/test_schema.py +++ b/authentik/providers/saml/tests/test_schema.py @@ -1,18 +1,16 @@ """Test Requests and Responses against schema""" from base64 import b64encode -from django.contrib.sessions.middleware import SessionMiddleware from django.test import RequestFactory, TestCase -from guardian.utils import get_anonymous_user from lxml import etree # nosec from authentik.crypto.models import CertificateKeyPair from authentik.flows.models import Flow +from authentik.lib.tests.utils import get_request from authentik.managed.manager import ObjectManager from authentik.providers.saml.models import SAMLPropertyMapping, SAMLProvider from authentik.providers.saml.processors.assertion import AssertionProcessor from authentik.providers.saml.processors.request_parser import AuthNRequestParser -from authentik.providers.saml.tests.test_auth_n_request import dummy_get_response from authentik.sources.saml.models import SAMLSource from authentik.sources.saml.processors.request import RequestProcessor @@ -43,11 +41,7 @@ class TestSchema(TestCase): def test_request_schema(self): """Test generated AuthNRequest against Schema""" - http_request = self.factory.get("/") - - middleware = SessionMiddleware(dummy_get_response) - middleware.process_request(http_request) - http_request.session.save() + http_request = get_request("/") # First create an AuthNRequest request_proc = RequestProcessor(self.source, http_request, "test_state") @@ -60,12 +54,7 @@ class TestSchema(TestCase): def test_response_schema(self): """Test generated AuthNRequest against Schema""" - http_request = self.factory.get("/") - http_request.user = get_anonymous_user() - - middleware = SessionMiddleware(dummy_get_response) - middleware.process_request(http_request) - http_request.session.save() + http_request = get_request("/") # First create an AuthNRequest request_proc = RequestProcessor(self.source, http_request, "test_state") diff --git a/authentik/stages/authenticator_validate/tests.py b/authentik/stages/authenticator_validate/tests.py index 91b0dc8d3..c5071b5cc 100644 --- a/authentik/stages/authenticator_validate/tests.py +++ b/authentik/stages/authenticator_validate/tests.py @@ -1,7 +1,6 @@ """Test validator stage""" from unittest.mock import MagicMock, patch -from django.contrib.sessions.middleware import SessionMiddleware from django.test import TestCase from django.test.client import RequestFactory from django.urls.base import reverse @@ -12,7 +11,7 @@ from rest_framework.exceptions import ValidationError from authentik.core.models import User from authentik.flows.challenge import ChallengeTypes from authentik.flows.models import Flow, FlowStageBinding, NotConfiguredAction -from authentik.flows.tests.test_planner import dummy_get_response +from authentik.lib.tests.utils import get_request from authentik.providers.oauth2.generators import generate_client_id, generate_client_secret from authentik.stages.authenticator_duo.models import AuthenticatorDuoStage, DuoDevice from authentik.stages.authenticator_validate.api import AuthenticatorValidateStageSerializer @@ -97,11 +96,8 @@ class AuthenticatorValidateStageTests(TestCase): def test_device_challenge_webauthn(self): """Test webauthn""" - request = self.request_factory.get("/") + request = get_request("/") request.user = self.user - middleware = SessionMiddleware(dummy_get_response) - middleware.process_request(request) - request.session.save() webauthn_device = WebAuthnDevice.objects.create( user=self.user,