lib: add tests for ak_create_event (#5710)

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
# Conflicts:
#	locale/en/LC_MESSAGES/django.po
This commit is contained in:
Jens L 2023-05-22 00:18:54 +02:00 committed by Jens Langhammer
parent 2cd68dfa87
commit 573517bf0a
No known key found for this signature in database
3 changed files with 26 additions and 14 deletions

View file

@ -7,7 +7,6 @@ from smtplib import SMTPException
from typing import TYPE_CHECKING, Optional from typing import TYPE_CHECKING, Optional
from uuid import uuid4 from uuid import uuid4
from django.conf import settings
from django.db import models from django.db import models
from django.db.models import Count, ExpressionWrapper, F from django.db.models import Count, ExpressionWrapper, F
from django.db.models.fields import DurationField from django.db.models.fields import DurationField
@ -207,9 +206,7 @@ class Event(SerializerModel, ExpiringModel):
self.user = get_user(user) self.user = get_user(user)
return self return self
def from_http( def from_http(self, request: HttpRequest, user: Optional[User] = None) -> "Event":
self, request: HttpRequest, user: Optional[settings.AUTH_USER_MODEL] = None
) -> "Event":
"""Add data from a Django-HttpRequest, allowing the creation of """Add data from a Django-HttpRequest, allowing the creation of
Events independently from requests. Events independently from requests.
`user` arguments optionally overrides user from requests.""" `user` arguments optionally overrides user from requests."""

View file

@ -140,17 +140,19 @@ class BaseEvaluator:
def expr_event_create(self, action: str, **kwargs): def expr_event_create(self, action: str, **kwargs):
"""Create event with supplied data and try to extract as much relevant data """Create event with supplied data and try to extract as much relevant data
from the context""" from the context"""
context = self._context.copy()
# If the result was a complex variable, we don't want to re-use it # If the result was a complex variable, we don't want to re-use it
self._context.pop("result", None) context.pop("result", None)
self._context.pop("handler", None) context.pop("handler", None)
kwargs["context"] = self._context event_kwargs = context
event_kwargs.update(kwargs)
event = Event.new( event = Event.new(
action, action,
app=self._filename, app=self._filename,
**kwargs, **event_kwargs,
) )
if "request" in self._context and isinstance(self._context["request"], PolicyRequest): if "request" in context and isinstance(context["request"], PolicyRequest):
policy_request: PolicyRequest = self._context["request"] policy_request: PolicyRequest = context["request"]
if policy_request.http_request: if policy_request.http_request:
event.from_http(policy_request) event.from_http(policy_request)
return return

View file

@ -2,28 +2,41 @@
from django.test import TestCase from django.test import TestCase
from authentik.core.tests.utils import create_test_admin_user from authentik.core.tests.utils import create_test_admin_user
from authentik.events.models import Event
from authentik.lib.expression.evaluator import BaseEvaluator from authentik.lib.expression.evaluator import BaseEvaluator
from authentik.lib.generators import generate_id
class TestEvaluator(TestCase): class TestEvaluator(TestCase):
"""Test Evaluator base functions""" """Test Evaluator base functions"""
def test_regex_match(self): def test_expr_regex_match(self):
"""Test expr_regex_match""" """Test expr_regex_match"""
self.assertFalse(BaseEvaluator.expr_regex_match("foo", "bar")) self.assertFalse(BaseEvaluator.expr_regex_match("foo", "bar"))
self.assertTrue(BaseEvaluator.expr_regex_match("foo", "foo")) self.assertTrue(BaseEvaluator.expr_regex_match("foo", "foo"))
def test_regex_replace(self): def test_expr_regex_replace(self):
"""Test expr_regex_replace""" """Test expr_regex_replace"""
self.assertEqual(BaseEvaluator.expr_regex_replace("foo", "o", "a"), "faa") self.assertEqual(BaseEvaluator.expr_regex_replace("foo", "o", "a"), "faa")
def test_user_by(self): def test_expr_user_by(self):
"""Test expr_user_by""" """Test expr_user_by"""
user = create_test_admin_user() user = create_test_admin_user()
self.assertIsNotNone(BaseEvaluator.expr_user_by(username=user.username)) self.assertIsNotNone(BaseEvaluator.expr_user_by(username=user.username))
self.assertIsNone(BaseEvaluator.expr_user_by(username="bar")) self.assertIsNone(BaseEvaluator.expr_user_by(username="bar"))
self.assertIsNone(BaseEvaluator.expr_user_by(foo="bar")) self.assertIsNone(BaseEvaluator.expr_user_by(foo="bar"))
def test_is_group_member(self): def test_expr_is_group_member(self):
"""Test expr_is_group_member""" """Test expr_is_group_member"""
self.assertFalse(BaseEvaluator.expr_is_group_member(create_test_admin_user(), name="test")) self.assertFalse(BaseEvaluator.expr_is_group_member(create_test_admin_user(), name="test"))
def test_expr_event_create(self):
"""Test expr_event_create"""
evaluator = BaseEvaluator(generate_id())
evaluator._context = {
"foo": "bar",
}
evaluator.evaluate("ak_create_event('foo', bar='baz')")
event = Event.objects.filter(action="custom_foo").first()
self.assertIsNotNone(event)
self.assertEqual(event.context, {"bar": "baz", "foo": "bar"})