This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/passbook/providers/oauth2/views/authorize.py

395 lines
15 KiB
Python
Raw Normal View History

2020-08-19 08:32:44 +00:00
"""passbook OAuth2 Authorization views"""
from dataclasses import dataclass, field
from typing import List, Optional, Set
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit
from uuid import uuid4
from django.http import HttpRequest, HttpResponse
from django.shortcuts import get_object_or_404, redirect
from django.utils import timezone
from django.views import View
from structlog import get_logger
2020-10-05 20:09:57 +00:00
from passbook.audit.models import Event, EventAction
from passbook.core.models import Application
2020-08-19 08:32:44 +00:00
from passbook.flows.models import in_memory_stage
from passbook.flows.planner import (
PLAN_CONTEXT_APPLICATION,
PLAN_CONTEXT_SSO,
FlowPlan,
FlowPlanner,
)
from passbook.flows.stage import StageView
from passbook.flows.views import SESSION_KEY_PLAN
from passbook.lib.utils.time import timedelta_from_string
from passbook.lib.utils.urls import redirect_with_qs
from passbook.lib.views import bad_request_message
from passbook.policies.mixins import PolicyAccessMixin
from passbook.providers.oauth2.constants import (
PROMPT_CONSNET,
PROMPT_NONE,
SCOPE_OPENID,
)
from passbook.providers.oauth2.errors import (
AuthorizeError,
ClientIdError,
OAuth2Error,
RedirectUriError,
)
from passbook.providers.oauth2.models import (
AuthorizationCode,
GrantTypes,
OAuth2Provider,
ResponseTypes,
)
from passbook.providers.oauth2.views.userinfo import UserInfoView
from passbook.stages.consent.models import ConsentMode, ConsentStage
from passbook.stages.consent.stage import (
PLAN_CONTEXT_CONSENT_TEMPLATE,
ConsentStageView,
)
LOGGER = get_logger()
PLAN_CONTEXT_PARAMS = "params"
PLAN_CONTEXT_SCOPE_DESCRIPTIONS = "scope_descriptions"
ALLOWED_PROMPT_PARAMS = {PROMPT_NONE, PROMPT_CONSNET}
@dataclass
class OAuthAuthorizationParams:
"""Parameteres required to authorize an OAuth Client"""
client_id: str
redirect_uri: str
response_type: str
scope: List[str]
state: str
nonce: str
prompt: Set[str]
grant_type: str
provider: OAuth2Provider = field(default_factory=OAuth2Provider)
code_challenge: Optional[str] = None
code_challenge_method: Optional[str] = None
@staticmethod
def from_request(request: HttpRequest) -> "OAuthAuthorizationParams":
"""
Get all the params used by the Authorization Code Flow
(and also for the Implicit and Hybrid).
See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
"""
# Because in this endpoint we handle both GET
# and POST request.
query_dict = request.POST if request.method == "POST" else request.GET
response_type = query_dict.get("response_type", "")
grant_type = None
# Determine which flow to use.
if response_type in [ResponseTypes.CODE, ResponseTypes.CODE_ADFS]:
2020-08-19 08:32:44 +00:00
grant_type = GrantTypes.AUTHORIZATION_CODE
elif response_type in [
ResponseTypes.ID_TOKEN,
ResponseTypes.ID_TOKEN_TOKEN,
ResponseTypes.CODE_TOKEN,
2020-08-19 08:32:44 +00:00
]:
grant_type = GrantTypes.IMPLICIT
elif response_type in [
ResponseTypes.CODE_TOKEN,
ResponseTypes.CODE_ID_TOKEN,
ResponseTypes.CODE_ID_TOKEN_TOKEN,
]:
grant_type = GrantTypes.HYBRID
# Grant type validation.
if not grant_type:
LOGGER.warning("Invalid response type", type=response_type)
raise AuthorizeError(
query_dict.get("redirect_uri", ""),
"unsupported_response_type",
grant_type,
)
return OAuthAuthorizationParams(
client_id=query_dict.get("client_id", ""),
redirect_uri=query_dict.get("redirect_uri", ""),
response_type=response_type,
grant_type=grant_type,
scope=query_dict.get("scope", "").split(),
state=query_dict.get("state", ""),
nonce=query_dict.get("nonce", ""),
prompt=ALLOWED_PROMPT_PARAMS.intersection(
set(query_dict.get("prompt", "").split())
),
code_challenge=query_dict.get("code_challenge"),
code_challenge_method=query_dict.get("code_challenge_method"),
)
def __post_init__(self):
try:
self.provider: OAuth2Provider = OAuth2Provider.objects.get(
client_id=self.client_id
)
except OAuth2Provider.DoesNotExist:
LOGGER.warning("Invalid client identifier", client_id=self.client_id)
raise ClientIdError()
is_open_id = SCOPE_OPENID in self.scope
# Redirect URI validation.
if is_open_id and not self.redirect_uri:
LOGGER.warning("Missing redirect uri.")
raise RedirectUriError()
if self.redirect_uri.lower() not in [
x.lower() for x in self.provider.redirect_uris.split()
]:
LOGGER.warning(
"Invalid redirect uri",
redirect_uri=self.redirect_uri,
excepted=self.provider.redirect_uris.split(),
)
2020-08-19 08:32:44 +00:00
raise RedirectUriError()
if not is_open_id and (
self.grant_type == GrantTypes.HYBRID
or self.response_type
in [ResponseTypes.ID_TOKEN, ResponseTypes.ID_TOKEN_TOKEN]
):
LOGGER.warning("Missing 'openid' scope.")
raise AuthorizeError(self.redirect_uri, "invalid_scope", self.grant_type)
# Nonce parameter validation.
if is_open_id and self.grant_type == GrantTypes.IMPLICIT and not self.nonce:
raise AuthorizeError(self.redirect_uri, "invalid_request", self.grant_type)
# Response type parameter validation.
if is_open_id:
actual_response_type = self.provider.response_type
if "#" in self.provider.response_type:
hash_index = actual_response_type.index("#")
actual_response_type = actual_response_type[:hash_index]
if self.response_type != actual_response_type:
raise AuthorizeError(
self.redirect_uri, "invalid_request", self.grant_type
)
2020-08-19 08:32:44 +00:00
# PKCE validation of the transformation method.
if self.code_challenge:
if not (self.code_challenge_method in ["plain", "S256"]):
raise AuthorizeError(
self.redirect_uri, "invalid_request", self.grant_type
)
def create_code(self, request: HttpRequest) -> AuthorizationCode:
"""Create an AuthorizationCode object for the request"""
code = AuthorizationCode()
code.user = request.user
code.provider = self.provider
code.code = uuid4().hex
if self.code_challenge and self.code_challenge_method:
code.code_challenge = self.code_challenge
code.code_challenge_method = self.code_challenge_method
code.expires_at = timezone.now() + timedelta_from_string(
self.provider.token_validity
)
code.scope = self.scope
code.nonce = self.nonce
code.is_open_id = SCOPE_OPENID in self.scope
return code
class OAuthFulfillmentStage(StageView):
"""Final stage, restores params from Flow."""
params: OAuthAuthorizationParams
provider: OAuth2Provider
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
self.params: OAuthAuthorizationParams = self.executor.plan.context.pop(
PLAN_CONTEXT_PARAMS
)
application: Application = self.executor.plan.context.pop(
PLAN_CONTEXT_APPLICATION
)
self.provider = get_object_or_404(OAuth2Provider, pk=application.provider_id)
try:
# At this point we don't need to check permissions anymore
if {PROMPT_NONE, PROMPT_CONSNET}.issubset(self.params.prompt):
raise AuthorizeError(
self.params.redirect_uri,
"consent_required",
self.params.grant_type,
)
2020-10-05 20:09:57 +00:00
Event.new(
EventAction.AUTHORIZE_APPLICATION,
authorized_application=application,
flow=self.executor.plan.flow_pk,
).from_http(self.request)
2020-08-19 08:32:44 +00:00
return redirect(self.create_response_uri())
except (ClientIdError, RedirectUriError) as error:
self.executor.stage_invalid()
2020-08-19 08:32:44 +00:00
# pylint: disable=no-member
return bad_request_message(request, error.description, title=error.error)
except AuthorizeError as error:
self.executor.stage_invalid()
2020-08-19 08:32:44 +00:00
uri = error.create_uri(self.params.redirect_uri, self.params.state)
return redirect(uri)
def create_response_uri(self) -> str:
"""Create a final Response URI the user is redirected to."""
uri = urlsplit(self.params.redirect_uri)
query_params = parse_qs(uri.query)
query_fragment = {}
try:
code = None
if self.params.grant_type in [
GrantTypes.AUTHORIZATION_CODE,
GrantTypes.HYBRID,
]:
code = self.params.create_code(self.request)
code.save()
if self.params.grant_type == GrantTypes.AUTHORIZATION_CODE:
query_params["code"] = code.code
query_params["state"] = [
str(self.params.state) if self.params.state else ""
]
elif self.params.grant_type in [GrantTypes.IMPLICIT, GrantTypes.HYBRID]:
token = self.provider.create_refresh_token(
2020-09-30 17:34:22 +00:00
user=self.request.user,
scope=self.params.scope,
2020-08-19 08:32:44 +00:00
)
# Check if response_type must include access_token in the response.
if self.params.response_type in [
ResponseTypes.ID_TOKEN_TOKEN,
ResponseTypes.CODE_ID_TOKEN_TOKEN,
ResponseTypes.ID_TOKEN,
ResponseTypes.CODE_TOKEN,
2020-08-19 08:32:44 +00:00
]:
query_fragment["access_token"] = token.access_token
# We don't need id_token if it's an OAuth2 request.
if SCOPE_OPENID in self.params.scope:
id_token = token.create_id_token(
2020-09-30 17:34:22 +00:00
user=self.request.user,
request=self.request,
2020-08-19 08:32:44 +00:00
)
id_token.nonce = self.params.nonce
2020-08-19 08:32:44 +00:00
# Include at_hash when access_token is being returned.
if "access_token" in query_fragment:
id_token.at_hash = token.at_hash
# Check if response_type must include id_token in the response.
if self.params.response_type in [
ResponseTypes.ID_TOKEN,
ResponseTypes.ID_TOKEN_TOKEN,
ResponseTypes.CODE_ID_TOKEN,
ResponseTypes.CODE_ID_TOKEN_TOKEN,
]:
query_fragment["id_token"] = self.provider.encode(
id_token.to_dict()
)
2020-08-19 08:32:44 +00:00
token.id_token = id_token
# Store the token.
token.save()
# Code parameter must be present if it's Hybrid Flow.
if self.params.grant_type == GrantTypes.HYBRID:
query_fragment["code"] = code.code
query_fragment["token_type"] = "bearer"
query_fragment["expires_in"] = timedelta_from_string(
self.provider.token_validity
).seconds
query_fragment["state"] = self.params.state if self.params.state else ""
except OAuth2Error as error:
LOGGER.exception("Error when trying to create response uri", error=error)
raise AuthorizeError(
self.params.redirect_uri, "server_error", self.params.grant_type
)
uri = uri._replace(
query=urlencode(query_params, doseq=True),
fragment=uri.fragment + urlencode(query_fragment, doseq=True),
)
return urlunsplit(uri)
class AuthorizationFlowInitView(PolicyAccessMixin, View):
"""OAuth2 Flow initializer, checks access to application and starts flow"""
# pylint: disable=unused-argument
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
"""Check access to application, start FlowPLanner, return to flow executor shell"""
client_id = request.GET.get("client_id")
# TODO: This whole block should be moved to a base class
provider = get_object_or_404(OAuth2Provider, client_id=client_id)
try:
application = self.provider_to_application(provider)
except Application.DoesNotExist:
return self.handle_no_permission_authenticated()
2020-08-19 08:32:44 +00:00
# Check if user is unauthenticated, so we pass the application
# for the identification stage
if not request.user.is_authenticated:
return self.handle_no_permission(application)
# Check permissions
result = self.user_has_access(application)
if not result.passing:
return self.handle_no_permission_authenticated(result)
2020-08-19 08:32:44 +00:00
# TODO: End block
# Extract params so we can save them in the plan context
try:
params = OAuthAuthorizationParams.from_request(request)
except (ClientIdError, RedirectUriError) as error:
# pylint: disable=no-member
return bad_request_message(request, error.description, title=error.error)
# Regardless, we start the planner and return to it
planner = FlowPlanner(provider.authorization_flow)
# planner.use_cache = False
planner.allow_empty_flows = True
plan: FlowPlan = planner.plan(
self.request,
{
PLAN_CONTEXT_SSO: True,
PLAN_CONTEXT_APPLICATION: application,
# OAuth2 related params
PLAN_CONTEXT_PARAMS: params,
PLAN_CONTEXT_SCOPE_DESCRIPTIONS: UserInfoView().get_scope_descriptions(
params.scope
),
# Consent related params
PLAN_CONTEXT_CONSENT_TEMPLATE: "providers/oauth2/consent.html",
},
)
# OpenID clients can specify a `prompt` parameter, and if its set to consent we
# need to inject a consent stage
if PROMPT_CONSNET in params.prompt:
if not any([isinstance(x, ConsentStageView) for x in plan.stages]):
# Plan does not have any consent stage, so we add an in-memory one
stage = ConsentStage(
name="OAuth2 Provider In-memory consent stage",
mode=ConsentMode.ALWAYS_REQUIRE,
)
plan.append(stage)
plan.append(in_memory_stage(OAuthFulfillmentStage))
self.request.session[SESSION_KEY_PLAN] = plan
return redirect_with_qs(
"passbook_flows:flow-executor-shell",
self.request.GET,
flow_slug=provider.authorization_flow.slug,
)