sources/ldap: add lock to sync (#6930)

This commit is contained in:
Jens L 2023-09-18 21:38:01 +02:00 committed by GitHub
parent 42c3cfa65d
commit 000244e387
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -4,6 +4,8 @@ from uuid import uuid4
from celery import chain, group from celery import chain, group
from django.core.cache import cache from django.core.cache import cache
from ldap3.core.exceptions import LDAPException from ldap3.core.exceptions import LDAPException
from redis.exceptions import LockError
from redis.lock import Lock
from structlog.stdlib import get_logger from structlog.stdlib import get_logger
from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
@ -45,18 +47,28 @@ def ldap_sync_single(source_pk: str):
source: LDAPSource = LDAPSource.objects.filter(pk=source_pk).first() source: LDAPSource = LDAPSource.objects.filter(pk=source_pk).first()
if not source: if not source:
return return
task = chain( lock = Lock(cache.client.get_client(), name=f"goauthentik.io/sources/ldap/sync-{source.slug}")
# User and group sync can happen at once, they have no dependencies on each other if lock.locked():
group( LOGGER.debug("LDAP sync locked, skipping task", source=source.slug)
ldap_sync_paginator(source, UserLDAPSynchronizer) return
+ ldap_sync_paginator(source, GroupLDAPSynchronizer), try:
), with lock:
# Membership sync needs to run afterwards task = chain(
group( # User and group sync can happen at once, they have no dependencies on each other
ldap_sync_paginator(source, MembershipLDAPSynchronizer), group(
), ldap_sync_paginator(source, UserLDAPSynchronizer)
) + ldap_sync_paginator(source, GroupLDAPSynchronizer),
task() ),
# Membership sync needs to run afterwards
group(
ldap_sync_paginator(source, MembershipLDAPSynchronizer),
),
)
task()
except LockError:
# This should never happen, we check if the lock is locked above so this
# would only happen if there was some other timeout
LOGGER.debug("Failed to acquire lock for LDAP sync", source=source.slug)
def ldap_sync_paginator(source: LDAPSource, sync: type[BaseLDAPSynchronizer]) -> list: def ldap_sync_paginator(source: LDAPSource, sync: type[BaseLDAPSynchronizer]) -> list: