stages/email: Fix query parameters getting lost in Email links (#5376)

* fix to email confirmation flow

* handled query keyerror

* rewrite using django's QueryDict, add tests

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix makefile

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix lint

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* remove commented out code

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

---------

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
Co-authored-by: Roney Dsilva <roney.dsilva@cdmx.in>
Co-authored-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Roney Dsilva 2023-10-19 21:22:27 +05:30 committed by GitHub
parent acad3c4d5c
commit f036820fd8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 121 additions and 10 deletions

View file

@ -56,9 +56,9 @@ test: ## Run the server tests and produce a coverage report (locally)
coverage report
lint-fix: ## Lint and automatically fix errors in the python source code. Reports spelling errors.
isort authentik $(PY_SOURCES)
black authentik $(PY_SOURCES)
ruff authentik $(PY_SOURCES)
isort $(PY_SOURCES)
black $(PY_SOURCES)
ruff $(PY_SOURCES)
codespell -w $(CODESPELL_ARGS)
lint: ## Lint the python and golang sources

View file

@ -3,8 +3,8 @@ from datetime import timedelta
from django.contrib import messages
from django.http import HttpRequest, HttpResponse
from django.http.request import QueryDict
from django.urls import reverse
from django.utils.http import urlencode
from django.utils.text import slugify
from django.utils.timezone import now
from django.utils.translation import gettext as _
@ -15,7 +15,7 @@ from authentik.flows.challenge import Challenge, ChallengeResponse, ChallengeTyp
from authentik.flows.models import FlowDesignation, FlowToken
from authentik.flows.planner import PLAN_CONTEXT_IS_RESTORED, PLAN_CONTEXT_PENDING_USER
from authentik.flows.stage import ChallengeStageView
from authentik.flows.views.executor import QS_KEY_TOKEN
from authentik.flows.views.executor import QS_KEY_TOKEN, QS_QUERY
from authentik.stages.email.models import EmailStage
from authentik.stages.email.tasks import send_mails
from authentik.stages.email.utils import TemplateEmailMessage
@ -51,8 +51,22 @@ class EmailStageView(ChallengeStageView):
"authentik_core:if-flow",
kwargs={"flow_slug": self.executor.flow.slug},
)
relative_url = f"{base_url}?{urlencode(kwargs)}"
return self.request.build_absolute_uri(relative_url)
# Parse query string from current URL (full query string)
query_params = QueryDict(self.request.META.get("QUERY_STRING", ""), mutable=True)
query_params.pop(QS_KEY_TOKEN, None)
# Check for nested query string used by flow executor, and remove any
# kind of flow token from that
if QS_QUERY in query_params:
inner_query_params = QueryDict(query_params.get(QS_QUERY), mutable=True)
inner_query_params.pop(QS_KEY_TOKEN, None)
query_params[QS_QUERY] = inner_query_params.urlencode()
query_params.update(kwargs)
full_url = base_url
if len(query_params) > 0:
full_url = f"{full_url}?{query_params.urlencode()}"
return self.request.build_absolute_uri(full_url)
def get_token(self) -> FlowToken:
"""Get token"""
@ -146,6 +160,7 @@ class EmailStageView(ChallengeStageView):
messages.error(self.request, _("No pending user."))
return super().challenge_invalid(response)
self.send_email()
messages.success(self.request, _("Email Successfully sent."))
# We can't call stage_ok yet, as we're still waiting
# for the user to click the link in the email
return super().challenge_invalid(response)

View file

@ -4,6 +4,7 @@ from unittest.mock import MagicMock, PropertyMock, patch
from django.core import mail
from django.core.mail.backends.locmem import EmailBackend
from django.core.mail.backends.smtp import EmailBackend as SMTPEmailBackend
from django.test import RequestFactory
from django.urls import reverse
from django.utils.http import urlencode
@ -12,10 +13,11 @@ from authentik.flows.markers import StageMarker
from authentik.flows.models import FlowDesignation, FlowStageBinding, FlowToken
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
from authentik.flows.tests import FlowTestCase
from authentik.flows.views.executor import QS_KEY_TOKEN, SESSION_KEY_PLAN
from authentik.flows.views.executor import QS_KEY_TOKEN, SESSION_KEY_PLAN, FlowExecutorView
from authentik.lib.config import CONFIG
from authentik.lib.generators import generate_id
from authentik.stages.email.models import EmailStage
from authentik.stages.email.stage import PLAN_CONTEXT_EMAIL_OVERRIDE
from authentik.stages.email.stage import PLAN_CONTEXT_EMAIL_OVERRIDE, EmailStageView
class TestEmailStage(FlowTestCase):
@ -23,8 +25,8 @@ class TestEmailStage(FlowTestCase):
def setUp(self):
super().setUp()
self.factory = RequestFactory()
self.user = create_test_admin_user()
self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
self.stage = EmailStage.objects.create(
name="email",
@ -205,3 +207,97 @@ class TestEmailStage(FlowTestCase):
self.assertEqual(response.status_code, 200)
self.assertStageResponse(response, component="ak-stage-access-denied")
def test_url_no_params(self):
"""Test generation of the URL in the EMail"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
request = self.factory.get(url)
stage_view = EmailStageView(
FlowExecutorView(
request=request,
flow=self.flow,
),
request=request,
)
self.assertEqual(stage_view.get_full_url(), f"http://testserver/if/flow/{self.flow.slug}/")
def test_url_our_params(self):
"""Test that all of our parameters are passed to the URL correctly"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
request = self.factory.get(url)
stage_view = EmailStageView(
FlowExecutorView(
request=request,
flow=self.flow,
),
request=request,
)
token = generate_id()
self.assertEqual(
stage_view.get_full_url(**{QS_KEY_TOKEN: token}),
f"http://testserver/if/flow/{self.flow.slug}/?flow_token={token}",
)
def test_url_existing_params(self):
"""Test to ensure that URL params are preserved in the URL being sent"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
url += "?foo=bar"
request = self.factory.get(url)
stage_view = EmailStageView(
FlowExecutorView(
request=request,
flow=self.flow,
),
request=request,
)
token = generate_id()
self.assertEqual(
stage_view.get_full_url(**{QS_KEY_TOKEN: token}),
f"http://testserver/if/flow/{self.flow.slug}/?foo=bar&flow_token={token}",
)
def test_url_existing_params_nested(self):
"""Test to ensure that URL params are preserved in the URL being sent (including nested)"""
plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
url += "?foo=bar&"
url += "query=" + urlencode({"nested": "value"})
request = self.factory.get(url)
stage_view = EmailStageView(
FlowExecutorView(
request=request,
flow=self.flow,
),
request=request,
)
token = generate_id()
self.assertEqual(
stage_view.get_full_url(**{QS_KEY_TOKEN: token}),
(
f"http://testserver/if/flow/{self.flow.slug}"
f"/?foo=bar&query=nested%3Dvalue&flow_token={token}"
),
)