flows: add additional sentry spans to flow executor

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
Jens Langhammer 2021-12-12 20:01:01 +01:00
parent 4e2457560d
commit 6efc7578ef

View file

@ -19,6 +19,7 @@ from drf_spectacular.utils import OpenApiParameter, PolymorphicProxySerializer,
from rest_framework.permissions import AllowAny
from rest_framework.views import APIView
from sentry_sdk import capture_exception
from sentry_sdk.hub import Hub
from structlog.stdlib import BoundLogger, get_logger
from authentik.core.models import USER_ATTRIBUTE_DEBUG
@ -156,73 +157,77 @@ class FlowExecutorView(APIView):
# pylint: disable=unused-argument, too-many-return-statements
def dispatch(self, request: HttpRequest, flow_slug: str) -> HttpResponse:
get_params = QueryDict(request.GET.get("query", ""))
if QS_KEY_TOKEN in get_params:
plan = self._check_flow_token(get_params)
if plan:
self.request.session[SESSION_KEY_PLAN] = plan
# Early check if there's an active Plan for the current session
if SESSION_KEY_PLAN in self.request.session:
self.plan = self.request.session[SESSION_KEY_PLAN]
if self.plan.flow_pk != self.flow.pk.hex:
self._logger.warning(
"f(exec): Found existing plan for other flow, deleting plan",
)
# Existing plan is deleted from session and instance
self.plan = None
self.cancel()
self._logger.debug("f(exec): Continuing existing plan")
with Hub.current.start_span(op="flow.executor.dispatch") as span:
span.set_data("flow", self.flow.flow_uuid)
get_params = QueryDict(request.GET.get("query", ""))
if QS_KEY_TOKEN in get_params:
plan = self._check_flow_token(get_params)
if plan:
self.request.session[SESSION_KEY_PLAN] = plan
# Early check if there's an active Plan for the current session
if SESSION_KEY_PLAN in self.request.session:
self.plan = self.request.session[SESSION_KEY_PLAN]
if self.plan.flow_pk != self.flow.pk.hex:
self._logger.warning(
"f(exec): Found existing plan for other flow, deleting plan",
)
# Existing plan is deleted from session and instance
self.plan = None
self.cancel()
self._logger.debug("f(exec): Continuing existing plan")
# Don't check session again as we've either already loaded the plan or we need to plan
if not self.plan:
request.session[SESSION_KEY_HISTORY] = []
self._logger.debug("f(exec): No active Plan found, initiating planner")
# Don't check session again as we've either already loaded the plan or we need to plan
if not self.plan:
request.session[SESSION_KEY_HISTORY] = []
self._logger.debug("f(exec): No active Plan found, initiating planner")
try:
self.plan = self._initiate_plan()
except FlowNonApplicableException as exc:
self._logger.warning("f(exec): Flow not applicable to current user", exc=exc)
return to_stage_response(self.request, self.handle_invalid_flow(exc))
except EmptyFlowException as exc:
self._logger.warning("f(exec): Flow is empty", exc=exc)
# To match behaviour with loading an empty flow plan from cache,
# we don't show an error message here, but rather call _flow_done()
return self._flow_done()
# Initial flow request, check if we have an upstream query string passed in
request.session[SESSION_KEY_GET] = get_params
# We don't save the Plan after getting the next stage
# as it hasn't been successfully passed yet
try:
self.plan = self._initiate_plan()
except FlowNonApplicableException as exc:
self._logger.warning("f(exec): Flow not applicable to current user", exc=exc)
return to_stage_response(self.request, self.handle_invalid_flow(exc))
except EmptyFlowException as exc:
self._logger.warning("f(exec): Flow is empty", exc=exc)
# To match behaviour with loading an empty flow plan from cache,
# we don't show an error message here, but rather call _flow_done()
# This is the first time we actually access any attribute on the selected plan
# if the cached plan is from an older version, it might have different attributes
# in which case we just delete the plan and invalidate everything
next_binding = self.plan.next(self.request)
except Exception as exc: # pylint: disable=broad-except
self._logger.warning(
"f(exec): found incompatible flow plan, invalidating run", exc=exc
)
cache.delete_pattern("flow_*")
return self.stage_invalid()
if not next_binding:
self._logger.debug("f(exec): no more stages, flow is done.")
return self._flow_done()
# Initial flow request, check if we have an upstream query string passed in
request.session[SESSION_KEY_GET] = get_params
# We don't save the Plan after getting the next stage
# as it hasn't been successfully passed yet
try:
# This is the first time we actually access any attribute on the selected plan
# if the cached plan is from an older version, it might have different attributes
# in which case we just delete the plan and invalidate everything
next_binding = self.plan.next(self.request)
except Exception as exc: # pylint: disable=broad-except
self._logger.warning("f(exec): found incompatible flow plan, invalidating run", exc=exc)
cache.delete_pattern("flow_*")
return self.stage_invalid()
if not next_binding:
self._logger.debug("f(exec): no more stages, flow is done.")
return self._flow_done()
self.current_binding = next_binding
self.current_stage = next_binding.stage
self._logger.debug(
"f(exec): Current stage",
current_stage=self.current_stage,
flow_slug=self.flow.slug,
)
try:
stage_cls = self.current_stage.type
except NotImplementedError as exc:
self._logger.debug("Error getting stage type", exc=exc)
return self.stage_invalid()
self.current_stage_view = stage_cls(self)
self.current_stage_view.args = self.args
self.current_stage_view.kwargs = self.kwargs
self.current_stage_view.request = request
try:
return super().dispatch(request)
except InvalidStageError as exc:
return self.stage_invalid(str(exc))
self.current_binding = next_binding
self.current_stage = next_binding.stage
self._logger.debug(
"f(exec): Current stage",
current_stage=self.current_stage,
flow_slug=self.flow.slug,
)
try:
stage_cls = self.current_stage.type
except NotImplementedError as exc:
self._logger.debug("Error getting stage type", exc=exc)
return self.stage_invalid()
self.current_stage_view = stage_cls(self)
self.current_stage_view.args = self.args
self.current_stage_view.kwargs = self.kwargs
self.current_stage_view.request = request
try:
return super().dispatch(request)
except InvalidStageError as exc:
return self.stage_invalid(str(exc))
def handle_exception(self, exc: Exception) -> HttpResponse:
"""Handle exception in stage execution"""
@ -264,8 +269,12 @@ class FlowExecutorView(APIView):
stage=self.current_stage,
)
try:
stage_response = self.current_stage_view.get(request, *args, **kwargs)
return to_stage_response(request, stage_response)
with Hub.current.start_span(op="flow.executor.stage") as span:
span.set_data("method", "get")
span.set_data("stage", self.current_stage_view)
span.set_data("flow", self.flow.flow_uuid)
stage_response = self.current_stage_view.get(request, *args, **kwargs)
return to_stage_response(request, stage_response)
except Exception as exc: # pylint: disable=broad-except
return self.handle_exception(exc)
@ -301,8 +310,12 @@ class FlowExecutorView(APIView):
stage=self.current_stage,
)
try:
stage_response = self.current_stage_view.post(request, *args, **kwargs)
return to_stage_response(request, stage_response)
with Hub.current.start_span(op="flow.executor.stage") as span:
span.set_data("method", "post")
span.set_data("stage", self.current_stage_view)
span.set_data("flow", self.flow.flow_uuid)
stage_response = self.current_stage_view.post(request, *args, **kwargs)
return to_stage_response(request, stage_response)
except Exception as exc: # pylint: disable=broad-except
return self.handle_exception(exc)