375 lines
14 KiB
Python
375 lines
14 KiB
Python
|
"""passbook OAuth2 Authorization views"""
|
||
|
from dataclasses import dataclass, field
|
||
|
from typing import List, Optional, Set
|
||
|
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit
|
||
|
from uuid import uuid4
|
||
|
|
||
|
from django.http import HttpRequest, HttpResponse
|
||
|
from django.shortcuts import get_object_or_404, redirect
|
||
|
from django.utils import timezone
|
||
|
from django.views import View
|
||
|
from structlog import get_logger
|
||
|
|
||
|
from passbook.core.models import Application, Token
|
||
|
from passbook.flows.models import in_memory_stage
|
||
|
from passbook.flows.planner import (
|
||
|
PLAN_CONTEXT_APPLICATION,
|
||
|
PLAN_CONTEXT_SSO,
|
||
|
FlowPlan,
|
||
|
FlowPlanner,
|
||
|
)
|
||
|
from passbook.flows.stage import StageView
|
||
|
from passbook.flows.views import SESSION_KEY_PLAN
|
||
|
from passbook.lib.utils.time import timedelta_from_string
|
||
|
from passbook.lib.utils.urls import redirect_with_qs
|
||
|
from passbook.lib.views import bad_request_message
|
||
|
from passbook.policies.mixins import PolicyAccessMixin
|
||
|
from passbook.providers.oauth2.constants import (
|
||
|
PROMPT_CONSNET,
|
||
|
PROMPT_NONE,
|
||
|
SCOPE_OPENID,
|
||
|
)
|
||
|
from passbook.providers.oauth2.errors import (
|
||
|
AuthorizeError,
|
||
|
ClientIdError,
|
||
|
OAuth2Error,
|
||
|
RedirectUriError,
|
||
|
)
|
||
|
from passbook.providers.oauth2.models import (
|
||
|
AuthorizationCode,
|
||
|
GrantTypes,
|
||
|
OAuth2Provider,
|
||
|
ResponseTypes,
|
||
|
)
|
||
|
from passbook.providers.oauth2.views.userinfo import UserInfoView
|
||
|
from passbook.stages.consent.models import ConsentMode, ConsentStage
|
||
|
from passbook.stages.consent.stage import (
|
||
|
PLAN_CONTEXT_CONSENT_TEMPLATE,
|
||
|
ConsentStageView,
|
||
|
)
|
||
|
|
||
|
LOGGER = get_logger()
|
||
|
|
||
|
PLAN_CONTEXT_PARAMS = "params"
|
||
|
PLAN_CONTEXT_SCOPE_DESCRIPTIONS = "scope_descriptions"
|
||
|
|
||
|
ALLOWED_PROMPT_PARAMS = {PROMPT_NONE, PROMPT_CONSNET}
|
||
|
|
||
|
|
||
|
@dataclass
|
||
|
# pylint: disable=too-many-instance-attributes
|
||
|
class OAuthAuthorizationParams:
|
||
|
"""Parameteres required to authorize an OAuth Client"""
|
||
|
|
||
|
client_id: str
|
||
|
redirect_uri: str
|
||
|
response_type: str
|
||
|
scope: List[str]
|
||
|
state: str
|
||
|
nonce: str
|
||
|
prompt: Set[str]
|
||
|
grant_type: str
|
||
|
|
||
|
provider: OAuth2Provider = field(default_factory=OAuth2Provider)
|
||
|
|
||
|
code_challenge: Optional[str] = None
|
||
|
code_challenge_method: Optional[str] = None
|
||
|
|
||
|
@staticmethod
|
||
|
def from_request(request: HttpRequest) -> "OAuthAuthorizationParams":
|
||
|
"""
|
||
|
Get all the params used by the Authorization Code Flow
|
||
|
(and also for the Implicit and Hybrid).
|
||
|
|
||
|
See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
|
||
|
"""
|
||
|
# Because in this endpoint we handle both GET
|
||
|
# and POST request.
|
||
|
query_dict = request.POST if request.method == "POST" else request.GET
|
||
|
|
||
|
response_type = query_dict.get("response_type", "")
|
||
|
grant_type = None
|
||
|
# Determine which flow to use.
|
||
|
if response_type in [ResponseTypes.CODE]:
|
||
|
grant_type = GrantTypes.AUTHORIZATION_CODE
|
||
|
elif response_type in [
|
||
|
ResponseTypes.id_token,
|
||
|
ResponseTypes.id_token_token,
|
||
|
ResponseTypes.token,
|
||
|
]:
|
||
|
grant_type = GrantTypes.IMPLICIT
|
||
|
elif response_type in [
|
||
|
ResponseTypes.CODE_TOKEN,
|
||
|
ResponseTypes.CODE_ID_TOKEN,
|
||
|
ResponseTypes.CODE_ID_TOKEN_TOKEN,
|
||
|
]:
|
||
|
grant_type = GrantTypes.HYBRID
|
||
|
|
||
|
# Grant type validation.
|
||
|
if not grant_type:
|
||
|
LOGGER.warning("Invalid response type", type=response_type)
|
||
|
raise AuthorizeError(
|
||
|
query_dict.get("redirect_uri", ""),
|
||
|
"unsupported_response_type",
|
||
|
grant_type,
|
||
|
)
|
||
|
|
||
|
return OAuthAuthorizationParams(
|
||
|
client_id=query_dict.get("client_id", ""),
|
||
|
redirect_uri=query_dict.get("redirect_uri", ""),
|
||
|
response_type=response_type,
|
||
|
grant_type=grant_type,
|
||
|
scope=query_dict.get("scope", "").split(),
|
||
|
state=query_dict.get("state", ""),
|
||
|
nonce=query_dict.get("nonce", ""),
|
||
|
prompt=ALLOWED_PROMPT_PARAMS.intersection(
|
||
|
set(query_dict.get("prompt", "").split())
|
||
|
),
|
||
|
code_challenge=query_dict.get("code_challenge"),
|
||
|
code_challenge_method=query_dict.get("code_challenge_method"),
|
||
|
)
|
||
|
|
||
|
def __post_init__(self):
|
||
|
try:
|
||
|
self.provider: OAuth2Provider = OAuth2Provider.objects.get(
|
||
|
client_id=self.client_id
|
||
|
)
|
||
|
except OAuth2Provider.DoesNotExist:
|
||
|
LOGGER.warning("Invalid client identifier", client_id=self.client_id)
|
||
|
raise ClientIdError()
|
||
|
is_open_id = SCOPE_OPENID in self.scope
|
||
|
|
||
|
# Redirect URI validation.
|
||
|
if is_open_id and not self.redirect_uri:
|
||
|
LOGGER.warning("Missing redirect uri.")
|
||
|
raise RedirectUriError()
|
||
|
if self.redirect_uri not in self.provider.redirect_uris:
|
||
|
LOGGER.warning("Invalid redirect uri", redirect_uri=self.redirect_uri)
|
||
|
raise RedirectUriError()
|
||
|
|
||
|
if not is_open_id and (
|
||
|
self.grant_type == GrantTypes.HYBRID
|
||
|
or self.response_type
|
||
|
in [ResponseTypes.ID_TOKEN, ResponseTypes.ID_TOKEN_TOKEN]
|
||
|
):
|
||
|
LOGGER.warning("Missing 'openid' scope.")
|
||
|
raise AuthorizeError(self.redirect_uri, "invalid_scope", self.grant_type)
|
||
|
|
||
|
# Nonce parameter validation.
|
||
|
if is_open_id and self.grant_type == GrantTypes.IMPLICIT and not self.nonce:
|
||
|
raise AuthorizeError(self.redirect_uri, "invalid_request", self.grant_type)
|
||
|
|
||
|
# Response type parameter validation.
|
||
|
if is_open_id and self.response_type != self.provider.response_type:
|
||
|
raise AuthorizeError(self.redirect_uri, "invalid_request", self.grant_type)
|
||
|
|
||
|
# PKCE validation of the transformation method.
|
||
|
if self.code_challenge:
|
||
|
if not (self.code_challenge_method in ["plain", "S256"]):
|
||
|
raise AuthorizeError(
|
||
|
self.redirect_uri, "invalid_request", self.grant_type
|
||
|
)
|
||
|
|
||
|
def create_code(self, request: HttpRequest) -> AuthorizationCode:
|
||
|
"""Create an AuthorizationCode object for the request"""
|
||
|
code = AuthorizationCode()
|
||
|
code.user = request.user
|
||
|
code.provider = self.provider
|
||
|
|
||
|
code.code = uuid4().hex
|
||
|
|
||
|
if self.code_challenge and self.code_challenge_method:
|
||
|
code.code_challenge = self.code_challenge
|
||
|
code.code_challenge_method = self.code_challenge_method
|
||
|
|
||
|
code.expires_at = timezone.now() + timedelta_from_string(
|
||
|
self.provider.token_validity
|
||
|
)
|
||
|
code.scope = self.scope
|
||
|
code.nonce = self.nonce
|
||
|
code.is_open_id = SCOPE_OPENID in self.scope
|
||
|
|
||
|
return code
|
||
|
|
||
|
|
||
|
class OAuthFulfillmentStage(StageView):
|
||
|
"""Final stage, restores params from Flow."""
|
||
|
|
||
|
params: OAuthAuthorizationParams
|
||
|
provider: OAuth2Provider
|
||
|
|
||
|
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
|
||
|
self.params: OAuthAuthorizationParams = self.executor.plan.context.pop(
|
||
|
PLAN_CONTEXT_PARAMS
|
||
|
)
|
||
|
application: Application = self.executor.plan.context.pop(
|
||
|
PLAN_CONTEXT_APPLICATION
|
||
|
)
|
||
|
self.provider = get_object_or_404(OAuth2Provider, pk=application.provider_id)
|
||
|
try:
|
||
|
# At this point we don't need to check permissions anymore
|
||
|
if {PROMPT_NONE, PROMPT_CONSNET}.issubset(self.params.prompt):
|
||
|
raise AuthorizeError(
|
||
|
self.params.redirect_uri,
|
||
|
"consent_required",
|
||
|
self.params.grant_type,
|
||
|
)
|
||
|
return redirect(self.create_response_uri())
|
||
|
except (ClientIdError, RedirectUriError) as error:
|
||
|
# pylint: disable=no-member
|
||
|
return bad_request_message(request, error.description, title=error.error)
|
||
|
except AuthorizeError as error:
|
||
|
uri = error.create_uri(self.params.redirect_uri, self.params.state)
|
||
|
return redirect(uri)
|
||
|
|
||
|
def create_response_uri(self) -> str:
|
||
|
"""Create a final Response URI the user is redirected to."""
|
||
|
uri = urlsplit(self.params.redirect_uri)
|
||
|
query_params = parse_qs(uri.query)
|
||
|
query_fragment = {}
|
||
|
|
||
|
try:
|
||
|
code = None
|
||
|
|
||
|
if self.params.grant_type in [
|
||
|
GrantTypes.AUTHORIZATION_CODE,
|
||
|
GrantTypes.HYBRID,
|
||
|
]:
|
||
|
code = self.params.create_code(self.request)
|
||
|
code.save()
|
||
|
|
||
|
if self.params.grant_type == GrantTypes.AUTHORIZATION_CODE:
|
||
|
query_params["code"] = code.code
|
||
|
query_params["state"] = [
|
||
|
str(self.params.state) if self.params.state else ""
|
||
|
]
|
||
|
elif self.params.grant_type in [GrantTypes.IMPLICIT, GrantTypes.HYBRID]:
|
||
|
token: Token = self.provider.create_token(
|
||
|
user=self.request.user, scope=self.params.scope,
|
||
|
)
|
||
|
|
||
|
# Check if response_type must include access_token in the response.
|
||
|
if self.params.response_type in [
|
||
|
ResponseTypes.id_token_token,
|
||
|
ResponseTypes.code_id_token_token,
|
||
|
ResponseTypes.token,
|
||
|
ResponseTypes.code_token,
|
||
|
]:
|
||
|
query_fragment["access_token"] = token.access_token
|
||
|
|
||
|
# We don't need id_token if it's an OAuth2 request.
|
||
|
if SCOPE_OPENID in self.params.scope:
|
||
|
id_token = token.create_id_token(
|
||
|
user=self.request.user,
|
||
|
request=self.request,
|
||
|
scope=self.params.scope,
|
||
|
)
|
||
|
id_token.nonce = self.params.nonce
|
||
|
id_token.scope = self.params.scope
|
||
|
# Include at_hash when access_token is being returned.
|
||
|
if "access_token" in query_fragment:
|
||
|
id_token.at_hash = token.at_hash
|
||
|
|
||
|
# Check if response_type must include id_token in the response.
|
||
|
if self.params.response_type in [
|
||
|
ResponseTypes.ID_TOKEN,
|
||
|
ResponseTypes.ID_TOKEN_TOKEN,
|
||
|
ResponseTypes.CODE_ID_TOKEN,
|
||
|
ResponseTypes.CODE_ID_TOKEN_TOKEN,
|
||
|
]:
|
||
|
query_fragment["id_token"] = id_token.encode(self.provider)
|
||
|
token.id_token = id_token
|
||
|
else:
|
||
|
token.id_token = {}
|
||
|
|
||
|
# Store the token.
|
||
|
token.save()
|
||
|
|
||
|
# Code parameter must be present if it's Hybrid Flow.
|
||
|
if self.params.grant_type == GrantTypes.HYBRID:
|
||
|
query_fragment["code"] = code.code
|
||
|
|
||
|
query_fragment["token_type"] = "bearer"
|
||
|
query_fragment["expires_in"] = timedelta_from_string(
|
||
|
self.provider.token_validity
|
||
|
).seconds
|
||
|
query_fragment["state"] = self.params.state if self.params.state else ""
|
||
|
|
||
|
except OAuth2Error as error:
|
||
|
LOGGER.exception("Error when trying to create response uri", error=error)
|
||
|
raise AuthorizeError(
|
||
|
self.params.redirect_uri, "server_error", self.params.grant_type
|
||
|
)
|
||
|
|
||
|
uri = uri._replace(
|
||
|
query=urlencode(query_params, doseq=True),
|
||
|
fragment=uri.fragment + urlencode(query_fragment, doseq=True),
|
||
|
)
|
||
|
|
||
|
return urlunsplit(uri)
|
||
|
|
||
|
|
||
|
class AuthorizationFlowInitView(PolicyAccessMixin, View):
|
||
|
"""OAuth2 Flow initializer, checks access to application and starts flow"""
|
||
|
|
||
|
# pylint: disable=unused-argument
|
||
|
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
|
||
|
"""Check access to application, start FlowPLanner, return to flow executor shell"""
|
||
|
client_id = request.GET.get("client_id")
|
||
|
# TODO: This whole block should be moved to a base class
|
||
|
provider = get_object_or_404(OAuth2Provider, client_id=client_id)
|
||
|
try:
|
||
|
application = self.provider_to_application(provider)
|
||
|
except Application.DoesNotExist:
|
||
|
return self.handle_no_permission_authorized()
|
||
|
# Check if user is unauthenticated, so we pass the application
|
||
|
# for the identification stage
|
||
|
if not request.user.is_authenticated:
|
||
|
return self.handle_no_permission(application)
|
||
|
# Check permissions
|
||
|
result = self.user_has_access(application)
|
||
|
if not result.passing:
|
||
|
return self.handle_no_permission_authorized()
|
||
|
# TODO: End block
|
||
|
# Extract params so we can save them in the plan context
|
||
|
try:
|
||
|
params = OAuthAuthorizationParams.from_request(request)
|
||
|
except (ClientIdError, RedirectUriError) as error:
|
||
|
# pylint: disable=no-member
|
||
|
return bad_request_message(request, error.description, title=error.error)
|
||
|
# Regardless, we start the planner and return to it
|
||
|
planner = FlowPlanner(provider.authorization_flow)
|
||
|
# planner.use_cache = False
|
||
|
planner.allow_empty_flows = True
|
||
|
plan: FlowPlan = planner.plan(
|
||
|
self.request,
|
||
|
{
|
||
|
PLAN_CONTEXT_SSO: True,
|
||
|
PLAN_CONTEXT_APPLICATION: application,
|
||
|
# OAuth2 related params
|
||
|
PLAN_CONTEXT_PARAMS: params,
|
||
|
PLAN_CONTEXT_SCOPE_DESCRIPTIONS: UserInfoView().get_scope_descriptions(
|
||
|
params.scope
|
||
|
),
|
||
|
# Consent related params
|
||
|
PLAN_CONTEXT_CONSENT_TEMPLATE: "providers/oauth2/consent.html",
|
||
|
},
|
||
|
)
|
||
|
# OpenID clients can specify a `prompt` parameter, and if its set to consent we
|
||
|
# need to inject a consent stage
|
||
|
if PROMPT_CONSNET in params.prompt:
|
||
|
if not any([isinstance(x, ConsentStageView) for x in plan.stages]):
|
||
|
# Plan does not have any consent stage, so we add an in-memory one
|
||
|
stage = ConsentStage(
|
||
|
name="OAuth2 Provider In-memory consent stage",
|
||
|
mode=ConsentMode.ALWAYS_REQUIRE,
|
||
|
)
|
||
|
plan.append(stage)
|
||
|
plan.append(in_memory_stage(OAuthFulfillmentStage))
|
||
|
self.request.session[SESSION_KEY_PLAN] = plan
|
||
|
return redirect_with_qs(
|
||
|
"passbook_flows:flow-executor-shell",
|
||
|
self.request.GET,
|
||
|
flow_slug=provider.authorization_flow.slug,
|
||
|
)
|