providers/oauth2: add unittests for authorize and token views
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
parent
dd2cd09637
commit
ce082ead5e
|
@ -1,13 +1,23 @@
|
|||
"""Test authorize view"""
|
||||
from django.test import RequestFactory, TestCase
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_str
|
||||
|
||||
from authentik.core.models import Application, User
|
||||
from authentik.flows.challenge import ChallengeTypes
|
||||
from authentik.flows.models import Flow
|
||||
from authentik.providers.oauth2.errors import (
|
||||
AuthorizeError,
|
||||
ClientIdError,
|
||||
RedirectUriError,
|
||||
)
|
||||
from authentik.providers.oauth2.models import OAuth2Provider
|
||||
from authentik.providers.oauth2.generators import generate_client_id
|
||||
from authentik.providers.oauth2.models import (
|
||||
AuthorizationCode,
|
||||
GrantTypes,
|
||||
OAuth2Provider,
|
||||
RefreshToken,
|
||||
)
|
||||
from authentik.providers.oauth2.views.authorize import OAuthAuthorizationParams
|
||||
|
||||
|
||||
|
@ -32,15 +42,194 @@ class TestViewsAuthorize(TestCase):
|
|||
)
|
||||
OAuthAuthorizationParams.from_request(request)
|
||||
|
||||
def test_missing_redirect_uri(self):
|
||||
"""test missing redirect URI"""
|
||||
def test_request(self):
|
||||
"""test request param"""
|
||||
OAuth2Provider.objects.create(
|
||||
name="test",
|
||||
client_id="test",
|
||||
authorization_flow=Flow.objects.first(),
|
||||
redirect_uris="http://local.invalid",
|
||||
)
|
||||
with self.assertRaises(AuthorizeError):
|
||||
request = self.factory.get(
|
||||
"/",
|
||||
data={
|
||||
"response_type": "code",
|
||||
"client_id": "test",
|
||||
"redirect_uri": "http://local.invalid",
|
||||
"request": "foo",
|
||||
},
|
||||
)
|
||||
OAuthAuthorizationParams.from_request(request)
|
||||
|
||||
def test_redirect_uri(self):
|
||||
"""test missing/invalid redirect URI"""
|
||||
OAuth2Provider.objects.create(
|
||||
name="test",
|
||||
client_id="test",
|
||||
authorization_flow=Flow.objects.first(),
|
||||
redirect_uris="http://local.invalid",
|
||||
)
|
||||
with self.assertRaises(RedirectUriError):
|
||||
request = self.factory.get(
|
||||
"/", data={"response_type": "code", "client_id": "test"}
|
||||
)
|
||||
OAuthAuthorizationParams.from_request(request)
|
||||
with self.assertRaises(RedirectUriError):
|
||||
request = self.factory.get(
|
||||
"/",
|
||||
data={
|
||||
"response_type": "code",
|
||||
"client_id": "test",
|
||||
"redirect_uri": "http://localhost",
|
||||
},
|
||||
)
|
||||
OAuthAuthorizationParams.from_request(request)
|
||||
|
||||
def test_response_type(self):
|
||||
"""test response_type"""
|
||||
OAuth2Provider.objects.create(
|
||||
name="test",
|
||||
client_id="test",
|
||||
authorization_flow=Flow.objects.first(),
|
||||
redirect_uris="http://local.invalid",
|
||||
)
|
||||
request = self.factory.get(
|
||||
"/",
|
||||
data={
|
||||
"response_type": "code",
|
||||
"client_id": "test",
|
||||
"redirect_uri": "http://local.invalid",
|
||||
},
|
||||
)
|
||||
self.assertEqual(
|
||||
OAuthAuthorizationParams.from_request(request).grant_type,
|
||||
GrantTypes.AUTHORIZATION_CODE,
|
||||
)
|
||||
request = self.factory.get(
|
||||
"/",
|
||||
data={
|
||||
"response_type": "id_token",
|
||||
"client_id": "test",
|
||||
"redirect_uri": "http://local.invalid",
|
||||
"scope": "openid",
|
||||
"state": "foo",
|
||||
},
|
||||
)
|
||||
self.assertEqual(
|
||||
OAuthAuthorizationParams.from_request(request).grant_type,
|
||||
GrantTypes.IMPLICIT,
|
||||
)
|
||||
# Implicit without openid scope
|
||||
with self.assertRaises(AuthorizeError):
|
||||
request = self.factory.get(
|
||||
"/",
|
||||
data={
|
||||
"response_type": "id_token",
|
||||
"client_id": "test",
|
||||
"redirect_uri": "http://local.invalid",
|
||||
"state": "foo",
|
||||
},
|
||||
)
|
||||
self.assertEqual(
|
||||
OAuthAuthorizationParams.from_request(request).grant_type,
|
||||
GrantTypes.IMPLICIT,
|
||||
)
|
||||
request = self.factory.get(
|
||||
"/",
|
||||
data={
|
||||
"response_type": "code token",
|
||||
"client_id": "test",
|
||||
"redirect_uri": "http://local.invalid",
|
||||
"scope": "openid",
|
||||
"state": "foo",
|
||||
},
|
||||
)
|
||||
self.assertEqual(
|
||||
OAuthAuthorizationParams.from_request(request).grant_type, GrantTypes.HYBRID
|
||||
)
|
||||
with self.assertRaises(AuthorizeError):
|
||||
request = self.factory.get(
|
||||
"/",
|
||||
data={
|
||||
"response_type": "invalid",
|
||||
"client_id": "test",
|
||||
"redirect_uri": "http://local.invalid",
|
||||
},
|
||||
)
|
||||
OAuthAuthorizationParams.from_request(request)
|
||||
|
||||
def test_full_code(self):
|
||||
"""Test full authorization"""
|
||||
flow = Flow.objects.create(slug="empty")
|
||||
provider = OAuth2Provider.objects.create(
|
||||
name="test",
|
||||
client_id="test",
|
||||
authorization_flow=flow,
|
||||
redirect_uris="http://localhost",
|
||||
)
|
||||
Application.objects.create(name="app", slug="app", provider=provider)
|
||||
state = generate_client_id()
|
||||
user = User.objects.get(username="akadmin")
|
||||
self.client.force_login(user)
|
||||
# Step 1, initiate params and get redirect to flow
|
||||
self.client.get(
|
||||
reverse("authentik_providers_oauth2:authorize"),
|
||||
data={
|
||||
"response_type": "code",
|
||||
"client_id": "test",
|
||||
"state": state,
|
||||
"redirect_uri": "http://localhost",
|
||||
},
|
||||
)
|
||||
response = self.client.get(
|
||||
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
|
||||
)
|
||||
code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
|
||||
self.assertJSONEqual(
|
||||
force_str(response.content),
|
||||
{
|
||||
"type": ChallengeTypes.REDIRECT.value,
|
||||
"to": f"http://localhost?code={code.code}&state={state}",
|
||||
},
|
||||
)
|
||||
|
||||
def test_full_implicit(self):
|
||||
"""Test full authorization"""
|
||||
flow = Flow.objects.create(slug="empty")
|
||||
provider = OAuth2Provider.objects.create(
|
||||
name="test",
|
||||
client_id="test",
|
||||
authorization_flow=flow,
|
||||
redirect_uris="http://localhost",
|
||||
)
|
||||
Application.objects.create(name="app", slug="app", provider=provider)
|
||||
state = generate_client_id()
|
||||
user = User.objects.get(username="akadmin")
|
||||
self.client.force_login(user)
|
||||
# Step 1, initiate params and get redirect to flow
|
||||
self.client.get(
|
||||
reverse("authentik_providers_oauth2:authorize"),
|
||||
data={
|
||||
"response_type": "id_token",
|
||||
"client_id": "test",
|
||||
"state": state,
|
||||
"scope": "openid",
|
||||
"redirect_uri": "http://localhost",
|
||||
},
|
||||
)
|
||||
response = self.client.get(
|
||||
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
|
||||
)
|
||||
token: RefreshToken = RefreshToken.objects.filter(user=user).first()
|
||||
self.assertJSONEqual(
|
||||
force_str(response.content),
|
||||
{
|
||||
"type": ChallengeTypes.REDIRECT.value,
|
||||
"to": (
|
||||
f"http://localhost#access_token={token.access_token}"
|
||||
f"&id_token={provider.encode(token.id_token.to_dict())}&token_type=bearer"
|
||||
f"&expires_in=600&state={state}"
|
||||
),
|
||||
},
|
||||
)
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
"""Test token view"""
|
||||
from base64 import b64encode
|
||||
|
||||
from django.test import RequestFactory, TestCase
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_str
|
||||
|
||||
from authentik.core.models import User
|
||||
from authentik.flows.models import Flow
|
||||
from authentik.providers.oauth2.constants import (
|
||||
GRANT_TYPE_AUTHORIZATION_CODE,
|
||||
GRANT_TYPE_REFRESH_TOKEN,
|
||||
)
|
||||
from authentik.providers.oauth2.generators import (
|
||||
generate_client_id,
|
||||
generate_client_secret,
|
||||
)
|
||||
from authentik.providers.oauth2.models import (
|
||||
AuthorizationCode,
|
||||
OAuth2Provider,
|
||||
RefreshToken,
|
||||
)
|
||||
from authentik.providers.oauth2.views.token import TokenParams
|
||||
|
||||
|
||||
class TestViewsToken(TestCase):
|
||||
"""Test token view"""
|
||||
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
self.factory = RequestFactory()
|
||||
|
||||
def test_request_auth_code(self):
|
||||
"""test request param"""
|
||||
provider = OAuth2Provider.objects.create(
|
||||
name="test",
|
||||
client_id=generate_client_id(),
|
||||
client_secret=generate_client_secret(),
|
||||
authorization_flow=Flow.objects.first(),
|
||||
redirect_uris="http://local.invalid",
|
||||
)
|
||||
header = b64encode(
|
||||
f"{provider.client_id}:{provider.client_secret}".encode()
|
||||
).decode()
|
||||
user = User.objects.get(username="akadmin")
|
||||
code = AuthorizationCode.objects.create(
|
||||
code="foobar", provider=provider, user=user
|
||||
)
|
||||
request = self.factory.post(
|
||||
"/",
|
||||
data={
|
||||
"grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
|
||||
"code": code.code,
|
||||
"redirect_uri": "http://local.invalid",
|
||||
},
|
||||
HTTP_AUTHORIZATION=f"Basic {header}",
|
||||
)
|
||||
params = TokenParams.from_request(request)
|
||||
self.assertEqual(params.provider, provider)
|
||||
|
||||
def test_request_refresh_token(self):
|
||||
"""test request param"""
|
||||
provider = OAuth2Provider.objects.create(
|
||||
name="test",
|
||||
client_id=generate_client_id(),
|
||||
client_secret=generate_client_secret(),
|
||||
authorization_flow=Flow.objects.first(),
|
||||
redirect_uris="http://local.invalid",
|
||||
)
|
||||
header = b64encode(
|
||||
f"{provider.client_id}:{provider.client_secret}".encode()
|
||||
).decode()
|
||||
user = User.objects.get(username="akadmin")
|
||||
token: RefreshToken = RefreshToken.objects.create(
|
||||
provider=provider,
|
||||
user=user,
|
||||
refresh_token=generate_client_id(),
|
||||
)
|
||||
request = self.factory.post(
|
||||
"/",
|
||||
data={
|
||||
"grant_type": GRANT_TYPE_REFRESH_TOKEN,
|
||||
"refresh_token": token.refresh_token,
|
||||
"redirect_uri": "http://local.invalid",
|
||||
},
|
||||
HTTP_AUTHORIZATION=f"Basic {header}",
|
||||
)
|
||||
params = TokenParams.from_request(request)
|
||||
self.assertEqual(params.provider, provider)
|
||||
|
||||
def test_auth_code_view(self):
|
||||
"""test request param"""
|
||||
provider = OAuth2Provider.objects.create(
|
||||
name="test",
|
||||
client_id=generate_client_id(),
|
||||
client_secret=generate_client_secret(),
|
||||
authorization_flow=Flow.objects.first(),
|
||||
redirect_uris="http://local.invalid",
|
||||
)
|
||||
header = b64encode(
|
||||
f"{provider.client_id}:{provider.client_secret}".encode()
|
||||
).decode()
|
||||
user = User.objects.get(username="akadmin")
|
||||
code = AuthorizationCode.objects.create(
|
||||
code="foobar", provider=provider, user=user
|
||||
)
|
||||
response = self.client.post(
|
||||
reverse("authentik_providers_oauth2:token"),
|
||||
data={
|
||||
"grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
|
||||
"code": code.code,
|
||||
"redirect_uri": "http://local.invalid",
|
||||
},
|
||||
HTTP_AUTHORIZATION=f"Basic {header}",
|
||||
)
|
||||
new_token: RefreshToken = RefreshToken.objects.filter(user=user).first()
|
||||
self.assertJSONEqual(
|
||||
force_str(response.content),
|
||||
{
|
||||
"access_token": new_token.access_token,
|
||||
"refresh_token": new_token.refresh_token,
|
||||
"token_type": "bearer",
|
||||
"expires_in": 600,
|
||||
"id_token": provider.encode(
|
||||
new_token.id_token.to_dict(),
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
def test_refresh_token_view(self):
|
||||
"""test request param"""
|
||||
provider = OAuth2Provider.objects.create(
|
||||
name="test",
|
||||
client_id=generate_client_id(),
|
||||
client_secret=generate_client_secret(),
|
||||
authorization_flow=Flow.objects.first(),
|
||||
redirect_uris="http://local.invalid",
|
||||
)
|
||||
header = b64encode(
|
||||
f"{provider.client_id}:{provider.client_secret}".encode()
|
||||
).decode()
|
||||
user = User.objects.get(username="akadmin")
|
||||
token: RefreshToken = RefreshToken.objects.create(
|
||||
provider=provider,
|
||||
user=user,
|
||||
refresh_token=generate_client_id(),
|
||||
)
|
||||
response = self.client.post(
|
||||
reverse("authentik_providers_oauth2:token"),
|
||||
data={
|
||||
"grant_type": GRANT_TYPE_REFRESH_TOKEN,
|
||||
"refresh_token": token.refresh_token,
|
||||
"redirect_uri": "http://local.invalid",
|
||||
},
|
||||
HTTP_AUTHORIZATION=f"Basic {header}",
|
||||
)
|
||||
new_token: RefreshToken = (
|
||||
RefreshToken.objects.filter(user=user).exclude(pk=token.pk).first()
|
||||
)
|
||||
self.assertJSONEqual(
|
||||
force_str(response.content),
|
||||
{
|
||||
"access_token": new_token.access_token,
|
||||
"refresh_token": new_token.refresh_token,
|
||||
"token_type": "bearer",
|
||||
"expires_in": 600,
|
||||
"id_token": provider.encode(
|
||||
new_token.id_token.to_dict(),
|
||||
),
|
||||
},
|
||||
)
|
|
@ -198,7 +198,7 @@ class TokenView(View):
|
|||
response_dict = {
|
||||
"access_token": refresh_token.access_token,
|
||||
"refresh_token": refresh_token.refresh_token,
|
||||
"token_type": "Bearer",
|
||||
"token_type": "bearer",
|
||||
"expires_in": timedelta_from_string(
|
||||
self.params.provider.token_validity
|
||||
).seconds,
|
||||
|
|
Reference in New Issue