stages/identification: migrate from core to separate stage

This commit is contained in:
Jens Langhammer 2020-05-09 21:31:29 +02:00
parent 131c3fdb32
commit 0aad0604d8
16 changed files with 285 additions and 139 deletions

View File

@ -1,38 +1,14 @@
"""passbook core authentication forms""" """passbook core authentication forms"""
from django import forms from django import forms
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from django.core.validators import validate_email
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from structlog import get_logger from structlog import get_logger
from passbook.core.models import User from passbook.core.models import User
from passbook.lib.config import CONFIG
from passbook.lib.utils.ui import human_list
LOGGER = get_logger() LOGGER = get_logger()
class LoginForm(forms.Form):
"""Allow users to login"""
title = _("Log in to your account")
uid_field = forms.CharField(label=_(""))
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if CONFIG.y("passbook.uid_fields") == ["e-mail"]:
self.fields["uid_field"] = forms.EmailField()
self.fields["uid_field"].label = human_list(
[x.title() for x in CONFIG.y("passbook.uid_fields")]
)
def clean_uid_field(self):
"""Validate uid_field after EmailValidator if 'email' is the only selected uid_fields"""
if CONFIG.y("passbook.uid_fields") == ["email"]:
validate_email(self.cleaned_data.get("uid_field"))
return self.cleaned_data.get("uid_field")
class SignUpForm(forms.Form): class SignUpForm(forms.Form):
"""SignUp Form""" """SignUp Form"""

View File

@ -5,9 +5,8 @@ from random import SystemRandom
from django.test import TestCase from django.test import TestCase
from django.urls import reverse from django.urls import reverse
from passbook.core.forms.authentication import LoginForm, SignUpForm from passbook.core.forms.authentication import SignUpForm
from passbook.core.models import User from passbook.core.models import User
from passbook.flows.models import Flow, FlowDesignation
class TestAuthenticationViews(TestCase): class TestAuthenticationViews(TestCase):
@ -40,20 +39,6 @@ class TestAuthenticationViews(TestCase):
response = self.client.get(reverse("passbook_core:auth-sign-up")) response = self.client.get(reverse("passbook_core:auth-sign-up"))
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
def test_login_view(self):
"""Test account.login view (Anonymous)"""
self.client.logout()
response = self.client.get(reverse("passbook_core:auth-login"))
self.assertEqual(response.status_code, 200)
# test login with post
form = LoginForm(self.login_data)
self.assertTrue(form.is_valid())
response = self.client.post(
reverse("passbook_core:auth-login"), data=form.cleaned_data
)
self.assertEqual(response.status_code, 302)
def test_logout_view(self): def test_logout_view(self):
"""Test account.logout view""" """Test account.logout view"""
self.client.force_login(self.user) self.client.force_login(self.user)
@ -66,24 +51,6 @@ class TestAuthenticationViews(TestCase):
response = self.client.get(reverse("passbook_core:auth-logout")) response = self.client.get(reverse("passbook_core:auth-logout"))
self.assertEqual(response.status_code, 302) self.assertEqual(response.status_code, 302)
def test_login_view_auth(self):
"""Test account.login view (Authenticated)"""
self.client.force_login(self.user)
response = self.client.get(reverse("passbook_core:auth-login"))
self.assertEqual(response.status_code, 302)
def test_login_view_post(self):
"""Test account.login view POST (Anonymous)"""
login_response = self.client.post(
reverse("passbook_core:auth-login"), data=self.login_data
)
self.assertEqual(login_response.status_code, 302)
flow = Flow.objects.get(designation=FlowDesignation.AUTHENTICATION)
expected = reverse(
"passbook_flows:flow-executor", kwargs={"flow_slug": flow.slug}
)
self.assertEqual(login_response.url, expected)
def test_sign_up_view_post(self): def test_sign_up_view_post(self):
"""Test account.sign_up view POST (Anonymous)""" """Test account.sign_up view POST (Anonymous)"""
form = SignUpForm(self.sign_up_data) form = SignUpForm(self.sign_up_data)

