diff --git a/authentik/core/tests/utils.py b/authentik/core/tests/utils.py index a0c05370a..bd67ee909 100644 --- a/authentik/core/tests/utils.py +++ b/authentik/core/tests/utils.py @@ -26,11 +26,12 @@ def create_test_admin_user(name: Optional[str] = None) -> User: """Generate a test-admin user""" uid = generate_id(20) if not name else name group = Group.objects.create(name=uid, is_superuser=True) - user = User.objects.create( + user: User = User.objects.create( username=uid, name=uid, email=f"{uid}@goauthentik.io", ) + user.set_password(uid) group.users.add(user) return user diff --git a/tests/e2e/test_provider_ldap.py b/tests/e2e/test_provider_ldap.py index 72d674767..63c5b8b77 100644 --- a/tests/e2e/test_provider_ldap.py +++ b/tests/e2e/test_provider_ldap.py @@ -1,4 +1,5 @@ """LDAP and Outpost e2e tests""" +from dataclasses import asdict from sys import platform from time import sleep from unittest.case import skipUnless @@ -9,13 +10,14 @@ from guardian.shortcuts import get_anonymous_user from ldap3 import ALL, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, SUBTREE, Connection, Server from ldap3.core.exceptions import LDAPInvalidCredentialsResult -from authentik.core.models import Application, Group, User +from authentik.core.models import Application, User +from authentik.core.tests.utils import create_test_admin_user from authentik.events.models import Event, EventAction from authentik.flows.models import Flow from authentik.outposts.managed import MANAGED_OUTPOST -from authentik.outposts.models import Outpost, OutpostType +from authentik.outposts.models import Outpost, OutpostConfig, OutpostType from authentik.providers.ldap.models import LDAPProvider, SearchModes -from tests.e2e.utils import USER, SeleniumTestCase, apply_migration, object_manager, retry +from tests.e2e.utils import SeleniumTestCase, apply_migration, object_manager, retry @skipUnless(platform.startswith("linux"), "requires local docker") @@ -47,14 +49,14 @@ class TestProviderLDAP(SeleniumTestCase): def _prepare(self) -> User: """prepare user, provider, app and container""" # set additionalHeaders to test later - user = USER() + user = create_test_admin_user() user.attributes["extraAttribute"] = "bar" user.save() ldap: LDAPProvider = LDAPProvider.objects.create( name="ldap_provider", authorization_flow=Flow.objects.get(slug="default-authentication-flow"), - search_group=Group.objects.first(), + search_group=user.ak_groups.first(), search_mode=SearchModes.CACHED, ) # we need to create an application to actually access the ldap @@ -62,10 +64,10 @@ class TestProviderLDAP(SeleniumTestCase): outpost: Outpost = Outpost.objects.create( name="ldap_outpost", type=OutpostType.LDAP, + _config=asdict(OutpostConfig(log_level="debug")), ) outpost.providers.add(ldap) outpost.save() - user = outpost.user self.ldap_container = self.start_ldap(outpost) @@ -78,7 +80,7 @@ class TestProviderLDAP(SeleniumTestCase): break healthcheck_retries += 1 sleep(0.5) - return user + return user, outpost @retry() @apply_migration("authentik_core", "0002_auto_20200523_1133_squashed_0011_provider_name_temp") @@ -86,22 +88,22 @@ class TestProviderLDAP(SeleniumTestCase): @object_manager def test_ldap_bind_success(self): """Test simple bind""" - self._prepare() + user, _ = self._prepare() server = Server("ldap://localhost:3389", get_info=ALL) _connection = Connection( server, raise_exceptions=True, - user=f"cn={USER().username},ou=users,DC=ldap,DC=goauthentik,DC=io", - password=USER().username, + user=f"cn={user.username},ou=users,DC=ldap,DC=goauthentik,DC=io", + password=user.username, ) _connection.bind() self.assertTrue( Event.objects.filter( action=EventAction.LOGIN, user={ - "pk": USER().pk, - "email": USER().email, - "username": USER().username, + "pk": user.pk, + "email": user.email, + "username": user.username, }, ) ) @@ -112,22 +114,22 @@ class TestProviderLDAP(SeleniumTestCase): @object_manager def test_ldap_bind_success_ssl(self): """Test simple bind with ssl""" - self._prepare() + user, _ = self._prepare() server = Server("ldaps://localhost:6636", get_info=ALL) _connection = Connection( server, raise_exceptions=True, - user=f"cn={USER().username},ou=users,DC=ldap,DC=goauthentik,DC=io", - password=USER().username, + user=f"cn={user.username},ou=users,DC=ldap,DC=goauthentik,DC=io", + password=user.username, ) _connection.bind() self.assertTrue( Event.objects.filter( action=EventAction.LOGIN, user={ - "pk": USER().pk, - "email": USER().email, - "username": USER().username, + "pk": user.pk, + "email": user.email, + "username": user.username, }, ) ) @@ -138,13 +140,13 @@ class TestProviderLDAP(SeleniumTestCase): @object_manager def test_ldap_bind_fail(self): """Test simple bind (failed)""" - self._prepare() + user, _ = self._prepare() server = Server("ldap://localhost:3389", get_info=ALL) _connection = Connection( server, raise_exceptions=True, - user=f"cn={USER().username},ou=users,DC=ldap,DC=goauthentik,DC=io", - password=USER().username + "fqwerwqer", + user=f"cn={user.username},ou=users,DC=ldap,DC=goauthentik,DC=io", + password=user.username + "fqwerwqer", ) with self.assertRaises(LDAPInvalidCredentialsResult): _connection.bind() @@ -162,22 +164,22 @@ class TestProviderLDAP(SeleniumTestCase): @object_manager def test_ldap_bind_search(self): """Test simple bind + search""" - outpost_user = self._prepare() + user, outpost = self._prepare() server = Server("ldap://localhost:3389", get_info=ALL) _connection = Connection( server, raise_exceptions=True, - user=f"cn={USER().username},ou=users,dc=ldap,dc=goauthentik,dc=io", - password=USER().username, + user=f"cn={user.username},ou=users,dc=ldap,dc=goauthentik,dc=io", + password=user.username, ) _connection.bind() self.assertTrue( Event.objects.filter( action=EventAction.LOGIN, user={ - "pk": USER().pk, - "email": USER().email, - "username": USER().username, + "pk": user.pk, + "email": user.email, + "username": user.username, }, ) ) @@ -195,15 +197,16 @@ class TestProviderLDAP(SeleniumTestCase): for obj in response: del obj["raw_attributes"] del obj["raw_dn"] + o_user = outpost.suer self.assertCountEqual( response, [ { - "dn": f"cn={outpost_user.username},ou=users,dc=ldap,dc=goauthentik,dc=io", + "dn": f"cn={o_user.username},ou=users,dc=ldap,dc=goauthentik,dc=io", "attributes": { - "cn": [outpost_user.username], - "sAMAccountName": [outpost_user.username], - "uid": [outpost_user.uid], + "cn": [o_user.username], + "sAMAccountName": [o_user.username], + "uid": [o_user.uid], "name": [""], "displayName": [""], "mail": [""], @@ -213,8 +216,8 @@ class TestProviderLDAP(SeleniumTestCase): "inetOrgPerson", "goauthentik.io/ldap/user", ], - "uidNumber": [str(2000 + outpost_user.pk)], - "gidNumber": [str(2000 + outpost_user.pk)], + "uidNumber": [str(2000 + o_user.pk)], + "gidNumber": [str(2000 + o_user.pk)], "memberOf": [], "accountStatus": ["true"], "superuser": ["false"], @@ -253,23 +256,26 @@ class TestProviderLDAP(SeleniumTestCase): "type": "searchResEntry", }, { - "dn": f"cn={USER().username},ou=users,dc=ldap,dc=goauthentik,dc=io", + "dn": f"cn={user.username},ou=users,dc=ldap,dc=goauthentik,dc=io", "attributes": { - "cn": [USER().username], - "sAMAccountName": [USER().username], - "uid": [USER().uid], - "name": [USER().name], - "displayName": [USER().name], - "mail": [USER().email], + "cn": [user.username], + "sAMAccountName": [user.username], + "uid": [user.uid], + "name": [user.name], + "displayName": [user.name], + "mail": [user.email], "objectClass": [ "user", "organizationalPerson", "inetOrgPerson", "goauthentik.io/ldap/user", ], - "uidNumber": [str(2000 + USER().pk)], - "gidNumber": [str(2000 + USER().pk)], - "memberOf": ["cn=authentik Admins,ou=groups,dc=ldap,dc=goauthentik,dc=io"], + "uidNumber": [str(2000 + user.pk)], + "gidNumber": [str(2000 + user.pk)], + "memberOf": [ + f"cn={group.name},ou=groups,dc=ldap,dc=goauthentik,dc=io" + for group in user.ak_groups.all() + ], "accountStatus": ["true"], "superuser": ["true"], "goauthentik.io/ldap/active": ["true"],