diff --git a/authentik/flows/views/executor.py b/authentik/flows/views/executor.py index 9a87d5eef..2b6937e70 100644 --- a/authentik/flows/views/executor.py +++ b/authentik/flows/views/executor.py @@ -19,6 +19,7 @@ from drf_spectacular.utils import OpenApiParameter, PolymorphicProxySerializer, from rest_framework.permissions import AllowAny from rest_framework.views import APIView from sentry_sdk import capture_exception +from sentry_sdk.hub import Hub from structlog.stdlib import BoundLogger, get_logger from authentik.core.models import USER_ATTRIBUTE_DEBUG @@ -156,73 +157,77 @@ class FlowExecutorView(APIView): # pylint: disable=unused-argument, too-many-return-statements def dispatch(self, request: HttpRequest, flow_slug: str) -> HttpResponse: - get_params = QueryDict(request.GET.get("query", "")) - if QS_KEY_TOKEN in get_params: - plan = self._check_flow_token(get_params) - if plan: - self.request.session[SESSION_KEY_PLAN] = plan - # Early check if there's an active Plan for the current session - if SESSION_KEY_PLAN in self.request.session: - self.plan = self.request.session[SESSION_KEY_PLAN] - if self.plan.flow_pk != self.flow.pk.hex: - self._logger.warning( - "f(exec): Found existing plan for other flow, deleting plan", - ) - # Existing plan is deleted from session and instance - self.plan = None - self.cancel() - self._logger.debug("f(exec): Continuing existing plan") + with Hub.current.start_span(op="flow.executor.dispatch") as span: + span.set_data("flow", self.flow.flow_uuid) + get_params = QueryDict(request.GET.get("query", "")) + if QS_KEY_TOKEN in get_params: + plan = self._check_flow_token(get_params) + if plan: + self.request.session[SESSION_KEY_PLAN] = plan + # Early check if there's an active Plan for the current session + if SESSION_KEY_PLAN in self.request.session: + self.plan = self.request.session[SESSION_KEY_PLAN] + if self.plan.flow_pk != self.flow.pk.hex: + self._logger.warning( + "f(exec): Found existing plan for other flow, deleting plan", + ) + # Existing plan is deleted from session and instance + self.plan = None + self.cancel() + self._logger.debug("f(exec): Continuing existing plan") - # Don't check session again as we've either already loaded the plan or we need to plan - if not self.plan: - request.session[SESSION_KEY_HISTORY] = [] - self._logger.debug("f(exec): No active Plan found, initiating planner") + # Don't check session again as we've either already loaded the plan or we need to plan + if not self.plan: + request.session[SESSION_KEY_HISTORY] = [] + self._logger.debug("f(exec): No active Plan found, initiating planner") + try: + self.plan = self._initiate_plan() + except FlowNonApplicableException as exc: + self._logger.warning("f(exec): Flow not applicable to current user", exc=exc) + return to_stage_response(self.request, self.handle_invalid_flow(exc)) + except EmptyFlowException as exc: + self._logger.warning("f(exec): Flow is empty", exc=exc) + # To match behaviour with loading an empty flow plan from cache, + # we don't show an error message here, but rather call _flow_done() + return self._flow_done() + # Initial flow request, check if we have an upstream query string passed in + request.session[SESSION_KEY_GET] = get_params + # We don't save the Plan after getting the next stage + # as it hasn't been successfully passed yet try: - self.plan = self._initiate_plan() - except FlowNonApplicableException as exc: - self._logger.warning("f(exec): Flow not applicable to current user", exc=exc) - return to_stage_response(self.request, self.handle_invalid_flow(exc)) - except EmptyFlowException as exc: - self._logger.warning("f(exec): Flow is empty", exc=exc) - # To match behaviour with loading an empty flow plan from cache, - # we don't show an error message here, but rather call _flow_done() + # This is the first time we actually access any attribute on the selected plan + # if the cached plan is from an older version, it might have different attributes + # in which case we just delete the plan and invalidate everything + next_binding = self.plan.next(self.request) + except Exception as exc: # pylint: disable=broad-except + self._logger.warning( + "f(exec): found incompatible flow plan, invalidating run", exc=exc + ) + cache.delete_pattern("flow_*") + return self.stage_invalid() + if not next_binding: + self._logger.debug("f(exec): no more stages, flow is done.") return self._flow_done() - # Initial flow request, check if we have an upstream query string passed in - request.session[SESSION_KEY_GET] = get_params - # We don't save the Plan after getting the next stage - # as it hasn't been successfully passed yet - try: - # This is the first time we actually access any attribute on the selected plan - # if the cached plan is from an older version, it might have different attributes - # in which case we just delete the plan and invalidate everything - next_binding = self.plan.next(self.request) - except Exception as exc: # pylint: disable=broad-except - self._logger.warning("f(exec): found incompatible flow plan, invalidating run", exc=exc) - cache.delete_pattern("flow_*") - return self.stage_invalid() - if not next_binding: - self._logger.debug("f(exec): no more stages, flow is done.") - return self._flow_done() - self.current_binding = next_binding - self.current_stage = next_binding.stage - self._logger.debug( - "f(exec): Current stage", - current_stage=self.current_stage, - flow_slug=self.flow.slug, - ) - try: - stage_cls = self.current_stage.type - except NotImplementedError as exc: - self._logger.debug("Error getting stage type", exc=exc) - return self.stage_invalid() - self.current_stage_view = stage_cls(self) - self.current_stage_view.args = self.args - self.current_stage_view.kwargs = self.kwargs - self.current_stage_view.request = request - try: - return super().dispatch(request) - except InvalidStageError as exc: - return self.stage_invalid(str(exc)) + self.current_binding = next_binding + self.current_stage = next_binding.stage + self._logger.debug( + "f(exec): Current stage", + current_stage=self.current_stage, + flow_slug=self.flow.slug, + ) + try: + stage_cls = self.current_stage.type + except NotImplementedError as exc: + self._logger.debug("Error getting stage type", exc=exc) + return self.stage_invalid() + self.current_stage_view = stage_cls(self) + self.current_stage_view.args = self.args + self.current_stage_view.kwargs = self.kwargs + self.current_stage_view.request = request + try: + return super().dispatch(request) + except InvalidStageError as exc: + return self.stage_invalid(str(exc)) def handle_exception(self, exc: Exception) -> HttpResponse: """Handle exception in stage execution""" @@ -264,8 +269,12 @@ class FlowExecutorView(APIView): stage=self.current_stage, ) try: - stage_response = self.current_stage_view.get(request, *args, **kwargs) - return to_stage_response(request, stage_response) + with Hub.current.start_span(op="flow.executor.stage") as span: + span.set_data("method", "get") + span.set_data("stage", self.current_stage_view) + span.set_data("flow", self.flow.flow_uuid) + stage_response = self.current_stage_view.get(request, *args, **kwargs) + return to_stage_response(request, stage_response) except Exception as exc: # pylint: disable=broad-except return self.handle_exception(exc) @@ -301,8 +310,12 @@ class FlowExecutorView(APIView): stage=self.current_stage, ) try: - stage_response = self.current_stage_view.post(request, *args, **kwargs) - return to_stage_response(request, stage_response) + with Hub.current.start_span(op="flow.executor.stage") as span: + span.set_data("method", "post") + span.set_data("stage", self.current_stage_view) + span.set_data("flow", self.flow.flow_uuid) + stage_response = self.current_stage_view.post(request, *args, **kwargs) + return to_stage_response(request, stage_response) except Exception as exc: # pylint: disable=broad-except return self.handle_exception(exc)