root: extended flow and policy metrics (#7067)

This commit is contained in:
Jens L 2023-10-05 01:04:55 +02:00 committed by GitHub
parent 6792bf8876
commit 83f9eae654
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 69 additions and 25 deletions

View file

@ -81,7 +81,7 @@ dev-drop-db:
dropdb -U ${pg_user} -h ${pg_host} ${pg_name} dropdb -U ${pg_user} -h ${pg_host} ${pg_name}
# Also remove the test-db if it exists # Also remove the test-db if it exists
dropdb -U ${pg_user} -h ${pg_host} test_${pg_name} || true dropdb -U ${pg_user} -h ${pg_host} test_${pg_name} || true
echo redis-cli -n 0 flushall redis-cli -n 0 flushall
dev-create-db: dev-create-db:
createdb -U ${pg_user} -h ${pg_host} ${pg_name} createdb -U ${pg_user} -h ${pg_host} ${pg_name}

View file

@ -206,8 +206,8 @@ def prefill_task(func):
task_call_module=func.__module__, task_call_module=func.__module__,
task_call_func=func.__name__, task_call_func=func.__name__,
# We don't have real values for these attributes but they cannot be null # We don't have real values for these attributes but they cannot be null
start_timestamp=default_timer(), start_timestamp=0,
finish_timestamp=default_timer(), finish_timestamp=0,
finish_time=datetime.now(), finish_time=datetime.now(),
).save(86400) ).save(86400)
LOGGER.debug("prefilled task", task_name=func.__name__) LOGGER.debug("prefilled task", task_name=func.__name__)

View file

@ -8,6 +8,11 @@ GAUGE_FLOWS_CACHED = Gauge(
"authentik_flows_cached", "authentik_flows_cached",
"Cached flows", "Cached flows",
) )
HIST_FLOW_EXECUTION_STAGE_TIME = Histogram(
"authentik_flows_execution_stage_time",
"Duration each stage took to execute.",
["stage_type", "method"],
)
HIST_FLOWS_PLAN_TIME = Histogram( HIST_FLOWS_PLAN_TIME = Histogram(
"authentik_flows_plan_time", "authentik_flows_plan_time",
"Duration to build a plan for a flow", "Duration to build a plan for a flow",

View file

@ -24,6 +24,7 @@ from structlog.stdlib import BoundLogger, get_logger
from authentik.core.models import Application from authentik.core.models import Application
from authentik.events.models import Event, EventAction, cleanse_dict from authentik.events.models import Event, EventAction, cleanse_dict
from authentik.flows.apps import HIST_FLOW_EXECUTION_STAGE_TIME
from authentik.flows.challenge import ( from authentik.flows.challenge import (
Challenge, Challenge,
ChallengeResponse, ChallengeResponse,
@ -266,17 +267,21 @@ class FlowExecutorView(APIView):
) )
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
"""Get the next pending challenge from the currently active flow.""" """Get the next pending challenge from the currently active flow."""
class_path = class_to_path(self.current_stage_view.__class__)
self._logger.debug( self._logger.debug(
"f(exec): Passing GET", "f(exec): Passing GET",
view_class=class_to_path(self.current_stage_view.__class__), view_class=class_path,
stage=self.current_stage, stage=self.current_stage,
) )
try: try:
with Hub.current.start_span( with Hub.current.start_span(
op="authentik.flow.executor.stage", op="authentik.flow.executor.stage",
description=class_to_path(self.current_stage_view.__class__), description=class_path,
) as span: ) as span, HIST_FLOW_EXECUTION_STAGE_TIME.labels(
span.set_data("Method", "GET") method=request.method.upper(),
stage_type=class_path,
).time():
span.set_data("Method", request.method.upper())
span.set_data("authentik Stage", self.current_stage_view) span.set_data("authentik Stage", self.current_stage_view)
span.set_data("authentik Flow", self.flow.slug) span.set_data("authentik Flow", self.flow.slug)
stage_response = self.current_stage_view.dispatch(request) stage_response = self.current_stage_view.dispatch(request)
@ -310,17 +315,21 @@ class FlowExecutorView(APIView):
) )
def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
"""Solve the previously retrieved challenge and advanced to the next stage.""" """Solve the previously retrieved challenge and advanced to the next stage."""
class_path = class_to_path(self.current_stage_view.__class__)
self._logger.debug( self._logger.debug(
"f(exec): Passing POST", "f(exec): Passing POST",
view_class=class_to_path(self.current_stage_view.__class__), view_class=class_path,
stage=self.current_stage, stage=self.current_stage,
) )
try: try:
with Hub.current.start_span( with Hub.current.start_span(
op="authentik.flow.executor.stage", op="authentik.flow.executor.stage",
description=class_to_path(self.current_stage_view.__class__), description=class_path,
) as span: ) as span, HIST_FLOW_EXECUTION_STAGE_TIME.labels(
span.set_data("Method", "POST") method=request.method.upper(),
stage_type=class_path,
).time():
span.set_data("Method", request.method.upper())
span.set_data("authentik Stage", self.current_stage_view) span.set_data("authentik Stage", self.current_stage_view)
span.set_data("authentik Flow", self.flow.slug) span.set_data("authentik Flow", self.flow.slug)
stage_response = self.current_stage_view.dispatch(request) stage_response = self.current_stage_view.dispatch(request)

View file

