stages/prompt: add prompt stage: dynamically created forms based on database

This commit is contained in:
Jens Langhammer 2020-05-10 16:20:17 +02:00
parent 9def45c8d7
commit 4315d1a03c
14 changed files with 303 additions and 311 deletions

View file

@ -37,6 +37,7 @@ from passbook.stages.identification.api import IdentificationStageViewSet
from passbook.stages.login.api import LoginStageViewSet
from passbook.stages.otp.api import OTPStageViewSet
from passbook.stages.password.api import PasswordStageViewSet
from passbook.stages.prompt.api import PromptStageViewSet, PromptViewSet
LOGGER = get_logger()
router = routers.DefaultRouter()
@ -83,6 +84,8 @@ router.register("stages/otp", OTPStageViewSet)
router.register("stages/password", PasswordStageViewSet)
router.register("stages/identification", IdentificationStageViewSet)
router.register("stages/login", LoginStageViewSet)
router.register("stages/prompt", PromptStageViewSet)
router.register("stages/prompt/prompts", PromptViewSet)
router.register("flows", FlowViewSet)
router.register("flows/bindings", FlowStageBindingViewSet)

View file

@ -1,65 +0,0 @@
"""passbook core authentication forms"""
from django import forms
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _
from structlog import get_logger
from passbook.core.models import User
LOGGER = get_logger()
class SignUpForm(forms.Form):
"""SignUp Form"""
title = _("Sign Up")
name = forms.CharField(
label=_("Name"), widget=forms.TextInput(attrs={"placeholder": _("Name")})
)
username = forms.CharField(
label=_("Username"),
widget=forms.TextInput(attrs={"placeholder": _("Username")}),
)
email = forms.EmailField(
label=_("E-Mail"), widget=forms.TextInput(attrs={"placeholder": _("E-Mail")})
)
password = forms.CharField(
label=_("Password"),
widget=forms.PasswordInput(attrs={"placeholder": _("Password")}),
)
password_repeat = forms.CharField(
label=_("Repeat Password"),
widget=forms.PasswordInput(attrs={"placeholder": _("Repeat Password")}),
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# All fields which have initial data supplied are set to read only
if "initial" in kwargs:
for field in kwargs.get("initial").keys():
self.fields[field].widget.attrs["readonly"] = "readonly"
def clean_username(self):
"""Check if username is used already"""
username = self.cleaned_data.get("username")
if User.objects.filter(username=username).exists():
LOGGER.warning("username already exists", username=username)
raise ValidationError(_("Username already exists"))
return username
def clean_email(self):
"""Check if email is already used in django or other auth sources"""
email = self.cleaned_data.get("email")
# Check if user exists already, error early
if User.objects.filter(email=email).exists():
LOGGER.debug("email already exists", email=email)
raise ValidationError(_("Email already exists"))
return email
def clean_password_repeat(self):
"""Check if Password adheres to filter and if passwords matche"""
password = self.cleaned_data.get("password")
password_repeat = self.cleaned_data.get("password_repeat")
if password != password_repeat:
raise ValidationError(_("Passwords don't match"))
return self.cleaned_data.get("password_repeat")

View file

@ -5,7 +5,6 @@ from random import SystemRandom
from django.test import TestCase
from django.urls import reverse
from passbook.core.forms.authentication import SignUpForm
from passbook.core.models import User
@ -33,12 +32,6 @@ class TestAuthenticationViews(TestCase):
),
)
def test_sign_up_view(self):
"""Test account.sign_up view (Anonymous)"""
self.client.logout()
response = self.client.get(reverse("passbook_core:auth-sign-up"))
self.assertEqual(response.status_code, 200)
def test_logout_view(self):
"""Test account.logout view"""
self.client.force_login(self.user)
@ -50,81 +43,3 @@ class TestAuthenticationViews(TestCase):
self.client.force_login(self.user)
response = self.client.get(reverse("passbook_core:auth-logout"))
self.assertEqual(response.status_code, 302)
def test_sign_up_view_post(self):
"""Test account.sign_up view POST (Anonymous)"""
form = SignUpForm(self.sign_up_data)
self.assertTrue(form.is_valid())
response = self.client.post(
reverse("passbook_core:auth-sign-up"), data=form.cleaned_data
)
self.assertEqual(response.status_code, 302)
# def test_reset_password_init_view(self):
# """Test account.reset_password_init view POST (Anonymous)"""
# form = SignUpForm(self.sign_up_data)
# self.assertTrue(form.is_valid())
# res = test_request(accounts.SignUpView.as_view(),
# method='POST',
# req_kwargs=form.cleaned_data)
# self.assertEqual(res.status_code, 302)
# res = test_request(accounts.PasswordResetInitView.as_view())
# self.assertEqual(res.status_code, 200)
# def test_resend_confirmation(self):
# """Test AccountController.resend_confirmation"""
# form = SignUpForm(self.sign_up_data)
# self.assertTrue(form.is_valid())
# res = test_request(accounts.SignUpView.as_view(),
# method='POST',
# req_kwargs=form.cleaned_data)
# self.assertEqual(res.status_code, 302)
# user = User.objects.get(email=self.sign_up_data['email'])
# # Invalidate all other links for this user
# old_acs = AccountConfirmation.objects.filter(
# user=user)
# for old_ac in old_acs:
# old_ac.confirmed = True
# old_ac.save()
# # Create Account Confirmation UUID
# new_ac = AccountConfirmation.objects.create(user=user)
# self.assertFalse(new_ac.is_expired)
# on_user_confirm_resend.send(
# sender=None,
# user=user,
# request=None)
# def test_reset_passowrd(self):
# """Test reset password POST"""
# # Signup user first
# sign_up_form = SignUpForm(self.sign_up_data)
# self.assertTrue(sign_up_form.is_valid())
# sign_up_res = test_request(accounts.SignUpView.as_view(),
# method='POST',
# req_kwargs=sign_up_form.cleaned_data)
# self.assertEqual(sign_up_res.status_code, 302)
# user = User.objects.get(email=self.sign_up_data['email'])
# # Invalidate all other links for this user
# old_acs = AccountConfirmation.objects.filter(
# user=user)
# for old_ac in old_acs:
# old_ac.confirmed = True
# old_ac.save()
# # Create Account Confirmation UUID
# new_ac = AccountConfirmation.objects.create(user=user)
# self.assertFalse(new_ac.is_expired)
# uuid = AccountConfirmation.objects.filter(user=user).first().pk
# reset_res = test_request(accounts.PasswordResetFinishView.as_view(),
# method='POST',
# user=user,
# url_kwargs={'uuid': uuid},
# req_kwargs=self.change_data)
# self.assertEqual(reset_res.status_code, 302)
# self.assertEqual(reset_res.url, reverse('common-index'))