View File

@ -2,10 +2,16 @@
from django.urls import path from django.urls import path
from passbook.core.views import authentication, overview, user from passbook.core.views import authentication, overview, user
from passbook.flows.models import FlowDesignation
from passbook.flows.views import ToDefaultFlow
urlpatterns = [ urlpatterns = [
# Authentication views # Authentication views
path("auth/login/", authentication.LoginView.as_view(), name="auth-login"), path(
"auth/login/",
ToDefaultFlow.as_view(designation=FlowDesignation.AUTHENTICATION),
name="auth-login",
),
path("auth/logout/", authentication.LogoutView.as_view(), name="auth-logout"), path("auth/logout/", authentication.LogoutView.as_view(), name="auth-logout"),
path("auth/sign_up/", authentication.SignUpView.as_view(), name="auth-sign-up"), path("auth/sign_up/", authentication.SignUpView.as_view(), name="auth-sign-up"),
path( path(

View File

@ -1,5 +1,5 @@
"""passbook core authentication views""" """passbook core authentication views"""
from typing import Dict, Optional from typing import Dict
from django.contrib import messages from django.contrib import messages
from django.contrib.auth import login, logout from django.contrib.auth import login, logout
@ -12,87 +12,15 @@ from django.views import View
from django.views.generic import FormView from django.views.generic import FormView
from structlog import get_logger from structlog import get_logger
from passbook.core.forms.authentication import LoginForm, SignUpForm from passbook.core.forms.authentication import SignUpForm
from passbook.core.models import Invitation, Nonce, Source, User from passbook.core.models import Invitation, Nonce, User
from passbook.core.signals import invitation_used, user_signed_up from passbook.core.signals import invitation_used, user_signed_up
from passbook.flows.models import Flow, FlowDesignation
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner
from passbook.flows.views import SESSION_KEY_PLAN
from passbook.lib.config import CONFIG from passbook.lib.config import CONFIG
from passbook.lib.utils.urls import redirect_with_qs
from passbook.stages.password.exceptions import PasswordPolicyInvalid from passbook.stages.password.exceptions import PasswordPolicyInvalid
LOGGER = get_logger() LOGGER = get_logger()
class LoginView(UserPassesTestMixin, FormView):
"""Allow users to sign in"""
template_name = "login/form.html"
form_class = LoginForm
success_url = "."
# Allow only not authenticated users to login
def test_func(self):
return self.request.user.is_authenticated is False
def handle_no_permission(self):
if "next" in self.request.GET:
return redirect(self.request.GET.get("next"))
return redirect(reverse("passbook_core:overview"))
def get_context_data(self, **kwargs):
kwargs["config"] = CONFIG.y("passbook")
kwargs["title"] = _("Log in to your account")
kwargs["primary_action"] = _("Log in")
kwargs["show_sign_up_notice"] = CONFIG.y("passbook.sign_up.enabled")
kwargs["sources"] = []
sources = (
Source.objects.filter(enabled=True).order_by("name").select_subclasses()
)
for source in sources:
ui_login_button = source.ui_login_button
if ui_login_button:
kwargs["sources"].append(ui_login_button)
return super().get_context_data(**kwargs)
def get_user(self, uid_value) -> Optional[User]:
"""Find user instance. Returns None if no user was found."""
for search_field in CONFIG.y("passbook.uid_fields"):
# Workaround for E-Mail -> email
if search_field == "e-mail":
search_field = "email"
users = User.objects.filter(**{search_field: uid_value})
if users.exists():
LOGGER.debug("Found user", user=users.first(), uid_field=search_field)
return users.first()
return None
def form_valid(self, form: LoginForm) -> HttpResponse:
"""Form data is valid"""
pre_user = self.get_user(form.cleaned_data.get("uid_field"))
if not pre_user:
# No user found
return self.invalid_login(self.request)
# We run the Flow planner here so we can pass the Pending user in the context
flow = get_object_or_404(Flow, designation=FlowDesignation.AUTHENTICATION)
planner = FlowPlanner(flow)
plan = planner.plan(self.request)
plan.context[PLAN_CONTEXT_PENDING_USER] = pre_user
self.request.session[SESSION_KEY_PLAN] = plan
return redirect_with_qs(
"passbook_flows:flow-executor", self.request.GET, flow_slug=flow.slug,
)
def invalid_login(
self, request: HttpRequest, disabled_user: User = None
) -> HttpResponse:
"""Handle login for disabled users/invalid login attempts"""
LOGGER.debug("invalid_login", user=disabled_user)
messages.error(request, _("Failed to authenticate."))
return self.render_to_response(self.get_context_data())
class LogoutView(LoginRequiredMixin, View): class LogoutView(LoginRequiredMixin, View):
"""Log current user out""" """Log current user out"""

View File

@ -5,23 +5,35 @@ from django.db import migrations
from django.db.backends.base.schema import BaseDatabaseSchemaEditor from django.db.backends.base.schema import BaseDatabaseSchemaEditor
from passbook.flows.models import FlowDesignation from passbook.flows.models import FlowDesignation
from passbook.stages.identification.models import Templates, UserFields
def create_default_flow(apps: Apps, schema_editor: BaseDatabaseSchemaEditor): def create_default_flow(apps: Apps, schema_editor: BaseDatabaseSchemaEditor):
Flow = apps.get_model("passbook_flows", "Flow") Flow = apps.get_model("passbook_flows", "Flow")
FlowStageBinding = apps.get_model("passbook_flows", "FlowStageBinding") FlowStageBinding = apps.get_model("passbook_flows", "FlowStageBinding")
PasswordStage = apps.get_model("passbook_stages_password", "PasswordStage") PasswordStage = apps.get_model("passbook_stages_password", "PasswordStage")
IdentificationStage = apps.get_model(
"passbook_stages_identification", "IdentificationStage"
)
db_alias = schema_editor.connection.alias db_alias = schema_editor.connection.alias
if Flow.objects.using(db_alias).all().exists(): if Flow.objects.using(db_alias).all().exists():
# Only create default flow when none exist # Only create default flow when none exist
return return
if not IdentificationStage.objects.using(db_alias).exists():
IdentificationStage.objects.using(db_alias).create(
name="identification",
user_fields=[UserFields.E_MAIL],
template=Templates.DEFAULT_LOGIN,
)
if not PasswordStage.objects.using(db_alias).exists(): if not PasswordStage.objects.using(db_alias).exists():
PasswordStage.objects.using(db_alias).create( PasswordStage.objects.using(db_alias).create(
name="password", backends=["django.contrib.auth.backends.ModelBackend"], name="password", backends=["django.contrib.auth.backends.ModelBackend"],
) )
ident_stage = IdentificationStage.objects.using(db_alias).first()
pw_stage = PasswordStage.objects.using(db_alias).first() pw_stage = PasswordStage.objects.using(db_alias).first()
flow = Flow.objects.using(db_alias).create( flow = Flow.objects.using(db_alias).create(
name="default-authentication-flow", name="default-authentication-flow",
@ -29,7 +41,10 @@ def create_default_flow(apps: Apps, schema_editor: BaseDatabaseSchemaEditor):
designation=FlowDesignation.AUTHENTICATION, designation=FlowDesignation.AUTHENTICATION,
) )
FlowStageBinding.objects.using(db_alias).create( FlowStageBinding.objects.using(db_alias).create(
flow=flow, stage=pw_stage, order=0, flow=flow, stage=ident_stage, order=0,
)
FlowStageBinding.objects.using(db_alias).create(
flow=flow, stage=pw_stage, order=1,
) )
@ -38,6 +53,7 @@ class Migration(migrations.Migration):
dependencies = [ dependencies = [
("passbook_flows", "0001_initial"), ("passbook_flows", "0001_initial"),
("passbook_stages_password", "0001_initial"), ("passbook_stages_password", "0001_initial"),
("passbook_stages_identification", "0001_initial"),
] ]
operations = [migrations.RunPython(create_default_flow)] operations = [migrations.RunPython(create_default_flow)]

View File

@ -1,6 +1,5 @@
"""Flow models""" """Flow models"""
from enum import Enum from typing import Optional
from typing import Optional, Tuple
from django.db import models from django.db import models
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _

View File

@ -18,7 +18,9 @@ def _get_client_ip_from_meta(meta: Dict[str, Any]) -> Optional[str]:
return None return None
def get_client_ip(request: HttpRequest) -> Optional[str]: def get_client_ip(request: Optional[HttpRequest]) -> Optional[str]:
"""Attempt to get the client's IP by checking common HTTP Headers. """Attempt to get the client's IP by checking common HTTP Headers.
Returns none if no IP Could be found""" Returns none if no IP Could be found"""
if request:
return _get_client_ip_from_meta(request.META) return _get_client_ip_from_meta(request.META)
return ""

View File

@ -0,0 +1,26 @@
"""Identification Stage API Views"""
from rest_framework.serializers import ModelSerializer
from rest_framework.viewsets import ModelViewSet
from passbook.stages.identification.models import IdentificationStage
class IdentificationStageSerializer(ModelSerializer):
"""IdentificationStage Serializer"""
class Meta:
model = IdentificationStage
fields = [
"pk",
"name",
"user_fields",
"template",
]
class IdentificationStageViewSet(ModelViewSet):
"""IdentificationStage Viewset"""
queryset = IdentificationStage.objects.all()
serializer_class = IdentificationStageSerializer

View File

@ -0,0 +1,10 @@
"""passbook identification stage app config"""
from django.apps import AppConfig
class PassbookStageIdentificationConfig(AppConfig):
"""passbook identification stage config"""
name = "passbook.stages.identification"
label = "passbook_stages_identification"
verbose_name = "passbook Stages.Identification"

View File

@ -0,0 +1,45 @@
"""passbook flows identification forms"""
from django import forms
from django.core.validators import validate_email
from django.utils.translation import gettext_lazy as _
from structlog import get_logger
from passbook.lib.config import CONFIG
from passbook.lib.utils.ui import human_list
from passbook.stages.identification.models import IdentificationStage
LOGGER = get_logger()
class IdentificationStageForm(forms.ModelForm):
"""Form to create/edit IdentificationStage instances"""
class Meta:
model = IdentificationStage
fields = ["name", "user_fields", "template"]
widgets = {
"name": forms.TextInput(),
}
class IdentificationForm(forms.Form):
"""Allow users to login"""
title = _("Log in to your account")
uid_field = forms.CharField(label=_(""))
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# TODO: Get UID Fields from stage config
if CONFIG.y("passbook.uid_fields") == ["e-mail"]:
self.fields["uid_field"] = forms.EmailField()
self.fields["uid_field"].label = human_list(
[x.title() for x in CONFIG.y("passbook.uid_fields")]
)
def clean_uid_field(self):
"""Validate uid_field after EmailValidator if 'email' is the only selected uid_fields"""
if CONFIG.y("passbook.uid_fields") == ["email"]:
validate_email(self.cleaned_data.get("uid_field"))
return self.cleaned_data.get("uid_field")

View File

@ -0,0 +1,50 @@
# Generated by Django 3.0.3 on 2020-05-09 18:34
import django.contrib.postgres.fields
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
("passbook_flows", "0001_initial"),
]
operations = [
migrations.CreateModel(
name="IdentificationStage",
fields=[
(
"stage_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="passbook_flows.Stage",
),
),
(
"user_fields",
django.contrib.postgres.fields.ArrayField(
base_field=models.CharField(
choices=[("e-mail", "E Mail"), ("username", "Username")],
max_length=100,
),
help_text="Fields of the user object to match against.",
size=None,
),
),
("template", models.TextField()),
],
options={
"verbose_name": "Identification Stage",
"verbose_name_plural": "Identification Stages",
},
bases=("passbook_flows.stage",),
),
]

View File

@ -0,0 +1,18 @@
# Generated by Django 3.0.3 on 2020-05-09 19:16
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("passbook_stages_identification", "0001_initial"),
]
operations = [
migrations.AlterField(
model_name="identificationstage",
name="template",
field=models.TextField(choices=[("login/form.html", "Default Login")]),
),
]

View File

@ -0,0 +1,40 @@
"""identification stage models"""
from django.contrib.postgres.fields import ArrayField
from django.db import models
from django.utils.translation import gettext_lazy as _
from passbook.flows.models import Stage
class UserFields(models.TextChoices):
"""Fields which the user can identifiy themselves with"""
E_MAIL = "email"
USERNAME = "username"
class Templates(models.TextChoices):
"""Templates to be used for the stage"""
DEFAULT_LOGIN = "login/form.html"
class IdentificationStage(Stage):
"""Identification stage, allows a user to identify themselves to authenticate."""
user_fields = ArrayField(
models.CharField(max_length=100, choices=UserFields.choices),
help_text=_("Fields of the user object to match against."),
)
template = models.TextField(choices=Templates.choices)
type = "passbook.stages.identification.stage.IdentificationStageView"
form = "passbook.stages.identification.forms.IdentificationStageForm"
def __str__(self):
return f"Identification Stage {self.name}"
class Meta:
verbose_name = _("Identification Stage")
verbose_name_plural = _("Identification Stages")

View File

@ -0,0 +1,63 @@
"""Identification stage logic"""
from typing import List, Optional
from django.contrib import messages
from django.http import HttpResponse
from django.utils.translation import gettext as _
from django.views.generic import FormView
from structlog import get_logger
from passbook.core.models import Source, User
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER
from passbook.flows.stage import AuthenticationStage
from passbook.lib.config import CONFIG
from passbook.stages.identification.forms import IdentificationForm
from passbook.stages.identification.models import IdentificationStage
LOGGER = get_logger()
class IdentificationStageView(FormView, AuthenticationStage):
"""Form to identify the user"""
template_name = "login/form.html"
form_class = IdentificationForm
def get_template_names(self) -> List[str]:
current_stage: IdentificationStage = self.executor.current_stage
return [current_stage.template]
def get_context_data(self, **kwargs):
kwargs["config"] = CONFIG.y("passbook")
kwargs["title"] = _("Log in to your account")
kwargs["primary_action"] = _("Log in")
kwargs["show_sign_up_notice"] = CONFIG.y("passbook.sign_up.enabled")
kwargs["sources"] = []
sources = (
Source.objects.filter(enabled=True).order_by("name").select_subclasses()
)
for source in sources:
ui_login_button = source.ui_login_button
if ui_login_button:
kwargs["sources"].append(ui_login_button)
return super().get_context_data(**kwargs)
def get_user(self, uid_value) -> Optional[User]:
"""Find user instance. Returns None if no user was found."""
current_stage: IdentificationStage = self.executor.current_stage
for search_field in current_stage.user_fields:
users = User.objects.filter(**{search_field: uid_value})
if users.exists():
LOGGER.debug("Found user", user=users.first(), uid_field=search_field)
return users.first()
return None
def form_valid(self, form: IdentificationForm) -> HttpResponse:
"""Form data is valid"""
pre_user = self.get_user(form.cleaned_data.get("uid_field"))
if not pre_user:
LOGGER.debug("invalid_login")
messages.error(self.request, _("Failed to authenticate."))
return self.executor.stage_invalid()
self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = pre_user
return self.executor.stage_ok()