@ -7,7 +7,11 @@ GAUGE_POLICIES_CACHED = Gauge(
"authentik_policies_cached", "authentik_policies_cached",
"Cached Policies", "Cached Policies",
) )
HIST_POLICIES_ENGINE_TOTAL_TIME = Histogram(
"authentik_policies_engine_time_total_seconds",
"(Total) Duration the policy engine took to evaluate a result.",
["obj_type", "obj_pk"],
)
HIST_POLICIES_EXECUTION_TIME = Histogram( HIST_POLICIES_EXECUTION_TIME = Histogram(
"authentik_policies_execution_time", "authentik_policies_execution_time",
"Execution times for single policies", "Execution times for single policies",
@ -17,6 +21,7 @@ HIST_POLICIES_EXECUTION_TIME = Histogram(
"binding_target_name", "binding_target_name",
"object_pk", "object_pk",
"object_type", "object_type",
"mode",
], ],
) )

View file

@ -1,6 +1,7 @@
"""authentik policy engine""" """authentik policy engine"""
from multiprocessing import Pipe, current_process from multiprocessing import Pipe, current_process
from multiprocessing.connection import Connection from multiprocessing.connection import Connection
from timeit import default_timer
from typing import Iterator, Optional from typing import Iterator, Optional
from django.core.cache import cache from django.core.cache import cache
@ -10,6 +11,8 @@ from sentry_sdk.tracing import Span
from structlog.stdlib import BoundLogger, get_logger from structlog.stdlib import BoundLogger, get_logger
from authentik.core.models import User from authentik.core.models import User
from authentik.lib.utils.reflection import class_to_path
from authentik.policies.apps import HIST_POLICIES_ENGINE_TOTAL_TIME, HIST_POLICIES_EXECUTION_TIME
from authentik.policies.exceptions import PolicyEngineException from authentik.policies.exceptions import PolicyEngineException
from authentik.policies.models import Policy, PolicyBinding, PolicyBindingModel, PolicyEngineMode from authentik.policies.models import Policy, PolicyBinding, PolicyBindingModel, PolicyEngineMode
from authentik.policies.process import PolicyProcess, cache_key from authentik.policies.process import PolicyProcess, cache_key
@ -77,6 +80,33 @@ class PolicyEngine:
if binding.policy is not None and binding.policy.__class__ == Policy: if binding.policy is not None and binding.policy.__class__ == Policy:
raise PolicyEngineException(f"Policy '{binding.policy}' is root type") raise PolicyEngineException(f"Policy '{binding.policy}' is root type")
def _check_cache(self, binding: PolicyBinding):
if not self.use_cache:
return False
before = default_timer()
key = cache_key(binding, self.request)
cached_policy = cache.get(key, None)
duration = max(default_timer() - before, 0)
if not cached_policy:
return False
self.logger.debug(
"P_ENG: Taking result from cache",
binding=binding,
cache_key=key,
request=self.request,
)
HIST_POLICIES_EXECUTION_TIME.labels(
binding_order=binding.order,
binding_target_type=binding.target_type,
binding_target_name=binding.target_name,
object_pk=str(self.request.obj.pk),
object_type=class_to_path(self.request.obj.__class__),
mode="cache_retrieve",
).observe(duration)
# It's a bit silly to time this, but
self.__cached_policies.append(cached_policy)
return True
def build(self) -> "PolicyEngine": def build(self) -> "PolicyEngine":
"""Build wrapper which monitors performance""" """Build wrapper which monitors performance"""
with ( with (
@ -84,6 +114,10 @@ class PolicyEngine:
op="authentik.policy.engine.build", op="authentik.policy.engine.build",
description=self.__pbm, description=self.__pbm,
) as span, ) as span,
HIST_POLICIES_ENGINE_TOTAL_TIME.labels(
obj_type=class_to_path(self.__pbm.__class__),
obj_pk=str(self.__pbm.pk),
).time(),
): ):
span: Span span: Span
span.set_data("pbm", self.__pbm) span.set_data("pbm", self.__pbm)
@ -92,16 +126,7 @@ class PolicyEngine:
self.__expected_result_count += 1 self.__expected_result_count += 1
self._check_policy_type(binding) self._check_policy_type(binding)
key = cache_key(binding, self.request) if self._check_cache(binding):
cached_policy = cache.get(key, None)
if cached_policy and self.use_cache:
self.logger.debug(
"P_ENG: Taking result from cache",
binding=binding,
cache_key=key,
request=self.request,
)
self.__cached_policies.append(cached_policy)
continue continue
self.logger.debug("P_ENG: Evaluating policy", binding=binding, request=self.request) self.logger.debug("P_ENG: Evaluating policy", binding=binding, request=self.request)
our_end, task_end = Pipe(False) our_end, task_end = Pipe(False)

View file

@ -11,6 +11,7 @@ from structlog.stdlib import get_logger
from authentik.events.models import Event, EventAction from authentik.events.models import Event, EventAction
from authentik.lib.config import CONFIG from authentik.lib.config import CONFIG
from authentik.lib.utils.errors import exception_to_string from authentik.lib.utils.errors import exception_to_string
from authentik.lib.utils.reflection import class_to_path
from authentik.policies.apps import HIST_POLICIES_EXECUTION_TIME from authentik.policies.apps import HIST_POLICIES_EXECUTION_TIME
from authentik.policies.exceptions import PolicyException from authentik.policies.exceptions import PolicyException
from authentik.policies.models import PolicyBinding from authentik.policies.models import PolicyBinding
@ -128,9 +129,8 @@ class PolicyProcess(PROCESS_CLASS):
binding_target_type=self.binding.target_type, binding_target_type=self.binding.target_type,
binding_target_name=self.binding.target_name, binding_target_name=self.binding.target_name,
object_pk=str(self.request.obj.pk), object_pk=str(self.request.obj.pk),
object_type=( object_type=class_to_path(self.request.obj.__class__),
f"{self.request.obj._meta.app_label}.{self.request.obj._meta.model_name}" mode="execute_process",
),
).time(), ).time(),
): ):
span: Span span: Span