View file

@ -13,14 +13,13 @@ urlpatterns = [
name="auth-login",
),
path("auth/logout/", authentication.LogoutView.as_view(), name="auth-logout"),
path("auth/sign_up/", authentication.SignUpView.as_view(), name="auth-sign-up"),
path(
"auth/sign_up/<uuid:nonce>/confirm/",
authentication.SignUpConfirmView.as_view(),
name="auth-sign-up-confirm",
"auth/sign_up/",
ToDefaultFlow.as_view(designation=FlowDesignation.ENROLLMENT),
name="auth-sign-up",
),
path(
"auth/password/reset/<uuid:nonce>/",
"auth/password/reset/<uuid:nonce_uuid>/",
authentication.PasswordResetView.as_view(),
name="auth-password-reset",
),

View file

@ -1,22 +1,14 @@
"""passbook core authentication views"""
from typing import Dict
from django.contrib import messages
from django.contrib.auth import login, logout
from django.contrib.auth.mixins import LoginRequiredMixin, UserPassesTestMixin
from django.forms.utils import ErrorList
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import HttpRequest, HttpResponse
from django.shortcuts import get_object_or_404, redirect, reverse
from django.utils.translation import ugettext as _
from django.views import View
from django.views.generic import FormView
from structlog import get_logger
from passbook.core.forms.authentication import SignUpForm
from passbook.core.models import Invitation, Nonce, User
from passbook.core.signals import invitation_used, user_signed_up
from passbook.lib.config import CONFIG
from passbook.stages.password.exceptions import PasswordPolicyInvalid
from passbook.core.models import Nonce
LOGGER = get_logger()
@ -24,146 +16,20 @@ LOGGER = get_logger()
class LogoutView(LoginRequiredMixin, View):
"""Log current user out"""
def dispatch(self, request):
def dispatch(self, request: HttpRequest) -> HttpResponse:
"""Log current user out"""
logout(request)
messages.success(request, _("You've successfully been logged out."))
return redirect(reverse("passbook_core:auth-login"))
class SignUpView(UserPassesTestMixin, FormView):
"""Sign up new user, optionally consume one-use invitation link."""
template_name = "login/form.html"
form_class = SignUpForm
success_url = "."
# Invitation instance, if invitation link was used
_invitation = None
# Instance of newly created user
_user = None
# Allow only not authenticated users to login
def test_func(self):
return self.request.user.is_authenticated is False
def handle_no_permission(self):
return redirect(reverse("passbook_core:overview"))
def dispatch(self, request, *args, **kwargs):
"""Check if sign-up is enabled or invitation link given"""
allowed = False
if "invitation" in request.GET:
invitations = Invitation.objects.filter(uuid=request.GET.get("invitation"))
allowed = invitations.exists()
if allowed:
self._invitation = invitations.first()
if CONFIG.y("passbook.sign_up.enabled"):
allowed = True
if not allowed:
messages.error(request, _("Sign-ups are currently disabled."))
return redirect(reverse("passbook_core:auth-login"))
return super().dispatch(request, *args, **kwargs)
def get_initial(self):
if self._invitation:
initial = {}
if self._invitation.fixed_username:
initial["username"] = self._invitation.fixed_username
if self._invitation.fixed_email:
initial["email"] = self._invitation.fixed_email
return initial
return super().get_initial()
def get_context_data(self, **kwargs):
kwargs["config"] = CONFIG.y("passbook")
kwargs["title"] = _("Sign Up")
kwargs["primary_action"] = _("Sign up")
return super().get_context_data(**kwargs)
def form_valid(self, form: SignUpForm) -> HttpResponse:
"""Create user"""
try:
self._user = SignUpView.create_user(form.cleaned_data, self.request)
except PasswordPolicyInvalid as exc:
# Manually inject error into form
# pylint: disable=protected-access
errors = form._errors.setdefault("password", ErrorList())
for error in exc.messages:
errors.append(error)
return self.form_invalid(form)
self.consume_invitation()
messages.success(self.request, _("Successfully signed up!"))
LOGGER.debug("Successfully signed up", email=form.cleaned_data.get("email"))
return redirect(reverse("passbook_core:auth-login"))
def consume_invitation(self):
"""Consume invitation if an invitation was used"""
if self._invitation:
invitation_used.send(
sender=self,
request=self.request,
invitation=self._invitation,
user=self._user,
)
self._invitation.delete()
@staticmethod
def create_user(data: Dict, request: HttpRequest = None) -> User:
"""Create user from data
Args:
data: Dictionary as returned by SignUpForm's cleaned_data
request: Optional current request.
Returns:
The user created
Raises:
PasswordPolicyInvalid: if any policy are not fulfilled.
This also deletes the created user.
"""
# Create user
new_user = User.objects.create(
username=data.get("username"),
email=data.get("email"),
name=data.get("name"),
)
new_user.is_active = True
try:
new_user.set_password(data.get("password"))
new_user.save()
request.user = new_user
# Send signal for other auth sources
user_signed_up.send(sender=SignUpView, user=new_user, request=request)
return new_user
except PasswordPolicyInvalid as exc:
new_user.delete()
raise exc
class SignUpConfirmView(View):
"""Confirm registration from Nonce"""
def get(self, request, nonce):
"""Verify UUID and activate user"""
nonce = get_object_or_404(Nonce, uuid=nonce)
nonce.user.is_active = True
nonce.user.save()
# Workaround: hardcoded reference to ModelBackend, needs testing
nonce.user.backend = "django.contrib.auth.backends.ModelBackend"
login(request, nonce.user)
nonce.delete()
messages.success(request, _("Successfully confirmed registration."))
return redirect("passbook_core:overview")
class PasswordResetView(View):
"""Temporarily authenticate User and allow them to reset their password"""
def get(self, request, nonce):
def get(self, request: HttpRequest, nonce_uuid: str) -> HttpResponse:
"""Authenticate user with nonce and redirect to password change view"""
# 3. (Optional) Trap user in password change view
nonce = get_object_or_404(Nonce, uuid=nonce)
nonce = get_object_or_404(Nonce, uuid=nonce_uuid)
# Workaround: hardcoded reference to ModelBackend, needs testing
nonce.user.backend = "django.contrib.auth.backends.ModelBackend"
login(request, nonce.user)

