lifecycle: make worker wait for migrations to be done (#3254)
* lifecycle: make worker wait for migrations to be done * retry managed reconcile task
This commit is contained in:
parent
e704092d19
commit
e9d9d658c4
|
@ -11,7 +11,11 @@ from authentik.events.monitored_tasks import (
|
||||||
from authentik.managed.manager import ObjectManager
|
from authentik.managed.manager import ObjectManager
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
@CELERY_APP.task(
|
||||||
|
bind=True,
|
||||||
|
base=MonitoredTask,
|
||||||
|
retry_backoff=True,
|
||||||
|
)
|
||||||
@prefill_task
|
@prefill_task
|
||||||
def managed_reconcile(self: MonitoredTask):
|
def managed_reconcile(self: MonitoredTask):
|
||||||
"""Run ObjectManager to ensure objects are up-to-date"""
|
"""Run ObjectManager to ensure objects are up-to-date"""
|
||||||
|
@ -22,3 +26,4 @@ def managed_reconcile(self: MonitoredTask):
|
||||||
)
|
)
|
||||||
except DatabaseError as exc: # pragma: no cover
|
except DatabaseError as exc: # pragma: no cover
|
||||||
self.set_status(TaskResult(TaskResultStatus.WARNING, [str(exc)]))
|
self.set_status(TaskResult(TaskResultStatus.WARNING, [str(exc)]))
|
||||||
|
self.retry()
|
||||||
|
|
|
@ -44,6 +44,11 @@ if [[ "$1" == "server" ]]; then
|
||||||
/authentik-proxy
|
/authentik-proxy
|
||||||
elif [[ "$1" == "worker" ]]; then
|
elif [[ "$1" == "worker" ]]; then
|
||||||
wait_for_db
|
wait_for_db
|
||||||
|
# Check if the migration lock is set, and exit if so
|
||||||
|
# the orchestrator should restart this container, and this prevents
|
||||||
|
# errors when startup tasks are attempted to be run without
|
||||||
|
# migrations in place
|
||||||
|
python -m lifecycle.migrate check_lock
|
||||||
echo "worker" > $MODE_FILE
|
echo "worker" > $MODE_FILE
|
||||||
check_if_root "celery -A authentik.root.celery worker -Ofair --max-tasks-per-child=1 --autoscale 3,1 -E -B -s /tmp/celerybeat-schedule -Q authentik,authentik_scheduled,authentik_events"
|
check_if_root "celery -A authentik.root.celery worker -Ofair --max-tasks-per-child=1 --autoscale 3,1 -E -B -s /tmp/celerybeat-schedule -Q authentik,authentik_scheduled,authentik_events"
|
||||||
elif [[ "$1" == "bash" ]]; then
|
elif [[ "$1" == "bash" ]]; then
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
"""System Migration handler"""
|
"""System Migration handler"""
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
from importlib.util import module_from_spec, spec_from_file_location
|
from importlib.util import module_from_spec, spec_from_file_location
|
||||||
from inspect import getmembers, isclass
|
from inspect import getmembers, isclass
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
@ -50,7 +51,16 @@ def release_lock():
|
||||||
curr.execute("SELECT pg_advisory_unlock(%s)", (ADV_LOCK_UID,))
|
curr.execute("SELECT pg_advisory_unlock(%s)", (ADV_LOCK_UID,))
|
||||||
|
|
||||||
|
|
||||||
|
def is_locked():
|
||||||
|
"""Check if lock is currently active (used by worker to wait for migrations)"""
|
||||||
|
curr.executor("SELECT count(*) FROM pg_locks WHERE objid = %s", (ADV_LOCK_UID,))
|
||||||
|
return curr.rowcount
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
if len(sys.argv) > 1:
|
||||||
|
if sys.argv[1] == "check_lock":
|
||||||
|
sys.exit(is_locked())
|
||||||
|
|
||||||
conn = connect(
|
conn = connect(
|
||||||
dbname=CONFIG.y("postgresql.name"),
|
dbname=CONFIG.y("postgresql.name"),
|
||||||
|
|
Reference in a new issue