From 7fd9d31101eb47e97014a6ffe9f58cd1fcd0639f Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Sat, 13 Jan 2024 20:31:45 +0100 Subject: [PATCH] events: migrate system tasks to save in DB Signed-off-by: Jens Langhammer --- authentik/admin/api/tasks.py | 134 ------- authentik/admin/signals.py | 8 - authentik/admin/tasks.py | 15 +- authentik/admin/tests/test_api.py | 4 +- authentik/admin/urls.py | 2 - authentik/blueprints/v1/tasks.py | 21 +- authentik/core/tasks.py | 11 +- authentik/crypto/tasks.py | 13 +- authentik/events/api/tasks.py | 101 ++++++ .../events/migrations/0003_systemtask.py | 55 +++ authentik/events/models.py | 72 +++- authentik/events/monitored_tasks.py | 227 ++++-------- authentik/events/signals.py | 10 +- authentik/events/tasks.py | 14 +- authentik/events/tests/test_tasks.py | 15 +- authentik/events/urls.py | 2 + authentik/outposts/tasks.py | 44 +-- authentik/policies/reputation/tasks.py | 10 +- authentik/providers/scim/api/providers.py | 14 +- authentik/providers/scim/tasks.py | 19 +- authentik/sources/ldap/api.py | 13 +- authentik/sources/ldap/sync/vendor/freeipa.py | 6 +- authentik/sources/ldap/sync/vendor/ms_ad.py | 6 +- authentik/sources/ldap/tasks.py | 17 +- authentik/sources/ldap/tests/test_sync.py | 8 +- authentik/sources/oauth/tasks.py | 13 +- authentik/sources/plex/tasks.py | 13 +- authentik/stages/email/tasks.py | 20 +- blueprints/schema.json | 82 +++++ schema.yml | 332 ++++++++++-------- 30 files changed, 699 insertions(+), 602 deletions(-) delete mode 100644 authentik/admin/api/tasks.py create mode 100644 authentik/events/api/tasks.py create mode 100644 authentik/events/migrations/0003_systemtask.py diff --git a/authentik/admin/api/tasks.py b/authentik/admin/api/tasks.py deleted file mode 100644 index 72714dad5..000000000 --- a/authentik/admin/api/tasks.py +++ /dev/null @@ -1,134 +0,0 @@ -"""Tasks API""" -from importlib import import_module - -from django.contrib import messages -from django.http.response import Http404 -from django.utils.translation import gettext_lazy as _ -from drf_spectacular.types import OpenApiTypes -from drf_spectacular.utils import OpenApiParameter, OpenApiResponse, extend_schema -from rest_framework.decorators import action -from rest_framework.fields import ( - CharField, - ChoiceField, - DateTimeField, - ListField, - SerializerMethodField, -) -from rest_framework.request import Request -from rest_framework.response import Response -from rest_framework.viewsets import ViewSet -from structlog.stdlib import get_logger - -from authentik.api.decorators import permission_required -from authentik.core.api.utils import PassiveSerializer -from authentik.events.monitored_tasks import TaskInfo, TaskResultStatus -from authentik.rbac.permissions import HasPermission - -LOGGER = get_logger() - - -class TaskSerializer(PassiveSerializer): - """Serialize TaskInfo and TaskResult""" - - task_name = CharField() - task_description = CharField() - task_finish_timestamp = DateTimeField(source="finish_time") - task_duration = SerializerMethodField() - - status = ChoiceField( - source="result.status.name", - choices=[(x.name, x.name) for x in TaskResultStatus], - ) - messages = ListField(source="result.messages") - - def get_task_duration(self, instance: TaskInfo) -> int: - """Get the duration a task took to run""" - return max(instance.finish_timestamp - instance.start_timestamp, 0) - - def to_representation(self, instance: TaskInfo): - """When a new version of authentik adds fields to TaskInfo, - the API will fail with an AttributeError, as the classes 
- are pickled in cache. In that case, just delete the info""" - try: - return super().to_representation(instance) - # pylint: disable=broad-except - except Exception: # pragma: no cover - if isinstance(self.instance, list): - for inst in self.instance: - inst.delete() - else: - self.instance.delete() - return {} - - -class TaskViewSet(ViewSet): - """Read-only view set that returns all background tasks""" - - permission_classes = [HasPermission("authentik_rbac.view_system_tasks")] - serializer_class = TaskSerializer - - @extend_schema( - responses={ - 200: TaskSerializer(many=False), - 404: OpenApiResponse(description="Task not found"), - }, - parameters=[ - OpenApiParameter( - "id", - type=OpenApiTypes.STR, - location=OpenApiParameter.PATH, - required=True, - ), - ], - ) - def retrieve(self, request: Request, pk=None) -> Response: - """Get a single system task""" - task = TaskInfo.by_name(pk) - if not task: - raise Http404 - return Response(TaskSerializer(task, many=False).data) - - @extend_schema(responses={200: TaskSerializer(many=True)}) - def list(self, request: Request) -> Response: - """List system tasks""" - tasks = sorted(TaskInfo.all().values(), key=lambda task: task.task_name) - return Response(TaskSerializer(tasks, many=True).data) - - @permission_required(None, ["authentik_rbac.run_system_tasks"]) - @extend_schema( - request=OpenApiTypes.NONE, - responses={ - 204: OpenApiResponse(description="Task retried successfully"), - 404: OpenApiResponse(description="Task not found"), - 500: OpenApiResponse(description="Failed to retry task"), - }, - parameters=[ - OpenApiParameter( - "id", - type=OpenApiTypes.STR, - location=OpenApiParameter.PATH, - required=True, - ), - ], - ) - @action(detail=True, methods=["post"]) - def retry(self, request: Request, pk=None) -> Response: - """Retry task""" - task = TaskInfo.by_name(pk) - if not task: - raise Http404 - try: - task_module = import_module(task.task_call_module) - task_func = getattr(task_module, task.task_call_func) - LOGGER.debug("Running task", task=task_func) - task_func.delay(*task.task_call_args, **task.task_call_kwargs) - messages.success( - self.request, - _("Successfully re-scheduled Task %(name)s!" 
% {"name": task.task_name}), - ) - return Response(status=204) - except (ImportError, AttributeError): # pragma: no cover - LOGGER.warning("Failed to run task, remove state", task=task) - # if we get an import error, the module path has probably changed - task.delete() - return Response(status=500) diff --git a/authentik/admin/signals.py b/authentik/admin/signals.py index f171fc74e..aa081724b 100644 --- a/authentik/admin/signals.py +++ b/authentik/admin/signals.py @@ -1,7 +1,6 @@ """admin signals""" from django.dispatch import receiver -from authentik.admin.api.tasks import TaskInfo from authentik.admin.apps import GAUGE_WORKERS from authentik.root.celery import CELERY_APP from authentik.root.monitoring import monitoring_set @@ -12,10 +11,3 @@ def monitoring_set_workers(sender, **kwargs): """Set worker gauge""" count = len(CELERY_APP.control.ping(timeout=0.5)) GAUGE_WORKERS.set(count) - - -@receiver(monitoring_set) -def monitoring_set_tasks(sender, **kwargs): - """Set task gauges""" - for task in TaskInfo.all().values(): - task.update_metrics() diff --git a/authentik/admin/tasks.py b/authentik/admin/tasks.py index c5b0ebf61..8db757d1f 100644 --- a/authentik/admin/tasks.py +++ b/authentik/admin/tasks.py @@ -11,12 +11,7 @@ from structlog.stdlib import get_logger from authentik import __version__, get_build_hash from authentik.admin.apps import PROM_INFO from authentik.events.models import Event, EventAction, Notification -from authentik.events.monitored_tasks import ( - MonitoredTask, - TaskResult, - TaskResultStatus, - prefill_task, -) +from authentik.events.monitored_tasks import MonitoredTask, TaskStatus, prefill_task from authentik.lib.config import CONFIG from authentik.lib.utils.http import get_http_session from authentik.root.celery import CELERY_APP @@ -60,7 +55,7 @@ def update_latest_version(self: MonitoredTask): """Update latest version info""" if CONFIG.get_bool("disable_update_check"): cache.set(VERSION_CACHE_KEY, "0.0.0", VERSION_CACHE_TIMEOUT) - self.set_status(TaskResult(TaskResultStatus.WARNING, messages=["Version check disabled."])) + self.set_status(TaskStatus.WARNING, "Version check disabled.") return try: response = get_http_session().get( @@ -70,9 +65,7 @@ def update_latest_version(self: MonitoredTask): data = response.json() upstream_version = data.get("stable", {}).get("version") cache.set(VERSION_CACHE_KEY, upstream_version, VERSION_CACHE_TIMEOUT) - self.set_status( - TaskResult(TaskResultStatus.SUCCESSFUL, ["Successfully updated latest Version"]) - ) + self.set_status(TaskStatus.SUCCESSFUL, "Successfully updated latest Version") _set_prom_info() # Check if upstream version is newer than what we're running, # and if no event exists yet, create one. 
@@ -89,7 +82,7 @@ def update_latest_version(self: MonitoredTask): Event.new(EventAction.UPDATE_AVAILABLE, **event_dict).save() except (RequestException, IndexError) as exc: cache.set(VERSION_CACHE_KEY, "0.0.0", VERSION_CACHE_TIMEOUT) - self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc)) + self.set_error(exc) _set_prom_info() diff --git a/authentik/admin/tests/test_api.py b/authentik/admin/tests/test_api.py index 8fcbfa286..8ef016b88 100644 --- a/authentik/admin/tests/test_api.py +++ b/authentik/admin/tests/test_api.py @@ -8,7 +8,7 @@ from authentik import __version__ from authentik.blueprints.tests import reconcile_app from authentik.core.models import Group, User from authentik.core.tasks import clean_expired_models -from authentik.events.monitored_tasks import TaskResultStatus +from authentik.events.monitored_tasks import TaskStatus from authentik.lib.generators import generate_id @@ -42,7 +42,7 @@ class TestAdminAPI(TestCase): ) self.assertEqual(response.status_code, 200) body = loads(response.content) - self.assertEqual(body["status"], TaskResultStatus.SUCCESSFUL.name) + self.assertEqual(body["status"], TaskStatus.SUCCESSFUL.name) self.assertEqual(body["task_name"], "clean_expired_models") response = self.client.get( reverse("authentik_api:admin_system_tasks-detail", kwargs={"pk": "qwerqwer"}) diff --git a/authentik/admin/urls.py b/authentik/admin/urls.py index fec51f5f7..6ef21e93e 100644 --- a/authentik/admin/urls.py +++ b/authentik/admin/urls.py @@ -4,12 +4,10 @@ from django.urls import path from authentik.admin.api.meta import AppsViewSet, ModelViewSet from authentik.admin.api.metrics import AdministrationMetricsViewSet from authentik.admin.api.system import SystemView -from authentik.admin.api.tasks import TaskViewSet from authentik.admin.api.version import VersionView from authentik.admin.api.workers import WorkerView api_urlpatterns = [ - ("admin/system_tasks", TaskViewSet, "admin_system_tasks"), ("admin/apps", AppsViewSet, "apps"), ("admin/models", ModelViewSet, "models"), path( diff --git a/authentik/blueprints/v1/tasks.py b/authentik/blueprints/v1/tasks.py index 686e4747c..33b517cb0 100644 --- a/authentik/blueprints/v1/tasks.py +++ b/authentik/blueprints/v1/tasks.py @@ -29,12 +29,8 @@ from authentik.blueprints.v1.common import BlueprintLoader, BlueprintMetadata, E from authentik.blueprints.v1.importer import Importer from authentik.blueprints.v1.labels import LABEL_AUTHENTIK_INSTANTIATE from authentik.blueprints.v1.oci import OCI_PREFIX -from authentik.events.monitored_tasks import ( - MonitoredTask, - TaskResult, - TaskResultStatus, - prefill_task, -) +from authentik.events.models import TaskStatus +from authentik.events.monitored_tasks import MonitoredTask, prefill_task from authentik.events.utils import sanitize_dict from authentik.lib.config import CONFIG from authentik.root.celery import CELERY_APP @@ -140,10 +136,7 @@ def blueprints_discovery(self: MonitoredTask, path: Optional[str] = None): check_blueprint_v1_file(blueprint) count += 1 self.set_status( - TaskResult( - TaskResultStatus.SUCCESSFUL, - messages=[_("Successfully imported %(count)d files." % {"count": count})], - ) + TaskStatus.SUCCESSFUL, _("Successfully imported %(count)d files." 
% {"count": count}) ) @@ -196,18 +189,18 @@ def apply_blueprint(self: MonitoredTask, instance_pk: str): if not valid: instance.status = BlueprintInstanceStatus.ERROR instance.save() - self.set_status(TaskResult(TaskResultStatus.ERROR, [x["event"] for x in logs])) + self.set_status(TaskStatus.ERROR, *[x["event"] for x in logs]) return applied = importer.apply() if not applied: instance.status = BlueprintInstanceStatus.ERROR instance.save() - self.set_status(TaskResult(TaskResultStatus.ERROR, "Failed to apply")) + self.set_status(TaskStatus.ERROR, "Failed to apply") return instance.status = BlueprintInstanceStatus.SUCCESSFUL instance.last_applied_hash = file_hash instance.last_applied = now() - self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL)) + self.set_status(TaskStatus.SUCCESSFUL) except ( DatabaseError, ProgrammingError, @@ -218,7 +211,7 @@ def apply_blueprint(self: MonitoredTask, instance_pk: str): ) as exc: if instance: instance.status = BlueprintInstanceStatus.ERROR - self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc)) + self.set_error(exc) finally: if instance: instance.save() diff --git a/authentik/core/tasks.py b/authentik/core/tasks.py index 0b4c839e8..788cbc25e 100644 --- a/authentik/core/tasks.py +++ b/authentik/core/tasks.py @@ -13,12 +13,7 @@ from authentik.core.models import ( ExpiringModel, User, ) -from authentik.events.monitored_tasks import ( - MonitoredTask, - TaskResult, - TaskResultStatus, - prefill_task, -) +from authentik.events.monitored_tasks import MonitoredTask, TaskStatus, prefill_task from authentik.root.celery import CELERY_APP LOGGER = get_logger() @@ -54,7 +49,7 @@ def clean_expired_models(self: MonitoredTask): amount += 1 LOGGER.debug("Expired sessions", model=AuthenticatedSession, amount=amount) messages.append(f"Expired {amount} {AuthenticatedSession._meta.verbose_name_plural}") - self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, messages)) + self.set_status(TaskStatus.SUCCESSFUL, *messages) @CELERY_APP.task(bind=True, base=MonitoredTask) @@ -75,4 +70,4 @@ def clean_temporary_users(self: MonitoredTask): user.delete() deleted_users += 1 messages.append(f"Successfully deleted {deleted_users} users.") - self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, messages)) + self.set_status(TaskStatus.SUCCESSFUL, *messages) diff --git a/authentik/crypto/tasks.py b/authentik/crypto/tasks.py index 4a660ee81..02fda31a6 100644 --- a/authentik/crypto/tasks.py +++ b/authentik/crypto/tasks.py @@ -9,12 +9,8 @@ from django.utils.translation import gettext_lazy as _ from structlog.stdlib import get_logger from authentik.crypto.models import CertificateKeyPair -from authentik.events.monitored_tasks import ( - MonitoredTask, - TaskResult, - TaskResultStatus, - prefill_task, -) +from authentik.events.models import TaskStatus +from authentik.events.monitored_tasks import MonitoredTask, prefill_task from authentik.lib.config import CONFIG from authentik.root.celery import CELERY_APP @@ -88,8 +84,5 @@ def certificate_discovery(self: MonitoredTask): if dirty: cert.save() self.set_status( - TaskResult( - TaskResultStatus.SUCCESSFUL, - messages=[_("Successfully imported %(count)d files." % {"count": discovered})], - ) + TaskStatus.SUCCESSFUL, _("Successfully imported %(count)d files." 
% {"count": discovered}) ) diff --git a/authentik/events/api/tasks.py b/authentik/events/api/tasks.py new file mode 100644 index 000000000..f8482fecf --- /dev/null +++ b/authentik/events/api/tasks.py @@ -0,0 +1,101 @@ +"""Tasks API""" +from importlib import import_module + +from django.contrib import messages +from django.utils.translation import gettext_lazy as _ +from drf_spectacular.types import OpenApiTypes +from drf_spectacular.utils import OpenApiParameter, OpenApiResponse, extend_schema +from rest_framework.decorators import action +from rest_framework.fields import ( + CharField, + ChoiceField, + DateTimeField, + ListField, + SerializerMethodField, +) +from rest_framework.request import Request +from rest_framework.response import Response +from rest_framework.serializers import ModelSerializer +from rest_framework.viewsets import ReadOnlyModelViewSet +from structlog.stdlib import get_logger + +from authentik.api.decorators import permission_required +from authentik.events.models import SystemTask, TaskStatus + +LOGGER = get_logger() + + +class SystemTaskSerializer(ModelSerializer): + """Serialize TaskInfo and TaskResult""" + + task_name = CharField(source="name") + task_description = CharField(source="description") + task_start_timestamp = DateTimeField(source="start_timestamp") + task_finish_timestamp = DateTimeField(source="finish_timestamp") + task_duration = SerializerMethodField() + + status = ChoiceField( + # source="status", + choices=[(x.name, x.name) for x in TaskStatus], + ) + messages = ListField(child=CharField()) + + def get_task_duration(self, instance: SystemTask) -> int: + """Get the duration a task took to run""" + return max(instance.finish_timestamp.timestamp() - instance.start_timestamp.timestamp(), 0) + + class Meta: + model = SystemTask + fields = [ + "task_name", + "task_description", + "task_start_timestamp", + "task_finish_timestamp", + "task_duration", + "status", + "messages", + ] + + +class SystemTaskViewSet(ReadOnlyModelViewSet): + """Read-only view set that returns all background tasks""" + + queryset = SystemTask.objects.all() + serializer_class = SystemTaskSerializer + + @permission_required(None, ["authentik_events.rerun_task"]) + @extend_schema( + request=OpenApiTypes.NONE, + responses={ + 204: OpenApiResponse(description="Task retried successfully"), + 404: OpenApiResponse(description="Task not found"), + 500: OpenApiResponse(description="Failed to retry task"), + }, + parameters=[ + OpenApiParameter( + "id", + type=OpenApiTypes.STR, + location=OpenApiParameter.PATH, + required=True, + ), + ], + ) + @action(detail=True, methods=["post"]) + def retry(self, request: Request, pk=None) -> Response: + """Retry task""" + task = self.get_object() + try: + task_module = import_module(task.task_call_module) + task_func = getattr(task_module, task.task_call_func) + LOGGER.debug("Running task", task=task_func) + task_func.delay(*task.task_call_args, **task.task_call_kwargs) + messages.success( + self.request, + _("Successfully re-scheduled Task %(name)s!" 
% {"name": task.task_name}), + ) + return Response(status=204) + except (ImportError, AttributeError): # pragma: no cover + LOGGER.warning("Failed to run task, remove state", task=task) + # if we get an import error, the module path has probably changed + task.delete() + return Response(status=500) diff --git a/authentik/events/migrations/0003_systemtask.py b/authentik/events/migrations/0003_systemtask.py new file mode 100644 index 000000000..feaa0cf84 --- /dev/null +++ b/authentik/events/migrations/0003_systemtask.py @@ -0,0 +1,55 @@ +# Generated by Django 5.0.1 on 2024-01-13 19:38 + +import uuid + +from django.db import migrations, models + +import authentik.core.models + + +class Migration(migrations.Migration): + dependencies = [ + ("authentik_events", "0002_alter_notificationtransport_mode"), + ] + + operations = [ + migrations.CreateModel( + name="SystemTask", + fields=[ + ( + "expires", + models.DateTimeField(default=authentik.core.models.default_token_duration), + ), + ("expiring", models.BooleanField(default=True)), + ( + "uuid", + models.UUIDField( + default=uuid.uuid4, editable=False, primary_key=True, serialize=False + ), + ), + ("name", models.TextField()), + ("uid", models.TextField(null=True)), + ("start_timestamp", models.DateTimeField(auto_now_add=True)), + ("finish_timestamp", models.DateTimeField(auto_now=True)), + ( + "status", + models.PositiveIntegerField( + choices=[(1, "Successful"), (2, "Warning"), (4, "Error"), (8, "Unknown")] + ), + ), + ("description", models.TextField(null=True)), + ("messages", models.JSONField()), + ("task_call_module", models.TextField()), + ("task_call_func", models.TextField()), + ("task_call_args", models.JSONField(default=list)), + ("task_call_kwargs", models.JSONField(default=dict)), + ], + options={ + "verbose_name": "System Task", + "verbose_name_plural": "System Tasks", + "permissions": [("rerun_task", "Rerun task")], + "default_permissions": ["view"], + "unique_together": {("name", "uid")}, + }, + ), + ] diff --git a/authentik/events/models.py b/authentik/events/models.py index 240ba11de..1cddeec39 100644 --- a/authentik/events/models.py +++ b/authentik/events/models.py @@ -4,7 +4,7 @@ from collections import Counter from datetime import timedelta from inspect import currentframe from smtplib import SMTPException -from typing import TYPE_CHECKING, Optional +from typing import Optional from uuid import uuid4 from django.db import models @@ -18,6 +18,7 @@ from django.http.request import QueryDict from django.utils.timezone import now from django.utils.translation import gettext as _ from requests import RequestException +from rest_framework.serializers import Serializer from structlog.stdlib import get_logger from authentik import get_full_version @@ -26,6 +27,7 @@ from authentik.core.middleware import ( SESSION_KEY_IMPERSONATE_USER, ) from authentik.core.models import ExpiringModel, Group, PropertyMapping, User +from authentik.events.apps import GAUGE_TASKS from authentik.events.context_processors.base import get_context_processors from authentik.events.utils import ( cleanse_dict, @@ -45,8 +47,6 @@ from authentik.tenants.models import Tenant from authentik.tenants.utils import DEFAULT_TENANT LOGGER = get_logger() -if TYPE_CHECKING: - from rest_framework.serializers import Serializer def default_event_duration(): @@ -267,7 +267,7 @@ class Event(SerializerModel, ExpiringModel): super().save(*args, **kwargs) @property - def serializer(self) -> "Serializer": + def serializer(self) -> type[Serializer]: from authentik.events.api.events 
import EventSerializer return EventSerializer @@ -475,7 +475,7 @@ class NotificationTransport(SerializerModel): raise NotificationTransportError(exc) from exc @property - def serializer(self) -> "Serializer": + def serializer(self) -> type[Serializer]: from authentik.events.api.notification_transports import NotificationTransportSerializer return NotificationTransportSerializer @@ -508,7 +508,7 @@ class Notification(SerializerModel): user = models.ForeignKey(User, on_delete=models.CASCADE) @property - def serializer(self) -> "Serializer": + def serializer(self) -> type[Serializer]: from authentik.events.api.notifications import NotificationSerializer return NotificationSerializer @@ -551,7 +551,7 @@ class NotificationRule(SerializerModel, PolicyBindingModel): ) @property - def serializer(self) -> "Serializer": + def serializer(self) -> type[Serializer]: from authentik.events.api.notification_rules import NotificationRuleSerializer return NotificationRuleSerializer @@ -572,7 +572,7 @@ class NotificationWebhookMapping(PropertyMapping): return "ak-property-mapping-notification-form" @property - def serializer(self) -> type["Serializer"]: + def serializer(self) -> type[type[Serializer]]: from authentik.events.api.notification_mappings import NotificationWebhookMappingSerializer return NotificationWebhookMappingSerializer @@ -583,3 +583,59 @@ class NotificationWebhookMapping(PropertyMapping): class Meta: verbose_name = _("Webhook Mapping") verbose_name_plural = _("Webhook Mappings") + + +class TaskStatus(models.IntegerChoices): + """Possible states of tasks""" + + SUCCESSFUL = 1 + WARNING = 2 + ERROR = 4 + UNKNOWN = 8 + + +class SystemTask(SerializerModel, ExpiringModel): + """Info about a system task running in the background along with details to restart the task""" + + uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4) + name = models.TextField() + uid = models.TextField(null=True) + + start_timestamp = models.DateTimeField(auto_now_add=True) + finish_timestamp = models.DateTimeField(auto_now=True) + + status = models.PositiveIntegerField(choices=TaskStatus.choices) + + description = models.TextField(null=True) + messages = models.JSONField() + + task_call_module = models.TextField() + task_call_func = models.TextField() + task_call_args = models.JSONField(default=list) + task_call_kwargs = models.JSONField(default=dict) + + @property + def serializer(self) -> type[Serializer]: + from authentik.events.api.tasks import SystemTaskSerializer + + return SystemTaskSerializer + + def update_metrics(self): + """Update prometheus metrics""" + duration = max(self.finish_timestamp.timestamp() - self.start_timestamp.timestamp(), 0) + GAUGE_TASKS.labels( + task_name=self.name.split(":")[0], + task_uid=self.uid or "", + status=self.status.name.lower(), + ).set(duration) + + def __str__(self) -> str: + return f"System Task {self.name}" + + class Meta: + unique_together = (("name", "uid"),) + # Remove "add", "change" and "delete" permissions as those are not used + default_permissions = ["view"] + permissions = [("rerun_task", _("Rerun task"))] + verbose_name = _("System Task") + verbose_name_plural = _("System Tasks") diff --git a/authentik/events/monitored_tasks.py b/authentik/events/monitored_tasks.py index 70f59f610..3ec5ab3a2 100644 --- a/authentik/events/monitored_tasks.py +++ b/authentik/events/monitored_tasks.py @@ -1,115 +1,18 @@ """Monitored tasks""" -from dataclasses import dataclass, field -from datetime import datetime -from enum import Enum +from datetime import 
datetime, timedelta, timezone from timeit import default_timer from typing import Any, Optional from celery import Task -from django.core.cache import cache +from django.db import DatabaseError, InternalError, ProgrammingError +from django.utils.timezone import now from django.utils.translation import gettext_lazy as _ from structlog.stdlib import get_logger -from authentik.events.apps import GAUGE_TASKS -from authentik.events.models import Event, EventAction +from authentik.events.models import Event, EventAction, SystemTask, TaskStatus from authentik.lib.utils.errors import exception_to_string LOGGER = get_logger() -CACHE_KEY_PREFIX = "goauthentik.io/events/tasks/" - - -class TaskResultStatus(Enum): - """Possible states of tasks""" - - SUCCESSFUL = 1 - WARNING = 2 - ERROR = 4 - UNKNOWN = 8 - - -@dataclass -class TaskResult: - """Result of a task run, this class is created by the task itself - and used by self.set_status""" - - status: TaskResultStatus - - messages: list[str] = field(default_factory=list) - - # Optional UID used in cache for tasks that run in different instances - uid: Optional[str] = field(default=None) - - def with_error(self, exc: Exception) -> "TaskResult": - """Since errors might not always be pickle-able, set the traceback""" - # TODO: Mark exception somehow so that is rendered as
 in frontend
-        self.messages.append(exception_to_string(exc))
-        return self
-
-
-@dataclass
-class TaskInfo:
-    """Info about a task run"""
-
-    task_name: str
-    start_timestamp: float
-    finish_timestamp: float
-    finish_time: datetime
-
-    result: TaskResult
-
-    task_call_module: str
-    task_call_func: str
-    task_call_args: list[Any] = field(default_factory=list)
-    task_call_kwargs: dict[str, Any] = field(default_factory=dict)
-
-    task_description: Optional[str] = field(default=None)
-
-    @staticmethod
-    def all() -> dict[str, "TaskInfo"]:
-        """Get all TaskInfo objects"""
-        return cache.get_many(cache.keys(CACHE_KEY_PREFIX + "*"))
-
-    @staticmethod
-    def by_name(name: str) -> Optional["TaskInfo"] | Optional[list["TaskInfo"]]:
-        """Get TaskInfo Object by name"""
-        if "*" in name:
-            return cache.get_many(cache.keys(CACHE_KEY_PREFIX + name)).values()
-        return cache.get(CACHE_KEY_PREFIX + name, None)
-
-    @property
-    def full_name(self) -> str:
-        """Get the full cache key with task name and UID"""
-        key = CACHE_KEY_PREFIX + self.task_name
-        if self.result.uid:
-            uid_suffix = f":{self.result.uid}"
-            key += uid_suffix
-            if not self.task_name.endswith(uid_suffix):
-                self.task_name += uid_suffix
-        return key
-
-    def delete(self):
-        """Delete task info from cache"""
-        return cache.delete(self.full_name)
-
-    def update_metrics(self):
-        """Update prometheus metrics"""
-        start = default_timer()
-        if hasattr(self, "start_timestamp"):
-            start = self.start_timestamp
-        try:
-            duration = max(self.finish_timestamp - start, 0)
-        except TypeError:
-            duration = 0
-        GAUGE_TASKS.labels(
-            task_name=self.task_name.split(":")[0],
-            task_uid=self.result.uid or "",
-            status=self.result.status.name.lower(),
-        ).set(duration)
-
-    def save(self, timeout_hours=6):
-        """Save task into cache"""
-        self.update_metrics()
-        cache.set(self.full_name, self, timeout=timeout_hours * 60 * 60)
 
 
 class MonitoredTask(Task):
@@ -118,73 +21,94 @@ class MonitoredTask(Task):
     # For tasks that should only be listed if they failed, set this to False
     save_on_success: bool
 
-    _result: Optional[TaskResult]
+    _status: Optional[TaskStatus]
+    _messages: list[str]
 
     _uid: Optional[str]
-    start: Optional[float] = None
+    _start: Optional[float] = None
 
     def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
         self.save_on_success = True
         self._uid = None
-        self._result = None
+        self._status = None
+        self._messages = []
         self.result_timeout_hours = 6
 
     def set_uid(self, uid: str):
         """Set UID, so in the case of an unexpected error its saved correctly"""
         self._uid = uid
 
-    def set_status(self, result: TaskResult):
+    def set_status(self, status: TaskStatus, *messages: str):
         """Set result for current run, will overwrite previous result."""
-        self._result = result
+        self._status = status
+        self._messages = list(messages)
+
+    def set_error(self, exception: Exception):
+        """Set result to error and save exception"""
+        self._status = TaskStatus.ERROR
+        self._messages = [exception_to_string(exception)]
 
     def before_start(self, task_id, args, kwargs):
-        self.start = default_timer()
+        self._start = default_timer()
         return super().before_start(task_id, args, kwargs)
 
     # pylint: disable=too-many-arguments
     def after_return(self, status, retval, task_id, args: list[Any], kwargs: dict[str, Any], einfo):
         super().after_return(status, retval, task_id, args, kwargs, einfo=einfo)
-        if not self._result:
+        if not self._status:
             return
-        if not self._result.uid:
-            self._result.uid = self._uid
-        info = TaskInfo(
-            task_name=self.__name__,
-            task_description=self.__doc__,
-            start_timestamp=self.start or default_timer(),
-            finish_timestamp=default_timer(),
-            finish_time=datetime.now(),
-            result=self._result,
-            task_call_module=self.__module__,
-            task_call_func=self.__name__,
-            task_call_args=args,
-            task_call_kwargs=kwargs,
+        if self._status == TaskStatus.SUCCESSFUL and not self.save_on_success:
+            SystemTask.objects.filter(
+                name=self.__name__,
+                uid=self._uid,
+            ).delete()
+            return
+        SystemTask.objects.update_or_create(
+            name=self.__name__,
+            uid=self._uid,
+            defaults={
+                "description": self.__doc__,
+                "start_timestamp": datetime.fromtimestamp(
+                    self._start or default_timer(), tz=timezone.utc
+                ),
+                "finish_timestamp": datetime.fromtimestamp(default_timer(), tz=timezone.utc),
+                "task_call_module": self.__module__,
+                "task_call_func": self.__name__,
+                "task_call_args": args,
+                "task_call_kwargs": kwargs,
+                "status": self._status,
+                "messages": self._messages,
+                "expires": now() + timedelta(hours=self.result_timeout_hours),
+                "expiring": True,
+            },
         )
-        if self._result.status == TaskResultStatus.SUCCESSFUL and not self.save_on_success:
-            info.delete()
-            return
-        info.save(self.result_timeout_hours)
 
     # pylint: disable=too-many-arguments
     def on_failure(self, exc, task_id, args, kwargs, einfo):
         super().on_failure(exc, task_id, args, kwargs, einfo=einfo)
-        if not self._result:
-            self._result = TaskResult(status=TaskResultStatus.ERROR, messages=[str(exc)])
-        if not self._result.uid:
-            self._result.uid = self._uid
-        TaskInfo(
-            task_name=self.__name__,
-            task_description=self.__doc__,
-            start_timestamp=self.start or default_timer(),
-            finish_timestamp=default_timer(),
-            finish_time=datetime.now(),
-            result=self._result,
-            task_call_module=self.__module__,
-            task_call_func=self.__name__,
-            task_call_args=args,
-            task_call_kwargs=kwargs,
-        ).save(self.result_timeout_hours)
+        if not self._status:
+            self._status = TaskStatus.ERROR
+            self._messages = [exception_to_string(exc)]
+        SystemTask.objects.update_or_create(
+            name=self.__name__,
+            uid=self._uid,
+            defaults={
+                "description": self.__doc__,
+                "start_timestamp": datetime.fromtimestamp(
+                    self._start or default_timer(), tz=timezone.utc
+                ),
+                "finish_timestamp": datetime.fromtimestamp(default_timer(), tz=timezone.utc),
+                "task_call_module": self.__module__,
+                "task_call_func": self.__name__,
+                "task_call_args": args,
+                "task_call_kwargs": kwargs,
+                "status": self._status,
+                "messages": self._messages,
+                "expires": now() + timedelta(hours=self.result_timeout_hours),
+                "expiring": True,
+            },
+        )
         Event.new(
             EventAction.SYSTEM_TASK_EXCEPTION,
             message=f"Task {self.__name__} encountered an error: {exception_to_string(exc)}",
@@ -196,19 +120,20 @@ class MonitoredTask(Task):
 
 def prefill_task(func):
     """Ensure a task's details are always in cache, so it can always be triggered via API"""
-    status = TaskInfo.by_name(func.__name__)
+    try:
+        status = SystemTask.objects.filter(name=func.__name__).first()
+    except (DatabaseError, InternalError, ProgrammingError):
+        return func
     if status:
         return func
-    TaskInfo(
-        task_name=func.__name__,
-        task_description=func.__doc__,
-        result=TaskResult(TaskResultStatus.UNKNOWN, messages=[_("Task has not been run yet.")]),
+    SystemTask.objects.create(
+        name=func.__name__,
+        description=func.__doc__,
+        status=TaskStatus.UNKNOWN,
+        messages=[_("Task has not been run yet.")],
         task_call_module=func.__module__,
         task_call_func=func.__name__,
-        # We don't have real values for these attributes but they cannot be null
-        start_timestamp=0,
-        finish_timestamp=0,
-        finish_time=datetime.now(),
-    ).save(86400)
+        expiring=False,
+    )
     LOGGER.debug("prefilled task", task_name=func.__name__)
     return func
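The hunk above replaces the pickled `TaskResult`/`TaskInfo` pair with direct writes to the `SystemTask` model. A minimal sketch of a task built on the reworked API follows; the task name and message are hypothetical, while the imports and calls mirror the call sites updated elsewhere in this patch:

```python
# Illustrative sketch only, not part of the patch. "example_cleanup" and its message
# are hypothetical; set_status()/set_error() are the new MonitoredTask methods above.
from authentik.events.models import TaskStatus
from authentik.events.monitored_tasks import MonitoredTask, prefill_task
from authentik.root.celery import CELERY_APP


@CELERY_APP.task(bind=True, base=MonitoredTask)
@prefill_task
def example_cleanup(self: MonitoredTask):
    """Hypothetical task demonstrating the DB-backed status reporting"""
    try:
        removed = 0  # ... actual work would go here ...
        self.set_status(TaskStatus.SUCCESSFUL, f"Removed {removed} objects")
    except RuntimeError as exc:
        # after_return()/on_failure() persist this as a SystemTask row with the traceback
        self.set_error(exc)
```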
diff --git a/authentik/events/signals.py b/authentik/events/signals.py
index d0b7ec06a..dcec5eb42 100644
--- a/authentik/events/signals.py
+++ b/authentik/events/signals.py
@@ -8,12 +8,13 @@ from django.http import HttpRequest
 
 from authentik.core.models import User
 from authentik.core.signals import login_failed, password_changed
-from authentik.events.models import Event, EventAction
+from authentik.events.models import Event, EventAction, SystemTask
 from authentik.events.tasks import event_notification_handler, gdpr_cleanup
 from authentik.flows.models import Stage
 from authentik.flows.planner import PLAN_CONTEXT_SOURCE, FlowPlan
 from authentik.flows.views.executor import SESSION_KEY_PLAN
 from authentik.lib.config import CONFIG
+from authentik.root.monitoring import monitoring_set
 from authentik.stages.invitation.models import Invitation
 from authentik.stages.invitation.signals import invitation_used
 from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS
@@ -100,3 +101,10 @@ def event_user_pre_delete_cleanup(sender, instance: User, **_):
     """If gdpr_compliance is enabled, remove all the user's events"""
     if CONFIG.get_bool("gdpr_compliance", True):
         gdpr_cleanup.delay(instance.pk)
+
+
+@receiver(monitoring_set)
+def monitoring_system_task(sender, **_):
+    """Update metrics when task is saved"""
+    for task in SystemTask.objects.all():
+        task.update_metrics()
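With task state in the database, the `monitoring_set` receiver above republishes every stored `SystemTask` as a Prometheus gauge via `update_metrics()`. Only the label set (`task_name`, `task_uid`, `status`) is defined by this patch; a compatible gauge definition might look like the sketch below, where the metric name and help text are assumptions rather than the actual `GAUGE_TASKS` declared in `authentik.events.apps`:

```python
# Sketch of a gauge matching SystemTask.update_metrics(); metric name and help text
# are assumptions, only the three label names come from this patch.
from prometheus_client import Gauge

GAUGE_TASKS = Gauge(
    "authentik_system_tasks",  # hypothetical name
    "System tasks and their status",
    ["task_name", "task_uid", "status"],
)
```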
diff --git a/authentik/events/tasks.py b/authentik/events/tasks.py
index 9c00860f8..cdcdf9ca1 100644
--- a/authentik/events/tasks.py
+++ b/authentik/events/tasks.py
@@ -13,13 +13,9 @@ from authentik.events.models import (
     NotificationRule,
     NotificationTransport,
     NotificationTransportError,
+    TaskStatus,
 )
-from authentik.events.monitored_tasks import (
-    MonitoredTask,
-    TaskResult,
-    TaskResultStatus,
-    prefill_task,
-)
+from authentik.events.monitored_tasks import MonitoredTask, prefill_task
 from authentik.policies.engine import PolicyEngine
 from authentik.policies.models import PolicyBinding, PolicyEngineMode
 from authentik.root.celery import CELERY_APP
@@ -123,9 +119,9 @@ def notification_transport(
         if not transport:
             return
         transport.send(notification)
-        self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL))
+        self.set_status(TaskStatus.SUCCESSFUL)
     except (NotificationTransportError, PropertyMappingExpressionException) as exc:
-        self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
+        self.set_error(exc)
         raise exc
 
 
@@ -146,4 +142,4 @@ def notification_cleanup(self: MonitoredTask):
     for notification in notifications:
         notification.delete()
     LOGGER.debug("Expired notifications", amount=amount)
-    self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, [f"Expired {amount} Notifications"]))
+    self.set_status(TaskStatus.SUCCESSFUL, f"Expired {amount} Notifications")
diff --git a/authentik/events/tests/test_tasks.py b/authentik/events/tests/test_tasks.py
index 58dad6556..d343ebdb1 100644
--- a/authentik/events/tests/test_tasks.py
+++ b/authentik/events/tests/test_tasks.py
@@ -1,7 +1,8 @@
 """Test Monitored tasks"""
 from django.test import TestCase
 
-from authentik.events.monitored_tasks import MonitoredTask, TaskInfo, TaskResult, TaskResultStatus
+from authentik.events.models import SystemTask, TaskStatus
+from authentik.events.monitored_tasks import MonitoredTask
 from authentik.lib.generators import generate_id
 from authentik.root.celery import CELERY_APP
 
@@ -22,22 +23,20 @@ class TestMonitoredTasks(TestCase):
         def test_task(self: MonitoredTask):
             self.save_on_success = False
             self.set_uid(uid)
-            self.set_status(
-                TaskResult(TaskResultStatus.ERROR if should_fail else TaskResultStatus.SUCCESSFUL)
-            )
+            self.set_status(TaskStatus.ERROR if should_fail else TaskStatus.SUCCESSFUL)
 
         # First test successful run
         should_fail = False
         test_task.delay().get()
-        self.assertIsNone(TaskInfo.by_name(f"test_task:{uid}"))
+        self.assertFalse(SystemTask.objects.filter(name="test_task", uid=uid).exists())
 
         # Then test failed
         should_fail = True
         test_task.delay().get()
-        info = TaskInfo.by_name(f"test_task:{uid}")
-        self.assertEqual(info.result.status, TaskResultStatus.ERROR)
+        info = SystemTask.objects.filter(name="test_task", uid=uid).first()
+        self.assertEqual(info.status, TaskStatus.ERROR)
 
         # Then after that, the state should be removed
         should_fail = False
         test_task.delay().get()
-        self.assertIsNone(TaskInfo.by_name(f"test_task:{uid}"))
+        self.assertFalse(SystemTask.objects.filter(name="test_task", uid=uid).exists())
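Because task state now lives in the `SystemTask` table instead of the cache, lookups that previously went through `TaskInfo.by_name()` (including its `*` wildcard form) become plain ORM queries, as the LDAP and SCIM API changes further down show. A short sketch with illustrative name/uid values:

```python
# Hedged sketch; the concrete name/uid values are illustrative.
from authentik.events.models import SystemTask

# old: TaskInfo.by_name("ldap_sync:my-source:users:0")
task = SystemTask.objects.filter(name="ldap_sync", uid="my-source:users:0").first()

# old: TaskInfo.by_name("ldap_sync:my-source:*"); the wildcard becomes a prefix filter
tasks = SystemTask.objects.filter(name="ldap_sync", uid__startswith="my-source")
```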
diff --git a/authentik/events/urls.py b/authentik/events/urls.py
index 5d7d40e5f..523435819 100644
--- a/authentik/events/urls.py
+++ b/authentik/events/urls.py
@@ -4,11 +4,13 @@ from authentik.events.api.notification_mappings import NotificationWebhookMappin
 from authentik.events.api.notification_rules import NotificationRuleViewSet
 from authentik.events.api.notification_transports import NotificationTransportViewSet
 from authentik.events.api.notifications import NotificationViewSet
+from authentik.events.api.tasks import SystemTaskViewSet
 
 api_urlpatterns = [
     ("events/events", EventViewSet),
     ("events/notifications", NotificationViewSet),
     ("events/transports", NotificationTransportViewSet),
     ("events/rules", NotificationRuleViewSet),
+    ("events/system_tasks", SystemTaskViewSet),
     ("propertymappings/notification", NotificationWebhookMappingViewSet),
 ]
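The read-only task API moves from `admin/system_tasks` to the `events/system_tasks` route registered above. A hedged usage sketch follows; the host, the `/api/v3/` prefix and the token are assumptions, and the UUID is a placeholder since the detail route (including the `retry` action) is keyed by the `SystemTask` primary key:

```python
# Usage sketch, not part of the patch; host, API prefix and token are assumptions.
import requests

BASE = "https://authentik.example.com/api/v3"
HEADERS = {"Authorization": "Bearer <api-token>"}

# List persisted system tasks (ReadOnlyModelViewSet provides list/retrieve)
resp = requests.get(f"{BASE}/events/system_tasks/", headers=HEADERS, timeout=10)
resp.raise_for_status()

# Re-run one task via the custom "retry" action; 204 means it was re-scheduled,
# 500 means the stored module/function path could no longer be imported.
task_uuid = "00000000-0000-0000-0000-000000000000"  # placeholder primary key
retry = requests.post(
    f"{BASE}/events/system_tasks/{task_uuid}/retry/", headers=HEADERS, timeout=10
)
assert retry.status_code == 204
```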
diff --git a/authentik/outposts/tasks.py b/authentik/outposts/tasks.py
index 0d4e54a3a..93121b90e 100644
--- a/authentik/outposts/tasks.py
+++ b/authentik/outposts/tasks.py
@@ -19,12 +19,8 @@ from yaml import safe_load
 
 from authentik.enterprise.providers.rac.controllers.docker import RACDockerController
 from authentik.enterprise.providers.rac.controllers.kubernetes import RACKubernetesController
-from authentik.events.monitored_tasks import (
-    MonitoredTask,
-    TaskResult,
-    TaskResultStatus,
-    prefill_task,
-)
+from authentik.events.models import TaskStatus
+from authentik.events.monitored_tasks import MonitoredTask, prefill_task
 from authentik.lib.config import CONFIG
 from authentik.lib.utils.reflection import path_to_class
 from authentik.outposts.consumer import OUTPOST_GROUP
@@ -118,10 +114,8 @@ def outpost_service_connection_monitor(self: MonitoredTask):
     for connection in connections.iterator():
         outpost_service_connection_state.delay(connection.pk)
     self.set_status(
-        TaskResult(
-            TaskResultStatus.SUCCESSFUL,
-            [f"Successfully updated {len(connections)} connections."],
-        )
+        TaskStatus.SUCCESSFUL,
+        f"Successfully updated {len(connections)} connections.",
     )
 
 
@@ -161,11 +155,11 @@ def outpost_controller(
                 LOGGER.debug(log)
             LOGGER.debug("-----------------Outpost Controller logs end-------------------")
     except (ControllerException, ServiceConnectionInvalid) as exc:
-        self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
+        self.set_error(exc)
     else:
         if from_cache:
             cache.delete(CACHE_KEY_OUTPOST_DOWN % outpost_pk)
-        self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, logs))
+        self.set_status(TaskStatus.SUCCESSFUL, *logs)
 
 
 @CELERY_APP.task(bind=True, base=MonitoredTask)
@@ -178,10 +172,8 @@ def outpost_token_ensurer(self: MonitoredTask):
         _ = outpost.token
         outpost.build_user_permissions(outpost.user)
     self.set_status(
-        TaskResult(
-            TaskResultStatus.SUCCESSFUL,
-            [f"Successfully checked {len(all_outposts)} Outposts."],
-        )
+        TaskStatus.SUCCESSFUL,
+        f"Successfully checked {len(all_outposts)} Outposts.",
     )
 
 
@@ -261,27 +253,27 @@ def _outpost_single_update(outpost: Outpost, layer=None):
 )
 def outpost_connection_discovery(self: MonitoredTask):
     """Checks the local environment and create Service connections."""
-    status = TaskResult(TaskResultStatus.SUCCESSFUL)
+    messages = []
     if not CONFIG.get_bool("outposts.discover"):
-        status.messages.append("Outpost integration discovery is disabled")
-        self.set_status(status)
+        messages.append("Outpost integration discovery is disabled")
+        self.set_status(TaskStatus.SUCCESSFUL, *messages)
         return
     # Explicitly check against token filename, as that's
     # only present when the integration is enabled
     if Path(SERVICE_TOKEN_FILENAME).exists():
-        status.messages.append("Detected in-cluster Kubernetes Config")
+        messages.append("Detected in-cluster Kubernetes Config")
         if not KubernetesServiceConnection.objects.filter(local=True).exists():
-            status.messages.append("Created Service Connection for in-cluster")
+            messages.append("Created Service Connection for in-cluster")
             KubernetesServiceConnection.objects.create(
                 name="Local Kubernetes Cluster", local=True, kubeconfig={}
             )
     # For development, check for the existence of a kubeconfig file
     kubeconfig_path = Path(KUBE_CONFIG_DEFAULT_LOCATION).expanduser()
     if kubeconfig_path.exists():
-        status.messages.append("Detected kubeconfig")
+        messages.append("Detected kubeconfig")
         kubeconfig_local_name = f"k8s-{gethostname()}"
         if not KubernetesServiceConnection.objects.filter(name=kubeconfig_local_name).exists():
-            status.messages.append("Creating kubeconfig Service Connection")
+            messages.append("Creating kubeconfig Service Connection")
             with kubeconfig_path.open("r", encoding="utf8") as _kubeconfig:
                 KubernetesServiceConnection.objects.create(
                     name=kubeconfig_local_name,
@@ -290,12 +282,12 @@ def outpost_connection_discovery(self: MonitoredTask):
     unix_socket_path = urlparse(DEFAULT_UNIX_SOCKET).path
     socket = Path(unix_socket_path)
     if socket.exists() and access(socket, R_OK):
-        status.messages.append("Detected local docker socket")
+        messages.append("Detected local docker socket")
         if len(DockerServiceConnection.objects.filter(local=True)) == 0:
-            status.messages.append("Created Service Connection for docker")
+            messages.append("Created Service Connection for docker")
             DockerServiceConnection.objects.create(
                 name="Local Docker connection",
                 local=True,
                 url=unix_socket_path,
             )
-    self.set_status(status)
+    self.set_status(TaskStatus.SUCCESSFUL, *messages)
diff --git a/authentik/policies/reputation/tasks.py b/authentik/policies/reputation/tasks.py
index ac65d1748..0f9532bc6 100644
--- a/authentik/policies/reputation/tasks.py
+++ b/authentik/policies/reputation/tasks.py
@@ -4,12 +4,8 @@ from structlog.stdlib import get_logger
 
 from authentik.events.context_processors.asn import ASN_CONTEXT_PROCESSOR
 from authentik.events.context_processors.geoip import GEOIP_CONTEXT_PROCESSOR
-from authentik.events.monitored_tasks import (
-    MonitoredTask,
-    TaskResult,
-    TaskResultStatus,
-    prefill_task,
-)
+from authentik.events.models import TaskStatus
+from authentik.events.monitored_tasks import MonitoredTask, prefill_task
 from authentik.policies.reputation.models import Reputation
 from authentik.policies.reputation.signals import CACHE_KEY_PREFIX
 from authentik.root.celery import CELERY_APP
@@ -32,4 +28,4 @@ def save_reputation(self: MonitoredTask):
         rep.score = score["score"]
         objects_to_update.append(rep)
     Reputation.objects.bulk_update(objects_to_update, ["score", "ip_geo_data"])
-    self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, ["Successfully updated Reputation"]))
+    self.set_status(TaskStatus.SUCCESSFUL, "Successfully updated Reputation")
diff --git a/authentik/providers/scim/api/providers.py b/authentik/providers/scim/api/providers.py
index ddca8cfef..1b095c7bb 100644
--- a/authentik/providers/scim/api/providers.py
+++ b/authentik/providers/scim/api/providers.py
@@ -1,17 +1,17 @@
 """SCIM Provider API Views"""
 from django.utils.text import slugify
 from drf_spectacular.utils import OpenApiResponse, extend_schema
+from guardian.shortcuts import get_objects_for_user
 from rest_framework.decorators import action
 from rest_framework.fields import BooleanField
 from rest_framework.request import Request
 from rest_framework.response import Response
 from rest_framework.viewsets import ModelViewSet
 
-from authentik.admin.api.tasks import TaskSerializer
 from authentik.core.api.providers import ProviderSerializer
 from authentik.core.api.used_by import UsedByMixin
 from authentik.core.api.utils import PassiveSerializer
-from authentik.events.monitored_tasks import TaskInfo
+from authentik.events.api.tasks import SystemTaskSerializer
 from authentik.providers.scim.models import SCIMProvider
 
 
@@ -43,7 +43,7 @@ class SCIMSyncStatusSerializer(PassiveSerializer):
     """SCIM Provider sync status"""
 
     is_running = BooleanField(read_only=True)
-    tasks = TaskSerializer(many=True, read_only=True)
+    tasks = SystemTaskSerializer(many=True, read_only=True)
 
 
 class SCIMProviderViewSet(UsedByMixin, ModelViewSet):
@@ -65,8 +65,12 @@ class SCIMProviderViewSet(UsedByMixin, ModelViewSet):
     def sync_status(self, request: Request, pk: int) -> Response:
         """Get provider's sync status"""
         provider: SCIMProvider = self.get_object()
-        task = TaskInfo.by_name(f"scim_sync:{slugify(provider.name)}")
-        tasks = [task] if task else []
+        tasks = list(
+            get_objects_for_user(request.user, "authentik_events.view_systemtask").filter(
+                name="scim_sync",
+                uid=slugify(provider.name),
+            )
+        )
         status = {
             "tasks": tasks,
             "is_running": provider.sync_lock.locked(),
diff --git a/authentik/providers/scim/tasks.py b/authentik/providers/scim/tasks.py
index 7b16b293e..e88e2c814 100644
--- a/authentik/providers/scim/tasks.py
+++ b/authentik/providers/scim/tasks.py
@@ -10,7 +10,8 @@ from pydanticscim.responses import PatchOp
 from structlog.stdlib import get_logger
 
 from authentik.core.models import Group, User
-from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
+from authentik.events.models import TaskStatus
+from authentik.events.monitored_tasks import MonitoredTask
 from authentik.lib.utils.reflection import path_to_class
 from authentik.providers.scim.clients import PAGE_SIZE, PAGE_TIMEOUT
 from authentik.providers.scim.clients.base import SCIMClient
@@ -52,8 +53,8 @@ def scim_sync(self: MonitoredTask, provider_pk: int) -> None:
         LOGGER.debug("SCIM sync locked, skipping task", source=provider.name)
         return
     self.set_uid(slugify(provider.name))
-    result = TaskResult(TaskResultStatus.SUCCESSFUL, [])
-    result.messages.append(_("Starting full SCIM sync"))
+    messages = []
+    messages.append(_("Starting full SCIM sync"))
     LOGGER.debug("Starting SCIM sync")
     users_paginator = Paginator(provider.get_user_qs(), PAGE_SIZE)
     groups_paginator = Paginator(provider.get_group_qs(), PAGE_SIZE)
@@ -63,17 +64,17 @@ def scim_sync(self: MonitoredTask, provider_pk: int) -> None:
     with allow_join_result():
         try:
             for page in users_paginator.page_range:
-                result.messages.append(_("Syncing page %(page)d of users" % {"page": page}))
+                messages.append(_("Syncing page %(page)d of users" % {"page": page}))
                 for msg in scim_sync_users.delay(page, provider_pk).get():
-                    result.messages.append(msg)
+                    messages.append(msg)
             for page in groups_paginator.page_range:
-                result.messages.append(_("Syncing page %(page)d of groups" % {"page": page}))
+                messages.append(_("Syncing page %(page)d of groups" % {"page": page}))
                 for msg in scim_sync_group.delay(page, provider_pk).get():
-                    result.messages.append(msg)
+                    messages.append(msg)
         except StopSync as exc:
-            self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
+            self.set_error(exc)
             return
-    self.set_status(result)
+    self.set_status(TaskStatus.SUCCESSFUL, *messages)
 
 
 @CELERY_APP.task(
diff --git a/authentik/sources/ldap/api.py b/authentik/sources/ldap/api.py
index 08e530bbb..e4bdf79d7 100644
--- a/authentik/sources/ldap/api.py
+++ b/authentik/sources/ldap/api.py
@@ -6,6 +6,7 @@ from django_filters.filters import AllValuesMultipleFilter
 from django_filters.filterset import FilterSet
 from drf_spectacular.types import OpenApiTypes
 from drf_spectacular.utils import extend_schema, extend_schema_field, inline_serializer
+from guardian.shortcuts import get_objects_for_user
 from rest_framework.decorators import action
 from rest_framework.exceptions import ValidationError
 from rest_framework.fields import BooleanField, DictField, ListField, SerializerMethodField
@@ -14,13 +15,12 @@ from rest_framework.request import Request
 from rest_framework.response import Response
 from rest_framework.viewsets import ModelViewSet
 
-from authentik.admin.api.tasks import TaskSerializer
 from authentik.core.api.propertymappings import PropertyMappingSerializer
 from authentik.core.api.sources import SourceSerializer
 from authentik.core.api.used_by import UsedByMixin
 from authentik.core.api.utils import PassiveSerializer
 from authentik.crypto.models import CertificateKeyPair
-from authentik.events.monitored_tasks import TaskInfo
+from authentik.events.api.tasks import SystemTaskSerializer
 from authentik.sources.ldap.models import LDAPPropertyMapping, LDAPSource
 from authentik.sources.ldap.tasks import CACHE_KEY_STATUS, SYNC_CLASSES
 
@@ -91,7 +91,7 @@ class LDAPSyncStatusSerializer(PassiveSerializer):
     """LDAP Source sync status"""
 
     is_running = BooleanField(read_only=True)
-    tasks = TaskSerializer(many=True, read_only=True)
+    tasks = SystemTaskSerializer(many=True, read_only=True)
 
 
 class LDAPSourceViewSet(UsedByMixin, ModelViewSet):
@@ -136,7 +136,12 @@ class LDAPSourceViewSet(UsedByMixin, ModelViewSet):
     def sync_status(self, request: Request, slug: str) -> Response:
         """Get source's sync status"""
         source: LDAPSource = self.get_object()
-        tasks = TaskInfo.by_name(f"ldap_sync:{source.slug}:*") or []
+        tasks = list(
+            get_objects_for_user(request.user, "authentik_events.view_systemtask").filter(
+                name="ldap_sync",
+                uid__startswith=source.slug,
+            )
+        )
         status = {
             "tasks": tasks,
             "is_running": source.sync_lock.locked(),
diff --git a/authentik/sources/ldap/sync/vendor/freeipa.py b/authentik/sources/ldap/sync/vendor/freeipa.py
index d0bce0584..d8014f716 100644
--- a/authentik/sources/ldap/sync/vendor/freeipa.py
+++ b/authentik/sources/ldap/sync/vendor/freeipa.py
@@ -1,9 +1,7 @@
 """FreeIPA specific"""
-from datetime import datetime
+from datetime import datetime, timezone
 from typing import Any, Generator
 
-from pytz import UTC
-
 from authentik.core.models import User
 from authentik.sources.ldap.sync.base import BaseLDAPSynchronizer, flatten
 
@@ -27,7 +25,7 @@ class FreeIPA(BaseLDAPSynchronizer):
         if "krbLastPwdChange" not in attributes:
             return
         pwd_last_set: datetime = attributes.get("krbLastPwdChange", datetime.now())
-        pwd_last_set = pwd_last_set.replace(tzinfo=UTC)
+        pwd_last_set = pwd_last_set.replace(tzinfo=timezone.utc)
         if created or pwd_last_set >= user.password_change_date:
             self.message(f"'{user.username}': Reset user's password")
             self._logger.debug(
diff --git a/authentik/sources/ldap/sync/vendor/ms_ad.py b/authentik/sources/ldap/sync/vendor/ms_ad.py
index 84b8fc7df..342b104d9 100644
--- a/authentik/sources/ldap/sync/vendor/ms_ad.py
+++ b/authentik/sources/ldap/sync/vendor/ms_ad.py
@@ -1,10 +1,8 @@
 """Active Directory specific"""
-from datetime import datetime
+from datetime import datetime, timezone
 from enum import IntFlag
 from typing import Any, Generator
 
-from pytz import UTC
-
 from authentik.core.models import User
 from authentik.sources.ldap.sync.base import BaseLDAPSynchronizer
 
@@ -58,7 +56,7 @@ class MicrosoftActiveDirectory(BaseLDAPSynchronizer):
         if "pwdLastSet" not in attributes:
             return
         pwd_last_set: datetime = attributes.get("pwdLastSet", datetime.now())
-        pwd_last_set = pwd_last_set.replace(tzinfo=UTC)
+        pwd_last_set = pwd_last_set.replace(tzinfo=timezone.utc)
         if created or pwd_last_set >= user.password_change_date:
             self.message(f"'{user.username}': Reset user's password")
             self._logger.debug(
diff --git a/authentik/sources/ldap/tasks.py b/authentik/sources/ldap/tasks.py
index 7f00d6bb3..dcf6bff5d 100644
--- a/authentik/sources/ldap/tasks.py
+++ b/authentik/sources/ldap/tasks.py
@@ -8,8 +8,8 @@ from ldap3.core.exceptions import LDAPException
 from redis.exceptions import LockError
 from structlog.stdlib import get_logger
 
-from authentik.events.monitored_tasks import CACHE_KEY_PREFIX as CACHE_KEY_PREFIX_TASKS
-from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
+from authentik.events.models import SystemTask, TaskStatus
+from authentik.events.monitored_tasks import MonitoredTask
 from authentik.lib.config import CONFIG
 from authentik.lib.utils.errors import exception_to_string
 from authentik.lib.utils.reflection import class_to_path, path_to_class
@@ -69,8 +69,7 @@ def ldap_sync_single(source_pk: str):
     try:
         with lock:
             # Delete all sync tasks from the cache
-            keys = cache.keys(f"{CACHE_KEY_PREFIX_TASKS}ldap_sync:{source.slug}*")
-            cache.delete_many(keys)
+            SystemTask.objects.filter(name="ldap_sync", uid__startswith=source.slug).delete()
             task = chain(
                 # User and group sync can happen at once, they have no dependencies on each other
                 group(
@@ -127,20 +126,18 @@ def ldap_sync(self: MonitoredTask, source_pk: str, sync_class: str, page_cache_k
                 + "Try increasing ldap.task_timeout_hours"
             )
             LOGGER.warning(error_message)
-            self.set_status(TaskResult(TaskResultStatus.ERROR, [error_message]))
+            self.set_status(TaskStatus.ERROR, error_message)
             return
         cache.touch(page_cache_key)
         count = sync_inst.sync(page)
         messages = sync_inst.messages
         messages.append(f"Synced {count} objects.")
         self.set_status(
-            TaskResult(
-                TaskResultStatus.SUCCESSFUL,
-                messages,
-            )
+            TaskStatus.SUCCESSFUL,
+            *messages,
         )
         cache.delete(page_cache_key)
     except LDAPException as exc:
         # No explicit event is created here as .set_status with an error will do that
         LOGGER.warning(exception_to_string(exc))
-        self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
+        self.set_error(exc)
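The call sites in this file, and in the source and stage tasks below, assume the reworked status API on MonitoredTask: set_status() now takes a TaskStatus member plus any number of message strings, and set_error() replaces the old TaskResult(...).with_error() chain. A minimal sketch of that shape, inferred from these call sites only (the actual implementation lives in authentik/events/monitored_tasks.py and is not reproduced here):

    from authentik.events.models import TaskStatus
    from authentik.lib.utils.errors import exception_to_string

    class StatusAPISketch:
        """Illustrative only: mirrors how the tasks in this patch call the API."""

        def set_status(self, status: TaskStatus, *messages: str) -> None:
            self._status = status
            self._messages = list(messages)

        def set_error(self, exception: Exception) -> None:
            self.set_status(TaskStatus.ERROR, exception_to_string(exception))
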
diff --git a/authentik/sources/ldap/tests/test_sync.py b/authentik/sources/ldap/tests/test_sync.py
index 13035ac2b..d5c8372c4 100644
--- a/authentik/sources/ldap/tests/test_sync.py
+++ b/authentik/sources/ldap/tests/test_sync.py
@@ -7,8 +7,8 @@ from django.test import TestCase
 from authentik.blueprints.tests import apply_blueprint
 from authentik.core.models import Group, User
 from authentik.core.tests.utils import create_test_admin_user
-from authentik.events.models import Event, EventAction
-from authentik.events.monitored_tasks import TaskInfo, TaskResultStatus
+from authentik.events.models import Event, EventAction, SystemTask
+from authentik.events.monitored_tasks import TaskStatus
 from authentik.lib.generators import generate_id, generate_key
 from authentik.lib.utils.reflection import class_to_path
 from authentik.sources.ldap.models import LDAPPropertyMapping, LDAPSource
@@ -41,8 +41,8 @@ class LDAPSyncTests(TestCase):
         connection = MagicMock(return_value=mock_ad_connection(LDAP_PASSWORD))
         with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
             ldap_sync.delay(self.source.pk, class_to_path(UserLDAPSynchronizer), "foo").get()
-        status = TaskInfo.by_name("ldap_sync:ldap:users:foo")
-        self.assertEqual(status.result.status, TaskResultStatus.ERROR)
+        task = SystemTask.objects.filter(name="ldap_sync", uid="ldap:users:foo").first()
+        self.assertEqual(task.status, TaskStatus.ERROR)
 
     def test_sync_error(self):
         """Test user sync"""
diff --git a/authentik/sources/oauth/tasks.py b/authentik/sources/oauth/tasks.py
index 6197df512..caa284729 100644
--- a/authentik/sources/oauth/tasks.py
+++ b/authentik/sources/oauth/tasks.py
@@ -4,7 +4,8 @@ from json import dumps
 from requests import RequestException
 from structlog.stdlib import get_logger
 
-from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
+from authentik.events.models import TaskStatus
+from authentik.events.monitored_tasks import MonitoredTask
 from authentik.lib.utils.http import get_http_session
 from authentik.root.celery import CELERY_APP
 from authentik.sources.oauth.models import OAuthSource
@@ -16,7 +17,7 @@ LOGGER = get_logger()
 def update_well_known_jwks(self: MonitoredTask):
     """Update OAuth sources' config from well_known, and JWKS info from the configured URL"""
     session = get_http_session()
-    result = TaskResult(TaskResultStatus.SUCCESSFUL, [])
+    messages = []
     for source in OAuthSource.objects.all().exclude(oidc_well_known_url=""):
         try:
             well_known_config = session.get(source.oidc_well_known_url)
@@ -24,7 +25,7 @@ def update_well_known_jwks(self: MonitoredTask):
         except RequestException as exc:
             text = exc.response.text if exc.response else str(exc)
             LOGGER.warning("Failed to update well_known", source=source, exc=exc, text=text)
-            result.messages.append(f"Failed to update OIDC configuration for {source.slug}")
+            messages.append(f"Failed to update OIDC configuration for {source.slug}")
             continue
         config = well_known_config.json()
         try:
@@ -47,7 +48,7 @@ def update_well_known_jwks(self: MonitoredTask):
                 source=source,
                 exc=exc,
             )
-            result.messages.append(f"Failed to update OIDC configuration for {source.slug}")
+            messages.append(f"Failed to update OIDC configuration for {source.slug}")
             continue
         if dirty:
             LOGGER.info("Updating sources' OpenID Configuration", source=source)
@@ -60,11 +61,11 @@ def update_well_known_jwks(self: MonitoredTask):
         except RequestException as exc:
             text = exc.response.text if exc.response else str(exc)
             LOGGER.warning("Failed to update JWKS", source=source, exc=exc, text=text)
-            result.messages.append(f"Failed to update JWKS for {source.slug}")
+            messages.append(f"Failed to update JWKS for {source.slug}")
             continue
         config = jwks_config.json()
         if dumps(source.oidc_jwks, sort_keys=True) != dumps(config, sort_keys=True):
             source.oidc_jwks = config
             LOGGER.info("Updating sources' JWKS", source=source)
             source.save()
-    self.set_status(result)
+    self.set_status(TaskStatus.SUCCESSFUL, *messages)
diff --git a/authentik/sources/plex/tasks.py b/authentik/sources/plex/tasks.py
index c83b43f21..0934712a6 100644
--- a/authentik/sources/plex/tasks.py
+++ b/authentik/sources/plex/tasks.py
@@ -1,8 +1,8 @@
 """Plex tasks"""
 from requests import RequestException
 
-from authentik.events.models import Event, EventAction
-from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
+from authentik.events.models import Event, EventAction, TaskStatus
+from authentik.events.monitored_tasks import MonitoredTask
 from authentik.lib.utils.errors import exception_to_string
 from authentik.root.celery import CELERY_APP
 from authentik.sources.plex.models import PlexSource
@@ -27,16 +27,15 @@ def check_plex_token(self: MonitoredTask, source_slug: int):
     auth = PlexAuth(source, source.plex_token)
     try:
         auth.get_user_info()
-        self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, ["Plex token is valid."]))
+        self.set_status(TaskStatus.SUCCESSFUL, "Plex token is valid.")
     except RequestException as exc:
         error = exception_to_string(exc)
         if len(source.plex_token) > 0:
             error = error.replace(source.plex_token, "$PLEX_TOKEN")
         self.set_status(
-            TaskResult(
-                TaskResultStatus.ERROR,
-                ["Plex token is invalid/an error occurred:", error],
-            )
+            TaskStatus.ERROR,
+            "Plex token is invalid/an error occurred:",
+            error,
         )
         Event.new(
             EventAction.CONFIGURATION_ERROR,
diff --git a/authentik/stages/email/tasks.py b/authentik/stages/email/tasks.py
index 6f0b6e104..0d39e4758 100644
--- a/authentik/stages/email/tasks.py
+++ b/authentik/stages/email/tasks.py
@@ -9,8 +9,8 @@ from django.core.mail.utils import DNS_NAME
 from django.utils.text import slugify
 from structlog.stdlib import get_logger
 
-from authentik.events.models import Event, EventAction
-from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
+from authentik.events.models import Event, EventAction, TaskStatus
+from authentik.events.monitored_tasks import MonitoredTask
 from authentik.root.celery import CELERY_APP
 from authentik.stages.email.models import EmailStage
 from authentik.stages.email.utils import logo_data
@@ -58,10 +58,8 @@ def send_mail(self: MonitoredTask, message: dict[Any, Any], email_stage_pk: Opti
             stages = EmailStage.objects.filter(pk=email_stage_pk)
             if not stages.exists():
                 self.set_status(
-                    TaskResult(
-                        TaskResultStatus.WARNING,
-                        messages=["Email stage does not exist anymore. Discarding message."],
-                    )
+                    TaskStatus.WARNING,
+                    "Email stage does not exist anymore. Discarding message.",
                 )
                 return
             stage: EmailStage = stages.first()
@@ -69,7 +67,7 @@ def send_mail(self: MonitoredTask, message: dict[Any, Any], email_stage_pk: Opti
             backend = stage.backend
         except ValueError as exc:
             LOGGER.warning("failed to get email backend", exc=exc)
-            self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
+            self.set_error(exc)
             return
         backend.open()
         # Since django's EmailMessage objects are not JSON serialisable,
@@ -97,12 +95,10 @@ def send_mail(self: MonitoredTask, message: dict[Any, Any], email_stage_pk: Opti
             to_email=message_object.to,
         ).save()
         self.set_status(
-            TaskResult(
-                TaskResultStatus.SUCCESSFUL,
-                messages=["Successfully sent Mail."],
-            )
+            TaskStatus.SUCCESSFUL,
+            "Successfully sent Mail.",
         )
     except (SMTPException, ConnectionError, OSError) as exc:
         LOGGER.debug("Error sending email, retrying...", exc=exc)
-        self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
+        self.set_error(exc)
         raise exc
diff --git a/blueprints/schema.json b/blueprints/schema.json
index 213cb1673..cf78a683a 100644
--- a/blueprints/schema.json
+++ b/blueprints/schema.json
@@ -263,6 +263,43 @@
                             }
                         }
                     },
+                    {
+                        "type": "object",
+                        "required": [
+                            "model",
+                            "identifiers"
+                        ],
+                        "properties": {
+                            "model": {
+                                "const": "authentik_events.systemtask"
+                            },
+                            "id": {
+                                "type": "string"
+                            },
+                            "state": {
+                                "type": "string",
+                                "enum": [
+                                    "absent",
+                                    "present",
+                                    "created",
+                                    "must_created"
+                                ],
+                                "default": "present"
+                            },
+                            "conditions": {
+                                "type": "array",
+                                "items": {
+                                    "type": "boolean"
+                                }
+                            },
+                            "attrs": {
+                                "$ref": "#/$defs/model_authentik_events.systemtask"
+                            },
+                            "identifiers": {
+                                "$ref": "#/$defs/model_authentik_events.systemtask"
+                            }
+                        }
+                    },
                     {
                         "type": "object",
                         "required": [
@@ -3197,6 +3234,50 @@
             },
             "required": []
         },
+        "model_authentik_events.systemtask": {
+            "type": "object",
+            "properties": {
+                "task_name": {
+                    "type": "string",
+                    "minLength": 1,
+                    "title": "Task name"
+                },
+                "task_description": {
+                    "type": "string",
+                    "minLength": 1,
+                    "title": "Task description"
+                },
+                "task_start_timestamp": {
+                    "type": "string",
+                    "format": "date-time",
+                    "title": "Task start timestamp"
+                },
+                "task_finish_timestamp": {
+                    "type": "string",
+                    "format": "date-time",
+                    "title": "Task finish timestamp"
+                },
+                "status": {
+                    "type": "string",
+                    "enum": [
+                        "SUCCESSFUL",
+                        "WARNING",
+                        "ERROR",
+                        "UNKNOWN"
+                    ],
+                    "title": "Status"
+                },
+                "messages": {
+                    "type": "array",
+                    "items": {
+                        "type": "string",
+                        "minLength": 1
+                    },
+                    "title": "Messages"
+                }
+            },
+            "required": []
+        },
         "model_authentik_flows.flow": {
             "type": "object",
             "properties": {
@@ -3607,6 +3688,7 @@
                         "authentik_events.notification",
                         "authentik_events.notificationrule",
                         "authentik_events.notificationwebhookmapping",
+                        "authentik_events.systemtask",
                         "authentik_flows.flow",
                         "authentik_flows.flowstagebinding",
                         "authentik_outposts.dockerserviceconnection",
diff --git a/schema.yml b/schema.yml
index 85dd0e5f5..350ef914d 100644
--- a/schema.yml
+++ b/schema.yml
@@ -147,103 +147,6 @@ paths:
               schema:
                 $ref: '#/components/schemas/GenericError'
           description: ''
-  /admin/system_tasks/:
-    get:
-      operationId: admin_system_tasks_list
-      description: List system tasks
-      tags:
-      - admin
-      security:
-      - authentik: []
-      responses:
-        '200':
-          content:
-            application/json:
-              schema:
-                type: array
-                items:
-                  $ref: '#/components/schemas/Task'
-          description: ''
-        '400':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/ValidationError'
-          description: ''
-        '403':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/GenericError'
-          description: ''
-  /admin/system_tasks/{id}/:
-    get:
-      operationId: admin_system_tasks_retrieve
-      description: Get a single system task
-      parameters:
-      - in: path
-        name: id
-        schema:
-          type: string
-        required: true
-      tags:
-      - admin
-      security:
-      - authentik: []
-      responses:
-        '200':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/Task'
-          description: ''
-        '404':
-          description: Task not found
-        '400':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/ValidationError'
-          description: ''
-        '403':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/GenericError'
-          description: ''
-  /admin/system_tasks/{id}/retry/:
-    post:
-      operationId: admin_system_tasks_retry_create
-      description: Retry task
-      parameters:
-      - in: path
-        name: id
-        schema:
-          type: string
-        required: true
-      tags:
-      - admin
-      security:
-      - authentik: []
-      responses:
-        '204':
-          description: Task retried successfully
-        '404':
-          description: Task not found
-        '500':
-          description: Failed to retry task
-        '400':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/ValidationError'
-          description: ''
-        '403':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/GenericError'
-          description: ''
   /admin/version/:
     get:
       operationId: admin_version_retrieve
@@ -6932,6 +6835,133 @@ paths:
               schema:
                 $ref: '#/components/schemas/GenericError'
           description: ''
+  /events/system_tasks/:
+    get:
+      operationId: events_system_tasks_list
+      description: Read-only view set that returns all background tasks
+      parameters:
+      - name: ordering
+        required: false
+        in: query
+        description: Which field to use when ordering the results.
+        schema:
+          type: string
+      - name: page
+        required: false
+        in: query
+        description: A page number within the paginated result set.
+        schema:
+          type: integer
+      - name: page_size
+        required: false
+        in: query
+        description: Number of results to return per page.
+        schema:
+          type: integer
+      - name: search
+        required: false
+        in: query
+        description: A search term.
+        schema:
+          type: string
+      tags:
+      - events
+      security:
+      - authentik: []
+      responses:
+        '200':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/PaginatedSystemTaskList'
+          description: ''
+        '400':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ValidationError'
+          description: ''
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/GenericError'
+          description: ''
+  /events/system_tasks/{uuid}/:
+    get:
+      operationId: events_system_tasks_retrieve
+      description: Read-only view set that returns all background tasks
+      parameters:
+      - in: path
+        name: uuid
+        schema:
+          type: string
+          format: uuid
+        description: A UUID string identifying this System Task.
+        required: true
+      tags:
+      - events
+      security:
+      - authentik: []
+      responses:
+        '200':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/SystemTask'
+          description: ''
+        '400':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ValidationError'
+          description: ''
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/GenericError'
+          description: ''
+  /events/system_tasks/{uuid}/retry/:
+    post:
+      operationId: events_system_tasks_retry_create
+      description: Retry task
+      parameters:
+      - in: path
+        name: id
+        schema:
+          type: string
+        required: true
+      - in: path
+        name: uuid
+        schema:
+          type: string
+          format: uuid
+        description: A UUID string identifying this System Task.
+        required: true
+      tags:
+      - events
+      security:
+      - authentik: []
+      responses:
+        '204':
+          description: Task retried successfully
+        '404':
+          description: Task not found
+        '500':
+          description: Failed to retry task
+        '400':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ValidationError'
+          description: ''
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/GenericError'
+          description: ''
   /events/transports/:
     get:
       operationId: events_transports_list
@@ -18071,6 +18101,7 @@ paths:
           - authentik_events.notificationrule
           - authentik_events.notificationtransport
           - authentik_events.notificationwebhookmapping
+          - authentik_events.systemtask
           - authentik_flows.flow
           - authentik_flows.flowstagebinding
           - authentik_outposts.dockerserviceconnection
@@ -18143,6 +18174,7 @@ paths:
           * `authentik_events.notification` - Notification
           * `authentik_events.notificationrule` - Notification Rule
           * `authentik_events.notificationwebhookmapping` - Webhook Mapping
+          * `authentik_events.systemtask` - System Task
           * `authentik_flows.flow` - Flow
           * `authentik_flows.flowstagebinding` - Flow Stage Binding
           * `authentik_outposts.dockerserviceconnection` - Docker Service-Connection
@@ -18365,6 +18397,7 @@ paths:
           - authentik_events.notificationrule
           - authentik_events.notificationtransport
           - authentik_events.notificationwebhookmapping
+          - authentik_events.systemtask
           - authentik_flows.flow
           - authentik_flows.flowstagebinding
           - authentik_outposts.dockerserviceconnection
@@ -18437,6 +18470,7 @@ paths:
           * `authentik_events.notification` - Notification
           * `authentik_events.notificationrule` - Notification Rule
           * `authentik_events.notificationwebhookmapping` - Webhook Mapping
+          * `authentik_events.systemtask` - System Task
           * `authentik_flows.flow` - Flow
           * `authentik_flows.flowstagebinding` - Flow Stage Binding
           * `authentik_outposts.dockerserviceconnection` - Docker Service-Connection
@@ -31696,6 +31730,7 @@ components:
             * `authentik_events.notification` - Notification
             * `authentik_events.notificationrule` - Notification Rule
             * `authentik_events.notificationwebhookmapping` - Webhook Mapping
+            * `authentik_events.systemtask` - System Task
             * `authentik_flows.flow` - Flow
             * `authentik_flows.flowstagebinding` - Flow Stage Binding
             * `authentik_outposts.dockerserviceconnection` - Docker Service-Connection
@@ -31896,6 +31931,7 @@ components:
             * `authentik_events.notification` - Notification
             * `authentik_events.notificationrule` - Notification Rule
             * `authentik_events.notificationwebhookmapping` - Webhook Mapping
+            * `authentik_events.systemtask` - System Task
             * `authentik_flows.flow` - Flow
             * `authentik_flows.flowstagebinding` - Flow Stage Binding
             * `authentik_outposts.dockerserviceconnection` - Docker Service-Connection
@@ -34047,7 +34083,7 @@ components:
         tasks:
           type: array
           items:
-            $ref: '#/components/schemas/Task'
+            $ref: '#/components/schemas/SystemTask'
           readOnly: true
       required:
       - is_running
@@ -34214,6 +34250,7 @@ components:
       - authentik_events.notification
       - authentik_events.notificationrule
       - authentik_events.notificationwebhookmapping
+      - authentik_events.systemtask
       - authentik_flows.flow
       - authentik_flows.flowstagebinding
       - authentik_outposts.dockerserviceconnection
@@ -34293,6 +34330,7 @@ components:
         * `authentik_events.notification` - Notification
         * `authentik_events.notificationrule` - Notification Rule
         * `authentik_events.notificationwebhookmapping` - Webhook Mapping
+        * `authentik_events.systemtask` - System Task
         * `authentik_flows.flow` - Flow
         * `authentik_flows.flowstagebinding` - Flow Stage Binding
         * `authentik_outposts.dockerserviceconnection` - Docker Service-Connection
@@ -36281,6 +36319,18 @@ components:
       required:
       - pagination
       - results
+    PaginatedSystemTaskList:
+      type: object
+      properties:
+        pagination:
+          $ref: '#/components/schemas/Pagination'
+        results:
+          type: array
+          items:
+            $ref: '#/components/schemas/SystemTask'
+      required:
+      - pagination
+      - results
     PaginatedTOTPDeviceList:
       type: object
       properties:
@@ -37431,6 +37481,7 @@ components:
             * `authentik_events.notification` - Notification
             * `authentik_events.notificationrule` - Notification Rule
             * `authentik_events.notificationwebhookmapping` - Webhook Mapping
+            * `authentik_events.systemtask` - System Task
             * `authentik_flows.flow` - Flow
             * `authentik_flows.flowstagebinding` - Flow Stage Binding
             * `authentik_outposts.dockerserviceconnection` - Docker Service-Connection
@@ -41972,7 +42023,7 @@ components:
         tasks:
           type: array
           items:
-            $ref: '#/components/schemas/Task'
+            $ref: '#/components/schemas/SystemTask'
           readOnly: true
       required:
       - is_running
@@ -42630,6 +42681,50 @@ components:
       - runtime
       - server_time
       - tenant
+    SystemTask:
+      type: object
+      description: Serialize TaskInfo and TaskResult
+      properties:
+        task_name:
+          type: string
+        task_description:
+          type: string
+        task_start_timestamp:
+          type: string
+          format: date-time
+        task_finish_timestamp:
+          type: string
+          format: date-time
+        task_duration:
+          type: integer
+          description: Get the duration a task took to run
+          readOnly: true
+        status:
+          $ref: '#/components/schemas/SystemTaskStatusEnum'
+        messages:
+          type: array
+          items:
+            type: string
+      required:
+      - messages
+      - status
+      - task_description
+      - task_duration
+      - task_finish_timestamp
+      - task_name
+      - task_start_timestamp
+    SystemTaskStatusEnum:
+      enum:
+      - SUCCESSFUL
+      - WARNING
+      - ERROR
+      - UNKNOWN
+      type: string
+      description: |-
+        * `SUCCESSFUL` - SUCCESSFUL
+        * `WARNING` - WARNING
+        * `ERROR` - ERROR
+        * `UNKNOWN` - UNKNOWN
     TOTPDevice:
       type: object
       description: Serializer for totp authenticator devices
@@ -42656,45 +42751,6 @@ components:
           maxLength: 64
       required:
       - name
-    Task:
-      type: object
-      description: Serialize TaskInfo and TaskResult
-      properties:
-        task_name:
-          type: string
-        task_description:
-          type: string
-        task_finish_timestamp:
-          type: string
-          format: date-time
-        task_duration:
-          type: integer
-          description: Get the duration a task took to run
-          readOnly: true
-        status:
-          $ref: '#/components/schemas/TaskStatusEnum'
-        messages:
-          type: array
-          items: {}
-      required:
-      - messages
-      - status
-      - task_description
-      - task_duration
-      - task_finish_timestamp
-      - task_name
-    TaskStatusEnum:
-      enum:
-      - SUCCESSFUL
-      - WARNING
-      - ERROR
-      - UNKNOWN
-      type: string
-      description: |-
-        * `SUCCESSFUL` - SUCCESSFUL
-        * `WARNING` - WARNING
-        * `ERROR` - ERROR
-        * `UNKNOWN` - UNKNOWN
     Tenant:
       type: object
       description: Tenant Serializer