View file

@ -79,37 +79,38 @@ INSTALLED_APPS = [
"drf_yasg",
"guardian",
"django_prometheus",
"passbook.static.apps.PassbookStaticConfig",
"passbook.admin.apps.PassbookAdminConfig",
"passbook.api.apps.PassbookAPIConfig",
"passbook.lib.apps.PassbookLibConfig",
"passbook.flows.apps.PassbookFlowsConfig",
"passbook.policies.apps.PassbookPoliciesConfig",
"passbook.audit.apps.PassbookAuditConfig",
"passbook.crypto.apps.PassbookCryptoConfig",
"passbook.recovery.apps.PassbookRecoveryConfig",
"passbook.sources.saml.apps.PassbookSourceSAMLConfig",
"passbook.sources.ldap.apps.PassbookSourceLDAPConfig",
"passbook.sources.oauth.apps.PassbookSourceOAuthConfig",
"passbook.flows.apps.PassbookFlowsConfig",
"passbook.lib.apps.PassbookLibConfig",
"passbook.policies.apps.PassbookPoliciesConfig",
"passbook.policies.dummy.apps.PassbookPolicyDummyConfig",
"passbook.policies.expiry.apps.PassbookPolicyExpiryConfig",
"passbook.policies.expression.apps.PassbookPolicyExpressionConfig",
"passbook.policies.hibp.apps.PassbookPolicyHIBPConfig",
"passbook.policies.password.apps.PassbookPoliciesPasswordConfig",
"passbook.policies.reputation.apps.PassbookPolicyReputationConfig",
"passbook.policies.webhook.apps.PassbookPoliciesWebhookConfig",
"passbook.providers.app_gw.apps.PassbookApplicationApplicationGatewayConfig",
"passbook.providers.oauth.apps.PassbookProviderOAuthConfig",
"passbook.providers.oidc.apps.PassbookProviderOIDCConfig",
"passbook.providers.saml.apps.PassbookProviderSAMLConfig",
"passbook.providers.samlv2.apps.PassbookProviderSAMLv2Config",
"passbook.recovery.apps.PassbookRecoveryConfig",
"passbook.sources.ldap.apps.PassbookSourceLDAPConfig",
"passbook.sources.oauth.apps.PassbookSourceOAuthConfig",
"passbook.sources.saml.apps.PassbookSourceSAMLConfig",
"passbook.stages.captcha.apps.PassbookStageCaptchaConfig",
"passbook.stages.dummy.apps.PassbookStageDummyConfig",
"passbook.stages.login.apps.PassbookStageLoginConfig",
"passbook.stages.email.apps.PassbookStageEmailConfig",
"passbook.stages.prompt.apps.PassbookStagPromptConfig",
"passbook.stages.identification.apps.PassbookStageIdentificationConfig",
"passbook.stages.otp.apps.PassbookStageOTPConfig",
"passbook.stages.captcha.apps.PassbookStageCaptchaConfig",
"passbook.stages.password.apps.PassbookStagePasswordConfig",
"passbook.stages.email.apps.PassbookStageEmailConfig",
"passbook.policies.dummy.apps.PassbookPolicyDummyConfig",
"passbook.policies.expiry.apps.PassbookPolicyExpiryConfig",
"passbook.policies.reputation.apps.PassbookPolicyReputationConfig",
"passbook.policies.hibp.apps.PassbookPolicyHIBPConfig",
"passbook.policies.password.apps.PassbookPoliciesPasswordConfig",
"passbook.policies.webhook.apps.PassbookPoliciesWebhookConfig",
"passbook.policies.expression.apps.PassbookPolicyExpressionConfig",
"passbook.static.apps.PassbookStaticConfig",
]
GUARDIAN_MONKEY_PATCH = False

