events: replace @prefill_task with custom base class to prefill
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
parent
20c738c384
commit
cf78c89830
|
@ -11,12 +11,7 @@ from structlog.stdlib import get_logger
|
||||||
|
|
||||||
from authentik import ENV_GIT_HASH_KEY, __version__
|
from authentik import ENV_GIT_HASH_KEY, __version__
|
||||||
from authentik.events.models import Event, EventAction, Notification
|
from authentik.events.models import Event, EventAction, Notification
|
||||||
from authentik.events.monitored_tasks import (
|
from authentik.events.monitored_tasks import PrefilledMonitoredTask, TaskResult, TaskResultStatus
|
||||||
MonitoredTask,
|
|
||||||
TaskResult,
|
|
||||||
TaskResultStatus,
|
|
||||||
prefill_task,
|
|
||||||
)
|
|
||||||
from authentik.lib.config import CONFIG
|
from authentik.lib.config import CONFIG
|
||||||
from authentik.lib.utils.http import get_http_session
|
from authentik.lib.utils.http import get_http_session
|
||||||
from authentik.root.celery import CELERY_APP
|
from authentik.root.celery import CELERY_APP
|
||||||
|
@ -53,9 +48,8 @@ def clear_update_notifications():
|
||||||
notification.delete()
|
notification.delete()
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask)
|
||||||
@prefill_task
|
def update_latest_version(self: PrefilledMonitoredTask):
|
||||||
def update_latest_version(self: MonitoredTask):
|
|
||||||
"""Update latest version info"""
|
"""Update latest version info"""
|
||||||
if CONFIG.y_bool("disable_update_check"):
|
if CONFIG.y_bool("disable_update_check"):
|
||||||
cache.set(VERSION_CACHE_KEY, "0.0.0", VERSION_CACHE_TIMEOUT)
|
cache.set(VERSION_CACHE_KEY, "0.0.0", VERSION_CACHE_TIMEOUT)
|
||||||
|
|
|
@ -16,21 +16,15 @@ from kubernetes.config.incluster_config import SERVICE_HOST_ENV_NAME
|
||||||
from structlog.stdlib import get_logger
|
from structlog.stdlib import get_logger
|
||||||
|
|
||||||
from authentik.core.models import AuthenticatedSession, ExpiringModel
|
from authentik.core.models import AuthenticatedSession, ExpiringModel
|
||||||
from authentik.events.monitored_tasks import (
|
from authentik.events.monitored_tasks import PrefilledMonitoredTask, TaskResult, TaskResultStatus
|
||||||
MonitoredTask,
|
|
||||||
TaskResult,
|
|
||||||
TaskResultStatus,
|
|
||||||
prefill_task,
|
|
||||||
)
|
|
||||||
from authentik.lib.config import CONFIG
|
from authentik.lib.config import CONFIG
|
||||||
from authentik.root.celery import CELERY_APP
|
from authentik.root.celery import CELERY_APP
|
||||||
|
|
||||||
LOGGER = get_logger()
|
LOGGER = get_logger()
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask)
|
||||||
@prefill_task
|
def clean_expired_models(self: PrefilledMonitoredTask):
|
||||||
def clean_expired_models(self: MonitoredTask):
|
|
||||||
"""Remove expired objects"""
|
"""Remove expired objects"""
|
||||||
messages = []
|
messages = []
|
||||||
for cls in ExpiringModel.__subclasses__():
|
for cls in ExpiringModel.__subclasses__():
|
||||||
|
@ -68,9 +62,8 @@ def should_backup() -> bool:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask)
|
||||||
@prefill_task
|
def backup_database(self: PrefilledMonitoredTask): # pragma: no cover
|
||||||
def backup_database(self: MonitoredTask): # pragma: no cover
|
|
||||||
"""Database backup"""
|
"""Database backup"""
|
||||||
self.result_timeout_hours = 25
|
self.result_timeout_hours = 25
|
||||||
if not should_backup():
|
if not should_backup():
|
||||||
|
|
|
@ -112,30 +112,6 @@ class TaskInfo:
|
||||||
cache.set(key, self, timeout=timeout_hours * 60 * 60)
|
cache.set(key, self, timeout=timeout_hours * 60 * 60)
|
||||||
|
|
||||||
|
|
||||||
def prefill_task(func):
|
|
||||||
"""Ensure a task's details are always in cache, so it can always be triggered via API"""
|
|
||||||
|
|
||||||
def wrapper(*args, **kwargs):
|
|
||||||
status = TaskInfo.by_name(func.__name__)
|
|
||||||
if status:
|
|
||||||
return func(*args, **kwargs)
|
|
||||||
TaskInfo(
|
|
||||||
task_name=func.__name__,
|
|
||||||
task_description=func.__doc__,
|
|
||||||
result=TaskResult(TaskResultStatus.UNKNOWN, messages=[_("Task has not been run yet.")]),
|
|
||||||
task_call_module=func.__module__,
|
|
||||||
task_call_func=func.__name__,
|
|
||||||
# We don't have real values for these attributes but they cannot be null
|
|
||||||
start_timestamp=default_timer(),
|
|
||||||
finish_timestamp=default_timer(),
|
|
||||||
finish_time=datetime.now(),
|
|
||||||
).save(86400)
|
|
||||||
LOGGER.debug("prefilled task", task_name=func.__name__)
|
|
||||||
return func(*args, **kwargs)
|
|
||||||
|
|
||||||
return wrapper
|
|
||||||
|
|
||||||
|
|
||||||
class MonitoredTask(Task):
|
class MonitoredTask(Task):
|
||||||
"""Task which can save its state to the cache"""
|
"""Task which can save its state to the cache"""
|
||||||
|
|
||||||
|
@ -210,5 +186,31 @@ class MonitoredTask(Task):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
class PrefilledMonitoredTask(MonitoredTask):
|
||||||
|
"""Subclass of MonitoredTask, but create entry in cache if task hasn't been run
|
||||||
|
Does not support UID"""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs) -> None:
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
status = TaskInfo.by_name(self.__name__)
|
||||||
|
if status:
|
||||||
|
return
|
||||||
|
TaskInfo(
|
||||||
|
task_name=self.__name__,
|
||||||
|
task_description=self.__doc__,
|
||||||
|
result=TaskResult(TaskResultStatus.UNKNOWN, messages=[_("Task has not been run yet.")]),
|
||||||
|
task_call_module=self.__module__,
|
||||||
|
task_call_func=self.__name__,
|
||||||
|
# We don't have real values for these attributes but they cannot be null
|
||||||
|
start_timestamp=default_timer(),
|
||||||
|
finish_timestamp=default_timer(),
|
||||||
|
finish_time=datetime.now(),
|
||||||
|
).save(86400)
|
||||||
|
LOGGER.debug("prefilled task", task_name=self.__name__)
|
||||||
|
|
||||||
|
def run(self, *args, **kwargs):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
for task in TaskInfo.all().values():
|
for task in TaskInfo.all().values():
|
||||||
task.set_prom_metrics()
|
task.set_prom_metrics()
|
||||||
|
|
|
@ -2,18 +2,12 @@
|
||||||
from django.db import DatabaseError
|
from django.db import DatabaseError
|
||||||
|
|
||||||
from authentik.core.tasks import CELERY_APP
|
from authentik.core.tasks import CELERY_APP
|
||||||
from authentik.events.monitored_tasks import (
|
from authentik.events.monitored_tasks import PrefilledMonitoredTask, TaskResult, TaskResultStatus
|
||||||
MonitoredTask,
|
|
||||||
TaskResult,
|
|
||||||
TaskResultStatus,
|
|
||||||
prefill_task,
|
|
||||||
)
|
|
||||||
from authentik.managed.manager import ObjectManager
|
from authentik.managed.manager import ObjectManager
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask)
|
||||||
@prefill_task
|
def managed_reconcile(self: PrefilledMonitoredTask):
|
||||||
def managed_reconcile(self: MonitoredTask):
|
|
||||||
"""Run ObjectManager to ensure objects are up-to-date"""
|
"""Run ObjectManager to ensure objects are up-to-date"""
|
||||||
try:
|
try:
|
||||||
ObjectManager().run()
|
ObjectManager().run()
|
||||||
|
|
|
@ -19,9 +19,9 @@ from structlog.stdlib import get_logger
|
||||||
|
|
||||||
from authentik.events.monitored_tasks import (
|
from authentik.events.monitored_tasks import (
|
||||||
MonitoredTask,
|
MonitoredTask,
|
||||||
|
PrefilledMonitoredTask,
|
||||||
TaskResult,
|
TaskResult,
|
||||||
TaskResultStatus,
|
TaskResultStatus,
|
||||||
prefill_task,
|
|
||||||
)
|
)
|
||||||
from authentik.lib.utils.reflection import path_to_class
|
from authentik.lib.utils.reflection import path_to_class
|
||||||
from authentik.outposts.controllers.base import BaseController, ControllerException
|
from authentik.outposts.controllers.base import BaseController, ControllerException
|
||||||
|
@ -75,9 +75,8 @@ def outpost_service_connection_state(connection_pk: Any):
|
||||||
cache.set(connection.state_key, state, timeout=None)
|
cache.set(connection.state_key, state, timeout=None)
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask)
|
||||||
@prefill_task
|
def outpost_service_connection_monitor(self: PrefilledMonitoredTask):
|
||||||
def outpost_service_connection_monitor(self: MonitoredTask):
|
|
||||||
"""Regularly check the state of Outpost Service Connections"""
|
"""Regularly check the state of Outpost Service Connections"""
|
||||||
connections = OutpostServiceConnection.objects.all()
|
connections = OutpostServiceConnection.objects.all()
|
||||||
for connection in connections.iterator():
|
for connection in connections.iterator():
|
||||||
|
@ -125,9 +124,8 @@ def outpost_controller(
|
||||||
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, logs))
|
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, logs))
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask)
|
||||||
@prefill_task
|
def outpost_token_ensurer(self: PrefilledMonitoredTask):
|
||||||
def outpost_token_ensurer(self: MonitoredTask):
|
|
||||||
"""Periodically ensure that all Outposts have valid Service Accounts
|
"""Periodically ensure that all Outposts have valid Service Accounts
|
||||||
and Tokens"""
|
and Tokens"""
|
||||||
all_outposts = Outpost.objects.all()
|
all_outposts = Outpost.objects.all()
|
||||||
|
|
|
@ -2,12 +2,7 @@
|
||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
from structlog.stdlib import get_logger
|
from structlog.stdlib import get_logger
|
||||||
|
|
||||||
from authentik.events.monitored_tasks import (
|
from authentik.events.monitored_tasks import PrefilledMonitoredTask, TaskResult, TaskResultStatus
|
||||||
MonitoredTask,
|
|
||||||
TaskResult,
|
|
||||||
TaskResultStatus,
|
|
||||||
prefill_task,
|
|
||||||
)
|
|
||||||
from authentik.policies.reputation.models import IPReputation, UserReputation
|
from authentik.policies.reputation.models import IPReputation, UserReputation
|
||||||
from authentik.policies.reputation.signals import CACHE_KEY_IP_PREFIX, CACHE_KEY_USER_PREFIX
|
from authentik.policies.reputation.signals import CACHE_KEY_IP_PREFIX, CACHE_KEY_USER_PREFIX
|
||||||
from authentik.root.celery import CELERY_APP
|
from authentik.root.celery import CELERY_APP
|
||||||
|
@ -15,9 +10,8 @@ from authentik.root.celery import CELERY_APP
|
||||||
LOGGER = get_logger()
|
LOGGER = get_logger()
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask)
|
||||||
@prefill_task
|
def save_ip_reputation(self: PrefilledMonitoredTask):
|
||||||
def save_ip_reputation(self: MonitoredTask):
|
|
||||||
"""Save currently cached reputation to database"""
|
"""Save currently cached reputation to database"""
|
||||||
objects_to_update = []
|
objects_to_update = []
|
||||||
for key, score in cache.get_many(cache.keys(CACHE_KEY_IP_PREFIX + "*")).items():
|
for key, score in cache.get_many(cache.keys(CACHE_KEY_IP_PREFIX + "*")).items():
|
||||||
|
@ -29,9 +23,8 @@ def save_ip_reputation(self: MonitoredTask):
|
||||||
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, ["Successfully updated IP Reputation"]))
|
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, ["Successfully updated IP Reputation"]))
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask)
|
||||||
@prefill_task
|
def save_user_reputation(self: PrefilledMonitoredTask):
|
||||||
def save_user_reputation(self: MonitoredTask):
|
|
||||||
"""Save currently cached reputation to database"""
|
"""Save currently cached reputation to database"""
|
||||||
objects_to_update = []
|
objects_to_update = []
|
||||||
for key, score in cache.get_many(cache.keys(CACHE_KEY_USER_PREFIX + "*")).items():
|
for key, score in cache.get_many(cache.keys(CACHE_KEY_USER_PREFIX + "*")).items():
|
||||||
|
|
|
@ -3,12 +3,7 @@ from django.utils.timezone import now
|
||||||
from structlog.stdlib import get_logger
|
from structlog.stdlib import get_logger
|
||||||
|
|
||||||
from authentik.core.models import AuthenticatedSession, User
|
from authentik.core.models import AuthenticatedSession, User
|
||||||
from authentik.events.monitored_tasks import (
|
from authentik.events.monitored_tasks import PrefilledMonitoredTask, TaskResult, TaskResultStatus
|
||||||
MonitoredTask,
|
|
||||||
TaskResult,
|
|
||||||
TaskResultStatus,
|
|
||||||
prefill_task,
|
|
||||||
)
|
|
||||||
from authentik.lib.utils.time import timedelta_from_string
|
from authentik.lib.utils.time import timedelta_from_string
|
||||||
from authentik.root.celery import CELERY_APP
|
from authentik.root.celery import CELERY_APP
|
||||||
from authentik.sources.saml.models import SAMLSource
|
from authentik.sources.saml.models import SAMLSource
|
||||||
|
@ -16,9 +11,8 @@ from authentik.sources.saml.models import SAMLSource
|
||||||
LOGGER = get_logger()
|
LOGGER = get_logger()
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask)
|
||||||
@prefill_task
|
def clean_temporary_users(self: PrefilledMonitoredTask):
|
||||||
def clean_temporary_users(self: MonitoredTask):
|
|
||||||
"""Remove temporary users created by SAML Sources"""
|
"""Remove temporary users created by SAML Sources"""
|
||||||
_now = now()
|
_now = now()
|
||||||
messages = []
|
messages = []
|
||||||
|
|
Reference in a new issue