flows: complete migration to FlowExecutorView, fully use context

This commit is contained in:
Jens Langhammer 2020-05-08 16:10:27 +02:00
parent 114bb1b0bd
commit 2a85e5ae87
16 changed files with 180 additions and 416 deletions

View File

@ -16,8 +16,11 @@ from passbook.core.forms.authentication import LoginForm, SignUpForm
from passbook.core.models import Invitation, Nonce, Source, User
from passbook.core.signals import invitation_used, user_signed_up
from passbook.factors.password.exceptions import PasswordPolicyInvalid
from passbook.flows.views import AuthenticationView, _redirect_with_qs
from passbook.flows.models import Flow, FlowDesignation
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner
from passbook.flows.views import SESSION_KEY_PLAN
from passbook.lib.config import CONFIG
from passbook.lib.utils.urls import redirect_with_qs
LOGGER = get_logger()
@ -71,8 +74,15 @@ class LoginView(UserPassesTestMixin, FormView):
if not pre_user:
# No user found
return self.invalid_login(self.request)
self.request.session[AuthenticationView.SESSION_PENDING_USER] = pre_user.pk
return _redirect_with_qs("passbook_flows:auth-process", self.request.GET)
# We run the Flow planner here so we can pass the Pending user in the context
flow = get_object_or_404(Flow, designation=FlowDesignation.AUTHENTICATION)
planner = FlowPlanner(flow)
plan = planner.plan(self.request)
plan.context[PLAN_CONTEXT_PENDING_USER] = pre_user
self.request.session[SESSION_KEY_PLAN] = plan
return redirect_with_qs(
"passbook_flows:flow-executor", self.request.GET, flow_slug=flow.slug,
)
def invalid_login(
self, request: HttpRequest, disabled_user: User = None

View File

@ -12,14 +12,12 @@ class CaptchaFactor(FormView, AuthenticationFactor):
form_class = CaptchaForm
def form_valid(self, form):
return self.authenticator.user_ok()
return self.executor.factor_ok()
def get_form(self, form_class=None):
form = CaptchaForm(**self.get_form_kwargs())
form.fields["captcha"].public_key = self.authenticator.current_factor.public_key
form.fields[
"captcha"
].private_key = self.authenticator.current_factor.private_key
form.fields["captcha"].public_key = self.executor.current_factor.public_key
form.fields["captcha"].private_key = self.executor.current_factor.private_key
form.fields["captcha"].widget.attrs["data-sitekey"] = form.fields[
"captcha"
].public_key

View File

@ -9,4 +9,4 @@ class DummyFactor(AuthenticationFactor):
def post(self, request: HttpRequest):
"""Just redirect to next factor"""
return self.authenticator.user_ok()
return self.executor.factor_ok()

View File

@ -1,7 +1,7 @@
"""passbook multi-factor authentication engine"""
from django.contrib import messages
from django.http import HttpRequest
from django.shortcuts import redirect, reverse
from django.shortcuts import reverse
from django.utils.translation import gettext as _
from structlog import get_logger
@ -9,6 +9,7 @@ from passbook.core.models import Nonce
from passbook.factors.email.tasks import send_mails
from passbook.factors.email.utils import TemplateEmailMessage
from passbook.flows.factor_base import AuthenticationFactor
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER
from passbook.lib.config import CONFIG
LOGGER = get_logger()
@ -24,12 +25,13 @@ class EmailFactorView(AuthenticationFactor):
return super().get_context_data(**kwargs)
def get(self, request, *args, **kwargs):
nonce = Nonce.objects.create(user=self.pending_user)
pending_user = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
nonce = Nonce.objects.create(user=pending_user)
# Send mail to user
message = TemplateEmailMessage(
subject=_("Forgotten password"),
template_name="email/account_password_reset.html",
to=[self.pending_user.email],
to=[pending_user.email],
template_context={
"url": self.request.build_absolute_uri(
reverse(
@ -39,11 +41,10 @@ class EmailFactorView(AuthenticationFactor):
)
},
)
send_mails(self.authenticator.current_factor, message)
self.authenticator.cleanup()
send_mails(self.executor.current_factor, message)
messages.success(request, _("Check your E-Mails for a password reset link."))
return redirect("passbook_core:auth-login")
return self.executor.cancel()
def post(self, request: HttpRequest):
"""Just redirect to next factor"""
return self.authenticator.user_ok()
return self.executor.factor_ok()

View File

@ -8,6 +8,7 @@ from structlog import get_logger
from passbook.factors.otp.forms import OTPVerifyForm
from passbook.factors.otp.views import OTP_SETTING_UP_KEY, EnableView
from passbook.flows.factor_base import AuthenticationFactor
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER
LOGGER = get_logger()
@ -25,31 +26,34 @@ class OTPFactor(FormView, AuthenticationFactor):
def get(self, request, *args, **kwargs):
"""Check if User has OTP enabled and if OTP is enforced"""
if not user_has_device(self.pending_user):
pending_user = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
if not user_has_device(pending_user):
LOGGER.debug("User doesn't have OTP Setup.")
if self.authenticator.current_factor.enforced:
if self.executor.current_factor.enforced:
# Redirect to setup view
LOGGER.debug("OTP is enforced, redirecting to setup")
request.user = self.pending_user
LOGGER.debug("Passing GET to EnableView")
request.user = pending_user
messages.info(request, _("OTP is enforced. Please setup OTP."))
return EnableView.as_view()(request)
LOGGER.debug("OTP is not enforced, skipping form")
return self.authenticator.user_ok()
return self.executor.user_ok()
return super().get(request, *args, **kwargs)
def post(self, request, *args, **kwargs):
"""Check if setup is in progress and redirect to EnableView"""
if OTP_SETTING_UP_KEY in request.session:
LOGGER.debug("Passing POST to EnableView")
request.user = self.pending_user
request.user = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
return EnableView.as_view()(request)
return super().post(self, request, *args, **kwargs)
def form_valid(self, form: OTPVerifyForm):
"""Verify OTP Token"""
device = match_token(self.pending_user, form.cleaned_data.get("code"))
device = match_token(
self.executor.plan.context[PLAN_CONTEXT_PENDING_USER],
form.cleaned_data.get("code"),
)
if device:
return self.authenticator.user_ok()
return self.executor.factor_ok()
messages.error(self.request, _("Invalid OTP."))
return self.form_invalid(form)

View File

@ -13,11 +13,12 @@ from structlog import get_logger
from passbook.core.models import User
from passbook.factors.password.forms import PasswordForm
from passbook.flows.factor_base import AuthenticationFactor
from passbook.flows.views import AuthenticationView
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER
from passbook.lib.config import CONFIG
from passbook.lib.utils.reflection import path_to_class
LOGGER = get_logger()
PLAN_CONTEXT_AUTHENTICATION_BACKEND = "user_backend"
def authenticate(request, backends, **credentials) -> Optional[User]:
@ -56,7 +57,7 @@ class PasswordFactor(FormView, AuthenticationFactor):
"""Authentication factor which authenticates against django's AuthBackend"""
form_class = PasswordForm
template_name = "login/factors/backend.html"
template_name = "factors/password/backend.html"
def form_valid(self, form):
"""Authenticate against django's authentication backend"""
@ -65,18 +66,20 @@ class PasswordFactor(FormView, AuthenticationFactor):
"password": form.cleaned_data.get("password"),
}
for uid_field in uid_fields:
kwargs[uid_field] = getattr(self.authenticator.pending_user, uid_field)
kwargs[uid_field] = getattr(
self.executor.plan.context[PLAN_CONTEXT_PENDING_USER], uid_field
)
try:
user = authenticate(
self.request, self.authenticator.current_factor.backends, **kwargs
self.request, self.executor.current_factor.backends, **kwargs
)
if user:
# User instance returned from authenticate() has .backend property set
self.authenticator.pending_user = user
self.request.session[
AuthenticationView.SESSION_USER_BACKEND
self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user
self.executor.plan.context[
PLAN_CONTEXT_AUTHENTICATION_BACKEND
] = user.backend
return self.authenticator.user_ok()
return self.executor.factor_ok()
# No user was found -> invalid credentials
LOGGER.debug("Invalid credentials")
# Manually inject error into form
@ -87,4 +90,4 @@ class PasswordFactor(FormView, AuthenticationFactor):
except PermissionDenied:
# User was found, but permission was denied (i.e. user is not active)
LOGGER.debug("Denied access", **kwargs)
return self.authenticator.user_invalid()
return self.executor.factor_invalid()

View File

@ -1,11 +1,13 @@
"""passbook multi-factor authentication engine"""
from typing import Any, Dict
from django.forms import ModelForm
from django.http import HttpRequest
from django.utils.translation import gettext as _
from django.views.generic import TemplateView
from passbook.core.models import User
from passbook.flows.views import AuthenticationView
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER
from passbook.flows.views import FlowExecutorView
from passbook.lib.config import CONFIG
@ -13,19 +15,19 @@ class AuthenticationFactor(TemplateView):
"""Abstract Authentication factor, inherits TemplateView but can be combined with FormView"""
form: ModelForm = None
required: bool = True
authenticator: AuthenticationView
pending_user: User
executor: FlowExecutorView
request: HttpRequest = None
template_name = "login/form_with_user.html"
def __init__(self, authenticator: AuthenticationView):
self.authenticator = authenticator
self.pending_user = None
def __init__(self, executor: FlowExecutorView):
self.executor = executor
def get_context_data(self, **kwargs):
def get_context_data(self, **kwargs: Dict[str, Any]) -> Dict[str, Any]:
kwargs["config"] = CONFIG.y("passbook")
kwargs["title"] = _("Log in to your account")
kwargs["primary_action"] = _("Log in")
kwargs["user"] = self.pending_user
if PLAN_CONTEXT_PENDING_USER in self.executor.plan.context:
kwargs["user"] = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
return super().get_context_data(**kwargs)

View File

@ -1,7 +1,7 @@
"""Flows Planner"""
from dataclasses import dataclass, field
from time import time
from typing import List, Tuple
from typing import Any, Dict, List, Tuple
from django.http import HttpRequest
from structlog import get_logger
@ -12,6 +12,9 @@ from passbook.policies.engine import PolicyEngine
LOGGER = get_logger()
PLAN_CONTEXT_PENDING_USER = "pending_user"
PLAN_CONTEXT_SSO = "is_sso"
@dataclass
class FlowPlan:
@ -19,6 +22,7 @@ class FlowPlan:
of all Factors that should be run."""
factors: List[Factor] = field(default_factory=list)
context: Dict[str, Any] = field(default_factory=dict)
def next(self) -> Factor:
"""Return next pending factor from the bottom of the list"""

View File

@ -1,137 +0,0 @@
"""passbook Core Authentication Test"""
import string
from random import SystemRandom
from django.contrib.auth.models import AnonymousUser
from django.contrib.sessions.middleware import SessionMiddleware
from django.test import RequestFactory, TestCase
from django.urls import reverse
from passbook.core.models import User
from passbook.factors.dummy.models import DummyFactor
from passbook.factors.password.models import PasswordFactor
from passbook.flows.views import AuthenticationView
class TestFactorAuthentication(TestCase):
"""passbook Core Authentication Test"""
def setUp(self):
super().setUp()
self.password = "".join(
SystemRandom().choice(string.ascii_uppercase + string.digits)
for _ in range(8)
)
self.factor, _ = PasswordFactor.objects.get_or_create(
slug="password",
defaults={
"name": "password",
"slug": "password",
"order": 0,
"backends": ["django.contrib.auth.backends.ModelBackend"],
},
)
self.user = User.objects.create_user(
username="test", email="test@test.test", password=self.password
)
def test_unauthenticated_raw(self):
"""test direct call to AuthenticationView"""
response = self.client.get(reverse("passbook_flows:auth-process"))
# Response should be 400 since no pending user is set
self.assertEqual(response.status_code, 400)
def test_unauthenticated_prepared(self):
"""test direct call but with pending_uesr in session"""
request = RequestFactory().get(reverse("passbook_flows:auth-process"))
request.user = AnonymousUser()
request.session = {}
request.session[AuthenticationView.SESSION_PENDING_USER] = self.user.pk
response = AuthenticationView.as_view()(request)
self.assertEqual(response.status_code, 200)
def test_no_factors(self):
"""Test with all factors disabled"""
self.factor.enabled = False
self.factor.save()
request = RequestFactory().get(reverse("passbook_flows:auth-process"))
request.user = AnonymousUser()
request.session = {}
request.session[AuthenticationView.SESSION_PENDING_USER] = self.user.pk
response = AuthenticationView.as_view()(request)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse("passbook_flows:auth-denied"))
self.factor.enabled = True
self.factor.save()
def test_authenticated(self):
"""Test with already logged in user"""
self.client.force_login(self.user)
response = self.client.get(reverse("passbook_flows:auth-process"))
# Response should be 400 since no pending user is set
self.assertEqual(response.status_code, 400)
self.client.logout()
def test_unauthenticated_post(self):
"""Test post request as unauthenticated user"""
request = RequestFactory().post(
reverse("passbook_flows:auth-process"), data={"password": self.password}
)
request.user = AnonymousUser()
middleware = SessionMiddleware()
middleware.process_request(request)
request.session.save() # pylint: disable=no-member
request.session[AuthenticationView.SESSION_PENDING_USER] = self.user.pk
response = AuthenticationView.as_view()(request)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse("passbook_core:overview"))
self.client.logout()
def test_unauthenticated_post_invalid(self):
"""Test post request as unauthenticated user"""
request = RequestFactory().post(
reverse("passbook_flows:auth-process"),
data={"password": self.password + "a"},
)
request.user = AnonymousUser()
middleware = SessionMiddleware()
middleware.process_request(request)
request.session.save() # pylint: disable=no-member
request.session[AuthenticationView.SESSION_PENDING_USER] = self.user.pk
response = AuthenticationView.as_view()(request)
self.assertEqual(response.status_code, 200)
self.client.logout()
def test_multifactor(self):
"""Test view with multiple active factors"""
DummyFactor.objects.get_or_create(name="dummy", slug="dummy", order=1)
request = RequestFactory().post(
reverse("passbook_flows:auth-process"), data={"password": self.password}
)
request.user = AnonymousUser()
middleware = SessionMiddleware()
middleware.process_request(request)
request.session.save() # pylint: disable=no-member
request.session[AuthenticationView.SESSION_PENDING_USER] = self.user.pk
response = AuthenticationView.as_view()(request)
session_copy = request.session.items()
self.assertEqual(response.status_code, 302)
# Verify view redirects to itself after auth
self.assertEqual(response.url, reverse("passbook_flows:auth-process"))
# Run another request with same session which should result in a logged in user
request = RequestFactory().post(reverse("passbook_flows:auth-process"))
request.user = AnonymousUser()
middleware = SessionMiddleware()
middleware.process_request(request)
for key, value in session_copy:
request.session[key] = value
request.session.save() # pylint: disable=no-member
response = AuthenticationView.as_view()(request)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse("passbook_core:overview"))

View File

@ -1,23 +1,8 @@
"""flow urls"""
from django.urls import path
from passbook.flows.views import (
AuthenticationView,
FactorPermissionDeniedView,
FlowExecutorView,
)
from passbook.flows.views import FlowExecutorView
urlpatterns = [
path("auth/process/", AuthenticationView.as_view(), name="auth-process"),
path(
"auth/process/<slug:factor>/",
AuthenticationView.as_view(),
name="auth-process",
),
path(
"auth/process/denied/",
FactorPermissionDeniedView.as_view(),
name="auth-denied",
),
path("<slug:flow_slug>/", FlowExecutorView.as_view(), name="flow-executor"),
]

View File

@ -1,228 +1,24 @@
"""passbook multi-factor authentication engine"""
from typing import List, Optional, Tuple
from typing import Optional
from django.contrib.auth import login
from django.contrib.auth.mixins import UserPassesTestMixin
from django.http import HttpRequest, HttpResponse
from django.shortcuts import get_object_or_404, redirect, reverse
from django.utils.http import urlencode
from django.shortcuts import get_object_or_404, redirect
from django.views.generic import View
from structlog import get_logger
from passbook.core.models import Factor, User
from passbook.core.views.utils import PermissionDeniedView
from passbook.core.models import Factor
from passbook.flows.exceptions import FlowNonApplicableError
from passbook.flows.models import Flow
from passbook.flows.planner import FlowPlan, FlowPlanner
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan, FlowPlanner
from passbook.lib.config import CONFIG
from passbook.lib.utils.reflection import class_to_path, path_to_class
from passbook.lib.utils.urls import is_url_absolute
from passbook.lib.utils.urls import is_url_absolute, redirect_with_qs
from passbook.lib.views import bad_request_message
from passbook.policies.engine import PolicyEngine
LOGGER = get_logger()
# Argument used to redirect user after login
NEXT_ARG_NAME = "next"
def _redirect_with_qs(view, get_query_set=None):
"""Wrapper to redirect whilst keeping GET Parameters"""
target = reverse(view)
if get_query_set:
target += "?" + urlencode(get_query_set.items())
return redirect(target)
class AuthenticationView(UserPassesTestMixin, View):
"""Wizard-like Multi-factor authenticator"""
SESSION_FACTOR = "passbook_factor"
SESSION_PENDING_FACTORS = "passbook_pending_factors"
SESSION_PENDING_USER = "passbook_pending_user"
SESSION_USER_BACKEND = "passbook_user_backend"
SESSION_IS_SSO_LOGIN = "passbook_sso_login"
pending_user: User
pending_factors: List[Tuple[str, str]] = []
_current_factor_class: Factor
current_factor: Factor
# Allow only not authenticated users to login
def test_func(self) -> bool:
return AuthenticationView.SESSION_PENDING_USER in self.request.session
def _check_config_domain(self) -> Optional[HttpResponse]:
"""Checks if current request's domain matches configured Domain, and
adds a warning if not."""
current_domain = self.request.get_host()
if ":" in current_domain:
current_domain, _ = current_domain.split(":")
config_domain = CONFIG.y("domain")
if current_domain != config_domain:
message = (
f"Current domain of '{current_domain}' doesn't "
f"match configured domain of '{config_domain}'."
)
LOGGER.warning(message)
return bad_request_message(self.request, message)
return None
def handle_no_permission(self) -> HttpResponse:
# Function from UserPassesTestMixin
if NEXT_ARG_NAME in self.request.GET:
return redirect(self.request.GET.get(NEXT_ARG_NAME))
if self.request.user.is_authenticated:
return _redirect_with_qs("passbook_core:overview", self.request.GET)
return _redirect_with_qs("passbook_core:auth-login", self.request.GET)
def get_pending_factors(self) -> List[Tuple[str, str]]:
"""Loading pending factors from Database or load from session variable"""
# Write pending factors to session
if AuthenticationView.SESSION_PENDING_FACTORS in self.request.session:
return self.request.session[AuthenticationView.SESSION_PENDING_FACTORS]
# Get an initial list of factors which are currently enabled
# and apply to the current user. We check policies here and block the request
_all_factors = (
Factor.objects.filter(enabled=True).order_by("order").select_subclasses()
)
pending_factors = []
for factor in _all_factors:
factor: Factor
LOGGER.debug(
"Checking if factor applies to user",
factor=factor,
user=self.pending_user,
)
policy_engine = PolicyEngine(
factor.policies.all(), self.pending_user, self.request
)
policy_engine.build()
if policy_engine.passing:
pending_factors.append((factor.uuid.hex, factor.type))
LOGGER.debug("Factor applies", factor=factor, user=self.pending_user)
return pending_factors
def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
# Check if user passes test (i.e. SESSION_PENDING_USER is set)
user_test_result = self.get_test_func()()
if not user_test_result:
incorrect_domain_message = self._check_config_domain()
if incorrect_domain_message:
return incorrect_domain_message
return self.handle_no_permission()
# Extract pending user from session (only remember uid)
self.pending_user = get_object_or_404(
User, id=self.request.session[AuthenticationView.SESSION_PENDING_USER]
)
self.pending_factors = self.get_pending_factors()
# Read and instantiate factor from session
factor_uuid, factor_class = None, None
if AuthenticationView.SESSION_FACTOR not in request.session:
# Case when no factors apply to user, return error denied
if not self.pending_factors:
# Case when user logged in from SSO provider and no more factors apply
if AuthenticationView.SESSION_IS_SSO_LOGIN in request.session:
LOGGER.debug("User authenticated with SSO, logging in...")
return self._user_passed()
return self.user_invalid()
factor_uuid, factor_class = self.pending_factors[0]
else:
factor_uuid, factor_class = request.session[
AuthenticationView.SESSION_FACTOR
]
# Lookup current factor object
self.current_factor = (
Factor.objects.filter(uuid=factor_uuid).select_subclasses().first()
)
# Instantiate Next Factor and pass request
factor = path_to_class(factor_class)
self._current_factor_class = factor(self)
self._current_factor_class.pending_user = self.pending_user
self._current_factor_class.request = request
return super().dispatch(request, *args, **kwargs)
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
"""pass get request to current factor"""
LOGGER.debug(
"Passing GET",
view_class=class_to_path(self._current_factor_class.__class__),
)
return self._current_factor_class.get(request, *args, **kwargs)
def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
"""pass post request to current factor"""
LOGGER.debug(
"Passing POST",
view_class=class_to_path(self._current_factor_class.__class__),
)
return self._current_factor_class.post(request, *args, **kwargs)
def user_ok(self) -> HttpResponse:
"""Redirect to next Factor"""
LOGGER.debug(
"Factor passed",
factor_class=class_to_path(self._current_factor_class.__class__),
)
# Remove passed factor from pending factors
current_factor_tuple = (
self.current_factor.uuid.hex,
class_to_path(self._current_factor_class.__class__),
)
if current_factor_tuple in self.pending_factors:
self.pending_factors.remove(current_factor_tuple)
next_factor = None
if self.pending_factors:
next_factor = self.pending_factors.pop()
# Save updated pening_factor list to session
self.request.session[
AuthenticationView.SESSION_PENDING_FACTORS
] = self.pending_factors
self.request.session[AuthenticationView.SESSION_FACTOR] = next_factor
LOGGER.debug("Rendering Factor", next_factor=next_factor)
return _redirect_with_qs("passbook_flows:auth-process", self.request.GET)
# User passed all factors
LOGGER.debug("User passed all factors, logging in", user=self.pending_user)
return self._user_passed()
def user_invalid(self) -> HttpResponse:
"""Show error message, user cannot login.
This should only be shown if user authenticated successfully, but is disabled/locked/etc"""
LOGGER.debug("User invalid")
self.cleanup()
return _redirect_with_qs("passbook_flows:auth-denied", self.request.GET)
def _user_passed(self) -> HttpResponse:
"""User Successfully passed all factors"""
backend = self.request.session[AuthenticationView.SESSION_USER_BACKEND]
login(self.request, self.pending_user, backend=backend)
LOGGER.debug("Logged in", user=self.pending_user)
# Cleanup
self.cleanup()
next_param = self.request.GET.get(NEXT_ARG_NAME, None)
if next_param and not is_url_absolute(next_param):
return redirect(next_param)
return _redirect_with_qs("passbook_core:overview")
def cleanup(self):
"""Remove temporary data from session"""
session_keys = [
self.SESSION_FACTOR,
self.SESSION_PENDING_FACTORS,
self.SESSION_PENDING_USER,
self.SESSION_USER_BACKEND,
]
for key in session_keys:
if key in self.request.session:
del self.request.session[key]
LOGGER.debug("Cleaned up sessions")
class FactorPermissionDeniedView(PermissionDeniedView):
"""User could not be authenticated"""
SESSION_KEY_PLAN = "passbook_flows_plan"
@ -240,6 +36,32 @@ class FlowExecutorView(View):
# TODO: Do we always need this?
self.flow = get_object_or_404(Flow, slug=flow_slug)
def _check_config_domain(self) -> Optional[HttpResponse]:
"""Checks if current request's domain matches configured Domain, and
adds a warning if not."""
current_domain = self.request.get_host()
if ":" in current_domain:
current_domain, _ = current_domain.split(":")
config_domain = CONFIG.y("domain")
if current_domain != config_domain:
message = (
f"Current domain of '{current_domain}' doesn't "
f"match configured domain of '{config_domain}'."
)
LOGGER.warning(message)
return bad_request_message(self.request, message)
return None
def handle_flow_non_applicable(self) -> HttpResponse:
"""When a flow is non-applicable check if user is on the correct domain"""
if NEXT_ARG_NAME in self.request.GET:
return redirect(self.request.GET.get(NEXT_ARG_NAME))
incorrect_domain_message = self._check_config_domain()
if incorrect_domain_message:
return incorrect_domain_message
# TODO: Add message
return redirect("passbook_core:index")
def dispatch(self, request: HttpRequest, flow_slug: str) -> HttpResponse:
# Early check if theres an active Plan for the current session
if SESSION_KEY_PLAN not in self.request.session:
@ -250,7 +72,7 @@ class FlowExecutorView(View):
self.plan = self._initiate_plan()
except FlowNonApplicableError as exc:
LOGGER.warning("Flow not applicable to current user", exc=exc)
return redirect("passbook_core:index")
return self.handle_flow_non_applicable()
else:
LOGGER.debug("Continuing existing plan", flow_slug=flow_slug)
self.plan = self.request.session[SESSION_KEY_PLAN]
@ -260,7 +82,6 @@ class FlowExecutorView(View):
LOGGER.debug("Current factor", current_factor=self.current_factor)
factor_cls = path_to_class(self.current_factor.type)
self.current_factor_view = factor_cls(self)
# self.current_factor_view.pending_user = self.pending_user
self.current_factor_view.request = request
return super().dispatch(request)
@ -284,3 +105,48 @@ class FlowExecutorView(View):
plan = planner.plan(self.request)
self.request.session[SESSION_KEY_PLAN] = plan
return plan
def _flow_done(self) -> HttpResponse:
"""User Successfully passed all factors"""
backend = self.plan.context[PLAN_CONTEXT_PENDING_USER].backend
login(
self.request, self.plan.context[PLAN_CONTEXT_PENDING_USER], backend=backend
)
LOGGER.debug("Logged in", user=self.plan.context[PLAN_CONTEXT_PENDING_USER])
self.cancel()
next_param = self.request.GET.get(NEXT_ARG_NAME, None)
if next_param and not is_url_absolute(next_param):
return redirect(next_param)
return redirect_with_qs("passbook_core:overview")
def factor_ok(self) -> HttpResponse:
"""Callback called by factors upon successful completion.
Persists updated plan and context to session."""
LOGGER.debug(
"Factor ok", factor_class=class_to_path(self.current_factor_view.__class__),
)
self.request.session[SESSION_KEY_PLAN] = self.plan
if self.plan.factors:
LOGGER.debug(
"Continuing with next factor", reamining=len(self.plan.factors)
)
return redirect_with_qs(
"passbook_flows:flow-executor", self.request.GET, **self.kwargs
)
# User passed all factors
LOGGER.debug(
"User passed all factors", user=self.plan.context[PLAN_CONTEXT_PENDING_USER]
)
return self._flow_done()
def factor_invalid(self) -> HttpResponse:
"""Callback used factor when data is correct but a policy denies access
or the user account is disabled."""
LOGGER.debug("User invalid")
self.cancel()
return redirect_with_qs("passbook_flows:auth-denied", self.request.GET)
def cancel(self) -> HttpResponse:
"""Cancel current execution and return a redirect"""
del self.request.session[SESSION_KEY_PLAN]
return redirect_with_qs("passbook_flows:auth-denied", self.request.GET)

View File

@ -1,7 +1,19 @@
"""URL-related utils"""
from urllib.parse import urlparse
from django.http import HttpResponse
from django.shortcuts import redirect, reverse
from django.utils.http import urlencode
def is_url_absolute(url):
"""Check if domain is absolute to prevent user from being redirect somewhere else"""
return bool(urlparse(url).netloc)
def redirect_with_qs(view: str, get_query_set=None, **kwargs) -> HttpResponse:
"""Wrapper to redirect whilst keeping GET Parameters"""
target = reverse(view, kwargs=kwargs)
if get_query_set:
target += "?" + urlencode(get_query_set.items())
return redirect(target)

View File

@ -8,7 +8,7 @@ from jinja2.exceptions import TemplateSyntaxError, UndefinedError
from jinja2.nativetypes import NativeEnvironment
from structlog import get_logger
from passbook.flows.views import AuthenticationView
from passbook.flows.planner import PLAN_CONTEXT_SSO
from passbook.lib.utils.http import get_client_ip
from passbook.policies.types import PolicyRequest, PolicyResult
@ -54,8 +54,9 @@ class Evaluator:
kwargs["pb_is_group_member"] = Evaluator.jinja2_func_is_group_member
kwargs["pb_logger"] = get_logger()
if request.http_request:
# TODO: Get access to current plan
kwargs["pb_is_sso_flow"] = request.http_request.session.get(
AuthenticationView.SESSION_IS_SSO_LOGIN, False
PLAN_CONTEXT_SSO, False
)
kwargs["pb_client_ip"] = (
get_client_ip(request.http_request) or "255.255.255.255"

View File

@ -13,7 +13,15 @@ from django.views.generic import RedirectView, View
from structlog import get_logger
from passbook.audit.models import Event, EventAction
from passbook.flows.views import AuthenticationView, _redirect_with_qs
from passbook.factors.password.factor import PLAN_CONTEXT_AUTHENTICATION_BACKEND
from passbook.flows.models import Flow, FlowDesignation
from passbook.flows.planner import (
PLAN_CONTEXT_PENDING_USER,
PLAN_CONTEXT_SSO,
FlowPlanner,
)
from passbook.flows.views import SESSION_KEY_PLAN
from passbook.lib.utils.urls import redirect_with_qs
from passbook.sources.oauth.clients import get_client
from passbook.sources.oauth.models import OAuthSource, UserOAuthSourceConnection
@ -165,10 +173,17 @@ class OAuthCallback(OAuthClientMixin, View):
user = authenticate(
source=access.source, identifier=access.identifier, request=self.request
)
self.request.session[AuthenticationView.SESSION_PENDING_USER] = user.pk
self.request.session[AuthenticationView.SESSION_USER_BACKEND] = user.backend
self.request.session[AuthenticationView.SESSION_IS_SSO_LOGIN] = True
return _redirect_with_qs("passbook_flows:auth-process", self.request.GET)
# We run the Flow planner here so we can pass the Pending user in the context
flow = get_object_or_404(Flow, designation=FlowDesignation.AUTHENTICATION)
planner = FlowPlanner(flow)
plan = planner.plan(self.request)
plan.context[PLAN_CONTEXT_PENDING_USER] = user
plan.context[PLAN_CONTEXT_AUTHENTICATION_BACKEND] = user.backend
plan.context[PLAN_CONTEXT_SSO] = True
self.request.session[SESSION_KEY_PLAN] = plan
return redirect_with_qs(
"passbook_flows:flow-executor", self.request.GET, flow_slug=flow.slug,
)
# pylint: disable=unused-argument
def handle_existing_user(self, source, user, access, info):

View File

@ -2,7 +2,7 @@
isort -rc passbook
pyright
black passbook
scripts/coverage.sh
# scripts/coverage.sh
bandit -r passbook
pylint passbook
prospector