View file

View file

@ -0,0 +1,48 @@
"""Prompt Stage API Views"""
from rest_framework.serializers import ModelSerializer
from rest_framework.viewsets import ModelViewSet
from passbook.stages.prompt.models import Prompt, PromptStage
class PromptStageSerializer(ModelSerializer):
"""PromptStage Serializer"""
class Meta:
model = PromptStage
fields = [
"pk",
"name",
"fields",
]
class PromptStageViewSet(ModelViewSet):
"""PromptStage Viewset"""
queryset = PromptStage.objects.all()
serializer_class = PromptStageSerializer
class PromptSerializer(ModelSerializer):
"""Prompt Serializer"""
class Meta:
model = Prompt
fields = [
"pk",
"field_key",
"label",
"type",
"required",
"placeholder",
]
class PromptViewSet(ModelViewSet):
"""Prompt Viewset"""
queryset = Prompt.objects.all()
serializer_class = PromptSerializer

View file

@ -0,0 +1,10 @@
"""passbook prompt stage app config"""
from django.apps import AppConfig
class PassbookStagPromptConfig(AppConfig):
"""passbook prompt stage config"""
name = "passbook.stages.prompt"
label = "passbook_stages_prompt"
verbose_name = "passbook Stages.Prompt"

View file

@ -0,0 +1,29 @@
"""Prompt forms"""
from django import forms
from passbook.stages.prompt.models import Prompt, PromptStage
class PromptStageForm(forms.ModelForm):
"""Form to create/edit Prompt Stage instances"""
class Meta:
model = PromptStage
fields = ["name", "fields"]
widgets = {
"name": forms.TextInput(),
}
class PromptForm(forms.Form):
"""Dynamically created form based on PromptStage"""
stage: PromptStage
def __init__(self, stage: PromptStage, *args, **kwargs):
self.stage = stage
super().__init__(*args, **kwargs)
for field in self.stage.fields.all():
field: Prompt
self.fields[field.field_key] = field.field

