sources/ldap: use openldap tests for entire sync
This commit is contained in:
parent
9c1ade59e9
commit
478d3430eb
|
@ -24,31 +24,32 @@ class GroupLDAPSynchronizer(BaseLDAPSynchronizer):
|
|||
group_count = 0
|
||||
for group in groups:
|
||||
attributes = group.get("attributes", {})
|
||||
group_dn = self._flatten(group.get("entryDN", ""))
|
||||
if self._source.object_uniqueness_field not in attributes:
|
||||
self._logger.warning(
|
||||
"Cannot find uniqueness Field in attributes",
|
||||
attributes=attributes.keys(),
|
||||
dn=attributes.get(LDAP_DISTINGUISHED_NAME, ""),
|
||||
dn=group_dn,
|
||||
)
|
||||
continue
|
||||
uniq = attributes[self._source.object_uniqueness_field]
|
||||
# TODO: Use Property Mappings
|
||||
name = self._flatten(attributes.get("name", ""))
|
||||
_, created = Group.objects.update_or_create(
|
||||
**{
|
||||
f"attributes__{LDAP_UNIQUENESS}": uniq,
|
||||
"parent": self._source.sync_parent_group,
|
||||
"defaults": {
|
||||
"name": attributes.get("name", ""),
|
||||
"name": name,
|
||||
"attributes": {
|
||||
LDAP_UNIQUENESS: uniq,
|
||||
LDAP_DISTINGUISHED_NAME: attributes.get(
|
||||
"distinguishedName"
|
||||
),
|
||||
LDAP_DISTINGUISHED_NAME: group_dn,
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
self._logger.debug(
|
||||
"Synced group", group=attributes.get("name", ""), created=created
|
||||
"Synced group", group=name, created=created
|
||||
)
|
||||
group_count += 1
|
||||
return group_count
|
||||
|
|
|
@ -28,7 +28,7 @@ def mock_slapd_connection(password: str) -> Connection:
|
|||
{
|
||||
"name": "test-group",
|
||||
"uid": "unique-test-group",
|
||||
"objectClass": "group",
|
||||
"objectClass": "groupOfNames",
|
||||
"member": ["cn=user0,ou=users,dc=goauthentik,dc=io"],
|
||||
},
|
||||
)
|
||||
|
@ -37,7 +37,7 @@ def mock_slapd_connection(password: str) -> Connection:
|
|||
"cn=group2,ou=groups,dc=goauthentik,dc=io",
|
||||
{
|
||||
"name": "test-group",
|
||||
"objectClass": "group",
|
||||
"objectClass": "groupOfNames",
|
||||
},
|
||||
)
|
||||
connection.strategy.add_entry(
|
||||
|
|
|
@ -37,12 +37,6 @@ class LDAPSyncTests(TestCase):
|
|||
| Q(name__startswith="authentik default Active Directory Mapping")
|
||||
)
|
||||
)
|
||||
print(
|
||||
LDAPPropertyMapping.objects.filter(
|
||||
Q(name__startswith="authentik default LDAP Mapping")
|
||||
| Q(name__startswith="authentik default Active Directory Mapping")
|
||||
)
|
||||
)
|
||||
self.source.save()
|
||||
connection = PropertyMock(return_value=mock_ad_connection(LDAP_PASSWORD))
|
||||
with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
"""LDAP Source tests"""
|
||||
from authentik.sources.ldap.tests.mock_slapd import mock_slapd_connection
|
||||
from unittest.mock import PropertyMock, patch
|
||||
|
||||
from django.db.models import Q
|
||||
from django.test import TestCase
|
||||
|
||||
from authentik.core.models import Group, User
|
||||
|
@ -14,8 +16,6 @@ from authentik.sources.ldap.tasks import ldap_sync_all
|
|||
from authentik.sources.ldap.tests.mock_ad import mock_ad_connection
|
||||
|
||||
LDAP_PASSWORD = generate_client_secret()
|
||||
LDAP_CONNECTION_PATCH = PropertyMock(return_value=mock_ad_connection(LDAP_PASSWORD))
|
||||
|
||||
|
||||
class LDAPSyncTests(TestCase):
|
||||
"""LDAP Sync tests"""
|
||||
|
@ -29,28 +29,102 @@ class LDAPSyncTests(TestCase):
|
|||
additional_user_dn="ou=users",
|
||||
additional_group_dn="ou=groups",
|
||||
)
|
||||
self.source.property_mappings.set(LDAPPropertyMapping.objects.all())
|
||||
self.source.save()
|
||||
|
||||
@patch("authentik.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH)
|
||||
def test_sync_users(self):
|
||||
def test_sync_users_ad(self):
|
||||
"""Test user sync"""
|
||||
user_sync = UserLDAPSynchronizer(self.source)
|
||||
user_sync.sync()
|
||||
self.assertTrue(User.objects.filter(username="user0_sn").exists())
|
||||
self.assertFalse(User.objects.filter(username="user1_sn").exists())
|
||||
self.source.property_mappings.set(
|
||||
LDAPPropertyMapping.objects.filter(
|
||||
Q(name__startswith="authentik default LDAP Mapping")
|
||||
| Q(name__startswith="authentik default Active Directory Mapping")
|
||||
)
|
||||
)
|
||||
self.source.save()
|
||||
connection = PropertyMock(return_value=mock_ad_connection(LDAP_PASSWORD))
|
||||
with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
|
||||
user_sync = UserLDAPSynchronizer(self.source)
|
||||
user_sync.sync()
|
||||
self.assertTrue(User.objects.filter(username="user0_sn").exists())
|
||||
self.assertFalse(User.objects.filter(username="user1_sn").exists())
|
||||
|
||||
@patch("authentik.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH)
|
||||
def test_sync_groups(self):
|
||||
def test_sync_users_openldap(self):
|
||||
"""Test user sync"""
|
||||
self.source.object_uniqueness_field = "uid"
|
||||
self.source.property_mappings.set(
|
||||
LDAPPropertyMapping.objects.filter(
|
||||
Q(name__startswith="authentik default LDAP Mapping")
|
||||
| Q(name__startswith="authentik default OpenLDAP Mapping")
|
||||
)
|
||||
)
|
||||
self.source.save()
|
||||
connection = PropertyMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
|
||||
with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
|
||||
user_sync = UserLDAPSynchronizer(self.source)
|
||||
user_sync.sync()
|
||||
self.assertTrue(User.objects.filter(username="user0_sn").exists())
|
||||
self.assertFalse(User.objects.filter(username="user1_sn").exists())
|
||||
|
||||
def test_sync_groups_ad(self):
|
||||
"""Test group sync"""
|
||||
group_sync = GroupLDAPSynchronizer(self.source)
|
||||
group_sync.sync()
|
||||
membership_sync = MembershipLDAPSynchronizer(self.source)
|
||||
membership_sync.sync()
|
||||
group = Group.objects.filter(name="test-group")
|
||||
self.assertTrue(group.exists())
|
||||
self.source.property_mappings.set(
|
||||
LDAPPropertyMapping.objects.filter(
|
||||
Q(name__startswith="authentik default LDAP Mapping")
|
||||
| Q(name__startswith="authentik default Active Directory Mapping")
|
||||
)
|
||||
)
|
||||
self.source.save()
|
||||
connection = PropertyMock(return_value=mock_ad_connection(LDAP_PASSWORD))
|
||||
with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
|
||||
group_sync = GroupLDAPSynchronizer(self.source)
|
||||
group_sync.sync()
|
||||
membership_sync = MembershipLDAPSynchronizer(self.source)
|
||||
membership_sync.sync()
|
||||
group = Group.objects.filter(name="test-group")
|
||||
self.assertTrue(group.exists())
|
||||
|
||||
@patch("authentik.sources.ldap.models.LDAPSource.connection", LDAP_CONNECTION_PATCH)
|
||||
def test_tasks(self):
|
||||
def test_sync_groups_openldap(self):
|
||||
"""Test group sync"""
|
||||
self.source.object_uniqueness_field = "uid"
|
||||
self.source.group_object_filter = "(objectClass=groupOfNames)"
|
||||
self.source.property_mappings.set(
|
||||
LDAPPropertyMapping.objects.filter(
|
||||
Q(name__startswith="authentik default LDAP Mapping")
|
||||
| Q(name__startswith="authentik default OpenLDAP Mapping")
|
||||
)
|
||||
)
|
||||
self.source.save()
|
||||
connection = PropertyMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
|
||||
with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
|
||||
group_sync = GroupLDAPSynchronizer(self.source)
|
||||
group_sync.sync()
|
||||
membership_sync = MembershipLDAPSynchronizer(self.source)
|
||||
membership_sync.sync()
|
||||
group = Group.objects.filter(name="test-group")
|
||||
self.assertTrue(group.exists())
|
||||
|
||||
def test_tasks_ad(self):
|
||||
"""Test Scheduled tasks"""
|
||||
ldap_sync_all.delay().get()
|
||||
self.source.property_mappings.set(
|
||||
LDAPPropertyMapping.objects.filter(
|
||||
Q(name__startswith="authentik default LDAP Mapping")
|
||||
| Q(name__startswith="authentik default Active Directory Mapping")
|
||||
)
|
||||
)
|
||||
self.source.save()
|
||||
connection = PropertyMock(return_value=mock_ad_connection(LDAP_PASSWORD))
|
||||
with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
|
||||
ldap_sync_all.delay().get()
|
||||
|
||||
def test_tasks_openldap(self):
|
||||
"""Test Scheduled tasks"""
|
||||
self.source.object_uniqueness_field = "uid"
|
||||
self.source.group_object_filter = "(objectClass=groupOfNames)"
|
||||
self.source.property_mappings.set(
|
||||
LDAPPropertyMapping.objects.filter(
|
||||
Q(name__startswith="authentik default LDAP Mapping")
|
||||
| Q(name__startswith="authentik default OpenLDAP Mapping")
|
||||
)
|
||||
)
|
||||
self.source.save()
|
||||
connection = PropertyMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
|
||||
with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
|
||||
ldap_sync_all.delay().get()
|
||||
|
|
Reference in a new issue