diff --git a/authentik/providers/oauth2/tests/test_views_authorize.py b/authentik/providers/oauth2/tests/test_views_authorize.py index 14aca975f..92435a484 100644 --- a/authentik/providers/oauth2/tests/test_views_authorize.py +++ b/authentik/providers/oauth2/tests/test_views_authorize.py @@ -1,13 +1,23 @@ """Test authorize view""" from django.test import RequestFactory, TestCase +from django.urls import reverse +from django.utils.encoding import force_str +from authentik.core.models import Application, User +from authentik.flows.challenge import ChallengeTypes from authentik.flows.models import Flow from authentik.providers.oauth2.errors import ( AuthorizeError, ClientIdError, RedirectUriError, ) -from authentik.providers.oauth2.models import OAuth2Provider +from authentik.providers.oauth2.generators import generate_client_id +from authentik.providers.oauth2.models import ( + AuthorizationCode, + GrantTypes, + OAuth2Provider, + RefreshToken, +) from authentik.providers.oauth2.views.authorize import OAuthAuthorizationParams @@ -32,15 +42,194 @@ class TestViewsAuthorize(TestCase): ) OAuthAuthorizationParams.from_request(request) - def test_missing_redirect_uri(self): - """test missing redirect URI""" + def test_request(self): + """test request param""" OAuth2Provider.objects.create( name="test", client_id="test", authorization_flow=Flow.objects.first(), + redirect_uris="http://local.invalid", + ) + with self.assertRaises(AuthorizeError): + request = self.factory.get( + "/", + data={ + "response_type": "code", + "client_id": "test", + "redirect_uri": "http://local.invalid", + "request": "foo", + }, + ) + OAuthAuthorizationParams.from_request(request) + + def test_redirect_uri(self): + """test missing/invalid redirect URI""" + OAuth2Provider.objects.create( + name="test", + client_id="test", + authorization_flow=Flow.objects.first(), + redirect_uris="http://local.invalid", ) with self.assertRaises(RedirectUriError): request = self.factory.get( "/", data={"response_type": "code", "client_id": "test"} ) OAuthAuthorizationParams.from_request(request) + with self.assertRaises(RedirectUriError): + request = self.factory.get( + "/", + data={ + "response_type": "code", + "client_id": "test", + "redirect_uri": "http://localhost", + }, + ) + OAuthAuthorizationParams.from_request(request) + + def test_response_type(self): + """test response_type""" + OAuth2Provider.objects.create( + name="test", + client_id="test", + authorization_flow=Flow.objects.first(), + redirect_uris="http://local.invalid", + ) + request = self.factory.get( + "/", + data={ + "response_type": "code", + "client_id": "test", + "redirect_uri": "http://local.invalid", + }, + ) + self.assertEqual( + OAuthAuthorizationParams.from_request(request).grant_type, + GrantTypes.AUTHORIZATION_CODE, + ) + request = self.factory.get( + "/", + data={ + "response_type": "id_token", + "client_id": "test", + "redirect_uri": "http://local.invalid", + "scope": "openid", + "state": "foo", + }, + ) + self.assertEqual( + OAuthAuthorizationParams.from_request(request).grant_type, + GrantTypes.IMPLICIT, + ) + # Implicit without openid scope + with self.assertRaises(AuthorizeError): + request = self.factory.get( + "/", + data={ + "response_type": "id_token", + "client_id": "test", + "redirect_uri": "http://local.invalid", + "state": "foo", + }, + ) + self.assertEqual( + OAuthAuthorizationParams.from_request(request).grant_type, + GrantTypes.IMPLICIT, + ) + request = self.factory.get( + "/", + data={ + "response_type": "code token", + "client_id": "test", + "redirect_uri": "http://local.invalid", + "scope": "openid", + "state": "foo", + }, + ) + self.assertEqual( + OAuthAuthorizationParams.from_request(request).grant_type, GrantTypes.HYBRID + ) + with self.assertRaises(AuthorizeError): + request = self.factory.get( + "/", + data={ + "response_type": "invalid", + "client_id": "test", + "redirect_uri": "http://local.invalid", + }, + ) + OAuthAuthorizationParams.from_request(request) + + def test_full_code(self): + """Test full authorization""" + flow = Flow.objects.create(slug="empty") + provider = OAuth2Provider.objects.create( + name="test", + client_id="test", + authorization_flow=flow, + redirect_uris="http://localhost", + ) + Application.objects.create(name="app", slug="app", provider=provider) + state = generate_client_id() + user = User.objects.get(username="akadmin") + self.client.force_login(user) + # Step 1, initiate params and get redirect to flow + self.client.get( + reverse("authentik_providers_oauth2:authorize"), + data={ + "response_type": "code", + "client_id": "test", + "state": state, + "redirect_uri": "http://localhost", + }, + ) + response = self.client.get( + reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), + ) + code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() + self.assertJSONEqual( + force_str(response.content), + { + "type": ChallengeTypes.REDIRECT.value, + "to": f"http://localhost?code={code.code}&state={state}", + }, + ) + + def test_full_implicit(self): + """Test full authorization""" + flow = Flow.objects.create(slug="empty") + provider = OAuth2Provider.objects.create( + name="test", + client_id="test", + authorization_flow=flow, + redirect_uris="http://localhost", + ) + Application.objects.create(name="app", slug="app", provider=provider) + state = generate_client_id() + user = User.objects.get(username="akadmin") + self.client.force_login(user) + # Step 1, initiate params and get redirect to flow + self.client.get( + reverse("authentik_providers_oauth2:authorize"), + data={ + "response_type": "id_token", + "client_id": "test", + "state": state, + "scope": "openid", + "redirect_uri": "http://localhost", + }, + ) + response = self.client.get( + reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), + ) + token: RefreshToken = RefreshToken.objects.filter(user=user).first() + self.assertJSONEqual( + force_str(response.content), + { + "type": ChallengeTypes.REDIRECT.value, + "to": ( + f"http://localhost#access_token={token.access_token}" + f"&id_token={provider.encode(token.id_token.to_dict())}&token_type=bearer" + f"&expires_in=600&state={state}" + ), + }, + ) diff --git a/authentik/providers/oauth2/tests/test_views_token.py b/authentik/providers/oauth2/tests/test_views_token.py new file mode 100644 index 000000000..5e4dcae64 --- /dev/null +++ b/authentik/providers/oauth2/tests/test_views_token.py @@ -0,0 +1,171 @@ +"""Test token view""" +from base64 import b64encode + +from django.test import RequestFactory, TestCase +from django.urls import reverse +from django.utils.encoding import force_str + +from authentik.core.models import User +from authentik.flows.models import Flow +from authentik.providers.oauth2.constants import ( + GRANT_TYPE_AUTHORIZATION_CODE, + GRANT_TYPE_REFRESH_TOKEN, +) +from authentik.providers.oauth2.generators import ( + generate_client_id, + generate_client_secret, +) +from authentik.providers.oauth2.models import ( + AuthorizationCode, + OAuth2Provider, + RefreshToken, +) +from authentik.providers.oauth2.views.token import TokenParams + + +class TestViewsToken(TestCase): + """Test token view""" + + def setUp(self) -> None: + super().setUp() + self.factory = RequestFactory() + + def test_request_auth_code(self): + """test request param""" + provider = OAuth2Provider.objects.create( + name="test", + client_id=generate_client_id(), + client_secret=generate_client_secret(), + authorization_flow=Flow.objects.first(), + redirect_uris="http://local.invalid", + ) + header = b64encode( + f"{provider.client_id}:{provider.client_secret}".encode() + ).decode() + user = User.objects.get(username="akadmin") + code = AuthorizationCode.objects.create( + code="foobar", provider=provider, user=user + ) + request = self.factory.post( + "/", + data={ + "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, + "code": code.code, + "redirect_uri": "http://local.invalid", + }, + HTTP_AUTHORIZATION=f"Basic {header}", + ) + params = TokenParams.from_request(request) + self.assertEqual(params.provider, provider) + + def test_request_refresh_token(self): + """test request param""" + provider = OAuth2Provider.objects.create( + name="test", + client_id=generate_client_id(), + client_secret=generate_client_secret(), + authorization_flow=Flow.objects.first(), + redirect_uris="http://local.invalid", + ) + header = b64encode( + f"{provider.client_id}:{provider.client_secret}".encode() + ).decode() + user = User.objects.get(username="akadmin") + token: RefreshToken = RefreshToken.objects.create( + provider=provider, + user=user, + refresh_token=generate_client_id(), + ) + request = self.factory.post( + "/", + data={ + "grant_type": GRANT_TYPE_REFRESH_TOKEN, + "refresh_token": token.refresh_token, + "redirect_uri": "http://local.invalid", + }, + HTTP_AUTHORIZATION=f"Basic {header}", + ) + params = TokenParams.from_request(request) + self.assertEqual(params.provider, provider) + + def test_auth_code_view(self): + """test request param""" + provider = OAuth2Provider.objects.create( + name="test", + client_id=generate_client_id(), + client_secret=generate_client_secret(), + authorization_flow=Flow.objects.first(), + redirect_uris="http://local.invalid", + ) + header = b64encode( + f"{provider.client_id}:{provider.client_secret}".encode() + ).decode() + user = User.objects.get(username="akadmin") + code = AuthorizationCode.objects.create( + code="foobar", provider=provider, user=user + ) + response = self.client.post( + reverse("authentik_providers_oauth2:token"), + data={ + "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, + "code": code.code, + "redirect_uri": "http://local.invalid", + }, + HTTP_AUTHORIZATION=f"Basic {header}", + ) + new_token: RefreshToken = RefreshToken.objects.filter(user=user).first() + self.assertJSONEqual( + force_str(response.content), + { + "access_token": new_token.access_token, + "refresh_token": new_token.refresh_token, + "token_type": "bearer", + "expires_in": 600, + "id_token": provider.encode( + new_token.id_token.to_dict(), + ), + }, + ) + + def test_refresh_token_view(self): + """test request param""" + provider = OAuth2Provider.objects.create( + name="test", + client_id=generate_client_id(), + client_secret=generate_client_secret(), + authorization_flow=Flow.objects.first(), + redirect_uris="http://local.invalid", + ) + header = b64encode( + f"{provider.client_id}:{provider.client_secret}".encode() + ).decode() + user = User.objects.get(username="akadmin") + token: RefreshToken = RefreshToken.objects.create( + provider=provider, + user=user, + refresh_token=generate_client_id(), + ) + response = self.client.post( + reverse("authentik_providers_oauth2:token"), + data={ + "grant_type": GRANT_TYPE_REFRESH_TOKEN, + "refresh_token": token.refresh_token, + "redirect_uri": "http://local.invalid", + }, + HTTP_AUTHORIZATION=f"Basic {header}", + ) + new_token: RefreshToken = ( + RefreshToken.objects.filter(user=user).exclude(pk=token.pk).first() + ) + self.assertJSONEqual( + force_str(response.content), + { + "access_token": new_token.access_token, + "refresh_token": new_token.refresh_token, + "token_type": "bearer", + "expires_in": 600, + "id_token": provider.encode( + new_token.id_token.to_dict(), + ), + }, + ) diff --git a/authentik/providers/oauth2/views/token.py b/authentik/providers/oauth2/views/token.py index ece6a5c89..5360c0f9e 100644 --- a/authentik/providers/oauth2/views/token.py +++ b/authentik/providers/oauth2/views/token.py @@ -198,7 +198,7 @@ class TokenView(View): response_dict = { "access_token": refresh_token.access_token, "refresh_token": refresh_token.refresh_token, - "token_type": "Bearer", + "token_type": "bearer", "expires_in": timedelta_from_string( self.params.provider.token_validity ).seconds,