From 7d1efd7450e657affc843119f74973ba0f0a5280 Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Mon, 11 Sep 2023 19:33:54 +0200 Subject: [PATCH] root: replace celery queues with priority Signed-off-by: Jens Langhammer --- authentik/admin/settings.py | 3 ++- authentik/blueprints/settings.py | 5 +++-- authentik/core/management/commands/worker.py | 1 - authentik/crypto/settings.py | 3 ++- authentik/enterprise/settings.py | 3 ++- authentik/events/settings.py | 3 ++- authentik/events/tasks.py | 3 ++- authentik/lib/default.yml | 5 +++++ authentik/outposts/settings.py | 9 +++++---- authentik/policies/reputation/settings.py | 4 +++- authentik/providers/scim/settings.py | 3 ++- authentik/root/settings.py | 10 ++++++++-- authentik/sources/ldap/settings.py | 3 ++- authentik/sources/ldap/tasks.py | 2 ++ authentik/sources/plex/settings.py | 3 ++- 15 files changed, 42 insertions(+), 18 deletions(-) diff --git a/authentik/admin/settings.py b/authentik/admin/settings.py index 37b9d8bda..42974f8b9 100644 --- a/authentik/admin/settings.py +++ b/authentik/admin/settings.py @@ -1,12 +1,13 @@ """authentik admin settings""" from celery.schedules import crontab +from authentik.lib.config import CONFIG from authentik.lib.utils.time import fqdn_rand CELERY_BEAT_SCHEDULE = { "admin_latest_version": { "task": "authentik.admin.tasks.update_latest_version", "schedule": crontab(minute=fqdn_rand("admin_latest_version"), hour="*"), - "options": {"queue": "authentik_scheduled"}, + "options": {"priority": CONFIG.get_int("worker.priority.scheduled")}, } } diff --git a/authentik/blueprints/settings.py b/authentik/blueprints/settings.py index c9ce11057..86df53a3d 100644 --- a/authentik/blueprints/settings.py +++ b/authentik/blueprints/settings.py @@ -1,17 +1,18 @@ """blueprint Settings""" from celery.schedules import crontab +from authentik.lib.config import CONFIG from authentik.lib.utils.time import fqdn_rand CELERY_BEAT_SCHEDULE = { "blueprints_v1_discover": { "task": "authentik.blueprints.v1.tasks.blueprints_discovery", "schedule": crontab(minute=fqdn_rand("blueprints_v1_discover"), hour="*"), - "options": {"queue": "authentik_scheduled"}, + "options": {"priority": CONFIG.get_int("worker.priority.scheduled")}, }, "blueprints_v1_cleanup": { "task": "authentik.blueprints.v1.tasks.clear_failed_blueprints", "schedule": crontab(minute=fqdn_rand("blueprints_v1_cleanup"), hour="*"), - "options": {"queue": "authentik_scheduled"}, + "options": {"priority": CONFIG.get_int("worker.priority.scheduled")}, }, } diff --git a/authentik/core/management/commands/worker.py b/authentik/core/management/commands/worker.py index b22187efe..dfed8d4e5 100644 --- a/authentik/core/management/commands/worker.py +++ b/authentik/core/management/commands/worker.py @@ -33,7 +33,6 @@ class Command(BaseCommand): task_events=True, beat=options.get("beat", True), schedule_filename=f"{tempdir}/celerybeat-schedule", - queues=["authentik", "authentik_scheduled", "authentik_events"], ) for task in CELERY_APP.tasks: LOGGER.debug("Registered task", task=task) diff --git a/authentik/crypto/settings.py b/authentik/crypto/settings.py index 939473a96..52256df88 100644 --- a/authentik/crypto/settings.py +++ b/authentik/crypto/settings.py @@ -1,12 +1,13 @@ """Crypto task Settings""" from celery.schedules import crontab +from authentik.lib.config import CONFIG from authentik.lib.utils.time import fqdn_rand CELERY_BEAT_SCHEDULE = { "crypto_certificate_discovery": { "task": "authentik.crypto.tasks.certificate_discovery", "schedule": crontab(minute=fqdn_rand("crypto_certificate_discovery"), hour="*"), - "options": {"queue": "authentik_scheduled"}, + "options": {"priority": CONFIG.get_int("worker.priority.scheduled")}, }, } diff --git a/authentik/enterprise/settings.py b/authentik/enterprise/settings.py index af1da7294..812e56db4 100644 --- a/authentik/enterprise/settings.py +++ b/authentik/enterprise/settings.py @@ -1,12 +1,13 @@ """Enterprise additional settings""" from celery.schedules import crontab +from authentik.lib.config import CONFIG from authentik.lib.utils.time import fqdn_rand CELERY_BEAT_SCHEDULE = { "enterprise_calculate_license": { "task": "authentik.enterprise.tasks.calculate_license", "schedule": crontab(minute=fqdn_rand("calculate_license"), hour="*/8"), - "options": {"queue": "authentik_scheduled"}, + "options": {"priority": CONFIG.get_int("worker.priority.scheduled")}, } } diff --git a/authentik/events/settings.py b/authentik/events/settings.py index 4e41e3953..b29ea0a1b 100644 --- a/authentik/events/settings.py +++ b/authentik/events/settings.py @@ -1,12 +1,13 @@ """Event Settings""" from celery.schedules import crontab +from authentik.lib.config import CONFIG from authentik.lib.utils.time import fqdn_rand CELERY_BEAT_SCHEDULE = { "events_notification_cleanup": { "task": "authentik.events.tasks.notification_cleanup", "schedule": crontab(minute=fqdn_rand("notification_cleanup"), hour="*/8"), - "options": {"queue": "authentik_scheduled"}, + "options": {"priority": CONFIG.get_int("worker.priority.scheduled")}, }, } diff --git a/authentik/events/tasks.py b/authentik/events/tasks.py index 9c00860f8..e2f28ee44 100644 --- a/authentik/events/tasks.py +++ b/authentik/events/tasks.py @@ -20,6 +20,7 @@ from authentik.events.monitored_tasks import ( TaskResultStatus, prefill_task, ) +from authentik.lib.config import CONFIG from authentik.policies.engine import PolicyEngine from authentik.policies.models import PolicyBinding, PolicyEngineMode from authentik.root.celery import CELERY_APP @@ -89,7 +90,7 @@ def event_trigger_handler(event_uuid: str, trigger_name: str): user.pk, str(trigger.pk), ], - queue="authentik_events", + priority=CONFIG.get_int("worker.priority.events"), ) if transport.send_once: break diff --git a/authentik/lib/default.yml b/authentik/lib/default.yml index 793bece13..fcfe59544 100644 --- a/authentik/lib/default.yml +++ b/authentik/lib/default.yml @@ -114,3 +114,8 @@ web: worker: concurrency: 2 + priority: + default: 4 + scheduled: 9 + sync: 9 + events: 8 diff --git a/authentik/outposts/settings.py b/authentik/outposts/settings.py index 6ce2d52c8..71c4fde0f 100644 --- a/authentik/outposts/settings.py +++ b/authentik/outposts/settings.py @@ -1,27 +1,28 @@ """Outposts Settings""" from celery.schedules import crontab +from authentik.lib.config import CONFIG from authentik.lib.utils.time import fqdn_rand CELERY_BEAT_SCHEDULE = { "outposts_controller": { "task": "authentik.outposts.tasks.outpost_controller_all", "schedule": crontab(minute=fqdn_rand("outposts_controller"), hour="*/4"), - "options": {"queue": "authentik_scheduled"}, + "options": {"priority": CONFIG.get_int("worker.priority.scheduled")}, }, "outposts_service_connection_check": { "task": "authentik.outposts.tasks.outpost_service_connection_monitor", "schedule": crontab(minute="3-59/15"), - "options": {"queue": "authentik_scheduled"}, + "options": {"priority": CONFIG.get_int("worker.priority.scheduled")}, }, "outpost_token_ensurer": { "task": "authentik.outposts.tasks.outpost_token_ensurer", "schedule": crontab(minute=fqdn_rand("outpost_token_ensurer"), hour="*/8"), - "options": {"queue": "authentik_scheduled"}, + "options": {"priority": CONFIG.get_int("worker.priority.scheduled")}, }, "outpost_connection_discovery": { "task": "authentik.outposts.tasks.outpost_connection_discovery", "schedule": crontab(minute=fqdn_rand("outpost_connection_discovery"), hour="*/8"), - "options": {"queue": "authentik_scheduled"}, + "options": {"priority": CONFIG.get_int("worker.priority.scheduled")}, }, } diff --git a/authentik/policies/reputation/settings.py b/authentik/policies/reputation/settings.py index b51ecc187..6ac87991a 100644 --- a/authentik/policies/reputation/settings.py +++ b/authentik/policies/reputation/settings.py @@ -1,10 +1,12 @@ """Reputation Settings""" from celery.schedules import crontab +from authentik.lib.config import CONFIG + CELERY_BEAT_SCHEDULE = { "policies_reputation_save": { "task": "authentik.policies.reputation.tasks.save_reputation", "schedule": crontab(minute="1-59/5"), - "options": {"queue": "authentik_scheduled"}, + "options": {"priority": CONFIG.get_int("worker.priority.scheduled")}, }, } diff --git a/authentik/providers/scim/settings.py b/authentik/providers/scim/settings.py index 02b2e6e6e..4563439da 100644 --- a/authentik/providers/scim/settings.py +++ b/authentik/providers/scim/settings.py @@ -1,12 +1,13 @@ """SCIM task Settings""" from celery.schedules import crontab +from authentik.lib.config import CONFIG from authentik.lib.utils.time import fqdn_rand CELERY_BEAT_SCHEDULE = { "providers_scim_sync": { "task": "authentik.providers.scim.tasks.scim_sync_all", "schedule": crontab(minute=fqdn_rand("scim_sync_all"), hour="*"), - "options": {"queue": "authentik_scheduled"}, + "options": {"priority": CONFIG.get_int("worker.priority.scheduled")}, }, } diff --git a/authentik/root/settings.py b/authentik/root/settings.py index ee31f2cc6..87208ca54 100644 --- a/authentik/root/settings.py +++ b/authentik/root/settings.py @@ -339,17 +339,23 @@ CELERY = { "clean_expired_models": { "task": "authentik.core.tasks.clean_expired_models", "schedule": crontab(minute="2-59/5"), - "options": {"queue": "authentik_scheduled"}, + "options": {"priority": CONFIG.get_int("worker.priority.scheduled")}, }, "user_cleanup": { "task": "authentik.core.tasks.clean_temporary_users", "schedule": crontab(minute="9-59/5"), - "options": {"queue": "authentik_scheduled"}, + "options": {"priority": CONFIG.get_int("worker.priority.scheduled")}, }, }, "task_create_missing_queues": True, "task_default_queue": "authentik", + "task_default_priority": 4, + "task_inherit_parent_priority": True, "broker_url": f"{_redis_url}/{CONFIG.get('redis.db')}{_redis_celery_tls_requirements}", + "broker_transport_options": { + "queue_order_strategy": "priority", + "priority_steps": list(range(10)), + }, "result_backend": f"{_redis_url}/{CONFIG.get('redis.db')}{_redis_celery_tls_requirements}", } diff --git a/authentik/sources/ldap/settings.py b/authentik/sources/ldap/settings.py index 6b526b357..8378074ea 100644 --- a/authentik/sources/ldap/settings.py +++ b/authentik/sources/ldap/settings.py @@ -1,12 +1,13 @@ """LDAP Settings""" from celery.schedules import crontab +from authentik.lib.config import CONFIG from authentik.lib.utils.time import fqdn_rand CELERY_BEAT_SCHEDULE = { "sources_ldap_sync": { "task": "authentik.sources.ldap.tasks.ldap_sync_all", "schedule": crontab(minute=fqdn_rand("sources_ldap_sync"), hour="*/2"), - "options": {"queue": "authentik_scheduled"}, + "options": {"priority": CONFIG.get_int("worker.priority.scheduled")}, } } diff --git a/authentik/sources/ldap/tasks.py b/authentik/sources/ldap/tasks.py index 9c4d6af73..a05c75f9b 100644 --- a/authentik/sources/ldap/tasks.py +++ b/authentik/sources/ldap/tasks.py @@ -58,10 +58,12 @@ def ldap_sync_single(source_pk: str): group( ldap_sync_paginator(source, UserLDAPSynchronizer) + ldap_sync_paginator(source, GroupLDAPSynchronizer), + priority=CONFIG.get_int("worker.priority.sync"), ), # Membership sync needs to run afterwards group( ldap_sync_paginator(source, MembershipLDAPSynchronizer), + priority=CONFIG.get_int("worker.priority.sync"), ), ) task() diff --git a/authentik/sources/plex/settings.py b/authentik/sources/plex/settings.py index c23143484..f51e22a2c 100644 --- a/authentik/sources/plex/settings.py +++ b/authentik/sources/plex/settings.py @@ -1,12 +1,13 @@ """Plex source settings""" from celery.schedules import crontab +from authentik.lib.config import CONFIG from authentik.lib.utils.time import fqdn_rand CELERY_BEAT_SCHEDULE = { "check_plex_token": { "task": "authentik.sources.plex.tasks.check_plex_token_all", "schedule": crontab(minute=fqdn_rand("check_plex_token"), hour="*/3"), - "options": {"queue": "authentik_scheduled"}, + "options": {"priority": CONFIG.get_int("worker.priority.scheduled")}, }, }