diff --git a/authentik/admin/api/tasks.py b/authentik/admin/api/tasks.py deleted file mode 100644 index 72714dad5..000000000 --- a/authentik/admin/api/tasks.py +++ /dev/null @@ -1,134 +0,0 @@ -"""Tasks API""" -from importlib import import_module - -from django.contrib import messages -from django.http.response import Http404 -from django.utils.translation import gettext_lazy as _ -from drf_spectacular.types import OpenApiTypes -from drf_spectacular.utils import OpenApiParameter, OpenApiResponse, extend_schema -from rest_framework.decorators import action -from rest_framework.fields import ( - CharField, - ChoiceField, - DateTimeField, - ListField, - SerializerMethodField, -) -from rest_framework.request import Request -from rest_framework.response import Response -from rest_framework.viewsets import ViewSet -from structlog.stdlib import get_logger - -from authentik.api.decorators import permission_required -from authentik.core.api.utils import PassiveSerializer -from authentik.events.monitored_tasks import TaskInfo, TaskResultStatus -from authentik.rbac.permissions import HasPermission - -LOGGER = get_logger() - - -class TaskSerializer(PassiveSerializer): - """Serialize TaskInfo and TaskResult""" - - task_name = CharField() - task_description = CharField() - task_finish_timestamp = DateTimeField(source="finish_time") - task_duration = SerializerMethodField() - - status = ChoiceField( - source="result.status.name", - choices=[(x.name, x.name) for x in TaskResultStatus], - ) - messages = ListField(source="result.messages") - - def get_task_duration(self, instance: TaskInfo) -> int: - """Get the duration a task took to run""" - return max(instance.finish_timestamp - instance.start_timestamp, 0) - - def to_representation(self, instance: TaskInfo): - """When a new version of authentik adds fields to TaskInfo, - the API will fail with an AttributeError, as the classes - are pickled in cache. In that case, just delete the info""" - try: - return super().to_representation(instance) - # pylint: disable=broad-except - except Exception: # pragma: no cover - if isinstance(self.instance, list): - for inst in self.instance: - inst.delete() - else: - self.instance.delete() - return {} - - -class TaskViewSet(ViewSet): - """Read-only view set that returns all background tasks""" - - permission_classes = [HasPermission("authentik_rbac.view_system_tasks")] - serializer_class = TaskSerializer - - @extend_schema( - responses={ - 200: TaskSerializer(many=False), - 404: OpenApiResponse(description="Task not found"), - }, - parameters=[ - OpenApiParameter( - "id", - type=OpenApiTypes.STR, - location=OpenApiParameter.PATH, - required=True, - ), - ], - ) - def retrieve(self, request: Request, pk=None) -> Response: - """Get a single system task""" - task = TaskInfo.by_name(pk) - if not task: - raise Http404 - return Response(TaskSerializer(task, many=False).data) - - @extend_schema(responses={200: TaskSerializer(many=True)}) - def list(self, request: Request) -> Response: - """List system tasks""" - tasks = sorted(TaskInfo.all().values(), key=lambda task: task.task_name) - return Response(TaskSerializer(tasks, many=True).data) - - @permission_required(None, ["authentik_rbac.run_system_tasks"]) - @extend_schema( - request=OpenApiTypes.NONE, - responses={ - 204: OpenApiResponse(description="Task retried successfully"), - 404: OpenApiResponse(description="Task not found"), - 500: OpenApiResponse(description="Failed to retry task"), - }, - parameters=[ - OpenApiParameter( - "id", - type=OpenApiTypes.STR, - location=OpenApiParameter.PATH, - required=True, - ), - ], - ) - @action(detail=True, methods=["post"]) - def retry(self, request: Request, pk=None) -> Response: - """Retry task""" - task = TaskInfo.by_name(pk) - if not task: - raise Http404 - try: - task_module = import_module(task.task_call_module) - task_func = getattr(task_module, task.task_call_func) - LOGGER.debug("Running task", task=task_func) - task_func.delay(*task.task_call_args, **task.task_call_kwargs) - messages.success( - self.request, - _("Successfully re-scheduled Task %(name)s!" % {"name": task.task_name}), - ) - return Response(status=204) - except (ImportError, AttributeError): # pragma: no cover - LOGGER.warning("Failed to run task, remove state", task=task) - # if we get an import error, the module path has probably changed - task.delete() - return Response(status=500) diff --git a/authentik/admin/signals.py b/authentik/admin/signals.py index f171fc74e..aa081724b 100644 --- a/authentik/admin/signals.py +++ b/authentik/admin/signals.py @@ -1,7 +1,6 @@ """admin signals""" from django.dispatch import receiver -from authentik.admin.api.tasks import TaskInfo from authentik.admin.apps import GAUGE_WORKERS from authentik.root.celery import CELERY_APP from authentik.root.monitoring import monitoring_set @@ -12,10 +11,3 @@ def monitoring_set_workers(sender, **kwargs): """Set worker gauge""" count = len(CELERY_APP.control.ping(timeout=0.5)) GAUGE_WORKERS.set(count) - - -@receiver(monitoring_set) -def monitoring_set_tasks(sender, **kwargs): - """Set task gauges""" - for task in TaskInfo.all().values(): - task.update_metrics() diff --git a/authentik/admin/tasks.py b/authentik/admin/tasks.py index c5b0ebf61..8db757d1f 100644 --- a/authentik/admin/tasks.py +++ b/authentik/admin/tasks.py @@ -11,12 +11,7 @@ from structlog.stdlib import get_logger from authentik import __version__, get_build_hash from authentik.admin.apps import PROM_INFO from authentik.events.models import Event, EventAction, Notification -from authentik.events.monitored_tasks import ( - MonitoredTask, - TaskResult, - TaskResultStatus, - prefill_task, -) +from authentik.events.monitored_tasks import MonitoredTask, TaskStatus, prefill_task from authentik.lib.config import CONFIG from authentik.lib.utils.http import get_http_session from authentik.root.celery import CELERY_APP @@ -60,7 +55,7 @@ def update_latest_version(self: MonitoredTask): """Update latest version info""" if CONFIG.get_bool("disable_update_check"): cache.set(VERSION_CACHE_KEY, "0.0.0", VERSION_CACHE_TIMEOUT) - self.set_status(TaskResult(TaskResultStatus.WARNING, messages=["Version check disabled."])) + self.set_status(TaskStatus.WARNING, "Version check disabled.") return try: response = get_http_session().get( @@ -70,9 +65,7 @@ def update_latest_version(self: MonitoredTask): data = response.json() upstream_version = data.get("stable", {}).get("version") cache.set(VERSION_CACHE_KEY, upstream_version, VERSION_CACHE_TIMEOUT) - self.set_status( - TaskResult(TaskResultStatus.SUCCESSFUL, ["Successfully updated latest Version"]) - ) + self.set_status(TaskStatus.SUCCESSFUL, "Successfully updated latest Version") _set_prom_info() # Check if upstream version is newer than what we're running, # and if no event exists yet, create one. @@ -89,7 +82,7 @@ def update_latest_version(self: MonitoredTask): Event.new(EventAction.UPDATE_AVAILABLE, **event_dict).save() except (RequestException, IndexError) as exc: cache.set(VERSION_CACHE_KEY, "0.0.0", VERSION_CACHE_TIMEOUT) - self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc)) + self.set_error(exc) _set_prom_info() diff --git a/authentik/admin/tests/test_api.py b/authentik/admin/tests/test_api.py index 8fcbfa286..8ef016b88 100644 --- a/authentik/admin/tests/test_api.py +++ b/authentik/admin/tests/test_api.py @@ -8,7 +8,7 @@ from authentik import __version__ from authentik.blueprints.tests import reconcile_app from authentik.core.models import Group, User from authentik.core.tasks import clean_expired_models -from authentik.events.monitored_tasks import TaskResultStatus +from authentik.events.monitored_tasks import TaskStatus from authentik.lib.generators import generate_id @@ -42,7 +42,7 @@ class TestAdminAPI(TestCase): ) self.assertEqual(response.status_code, 200) body = loads(response.content) - self.assertEqual(body["status"], TaskResultStatus.SUCCESSFUL.name) + self.assertEqual(body["status"], TaskStatus.SUCCESSFUL.name) self.assertEqual(body["task_name"], "clean_expired_models") response = self.client.get( reverse("authentik_api:admin_system_tasks-detail", kwargs={"pk": "qwerqwer"}) diff --git a/authentik/admin/urls.py b/authentik/admin/urls.py index fec51f5f7..6ef21e93e 100644 --- a/authentik/admin/urls.py +++ b/authentik/admin/urls.py @@ -4,12 +4,10 @@ from django.urls import path from authentik.admin.api.meta import AppsViewSet, ModelViewSet from authentik.admin.api.metrics import AdministrationMetricsViewSet from authentik.admin.api.system import SystemView -from authentik.admin.api.tasks import TaskViewSet from authentik.admin.api.version import VersionView from authentik.admin.api.workers import WorkerView api_urlpatterns = [ - ("admin/system_tasks", TaskViewSet, "admin_system_tasks"), ("admin/apps", AppsViewSet, "apps"), ("admin/models", ModelViewSet, "models"), path( diff --git a/authentik/blueprints/v1/tasks.py b/authentik/blueprints/v1/tasks.py index 686e4747c..33b517cb0 100644 --- a/authentik/blueprints/v1/tasks.py +++ b/authentik/blueprints/v1/tasks.py @@ -29,12 +29,8 @@ from authentik.blueprints.v1.common import BlueprintLoader, BlueprintMetadata, E from authentik.blueprints.v1.importer import Importer from authentik.blueprints.v1.labels import LABEL_AUTHENTIK_INSTANTIATE from authentik.blueprints.v1.oci import OCI_PREFIX -from authentik.events.monitored_tasks import ( - MonitoredTask, - TaskResult, - TaskResultStatus, - prefill_task, -) +from authentik.events.models import TaskStatus +from authentik.events.monitored_tasks import MonitoredTask, prefill_task from authentik.events.utils import sanitize_dict from authentik.lib.config import CONFIG from authentik.root.celery import CELERY_APP @@ -140,10 +136,7 @@ def blueprints_discovery(self: MonitoredTask, path: Optional[str] = None): check_blueprint_v1_file(blueprint) count += 1 self.set_status( - TaskResult( - TaskResultStatus.SUCCESSFUL, - messages=[_("Successfully imported %(count)d files." % {"count": count})], - ) + TaskStatus.SUCCESSFUL, _("Successfully imported %(count)d files." % {"count": count}) ) @@ -196,18 +189,18 @@ def apply_blueprint(self: MonitoredTask, instance_pk: str): if not valid: instance.status = BlueprintInstanceStatus.ERROR instance.save() - self.set_status(TaskResult(TaskResultStatus.ERROR, [x["event"] for x in logs])) + self.set_status(TaskStatus.ERROR, *[x["event"] for x in logs]) return applied = importer.apply() if not applied: instance.status = BlueprintInstanceStatus.ERROR instance.save() - self.set_status(TaskResult(TaskResultStatus.ERROR, "Failed to apply")) + self.set_status(TaskStatus.ERROR, "Failed to apply") return instance.status = BlueprintInstanceStatus.SUCCESSFUL instance.last_applied_hash = file_hash instance.last_applied = now() - self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL)) + self.set_status(TaskStatus.SUCCESSFUL) except ( DatabaseError, ProgrammingError, @@ -218,7 +211,7 @@ def apply_blueprint(self: MonitoredTask, instance_pk: str): ) as exc: if instance: instance.status = BlueprintInstanceStatus.ERROR - self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc)) + self.set_error(exc) finally: if instance: instance.save() diff --git a/authentik/core/tasks.py b/authentik/core/tasks.py index 0b4c839e8..788cbc25e 100644 --- a/authentik/core/tasks.py +++ b/authentik/core/tasks.py @@ -13,12 +13,7 @@ from authentik.core.models import ( ExpiringModel, User, ) -from authentik.events.monitored_tasks import ( - MonitoredTask, - TaskResult, - TaskResultStatus, - prefill_task, -) +from authentik.events.monitored_tasks import MonitoredTask, TaskStatus, prefill_task from authentik.root.celery import CELERY_APP LOGGER = get_logger() @@ -54,7 +49,7 @@ def clean_expired_models(self: MonitoredTask): amount += 1 LOGGER.debug("Expired sessions", model=AuthenticatedSession, amount=amount) messages.append(f"Expired {amount} {AuthenticatedSession._meta.verbose_name_plural}") - self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, messages)) + self.set_status(TaskStatus.SUCCESSFUL, *messages) @CELERY_APP.task(bind=True, base=MonitoredTask) @@ -75,4 +70,4 @@ def clean_temporary_users(self: MonitoredTask): user.delete() deleted_users += 1 messages.append(f"Successfully deleted {deleted_users} users.") - self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, messages)) + self.set_status(TaskStatus.SUCCESSFUL, *messages) diff --git a/authentik/crypto/tasks.py b/authentik/crypto/tasks.py index 4a660ee81..02fda31a6 100644 --- a/authentik/crypto/tasks.py +++ b/authentik/crypto/tasks.py @@ -9,12 +9,8 @@ from django.utils.translation import gettext_lazy as _ from structlog.stdlib import get_logger from authentik.crypto.models import CertificateKeyPair -from authentik.events.monitored_tasks import ( - MonitoredTask, - TaskResult, - TaskResultStatus, - prefill_task, -) +from authentik.events.models import TaskStatus +from authentik.events.monitored_tasks import MonitoredTask, prefill_task from authentik.lib.config import CONFIG from authentik.root.celery import CELERY_APP @@ -88,8 +84,5 @@ def certificate_discovery(self: MonitoredTask): if dirty: cert.save() self.set_status( - TaskResult( - TaskResultStatus.SUCCESSFUL, - messages=[_("Successfully imported %(count)d files." % {"count": discovered})], - ) + TaskStatus.SUCCESSFUL, _("Successfully imported %(count)d files." % {"count": discovered}) ) diff --git a/authentik/events/api/tasks.py b/authentik/events/api/tasks.py new file mode 100644 index 000000000..f8482fecf --- /dev/null +++ b/authentik/events/api/tasks.py @@ -0,0 +1,101 @@ +"""Tasks API""" +from importlib import import_module + +from django.contrib import messages +from django.utils.translation import gettext_lazy as _ +from drf_spectacular.types import OpenApiTypes +from drf_spectacular.utils import OpenApiParameter, OpenApiResponse, extend_schema +from rest_framework.decorators import action +from rest_framework.fields import ( + CharField, + ChoiceField, + DateTimeField, + ListField, + SerializerMethodField, +) +from rest_framework.request import Request +from rest_framework.response import Response +from rest_framework.serializers import ModelSerializer +from rest_framework.viewsets import ReadOnlyModelViewSet +from structlog.stdlib import get_logger + +from authentik.api.decorators import permission_required +from authentik.events.models import SystemTask, TaskStatus + +LOGGER = get_logger() + + +class SystemTaskSerializer(ModelSerializer): + """Serialize TaskInfo and TaskResult""" + + task_name = CharField(source="name") + task_description = CharField(source="description") + task_start_timestamp = DateTimeField(source="start_timestamp") + task_finish_timestamp = DateTimeField(source="finish_timestamp") + task_duration = SerializerMethodField() + + status = ChoiceField( + # source="status", + choices=[(x.name, x.name) for x in TaskStatus], + ) + messages = ListField(child=CharField()) + + def get_task_duration(self, instance: SystemTask) -> int: + """Get the duration a task took to run""" + return max(instance.finish_timestamp.timestamp() - instance.start_timestamp.timestamp(), 0) + + class Meta: + model = SystemTask + fields = [ + "task_name", + "task_description", + "task_start_timestamp", + "task_finish_timestamp", + "task_duration", + "status", + "messages", + ] + + +class SystemTaskViewSet(ReadOnlyModelViewSet): + """Read-only view set that returns all background tasks""" + + queryset = SystemTask.objects.all() + serializer_class = SystemTaskSerializer + + @permission_required(None, ["authentik_events.rerun_task"]) + @extend_schema( + request=OpenApiTypes.NONE, + responses={ + 204: OpenApiResponse(description="Task retried successfully"), + 404: OpenApiResponse(description="Task not found"), + 500: OpenApiResponse(description="Failed to retry task"), + }, + parameters=[ + OpenApiParameter( + "id", + type=OpenApiTypes.STR, + location=OpenApiParameter.PATH, + required=True, + ), + ], + ) + @action(detail=True, methods=["post"]) + def retry(self, request: Request, pk=None) -> Response: + """Retry task""" + task = self.get_object() + try: + task_module = import_module(task.task_call_module) + task_func = getattr(task_module, task.task_call_func) + LOGGER.debug("Running task", task=task_func) + task_func.delay(*task.task_call_args, **task.task_call_kwargs) + messages.success( + self.request, + _("Successfully re-scheduled Task %(name)s!" % {"name": task.task_name}), + ) + return Response(status=204) + except (ImportError, AttributeError): # pragma: no cover + LOGGER.warning("Failed to run task, remove state", task=task) + # if we get an import error, the module path has probably changed + task.delete() + return Response(status=500) diff --git a/authentik/events/migrations/0003_systemtask.py b/authentik/events/migrations/0003_systemtask.py new file mode 100644 index 000000000..feaa0cf84 --- /dev/null +++ b/authentik/events/migrations/0003_systemtask.py @@ -0,0 +1,55 @@ +# Generated by Django 5.0.1 on 2024-01-13 19:38 + +import uuid + +from django.db import migrations, models + +import authentik.core.models + + +class Migration(migrations.Migration): + dependencies = [ + ("authentik_events", "0002_alter_notificationtransport_mode"), + ] + + operations = [ + migrations.CreateModel( + name="SystemTask", + fields=[ + ( + "expires", + models.DateTimeField(default=authentik.core.models.default_token_duration), + ), + ("expiring", models.BooleanField(default=True)), + ( + "uuid", + models.UUIDField( + default=uuid.uuid4, editable=False, primary_key=True, serialize=False + ), + ), + ("name", models.TextField()), + ("uid", models.TextField(null=True)), + ("start_timestamp", models.DateTimeField(auto_now_add=True)), + ("finish_timestamp", models.DateTimeField(auto_now=True)), + ( + "status", + models.PositiveIntegerField( + choices=[(1, "Successful"), (2, "Warning"), (4, "Error"), (8, "Unknown")] + ), + ), + ("description", models.TextField(null=True)), + ("messages", models.JSONField()), + ("task_call_module", models.TextField()), + ("task_call_func", models.TextField()), + ("task_call_args", models.JSONField(default=list)), + ("task_call_kwargs", models.JSONField(default=dict)), + ], + options={ + "verbose_name": "System Task", + "verbose_name_plural": "System Tasks", + "permissions": [("rerun_task", "Rerun task")], + "default_permissions": ["view"], + "unique_together": {("name", "uid")}, + }, + ), + ] diff --git a/authentik/events/models.py b/authentik/events/models.py index 240ba11de..1cddeec39 100644 --- a/authentik/events/models.py +++ b/authentik/events/models.py @@ -4,7 +4,7 @@ from collections import Counter from datetime import timedelta from inspect import currentframe from smtplib import SMTPException -from typing import TYPE_CHECKING, Optional +from typing import Optional from uuid import uuid4 from django.db import models @@ -18,6 +18,7 @@ from django.http.request import QueryDict from django.utils.timezone import now from django.utils.translation import gettext as _ from requests import RequestException +from rest_framework.serializers import Serializer from structlog.stdlib import get_logger from authentik import get_full_version @@ -26,6 +27,7 @@ from authentik.core.middleware import ( SESSION_KEY_IMPERSONATE_USER, ) from authentik.core.models import ExpiringModel, Group, PropertyMapping, User +from authentik.events.apps import GAUGE_TASKS from authentik.events.context_processors.base import get_context_processors from authentik.events.utils import ( cleanse_dict, @@ -45,8 +47,6 @@ from authentik.tenants.models import Tenant from authentik.tenants.utils import DEFAULT_TENANT LOGGER = get_logger() -if TYPE_CHECKING: - from rest_framework.serializers import Serializer def default_event_duration(): @@ -267,7 +267,7 @@ class Event(SerializerModel, ExpiringModel): super().save(*args, **kwargs) @property - def serializer(self) -> "Serializer": + def serializer(self) -> type[Serializer]: from authentik.events.api.events import EventSerializer return EventSerializer @@ -475,7 +475,7 @@ class NotificationTransport(SerializerModel): raise NotificationTransportError(exc) from exc @property - def serializer(self) -> "Serializer": + def serializer(self) -> type[Serializer]: from authentik.events.api.notification_transports import NotificationTransportSerializer return NotificationTransportSerializer @@ -508,7 +508,7 @@ class Notification(SerializerModel): user = models.ForeignKey(User, on_delete=models.CASCADE) @property - def serializer(self) -> "Serializer": + def serializer(self) -> type[Serializer]: from authentik.events.api.notifications import NotificationSerializer return NotificationSerializer @@ -551,7 +551,7 @@ class NotificationRule(SerializerModel, PolicyBindingModel): ) @property - def serializer(self) -> "Serializer": + def serializer(self) -> type[Serializer]: from authentik.events.api.notification_rules import NotificationRuleSerializer return NotificationRuleSerializer @@ -572,7 +572,7 @@ class NotificationWebhookMapping(PropertyMapping): return "ak-property-mapping-notification-form" @property - def serializer(self) -> type["Serializer"]: + def serializer(self) -> type[type[Serializer]]: from authentik.events.api.notification_mappings import NotificationWebhookMappingSerializer return NotificationWebhookMappingSerializer @@ -583,3 +583,59 @@ class NotificationWebhookMapping(PropertyMapping): class Meta: verbose_name = _("Webhook Mapping") verbose_name_plural = _("Webhook Mappings") + + +class TaskStatus(models.IntegerChoices): + """Possible states of tasks""" + + SUCCESSFUL = 1 + WARNING = 2 + ERROR = 4 + UNKNOWN = 8 + + +class SystemTask(SerializerModel, ExpiringModel): + """Info about a system task running in the background along with details to restart the task""" + + uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4) + name = models.TextField() + uid = models.TextField(null=True) + + start_timestamp = models.DateTimeField(auto_now_add=True) + finish_timestamp = models.DateTimeField(auto_now=True) + + status = models.PositiveIntegerField(choices=TaskStatus.choices) + + description = models.TextField(null=True) + messages = models.JSONField() + + task_call_module = models.TextField() + task_call_func = models.TextField() + task_call_args = models.JSONField(default=list) + task_call_kwargs = models.JSONField(default=dict) + + @property + def serializer(self) -> type[Serializer]: + from authentik.events.api.tasks import SystemTaskSerializer + + return SystemTaskSerializer + + def update_metrics(self): + """Update prometheus metrics""" + duration = max(self.finish_timestamp.timestamp() - self.start_timestamp.timestamp(), 0) + GAUGE_TASKS.labels( + task_name=self.name.split(":")[0], + task_uid=self.uid or "", + status=self.status.name.lower(), + ).set(duration) + + def __str__(self) -> str: + return f"System Task {self.name}" + + class Meta: + unique_together = (("name", "uid"),) + # Remove "add", "change" and "delete" permissions as those are not used + default_permissions = ["view"] + permissions = [("rerun_task", _("Rerun task"))] + verbose_name = _("System Task") + verbose_name_plural = _("System Tasks") diff --git a/authentik/events/monitored_tasks.py b/authentik/events/monitored_tasks.py index 70f59f610..3ec5ab3a2 100644 --- a/authentik/events/monitored_tasks.py +++ b/authentik/events/monitored_tasks.py @@ -1,115 +1,18 @@ """Monitored tasks""" -from dataclasses import dataclass, field -from datetime import datetime -from enum import Enum +from datetime import datetime, timedelta, timezone from timeit import default_timer from typing import Any, Optional from celery import Task -from django.core.cache import cache +from django.db import DatabaseError, InternalError, ProgrammingError +from django.utils.timezone import now from django.utils.translation import gettext_lazy as _ from structlog.stdlib import get_logger -from authentik.events.apps import GAUGE_TASKS -from authentik.events.models import Event, EventAction +from authentik.events.models import Event, EventAction, SystemTask, TaskStatus from authentik.lib.utils.errors import exception_to_string LOGGER = get_logger() -CACHE_KEY_PREFIX = "goauthentik.io/events/tasks/" - - -class TaskResultStatus(Enum): - """Possible states of tasks""" - - SUCCESSFUL = 1 - WARNING = 2 - ERROR = 4 - UNKNOWN = 8 - - -@dataclass -class TaskResult: - """Result of a task run, this class is created by the task itself - and used by self.set_status""" - - status: TaskResultStatus - - messages: list[str] = field(default_factory=list) - - # Optional UID used in cache for tasks that run in different instances - uid: Optional[str] = field(default=None) - - def with_error(self, exc: Exception) -> "TaskResult": - """Since errors might not always be pickle-able, set the traceback""" - # TODO: Mark exception somehow so that is rendered as
in frontend - self.messages.append(exception_to_string(exc)) - return self - - -@dataclass -class TaskInfo: - """Info about a task run""" - - task_name: str - start_timestamp: float - finish_timestamp: float - finish_time: datetime - - result: TaskResult - - task_call_module: str - task_call_func: str - task_call_args: list[Any] = field(default_factory=list) - task_call_kwargs: dict[str, Any] = field(default_factory=dict) - - task_description: Optional[str] = field(default=None) - - @staticmethod - def all() -> dict[str, "TaskInfo"]: - """Get all TaskInfo objects""" - return cache.get_many(cache.keys(CACHE_KEY_PREFIX + "*")) - - @staticmethod - def by_name(name: str) -> Optional["TaskInfo"] | Optional[list["TaskInfo"]]: - """Get TaskInfo Object by name""" - if "*" in name: - return cache.get_many(cache.keys(CACHE_KEY_PREFIX + name)).values() - return cache.get(CACHE_KEY_PREFIX + name, None) - - @property - def full_name(self) -> str: - """Get the full cache key with task name and UID""" - key = CACHE_KEY_PREFIX + self.task_name - if self.result.uid: - uid_suffix = f":{self.result.uid}" - key += uid_suffix - if not self.task_name.endswith(uid_suffix): - self.task_name += uid_suffix - return key - - def delete(self): - """Delete task info from cache""" - return cache.delete(self.full_name) - - def update_metrics(self): - """Update prometheus metrics""" - start = default_timer() - if hasattr(self, "start_timestamp"): - start = self.start_timestamp - try: - duration = max(self.finish_timestamp - start, 0) - except TypeError: - duration = 0 - GAUGE_TASKS.labels( - task_name=self.task_name.split(":")[0], - task_uid=self.result.uid or "", - status=self.result.status.name.lower(), - ).set(duration) - - def save(self, timeout_hours=6): - """Save task into cache""" - self.update_metrics() - cache.set(self.full_name, self, timeout=timeout_hours * 60 * 60) class MonitoredTask(Task): @@ -118,73 +21,94 @@ class MonitoredTask(Task): # For tasks that should only be listed if they failed, set this to False save_on_success: bool - _result: Optional[TaskResult] + _status: Optional[TaskStatus] + _messages: list[str] _uid: Optional[str] - start: Optional[float] = None + _start: Optional[float] = None def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.save_on_success = True self._uid = None - self._result = None + self._status = None + self._messages = [] self.result_timeout_hours = 6 def set_uid(self, uid: str): """Set UID, so in the case of an unexpected error its saved correctly""" self._uid = uid - def set_status(self, result: TaskResult): + def set_status(self, status: TaskStatus, *messages: str): """Set result for current run, will overwrite previous result.""" - self._result = result + self._status = status + self._messages = messages + + def set_error(self, exception: Exception): + """Set result to error and save exception""" + self._status = TaskStatus.ERROR + self._messages = [exception_to_string(exception)] def before_start(self, task_id, args, kwargs): - self.start = default_timer() + self._start = default_timer() return super().before_start(task_id, args, kwargs) # pylint: disable=too-many-arguments def after_return(self, status, retval, task_id, args: list[Any], kwargs: dict[str, Any], einfo): super().after_return(status, retval, task_id, args, kwargs, einfo=einfo) - if not self._result: + if not self._status: return - if not self._result.uid: - self._result.uid = self._uid - info = TaskInfo( - task_name=self.__name__, - task_description=self.__doc__, - start_timestamp=self.start or default_timer(), - finish_timestamp=default_timer(), - finish_time=datetime.now(), - result=self._result, - task_call_module=self.__module__, - task_call_func=self.__name__, - task_call_args=args, - task_call_kwargs=kwargs, + if self._status == TaskStatus.SUCCESSFUL and not self.save_on_success: + SystemTask.objects.filter( + name=self.__name__, + uid=self._uid, + ).delete() + return + SystemTask.objects.update_or_create( + name=self.__name__, + uid=self._uid, + defaults={ + "description": self.__doc__, + "start_timestamp": datetime.fromtimestamp( + self._start or default_timer(), tz=timezone.utc + ), + "finish_timestamp": datetime.fromtimestamp(default_timer(), tz=timezone.utc), + "task_call_module": self.__module__, + "task_call_func": self.__name__, + "task_call_args": args, + "task_call_kwargs": kwargs, + "status": self._status, + "messages": self._messages, + "expires": now() + timedelta(hours=self.result_timeout_hours), + "expiring": True, + }, ) - if self._result.status == TaskResultStatus.SUCCESSFUL and not self.save_on_success: - info.delete() - return - info.save(self.result_timeout_hours) # pylint: disable=too-many-arguments def on_failure(self, exc, task_id, args, kwargs, einfo): super().on_failure(exc, task_id, args, kwargs, einfo=einfo) - if not self._result: - self._result = TaskResult(status=TaskResultStatus.ERROR, messages=[str(exc)]) - if not self._result.uid: - self._result.uid = self._uid - TaskInfo( - task_name=self.__name__, - task_description=self.__doc__, - start_timestamp=self.start or default_timer(), - finish_timestamp=default_timer(), - finish_time=datetime.now(), - result=self._result, - task_call_module=self.__module__, - task_call_func=self.__name__, - task_call_args=args, - task_call_kwargs=kwargs, - ).save(self.result_timeout_hours) + if not self._status: + self._status = TaskStatus.ERROR + self._messages = exception_to_string(exc) + SystemTask.objects.update_or_create( + name=self.__name__, + uid=self._uid, + defaults={ + "description": self.__doc__, + "start_timestamp": datetime.fromtimestamp( + self._start or default_timer(), tz=timezone.utc + ), + "finish_timestamp": datetime.fromtimestamp(default_timer(), tz=timezone.utc), + "task_call_module": self.__module__, + "task_call_func": self.__name__, + "task_call_args": args, + "task_call_kwargs": kwargs, + "status": self._status, + "messages": self._messages, + "expires": now() + timedelta(hours=self.result_timeout_hours), + "expiring": True, + }, + ) Event.new( EventAction.SYSTEM_TASK_EXCEPTION, message=f"Task {self.__name__} encountered an error: {exception_to_string(exc)}", @@ -196,19 +120,20 @@ class MonitoredTask(Task): def prefill_task(func): """Ensure a task's details are always in cache, so it can always be triggered via API""" - status = TaskInfo.by_name(func.__name__) + try: + status = SystemTask.objects.filter(name=func.__name__).first() + except (DatabaseError, InternalError, ProgrammingError): + return func if status: return func - TaskInfo( - task_name=func.__name__, - task_description=func.__doc__, - result=TaskResult(TaskResultStatus.UNKNOWN, messages=[_("Task has not been run yet.")]), + SystemTask.objects.create( + name=func.__name__, + description=func.__doc__, + status=TaskStatus.UNKNOWN, + messages=[_("Task has not been run yet.")], task_call_module=func.__module__, task_call_func=func.__name__, - # We don't have real values for these attributes but they cannot be null - start_timestamp=0, - finish_timestamp=0, - finish_time=datetime.now(), - ).save(86400) + expiring=False, + ) LOGGER.debug("prefilled task", task_name=func.__name__) return func diff --git a/authentik/events/signals.py b/authentik/events/signals.py index d0b7ec06a..dcec5eb42 100644 --- a/authentik/events/signals.py +++ b/authentik/events/signals.py @@ -8,12 +8,13 @@ from django.http import HttpRequest from authentik.core.models import User from authentik.core.signals import login_failed, password_changed -from authentik.events.models import Event, EventAction +from authentik.events.models import Event, EventAction, SystemTask from authentik.events.tasks import event_notification_handler, gdpr_cleanup from authentik.flows.models import Stage from authentik.flows.planner import PLAN_CONTEXT_SOURCE, FlowPlan from authentik.flows.views.executor import SESSION_KEY_PLAN from authentik.lib.config import CONFIG +from authentik.root.monitoring import monitoring_set from authentik.stages.invitation.models import Invitation from authentik.stages.invitation.signals import invitation_used from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS @@ -100,3 +101,10 @@ def event_user_pre_delete_cleanup(sender, instance: User, **_): """If gdpr_compliance is enabled, remove all the user's events""" if CONFIG.get_bool("gdpr_compliance", True): gdpr_cleanup.delay(instance.pk) + + +@receiver(monitoring_set) +def monitoring_system_task(sender, **_): + """Update metrics when task is saved""" + for task in SystemTask.objects.all(): + task.update_metrics() diff --git a/authentik/events/tasks.py b/authentik/events/tasks.py index 9c00860f8..cdcdf9ca1 100644 --- a/authentik/events/tasks.py +++ b/authentik/events/tasks.py @@ -13,13 +13,9 @@ from authentik.events.models import ( NotificationRule, NotificationTransport, NotificationTransportError, + TaskStatus, ) -from authentik.events.monitored_tasks import ( - MonitoredTask, - TaskResult, - TaskResultStatus, - prefill_task, -) +from authentik.events.monitored_tasks import MonitoredTask, prefill_task from authentik.policies.engine import PolicyEngine from authentik.policies.models import PolicyBinding, PolicyEngineMode from authentik.root.celery import CELERY_APP @@ -123,9 +119,9 @@ def notification_transport( if not transport: return transport.send(notification) - self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL)) + self.set_status(TaskStatus.SUCCESSFUL) except (NotificationTransportError, PropertyMappingExpressionException) as exc: - self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc)) + self.set_error(exc) raise exc @@ -146,4 +142,4 @@ def notification_cleanup(self: MonitoredTask): for notification in notifications: notification.delete() LOGGER.debug("Expired notifications", amount=amount) - self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, [f"Expired {amount} Notifications"])) + self.set_status(TaskStatus.SUCCESSFUL, f"Expired {amount} Notifications") diff --git a/authentik/events/tests/test_tasks.py b/authentik/events/tests/test_tasks.py index 58dad6556..d343ebdb1 100644 --- a/authentik/events/tests/test_tasks.py +++ b/authentik/events/tests/test_tasks.py @@ -1,7 +1,8 @@ """Test Monitored tasks""" from django.test import TestCase -from authentik.events.monitored_tasks import MonitoredTask, TaskInfo, TaskResult, TaskResultStatus +from authentik.events.models import SystemTask, TaskStatus +from authentik.events.monitored_tasks import MonitoredTask from authentik.lib.generators import generate_id from authentik.root.celery import CELERY_APP @@ -22,22 +23,20 @@ class TestMonitoredTasks(TestCase): def test_task(self: MonitoredTask): self.save_on_success = False self.set_uid(uid) - self.set_status( - TaskResult(TaskResultStatus.ERROR if should_fail else TaskResultStatus.SUCCESSFUL) - ) + self.set_status(TaskStatus.ERROR if should_fail else TaskStatus.SUCCESSFUL) # First test successful run should_fail = False test_task.delay().get() - self.assertIsNone(TaskInfo.by_name(f"test_task:{uid}")) + self.assertIsNone(SystemTask.objects.filter(name="test_task", uid=uid)) # Then test failed should_fail = True test_task.delay().get() - info = TaskInfo.by_name(f"test_task:{uid}") - self.assertEqual(info.result.status, TaskResultStatus.ERROR) + info = SystemTask.objects.filter(name="test_task", uid=uid) + self.assertEqual(info.status, TaskStatus.ERROR) # Then after that, the state should be removed should_fail = False test_task.delay().get() - self.assertIsNone(TaskInfo.by_name(f"test_task:{uid}")) + self.assertIsNone(SystemTask.objects.filter(name="test_task", uid=uid)) diff --git a/authentik/events/urls.py b/authentik/events/urls.py index 5d7d40e5f..523435819 100644 --- a/authentik/events/urls.py +++ b/authentik/events/urls.py @@ -4,11 +4,13 @@ from authentik.events.api.notification_mappings import NotificationWebhookMappin from authentik.events.api.notification_rules import NotificationRuleViewSet from authentik.events.api.notification_transports import NotificationTransportViewSet from authentik.events.api.notifications import NotificationViewSet +from authentik.events.api.tasks import SystemTaskViewSet api_urlpatterns = [ ("events/events", EventViewSet), ("events/notifications", NotificationViewSet), ("events/transports", NotificationTransportViewSet), ("events/rules", NotificationRuleViewSet), + ("events/system_tasks", SystemTaskViewSet), ("propertymappings/notification", NotificationWebhookMappingViewSet), ] diff --git a/authentik/outposts/tasks.py b/authentik/outposts/tasks.py index 0d4e54a3a..93121b90e 100644 --- a/authentik/outposts/tasks.py +++ b/authentik/outposts/tasks.py @@ -19,12 +19,8 @@ from yaml import safe_load from authentik.enterprise.providers.rac.controllers.docker import RACDockerController from authentik.enterprise.providers.rac.controllers.kubernetes import RACKubernetesController -from authentik.events.monitored_tasks import ( - MonitoredTask, - TaskResult, - TaskResultStatus, - prefill_task, -) +from authentik.events.models import TaskStatus +from authentik.events.monitored_tasks import MonitoredTask, prefill_task from authentik.lib.config import CONFIG from authentik.lib.utils.reflection import path_to_class from authentik.outposts.consumer import OUTPOST_GROUP @@ -118,10 +114,8 @@ def outpost_service_connection_monitor(self: MonitoredTask): for connection in connections.iterator(): outpost_service_connection_state.delay(connection.pk) self.set_status( - TaskResult( - TaskResultStatus.SUCCESSFUL, - [f"Successfully updated {len(connections)} connections."], - ) + TaskStatus.SUCCESSFUL, + f"Successfully updated {len(connections)} connections.", ) @@ -161,11 +155,11 @@ def outpost_controller( LOGGER.debug(log) LOGGER.debug("-----------------Outpost Controller logs end-------------------") except (ControllerException, ServiceConnectionInvalid) as exc: - self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc)) + self.set_error(exc) else: if from_cache: cache.delete(CACHE_KEY_OUTPOST_DOWN % outpost_pk) - self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, logs)) + self.set_status(TaskStatus.SUCCESSFUL, *logs) @CELERY_APP.task(bind=True, base=MonitoredTask) @@ -178,10 +172,8 @@ def outpost_token_ensurer(self: MonitoredTask): _ = outpost.token outpost.build_user_permissions(outpost.user) self.set_status( - TaskResult( - TaskResultStatus.SUCCESSFUL, - [f"Successfully checked {len(all_outposts)} Outposts."], - ) + TaskStatus.SUCCESSFUL, + f"Successfully checked {len(all_outposts)} Outposts.", ) @@ -261,27 +253,27 @@ def _outpost_single_update(outpost: Outpost, layer=None): ) def outpost_connection_discovery(self: MonitoredTask): """Checks the local environment and create Service connections.""" - status = TaskResult(TaskResultStatus.SUCCESSFUL) + messages = [] if not CONFIG.get_bool("outposts.discover"): - status.messages.append("Outpost integration discovery is disabled") - self.set_status(status) + messages.append("Outpost integration discovery is disabled") + self.set_status(TaskStatus.SUCCESSFUL, *messages) return # Explicitly check against token filename, as that's # only present when the integration is enabled if Path(SERVICE_TOKEN_FILENAME).exists(): - status.messages.append("Detected in-cluster Kubernetes Config") + messages.append("Detected in-cluster Kubernetes Config") if not KubernetesServiceConnection.objects.filter(local=True).exists(): - status.messages.append("Created Service Connection for in-cluster") + messages.append("Created Service Connection for in-cluster") KubernetesServiceConnection.objects.create( name="Local Kubernetes Cluster", local=True, kubeconfig={} ) # For development, check for the existence of a kubeconfig file kubeconfig_path = Path(KUBE_CONFIG_DEFAULT_LOCATION).expanduser() if kubeconfig_path.exists(): - status.messages.append("Detected kubeconfig") + messages.append("Detected kubeconfig") kubeconfig_local_name = f"k8s-{gethostname()}" if not KubernetesServiceConnection.objects.filter(name=kubeconfig_local_name).exists(): - status.messages.append("Creating kubeconfig Service Connection") + messages.append("Creating kubeconfig Service Connection") with kubeconfig_path.open("r", encoding="utf8") as _kubeconfig: KubernetesServiceConnection.objects.create( name=kubeconfig_local_name, @@ -290,12 +282,12 @@ def outpost_connection_discovery(self: MonitoredTask): unix_socket_path = urlparse(DEFAULT_UNIX_SOCKET).path socket = Path(unix_socket_path) if socket.exists() and access(socket, R_OK): - status.messages.append("Detected local docker socket") + messages.append("Detected local docker socket") if len(DockerServiceConnection.objects.filter(local=True)) == 0: - status.messages.append("Created Service Connection for docker") + messages.append("Created Service Connection for docker") DockerServiceConnection.objects.create( name="Local Docker connection", local=True, url=unix_socket_path, ) - self.set_status(status) + self.set_status(TaskStatus.SUCCESSFUL, *messages) diff --git a/authentik/policies/reputation/tasks.py b/authentik/policies/reputation/tasks.py index ac65d1748..0f9532bc6 100644 --- a/authentik/policies/reputation/tasks.py +++ b/authentik/policies/reputation/tasks.py @@ -4,12 +4,8 @@ from structlog.stdlib import get_logger from authentik.events.context_processors.asn import ASN_CONTEXT_PROCESSOR from authentik.events.context_processors.geoip import GEOIP_CONTEXT_PROCESSOR -from authentik.events.monitored_tasks import ( - MonitoredTask, - TaskResult, - TaskResultStatus, - prefill_task, -) +from authentik.events.models import TaskStatus +from authentik.events.monitored_tasks import MonitoredTask, prefill_task from authentik.policies.reputation.models import Reputation from authentik.policies.reputation.signals import CACHE_KEY_PREFIX from authentik.root.celery import CELERY_APP @@ -32,4 +28,4 @@ def save_reputation(self: MonitoredTask): rep.score = score["score"] objects_to_update.append(rep) Reputation.objects.bulk_update(objects_to_update, ["score", "ip_geo_data"]) - self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, ["Successfully updated Reputation"])) + self.set_status(TaskStatus.SUCCESSFUL, "Successfully updated Reputation") diff --git a/authentik/providers/scim/api/providers.py b/authentik/providers/scim/api/providers.py index ddca8cfef..1b095c7bb 100644 --- a/authentik/providers/scim/api/providers.py +++ b/authentik/providers/scim/api/providers.py @@ -1,17 +1,17 @@ """SCIM Provider API Views""" from django.utils.text import slugify from drf_spectacular.utils import OpenApiResponse, extend_schema +from guardian.shortcuts import get_objects_for_user from rest_framework.decorators import action from rest_framework.fields import BooleanField from rest_framework.request import Request from rest_framework.response import Response from rest_framework.viewsets import ModelViewSet -from authentik.admin.api.tasks import TaskSerializer from authentik.core.api.providers import ProviderSerializer from authentik.core.api.used_by import UsedByMixin from authentik.core.api.utils import PassiveSerializer -from authentik.events.monitored_tasks import TaskInfo +from authentik.events.api.tasks import SystemTaskSerializer from authentik.providers.scim.models import SCIMProvider @@ -43,7 +43,7 @@ class SCIMSyncStatusSerializer(PassiveSerializer): """SCIM Provider sync status""" is_running = BooleanField(read_only=True) - tasks = TaskSerializer(many=True, read_only=True) + tasks = SystemTaskSerializer(many=True, read_only=True) class SCIMProviderViewSet(UsedByMixin, ModelViewSet): @@ -65,8 +65,12 @@ class SCIMProviderViewSet(UsedByMixin, ModelViewSet): def sync_status(self, request: Request, pk: int) -> Response: """Get provider's sync status""" provider: SCIMProvider = self.get_object() - task = TaskInfo.by_name(f"scim_sync:{slugify(provider.name)}") - tasks = [task] if task else [] + tasks = list( + get_objects_for_user(request.user, "authentik_events.view_systemtask").filter( + name="scim_sync", + uid=slugify(provider.name), + ) + ) status = { "tasks": tasks, "is_running": provider.sync_lock.locked(), diff --git a/authentik/providers/scim/tasks.py b/authentik/providers/scim/tasks.py index 7b16b293e..e88e2c814 100644 --- a/authentik/providers/scim/tasks.py +++ b/authentik/providers/scim/tasks.py @@ -10,7 +10,8 @@ from pydanticscim.responses import PatchOp from structlog.stdlib import get_logger from authentik.core.models import Group, User -from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus +from authentik.events.models import TaskStatus +from authentik.events.monitored_tasks import MonitoredTask from authentik.lib.utils.reflection import path_to_class from authentik.providers.scim.clients import PAGE_SIZE, PAGE_TIMEOUT from authentik.providers.scim.clients.base import SCIMClient @@ -52,8 +53,8 @@ def scim_sync(self: MonitoredTask, provider_pk: int) -> None: LOGGER.debug("SCIM sync locked, skipping task", source=provider.name) return self.set_uid(slugify(provider.name)) - result = TaskResult(TaskResultStatus.SUCCESSFUL, []) - result.messages.append(_("Starting full SCIM sync")) + messages = [] + messages.append(_("Starting full SCIM sync")) LOGGER.debug("Starting SCIM sync") users_paginator = Paginator(provider.get_user_qs(), PAGE_SIZE) groups_paginator = Paginator(provider.get_group_qs(), PAGE_SIZE) @@ -63,17 +64,17 @@ def scim_sync(self: MonitoredTask, provider_pk: int) -> None: with allow_join_result(): try: for page in users_paginator.page_range: - result.messages.append(_("Syncing page %(page)d of users" % {"page": page})) + messages.append(_("Syncing page %(page)d of users" % {"page": page})) for msg in scim_sync_users.delay(page, provider_pk).get(): - result.messages.append(msg) + messages.append(msg) for page in groups_paginator.page_range: - result.messages.append(_("Syncing page %(page)d of groups" % {"page": page})) + messages.append(_("Syncing page %(page)d of groups" % {"page": page})) for msg in scim_sync_group.delay(page, provider_pk).get(): - result.messages.append(msg) + messages.append(msg) except StopSync as exc: - self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc)) + self.set_error(exc) return - self.set_status(result) + self.set_status(TaskStatus.SUCCESSFUL, *messages) @CELERY_APP.task( diff --git a/authentik/sources/ldap/api.py b/authentik/sources/ldap/api.py index 08e530bbb..e4bdf79d7 100644 --- a/authentik/sources/ldap/api.py +++ b/authentik/sources/ldap/api.py @@ -6,6 +6,7 @@ from django_filters.filters import AllValuesMultipleFilter from django_filters.filterset import FilterSet from drf_spectacular.types import OpenApiTypes from drf_spectacular.utils import extend_schema, extend_schema_field, inline_serializer +from guardian.shortcuts import get_objects_for_user from rest_framework.decorators import action from rest_framework.exceptions import ValidationError from rest_framework.fields import BooleanField, DictField, ListField, SerializerMethodField @@ -14,13 +15,12 @@ from rest_framework.request import Request from rest_framework.response import Response from rest_framework.viewsets import ModelViewSet -from authentik.admin.api.tasks import TaskSerializer from authentik.core.api.propertymappings import PropertyMappingSerializer from authentik.core.api.sources import SourceSerializer from authentik.core.api.used_by import UsedByMixin from authentik.core.api.utils import PassiveSerializer from authentik.crypto.models import CertificateKeyPair -from authentik.events.monitored_tasks import TaskInfo +from authentik.events.api.tasks import SystemTaskSerializer from authentik.sources.ldap.models import LDAPPropertyMapping, LDAPSource from authentik.sources.ldap.tasks import CACHE_KEY_STATUS, SYNC_CLASSES @@ -91,7 +91,7 @@ class LDAPSyncStatusSerializer(PassiveSerializer): """LDAP Source sync status""" is_running = BooleanField(read_only=True) - tasks = TaskSerializer(many=True, read_only=True) + tasks = SystemTaskSerializer(many=True, read_only=True) class LDAPSourceViewSet(UsedByMixin, ModelViewSet): @@ -136,7 +136,12 @@ class LDAPSourceViewSet(UsedByMixin, ModelViewSet): def sync_status(self, request: Request, slug: str) -> Response: """Get source's sync status""" source: LDAPSource = self.get_object() - tasks = TaskInfo.by_name(f"ldap_sync:{source.slug}:*") or [] + tasks = list( + get_objects_for_user(request.user, "authentik_events.view_systemtask").filter( + name="ldap_sync", + uid__startswith=source.slug, + ) + ) status = { "tasks": tasks, "is_running": source.sync_lock.locked(), diff --git a/authentik/sources/ldap/sync/vendor/freeipa.py b/authentik/sources/ldap/sync/vendor/freeipa.py index d0bce0584..d8014f716 100644 --- a/authentik/sources/ldap/sync/vendor/freeipa.py +++ b/authentik/sources/ldap/sync/vendor/freeipa.py @@ -1,9 +1,7 @@ """FreeIPA specific""" -from datetime import datetime +from datetime import datetime, timezone from typing import Any, Generator -from pytz import UTC - from authentik.core.models import User from authentik.sources.ldap.sync.base import BaseLDAPSynchronizer, flatten @@ -27,7 +25,7 @@ class FreeIPA(BaseLDAPSynchronizer): if "krbLastPwdChange" not in attributes: return pwd_last_set: datetime = attributes.get("krbLastPwdChange", datetime.now()) - pwd_last_set = pwd_last_set.replace(tzinfo=UTC) + pwd_last_set = pwd_last_set.replace(tzinfo=timezone.utc) if created or pwd_last_set >= user.password_change_date: self.message(f"'{user.username}': Reset user's password") self._logger.debug( diff --git a/authentik/sources/ldap/sync/vendor/ms_ad.py b/authentik/sources/ldap/sync/vendor/ms_ad.py index 84b8fc7df..342b104d9 100644 --- a/authentik/sources/ldap/sync/vendor/ms_ad.py +++ b/authentik/sources/ldap/sync/vendor/ms_ad.py @@ -1,10 +1,8 @@ """Active Directory specific""" -from datetime import datetime +from datetime import datetime, timezone from enum import IntFlag from typing import Any, Generator -from pytz import UTC - from authentik.core.models import User from authentik.sources.ldap.sync.base import BaseLDAPSynchronizer @@ -58,7 +56,7 @@ class MicrosoftActiveDirectory(BaseLDAPSynchronizer): if "pwdLastSet" not in attributes: return pwd_last_set: datetime = attributes.get("pwdLastSet", datetime.now()) - pwd_last_set = pwd_last_set.replace(tzinfo=UTC) + pwd_last_set = pwd_last_set.replace(tzinfo=timezone.utc) if created or pwd_last_set >= user.password_change_date: self.message(f"'{user.username}': Reset user's password") self._logger.debug( diff --git a/authentik/sources/ldap/tasks.py b/authentik/sources/ldap/tasks.py index 7f00d6bb3..dcf6bff5d 100644 --- a/authentik/sources/ldap/tasks.py +++ b/authentik/sources/ldap/tasks.py @@ -8,8 +8,8 @@ from ldap3.core.exceptions import LDAPException from redis.exceptions import LockError from structlog.stdlib import get_logger -from authentik.events.monitored_tasks import CACHE_KEY_PREFIX as CACHE_KEY_PREFIX_TASKS -from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus +from authentik.events.models import SystemTask, TaskStatus +from authentik.events.monitored_tasks import MonitoredTask from authentik.lib.config import CONFIG from authentik.lib.utils.errors import exception_to_string from authentik.lib.utils.reflection import class_to_path, path_to_class @@ -69,8 +69,7 @@ def ldap_sync_single(source_pk: str): try: with lock: # Delete all sync tasks from the cache - keys = cache.keys(f"{CACHE_KEY_PREFIX_TASKS}ldap_sync:{source.slug}*") - cache.delete_many(keys) + SystemTask.objects.filter(name="ldap_sync", uid__startswith=source.slug).delete() task = chain( # User and group sync can happen at once, they have no dependencies on each other group( @@ -127,20 +126,18 @@ def ldap_sync(self: MonitoredTask, source_pk: str, sync_class: str, page_cache_k + "Try increasing ldap.task_timeout_hours" ) LOGGER.warning(error_message) - self.set_status(TaskResult(TaskResultStatus.ERROR, [error_message])) + self.set_status(TaskStatus.ERROR, error_message) return cache.touch(page_cache_key) count = sync_inst.sync(page) messages = sync_inst.messages messages.append(f"Synced {count} objects.") self.set_status( - TaskResult( - TaskResultStatus.SUCCESSFUL, - messages, - ) + TaskStatus.SUCCESSFUL, + *messages, ) cache.delete(page_cache_key) except LDAPException as exc: # No explicit event is created here as .set_status with an error will do that LOGGER.warning(exception_to_string(exc)) - self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc)) + self.set_error(exc) diff --git a/authentik/sources/ldap/tests/test_sync.py b/authentik/sources/ldap/tests/test_sync.py index 13035ac2b..d5c8372c4 100644 --- a/authentik/sources/ldap/tests/test_sync.py +++ b/authentik/sources/ldap/tests/test_sync.py @@ -7,8 +7,8 @@ from django.test import TestCase from authentik.blueprints.tests import apply_blueprint from authentik.core.models import Group, User from authentik.core.tests.utils import create_test_admin_user -from authentik.events.models import Event, EventAction -from authentik.events.monitored_tasks import TaskInfo, TaskResultStatus +from authentik.events.models import Event, EventAction, SystemTask +from authentik.events.monitored_tasks import TaskStatus from authentik.lib.generators import generate_id, generate_key from authentik.lib.utils.reflection import class_to_path from authentik.sources.ldap.models import LDAPPropertyMapping, LDAPSource @@ -41,8 +41,8 @@ class LDAPSyncTests(TestCase): connection = MagicMock(return_value=mock_ad_connection(LDAP_PASSWORD)) with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): ldap_sync.delay(self.source.pk, class_to_path(UserLDAPSynchronizer), "foo").get() - status = TaskInfo.by_name("ldap_sync:ldap:users:foo") - self.assertEqual(status.result.status, TaskResultStatus.ERROR) + task = SystemTask.objects.filter(name="ldap_sync", uid="ldap:users:foo").first() + self.assertEqual(task.status, TaskStatus.ERROR) def test_sync_error(self): """Test user sync""" diff --git a/authentik/sources/oauth/tasks.py b/authentik/sources/oauth/tasks.py index 6197df512..caa284729 100644 --- a/authentik/sources/oauth/tasks.py +++ b/authentik/sources/oauth/tasks.py @@ -4,7 +4,8 @@ from json import dumps from requests import RequestException from structlog.stdlib import get_logger -from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus +from authentik.events.models import TaskStatus +from authentik.events.monitored_tasks import MonitoredTask from authentik.lib.utils.http import get_http_session from authentik.root.celery import CELERY_APP from authentik.sources.oauth.models import OAuthSource @@ -16,7 +17,7 @@ LOGGER = get_logger() def update_well_known_jwks(self: MonitoredTask): """Update OAuth sources' config from well_known, and JWKS info from the configured URL""" session = get_http_session() - result = TaskResult(TaskResultStatus.SUCCESSFUL, []) + messages = [] for source in OAuthSource.objects.all().exclude(oidc_well_known_url=""): try: well_known_config = session.get(source.oidc_well_known_url) @@ -24,7 +25,7 @@ def update_well_known_jwks(self: MonitoredTask): except RequestException as exc: text = exc.response.text if exc.response else str(exc) LOGGER.warning("Failed to update well_known", source=source, exc=exc, text=text) - result.messages.append(f"Failed to update OIDC configuration for {source.slug}") + messages.append(f"Failed to update OIDC configuration for {source.slug}") continue config = well_known_config.json() try: @@ -47,7 +48,7 @@ def update_well_known_jwks(self: MonitoredTask): source=source, exc=exc, ) - result.messages.append(f"Failed to update OIDC configuration for {source.slug}") + messages.append(f"Failed to update OIDC configuration for {source.slug}") continue if dirty: LOGGER.info("Updating sources' OpenID Configuration", source=source) @@ -60,11 +61,11 @@ def update_well_known_jwks(self: MonitoredTask): except RequestException as exc: text = exc.response.text if exc.response else str(exc) LOGGER.warning("Failed to update JWKS", source=source, exc=exc, text=text) - result.messages.append(f"Failed to update JWKS for {source.slug}") + messages.append(f"Failed to update JWKS for {source.slug}") continue config = jwks_config.json() if dumps(source.oidc_jwks, sort_keys=True) != dumps(config, sort_keys=True): source.oidc_jwks = config LOGGER.info("Updating sources' JWKS", source=source) source.save() - self.set_status(result) + self.set_status(TaskStatus.SUCCESSFUL, *messages) diff --git a/authentik/sources/plex/tasks.py b/authentik/sources/plex/tasks.py index c83b43f21..0934712a6 100644 --- a/authentik/sources/plex/tasks.py +++ b/authentik/sources/plex/tasks.py @@ -1,8 +1,8 @@ """Plex tasks""" from requests import RequestException -from authentik.events.models import Event, EventAction -from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus +from authentik.events.models import Event, EventAction, TaskStatus +from authentik.events.monitored_tasks import MonitoredTask from authentik.lib.utils.errors import exception_to_string from authentik.root.celery import CELERY_APP from authentik.sources.plex.models import PlexSource @@ -27,16 +27,15 @@ def check_plex_token(self: MonitoredTask, source_slug: int): auth = PlexAuth(source, source.plex_token) try: auth.get_user_info() - self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, ["Plex token is valid."])) + self.set_status(TaskStatus.SUCCESSFUL, "Plex token is valid.") except RequestException as exc: error = exception_to_string(exc) if len(source.plex_token) > 0: error = error.replace(source.plex_token, "$PLEX_TOKEN") self.set_status( - TaskResult( - TaskResultStatus.ERROR, - ["Plex token is invalid/an error occurred:", error], - ) + TaskStatus.ERROR, + "Plex token is invalid/an error occurred:", + error, ) Event.new( EventAction.CONFIGURATION_ERROR, diff --git a/authentik/stages/email/tasks.py b/authentik/stages/email/tasks.py index 6f0b6e104..0d39e4758 100644 --- a/authentik/stages/email/tasks.py +++ b/authentik/stages/email/tasks.py @@ -9,8 +9,8 @@ from django.core.mail.utils import DNS_NAME from django.utils.text import slugify from structlog.stdlib import get_logger -from authentik.events.models import Event, EventAction -from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus +from authentik.events.models import Event, EventAction, TaskStatus +from authentik.events.monitored_tasks import MonitoredTask from authentik.root.celery import CELERY_APP from authentik.stages.email.models import EmailStage from authentik.stages.email.utils import logo_data @@ -58,10 +58,8 @@ def send_mail(self: MonitoredTask, message: dict[Any, Any], email_stage_pk: Opti stages = EmailStage.objects.filter(pk=email_stage_pk) if not stages.exists(): self.set_status( - TaskResult( - TaskResultStatus.WARNING, - messages=["Email stage does not exist anymore. Discarding message."], - ) + TaskStatus.WARNING, + "Email stage does not exist anymore. Discarding message.", ) return stage: EmailStage = stages.first() @@ -69,7 +67,7 @@ def send_mail(self: MonitoredTask, message: dict[Any, Any], email_stage_pk: Opti backend = stage.backend except ValueError as exc: LOGGER.warning("failed to get email backend", exc=exc) - self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc)) + self.set_error(exc) return backend.open() # Since django's EmailMessage objects are not JSON serialisable, @@ -97,12 +95,10 @@ def send_mail(self: MonitoredTask, message: dict[Any, Any], email_stage_pk: Opti to_email=message_object.to, ).save() self.set_status( - TaskResult( - TaskResultStatus.SUCCESSFUL, - messages=["Successfully sent Mail."], - ) + TaskStatus.SUCCESSFUL, + "Successfully sent Mail.", ) except (SMTPException, ConnectionError, OSError) as exc: LOGGER.debug("Error sending email, retrying...", exc=exc) - self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc)) + self.set_error(exc) raise exc diff --git a/blueprints/schema.json b/blueprints/schema.json index 213cb1673..cf78a683a 100644 --- a/blueprints/schema.json +++ b/blueprints/schema.json @@ -263,6 +263,43 @@ } } }, + { + "type": "object", + "required": [ + "model", + "identifiers" + ], + "properties": { + "model": { + "const": "authentik_events.systemtask" + }, + "id": { + "type": "string" + }, + "state": { + "type": "string", + "enum": [ + "absent", + "present", + "created", + "must_created" + ], + "default": "present" + }, + "conditions": { + "type": "array", + "items": { + "type": "boolean" + } + }, + "attrs": { + "$ref": "#/$defs/model_authentik_events.systemtask" + }, + "identifiers": { + "$ref": "#/$defs/model_authentik_events.systemtask" + } + } + }, { "type": "object", "required": [ @@ -3197,6 +3234,50 @@ }, "required": [] }, + "model_authentik_events.systemtask": { + "type": "object", + "properties": { + "task_name": { + "type": "string", + "minLength": 1, + "title": "Task name" + }, + "task_description": { + "type": "string", + "minLength": 1, + "title": "Task description" + }, + "task_start_timestamp": { + "type": "string", + "format": "date-time", + "title": "Task start timestamp" + }, + "task_finish_timestamp": { + "type": "string", + "format": "date-time", + "title": "Task finish timestamp" + }, + "status": { + "type": "string", + "enum": [ + "SUCCESSFUL", + "WARNING", + "ERROR", + "UNKNOWN" + ], + "title": "Status" + }, + "messages": { + "type": "array", + "items": { + "type": "string", + "minLength": 1 + }, + "title": "Messages" + } + }, + "required": [] + }, "model_authentik_flows.flow": { "type": "object", "properties": { @@ -3607,6 +3688,7 @@ "authentik_events.notification", "authentik_events.notificationrule", "authentik_events.notificationwebhookmapping", + "authentik_events.systemtask", "authentik_flows.flow", "authentik_flows.flowstagebinding", "authentik_outposts.dockerserviceconnection", diff --git a/schema.yml b/schema.yml index 85dd0e5f5..350ef914d 100644 --- a/schema.yml +++ b/schema.yml @@ -147,103 +147,6 @@ paths: schema: $ref: '#/components/schemas/GenericError' description: '' - /admin/system_tasks/: - get: - operationId: admin_system_tasks_list - description: List system tasks - tags: - - admin - security: - - authentik: [] - responses: - '200': - content: - application/json: - schema: - type: array - items: - $ref: '#/components/schemas/Task' - description: '' - '400': - content: - application/json: - schema: - $ref: '#/components/schemas/ValidationError' - description: '' - '403': - content: - application/json: - schema: - $ref: '#/components/schemas/GenericError' - description: '' - /admin/system_tasks/{id}/: - get: - operationId: admin_system_tasks_retrieve - description: Get a single system task - parameters: - - in: path - name: id - schema: - type: string - required: true - tags: - - admin - security: - - authentik: [] - responses: - '200': - content: - application/json: - schema: - $ref: '#/components/schemas/Task' - description: '' - '404': - description: Task not found - '400': - content: - application/json: - schema: - $ref: '#/components/schemas/ValidationError' - description: '' - '403': - content: - application/json: - schema: - $ref: '#/components/schemas/GenericError' - description: '' - /admin/system_tasks/{id}/retry/: - post: - operationId: admin_system_tasks_retry_create - description: Retry task - parameters: - - in: path - name: id - schema: - type: string - required: true - tags: - - admin - security: - - authentik: [] - responses: - '204': - description: Task retried successfully - '404': - description: Task not found - '500': - description: Failed to retry task - '400': - content: - application/json: - schema: - $ref: '#/components/schemas/ValidationError' - description: '' - '403': - content: - application/json: - schema: - $ref: '#/components/schemas/GenericError' - description: '' /admin/version/: get: operationId: admin_version_retrieve @@ -6932,6 +6835,133 @@ paths: schema: $ref: '#/components/schemas/GenericError' description: '' + /events/system_tasks/: + get: + operationId: events_system_tasks_list + description: Read-only view set that returns all background tasks + parameters: + - name: ordering + required: false + in: query + description: Which field to use when ordering the results. + schema: + type: string + - name: page + required: false + in: query + description: A page number within the paginated result set. + schema: + type: integer + - name: page_size + required: false + in: query + description: Number of results to return per page. + schema: + type: integer + - name: search + required: false + in: query + description: A search term. + schema: + type: string + tags: + - events + security: + - authentik: [] + responses: + '200': + content: + application/json: + schema: + $ref: '#/components/schemas/PaginatedSystemTaskList' + description: '' + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/ValidationError' + description: '' + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/GenericError' + description: '' + /events/system_tasks/{uuid}/: + get: + operationId: events_system_tasks_retrieve + description: Read-only view set that returns all background tasks + parameters: + - in: path + name: uuid + schema: + type: string + format: uuid + description: A UUID string identifying this System Task. + required: true + tags: + - events + security: + - authentik: [] + responses: + '200': + content: + application/json: + schema: + $ref: '#/components/schemas/SystemTask' + description: '' + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/ValidationError' + description: '' + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/GenericError' + description: '' + /events/system_tasks/{uuid}/retry/: + post: + operationId: events_system_tasks_retry_create + description: Retry task + parameters: + - in: path + name: id + schema: + type: string + required: true + - in: path + name: uuid + schema: + type: string + format: uuid + description: A UUID string identifying this System Task. + required: true + tags: + - events + security: + - authentik: [] + responses: + '204': + description: Task retried successfully + '404': + description: Task not found + '500': + description: Failed to retry task + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/ValidationError' + description: '' + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/GenericError' + description: '' /events/transports/: get: operationId: events_transports_list @@ -18071,6 +18101,7 @@ paths: - authentik_events.notificationrule - authentik_events.notificationtransport - authentik_events.notificationwebhookmapping + - authentik_events.systemtask - authentik_flows.flow - authentik_flows.flowstagebinding - authentik_outposts.dockerserviceconnection @@ -18143,6 +18174,7 @@ paths: * `authentik_events.notification` - Notification * `authentik_events.notificationrule` - Notification Rule * `authentik_events.notificationwebhookmapping` - Webhook Mapping + * `authentik_events.systemtask` - System Task * `authentik_flows.flow` - Flow * `authentik_flows.flowstagebinding` - Flow Stage Binding * `authentik_outposts.dockerserviceconnection` - Docker Service-Connection @@ -18365,6 +18397,7 @@ paths: - authentik_events.notificationrule - authentik_events.notificationtransport - authentik_events.notificationwebhookmapping + - authentik_events.systemtask - authentik_flows.flow - authentik_flows.flowstagebinding - authentik_outposts.dockerserviceconnection @@ -18437,6 +18470,7 @@ paths: * `authentik_events.notification` - Notification * `authentik_events.notificationrule` - Notification Rule * `authentik_events.notificationwebhookmapping` - Webhook Mapping + * `authentik_events.systemtask` - System Task * `authentik_flows.flow` - Flow * `authentik_flows.flowstagebinding` - Flow Stage Binding * `authentik_outposts.dockerserviceconnection` - Docker Service-Connection @@ -31696,6 +31730,7 @@ components: * `authentik_events.notification` - Notification * `authentik_events.notificationrule` - Notification Rule * `authentik_events.notificationwebhookmapping` - Webhook Mapping + * `authentik_events.systemtask` - System Task * `authentik_flows.flow` - Flow * `authentik_flows.flowstagebinding` - Flow Stage Binding * `authentik_outposts.dockerserviceconnection` - Docker Service-Connection @@ -31896,6 +31931,7 @@ components: * `authentik_events.notification` - Notification * `authentik_events.notificationrule` - Notification Rule * `authentik_events.notificationwebhookmapping` - Webhook Mapping + * `authentik_events.systemtask` - System Task * `authentik_flows.flow` - Flow * `authentik_flows.flowstagebinding` - Flow Stage Binding * `authentik_outposts.dockerserviceconnection` - Docker Service-Connection @@ -34047,7 +34083,7 @@ components: tasks: type: array items: - $ref: '#/components/schemas/Task' + $ref: '#/components/schemas/SystemTask' readOnly: true required: - is_running @@ -34214,6 +34250,7 @@ components: - authentik_events.notification - authentik_events.notificationrule - authentik_events.notificationwebhookmapping + - authentik_events.systemtask - authentik_flows.flow - authentik_flows.flowstagebinding - authentik_outposts.dockerserviceconnection @@ -34293,6 +34330,7 @@ components: * `authentik_events.notification` - Notification * `authentik_events.notificationrule` - Notification Rule * `authentik_events.notificationwebhookmapping` - Webhook Mapping + * `authentik_events.systemtask` - System Task * `authentik_flows.flow` - Flow * `authentik_flows.flowstagebinding` - Flow Stage Binding * `authentik_outposts.dockerserviceconnection` - Docker Service-Connection @@ -36281,6 +36319,18 @@ components: required: - pagination - results + PaginatedSystemTaskList: + type: object + properties: + pagination: + $ref: '#/components/schemas/Pagination' + results: + type: array + items: + $ref: '#/components/schemas/SystemTask' + required: + - pagination + - results PaginatedTOTPDeviceList: type: object properties: @@ -37431,6 +37481,7 @@ components: * `authentik_events.notification` - Notification * `authentik_events.notificationrule` - Notification Rule * `authentik_events.notificationwebhookmapping` - Webhook Mapping + * `authentik_events.systemtask` - System Task * `authentik_flows.flow` - Flow * `authentik_flows.flowstagebinding` - Flow Stage Binding * `authentik_outposts.dockerserviceconnection` - Docker Service-Connection @@ -41972,7 +42023,7 @@ components: tasks: type: array items: - $ref: '#/components/schemas/Task' + $ref: '#/components/schemas/SystemTask' readOnly: true required: - is_running @@ -42630,6 +42681,50 @@ components: - runtime - server_time - tenant + SystemTask: + type: object + description: Serialize TaskInfo and TaskResult + properties: + task_name: + type: string + task_description: + type: string + task_start_timestamp: + type: string + format: date-time + task_finish_timestamp: + type: string + format: date-time + task_duration: + type: integer + description: Get the duration a task took to run + readOnly: true + status: + $ref: '#/components/schemas/SystemTaskStatusEnum' + messages: + type: array + items: + type: string + required: + - messages + - status + - task_description + - task_duration + - task_finish_timestamp + - task_name + - task_start_timestamp + SystemTaskStatusEnum: + enum: + - SUCCESSFUL + - WARNING + - ERROR + - UNKNOWN + type: string + description: |- + * `SUCCESSFUL` - SUCCESSFUL + * `WARNING` - WARNING + * `ERROR` - ERROR + * `UNKNOWN` - UNKNOWN TOTPDevice: type: object description: Serializer for totp authenticator devices @@ -42656,45 +42751,6 @@ components: maxLength: 64 required: - name - Task: - type: object - description: Serialize TaskInfo and TaskResult - properties: - task_name: - type: string - task_description: - type: string - task_finish_timestamp: - type: string - format: date-time - task_duration: - type: integer - description: Get the duration a task took to run - readOnly: true - status: - $ref: '#/components/schemas/TaskStatusEnum' - messages: - type: array - items: {} - required: - - messages - - status - - task_description - - task_duration - - task_finish_timestamp - - task_name - TaskStatusEnum: - enum: - - SUCCESSFUL - - WARNING - - ERROR - - UNKNOWN - type: string - description: |- - * `SUCCESSFUL` - SUCCESSFUL - * `WARNING` - WARNING - * `ERROR` - ERROR - * `UNKNOWN` - UNKNOWN Tenant: type: object description: Tenant Serializer