Merge pull request #101 from BeryJu/otp-rework

OTP Stage Rework
This commit is contained in:
Jens L 2020-06-30 22:03:28 +02:00 committed by GitHub
commit 7de2ad77b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
60 changed files with 1160 additions and 556 deletions

View File

@ -128,6 +128,7 @@ class TestProviderSAML(SeleniumTestCase):
By.XPATH, "/html/body/div[2]/div/main/div/form/div[2]/p[1]" By.XPATH, "/html/body/div[2]/div/main/div/form/div[2]/p[1]"
).text, ).text,
) )
sleep(1)
self.driver.find_element(By.CSS_SELECTOR, "[type=submit]").click() self.driver.find_element(By.CSS_SELECTOR, "[type=submit]").click()
self.wait_for_url("http://localhost:9009/") self.wait_for_url("http://localhost:9009/")
self.assertEqual( self.assertEqual(

View File

@ -46,8 +46,8 @@ class SeleniumTestCase(StaticLiveServerTestCase):
makedirs("out", exist_ok=True) makedirs("out", exist_ok=True)
self.driver = self._get_driver() self.driver = self._get_driver()
self.driver.maximize_window() self.driver.maximize_window()
self.driver.implicitly_wait(60) self.driver.implicitly_wait(300)
self.wait = WebDriverWait(self.driver, 120) self.wait = WebDriverWait(self.driver, 500)
self.apply_default_data() self.apply_default_data()
self.logger = get_logger() self.logger = get_logger()
@ -68,7 +68,10 @@ class SeleniumTestCase(StaticLiveServerTestCase):
def wait_for_url(self, desired_url): def wait_for_url(self, desired_url):
"""Wait until URL is `desired_url`.""" """Wait until URL is `desired_url`."""
self.wait.until(lambda driver: driver.current_url == desired_url) self.wait.until(
lambda driver: driver.current_url == desired_url,
f"URL {self.driver.current_url} doesn't match expected URL {desired_url}",
)
def url(self, view, **kwargs) -> str: def url(self, view, **kwargs) -> str:
"""reverse `view` with `**kwargs` into full URL using live_server_url""" """reverse `view` with `**kwargs` into full URL using live_server_url"""

View File

@ -37,7 +37,9 @@ from passbook.stages.dummy.api import DummyStageViewSet
from passbook.stages.email.api import EmailStageViewSet from passbook.stages.email.api import EmailStageViewSet
from passbook.stages.identification.api import IdentificationStageViewSet from passbook.stages.identification.api import IdentificationStageViewSet
from passbook.stages.invitation.api import InvitationStageViewSet, InvitationViewSet from passbook.stages.invitation.api import InvitationStageViewSet, InvitationViewSet
from passbook.stages.otp.api import OTPStageViewSet from passbook.stages.otp_static.api import OTPStaticStageViewSet
from passbook.stages.otp_time.api import OTPTimeStageViewSet
from passbook.stages.otp_validate.api import OTPValidateStageViewSet
from passbook.stages.password.api import PasswordStageViewSet from passbook.stages.password.api import PasswordStageViewSet
from passbook.stages.prompt.api import PromptStageViewSet, PromptViewSet from passbook.stages.prompt.api import PromptStageViewSet, PromptViewSet
from passbook.stages.user_delete.api import UserDeleteStageViewSet from passbook.stages.user_delete.api import UserDeleteStageViewSet
@ -91,10 +93,12 @@ router.register("stages/email", EmailStageViewSet)
router.register("stages/identification", IdentificationStageViewSet) router.register("stages/identification", IdentificationStageViewSet)
router.register("stages/invitation", InvitationStageViewSet) router.register("stages/invitation", InvitationStageViewSet)
router.register("stages/invitation/invitations", InvitationViewSet) router.register("stages/invitation/invitations", InvitationViewSet)
router.register("stages/otp", OTPStageViewSet) router.register("stages/otp_static", OTPStaticStageViewSet)
router.register("stages/otp_time", OTPTimeStageViewSet)
router.register("stages/otp_validate", OTPValidateStageViewSet)
router.register("stages/password", PasswordStageViewSet) router.register("stages/password", PasswordStageViewSet)
router.register("stages/prompt/stages", PromptStageViewSet)
router.register("stages/prompt/prompts", PromptViewSet) router.register("stages/prompt/prompts", PromptViewSet)
router.register("stages/prompt/stages", PromptStageViewSet)
router.register("stages/user_delete", UserDeleteStageViewSet) router.register("stages/user_delete", UserDeleteStageViewSet)
router.register("stages/user_login", UserLoginStageViewSet) router.register("stages/user_login", UserLoginStageViewSet)
router.register("stages/user_logout", UserLogoutStageViewSet) router.register("stages/user_logout", UserLogoutStageViewSet)

View File

@ -5,9 +5,6 @@
{% block above_form %} {% block above_form %}
<div class="pf-c-form__group"> <div class="pf-c-form__group">
<label class="pf-c-form__label" for="{{ field.name }}-{{ forloop.counter0 }}">
<span class="pf-c-form__label-text">{% trans "Username" %}</span>
</label>
<div class="form-control-static"> <div class="form-control-static">
<div class="left"> <div class="left">
<img class="pf-c-avatar" src="{% gravatar user.email %}" alt=""> <img class="pf-c-avatar" src="{% gravatar user.email %}" alt="">

View File

@ -25,7 +25,7 @@
<ul class="pf-c-nav__list"> <ul class="pf-c-nav__list">
{% for stage in user_stages_loc %} {% for stage in user_stages_loc %}
<li class="pf-c-nav__item"> <li class="pf-c-nav__item">
<a href="{{ stage.url }}" class="pf-c-nav__link {% is_active stage.view_name %}"> <a href="{{ stage.url }}" class="pf-c-nav__link {% if stage.url == request.get_full_path %} pf-m-current {% endif %}">
{{ stage.name }} {{ stage.name }}
</a> </a>
</li> </li>
@ -41,7 +41,7 @@
{% for source in user_sources_loc %} {% for source in user_sources_loc %}
<li class="pf-c-nav__item"> <li class="pf-c-nav__item">
<a href="{{ source.view_name }}" <a href="{{ source.view_name }}"
class="pf-c-nav__link {% if user_settings.view_name == request.get_full_path %} pf-m-current {% endif %}"> class="pf-c-nav__link {% if source.url == request.get_full_path %} pf-m-current {% endif %}">
{{ source.name }} {{ source.name }}
</a> </a>
</li> </li>
@ -54,10 +54,12 @@
</div> </div>
<main role="main" class="pf-c-page__main" tabindex="-1" id="main-content"> <main role="main" class="pf-c-page__main" tabindex="-1" id="main-content">
<section class="pf-c-page__main-section"> <section class="pf-c-page__main-section">
<div class="pf-l-split pf-m-gutter"> <div class="pf-u-display-flex pf-u-justify-content-center">
<div class="pf-u-w-75">
{% block page %} {% block page %}
{% endblock %} {% endblock %}
</div> </div>
</div>
</section> </section>
</main> </main>
{% endblock %} {% endblock %}

View File

@ -3,10 +3,9 @@
{% load i18n %} {% load i18n %}
{% block page %} {% block page %}
<div class="pf-l-split__item"> <div class="pf-c-card">
<div class="pf-c-card">
<div class="pf-c-card__header pf-c-title pf-m-md"> <div class="pf-c-card__header pf-c-title pf-m-md">
<h1>{% trans 'Update details' %}</h1> {% trans 'Update details' %}
</div> </div>
<div class="pf-c-card__body"> <div class="pf-c-card__body">
<form action="" method="post" class="pf-c-form pf-m-horizontal"> <form action="" method="post" class="pf-c-form pf-m-horizontal">
@ -25,6 +24,5 @@
</div> </div>
</form> </form>
</div> </div>
</div>
</div> </div>
{% endblock %} {% endblock %}

View File

@ -23,7 +23,7 @@ def user_stages(context: RequestContext) -> List[UIUserSettings]:
if not user_settings: if not user_settings:
continue continue
matching_stages.append(user_settings) matching_stages.append(user_settings)
return matching_stages return sorted(matching_stages, key=lambda x: x.name)
@register.simple_tag(takes_context=True) @register.simple_tag(takes_context=True)
@ -42,4 +42,4 @@ def user_sources(context: RequestContext) -> List[UIUserSettings]:
policy_engine.build() policy_engine.build()
if policy_engine.passing: if policy_engine.passing:
matching_sources.append(user_settings) matching_sources.append(user_settings)
return matching_sources return sorted(matching_sources, key=lambda x: x.name)

View File

@ -13,5 +13,5 @@ class PassbookFlowsConfig(AppConfig):
verbose_name = "passbook Flows" verbose_name = "passbook Flows"
def ready(self): def ready(self):
"""Load policy cache clearing signals""" """Flow signals that clear the cache"""
import_module("passbook.flows.signals") import_module("passbook.flows.signals")

View File

@ -15,6 +15,13 @@ from passbook.policies.models import PolicyBindingModel
LOGGER = get_logger() LOGGER = get_logger()
class NotConfiguredAction(models.TextChoices):
"""Decides how the FlowExecutor should proceed when a stage isn't configured"""
SKIP = "skip"
# CONFIGURE = "configure"
class FlowDesignation(models.TextChoices): class FlowDesignation(models.TextChoices):
"""Designation of what a Flow should be used for. At a later point, this """Designation of what a Flow should be used for. At a later point, this
should be replaced by a database entry.""" should be replaced by a database entry."""

View File

@ -7,6 +7,13 @@ from structlog import get_logger
LOGGER = get_logger() LOGGER = get_logger()
def delete_cache_prefix(prefix: str) -> int:
"""Delete keys prefixed with `prefix` and return count of deleted keys."""
keys = cache.keys(prefix)
cache.delete_many(keys)
return len(keys)
@receiver(post_save) @receiver(post_save)
# pylint: disable=unused-argument # pylint: disable=unused-argument
def invalidate_flow_cache(sender, instance, **_): def invalidate_flow_cache(sender, instance, **_):
@ -15,17 +22,16 @@ def invalidate_flow_cache(sender, instance, **_):
from passbook.flows.planner import cache_key from passbook.flows.planner import cache_key
if isinstance(instance, Flow): if isinstance(instance, Flow):
LOGGER.debug("Invalidating Flow cache", flow=instance) total = delete_cache_prefix(f"{cache_key(instance)}*")
cache.delete(f"{cache_key(instance)}*") LOGGER.debug("Invalidating Flow cache", flow=instance, len=total)
if isinstance(instance, FlowStageBinding): if isinstance(instance, FlowStageBinding):
LOGGER.debug("Invalidating Flow cache from FlowStageBinding", binding=instance) total = delete_cache_prefix(f"{cache_key(instance.flow)}*")
cache.delete(f"{cache_key(instance.flow)}*") LOGGER.debug(
"Invalidating Flow cache from FlowStageBinding", binding=instance, len=total
)
if isinstance(instance, Stage): if isinstance(instance, Stage):
LOGGER.debug("Invalidating Flow cache from Stage", stage=instance)
total = 0 total = 0
for binding in FlowStageBinding.objects.filter(stage=instance): for binding in FlowStageBinding.objects.filter(stage=instance):
prefix = cache_key(binding.flow) prefix = cache_key(binding.flow)
keys = cache.keys(f"{prefix}*") total += delete_cache_prefix(f"{prefix}*")
total += len(keys) LOGGER.debug("Invalidating Flow cache from Stage", stage=instance, len=total)
cache.delete_many(keys)
LOGGER.debug("Deleted keys", len=total)

View File

@ -111,6 +111,7 @@ class FlowExecutorView(View):
stage_response = self.current_stage_view.get(request, *args, **kwargs) stage_response = self.current_stage_view.get(request, *args, **kwargs)
return to_stage_response(request, stage_response) return to_stage_response(request, stage_response)
except Exception as exc: # pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except
LOGGER.exception(exc)
return to_stage_response( return to_stage_response(
request, request,
render( render(
@ -132,6 +133,7 @@ class FlowExecutorView(View):
stage_response = self.current_stage_view.post(request, *args, **kwargs) stage_response = self.current_stage_view.post(request, *args, **kwargs)
return to_stage_response(request, stage_response) return to_stage_response(request, stage_response)
except Exception as exc: # pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except
LOGGER.exception(exc)
return to_stage_response( return to_stage_response(
request, request,
render( render(

View File

@ -107,7 +107,9 @@ INSTALLED_APPS = [
"passbook.stages.user_login.apps.PassbookStageUserLoginConfig", "passbook.stages.user_login.apps.PassbookStageUserLoginConfig",
"passbook.stages.user_logout.apps.PassbookStageUserLogoutConfig", "passbook.stages.user_logout.apps.PassbookStageUserLogoutConfig",
"passbook.stages.user_write.apps.PassbookStageUserWriteConfig", "passbook.stages.user_write.apps.PassbookStageUserWriteConfig",
"passbook.stages.otp.apps.PassbookStageOTPConfig", "passbook.stages.otp_static.apps.PassbookStageOTPStaticConfig",
"passbook.stages.otp_time.apps.PassbookStageOTPTimeConfig",
"passbook.stages.otp_validate.apps.PassbookStageOTPValidateConfig",
"passbook.stages.password.apps.PassbookStagePasswordConfig", "passbook.stages.password.apps.PassbookStagePasswordConfig",
"passbook.static.apps.PassbookStaticConfig", "passbook.static.apps.PassbookStaticConfig",
] ]

View File

@ -1,4 +1,5 @@
"""OAuth Client models""" """OAuth Client models"""
from typing import Optional
from django.db import models from django.db import models
from django.urls import reverse, reverse_lazy from django.urls import reverse, reverse_lazy
@ -61,7 +62,7 @@ class OAuthSource(Source):
return f"Callback URL: <pre>{url}</pre>" return f"Callback URL: <pre>{url}</pre>"
@property @property
def ui_user_settings(self) -> UIUserSettings: def ui_user_settings(self) -> Optional[UIUserSettings]:
view_name = "passbook_sources_oauth:oauth-client-user" view_name = "passbook_sources_oauth:oauth-client-user"
return UIUserSettings( return UIUserSettings(
name=self.name, url=reverse(view_name, kwargs={"source_slug": self.slug}), name=self.name, url=reverse(view_name, kwargs={"source_slug": self.slug}),

View File

@ -1,21 +0,0 @@
"""OTPStage API Views"""
from rest_framework.serializers import ModelSerializer
from rest_framework.viewsets import ModelViewSet
from passbook.stages.otp.models import OTPStage
class OTPStageSerializer(ModelSerializer):
"""OTPStage Serializer"""
class Meta:
model = OTPStage
fields = ["pk", "name", "enforced"]
class OTPStageViewSet(ModelViewSet):
"""OTPStage Viewset"""
queryset = OTPStage.objects.all()
serializer_class = OTPStageSerializer

View File

@ -1,12 +0,0 @@
"""passbook OTP AppConfig"""
from django.apps.config import AppConfig
class PassbookStageOTPConfig(AppConfig):
"""passbook OTP AppConfig"""
name = "passbook.stages.otp"
label = "passbook_stages_otp"
verbose_name = "passbook Stages.OTP"
mountpoint = "user/otp/"

View File

@ -1,78 +0,0 @@
"""passbook OTP Forms"""
from django import forms
from django.core.validators import RegexValidator
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
from django_otp.models import Device
from passbook.stages.otp.models import OTPStage
OTP_CODE_VALIDATOR = RegexValidator(
r"^[0-9a-z]{6,8}$", _("Only alpha-numeric characters are allowed.")
)
class PictureWidget(forms.widgets.Widget):
"""Widget to render value as img-tag"""
def render(self, name, value, attrs=None, renderer=None):
return mark_safe(f'<img src="{value}" />') # nosec
class OTPVerifyForm(forms.Form):
"""Simple Form to verify OTP Code"""
order = ["code"]
code = forms.CharField(
label=_("Code"),
validators=[OTP_CODE_VALIDATOR],
widget=forms.TextInput(attrs={"autocomplete": "off", "placeholder": "Code"}),
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# This is a little helper so the field is focused by default
self.fields["code"].widget.attrs.update(
{"autofocus": "autofocus", "autocomplete": "off"}
)
class OTPSetupForm(forms.Form):
"""OTP Setup form"""
title = _("Set up OTP")
device: Device = None
qr_code = forms.CharField(
widget=PictureWidget,
disabled=True,
required=False,
label=_("Scan this Code with your OTP App."),
)
code = forms.CharField(
label=_("Code"),
validators=[OTP_CODE_VALIDATOR],
widget=forms.TextInput(attrs={"placeholder": _("One-Time Password")}),
)
tokens = forms.MultipleChoiceField(disabled=True, required=False)
def clean_code(self):
"""Check code with new otp device"""
if self.device is not None:
if not self.device.verify_token(int(self.cleaned_data.get("code"))):
raise forms.ValidationError(_("OTP Code does not match"))
return self.cleaned_data.get("code")
class OTPStageForm(forms.ModelForm):
"""Form to edit OTPStage instances"""
class Meta:
model = OTPStage
fields = ["name", "enforced"]
widgets = {
"name": forms.TextInput(),
}

View File

@ -1,33 +0,0 @@
"""OTP Stage"""
from django.db import models
from django.urls import reverse
from django.utils.translation import gettext as _
from passbook.core.types import UIUserSettings
from passbook.flows.models import Stage
class OTPStage(Stage):
"""OTP Stage"""
enforced = models.BooleanField(
default=False,
help_text=("Enforce enabled OTP for Users " "this stage applies to."),
)
type = "passbook.stages.otp.stages.OTPStage"
form = "passbook.stages.otp.forms.OTPStageForm"
@property
def ui_user_settings(self) -> UIUserSettings:
return UIUserSettings(
name="OTP", url=reverse("passbook_stages_otp:otp-user-settings"),
)
def __str__(self):
return f"OTP Stage {self.name}"
class Meta:
verbose_name = _("OTP Stage")
verbose_name_plural = _("OTP Stages")

View File

@ -1,10 +0,0 @@
"""passbook OTP Settings"""
MIDDLEWARE = [
"django_otp.middleware.OTPMiddleware",
]
INSTALLED_APPS = [
"django_otp",
"django_otp.plugins.otp_static",
"django_otp.plugins.otp_totp",
]

View File

@ -1,59 +0,0 @@
"""OTP Stage logic"""
from django.contrib import messages
from django.utils.translation import gettext as _
from django.views.generic import FormView
from django_otp import match_token, user_has_device
from structlog import get_logger
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER
from passbook.flows.stage import StageView
from passbook.stages.otp.forms import OTPVerifyForm
from passbook.stages.otp.views import OTP_SETTING_UP_KEY, EnableView
LOGGER = get_logger()
class OTPStage(FormView, StageView):
"""OTP Stage View"""
template_name = "stages/otp/stage.html"
form_class = OTPVerifyForm
def get_context_data(self, **kwargs):
kwargs = super().get_context_data(**kwargs)
kwargs["title"] = _("Enter Verification Code")
return kwargs
def get(self, request, *args, **kwargs):
"""Check if User has OTP enabled and if OTP is enforced"""
pending_user = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
if not user_has_device(pending_user):
LOGGER.debug("User doesn't have OTP Setup.")
if self.executor.current_stage.enforced:
# Redirect to setup view
LOGGER.debug("OTP is enforced, redirecting to setup")
request.user = pending_user
messages.info(request, _("OTP is enforced. Please setup OTP."))
return EnableView.as_view()(request)
LOGGER.debug("OTP is not enforced, skipping form")
return self.executor.user_ok()
return super().get(request, *args, **kwargs)
def post(self, request, *args, **kwargs):
"""Check if setup is in progress and redirect to EnableView"""
if OTP_SETTING_UP_KEY in request.session:
LOGGER.debug("Passing POST to EnableView")
request.user = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
return EnableView.as_view()(request)
return super().post(self, request, *args, **kwargs)
def form_valid(self, form: OTPVerifyForm):
"""Verify OTP Token"""
device = match_token(
self.executor.plan.context[PLAN_CONTEXT_PENDING_USER],
form.cleaned_data.get("code"),
)
if device:
return self.executor.stage_ok()
messages.error(self.request, _("Invalid OTP."))
return self.form_invalid(form)

View File

@ -1,8 +0,0 @@
{% extends 'login/form_with_user.html' %}
{% load i18n %}
{% block above_form %}
{{ block.super }}
<p><b>{% trans 'Enter the Verification Code from your Authenticator App.' %}</b></p>
{% endblock %}

View File

@ -1,12 +0,0 @@
"""passbook OTP Urls"""
from django.urls import path
from passbook.stages.otp import views
urlpatterns = [
path("", views.UserSettingsView.as_view(), name="otp-user-settings"),
path("qr/", views.QRView.as_view(), name="otp-qr"),
path("enable/", views.EnableView.as_view(), name="otp-enable"),
path("disable/", views.DisableView.as_view(), name="otp-disable"),
]

View File

@ -1,17 +0,0 @@
"""passbook OTP Utils"""
from django.utils.http import urlencode
def otpauth_url(accountname, secret, issuer=None, digits=6):
"""Create otpauth according to
https://github.com/google/google-authenticator/wiki/Key-Uri-Format"""
# Ensure that the secret parameter is the FIRST parameter of the URI, this
# allows Microsoft Authenticator to work.
query = [
("secret", secret),
("digits", digits),
("issuer", "passbook"),
]
return "otpauth://totp/%s:%s?%s" % (issuer, accountname, urlencode(query))

View File

@ -1,166 +0,0 @@
"""passbook OTP Views"""
from base64 import b32encode
from binascii import unhexlify
from django.contrib import messages
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import Http404, HttpRequest, HttpResponse
from django.shortcuts import get_object_or_404, redirect
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.utils.translation import ugettext as _
from django.views import View
from django.views.decorators.cache import never_cache
from django.views.generic import FormView, TemplateView
from django_otp.plugins.otp_static.models import StaticDevice, StaticToken
from django_otp.plugins.otp_totp.models import TOTPDevice
from qrcode import make
from qrcode.image.svg import SvgPathImage
from structlog import get_logger
from passbook.audit.models import Event, EventAction
from passbook.lib.config import CONFIG
from passbook.stages.otp.forms import OTPSetupForm
from passbook.stages.otp.utils import otpauth_url
OTP_SESSION_KEY = "passbook_stages_otp_key"
OTP_SETTING_UP_KEY = "passbook_stages_otp_setup"
LOGGER = get_logger()
class UserSettingsView(LoginRequiredMixin, TemplateView):
"""View for user settings to control OTP"""
template_name = "stages/otp/user_settings.html"
# TODO: Check if OTP Stage exists and applies to user
def get_context_data(self, **kwargs):
kwargs = super().get_context_data(**kwargs)
static = StaticDevice.objects.filter(user=self.request.user, confirmed=True)
if static.exists():
kwargs["static_tokens"] = StaticToken.objects.filter(
device=static.first()
).order_by("token")
totp_devices = TOTPDevice.objects.filter(user=self.request.user, confirmed=True)
kwargs["state"] = totp_devices.exists() and static.exists()
return kwargs
class DisableView(LoginRequiredMixin, View):
"""Disable TOTP for user"""
def get(self, request: HttpRequest) -> HttpResponse:
"""Delete all the devices for user"""
static = get_object_or_404(StaticDevice, user=request.user, confirmed=True)
static_tokens = StaticToken.objects.filter(device=static).order_by("token")
totp = TOTPDevice.objects.filter(user=request.user, confirmed=True)
static.delete()
totp.delete()
for token in static_tokens:
token.delete()
messages.success(request, "Successfully disabled OTP")
# Create event with email notification
Event.new(EventAction.CUSTOM, message="User disabled OTP.").from_http(request)
return redirect(reverse("passbook_stages_otp:otp-user-settings"))
class EnableView(LoginRequiredMixin, FormView):
"""View to set up OTP"""
title = _("Set up OTP")
form_class = OTPSetupForm
template_name = "login/form.html"
totp_device = None
static_device = None
# TODO: Check if OTP Stage exists and applies to user
def get_context_data(self, **kwargs):
kwargs["config"] = CONFIG.y("passbook")
kwargs["title"] = _("Configure OTP")
kwargs["primary_action"] = _("Setup")
return super().get_context_data(**kwargs)
def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
# Check if user has TOTP setup already
finished_totp_devices = TOTPDevice.objects.filter(
user=request.user, confirmed=True
)
finished_static_devices = StaticDevice.objects.filter(
user=request.user, confirmed=True
)
if finished_totp_devices.exists() and finished_static_devices.exists():
messages.error(request, _("You already have TOTP enabled!"))
del request.session[OTP_SETTING_UP_KEY]
return redirect("passbook_stages_otp:otp-user-settings")
request.session[OTP_SETTING_UP_KEY] = True
# Check if there's an unconfirmed device left to set up
totp_devices = TOTPDevice.objects.filter(user=request.user, confirmed=False)
if not totp_devices.exists():
# Create new TOTPDevice and save it, but not confirm it
self.totp_device = TOTPDevice(user=request.user, confirmed=False)
self.totp_device.save()
else:
self.totp_device = totp_devices.first()
# Check if we have a static device already
static_devices = StaticDevice.objects.filter(user=request.user, confirmed=False)
if not static_devices.exists():
# Create new static device and some codes
self.static_device = StaticDevice(user=request.user, confirmed=False)
self.static_device.save()
# Create 9 tokens and save them
# TODO: Send static tokens via Email
for _counter in range(0, 9):
token = StaticToken(
device=self.static_device, token=StaticToken.random_token()
)
token.save()
else:
self.static_device = static_devices.first()
# Somehow convert the generated key to base32 for the QR code
rawkey = unhexlify(self.totp_device.key.encode("ascii"))
request.session[OTP_SESSION_KEY] = b32encode(rawkey).decode("utf-8")
return super().dispatch(request, *args, **kwargs)
def get_form(self, form_class=None):
form = super().get_form(form_class=form_class)
form.device = self.totp_device
form.fields["qr_code"].initial = reverse("passbook_stages_otp:otp-qr")
tokens = [(x.token, x.token) for x in self.static_device.token_set.all()]
form.fields["tokens"].choices = tokens
return form
def form_valid(self, form):
# Save device as confirmed
LOGGER.debug("Saved OTP Devices")
self.totp_device.confirmed = True
self.totp_device.save()
self.static_device.confirmed = True
self.static_device.save()
del self.request.session[OTP_SETTING_UP_KEY]
Event.new(EventAction.CUSTOM, message="User enabled OTP.").from_http(
self.request
)
return redirect("passbook_stages_otp:otp-user-settings")
@method_decorator(never_cache, name="dispatch")
class QRView(View):
"""View returns an SVG image with the OTP token information"""
def get(self, request: HttpRequest) -> HttpResponse:
"""View returns an SVG image with the OTP token information"""
# Get the data from the session
try:
key = request.session[OTP_SESSION_KEY]
except KeyError:
raise Http404
url = otpauth_url(accountname=request.user.username, secret=key)
# Make and return QR code
img = make(url, image_factory=SvgPathImage)
resp = HttpResponse(content_type="image/svg+xml; charset=utf-8")
img.save(resp)
return resp

View File

@ -0,0 +1,21 @@
"""OTPStaticStage API Views"""
from rest_framework.serializers import ModelSerializer
from rest_framework.viewsets import ModelViewSet
from passbook.stages.otp_static.models import OTPStaticStage
class OTPStaticStageSerializer(ModelSerializer):
"""OTPStaticStage Serializer"""
class Meta:
model = OTPStaticStage
fields = ["pk", "name", "token_count"]
class OTPStaticStageViewSet(ModelViewSet):
"""OTPStaticStage Viewset"""
queryset = OTPStaticStage.objects.all()
serializer_class = OTPStaticStageSerializer

View File

@ -0,0 +1,11 @@
"""OTP Static stage"""
from django.apps import AppConfig
class PassbookStageOTPStaticConfig(AppConfig):
"""OTP Static stage"""
name = "passbook.stages.otp_static"
label = "passbook_stages_otp_static"
verbose_name = "passbook OTP.Static"
mountpoint = "-/user/otp/static/"

View File

@ -0,0 +1,39 @@
"""OTP Static forms"""
from django import forms
from django.utils.safestring import mark_safe
from passbook.stages.otp_static.models import OTPStaticStage
class StaticTokenWidget(forms.widgets.Widget):
"""Widget to render tokens as multiple labels"""
def render(self, name, value, attrs=None, renderer=None):
final_string = '<ul class="pb-otp-tokens">'
for token in value:
final_string += f"<li>{token.token}</li>"
final_string += "</ul>"
return mark_safe(final_string) # nosec
class SetupForm(forms.Form):
"""Form to setup Static OTP"""
tokens = forms.CharField(widget=StaticTokenWidget, disabled=True, required=False)
def __init__(self, tokens, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields["tokens"].initial = tokens
class OTPStaticStageForm(forms.ModelForm):
"""OTP Static Stage setup form"""
class Meta:
model = OTPStaticStage
fields = ["name", "token_count"]
widgets = {
"name": forms.TextInput(),
}

View File

@ -0,0 +1,38 @@
# Generated by Django 3.0.7 on 2020-06-30 11:43
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
("passbook_flows", "0006_auto_20200629_0857"),
]
operations = [
migrations.CreateModel(
name="OTPStaticStage",
fields=[
(
"stage_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="passbook_flows.Stage",
),
),
("token_count", models.IntegerField(default=6)),
],
options={
"verbose_name": "OTP Static Setup Stage",
"verbose_name_plural": "OTP Static Setup Stages",
},
bases=("passbook_flows.stage",),
),
]

View File

@ -0,0 +1,32 @@
"""OTP Static models"""
from typing import Optional
from django.db import models
from django.shortcuts import reverse
from django.utils.translation import gettext_lazy as _
from passbook.core.types import UIUserSettings
from passbook.flows.models import Stage
class OTPStaticStage(Stage):
"""Generate static tokens for the user as a backup"""
token_count = models.IntegerField(default=6)
type = "passbook.stages.otp_static.stage.OTPStaticStageView"
form = "passbook.stages.otp_static.forms.OTPStaticStageForm"
@property
def ui_user_settings(self) -> Optional[UIUserSettings]:
return UIUserSettings(
name="Static OTP", url=reverse("passbook_stages_otp_static:user-settings"),
)
def __str__(self) -> str:
return f"OTP Static Stage {self.name}"
class Meta:
verbose_name = _("OTP Static Setup Stage")
verbose_name_plural = _("OTP Static Setup Stages")

View File

@ -0,0 +1,5 @@
"""OTP Static settings"""
INSTALLED_APPS = [
"django_otp.plugins.otp_static",
]

View File

@ -0,0 +1,62 @@
"""Static OTP Setup stage"""
from typing import Any, Dict
from django.http import HttpRequest, HttpResponse
from django.views.generic import FormView
from django_otp.plugins.otp_static.models import StaticDevice, StaticToken
from structlog import get_logger
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER
from passbook.flows.stage import StageView
from passbook.stages.otp_static.forms import SetupForm
from passbook.stages.otp_static.models import OTPStaticStage
LOGGER = get_logger()
SESSION_STATIC_DEVICE = "static_device"
SESSION_STATIC_TOKENS = "static_device_tokens"
class OTPStaticStageView(FormView, StageView):
"""Static OTP Setup stage"""
form_class = SetupForm
def get_form_kwargs(self, **kwargs) -> Dict[str, Any]:
kwargs = super().get_form_kwargs(**kwargs)
tokens = self.request.session[SESSION_STATIC_TOKENS]
kwargs["tokens"] = tokens
return kwargs
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
user = self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER)
if not user:
LOGGER.debug("No pending user, continuing")
return self.executor.stage_ok()
# Currently, this stage only supports one device per user. If the user already
# has a device, just skip to the next stage
if StaticDevice.objects.filter(user=user).exists():
return self.executor.stage_ok()
stage: OTPStaticStage = self.executor.current_stage
if SESSION_STATIC_DEVICE not in self.request.session:
device = StaticDevice(user=user, confirmed=True)
tokens = []
for _ in range(0, stage.token_count):
tokens.append(
StaticToken(device=device, token=StaticToken.random_token())
)
self.request.session[SESSION_STATIC_DEVICE] = device
self.request.session[SESSION_STATIC_TOKENS] = tokens
return super().get(request, *args, **kwargs)
def form_valid(self, form: SetupForm) -> HttpResponse:
"""Verify OTP Token"""
device: StaticDevice = self.request.session[SESSION_STATIC_DEVICE]
device.save()
for token in self.request.session[SESSION_STATIC_TOKENS]:
token.save()
del self.request.session[SESSION_STATIC_DEVICE]
del self.request.session[SESSION_STATIC_TOKENS]
return self.executor.stage_ok()

View File

@ -0,0 +1,20 @@
{% extends "user/base.html" %}
{% load passbook_utils %}
{% load i18n %}
{% block page %}
<div class="pf-c-card">
<div class="pf-c-card__header pf-c-title pf-m-md">
{% trans "Static One-Time Passwords" %}
</div>
<div class="pf-c-card__body">
<ul class="pb-otp-tokens">
{% for token in tokens %}
<li>{{ token.token }}</li>
{% endfor %}
</ul>
<a href="{% url 'passbook_stages_otp_static:disable' %}" class="pf-c-button pf-m-danger">{% trans "Disable Static Tokens" %}</a>
</div>
</div>
{% endblock %}

View File

@ -0,0 +1,9 @@
"""OTP static urls"""
from django.urls import path
from passbook.stages.otp_static.views import DisableView, UserSettingsView
urlpatterns = [
path("settings", UserSettingsView.as_view(), name="user-settings"),
path("disable", DisableView.as_view(), name="disable"),
]

View File

@ -0,0 +1,41 @@
"""otp Static view Tokens"""
from django.contrib import messages
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import HttpRequest, HttpResponse
from django.shortcuts import redirect
from django.views import View
from django.views.generic import TemplateView
from django_otp.plugins.otp_static.models import StaticToken, StaticDevice
from passbook.audit.models import Event, EventAction
class UserSettingsView(LoginRequiredMixin, TemplateView):
"""View for user settings to control OTP"""
template_name = "stages/otp_static/user_settings.html"
# TODO: Check if OTP Stage exists and applies to user
def get_context_data(self, **kwargs):
kwargs = super().get_context_data(**kwargs)
static_devices = StaticDevice.objects.filter(
user=self.request.user, confirmed=True
)
if static_devices.exists():
kwargs["tokens"] = StaticToken.objects.filter(device=static_devices.first())
return kwargs
class DisableView(LoginRequiredMixin, View):
"""Disable Static Tokens for user"""
def get(self, request: HttpRequest) -> HttpResponse:
"""Delete all the devices for user"""
devices = StaticDevice.objects.filter(user=request.user, confirmed=True)
devices.delete()
messages.success(request, "Successfully disabled Static OTP Tokens")
# Create event with email notification
Event.new(
EventAction.CUSTOM, message="User disabled Static OTP Tokens."
).from_http(request)
return redirect("passbook_stages_otp:otp-user-settings")

View File

View File

@ -0,0 +1,21 @@
"""OTPTimeStage API Views"""
from rest_framework.serializers import ModelSerializer
from rest_framework.viewsets import ModelViewSet
from passbook.stages.otp_time.models import OTPTimeStage
class OTPTimeStageSerializer(ModelSerializer):
"""OTPTimeStage Serializer"""
class Meta:
model = OTPTimeStage
fields = ["pk", "name", "digits"]
class OTPTimeStageViewSet(ModelViewSet):
"""OTPTimeStage Viewset"""
queryset = OTPTimeStage.objects.all()
serializer_class = OTPTimeStageSerializer

View File

@ -0,0 +1,11 @@
"""OTP Time"""
from django.apps import AppConfig
class PassbookStageOTPTimeConfig(AppConfig):
"""OTP time App config"""
name = "passbook.stages.otp_time"
label = "passbook_stages_otp_time"
verbose_name = "passbook OTP.Time"
mountpoint = "-/user/otp/time/"

View File

@ -0,0 +1,64 @@
"""OTP Time forms"""
from django import forms
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
from django_otp.models import Device
from passbook.stages.otp_time.models import OTPTimeStage
from passbook.stages.otp_validate.forms import OTP_CODE_VALIDATOR
class PictureWidget(forms.widgets.Widget):
"""Widget to render value as img-tag"""
def render(self, name, value, attrs=None, renderer=None):
return mark_safe(f"<br>{value}") # nosec
class SetupForm(forms.Form):
"""Form to setup Time-based OTP"""
device: Device = None
qr_code = forms.CharField(
widget=PictureWidget,
disabled=True,
required=False,
label=_("Scan this Code with your OTP App."),
)
code = forms.CharField(
label=_("Please enter the Token on your device."),
validators=[OTP_CODE_VALIDATOR],
widget=forms.TextInput(
attrs={
"autocomplete": "off",
"placeholder": "Code",
"autofocus": "autofocus",
}
),
)
def __init__(self, device, qr_code, *args, **kwargs):
super().__init__(*args, **kwargs)
self.device = device
self.fields["qr_code"].initial = qr_code
def clean_code(self):
"""Check code with new otp device"""
if self.device is not None:
if not self.device.verify_token(self.cleaned_data.get("code")):
raise forms.ValidationError(_("OTP Code does not match"))
return self.cleaned_data.get("code")
class OTPTimeStageForm(forms.ModelForm):
"""OTP Time-based Stage setup form"""
class Meta:
model = OTPTimeStage
fields = ["name", "digits"]
widgets = {
"name": forms.TextInput(),
}

View File

@ -0,0 +1,38 @@
# Generated by Django 3.0.7 on 2020-06-13 15:28
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
("passbook_flows", "0005_provider_flows"),
]
operations = [
migrations.CreateModel(
name="OTPTimeStage",
fields=[
(
"stage_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="passbook_flows.Stage",
),
),
("digits", models.IntegerField(choices=[(6, "Six"), (8, "Eight")])),
],
options={
"verbose_name": "OTP Time (TOTP) Setup Stage",
"verbose_name_plural": "OTP Time (TOTP) Setup Stages",
},
bases=("passbook_flows.stage",),
),
]

View File

@ -0,0 +1,40 @@
"""OTP Time-based models"""
from typing import Optional
from django.db import models
from django.shortcuts import reverse
from django.utils.translation import gettext_lazy as _
from passbook.core.types import UIUserSettings
from passbook.flows.models import Stage
class TOTPDigits(models.IntegerChoices):
"""OTP Time Digits"""
SIX = 6, _("6 digits, widely compatible")
EIGHT = 8, _("8 digits, not compatible with apps like Google Authenticator")
class OTPTimeStage(Stage):
"""Enroll a user's device into Time-based OTP"""
digits = models.IntegerField(choices=TOTPDigits.choices)
type = "passbook.stages.otp_time.stage.OTPTimeStageView"
form = "passbook.stages.otp_time.forms.OTPTimeStageForm"
@property
def ui_user_settings(self) -> Optional[UIUserSettings]:
return UIUserSettings(
name="Time-based OTP",
url=reverse("passbook_stages_otp_time:user-settings"),
)
def __str__(self) -> str:
return f"OTP Time (TOTP) Stage {self.name}"
class Meta:
verbose_name = _("OTP Time (TOTP) Setup Stage")
verbose_name_plural = _("OTP Time (TOTP) Setup Stages")

View File

@ -0,0 +1,6 @@
"""OTP Time"""
INSTALLED_APPS = [
"django_otp.plugins.otp_totp",
]
OTP_TOTP_ISSUER = "passbook"

View File

@ -0,0 +1,64 @@
"""TOTP Setup stage"""
from typing import Any, Dict
from django.http import HttpRequest, HttpResponse
from django.utils.encoding import force_text
from django.views.generic import FormView
from django_otp.plugins.otp_totp.models import TOTPDevice
from lxml.etree import tostring # nosec
from qrcode import QRCode
from qrcode.image.svg import SvgFillImage
from structlog import get_logger
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER
from passbook.flows.stage import StageView
from passbook.stages.otp_time.forms import SetupForm
from passbook.stages.otp_time.models import OTPTimeStage
LOGGER = get_logger()
SESSION_TOTP_DEVICE = "totp_device"
class OTPTimeStageView(FormView, StageView):
"""OTP totp Setup stage"""
form_class = SetupForm
def get_form_kwargs(self, **kwargs) -> Dict[str, Any]:
kwargs = super().get_form_kwargs(**kwargs)
device: TOTPDevice = self.request.session[SESSION_TOTP_DEVICE]
kwargs["device"] = device
kwargs["qr_code"] = self._get_qr_code(device)
return kwargs
def _get_qr_code(self, device: TOTPDevice) -> str:
"""Get QR Code SVG as string based on `device`"""
qr_code = QRCode(image_factory=SvgFillImage)
qr_code.add_data(device.config_url)
return force_text(tostring(qr_code.make_image().get_image()))
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
user = self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER)
if not user:
LOGGER.debug("No pending user, continuing")
return self.executor.stage_ok()
# Currently, this stage only supports one device per user. If the user already
# has a device, just skip to the next stage
if TOTPDevice.objects.filter(user=user).exists():
return self.executor.stage_ok()
stage: OTPTimeStage = self.executor.current_stage
if SESSION_TOTP_DEVICE not in self.request.session:
device = TOTPDevice(user=user, confirmed=True, digits=stage.digits)
self.request.session[SESSION_TOTP_DEVICE] = device
return super().get(request, *args, **kwargs)
def form_valid(self, form: SetupForm) -> HttpResponse:
"""Verify OTP Token"""
device: TOTPDevice = self.request.session[SESSION_TOTP_DEVICE]
device.save()
del self.request.session[SESSION_TOTP_DEVICE]
return self.executor.stage_ok()

View File

@ -6,7 +6,7 @@
{% block page %} {% block page %}
<div class="pf-c-card"> <div class="pf-c-card">
<div class="pf-c-card__header pf-c-title pf-m-md"> <div class="pf-c-card__header pf-c-title pf-m-md">
{% trans "One-Time Passwords" %} {% trans "Time-based One-Time Passwords" %}
</div> </div>
<div class="pf-c-card__body"> <div class="pf-c-card__body">
<p> <p>
@ -21,22 +21,11 @@
</p> </p>
<p> <p>
{% if not state %} {% if not state %}
<a href="{% url 'passbook_stages_otp:otp-enable' %}" class="btn btn-success btn-sm">{% trans "Enable OTP" %}</a> <a href="{% url 'passbook_stages_otp_time:otp-enable' %}" class="pf-c-button pf-m-primary">{% trans "Enable Time-based OTP" %}</a>
{% else %} {% else %}
<a href="{% url 'passbook_stages_otp:otp-disable' %}" class="btn btn-danger btn-sm">{% trans "Disable OTP" %}</a> <a href="{% url 'passbook_stages_otp_time:disable' %}" class="pf-c-button pf-m-danger">{% trans "Disable Time-based OTP" %}</a>
{% endif %} {% endif %}
</p> </p>
</div> </div>
</div> </div>
<div class="pf-c-card">
<div class="pf-c-card__header pf-c-title pf-m-md">
{% trans "Your Backup tokens:" %}
</div>
<div class="pf-c-card__body">
<pre>{% for token in static_tokens %}{{ token.token }}
{% empty %}{% trans 'N/A' %}{% endfor %}</pre>
</div>
</div>
</div>
{% endblock %} {% endblock %}

View File

@ -0,0 +1,9 @@
"""OTP Time urls"""
from django.urls import path
from passbook.stages.otp_time.views import DisableView, UserSettingsView
urlpatterns = [
path("settings", UserSettingsView.as_view(), name="user-settings"),
path("disable", DisableView.as_view(), name="disable"),
]

View File

@ -0,0 +1,38 @@
"""otp time-based view"""
from django.contrib import messages
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import HttpRequest, HttpResponse
from django.shortcuts import redirect
from django.views import View
from django.views.generic import TemplateView
from django_otp.plugins.otp_totp.models import TOTPDevice
from passbook.audit.models import Event, EventAction
class UserSettingsView(LoginRequiredMixin, TemplateView):
"""View for user settings to control OTP"""
template_name = "stages/otp_time/user_settings.html"
# TODO: Check if OTP Stage exists and applies to user
def get_context_data(self, **kwargs):
kwargs = super().get_context_data(**kwargs)
totp_devices = TOTPDevice.objects.filter(user=self.request.user, confirmed=True)
kwargs["state"] = totp_devices.exists()
return kwargs
class DisableView(LoginRequiredMixin, View):
"""Disable TOTP for user"""
def get(self, request: HttpRequest) -> HttpResponse:
"""Delete all the devices for user"""
totp = TOTPDevice.objects.filter(user=request.user, confirmed=True)
totp.delete()
messages.success(request, "Successfully disabled Time-based OTP")
# Create event with email notification
Event.new(
EventAction.CUSTOM, message="User disabled Time-based OTP."
).from_http(request)
return redirect("passbook_stages_otp:otp-user-settings")

View File

View File

@ -0,0 +1,24 @@
"""OTPValidateStage API Views"""
from rest_framework.serializers import ModelSerializer
from rest_framework.viewsets import ModelViewSet
from passbook.stages.otp_validate.models import OTPValidateStage
class OTPValidateStageSerializer(ModelSerializer):
"""OTPValidateStage Serializer"""
class Meta:
model = OTPValidateStage
fields = [
"pk",
"name",
]
class OTPValidateStageViewSet(ModelViewSet):
"""OTPValidateStage Viewset"""
queryset = OTPValidateStage.objects.all()
serializer_class = OTPValidateStageSerializer

View File

@ -0,0 +1,10 @@
"""OTP Validation Stage"""
from django.apps import AppConfig
class PassbookStageOTPValidateConfig(AppConfig):
"""OTP Validation Stage"""
name = "passbook.stages.otp_validate"
label = "passbook_stages_otp_validate"
verbose_name = "passbook OTP.Validate"

View File

@ -0,0 +1,55 @@
"""OTP Validate stage forms"""
from django import forms
from django.core.validators import RegexValidator
from django.utils.translation import gettext_lazy as _
from django_otp import match_token
from passbook.core.models import User
from passbook.stages.otp_validate.models import OTPValidateStage
OTP_CODE_VALIDATOR = RegexValidator(
r"^[0-9a-z]{6,8}$", _("Only alpha-numeric characters are allowed.")
)
class ValidationForm(forms.Form):
"""OTP Validate stage forms"""
user: User
code = forms.CharField(
label=_("Please enter the token from your device."),
validators=[OTP_CODE_VALIDATOR],
widget=forms.TextInput(
attrs={
"autocomplete": "off",
"placeholder": "123456",
"autofocus": "autofocus",
}
),
)
def __init__(self, user, *args, **kwargs):
super().__init__(*args, **kwargs)
self.user = user
def clean_code(self):
"""Validate code against all confirmed devices"""
code = self.cleaned_data.get("code")
device = match_token(self.user, code)
if not device:
raise forms.ValidationError(_("Invalid Token"))
return code
class OTPValidateStageForm(forms.ModelForm):
"""OTP Validate stage forms"""
class Meta:
model = OTPValidateStage
fields = ["name"]
widgets = {
"name": forms.TextInput(),
}

View File

@ -1,4 +1,4 @@
# Generated by Django 3.0.6 on 2020-05-19 22:08 # Generated by Django 3.0.7 on 2020-06-13 15:28
import django.db.models.deletion import django.db.models.deletion
from django.db import migrations, models from django.db import migrations, models
@ -9,12 +9,12 @@ class Migration(migrations.Migration):
initial = True initial = True
dependencies = [ dependencies = [
("passbook_flows", "0001_initial"), ("passbook_flows", "0005_provider_flows"),
] ]
operations = [ operations = [
migrations.CreateModel( migrations.CreateModel(
name="OTPStage", name="OTPValidateStage",
fields=[ fields=[
( (
"stage_ptr", "stage_ptr",
@ -28,14 +28,14 @@ class Migration(migrations.Migration):
), ),
), ),
( (
"enforced", "not_configured_action",
models.BooleanField( models.TextField(choices=[("skip", "Skip")], default="skip"),
default=False,
help_text="Enforce enabled OTP for Users this stage applies to.",
),
), ),
], ],
options={"verbose_name": "OTP Stage", "verbose_name_plural": "OTP Stages",}, options={
"verbose_name": "OTP Validation Stage",
"verbose_name_plural": "OTP Validation Stages",
},
bases=("passbook_flows.stage",), bases=("passbook_flows.stage",),
), ),
] ]

View File

@ -0,0 +1,24 @@
"""OTP Validation Stage"""
from django.db import models
from django.utils.translation import gettext_lazy as _
from passbook.flows.models import NotConfiguredAction, Stage
class OTPValidateStage(Stage):
"""Validate user's configured OTP Device"""
not_configured_action = models.TextField(
choices=NotConfiguredAction.choices, default=NotConfiguredAction.SKIP
)
type = "passbook.stages.otp_validate.stage.OTPValidateStageView"
form = "passbook.stages.otp_validate.forms.OTPValidateStageForm"
def __str__(self) -> str:
return f"OTP Validation Stage {self.name}"
class Meta:
verbose_name = _("OTP Validation Stage")
verbose_name_plural = _("OTP Validation Stages")

View File

@ -0,0 +1,4 @@
"""OTP Validate stage settings"""
INSTALLED_APPS = [
"django_otp",
]

View File

@ -0,0 +1,46 @@
"""OTP Validation"""
from typing import Any, Dict
from django.http import HttpRequest, HttpResponse
from django.views.generic import FormView
from django_otp import user_has_device
from structlog import get_logger
from passbook.flows.models import NotConfiguredAction
from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER
from passbook.flows.stage import StageView
from passbook.stages.otp_validate.forms import ValidationForm
from passbook.stages.otp_validate.models import OTPValidateStage
LOGGER = get_logger()
class OTPValidateStageView(FormView, StageView):
"""OTP Validation"""
form_class = ValidationForm
def get_form_kwargs(self, **kwargs) -> Dict[str, Any]:
kwargs = super().get_form_kwargs(**kwargs)
kwargs["user"] = self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER)
return kwargs
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
user = self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER)
if not user:
LOGGER.debug("No pending user, continuing")
return self.executor.stage_ok()
has_devices = user_has_device(user)
stage: OTPValidateStage = self.executor.current_stage
if not has_devices:
if stage.not_configured_action == NotConfiguredAction.SKIP:
LOGGER.debug("OTP not configured, skipping stage")
return self.executor.stage_ok()
return super().get(request, *args, **kwargs)
def form_valid(self, form: ValidationForm) -> HttpResponse:
"""Verify OTP Token"""
# Since we do token checking in the form, we know the token is valid here
# so we can just continue
return self.executor.stage_ok()

View File

@ -28,13 +28,14 @@ class PasswordForm(forms.Form):
widget=forms.HiddenInput(attrs={"autocomplete": "username"}), required=False widget=forms.HiddenInput(attrs={"autocomplete": "username"}), required=False
) )
password = forms.CharField( password = forms.CharField(
label=_("Please enter your password."),
widget=forms.PasswordInput( widget=forms.PasswordInput(
attrs={ attrs={
"placeholder": _("Password"), "placeholder": _("Password"),
"autofocus": "autofocus", "autofocus": "autofocus",
"autocomplete": "current-password", "autocomplete": "current-password",
} }
) ),
) )

View File

@ -1,39 +1,10 @@
{% extends 'login/form_with_user.html' %}
{% load i18n %} {% load i18n %}
{% load passbook_utils %} {% load passbook_utils %}
<header class="pf-c-login__main-header"> {% block beneath_form %}
<h1 class="pf-c-title pf-m-3xl"> {% if recovery_flow %}
{% block card_title %} <a href="{% url 'passbook_flows:flow-executor' flow_slug=recovery_flow.slug %}">{% trans 'Forgot password?' %}</a>
{% trans title %} {% endif %}
{% endblock %} {% endblock %}
</h1>
</header>
<div class="pf-c-login__main-body">
{% block card %}
<form method="POST" class="pf-c-form">
<div class="pf-c-form__group">
<label class="pf-c-form__label" for="{{ field.name }}-{{ forloop.counter0 }}">
<span class="pf-c-form__label-text">{% trans "Username" %}</span>
</label>
<div class="form-control-static">
<div class="left">
<img class="pf-c-avatar" src="{% gravatar user.email %}" alt="">
{{ user.username }}
</div>
<div class="right">
<a href="{% url 'passbook_flows:cancel' %}">{% trans 'Not you?' %}</a>
</div>
</div>
</div>
{% include 'partials/form.html' %}
{% if recovery_flow %}
<a href="{% url 'passbook_flows:flow-executor' flow_slug=recovery_flow.slug %}">{% trans 'Forgot password?' %}</a>
{% endif %}
<div class="pf-c-form__group pf-m-action">
<button class="pf-c-button pf-m-primary pf-m-block" type="submit">{% trans primary_action %}</button>
</div>
</form>
{% endblock %}
</div>

View File

@ -200,6 +200,7 @@ input[data-is-monospace] {
/* Form with user */ /* Form with user */
.form-control-static { .form-control-static {
margin-top: var(--pf-global--spacer--sm);
display: flex; display: flex;
align-items: center; align-items: center;
justify-content: space-between; justify-content: space-between;
@ -209,10 +210,23 @@ input[data-is-monospace] {
align-items: center; align-items: center;
} }
.form-control-static img { .form-control-static img {
margin-right: 5px; margin-right: var(--pf-global--spacer--xs);
} }
.form-control-static a { .form-control-static a {
padding-top: 3px; padding-top: var(--pf-global--spacer--xs);
padding-bottom: 3px; padding-bottom: var(--pf-global--spacer--xs);
line-height: 32px; line-height: var(--pf-global--spacer--xl);
}
/* Static OTP Tokens, passbook.stages.otp_static */
.pb-otp-tokens {
list-style: circle;
columns: 2;
-webkit-columns: 2;
-moz-columns: 2;
margin-left: var(--pf-global--spacer--xs);
}
.pb-otp-tokens li {
font-size: var(--pf-global--FontSize--2xl);
font-family: monospace;
} }

View File

@ -4037,10 +4037,10 @@ paths:
required: true required: true
type: string type: string
format: uuid format: uuid
/stages/otp/: /stages/otp_static/:
get: get:
operationId: stages_otp_list operationId: stages_otp_static_list
description: OTPStage Viewset description: OTPStaticStage Viewset
parameters: parameters:
- name: ordering - name: ordering
in: query in: query
@ -4084,73 +4084,73 @@ paths:
results: results:
type: array type: array
items: items:
$ref: '#/definitions/OTPStage' $ref: '#/definitions/OTPStaticStage'
tags: tags:
- stages - stages
post: post:
operationId: stages_otp_create operationId: stages_otp_static_create
description: OTPStage Viewset description: OTPStaticStage Viewset
parameters: parameters:
- name: data - name: data
in: body in: body
required: true required: true
schema: schema:
$ref: '#/definitions/OTPStage' $ref: '#/definitions/OTPStaticStage'
responses: responses:
'201': '201':
description: '' description: ''
schema: schema:
$ref: '#/definitions/OTPStage' $ref: '#/definitions/OTPStaticStage'
tags: tags:
- stages - stages
parameters: [] parameters: []
/stages/otp/{stage_uuid}/: /stages/otp_static/{stage_uuid}/:
get: get:
operationId: stages_otp_read operationId: stages_otp_static_read
description: OTPStage Viewset description: OTPStaticStage Viewset
parameters: [] parameters: []
responses: responses:
'200': '200':
description: '' description: ''
schema: schema:
$ref: '#/definitions/OTPStage' $ref: '#/definitions/OTPStaticStage'
tags: tags:
- stages - stages
put: put:
operationId: stages_otp_update operationId: stages_otp_static_update
description: OTPStage Viewset description: OTPStaticStage Viewset
parameters: parameters:
- name: data - name: data
in: body in: body
required: true required: true
schema: schema:
$ref: '#/definitions/OTPStage' $ref: '#/definitions/OTPStaticStage'
responses: responses:
'200': '200':
description: '' description: ''
schema: schema:
$ref: '#/definitions/OTPStage' $ref: '#/definitions/OTPStaticStage'
tags: tags:
- stages - stages
patch: patch:
operationId: stages_otp_partial_update operationId: stages_otp_static_partial_update
description: OTPStage Viewset description: OTPStaticStage Viewset
parameters: parameters:
- name: data - name: data
in: body in: body
required: true required: true
schema: schema:
$ref: '#/definitions/OTPStage' $ref: '#/definitions/OTPStaticStage'
responses: responses:
'200': '200':
description: '' description: ''
schema: schema:
$ref: '#/definitions/OTPStage' $ref: '#/definitions/OTPStaticStage'
tags: tags:
- stages - stages
delete: delete:
operationId: stages_otp_delete operationId: stages_otp_static_delete
description: OTPStage Viewset description: OTPStaticStage Viewset
parameters: [] parameters: []
responses: responses:
'204': '204':
@ -4160,7 +4160,261 @@ paths:
parameters: parameters:
- name: stage_uuid - name: stage_uuid
in: path in: path
description: A UUID string identifying this OTP Stage. description: A UUID string identifying this OTP Static Setup Stage.
required: true
type: string
format: uuid
/stages/otp_time/:
get:
operationId: stages_otp_time_list
description: OTPTimeStage Viewset
parameters:
- name: ordering
in: query
description: Which field to use when ordering the results.
required: false
type: string
- name: search
in: query
description: A search term.
required: false
type: string
- name: limit
in: query
description: Number of results to return per page.
required: false
type: integer
- name: offset
in: query
description: The initial index from which to return the results.
required: false
type: integer
responses:
'200':
description: ''
schema:
required:
- count
- results
type: object
properties:
count:
type: integer
next:
type: string
format: uri
x-nullable: true
previous:
type: string
format: uri
x-nullable: true
results:
type: array
items:
$ref: '#/definitions/OTPTimeStage'
tags:
- stages
post:
operationId: stages_otp_time_create
description: OTPTimeStage Viewset
parameters:
- name: data
in: body
required: true
schema:
$ref: '#/definitions/OTPTimeStage'
responses:
'201':
description: ''
schema:
$ref: '#/definitions/OTPTimeStage'
tags:
- stages
parameters: []
/stages/otp_time/{stage_uuid}/:
get:
operationId: stages_otp_time_read
description: OTPTimeStage Viewset
parameters: []
responses:
'200':
description: ''
schema:
$ref: '#/definitions/OTPTimeStage'
tags:
- stages
put:
operationId: stages_otp_time_update
description: OTPTimeStage Viewset
parameters:
- name: data
in: body
required: true
schema:
$ref: '#/definitions/OTPTimeStage'
responses:
'200':
description: ''
schema:
$ref: '#/definitions/OTPTimeStage'
tags:
- stages
patch:
operationId: stages_otp_time_partial_update
description: OTPTimeStage Viewset
parameters:
- name: data
in: body
required: true
schema:
$ref: '#/definitions/OTPTimeStage'
responses:
'200':
description: ''
schema:
$ref: '#/definitions/OTPTimeStage'
tags:
- stages
delete:
operationId: stages_otp_time_delete
description: OTPTimeStage Viewset
parameters: []
responses:
'204':
description: ''
tags:
- stages
parameters:
- name: stage_uuid
in: path
description: A UUID string identifying this OTP Time (TOTP) Setup Stage.
required: true
type: string
format: uuid
/stages/otp_validate/:
get:
operationId: stages_otp_validate_list
description: OTPValidateStage Viewset
parameters:
- name: ordering
in: query
description: Which field to use when ordering the results.
required: false
type: string
- name: search
in: query
description: A search term.
required: false
type: string
- name: limit
in: query
description: Number of results to return per page.
required: false
type: integer
- name: offset
in: query
description: The initial index from which to return the results.
required: false
type: integer
responses:
'200':
description: ''
schema:
required:
- count
- results
type: object
properties:
count:
type: integer
next:
type: string
format: uri
x-nullable: true
previous:
type: string
format: uri
x-nullable: true
results:
type: array
items:
$ref: '#/definitions/OTPValidateStage'
tags:
- stages
post:
operationId: stages_otp_validate_create
description: OTPValidateStage Viewset
parameters:
- name: data
in: body
required: true
schema:
$ref: '#/definitions/OTPValidateStage'
responses:
'201':
description: ''
schema:
$ref: '#/definitions/OTPValidateStage'
tags:
- stages
parameters: []
/stages/otp_validate/{stage_uuid}/:
get:
operationId: stages_otp_validate_read
description: OTPValidateStage Viewset
parameters: []
responses:
'200':
description: ''
schema:
$ref: '#/definitions/OTPValidateStage'
tags:
- stages
put:
operationId: stages_otp_validate_update
description: OTPValidateStage Viewset
parameters:
- name: data
in: body
required: true
schema:
$ref: '#/definitions/OTPValidateStage'
responses:
'200':
description: ''
schema:
$ref: '#/definitions/OTPValidateStage'
tags:
- stages
patch:
operationId: stages_otp_validate_partial_update
description: OTPValidateStage Viewset
parameters:
- name: data
in: body
required: true
schema:
$ref: '#/definitions/OTPValidateStage'
responses:
'200':
description: ''
schema:
$ref: '#/definitions/OTPValidateStage'
tags:
- stages
delete:
operationId: stages_otp_validate_delete
description: OTPValidateStage Viewset
parameters: []
responses:
'204':
description: ''
tags:
- stages
parameters:
- name: stage_uuid
in: path
description: A UUID string identifying this OTP Validation Stage.
required: true required: true
type: string type: string
format: uuid format: uuid
@ -6350,7 +6604,47 @@ definitions:
fixed_data: fixed_data:
title: Fixed data title: Fixed data
type: object type: object
OTPStage: OTPStaticStage:
required:
- name
type: object
properties:
pk:
title: Stage uuid
type: string
format: uuid
readOnly: true
name:
title: Name
type: string
minLength: 1
token_count:
title: Token count
type: integer
maximum: 2147483647
minimum: -2147483648
OTPTimeStage:
required:
- name
- digits
type: object
properties:
pk:
title: Stage uuid
type: string
format: uuid
readOnly: true
name:
title: Name
type: string
minLength: 1
digits:
title: Digits
type: integer
enum:
- 6
- 8
OTPValidateStage:
required: required:
- name - name
type: object type: object
@ -6364,10 +6658,6 @@ definitions:
title: Name title: Name
type: string type: string
minLength: 1 minLength: 1
enforced:
title: Enforced
description: Enforce enabled OTP for Users this stage applies to.
type: boolean
PasswordStage: PasswordStage:
required: required:
- name - name