View file

@ -0,0 +1,76 @@
# Generated by Django 3.0.5 on 2020-05-10 14:03
import uuid
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
("passbook_flows", "0003_auto_20200509_1258"),
]
operations = [
migrations.CreateModel(
name="Prompt",
fields=[
(
"uuid",
models.UUIDField(
default=uuid.uuid4,
editable=False,
primary_key=True,
serialize=False,
),
),
(
"field_key",
models.SlugField(
help_text="Name of the form field, also used to store the value"
),
),
("label", models.TextField()),
(
"type",
models.CharField(
choices=[
("text", "Text"),
("e-mail", "Email"),
("password", "Password"),
("number", "Number"),
],
max_length=100,
),
),
("required", models.BooleanField(default=True)),
("placeholder", models.TextField()),
],
options={"abstract": False,},
),
migrations.CreateModel(
name="PromptStage",
fields=[
(
"stage_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="passbook_flows.Stage",
),
),
("fields", models.ManyToManyField(to="passbook_stages_prompt.Prompt")),
],
options={
"verbose_name": "Prompt Stage",
"verbose_name_plural": "Prompt Stages",
},
bases=("passbook_flows.stage",),
),
]

View file

@ -0,0 +1,75 @@
"""prompt models"""
from django import forms
from django.db import models
from django.utils.translation import gettext_lazy as _
from passbook.flows.models import Stage
from passbook.lib.models import UUIDModel
class FieldTypes(models.TextChoices):
"""Field types an Prompt can be"""
TEXT = "text"
EMAIL = "e-mail"
PASSWORD = "password" # noqa # nosec
NUMBER = "number"
class Prompt(UUIDModel):
"""Single Prompt, part of a prompt stage."""
field_key = models.SlugField(
help_text=_("Name of the form field, also used to store the value")
)
label = models.TextField()
type = models.CharField(max_length=100, choices=FieldTypes.choices)
required = models.BooleanField(default=True)
placeholder = models.TextField()
@property
def field(self):
"""Return instantiated form input field"""
attrs = {"placeholder": _(self.placeholder)}
if self.type == FieldTypes.TEXT:
return forms.CharField(
label=_(self.label),
widget=forms.TextInput(attrs=attrs),
required=self.required,
)
if self.type == FieldTypes.EMAIL:
return forms.EmailField(
label=_(self.label),
widget=forms.TextInput(attrs=attrs),
required=self.required,
)
if self.type == FieldTypes.PASSWORD:
return forms.CharField(
label=_(self.label),
widget=forms.PasswordInput(attrs=attrs),
required=self.required,
)
if self.type == FieldTypes.NUMBER:
return forms.IntegerField(
label=_(self.label),
widget=forms.NumberInput(attrs=attrs),
requred=self.required,
)
raise ValueError
class PromptStage(Stage):
"""Prompt Stage, pointing to multiple prompts"""
fields = models.ManyToManyField(Prompt)
type = "passbook.stages.prompt.stage.PromptStageView"
form = "passbook.stages.prompt.forms.PromptStageForm"
def __str__(self):
return f"Prompt Stage {self.name}"
class Meta:
verbose_name = _("Prompt Stage")
verbose_name_plural = _("Prompt Stages")

View file

@ -0,0 +1,35 @@
"""Enrollment Stage Logic"""
from django.http import HttpResponse
from django.utils.translation import gettext_lazy as _
from django.views.generic import FormView
from structlog import get_logger
from passbook.flows.stage import AuthenticationStage
from passbook.stages.prompt.forms import PromptForm
LOGGER = get_logger()
PLAN_CONTEXT_PROMPT = "prompt_data"
class EnrollmentStageView(FormView, AuthenticationStage):
"""Enrollment Stage, save form data in plan context."""
template_name = "login/form.html"
form_class = PromptForm
def get_context_data(self, **kwargs):
ctx = super().get_context_data(**kwargs)
ctx["title"] = _(self.executor.current_stage.name)
return ctx
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs["stage"] = self.executor.current_stage
return kwargs
def form_valid(self, form: PromptForm) -> HttpResponse:
"""Form data is valid"""
if PLAN_CONTEXT_PROMPT not in self.executor.plan.context:
self.executor.plan.context[PLAN_CONTEXT_PROMPT] = {}
self.executor.plan.context[PLAN_CONTEXT_PROMPT].update(form.cleaned_data)
return self.executor.stage_ok()