tests/e2e: fix ldap provider tests
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
parent
b523cd064b
commit
6eb132c48b
|
@ -26,11 +26,12 @@ def create_test_admin_user(name: Optional[str] = None) -> User:
|
||||||
"""Generate a test-admin user"""
|
"""Generate a test-admin user"""
|
||||||
uid = generate_id(20) if not name else name
|
uid = generate_id(20) if not name else name
|
||||||
group = Group.objects.create(name=uid, is_superuser=True)
|
group = Group.objects.create(name=uid, is_superuser=True)
|
||||||
user = User.objects.create(
|
user: User = User.objects.create(
|
||||||
username=uid,
|
username=uid,
|
||||||
name=uid,
|
name=uid,
|
||||||
email=f"{uid}@goauthentik.io",
|
email=f"{uid}@goauthentik.io",
|
||||||
)
|
)
|
||||||
|
user.set_password(uid)
|
||||||
group.users.add(user)
|
group.users.add(user)
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
"""LDAP and Outpost e2e tests"""
|
"""LDAP and Outpost e2e tests"""
|
||||||
|
from dataclasses import asdict
|
||||||
from sys import platform
|
from sys import platform
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from unittest.case import skipUnless
|
from unittest.case import skipUnless
|
||||||
|
@ -9,13 +10,14 @@ from guardian.shortcuts import get_anonymous_user
|
||||||
from ldap3 import ALL, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, SUBTREE, Connection, Server
|
from ldap3 import ALL, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, SUBTREE, Connection, Server
|
||||||
from ldap3.core.exceptions import LDAPInvalidCredentialsResult
|
from ldap3.core.exceptions import LDAPInvalidCredentialsResult
|
||||||
|
|
||||||
from authentik.core.models import Application, Group, User
|
from authentik.core.models import Application, User
|
||||||
|
from authentik.core.tests.utils import create_test_admin_user
|
||||||
from authentik.events.models import Event, EventAction
|
from authentik.events.models import Event, EventAction
|
||||||
from authentik.flows.models import Flow
|
from authentik.flows.models import Flow
|
||||||
from authentik.outposts.managed import MANAGED_OUTPOST
|
from authentik.outposts.managed import MANAGED_OUTPOST
|
||||||
from authentik.outposts.models import Outpost, OutpostType
|
from authentik.outposts.models import Outpost, OutpostConfig, OutpostType
|
||||||
from authentik.providers.ldap.models import LDAPProvider, SearchModes
|
from authentik.providers.ldap.models import LDAPProvider, SearchModes
|
||||||
from tests.e2e.utils import USER, SeleniumTestCase, apply_migration, object_manager, retry
|
from tests.e2e.utils import SeleniumTestCase, apply_migration, object_manager, retry
|
||||||
|
|
||||||
|
|
||||||
@skipUnless(platform.startswith("linux"), "requires local docker")
|
@skipUnless(platform.startswith("linux"), "requires local docker")
|
||||||
|
@ -47,14 +49,14 @@ class TestProviderLDAP(SeleniumTestCase):
|
||||||
def _prepare(self) -> User:
|
def _prepare(self) -> User:
|
||||||
"""prepare user, provider, app and container"""
|
"""prepare user, provider, app and container"""
|
||||||
# set additionalHeaders to test later
|
# set additionalHeaders to test later
|
||||||
user = USER()
|
user = create_test_admin_user()
|
||||||
user.attributes["extraAttribute"] = "bar"
|
user.attributes["extraAttribute"] = "bar"
|
||||||
user.save()
|
user.save()
|
||||||
|
|
||||||
ldap: LDAPProvider = LDAPProvider.objects.create(
|
ldap: LDAPProvider = LDAPProvider.objects.create(
|
||||||
name="ldap_provider",
|
name="ldap_provider",
|
||||||
authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
|
authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
|
||||||
search_group=Group.objects.first(),
|
search_group=user.ak_groups.first(),
|
||||||
search_mode=SearchModes.CACHED,
|
search_mode=SearchModes.CACHED,
|
||||||
)
|
)
|
||||||
# we need to create an application to actually access the ldap
|
# we need to create an application to actually access the ldap
|
||||||
|
@ -62,10 +64,10 @@ class TestProviderLDAP(SeleniumTestCase):
|
||||||
outpost: Outpost = Outpost.objects.create(
|
outpost: Outpost = Outpost.objects.create(
|
||||||
name="ldap_outpost",
|
name="ldap_outpost",
|
||||||
type=OutpostType.LDAP,
|
type=OutpostType.LDAP,
|
||||||
|
_config=asdict(OutpostConfig(log_level="debug")),
|
||||||
)
|
)
|
||||||
outpost.providers.add(ldap)
|
outpost.providers.add(ldap)
|
||||||
outpost.save()
|
outpost.save()
|
||||||
user = outpost.user
|
|
||||||
|
|
||||||
self.ldap_container = self.start_ldap(outpost)
|
self.ldap_container = self.start_ldap(outpost)
|
||||||
|
|
||||||
|
@ -78,7 +80,7 @@ class TestProviderLDAP(SeleniumTestCase):
|
||||||
break
|
break
|
||||||
healthcheck_retries += 1
|
healthcheck_retries += 1
|
||||||
sleep(0.5)
|
sleep(0.5)
|
||||||
return user
|
return user, outpost
|
||||||
|
|
||||||
@retry()
|
@retry()
|
||||||
@apply_migration("authentik_core", "0002_auto_20200523_1133_squashed_0011_provider_name_temp")
|
@apply_migration("authentik_core", "0002_auto_20200523_1133_squashed_0011_provider_name_temp")
|
||||||
|
@ -86,22 +88,22 @@ class TestProviderLDAP(SeleniumTestCase):
|
||||||
@object_manager
|
@object_manager
|
||||||
def test_ldap_bind_success(self):
|
def test_ldap_bind_success(self):
|
||||||
"""Test simple bind"""
|
"""Test simple bind"""
|
||||||
self._prepare()
|
user, _ = self._prepare()
|
||||||
server = Server("ldap://localhost:3389", get_info=ALL)
|
server = Server("ldap://localhost:3389", get_info=ALL)
|
||||||
_connection = Connection(
|
_connection = Connection(
|
||||||
server,
|
server,
|
||||||
raise_exceptions=True,
|
raise_exceptions=True,
|
||||||
user=f"cn={USER().username},ou=users,DC=ldap,DC=goauthentik,DC=io",
|
user=f"cn={user.username},ou=users,DC=ldap,DC=goauthentik,DC=io",
|
||||||
password=USER().username,
|
password=user.username,
|
||||||
)
|
)
|
||||||
_connection.bind()
|
_connection.bind()
|
||||||
self.assertTrue(
|
self.assertTrue(
|
||||||
Event.objects.filter(
|
Event.objects.filter(
|
||||||
action=EventAction.LOGIN,
|
action=EventAction.LOGIN,
|
||||||
user={
|
user={
|
||||||
"pk": USER().pk,
|
"pk": user.pk,
|
||||||
"email": USER().email,
|
"email": user.email,
|
||||||
"username": USER().username,
|
"username": user.username,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -112,22 +114,22 @@ class TestProviderLDAP(SeleniumTestCase):
|
||||||
@object_manager
|
@object_manager
|
||||||
def test_ldap_bind_success_ssl(self):
|
def test_ldap_bind_success_ssl(self):
|
||||||
"""Test simple bind with ssl"""
|
"""Test simple bind with ssl"""
|
||||||
self._prepare()
|
user, _ = self._prepare()
|
||||||
server = Server("ldaps://localhost:6636", get_info=ALL)
|
server = Server("ldaps://localhost:6636", get_info=ALL)
|
||||||
_connection = Connection(
|
_connection = Connection(
|
||||||
server,
|
server,
|
||||||
raise_exceptions=True,
|
raise_exceptions=True,
|
||||||
user=f"cn={USER().username},ou=users,DC=ldap,DC=goauthentik,DC=io",
|
user=f"cn={user.username},ou=users,DC=ldap,DC=goauthentik,DC=io",
|
||||||
password=USER().username,
|
password=user.username,
|
||||||
)
|
)
|
||||||
_connection.bind()
|
_connection.bind()
|
||||||
self.assertTrue(
|
self.assertTrue(
|
||||||
Event.objects.filter(
|
Event.objects.filter(
|
||||||
action=EventAction.LOGIN,
|
action=EventAction.LOGIN,
|
||||||
user={
|
user={
|
||||||
"pk": USER().pk,
|
"pk": user.pk,
|
||||||
"email": USER().email,
|
"email": user.email,
|
||||||
"username": USER().username,
|
"username": user.username,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -138,13 +140,13 @@ class TestProviderLDAP(SeleniumTestCase):
|
||||||
@object_manager
|
@object_manager
|
||||||
def test_ldap_bind_fail(self):
|
def test_ldap_bind_fail(self):
|
||||||
"""Test simple bind (failed)"""
|
"""Test simple bind (failed)"""
|
||||||
self._prepare()
|
user, _ = self._prepare()
|
||||||
server = Server("ldap://localhost:3389", get_info=ALL)
|
server = Server("ldap://localhost:3389", get_info=ALL)
|
||||||
_connection = Connection(
|
_connection = Connection(
|
||||||
server,
|
server,
|
||||||
raise_exceptions=True,
|
raise_exceptions=True,
|
||||||
user=f"cn={USER().username},ou=users,DC=ldap,DC=goauthentik,DC=io",
|
user=f"cn={user.username},ou=users,DC=ldap,DC=goauthentik,DC=io",
|
||||||
password=USER().username + "fqwerwqer",
|
password=user.username + "fqwerwqer",
|
||||||
)
|
)
|
||||||
with self.assertRaises(LDAPInvalidCredentialsResult):
|
with self.assertRaises(LDAPInvalidCredentialsResult):
|
||||||
_connection.bind()
|
_connection.bind()
|
||||||
|
@ -162,22 +164,22 @@ class TestProviderLDAP(SeleniumTestCase):
|
||||||
@object_manager
|
@object_manager
|
||||||
def test_ldap_bind_search(self):
|
def test_ldap_bind_search(self):
|
||||||
"""Test simple bind + search"""
|
"""Test simple bind + search"""
|
||||||
outpost_user = self._prepare()
|
user, outpost = self._prepare()
|
||||||
server = Server("ldap://localhost:3389", get_info=ALL)
|
server = Server("ldap://localhost:3389", get_info=ALL)
|
||||||
_connection = Connection(
|
_connection = Connection(
|
||||||
server,
|
server,
|
||||||
raise_exceptions=True,
|
raise_exceptions=True,
|
||||||
user=f"cn={USER().username},ou=users,dc=ldap,dc=goauthentik,dc=io",
|
user=f"cn={user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
|
||||||
password=USER().username,
|
password=user.username,
|
||||||
)
|
)
|
||||||
_connection.bind()
|
_connection.bind()
|
||||||
self.assertTrue(
|
self.assertTrue(
|
||||||
Event.objects.filter(
|
Event.objects.filter(
|
||||||
action=EventAction.LOGIN,
|
action=EventAction.LOGIN,
|
||||||
user={
|
user={
|
||||||
"pk": USER().pk,
|
"pk": user.pk,
|
||||||
"email": USER().email,
|
"email": user.email,
|
||||||
"username": USER().username,
|
"username": user.username,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -195,15 +197,16 @@ class TestProviderLDAP(SeleniumTestCase):
|
||||||
for obj in response:
|
for obj in response:
|
||||||
del obj["raw_attributes"]
|
del obj["raw_attributes"]
|
||||||
del obj["raw_dn"]
|
del obj["raw_dn"]
|
||||||
|
o_user = outpost.suer
|
||||||
self.assertCountEqual(
|
self.assertCountEqual(
|
||||||
response,
|
response,
|
||||||
[
|
[
|
||||||
{
|
{
|
||||||
"dn": f"cn={outpost_user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
|
"dn": f"cn={o_user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
|
||||||
"attributes": {
|
"attributes": {
|
||||||
"cn": [outpost_user.username],
|
"cn": [o_user.username],
|
||||||
"sAMAccountName": [outpost_user.username],
|
"sAMAccountName": [o_user.username],
|
||||||
"uid": [outpost_user.uid],
|
"uid": [o_user.uid],
|
||||||
"name": [""],
|
"name": [""],
|
||||||
"displayName": [""],
|
"displayName": [""],
|
||||||
"mail": [""],
|
"mail": [""],
|
||||||
|
@ -213,8 +216,8 @@ class TestProviderLDAP(SeleniumTestCase):
|
||||||
"inetOrgPerson",
|
"inetOrgPerson",
|
||||||
"goauthentik.io/ldap/user",
|
"goauthentik.io/ldap/user",
|
||||||
],
|
],
|
||||||
"uidNumber": [str(2000 + outpost_user.pk)],
|
"uidNumber": [str(2000 + o_user.pk)],
|
||||||
"gidNumber": [str(2000 + outpost_user.pk)],
|
"gidNumber": [str(2000 + o_user.pk)],
|
||||||
"memberOf": [],
|
"memberOf": [],
|
||||||
"accountStatus": ["true"],
|
"accountStatus": ["true"],
|
||||||
"superuser": ["false"],
|
"superuser": ["false"],
|
||||||
|
@ -253,23 +256,26 @@ class TestProviderLDAP(SeleniumTestCase):
|
||||||
"type": "searchResEntry",
|
"type": "searchResEntry",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"dn": f"cn={USER().username},ou=users,dc=ldap,dc=goauthentik,dc=io",
|
"dn": f"cn={user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
|
||||||
"attributes": {
|
"attributes": {
|
||||||
"cn": [USER().username],
|
"cn": [user.username],
|
||||||
"sAMAccountName": [USER().username],
|
"sAMAccountName": [user.username],
|
||||||
"uid": [USER().uid],
|
"uid": [user.uid],
|
||||||
"name": [USER().name],
|
"name": [user.name],
|
||||||
"displayName": [USER().name],
|
"displayName": [user.name],
|
||||||
"mail": [USER().email],
|
"mail": [user.email],
|
||||||
"objectClass": [
|
"objectClass": [
|
||||||
"user",
|
"user",
|
||||||
"organizationalPerson",
|
"organizationalPerson",
|
||||||
"inetOrgPerson",
|
"inetOrgPerson",
|
||||||
"goauthentik.io/ldap/user",
|
"goauthentik.io/ldap/user",
|
||||||
],
|
],
|
||||||
"uidNumber": [str(2000 + USER().pk)],
|
"uidNumber": [str(2000 + user.pk)],
|
||||||
"gidNumber": [str(2000 + USER().pk)],
|
"gidNumber": [str(2000 + user.pk)],
|
||||||
"memberOf": ["cn=authentik Admins,ou=groups,dc=ldap,dc=goauthentik,dc=io"],
|
"memberOf": [
|
||||||
|
f"cn={group.name},ou=groups,dc=ldap,dc=goauthentik,dc=io"
|
||||||
|
for group in user.ak_groups.all()
|
||||||
|
],
|
||||||
"accountStatus": ["true"],
|
"accountStatus": ["true"],
|
||||||
"superuser": ["true"],
|
"superuser": ["true"],
|
||||||
"goauthentik.io/ldap/active": ["true"],
|
"goauthentik.io/ldap/active": ["true"],
|
||||||
|
|
Reference in a new issue