rename globally to system task
Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
parent
c3e9d23190
commit
08ee66d682
|
@ -11,7 +11,7 @@ from structlog.stdlib import get_logger
|
||||||
from authentik import __version__, get_build_hash
|
from authentik import __version__, get_build_hash
|
||||||
from authentik.admin.apps import PROM_INFO
|
from authentik.admin.apps import PROM_INFO
|
||||||
from authentik.events.models import Event, EventAction, Notification
|
from authentik.events.models import Event, EventAction, Notification
|
||||||
from authentik.events.monitored_tasks import MonitoredTask, TaskStatus, prefill_task
|
from authentik.events.system_tasks import SystemTask, TaskStatus, prefill_task
|
||||||
from authentik.lib.config import CONFIG
|
from authentik.lib.config import CONFIG
|
||||||
from authentik.lib.utils.http import get_http_session
|
from authentik.lib.utils.http import get_http_session
|
||||||
from authentik.root.celery import CELERY_APP
|
from authentik.root.celery import CELERY_APP
|
||||||
|
@ -49,9 +49,9 @@ def clear_update_notifications():
|
||||||
notification.delete()
|
notification.delete()
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=SystemTask)
|
||||||
@prefill_task
|
@prefill_task
|
||||||
def update_latest_version(self: MonitoredTask):
|
def update_latest_version(self: SystemTask):
|
||||||
"""Update latest version info"""
|
"""Update latest version info"""
|
||||||
if CONFIG.get_bool("disable_update_check"):
|
if CONFIG.get_bool("disable_update_check"):
|
||||||
cache.set(VERSION_CACHE_KEY, "0.0.0", VERSION_CACHE_TIMEOUT)
|
cache.set(VERSION_CACHE_KEY, "0.0.0", VERSION_CACHE_TIMEOUT)
|
||||||
|
|
|
@ -30,7 +30,7 @@ from authentik.blueprints.v1.importer import Importer
|
||||||
from authentik.blueprints.v1.labels import LABEL_AUTHENTIK_INSTANTIATE
|
from authentik.blueprints.v1.labels import LABEL_AUTHENTIK_INSTANTIATE
|
||||||
from authentik.blueprints.v1.oci import OCI_PREFIX
|
from authentik.blueprints.v1.oci import OCI_PREFIX
|
||||||
from authentik.events.models import TaskStatus
|
from authentik.events.models import TaskStatus
|
||||||
from authentik.events.monitored_tasks import MonitoredTask, prefill_task
|
from authentik.events.system_tasks import SystemTask, prefill_task
|
||||||
from authentik.events.utils import sanitize_dict
|
from authentik.events.utils import sanitize_dict
|
||||||
from authentik.lib.config import CONFIG
|
from authentik.lib.config import CONFIG
|
||||||
from authentik.root.celery import CELERY_APP
|
from authentik.root.celery import CELERY_APP
|
||||||
|
@ -124,10 +124,10 @@ def blueprints_find() -> list[BlueprintFile]:
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(
|
@CELERY_APP.task(
|
||||||
throws=(DatabaseError, ProgrammingError, InternalError), base=MonitoredTask, bind=True
|
throws=(DatabaseError, ProgrammingError, InternalError), base=SystemTask, bind=True
|
||||||
)
|
)
|
||||||
@prefill_task
|
@prefill_task
|
||||||
def blueprints_discovery(self: MonitoredTask, path: Optional[str] = None):
|
def blueprints_discovery(self: SystemTask, path: Optional[str] = None):
|
||||||
"""Find blueprints and check if they need to be created in the database"""
|
"""Find blueprints and check if they need to be created in the database"""
|
||||||
count = 0
|
count = 0
|
||||||
for blueprint in blueprints_find():
|
for blueprint in blueprints_find():
|
||||||
|
@ -169,9 +169,9 @@ def check_blueprint_v1_file(blueprint: BlueprintFile):
|
||||||
|
|
||||||
@CELERY_APP.task(
|
@CELERY_APP.task(
|
||||||
bind=True,
|
bind=True,
|
||||||
base=MonitoredTask,
|
base=SystemTask,
|
||||||
)
|
)
|
||||||
def apply_blueprint(self: MonitoredTask, instance_pk: str):
|
def apply_blueprint(self: SystemTask, instance_pk: str):
|
||||||
"""Apply single blueprint"""
|
"""Apply single blueprint"""
|
||||||
self.save_on_success = False
|
self.save_on_success = False
|
||||||
instance: Optional[BlueprintInstance] = None
|
instance: Optional[BlueprintInstance] = None
|
||||||
|
|
|
@ -13,15 +13,15 @@ from authentik.core.models import (
|
||||||
ExpiringModel,
|
ExpiringModel,
|
||||||
User,
|
User,
|
||||||
)
|
)
|
||||||
from authentik.events.monitored_tasks import MonitoredTask, TaskStatus, prefill_task
|
from authentik.events.system_tasks import SystemTask, TaskStatus, prefill_task
|
||||||
from authentik.root.celery import CELERY_APP
|
from authentik.root.celery import CELERY_APP
|
||||||
|
|
||||||
LOGGER = get_logger()
|
LOGGER = get_logger()
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=SystemTask)
|
||||||
@prefill_task
|
@prefill_task
|
||||||
def clean_expired_models(self: MonitoredTask):
|
def clean_expired_models(self: SystemTask):
|
||||||
"""Remove expired objects"""
|
"""Remove expired objects"""
|
||||||
messages = []
|
messages = []
|
||||||
for cls in ExpiringModel.__subclasses__():
|
for cls in ExpiringModel.__subclasses__():
|
||||||
|
@ -52,9 +52,9 @@ def clean_expired_models(self: MonitoredTask):
|
||||||
self.set_status(TaskStatus.SUCCESSFUL, *messages)
|
self.set_status(TaskStatus.SUCCESSFUL, *messages)
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=SystemTask)
|
||||||
@prefill_task
|
@prefill_task
|
||||||
def clean_temporary_users(self: MonitoredTask):
|
def clean_temporary_users(self: SystemTask):
|
||||||
"""Remove temporary users created by SAML Sources"""
|
"""Remove temporary users created by SAML Sources"""
|
||||||
_now = datetime.now()
|
_now = datetime.now()
|
||||||
messages = []
|
messages = []
|
||||||
|
|
|
@ -10,7 +10,7 @@ from structlog.stdlib import get_logger
|
||||||
|
|
||||||
from authentik.crypto.models import CertificateKeyPair
|
from authentik.crypto.models import CertificateKeyPair
|
||||||
from authentik.events.models import TaskStatus
|
from authentik.events.models import TaskStatus
|
||||||
from authentik.events.monitored_tasks import MonitoredTask, prefill_task
|
from authentik.events.system_tasks import SystemTask, prefill_task
|
||||||
from authentik.lib.config import CONFIG
|
from authentik.lib.config import CONFIG
|
||||||
from authentik.root.celery import CELERY_APP
|
from authentik.root.celery import CELERY_APP
|
||||||
|
|
||||||
|
@ -35,9 +35,9 @@ def ensure_certificate_valid(body: str):
|
||||||
return body
|
return body
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=SystemTask)
|
||||||
@prefill_task
|
@prefill_task
|
||||||
def certificate_discovery(self: MonitoredTask):
|
def certificate_discovery(self: SystemTask):
|
||||||
"""Discover, import and update certificates from the filesystem"""
|
"""Discover, import and update certificates from the filesystem"""
|
||||||
certs = {}
|
certs = {}
|
||||||
private_keys = {}
|
private_keys = {}
|
||||||
|
|
|
@ -58,7 +58,7 @@ class AuthentikEventsConfig(ManagedAppConfig):
|
||||||
def reconcile_prefill_tasks(self):
|
def reconcile_prefill_tasks(self):
|
||||||
"""Prefill tasks"""
|
"""Prefill tasks"""
|
||||||
from authentik.events.models import SystemTask
|
from authentik.events.models import SystemTask
|
||||||
from authentik.events.monitored_tasks import _prefill_tasks
|
from authentik.events.system_tasks import _prefill_tasks
|
||||||
|
|
||||||
for task in _prefill_tasks:
|
for task in _prefill_tasks:
|
||||||
if SystemTask.objects.filter(name=task.name).exists():
|
if SystemTask.objects.filter(name=task.name).exists():
|
||||||
|
|
|
@ -8,14 +8,16 @@ from django.utils.timezone import now
|
||||||
from django.utils.translation import gettext_lazy as _
|
from django.utils.translation import gettext_lazy as _
|
||||||
from structlog.stdlib import get_logger
|
from structlog.stdlib import get_logger
|
||||||
|
|
||||||
from authentik.events.models import Event, EventAction, SystemTask, TaskStatus
|
from authentik.events.models import Event, EventAction
|
||||||
|
from authentik.events.models import SystemTask as DBSystemTask
|
||||||
|
from authentik.events.models import TaskStatus
|
||||||
from authentik.events.utils import sanitize_item
|
from authentik.events.utils import sanitize_item
|
||||||
from authentik.lib.utils.errors import exception_to_string
|
from authentik.lib.utils.errors import exception_to_string
|
||||||
|
|
||||||
LOGGER = get_logger()
|
LOGGER = get_logger()
|
||||||
|
|
||||||
|
|
||||||
class MonitoredTask(Task):
|
class SystemTask(Task):
|
||||||
"""Task which can save its state to the cache"""
|
"""Task which can save its state to the cache"""
|
||||||
|
|
||||||
# For tasks that should only be listed if they failed, set this to False
|
# For tasks that should only be listed if they failed, set this to False
|
||||||
|
@ -59,12 +61,12 @@ class MonitoredTask(Task):
|
||||||
if not self._status:
|
if not self._status:
|
||||||
return
|
return
|
||||||
if self._status == TaskStatus.SUCCESSFUL and not self.save_on_success:
|
if self._status == TaskStatus.SUCCESSFUL and not self.save_on_success:
|
||||||
SystemTask.objects.filter(
|
DBSystemTask.objects.filter(
|
||||||
name=self.__name__,
|
name=self.__name__,
|
||||||
uid=self._uid,
|
uid=self._uid,
|
||||||
).delete()
|
).delete()
|
||||||
return
|
return
|
||||||
SystemTask.objects.update_or_create(
|
DBSystemTask.objects.update_or_create(
|
||||||
name=self.__name__,
|
name=self.__name__,
|
||||||
uid=self._uid,
|
uid=self._uid,
|
||||||
defaults={
|
defaults={
|
||||||
|
@ -88,7 +90,7 @@ class MonitoredTask(Task):
|
||||||
if not self._status:
|
if not self._status:
|
||||||
self._status = TaskStatus.ERROR
|
self._status = TaskStatus.ERROR
|
||||||
self._messages = exception_to_string(exc)
|
self._messages = exception_to_string(exc)
|
||||||
SystemTask.objects.update_or_create(
|
DBSystemTask.objects.update_or_create(
|
||||||
name=self.__name__,
|
name=self.__name__,
|
||||||
uid=self._uid,
|
uid=self._uid,
|
||||||
defaults={
|
defaults={
|
||||||
|
@ -117,7 +119,7 @@ class MonitoredTask(Task):
|
||||||
def prefill_task(func):
|
def prefill_task(func):
|
||||||
"""Ensure a task's details are always in cache, so it can always be triggered via API"""
|
"""Ensure a task's details are always in cache, so it can always be triggered via API"""
|
||||||
_prefill_tasks.append(
|
_prefill_tasks.append(
|
||||||
SystemTask(
|
DBSystemTask(
|
||||||
name=func.__name__,
|
name=func.__name__,
|
||||||
description=func.__doc__,
|
description=func.__doc__,
|
||||||
status=TaskStatus.UNKNOWN,
|
status=TaskStatus.UNKNOWN,
|
|
@ -15,7 +15,7 @@ from authentik.events.models import (
|
||||||
NotificationTransportError,
|
NotificationTransportError,
|
||||||
TaskStatus,
|
TaskStatus,
|
||||||
)
|
)
|
||||||
from authentik.events.monitored_tasks import MonitoredTask, prefill_task
|
from authentik.events.system_tasks import SystemTask, prefill_task
|
||||||
from authentik.policies.engine import PolicyEngine
|
from authentik.policies.engine import PolicyEngine
|
||||||
from authentik.policies.models import PolicyBinding, PolicyEngineMode
|
from authentik.policies.models import PolicyBinding, PolicyEngineMode
|
||||||
from authentik.root.celery import CELERY_APP
|
from authentik.root.celery import CELERY_APP
|
||||||
|
@ -95,10 +95,10 @@ def event_trigger_handler(event_uuid: str, trigger_name: str):
|
||||||
bind=True,
|
bind=True,
|
||||||
autoretry_for=(NotificationTransportError,),
|
autoretry_for=(NotificationTransportError,),
|
||||||
retry_backoff=True,
|
retry_backoff=True,
|
||||||
base=MonitoredTask,
|
base=SystemTask,
|
||||||
)
|
)
|
||||||
def notification_transport(
|
def notification_transport(
|
||||||
self: MonitoredTask, transport_pk: int, event_pk: str, user_pk: int, trigger_pk: str
|
self: SystemTask, transport_pk: int, event_pk: str, user_pk: int, trigger_pk: str
|
||||||
):
|
):
|
||||||
"""Send notification over specified transport"""
|
"""Send notification over specified transport"""
|
||||||
self.save_on_success = False
|
self.save_on_success = False
|
||||||
|
@ -133,9 +133,9 @@ def gdpr_cleanup(user_pk: int):
|
||||||
events.delete()
|
events.delete()
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=SystemTask)
|
||||||
@prefill_task
|
@prefill_task
|
||||||
def notification_cleanup(self: MonitoredTask):
|
def notification_cleanup(self: SystemTask):
|
||||||
"""Cleanup seen notifications and notifications whose event expired."""
|
"""Cleanup seen notifications and notifications whose event expired."""
|
||||||
notifications = Notification.objects.filter(Q(event=None) | Q(seen=True))
|
notifications = Notification.objects.filter(Q(event=None) | Q(seen=True))
|
||||||
amount = notifications.count()
|
amount = notifications.count()
|
||||||
|
|
|
@ -6,8 +6,9 @@ from rest_framework.test import APITestCase
|
||||||
|
|
||||||
from authentik.core.tasks import clean_expired_models
|
from authentik.core.tasks import clean_expired_models
|
||||||
from authentik.core.tests.utils import create_test_admin_user
|
from authentik.core.tests.utils import create_test_admin_user
|
||||||
from authentik.events.models import SystemTask, TaskStatus
|
from authentik.events.models import SystemTask as DBSystemTask
|
||||||
from authentik.events.monitored_tasks import MonitoredTask
|
from authentik.events.models import TaskStatus
|
||||||
|
from authentik.events.system_tasks import SystemTask
|
||||||
from authentik.lib.generators import generate_id
|
from authentik.lib.generators import generate_id
|
||||||
from authentik.root.celery import CELERY_APP
|
from authentik.root.celery import CELERY_APP
|
||||||
|
|
||||||
|
@ -28,9 +29,9 @@ class TestSystemTasks(APITestCase):
|
||||||
|
|
||||||
@CELERY_APP.task(
|
@CELERY_APP.task(
|
||||||
bind=True,
|
bind=True,
|
||||||
base=MonitoredTask,
|
base=SystemTask,
|
||||||
)
|
)
|
||||||
def test_task(self: MonitoredTask):
|
def test_task(self: SystemTask):
|
||||||
self.save_on_success = False
|
self.save_on_success = False
|
||||||
self.set_uid(uid)
|
self.set_uid(uid)
|
||||||
self.set_status(TaskStatus.ERROR if should_fail else TaskStatus.SUCCESSFUL)
|
self.set_status(TaskStatus.ERROR if should_fail else TaskStatus.SUCCESSFUL)
|
||||||
|
@ -38,18 +39,18 @@ class TestSystemTasks(APITestCase):
|
||||||
# First test successful run
|
# First test successful run
|
||||||
should_fail = False
|
should_fail = False
|
||||||
test_task.delay().get()
|
test_task.delay().get()
|
||||||
self.assertIsNone(SystemTask.objects.filter(name="test_task", uid=uid).first())
|
self.assertIsNone(DBSystemTask.objects.filter(name="test_task", uid=uid).first())
|
||||||
|
|
||||||
# Then test failed
|
# Then test failed
|
||||||
should_fail = True
|
should_fail = True
|
||||||
test_task.delay().get()
|
test_task.delay().get()
|
||||||
task = SystemTask.objects.filter(name="test_task", uid=uid).first()
|
task = DBSystemTask.objects.filter(name="test_task", uid=uid).first()
|
||||||
self.assertEqual(task.status, TaskStatus.ERROR)
|
self.assertEqual(task.status, TaskStatus.ERROR)
|
||||||
|
|
||||||
# Then after that, the state should be removed
|
# Then after that, the state should be removed
|
||||||
should_fail = False
|
should_fail = False
|
||||||
test_task.delay().get()
|
test_task.delay().get()
|
||||||
self.assertIsNone(SystemTask.objects.filter(name="test_task", uid=uid).first())
|
self.assertIsNone(DBSystemTask.objects.filter(name="test_task", uid=uid).first())
|
||||||
|
|
||||||
def test_tasks(self):
|
def test_tasks(self):
|
||||||
"""Test Task API"""
|
"""Test Task API"""
|
||||||
|
@ -62,7 +63,7 @@ class TestSystemTasks(APITestCase):
|
||||||
def test_tasks_single(self):
|
def test_tasks_single(self):
|
||||||
"""Test Task API (read single)"""
|
"""Test Task API (read single)"""
|
||||||
clean_expired_models.delay().get()
|
clean_expired_models.delay().get()
|
||||||
task = SystemTask.objects.filter(name="clean_expired_models").first()
|
task = DBSystemTask.objects.filter(name="clean_expired_models").first()
|
||||||
response = self.client.get(
|
response = self.client.get(
|
||||||
reverse(
|
reverse(
|
||||||
"authentik_api:systemtask-detail",
|
"authentik_api:systemtask-detail",
|
||||||
|
@ -81,7 +82,7 @@ class TestSystemTasks(APITestCase):
|
||||||
def test_tasks_run(self):
|
def test_tasks_run(self):
|
||||||
"""Test Task API (run)"""
|
"""Test Task API (run)"""
|
||||||
clean_expired_models.delay().get()
|
clean_expired_models.delay().get()
|
||||||
task = SystemTask.objects.filter(name="clean_expired_models").first()
|
task = DBSystemTask.objects.filter(name="clean_expired_models").first()
|
||||||
response = self.client.post(
|
response = self.client.post(
|
||||||
reverse(
|
reverse(
|
||||||
"authentik_api:systemtask-run",
|
"authentik_api:systemtask-run",
|
||||||
|
|
|
@ -20,7 +20,7 @@ from yaml import safe_load
|
||||||
from authentik.enterprise.providers.rac.controllers.docker import RACDockerController
|
from authentik.enterprise.providers.rac.controllers.docker import RACDockerController
|
||||||
from authentik.enterprise.providers.rac.controllers.kubernetes import RACKubernetesController
|
from authentik.enterprise.providers.rac.controllers.kubernetes import RACKubernetesController
|
||||||
from authentik.events.models import TaskStatus
|
from authentik.events.models import TaskStatus
|
||||||
from authentik.events.monitored_tasks import MonitoredTask, prefill_task
|
from authentik.events.system_tasks import SystemTask, prefill_task
|
||||||
from authentik.lib.config import CONFIG
|
from authentik.lib.config import CONFIG
|
||||||
from authentik.lib.utils.reflection import path_to_class
|
from authentik.lib.utils.reflection import path_to_class
|
||||||
from authentik.outposts.consumer import OUTPOST_GROUP
|
from authentik.outposts.consumer import OUTPOST_GROUP
|
||||||
|
@ -104,11 +104,11 @@ def outpost_service_connection_state(connection_pk: Any):
|
||||||
|
|
||||||
@CELERY_APP.task(
|
@CELERY_APP.task(
|
||||||
bind=True,
|
bind=True,
|
||||||
base=MonitoredTask,
|
base=SystemTask,
|
||||||
throws=(DatabaseError, ProgrammingError, InternalError),
|
throws=(DatabaseError, ProgrammingError, InternalError),
|
||||||
)
|
)
|
||||||
@prefill_task
|
@prefill_task
|
||||||
def outpost_service_connection_monitor(self: MonitoredTask):
|
def outpost_service_connection_monitor(self: SystemTask):
|
||||||
"""Regularly check the state of Outpost Service Connections"""
|
"""Regularly check the state of Outpost Service Connections"""
|
||||||
connections = OutpostServiceConnection.objects.all()
|
connections = OutpostServiceConnection.objects.all()
|
||||||
for connection in connections.iterator():
|
for connection in connections.iterator():
|
||||||
|
@ -128,9 +128,9 @@ def outpost_controller_all():
|
||||||
outpost_controller.delay(outpost.pk.hex, "up", from_cache=False)
|
outpost_controller.delay(outpost.pk.hex, "up", from_cache=False)
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=SystemTask)
|
||||||
def outpost_controller(
|
def outpost_controller(
|
||||||
self: MonitoredTask, outpost_pk: str, action: str = "up", from_cache: bool = False
|
self: SystemTask, outpost_pk: str, action: str = "up", from_cache: bool = False
|
||||||
):
|
):
|
||||||
"""Create/update/monitor/delete the deployment of an Outpost"""
|
"""Create/update/monitor/delete the deployment of an Outpost"""
|
||||||
logs = []
|
logs = []
|
||||||
|
@ -162,9 +162,9 @@ def outpost_controller(
|
||||||
self.set_status(TaskStatus.SUCCESSFUL, *logs)
|
self.set_status(TaskStatus.SUCCESSFUL, *logs)
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=SystemTask)
|
||||||
@prefill_task
|
@prefill_task
|
||||||
def outpost_token_ensurer(self: MonitoredTask):
|
def outpost_token_ensurer(self: SystemTask):
|
||||||
"""Periodically ensure that all Outposts have valid Service Accounts
|
"""Periodically ensure that all Outposts have valid Service Accounts
|
||||||
and Tokens"""
|
and Tokens"""
|
||||||
all_outposts = Outpost.objects.all()
|
all_outposts = Outpost.objects.all()
|
||||||
|
@ -248,10 +248,10 @@ def _outpost_single_update(outpost: Outpost, layer=None):
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(
|
@CELERY_APP.task(
|
||||||
base=MonitoredTask,
|
base=SystemTask,
|
||||||
bind=True,
|
bind=True,
|
||||||
)
|
)
|
||||||
def outpost_connection_discovery(self: MonitoredTask):
|
def outpost_connection_discovery(self: SystemTask):
|
||||||
"""Checks the local environment and create Service connections."""
|
"""Checks the local environment and create Service connections."""
|
||||||
messages = []
|
messages = []
|
||||||
if not CONFIG.get_bool("outposts.discover"):
|
if not CONFIG.get_bool("outposts.discover"):
|
||||||
|
|
|
@ -5,7 +5,7 @@ from structlog.stdlib import get_logger
|
||||||
from authentik.events.context_processors.asn import ASN_CONTEXT_PROCESSOR
|
from authentik.events.context_processors.asn import ASN_CONTEXT_PROCESSOR
|
||||||
from authentik.events.context_processors.geoip import GEOIP_CONTEXT_PROCESSOR
|
from authentik.events.context_processors.geoip import GEOIP_CONTEXT_PROCESSOR
|
||||||
from authentik.events.models import TaskStatus
|
from authentik.events.models import TaskStatus
|
||||||
from authentik.events.monitored_tasks import MonitoredTask, prefill_task
|
from authentik.events.system_tasks import SystemTask, prefill_task
|
||||||
from authentik.policies.reputation.models import Reputation
|
from authentik.policies.reputation.models import Reputation
|
||||||
from authentik.policies.reputation.signals import CACHE_KEY_PREFIX
|
from authentik.policies.reputation.signals import CACHE_KEY_PREFIX
|
||||||
from authentik.root.celery import CELERY_APP
|
from authentik.root.celery import CELERY_APP
|
||||||
|
@ -13,9 +13,9 @@ from authentik.root.celery import CELERY_APP
|
||||||
LOGGER = get_logger()
|
LOGGER = get_logger()
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=SystemTask)
|
||||||
@prefill_task
|
@prefill_task
|
||||||
def save_reputation(self: MonitoredTask):
|
def save_reputation(self: SystemTask):
|
||||||
"""Save currently cached reputation to database"""
|
"""Save currently cached reputation to database"""
|
||||||
objects_to_update = []
|
objects_to_update = []
|
||||||
for _, score in cache.get_many(cache.keys(CACHE_KEY_PREFIX + "*")).items():
|
for _, score in cache.get_many(cache.keys(CACHE_KEY_PREFIX + "*")).items():
|
||||||
|
|
|
@ -11,7 +11,7 @@ from structlog.stdlib import get_logger
|
||||||
|
|
||||||
from authentik.core.models import Group, User
|
from authentik.core.models import Group, User
|
||||||
from authentik.events.models import TaskStatus
|
from authentik.events.models import TaskStatus
|
||||||
from authentik.events.monitored_tasks import MonitoredTask
|
from authentik.events.system_tasks import SystemTask
|
||||||
from authentik.lib.utils.reflection import path_to_class
|
from authentik.lib.utils.reflection import path_to_class
|
||||||
from authentik.providers.scim.clients import PAGE_SIZE, PAGE_TIMEOUT
|
from authentik.providers.scim.clients import PAGE_SIZE, PAGE_TIMEOUT
|
||||||
from authentik.providers.scim.clients.base import SCIMClient
|
from authentik.providers.scim.clients.base import SCIMClient
|
||||||
|
@ -40,8 +40,8 @@ def scim_sync_all():
|
||||||
scim_sync.delay(provider.pk)
|
scim_sync.delay(provider.pk)
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=SystemTask)
|
||||||
def scim_sync(self: MonitoredTask, provider_pk: int) -> None:
|
def scim_sync(self: SystemTask, provider_pk: int) -> None:
|
||||||
"""Run SCIM full sync for provider"""
|
"""Run SCIM full sync for provider"""
|
||||||
provider: SCIMProvider = SCIMProvider.objects.filter(
|
provider: SCIMProvider = SCIMProvider.objects.filter(
|
||||||
pk=provider_pk, backchannel_application__isnull=False
|
pk=provider_pk, backchannel_application__isnull=False
|
||||||
|
|
|
@ -8,8 +8,9 @@ from ldap3.core.exceptions import LDAPException
|
||||||
from redis.exceptions import LockError
|
from redis.exceptions import LockError
|
||||||
from structlog.stdlib import get_logger
|
from structlog.stdlib import get_logger
|
||||||
|
|
||||||
from authentik.events.models import SystemTask, TaskStatus
|
from authentik.events.models import SystemTask as DBSystemTask
|
||||||
from authentik.events.monitored_tasks import MonitoredTask
|
from authentik.events.models import TaskStatus
|
||||||
|
from authentik.events.system_tasks import SystemTask
|
||||||
from authentik.lib.config import CONFIG
|
from authentik.lib.config import CONFIG
|
||||||
from authentik.lib.utils.errors import exception_to_string
|
from authentik.lib.utils.errors import exception_to_string
|
||||||
from authentik.lib.utils.reflection import class_to_path, path_to_class
|
from authentik.lib.utils.reflection import class_to_path, path_to_class
|
||||||
|
@ -69,7 +70,7 @@ def ldap_sync_single(source_pk: str):
|
||||||
try:
|
try:
|
||||||
with lock:
|
with lock:
|
||||||
# Delete all sync tasks from the cache
|
# Delete all sync tasks from the cache
|
||||||
SystemTask.objects.filter(name="ldap_sync", uid__startswith=source.slug).delete()
|
DBSystemTask.objects.filter(name="ldap_sync", uid__startswith=source.slug).delete()
|
||||||
task = chain(
|
task = chain(
|
||||||
# User and group sync can happen at once, they have no dependencies on each other
|
# User and group sync can happen at once, they have no dependencies on each other
|
||||||
group(
|
group(
|
||||||
|
@ -102,11 +103,11 @@ def ldap_sync_paginator(source: LDAPSource, sync: type[BaseLDAPSynchronizer]) ->
|
||||||
|
|
||||||
@CELERY_APP.task(
|
@CELERY_APP.task(
|
||||||
bind=True,
|
bind=True,
|
||||||
base=MonitoredTask,
|
base=SystemTask,
|
||||||
soft_time_limit=60 * 60 * CONFIG.get_int("ldap.task_timeout_hours"),
|
soft_time_limit=60 * 60 * CONFIG.get_int("ldap.task_timeout_hours"),
|
||||||
task_time_limit=60 * 60 * CONFIG.get_int("ldap.task_timeout_hours"),
|
task_time_limit=60 * 60 * CONFIG.get_int("ldap.task_timeout_hours"),
|
||||||
)
|
)
|
||||||
def ldap_sync(self: MonitoredTask, source_pk: str, sync_class: str, page_cache_key: str):
|
def ldap_sync(self: SystemTask, source_pk: str, sync_class: str, page_cache_key: str):
|
||||||
"""Synchronization of an LDAP Source"""
|
"""Synchronization of an LDAP Source"""
|
||||||
self.result_timeout_hours = CONFIG.get_int("ldap.task_timeout_hours")
|
self.result_timeout_hours = CONFIG.get_int("ldap.task_timeout_hours")
|
||||||
source: LDAPSource = LDAPSource.objects.filter(pk=source_pk).first()
|
source: LDAPSource = LDAPSource.objects.filter(pk=source_pk).first()
|
||||||
|
|
|
@ -8,7 +8,7 @@ from authentik.blueprints.tests import apply_blueprint
|
||||||
from authentik.core.models import Group, User
|
from authentik.core.models import Group, User
|
||||||
from authentik.core.tests.utils import create_test_admin_user
|
from authentik.core.tests.utils import create_test_admin_user
|
||||||
from authentik.events.models import Event, EventAction, SystemTask
|
from authentik.events.models import Event, EventAction, SystemTask
|
||||||
from authentik.events.monitored_tasks import TaskStatus
|
from authentik.events.system_tasks import TaskStatus
|
||||||
from authentik.lib.generators import generate_id, generate_key
|
from authentik.lib.generators import generate_id, generate_key
|
||||||
from authentik.lib.utils.reflection import class_to_path
|
from authentik.lib.utils.reflection import class_to_path
|
||||||
from authentik.sources.ldap.models import LDAPPropertyMapping, LDAPSource
|
from authentik.sources.ldap.models import LDAPPropertyMapping, LDAPSource
|
||||||
|
|
|
@ -5,7 +5,7 @@ from requests import RequestException
|
||||||
from structlog.stdlib import get_logger
|
from structlog.stdlib import get_logger
|
||||||
|
|
||||||
from authentik.events.models import TaskStatus
|
from authentik.events.models import TaskStatus
|
||||||
from authentik.events.monitored_tasks import MonitoredTask
|
from authentik.events.system_tasks import SystemTask
|
||||||
from authentik.lib.utils.http import get_http_session
|
from authentik.lib.utils.http import get_http_session
|
||||||
from authentik.root.celery import CELERY_APP
|
from authentik.root.celery import CELERY_APP
|
||||||
from authentik.sources.oauth.models import OAuthSource
|
from authentik.sources.oauth.models import OAuthSource
|
||||||
|
@ -13,8 +13,8 @@ from authentik.sources.oauth.models import OAuthSource
|
||||||
LOGGER = get_logger()
|
LOGGER = get_logger()
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=SystemTask)
|
||||||
def update_well_known_jwks(self: MonitoredTask):
|
def update_well_known_jwks(self: SystemTask):
|
||||||
"""Update OAuth sources' config from well_known, and JWKS info from the configured URL"""
|
"""Update OAuth sources' config from well_known, and JWKS info from the configured URL"""
|
||||||
session = get_http_session()
|
session = get_http_session()
|
||||||
messages = []
|
messages = []
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
from requests import RequestException
|
from requests import RequestException
|
||||||
|
|
||||||
from authentik.events.models import Event, EventAction, TaskStatus
|
from authentik.events.models import Event, EventAction, TaskStatus
|
||||||
from authentik.events.monitored_tasks import MonitoredTask
|
from authentik.events.system_tasks import SystemTask
|
||||||
from authentik.lib.utils.errors import exception_to_string
|
from authentik.lib.utils.errors import exception_to_string
|
||||||
from authentik.root.celery import CELERY_APP
|
from authentik.root.celery import CELERY_APP
|
||||||
from authentik.sources.plex.models import PlexSource
|
from authentik.sources.plex.models import PlexSource
|
||||||
|
@ -16,8 +16,8 @@ def check_plex_token_all():
|
||||||
check_plex_token.delay(source.slug)
|
check_plex_token.delay(source.slug)
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=SystemTask)
|
||||||
def check_plex_token(self: MonitoredTask, source_slug: int):
|
def check_plex_token(self: SystemTask, source_slug: int):
|
||||||
"""Check the validity of a Plex source."""
|
"""Check the validity of a Plex source."""
|
||||||
sources = PlexSource.objects.filter(slug=source_slug)
|
sources = PlexSource.objects.filter(slug=source_slug)
|
||||||
if not sources.exists():
|
if not sources.exists():
|
||||||
|
|
|
@ -10,7 +10,7 @@ from django.utils.text import slugify
|
||||||
from structlog.stdlib import get_logger
|
from structlog.stdlib import get_logger
|
||||||
|
|
||||||
from authentik.events.models import Event, EventAction, TaskStatus
|
from authentik.events.models import Event, EventAction, TaskStatus
|
||||||
from authentik.events.monitored_tasks import MonitoredTask
|
from authentik.events.system_tasks import SystemTask
|
||||||
from authentik.root.celery import CELERY_APP
|
from authentik.root.celery import CELERY_APP
|
||||||
from authentik.stages.email.models import EmailStage
|
from authentik.stages.email.models import EmailStage
|
||||||
from authentik.stages.email.utils import logo_data
|
from authentik.stages.email.utils import logo_data
|
||||||
|
@ -44,9 +44,9 @@ def get_email_body(email: EmailMultiAlternatives) -> str:
|
||||||
OSError,
|
OSError,
|
||||||
),
|
),
|
||||||
retry_backoff=True,
|
retry_backoff=True,
|
||||||
base=MonitoredTask,
|
base=SystemTask,
|
||||||
)
|
)
|
||||||
def send_mail(self: MonitoredTask, message: dict[Any, Any], email_stage_pk: Optional[str] = None):
|
def send_mail(self: SystemTask, message: dict[Any, Any], email_stage_pk: Optional[str] = None):
|
||||||
"""Send Email for Email Stage. Retries are scheduled automatically."""
|
"""Send Email for Email Stage. Retries are scheduled automatically."""
|
||||||
self.save_on_success = False
|
self.save_on_success = False
|
||||||
message_id = make_msgid(domain=DNS_NAME)
|
message_id = make_msgid(domain=DNS_NAME)
|
||||||
|
|
Reference in a new issue