350 lines
14 KiB
Python
350 lines
14 KiB
Python
"""flow views tests"""
|
|
from unittest.mock import MagicMock, PropertyMock, patch
|
|
|
|
from django.http import HttpRequest, HttpResponse
|
|
from django.shortcuts import reverse
|
|
from django.test import Client, TestCase
|
|
from django.utils.encoding import force_str
|
|
|
|
from passbook.flows.exceptions import EmptyFlowException, FlowNonApplicableException
|
|
from passbook.flows.markers import ReevaluateMarker, StageMarker
|
|
from passbook.flows.models import Flow, FlowDesignation, FlowStageBinding
|
|
from passbook.flows.planner import FlowPlan
|
|
from passbook.flows.views import NEXT_ARG_NAME, SESSION_KEY_PLAN
|
|
from passbook.lib.config import CONFIG
|
|
from passbook.policies.dummy.models import DummyPolicy
|
|
from passbook.policies.http import AccessDeniedResponse
|
|
from passbook.policies.models import PolicyBinding
|
|
from passbook.policies.types import PolicyResult
|
|
from passbook.stages.dummy.models import DummyStage
|
|
|
|
POLICY_RETURN_FALSE = PropertyMock(return_value=PolicyResult(False))
|
|
POLICY_RETURN_TRUE = MagicMock(return_value=PolicyResult(True))
|
|
|
|
|
|
def to_stage_response(request: HttpRequest, source: HttpResponse):
|
|
"""Mock for to_stage_response that returns the original response, so we can check
|
|
inheritance and member attributes"""
|
|
return source
|
|
|
|
|
|
TO_STAGE_RESPONSE_MOCK = MagicMock(side_effect=to_stage_response)
|
|
|
|
|
|
class TestFlowExecutor(TestCase):
|
|
"""Test views logic"""
|
|
|
|
def setUp(self):
|
|
self.client = Client()
|
|
|
|
def test_existing_plan_diff_flow(self):
|
|
"""Check that a plan for a different flow cancels the current plan"""
|
|
flow = Flow.objects.create(
|
|
name="test-existing-plan-diff",
|
|
slug="test-existing-plan-diff",
|
|
designation=FlowDesignation.AUTHENTICATION,
|
|
)
|
|
stage = DummyStage.objects.create(name="dummy")
|
|
plan = FlowPlan(
|
|
flow_pk=flow.pk.hex + "a", stages=[stage], markers=[StageMarker()]
|
|
)
|
|
session = self.client.session
|
|
session[SESSION_KEY_PLAN] = plan
|
|
session.save()
|
|
|
|
cancel_mock = MagicMock()
|
|
with patch("passbook.flows.views.FlowExecutorView.cancel", cancel_mock):
|
|
response = self.client.get(
|
|
reverse(
|
|
"passbook_flows:flow-executor", kwargs={"flow_slug": flow.slug}
|
|
),
|
|
)
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertEqual(cancel_mock.call_count, 2)
|
|
|
|
@patch(
|
|
"passbook.flows.views.to_stage_response", TO_STAGE_RESPONSE_MOCK,
|
|
)
|
|
@patch(
|
|
"passbook.policies.engine.PolicyEngine.result", POLICY_RETURN_FALSE,
|
|
)
|
|
def test_invalid_non_applicable_flow(self):
|
|
"""Tests that a non-applicable flow returns the correct error message"""
|
|
flow = Flow.objects.create(
|
|
name="test-non-applicable",
|
|
slug="test-non-applicable",
|
|
designation=FlowDesignation.AUTHENTICATION,
|
|
)
|
|
|
|
CONFIG.update_from_dict({"domain": "testserver"})
|
|
response = self.client.get(
|
|
reverse("passbook_flows:flow-executor", kwargs={"flow_slug": flow.slug}),
|
|
)
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertIsInstance(response, AccessDeniedResponse)
|
|
self.assertInHTML(FlowNonApplicableException.__doc__, response.rendered_content)
|
|
|
|
@patch(
|
|
"passbook.flows.views.to_stage_response", TO_STAGE_RESPONSE_MOCK,
|
|
)
|
|
def test_invalid_empty_flow(self):
|
|
"""Tests that an empty flow returns the correct error message"""
|
|
flow = Flow.objects.create(
|
|
name="test-empty",
|
|
slug="test-empty",
|
|
designation=FlowDesignation.AUTHENTICATION,
|
|
)
|
|
|
|
CONFIG.update_from_dict({"domain": "testserver"})
|
|
response = self.client.get(
|
|
reverse("passbook_flows:flow-executor", kwargs={"flow_slug": flow.slug}),
|
|
)
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertIsInstance(response, AccessDeniedResponse)
|
|
self.assertInHTML(EmptyFlowException.__doc__, response.rendered_content)
|
|
|
|
def test_invalid_flow_redirect(self):
|
|
"""Tests that an invalid flow still redirects"""
|
|
flow = Flow.objects.create(
|
|
name="test-empty",
|
|
slug="test-empty",
|
|
designation=FlowDesignation.AUTHENTICATION,
|
|
)
|
|
|
|
CONFIG.update_from_dict({"domain": "testserver"})
|
|
dest = "/unique-string"
|
|
url = reverse("passbook_flows:flow-executor", kwargs={"flow_slug": flow.slug})
|
|
response = self.client.get(url + f"?{NEXT_ARG_NAME}={dest}")
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertJSONEqual(
|
|
force_str(response.content), {"type": "redirect", "to": dest},
|
|
)
|
|
|
|
def test_multi_stage_flow(self):
|
|
"""Test a full flow with multiple stages"""
|
|
flow = Flow.objects.create(
|
|
name="test-full",
|
|
slug="test-full",
|
|
designation=FlowDesignation.AUTHENTICATION,
|
|
)
|
|
FlowStageBinding.objects.create(
|
|
target=flow, stage=DummyStage.objects.create(name="dummy1"), order=0
|
|
)
|
|
FlowStageBinding.objects.create(
|
|
target=flow, stage=DummyStage.objects.create(name="dummy2"), order=1
|
|
)
|
|
|
|
exec_url = reverse(
|
|
"passbook_flows:flow-executor", kwargs={"flow_slug": flow.slug}
|
|
)
|
|
# First Request, start planning, renders form
|
|
response = self.client.get(exec_url)
|
|
self.assertEqual(response.status_code, 200)
|
|
# Check that two stages are in plan
|
|
session = self.client.session
|
|
plan: FlowPlan = session[SESSION_KEY_PLAN]
|
|
self.assertEqual(len(plan.stages), 2)
|
|
# Second request, submit form, one stage left
|
|
response = self.client.post(exec_url)
|
|
# Second request redirects to the same URL
|
|
self.assertEqual(response.status_code, 302)
|
|
self.assertEqual(response.url, exec_url)
|
|
# Check that two stages are in plan
|
|
session = self.client.session
|
|
plan: FlowPlan = session[SESSION_KEY_PLAN]
|
|
self.assertEqual(len(plan.stages), 1)
|
|
|
|
def test_reevaluate_remove_last(self):
|
|
"""Test planner with re-evaluate (last stage is removed)"""
|
|
flow = Flow.objects.create(
|
|
name="test-default-context",
|
|
slug="test-default-context",
|
|
designation=FlowDesignation.AUTHENTICATION,
|
|
)
|
|
false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)
|
|
|
|
binding = FlowStageBinding.objects.create(
|
|
target=flow, stage=DummyStage.objects.create(name="dummy1"), order=0
|
|
)
|
|
binding2 = FlowStageBinding.objects.create(
|
|
target=flow,
|
|
stage=DummyStage.objects.create(name="dummy2"),
|
|
order=1,
|
|
re_evaluate_policies=True,
|
|
)
|
|
|
|
PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
|
|
|
|
# Here we patch the dummy policy to evaluate to true so the stage is included
|
|
with patch(
|
|
"passbook.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE
|
|
):
|
|
|
|
exec_url = reverse(
|
|
"passbook_flows:flow-executor", kwargs={"flow_slug": flow.slug}
|
|
)
|
|
# First request, run the planner
|
|
response = self.client.get(exec_url)
|
|
self.assertEqual(response.status_code, 200)
|
|
|
|
plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
|
|
|
|
self.assertEqual(plan.stages[0], binding.stage)
|
|
self.assertEqual(plan.stages[1], binding2.stage)
|
|
|
|
self.assertIsInstance(plan.markers[0], StageMarker)
|
|
self.assertIsInstance(plan.markers[1], ReevaluateMarker)
|
|
|
|
# Second request, this passes the first dummy stage
|
|
response = self.client.post(exec_url)
|
|
self.assertEqual(response.status_code, 302)
|
|
|
|
# third request, this should trigger the re-evaluate
|
|
# We do this request without the patch, so the policy results in false
|
|
response = self.client.post(exec_url)
|
|
self.assertEqual(response.status_code, 302)
|
|
self.assertEqual(response.url, reverse("passbook_core:overview"))
|
|
|
|
def test_reevaluate_remove_middle(self):
|
|
"""Test planner with re-evaluate (middle stage is removed)"""
|
|
flow = Flow.objects.create(
|
|
name="test-default-context",
|
|
slug="test-default-context",
|
|
designation=FlowDesignation.AUTHENTICATION,
|
|
)
|
|
false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)
|
|
|
|
binding = FlowStageBinding.objects.create(
|
|
target=flow, stage=DummyStage.objects.create(name="dummy1"), order=0
|
|
)
|
|
binding2 = FlowStageBinding.objects.create(
|
|
target=flow,
|
|
stage=DummyStage.objects.create(name="dummy2"),
|
|
order=1,
|
|
re_evaluate_policies=True,
|
|
)
|
|
binding3 = FlowStageBinding.objects.create(
|
|
target=flow, stage=DummyStage.objects.create(name="dummy3"), order=2
|
|
)
|
|
|
|
PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
|
|
|
|
# Here we patch the dummy policy to evaluate to true so the stage is included
|
|
with patch(
|
|
"passbook.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE
|
|
):
|
|
|
|
exec_url = reverse(
|
|
"passbook_flows:flow-executor", kwargs={"flow_slug": flow.slug}
|
|
)
|
|
# First request, run the planner
|
|
response = self.client.get(exec_url)
|
|
|
|
self.assertEqual(response.status_code, 200)
|
|
plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
|
|
|
|
self.assertEqual(plan.stages[0], binding.stage)
|
|
self.assertEqual(plan.stages[1], binding2.stage)
|
|
self.assertEqual(plan.stages[2], binding3.stage)
|
|
|
|
self.assertIsInstance(plan.markers[0], StageMarker)
|
|
self.assertIsInstance(plan.markers[1], ReevaluateMarker)
|
|
self.assertIsInstance(plan.markers[2], StageMarker)
|
|
|
|
# Second request, this passes the first dummy stage
|
|
response = self.client.post(exec_url)
|
|
self.assertEqual(response.status_code, 302)
|
|
|
|
plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
|
|
|
|
self.assertEqual(plan.stages[0], binding2.stage)
|
|
self.assertEqual(plan.stages[1], binding3.stage)
|
|
|
|
self.assertIsInstance(plan.markers[0], StageMarker)
|
|
self.assertIsInstance(plan.markers[1], StageMarker)
|
|
|
|
# third request, this should trigger the re-evaluate
|
|
# We do this request without the patch, so the policy results in false
|
|
response = self.client.post(exec_url)
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertJSONEqual(
|
|
force_str(response.content),
|
|
{"type": "redirect", "to": reverse("passbook_core:overview")},
|
|
)
|
|
|
|
def test_reevaluate_remove_consecutive(self):
|
|
"""Test planner with re-evaluate (consecutive stages are removed)"""
|
|
flow = Flow.objects.create(
|
|
name="test-default-context",
|
|
slug="test-default-context",
|
|
designation=FlowDesignation.AUTHENTICATION,
|
|
)
|
|
false_policy = DummyPolicy.objects.create(result=False, wait_min=1, wait_max=2)
|
|
|
|
binding = FlowStageBinding.objects.create(
|
|
target=flow, stage=DummyStage.objects.create(name="dummy1"), order=0
|
|
)
|
|
binding2 = FlowStageBinding.objects.create(
|
|
target=flow,
|
|
stage=DummyStage.objects.create(name="dummy2"),
|
|
order=1,
|
|
re_evaluate_policies=True,
|
|
)
|
|
binding3 = FlowStageBinding.objects.create(
|
|
target=flow,
|
|
stage=DummyStage.objects.create(name="dummy3"),
|
|
order=2,
|
|
re_evaluate_policies=True,
|
|
)
|
|
binding4 = FlowStageBinding.objects.create(
|
|
target=flow, stage=DummyStage.objects.create(name="dummy4"), order=2
|
|
)
|
|
|
|
PolicyBinding.objects.create(policy=false_policy, target=binding2, order=0)
|
|
PolicyBinding.objects.create(policy=false_policy, target=binding3, order=0)
|
|
|
|
# Here we patch the dummy policy to evaluate to true so the stage is included
|
|
with patch(
|
|
"passbook.policies.dummy.models.DummyPolicy.passes", POLICY_RETURN_TRUE
|
|
):
|
|
|
|
exec_url = reverse(
|
|
"passbook_flows:flow-executor", kwargs={"flow_slug": flow.slug}
|
|
)
|
|
# First request, run the planner
|
|
response = self.client.get(exec_url)
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertIn("dummy1", force_str(response.content))
|
|
|
|
plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
|
|
|
|
self.assertEqual(plan.stages[0], binding.stage)
|
|
self.assertEqual(plan.stages[1], binding2.stage)
|
|
self.assertEqual(plan.stages[2], binding3.stage)
|
|
self.assertEqual(plan.stages[3], binding4.stage)
|
|
|
|
self.assertIsInstance(plan.markers[0], StageMarker)
|
|
self.assertIsInstance(plan.markers[1], ReevaluateMarker)
|
|
self.assertIsInstance(plan.markers[2], ReevaluateMarker)
|
|
self.assertIsInstance(plan.markers[3], StageMarker)
|
|
|
|
# Second request, this passes the first dummy stage
|
|
response = self.client.post(exec_url)
|
|
self.assertEqual(response.status_code, 302)
|
|
|
|
# third request, this should trigger the re-evaluate
|
|
# A get request will evaluate the policies and this will return stage 4
|
|
# but it won't save it, hence we cant' check the plan
|
|
response = self.client.get(exec_url)
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertIn("dummy4", force_str(response.content))
|
|
|
|
# fourth request, this confirms the last stage (dummy4)
|
|
# We do this request without the patch, so the policy results in false
|
|
response = self.client.post(exec_url)
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertJSONEqual(
|
|
force_str(response.content),
|
|
{"type": "redirect", "to": reverse("passbook_core:overview")},
|
|
)
|