stages/password: add failed_attempts_before_cancel to cancel a flow after x failed entries
This commit is contained in:
parent
d30abc64d0
commit
83408b6ae0
|
@ -340,7 +340,6 @@ class BaseGrantModel(models.Model):
|
|||
abstract = True
|
||||
|
||||
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
class AuthorizationCode(ExpiringModel, BaseGrantModel):
|
||||
"""OAuth2 Authorization Code"""
|
||||
|
||||
|
|
|
@ -15,6 +15,8 @@ class PasswordStageSerializer(ModelSerializer):
|
|||
"pk",
|
||||
"name",
|
||||
"backends",
|
||||
"change_flow",
|
||||
"failed_attempts_before_cancel",
|
||||
]
|
||||
|
||||
|
||||
|
|
|
@ -48,7 +48,7 @@ class PasswordStageForm(forms.ModelForm):
|
|||
class Meta:
|
||||
|
||||
model = PasswordStage
|
||||
fields = ["name", "backends", "change_flow"]
|
||||
fields = ["name", "backends", "change_flow", "failed_attempts_before_cancel"]
|
||||
widgets = {
|
||||
"name": forms.TextInput(),
|
||||
"backends": FilteredSelectMultiple(
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
# Generated by Django 3.1.1 on 2020-09-18 23:48
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("passbook_stages_password", "0002_passwordstage_change_flow"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name="passwordstage",
|
||||
name="failed_attempts_before_cancel",
|
||||
field=models.IntegerField(
|
||||
default=5,
|
||||
help_text="How many attempts a user has before the flow is canceled. To lock the user out, use a reputation policy and a user_write stage.",
|
||||
),
|
||||
),
|
||||
]
|
|
@ -22,6 +22,15 @@ class PasswordStage(Stage):
|
|||
models.TextField(),
|
||||
help_text=_("Selection of backends to test the password against."),
|
||||
)
|
||||
failed_attempts_before_cancel = models.IntegerField(
|
||||
default=5,
|
||||
help_text=_(
|
||||
(
|
||||
"How many attempts a user has before the flow is canceled. "
|
||||
"To lock the user out, use a reputation policy and a user_write stage."
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
change_flow = models.ForeignKey(
|
||||
Flow,
|
||||
|
|
|
@ -17,9 +17,11 @@ from passbook.flows.planner import PLAN_CONTEXT_PENDING_USER
|
|||
from passbook.flows.stage import StageView
|
||||
from passbook.lib.utils.reflection import path_to_class
|
||||
from passbook.stages.password.forms import PasswordForm
|
||||
from passbook.stages.password.models import PasswordStage
|
||||
|
||||
LOGGER = get_logger()
|
||||
PLAN_CONTEXT_AUTHENTICATION_BACKEND = "user_backend"
|
||||
SESSION_INVALID_TRIES = "user_invalid_tries"
|
||||
|
||||
|
||||
def authenticate(
|
||||
|
@ -71,6 +73,20 @@ class PasswordStageView(FormView, StageView):
|
|||
kwargs["recovery_flow"] = recovery_flow.first()
|
||||
return kwargs
|
||||
|
||||
def form_invalid(self, form: PasswordForm) -> HttpResponse:
|
||||
if SESSION_INVALID_TRIES not in self.request.session:
|
||||
self.request.session[SESSION_INVALID_TRIES] = 0
|
||||
self.request.session[SESSION_INVALID_TRIES] += 1
|
||||
current_stage: PasswordStage = self.executor.current_stage
|
||||
if (
|
||||
self.request.session[SESSION_INVALID_TRIES]
|
||||
> current_stage.failed_attempts_before_cancel
|
||||
):
|
||||
LOGGER.debug("User has exceeded maximum tries")
|
||||
del self.request.session[SESSION_INVALID_TRIES]
|
||||
return self.executor.stage_invalid()
|
||||
return super().form_invalid(form)
|
||||
|
||||
def form_valid(self, form: PasswordForm) -> HttpResponse:
|
||||
"""Authenticate against django's authentication backend"""
|
||||
if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context:
|
||||
|
|
|
@ -131,6 +131,37 @@ class TestPasswordStage(TestCase):
|
|||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_invalid_password_lockout(self):
|
||||
"""Test with a valid pending user and invalid password (trigger logout counter)"""
|
||||
plan = FlowPlan(
|
||||
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
|
||||
)
|
||||
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
|
||||
session = self.client.session
|
||||
session[SESSION_KEY_PLAN] = plan
|
||||
session.save()
|
||||
|
||||
for _ in range(self.stage.failed_attempts_before_cancel):
|
||||
response = self.client.post(
|
||||
reverse(
|
||||
"passbook_flows:flow-executor", kwargs={"flow_slug": self.flow.slug}
|
||||
),
|
||||
# Form data
|
||||
{"password": self.password + "test"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.post(
|
||||
reverse(
|
||||
"passbook_flows:flow-executor", kwargs={"flow_slug": self.flow.slug}
|
||||
),
|
||||
# Form data
|
||||
{"password": self.password + "test"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
# To ensure the plan has been cancelled, check SESSION_KEY_PLAN
|
||||
self.assertNotIn(SESSION_KEY_PLAN, self.client.session)
|
||||
|
||||
@patch(
|
||||
"passbook.flows.views.to_stage_response", TO_STAGE_RESPONSE_MOCK,
|
||||
)
|
||||
|
|
Reference in a new issue