Merge pull request #683 from BeryJu/new-forms-part-3

New forms part 3
This commit is contained in:
Jens L 2021-04-03 14:05:20 +02:00 committed by GitHub
commit 6eff2fe0d1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
240 changed files with 6150 additions and 3652 deletions

3
.github/codecov.yml vendored Normal file
View file

@ -0,0 +1,3 @@
coverage:
precision: 2
round: up

View file

@ -0,0 +1,31 @@
"""Meta API"""
from drf_yasg.utils import swagger_auto_schema
from rest_framework.fields import CharField
from rest_framework.permissions import IsAdminUser
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import ViewSet
from authentik.core.api.utils import PassiveSerializer
from authentik.lib.utils.reflection import get_apps
class AppSerializer(PassiveSerializer):
"""Serialize Application info"""
name = CharField()
label = CharField()
class AppsViewSet(ViewSet):
"""Read-only view set list all installed apps"""
permission_classes = [IsAdminUser]
@swagger_auto_schema(responses={200: AppSerializer(many=True)})
def list(self, request: Request) -> Response:
"""List current messages and pass into Serializer"""
data = []
for app in get_apps():
data.append({"name": app.name, "label": app.verbose_name})
return Response(AppSerializer(data, many=True).data)

View file

@ -7,5 +7,4 @@ class AuthentikAdminConfig(AppConfig):
name = "authentik.admin" name = "authentik.admin"
label = "authentik_admin" label = "authentik_admin"
mountpoint = "administration/"
verbose_name = "authentik Admin" verbose_name = "authentik Admin"

View file

@ -1,107 +0,0 @@
"""Additional fields"""
import yaml
from django import forms
from django.utils.datastructures import MultiValueDict
from django.utils.translation import gettext_lazy as _
class ArrayFieldSelectMultiple(forms.SelectMultiple):
"""This is a Form Widget for use with a Postgres ArrayField. It implements
a multi-select interface that can be given a set of `choices`.
You can provide a `delimiter` keyword argument to specify the delimeter used.
https://gist.github.com/stephane/00e73c0002de52b1c601"""
def __init__(self, *args, **kwargs):
# Accept a `delimiter` argument, and grab it (defaulting to a comma)
self.delimiter = kwargs.pop("delimiter", ",")
super().__init__(*args, **kwargs)
def value_from_datadict(self, data, files, name):
if isinstance(data, MultiValueDict):
# Normally, we'd want a list here, which is what we get from the
# SelectMultiple superclass, but the SimpleArrayField expects to
# get a delimited string, so we're doing a little extra work.
return self.delimiter.join(data.getlist(name))
return data.get(name)
def get_context(self, name, value, attrs):
return super().get_context(name, value.split(self.delimiter), attrs)
class CodeMirrorWidget(forms.Textarea):
"""Custom Textarea-based Widget that triggers a CodeMirror editor"""
# CodeMirror mode to enable
mode: str
template_name = "fields/codemirror.html"
def __init__(self, *args, mode="yaml", **kwargs):
super().__init__(*args, **kwargs)
self.mode = mode
def render(self, *args, **kwargs):
attrs = kwargs.setdefault("attrs", {})
attrs["mode"] = self.mode
return super().render(*args, **kwargs)
class InvalidYAMLInput(str):
"""Invalid YAML String type"""
class YAMLString(str):
"""YAML String type"""
class YAMLField(forms.JSONField):
"""Django's JSON Field converted to YAML"""
default_error_messages = {
"invalid": _("'%(value)s' value must be valid YAML."),
}
widget = forms.Textarea
def to_python(self, value):
if self.disabled:
return value
if value in self.empty_values:
return None
if isinstance(value, (list, dict, int, float, YAMLString)):
return value
try:
converted = yaml.safe_load(value)
except yaml.YAMLError:
raise forms.ValidationError(
self.error_messages["invalid"],
code="invalid",
params={"value": value},
)
if isinstance(converted, str):
return YAMLString(converted)
if converted is None:
return {}
return converted
def bound_data(self, data, initial):
if self.disabled:
return initial
try:
return yaml.safe_load(data)
except yaml.YAMLError:
return InvalidYAMLInput(data)
def prepare_value(self, value):
if isinstance(value, InvalidYAMLInput):
return value
return yaml.dump(value, explicit_start=True, default_flow_style=False)
def has_changed(self, initial, data):
if super().has_changed(initial, data):
return True
# For purposes of seeing whether something has changed, True isn't the
# same as 1 and the order of keys doesn't matter.
data = self.to_python(data)
return yaml.dump(initial, sort_keys=True) != yaml.dump(data, sort_keys=True)

View file

@ -1 +0,0 @@
<ak-codemirror mode="{{ widget.attrs.mode }}"><textarea class="pf-c-form-control" name="{{ widget.name }}">{% if widget.value %}{{ widget.value }}{% endif %}</textarea></ak-codemirror>

View file

@ -1,18 +0,0 @@
{% extends base_template|default:"generic/form.html" %}
{% load authentik_utils %}
{% load i18n %}
{% block above_form %}
<h1>
{% blocktrans with type=form|form_verbose_name %}
Create {{ type }}
{% endblocktrans %}
</h1>
{% endblock %}
{% block action %}
{% blocktrans with type=form|form_verbose_name %}
Create {{ type }}
{% endblocktrans %}
{% endblock %}

View file

@ -1,38 +0,0 @@
{% load i18n %}
{% load authentik_utils %}
{% load static %}
{% block content %}
<section class="pf-c-page__main-section pf-m-light">
<div class="pf-c-content">
{% block above_form %}
{% endblock %}
</div>
</section>
<section class="pf-c-page__main-section">
<div class="pf-l-stack">
<div class="pf-l-stack__item">
<div class="pf-c-card">
<div class="pf-c-card__body">
<form id="main-form" action="" method="post" class="pf-c-form pf-m-horizontal" enctype="multipart/form-data">
{% include 'partials/form_horizontal.html' with form=form %}
{% block beneath_form %}
{% endblock %}
</form>
</div>
</div>
</div>
</div>
</section>
<footer class="pf-c-modal-box__footer">
<ak-spinner-button form="main-form">
{% block action %}{% endblock %}
</ak-spinner-button>&nbsp;
<a class="pf-c-button pf-m-secondary" href="#/">{% trans "Cancel" %}</a>
</footer>
{% endblock %}
{% block scripts %}
{{ block.super }}
{{ form.media.js }}
{% endblock %}

View file

@ -1,18 +0,0 @@
{% extends base_template|default:"generic/form.html" %}
{% load authentik_utils %}
{% load i18n %}
{% block above_form %}
<h1>
{% blocktrans with type=form|form_verbose_name|title inst=form.instance %}
Update {{ inst }}
{% endblocktrans %}
</h1>
{% endblock %}
{% block action %}
{% blocktrans with type=form|form_verbose_name %}
Update {{ type }}
{% endblocktrans %}
{% endblock %}

View file

@ -71,3 +71,8 @@ class TestAdminAPI(TestCase):
"""Test metrics API""" """Test metrics API"""
response = self.client.get(reverse("authentik_api:admin_metrics-list")) response = self.client.get(reverse("authentik_api:admin_metrics-list"))
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
def test_apps(self):
"""Test apps API"""
response = self.client.get(reverse("authentik_api:apps-list"))
self.assertEqual(response.status_code, 200)

View file

@ -1,66 +0,0 @@
"""admin tests"""
from importlib import import_module
from typing import Callable
from django.forms import ModelForm
from django.test import Client, TestCase
from django.urls import reverse
from django.urls.exceptions import NoReverseMatch
from authentik.admin.urls import urlpatterns
from authentik.core.models import Group, User
from authentik.lib.utils.reflection import get_apps
class TestAdmin(TestCase):
"""Generic admin tests"""
def setUp(self):
self.user = User.objects.create_user(username="test")
self.user.ak_groups.add(Group.objects.filter(is_superuser=True).first())
self.user.save()
self.client = Client()
self.client.force_login(self.user)
def generic_view_tester(view_name: str) -> Callable:
"""This is used instead of subTest for better visibility"""
def tester(self: TestAdmin):
try:
full_url = reverse(f"authentik_admin:{view_name}")
response = self.client.get(full_url)
self.assertTrue(response.status_code < 500)
except NoReverseMatch:
pass
return tester
for url in urlpatterns:
method_name = url.name.replace("-", "_")
setattr(TestAdmin, f"test_view_{method_name}", generic_view_tester(url.name))
def generic_form_tester(form: ModelForm) -> Callable:
"""Test a form"""
def tester(self: TestAdmin):
form_inst = form()
self.assertFalse(form_inst.is_valid())
return tester
# Load the forms module from every app, so we have all forms loaded
for app in get_apps():
module = app.__module__.replace(".apps", ".forms")
try:
import_module(module)
except ImportError:
pass
for form_class in ModelForm.__subclasses__():
setattr(
TestAdmin, f"test_form_{form_class.__name__}", generic_form_tester(form_class)
)

View file

@ -1,74 +0,0 @@
"""authentik URL Configuration"""
from django.urls import path
from authentik.admin.views import (
outposts_service_connections,
policies,
property_mappings,
providers,
sources,
stages,
)
from authentik.providers.saml.views.metadata import MetadataImportView
urlpatterns = [
# Sources
path("sources/create/", sources.SourceCreateView.as_view(), name="source-create"),
path(
"sources/<uuid:pk>/update/",
sources.SourceUpdateView.as_view(),
name="source-update",
),
# Policies
path("policies/create/", policies.PolicyCreateView.as_view(), name="policy-create"),
path(
"policies/<uuid:pk>/update/",
policies.PolicyUpdateView.as_view(),
name="policy-update",
),
# Providers
path(
"providers/create/",
providers.ProviderCreateView.as_view(),
name="provider-create",
),
path(
"providers/create/saml/from-metadata/",
MetadataImportView.as_view(),
name="provider-saml-from-metadata",
),
path(
"providers/<int:pk>/update/",
providers.ProviderUpdateView.as_view(),
name="provider-update",
),
# Stages
path("stages/create/", stages.StageCreateView.as_view(), name="stage-create"),
path(
"stages/<uuid:pk>/update/",
stages.StageUpdateView.as_view(),
name="stage-update",
),
# Property Mappings
path(
"property-mappings/create/",
property_mappings.PropertyMappingCreateView.as_view(),
name="property-mapping-create",
),
path(
"property-mappings/<uuid:pk>/update/",
property_mappings.PropertyMappingUpdateView.as_view(),
name="property-mapping-update",
),
# Outpost Service Connections
path(
"outpost_service_connections/create/",
outposts_service_connections.OutpostServiceConnectionCreateView.as_view(),
name="outpost-service-connection-create",
),
path(
"outpost_service_connections/<uuid:pk>/update/",
outposts_service_connections.OutpostServiceConnectionUpdateView.as_view(),
name="outpost-service-connection-update",
),
]

View file

@ -1,44 +0,0 @@
"""authentik OutpostServiceConnection administration"""
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.auth.mixins import (
PermissionRequiredMixin as DjangoPermissionRequiredMixin,
)
from django.contrib.messages.views import SuccessMessageMixin
from django.urls import reverse_lazy
from django.utils.translation import gettext as _
from guardian.mixins import PermissionRequiredMixin
from authentik.admin.views.utils import InheritanceCreateView, InheritanceUpdateView
from authentik.outposts.models import OutpostServiceConnection
class OutpostServiceConnectionCreateView(
SuccessMessageMixin,
LoginRequiredMixin,
DjangoPermissionRequiredMixin,
InheritanceCreateView,
):
"""Create new OutpostServiceConnection"""
model = OutpostServiceConnection
permission_required = "authentik_outposts.add_outpostserviceconnection"
template_name = "generic/create.html"
success_url = reverse_lazy("authentik_core:if-admin")
success_message = _("Successfully created Outpost Service Connection")
class OutpostServiceConnectionUpdateView(
SuccessMessageMixin,
LoginRequiredMixin,
PermissionRequiredMixin,
InheritanceUpdateView,
):
"""Update outpostserviceconnection"""
model = OutpostServiceConnection
permission_required = "authentik_outposts.change_outpostserviceconnection"
template_name = "generic/update.html"
success_url = reverse_lazy("authentik_core:if-admin")
success_message = _("Successfully updated Outpost Service Connection")

View file

@ -1,44 +0,0 @@
"""authentik Policy administration"""
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.auth.mixins import (
PermissionRequiredMixin as DjangoPermissionRequiredMixin,
)
from django.contrib.messages.views import SuccessMessageMixin
from django.urls import reverse_lazy
from django.utils.translation import gettext as _
from guardian.mixins import PermissionRequiredMixin
from authentik.admin.views.utils import InheritanceCreateView, InheritanceUpdateView
from authentik.policies.models import Policy
class PolicyCreateView(
SuccessMessageMixin,
LoginRequiredMixin,
DjangoPermissionRequiredMixin,
InheritanceCreateView,
):
"""Create new Policy"""
model = Policy
permission_required = "authentik_policies.add_policy"
template_name = "generic/create.html"
success_url = reverse_lazy("authentik_core:if-admin")
success_message = _("Successfully created Policy")
class PolicyUpdateView(
SuccessMessageMixin,
LoginRequiredMixin,
PermissionRequiredMixin,
InheritanceUpdateView,
):
"""Update policy"""
model = Policy
permission_required = "authentik_policies.change_policy"
template_name = "generic/update.html"
success_url = reverse_lazy("authentik_core:if-admin")
success_message = _("Successfully updated Policy")

View file

@ -1,41 +0,0 @@
"""authentik PropertyMapping administration"""
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.auth.mixins import (
PermissionRequiredMixin as DjangoPermissionRequiredMixin,
)
from django.contrib.messages.views import SuccessMessageMixin
from django.utils.translation import gettext as _
from guardian.mixins import PermissionRequiredMixin
from authentik.admin.views.utils import InheritanceCreateView, InheritanceUpdateView
from authentik.core.models import PropertyMapping
class PropertyMappingCreateView(
SuccessMessageMixin,
LoginRequiredMixin,
DjangoPermissionRequiredMixin,
InheritanceCreateView,
):
"""Create new PropertyMapping"""
model = PropertyMapping
permission_required = "authentik_core.add_propertymapping"
success_url = "/"
template_name = "generic/create.html"
success_message = _("Successfully created Property Mapping")
class PropertyMappingUpdateView(
SuccessMessageMixin,
LoginRequiredMixin,
PermissionRequiredMixin,
InheritanceUpdateView,
):
"""Update property_mapping"""
model = PropertyMapping
permission_required = "authentik_core.change_propertymapping"
success_url = "/"
template_name = "generic/update.html"
success_message = _("Successfully updated Property Mapping")

View file

@ -1,41 +0,0 @@
"""authentik Provider administration"""
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.auth.mixins import (
PermissionRequiredMixin as DjangoPermissionRequiredMixin,
)
from django.contrib.messages.views import SuccessMessageMixin
from django.utils.translation import gettext as _
from guardian.mixins import PermissionRequiredMixin
from authentik.admin.views.utils import InheritanceCreateView, InheritanceUpdateView
from authentik.core.models import Provider
class ProviderCreateView(
SuccessMessageMixin,
LoginRequiredMixin,
DjangoPermissionRequiredMixin,
InheritanceCreateView,
):
"""Create new Provider"""
model = Provider
permission_required = "authentik_core.add_provider"
success_url = "/"
template_name = "generic/create.html"
success_message = _("Successfully created Provider")
class ProviderUpdateView(
SuccessMessageMixin,
LoginRequiredMixin,
PermissionRequiredMixin,
InheritanceUpdateView,
):
"""Update provider"""
model = Provider
permission_required = "authentik_core.change_provider"
success_url = "/"
template_name = "generic/update.html"
success_message = _("Successfully updated Provider")

View file

@ -1,43 +0,0 @@
"""authentik Source administration"""
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.auth.mixins import (
PermissionRequiredMixin as DjangoPermissionRequiredMixin,
)
from django.contrib.messages.views import SuccessMessageMixin
from django.utils.translation import gettext as _
from guardian.mixins import PermissionRequiredMixin
from authentik.admin.views.utils import InheritanceCreateView, InheritanceUpdateView
from authentik.core.models import Source
class SourceCreateView(
SuccessMessageMixin,
LoginRequiredMixin,
DjangoPermissionRequiredMixin,
InheritanceCreateView,
):
"""Create new Source"""
model = Source
permission_required = "authentik_core.add_source"
success_url = "/"
template_name = "generic/create.html"
success_message = _("Successfully created Source")
class SourceUpdateView(
SuccessMessageMixin,
LoginRequiredMixin,
PermissionRequiredMixin,
InheritanceUpdateView,
):
"""Update source"""
model = Source
permission_required = "authentik_core.change_source"
success_url = "/"
template_name = "generic/update.html"
success_message = _("Successfully updated Source")

View file

@ -1,43 +0,0 @@
"""authentik Stage administration"""
from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.auth.mixins import (
PermissionRequiredMixin as DjangoPermissionRequiredMixin,
)
from django.contrib.messages.views import SuccessMessageMixin
from django.urls import reverse_lazy
from django.utils.translation import gettext as _
from guardian.mixins import PermissionRequiredMixin
from authentik.admin.views.utils import InheritanceCreateView, InheritanceUpdateView
from authentik.flows.models import Stage
class StageCreateView(
SuccessMessageMixin,
LoginRequiredMixin,
DjangoPermissionRequiredMixin,
InheritanceCreateView,
):
"""Create new Stage"""
model = Stage
template_name = "generic/create.html"
permission_required = "authentik_flows.add_stage"
success_url = reverse_lazy("authentik_core:if-admin")
success_message = _("Successfully created Stage")
class StageUpdateView(
SuccessMessageMixin,
LoginRequiredMixin,
PermissionRequiredMixin,
InheritanceUpdateView,
):
"""Update stage"""
model = Stage
permission_required = "authentik_flows.update_application"
template_name = "generic/update.html"
success_url = reverse_lazy("authentik_core:if-admin")
success_message = _("Successfully updated Stage")

View file

@ -1,50 +0,0 @@
"""authentik admin util views"""
from typing import Any
from django.http import Http404
from django.views.generic import UpdateView
from authentik.lib.utils.reflection import all_subclasses
from authentik.lib.views import CreateAssignPermView
class InheritanceCreateView(CreateAssignPermView):
"""CreateView for objects using InheritanceManager"""
def get_form_class(self):
provider_type = self.request.GET.get("type")
try:
model = next(
x for x in all_subclasses(self.model) if x.__name__ == provider_type
)
except StopIteration as exc:
raise Http404 from exc
return model().form
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
kwargs = super().get_context_data(**kwargs)
form_cls = self.get_form_class()
if hasattr(form_cls, "template_name"):
kwargs["base_template"] = form_cls.template_name
return kwargs
class InheritanceUpdateView(UpdateView):
"""UpdateView for objects using InheritanceManager"""
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
kwargs = super().get_context_data(**kwargs)
form_cls = self.get_form_class()
if hasattr(form_cls, "template_name"):
kwargs["base_template"] = form_cls.template_name
return kwargs
def get_form_class(self):
return self.get_object().form
def get_object(self, queryset=None):
return (
self.model.objects.filter(pk=self.kwargs.get("pk"))
.select_subclasses()
.first()
)

View file

@ -5,6 +5,7 @@ from drf_yasg.views import get_schema_view
from rest_framework import routers from rest_framework import routers
from rest_framework.permissions import AllowAny from rest_framework.permissions import AllowAny
from authentik.admin.api.meta import AppsViewSet
from authentik.admin.api.metrics import AdministrationMetricsViewSet from authentik.admin.api.metrics import AdministrationMetricsViewSet
from authentik.admin.api.tasks import TaskViewSet from authentik.admin.api.tasks import TaskViewSet
from authentik.admin.api.version import VersionViewSet from authentik.admin.api.version import VersionViewSet
@ -103,6 +104,7 @@ router.register("admin/version", VersionViewSet, basename="admin_version")
router.register("admin/workers", WorkerViewSet, basename="admin_workers") router.register("admin/workers", WorkerViewSet, basename="admin_workers")
router.register("admin/metrics", AdministrationMetricsViewSet, basename="admin_metrics") router.register("admin/metrics", AdministrationMetricsViewSet, basename="admin_metrics")
router.register("admin/system_tasks", TaskViewSet, basename="admin_system_tasks") router.register("admin/system_tasks", TaskViewSet, basename="admin_system_tasks")
router.register("admin/apps", AppsViewSet, basename="apps")
router.register("core/applications", ApplicationViewSet) router.register("core/applications", ApplicationViewSet)
router.register("core/groups", GroupViewSet) router.register("core/groups", GroupViewSet)

View file

@ -1,20 +0,0 @@
"""authentik core admin"""
from django.apps import AppConfig, apps
from django.contrib import admin
from django.contrib.admin.sites import AlreadyRegistered
from guardian.admin import GuardedModelAdmin
def admin_autoregister(app: AppConfig):
"""Automatically register all models from app"""
for model in app.get_models():
try:
admin.site.register(model, GuardedModelAdmin)
except AlreadyRegistered:
pass
for _app in apps.get_app_configs():
if _app.label.startswith("authentik_"):
admin_autoregister(_app)

View file

@ -124,7 +124,13 @@ class ApplicationViewSet(ModelViewSet):
], ],
responses={200: "Success"}, responses={200: "Success"},
) )
@action(detail=True, methods=["POST"], parser_classes=(MultiPartParser,)) @action(
detail=True,
pagination_class=None,
filter_backends=[],
methods=["POST"],
parser_classes=(MultiPartParser,),
)
# pylint: disable=unused-argument # pylint: disable=unused-argument
def set_icon(self, request: Request, slug: str): def set_icon(self, request: Request, slug: str):
"""Set application icon""" """Set application icon"""
@ -140,7 +146,7 @@ class ApplicationViewSet(ModelViewSet):
"authentik_core.view_application", ["authentik_events.view_event"] "authentik_core.view_application", ["authentik_events.view_event"]
) )
@swagger_auto_schema(responses={200: CoordinateSerializer(many=True)}) @swagger_auto_schema(responses={200: CoordinateSerializer(many=True)})
@action(detail=True) @action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=unused-argument # pylint: disable=unused-argument
def metrics(self, request: Request, slug: str): def metrics(self, request: Request, slug: str):
"""Metrics for application logins""" """Metrics for application logins"""

View file

@ -1,7 +1,6 @@
"""PropertyMapping API Views""" """PropertyMapping API Views"""
from json import dumps from json import dumps
from django.urls import reverse
from drf_yasg.utils import swagger_auto_schema from drf_yasg.utils import swagger_auto_schema
from guardian.shortcuts import get_objects_for_user from guardian.shortcuts import get_objects_for_user
from rest_framework import mixins from rest_framework import mixins
@ -19,9 +18,10 @@ from authentik.core.api.utils import (
PassiveSerializer, PassiveSerializer,
TypeCreateSerializer, TypeCreateSerializer,
) )
from authentik.core.expression import PropertyMappingEvaluator
from authentik.core.models import PropertyMapping from authentik.core.models import PropertyMapping
from authentik.lib.templatetags.authentik_utils import verbose_name
from authentik.lib.utils.reflection import all_subclasses from authentik.lib.utils.reflection import all_subclasses
from authentik.managed.api import ManagedSerializer
from authentik.policies.api.exec import PolicyTestSerializer from authentik.policies.api.exec import PolicyTestSerializer
@ -32,29 +32,30 @@ class PropertyMappingTestResultSerializer(PassiveSerializer):
successful = BooleanField(read_only=True) successful = BooleanField(read_only=True)
class PropertyMappingSerializer(ModelSerializer, MetaNameSerializer): class PropertyMappingSerializer(ManagedSerializer, ModelSerializer, MetaNameSerializer):
"""PropertyMapping Serializer""" """PropertyMapping Serializer"""
object_type = SerializerMethodField(method_name="get_type") component = SerializerMethodField()
def get_type(self, obj): def get_component(self, obj: PropertyMapping) -> str:
"""Get object type so that we know which API Endpoint to use to get the full object""" """Get object's component so that we know how to edit the object"""
return obj._meta.object_name.lower().replace("propertymapping", "") return obj.component
def to_representation(self, instance: PropertyMapping): def validate_expression(self, expression: str) -> str:
# pyright: reportGeneralTypeIssues=false """Test Syntax"""
if instance.__class__ == PropertyMapping: evaluator = PropertyMappingEvaluator()
return super().to_representation(instance) evaluator.validate(expression)
return instance.serializer(instance=instance).data return expression
class Meta: class Meta:
model = PropertyMapping model = PropertyMapping
fields = [ fields = [
"pk", "pk",
"managed",
"name", "name",
"expression", "expression",
"object_type", "component",
"verbose_name", "verbose_name",
"verbose_name_plural", "verbose_name_plural",
] ]
@ -80,17 +81,17 @@ class PropertyMappingViewSet(
return PropertyMapping.objects.select_subclasses() return PropertyMapping.objects.select_subclasses()
@swagger_auto_schema(responses={200: TypeCreateSerializer(many=True)}) @swagger_auto_schema(responses={200: TypeCreateSerializer(many=True)})
@action(detail=False) @action(detail=False, pagination_class=None, filter_backends=[])
def types(self, request: Request) -> Response: def types(self, request: Request) -> Response:
"""Get all creatable property-mapping types""" """Get all creatable property-mapping types"""
data = [] data = []
for subclass in all_subclasses(self.queryset.model): for subclass in all_subclasses(self.queryset.model):
subclass: PropertyMapping
data.append( data.append(
{ {
"name": verbose_name(subclass), "name": subclass._meta.verbose_name,
"description": subclass.__doc__, "description": subclass.__doc__,
"link": reverse("authentik_admin:property-mapping-create") "component": subclass.component,
+ f"?type={subclass.__name__}",
} }
) )
return Response(TypeCreateSerializer(data, many=True).data) return Response(TypeCreateSerializer(data, many=True).data)
@ -100,7 +101,7 @@ class PropertyMappingViewSet(
request_body=PolicyTestSerializer(), request_body=PolicyTestSerializer(),
responses={200: PropertyMappingTestResultSerializer}, responses={200: PropertyMappingTestResultSerializer},
) )
@action(detail=True, methods=["POST"]) @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
# pylint: disable=unused-argument, invalid-name # pylint: disable=unused-argument, invalid-name
def test(self, request: Request, pk: str) -> Response: def test(self, request: Request, pk: str) -> Response:
"""Test Property Mapping""" """Test Property Mapping"""
@ -116,7 +117,7 @@ class PropertyMappingViewSet(
if not users.exists(): if not users.exists():
raise PermissionDenied() raise PermissionDenied()
response_data = {"successful": True} response_data = {"successful": True, "result": ""}
try: try:
result = mapping.evaluate( result = mapping.evaluate(
users.first(), users.first(),

View file

@ -1,5 +1,4 @@
"""Provider API Views""" """Provider API Views"""
from django.urls import reverse
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from drf_yasg.utils import swagger_auto_schema from drf_yasg.utils import swagger_auto_schema
from rest_framework import mixins from rest_framework import mixins
@ -12,7 +11,6 @@ from rest_framework.viewsets import GenericViewSet
from authentik.core.api.utils import MetaNameSerializer, TypeCreateSerializer from authentik.core.api.utils import MetaNameSerializer, TypeCreateSerializer
from authentik.core.models import Provider from authentik.core.models import Provider
from authentik.lib.templatetags.authentik_utils import verbose_name
from authentik.lib.utils.reflection import all_subclasses from authentik.lib.utils.reflection import all_subclasses
@ -22,11 +20,14 @@ class ProviderSerializer(ModelSerializer, MetaNameSerializer):
assigned_application_slug = ReadOnlyField(source="application.slug") assigned_application_slug = ReadOnlyField(source="application.slug")
assigned_application_name = ReadOnlyField(source="application.name") assigned_application_name = ReadOnlyField(source="application.name")
object_type = SerializerMethodField() component = SerializerMethodField()
def get_object_type(self, obj): def get_component(self, obj: Provider): # pragma: no cover
"""Get object type so that we know which API Endpoint to use to get the full object""" """Get object component so that we know how to edit the object"""
return obj._meta.object_name.lower().replace("provider", "") # pyright: reportGeneralTypeIssues=false
if obj.__class__ == Provider:
return ""
return obj.component
class Meta: class Meta:
@ -34,10 +35,9 @@ class ProviderSerializer(ModelSerializer, MetaNameSerializer):
fields = [ fields = [
"pk", "pk",
"name", "name",
"application",
"authorization_flow", "authorization_flow",
"property_mappings", "property_mappings",
"object_type", "component",
"assigned_application_slug", "assigned_application_slug",
"assigned_application_name", "assigned_application_name",
"verbose_name", "verbose_name",
@ -67,24 +67,24 @@ class ProviderViewSet(
return Provider.objects.select_subclasses() return Provider.objects.select_subclasses()
@swagger_auto_schema(responses={200: TypeCreateSerializer(many=True)}) @swagger_auto_schema(responses={200: TypeCreateSerializer(many=True)})
@action(detail=False) @action(detail=False, pagination_class=None, filter_backends=[])
def types(self, request: Request) -> Response: def types(self, request: Request) -> Response:
"""Get all creatable provider types""" """Get all creatable provider types"""
data = [] data = []
for subclass in all_subclasses(self.queryset.model): for subclass in all_subclasses(self.queryset.model):
subclass: Provider
data.append( data.append(
{ {
"name": verbose_name(subclass), "name": subclass._meta.verbose_name,
"description": subclass.__doc__, "description": subclass.__doc__,
"link": reverse("authentik_admin:provider-create") "component": subclass().component,
+ f"?type={subclass.__name__}",
} }
) )
data.append( data.append(
{ {
"name": _("SAML Provider from Metadata"), "name": _("SAML Provider from Metadata"),
"description": _("Create a SAML Provider by importing its Metadata."), "description": _("Create a SAML Provider by importing its Metadata."),
"link": reverse("authentik_admin:provider-saml-from-metadata"), "component": "ak-provider-saml-import-form",
} }
) )
return Response(TypeCreateSerializer(data, many=True).data) return Response(TypeCreateSerializer(data, many=True).data)

View file

@ -1,7 +1,6 @@
"""Source API Views""" """Source API Views"""
from typing import Iterable from typing import Iterable
from django.urls import reverse
from drf_yasg.utils import swagger_auto_schema from drf_yasg.utils import swagger_auto_schema
from rest_framework import mixins from rest_framework import mixins
from rest_framework.decorators import action from rest_framework.decorators import action
@ -14,7 +13,6 @@ from structlog.stdlib import get_logger
from authentik.core.api.utils import MetaNameSerializer, TypeCreateSerializer from authentik.core.api.utils import MetaNameSerializer, TypeCreateSerializer
from authentik.core.models import Source from authentik.core.models import Source
from authentik.core.types import UserSettingSerializer from authentik.core.types import UserSettingSerializer
from authentik.lib.templatetags.authentik_utils import verbose_name
from authentik.lib.utils.reflection import all_subclasses from authentik.lib.utils.reflection import all_subclasses
from authentik.policies.engine import PolicyEngine from authentik.policies.engine import PolicyEngine
@ -24,11 +22,11 @@ LOGGER = get_logger()
class SourceSerializer(ModelSerializer, MetaNameSerializer): class SourceSerializer(ModelSerializer, MetaNameSerializer):
"""Source Serializer""" """Source Serializer"""
object_type = SerializerMethodField() component = SerializerMethodField()
def get_object_type(self, obj): def get_component(self, obj: Source):
"""Get object type so that we know which API Endpoint to use to get the full object""" """Get object component so that we know how to edit the object"""
return obj._meta.object_name.lower().replace("source", "") return obj.component
class Meta: class Meta:
@ -40,7 +38,7 @@ class SourceSerializer(ModelSerializer, MetaNameSerializer):
"enabled", "enabled",
"authentication_flow", "authentication_flow",
"enrollment_flow", "enrollment_flow",
"object_type", "component",
"verbose_name", "verbose_name",
"verbose_name_plural", "verbose_name_plural",
"policy_engine_mode", "policy_engine_mode",
@ -63,23 +61,24 @@ class SourceViewSet(
return Source.objects.select_subclasses() return Source.objects.select_subclasses()
@swagger_auto_schema(responses={200: TypeCreateSerializer(many=True)}) @swagger_auto_schema(responses={200: TypeCreateSerializer(many=True)})
@action(detail=False) @action(detail=False, pagination_class=None, filter_backends=[])
def types(self, request: Request) -> Response: def types(self, request: Request) -> Response:
"""Get all creatable source types""" """Get all creatable source types"""
data = [] data = []
for subclass in all_subclasses(self.queryset.model): for subclass in all_subclasses(self.queryset.model):
subclass: Source
# pyright: reportGeneralTypeIssues=false
data.append( data.append(
{ {
"name": verbose_name(subclass), "name": subclass._meta.verbose_name,
"description": subclass.__doc__, "description": subclass.__doc__,
"link": reverse("authentik_admin:source-create") "component": subclass().component,
+ f"?type={subclass.__name__}",
} }
) )
return Response(TypeCreateSerializer(data, many=True).data) return Response(TypeCreateSerializer(data, many=True).data)
@swagger_auto_schema(responses={200: UserSettingSerializer(many=True)}) @swagger_auto_schema(responses={200: UserSettingSerializer(many=True)})
@action(detail=False) @action(detail=False, pagination_class=None, filter_backends=[])
def user_settings(self, request: Request) -> Response: def user_settings(self, request: Request) -> Response:
"""Get all sources the user can configure""" """Get all sources the user can configure"""
_all_sources: Iterable[Source] = Source.objects.filter( _all_sources: Iterable[Source] = Source.objects.filter(

View file

@ -13,9 +13,10 @@ from authentik.core.api.users import UserSerializer
from authentik.core.api.utils import PassiveSerializer from authentik.core.api.utils import PassiveSerializer
from authentik.core.models import Token from authentik.core.models import Token
from authentik.events.models import Event, EventAction from authentik.events.models import Event, EventAction
from authentik.managed.api import ManagedSerializer
class TokenSerializer(ModelSerializer): class TokenSerializer(ManagedSerializer, ModelSerializer):
"""Token Serializer""" """Token Serializer"""
user = UserSerializer(required=False) user = UserSerializer(required=False)
@ -25,6 +26,7 @@ class TokenSerializer(ModelSerializer):
model = Token model = Token
fields = [ fields = [
"pk", "pk",
"managed",
"identifier", "identifier",
"intent", "intent",
"user", "user",
@ -66,7 +68,7 @@ class TokenViewSet(ModelViewSet):
@permission_required("authentik_core.view_token_key") @permission_required("authentik_core.view_token_key")
@swagger_auto_schema(responses={200: TokenViewSerializer(many=False)}) @swagger_auto_schema(responses={200: TokenViewSerializer(many=False)})
@action(detail=True) @action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=unused-argument # pylint: disable=unused-argument
def view_key(self, request: Request, identifier: str) -> Response: def view_key(self, request: Request, identifier: str) -> Response:
"""Return token key and log access""" """Return token key and log access"""

View file

@ -93,7 +93,7 @@ class UserViewSet(ModelViewSet):
return User.objects.all().exclude(pk=get_anonymous_user().pk) return User.objects.all().exclude(pk=get_anonymous_user().pk)
@swagger_auto_schema(responses={200: SessionUserSerializer(many=False)}) @swagger_auto_schema(responses={200: SessionUserSerializer(many=False)})
@action(detail=False) @action(detail=False, pagination_class=None, filter_backends=[])
# pylint: disable=invalid-name # pylint: disable=invalid-name
def me(self, request: Request) -> Response: def me(self, request: Request) -> Response:
"""Get information about current user""" """Get information about current user"""
@ -109,7 +109,7 @@ class UserViewSet(ModelViewSet):
@permission_required("authentik_core.view_user", ["authentik_events.view_event"]) @permission_required("authentik_core.view_user", ["authentik_events.view_event"])
@swagger_auto_schema(responses={200: UserMetricsSerializer(many=False)}) @swagger_auto_schema(responses={200: UserMetricsSerializer(many=False)})
@action(detail=False) @action(detail=False, pagination_class=None, filter_backends=[])
def metrics(self, request: Request) -> Response: def metrics(self, request: Request) -> Response:
"""User metrics per 1h""" """User metrics per 1h"""
serializer = UserMetricsSerializer(True) serializer = UserMetricsSerializer(True)
@ -120,7 +120,7 @@ class UserViewSet(ModelViewSet):
@swagger_auto_schema( @swagger_auto_schema(
responses={"200": LinkSerializer(many=False)}, responses={"200": LinkSerializer(many=False)},
) )
@action(detail=True) @action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=invalid-name, unused-argument # pylint: disable=invalid-name, unused-argument
def recovery(self, request: Request, pk: int) -> Response: def recovery(self, request: Request, pk: int) -> Response:
"""Create a temporary link that a user can use to recover their accounts""" """Create a temporary link that a user can use to recover their accounts"""

View file

@ -34,7 +34,7 @@ class TypeCreateSerializer(PassiveSerializer):
name = CharField(required=True) name = CharField(required=True)
description = CharField(required=True) description = CharField(required=True)
link = CharField(required=True) component = CharField(required=True)
class CacheSerializer(PassiveSerializer): class CacheSerializer(PassiveSerializer):

View file

@ -10,7 +10,6 @@ from django.contrib.auth.models import AbstractUser
from django.contrib.auth.models import UserManager as DjangoUserManager from django.contrib.auth.models import UserManager as DjangoUserManager
from django.db import models from django.db import models
from django.db.models import Q, QuerySet from django.db.models import Q, QuerySet
from django.forms import ModelForm
from django.http import HttpRequest from django.http import HttpRequest
from django.templatetags.static import static from django.templatetags.static import static
from django.utils.functional import cached_property from django.utils.functional import cached_property
@ -188,8 +187,8 @@ class Provider(SerializerModel):
return None return None
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
"""Return Form class used to edit this object""" """Return component used to edit this object"""
raise NotImplementedError raise NotImplementedError
@property @property
@ -276,8 +275,8 @@ class Source(SerializerModel, PolicyBindingModel):
objects = InheritanceManager() objects = InheritanceManager()
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
"""Return Form class used to edit this object""" """Return component used to edit this object"""
raise NotImplementedError raise NotImplementedError
@property @property
@ -382,8 +381,8 @@ class PropertyMapping(SerializerModel, ManagedModel):
objects = InheritanceManager() objects = InheritanceManager()
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
"""Return Form class used to edit this object""" """Return component used to edit this object"""
raise NotImplementedError raise NotImplementedError
@property @property

View file

@ -1,6 +1,5 @@
{% load static %} {% load static %}
{% load i18n %} {% load i18n %}
{% load authentik_utils %}
<!DOCTYPE html> <!DOCTYPE html>

View file

@ -1,7 +1,6 @@
{% extends 'base/skeleton.html' %} {% extends 'base/skeleton.html' %}
{% load i18n %} {% load i18n %}
{% load authentik_utils %}
{% block head %} {% block head %}
{{ block.super }} {{ block.super }}

View file

@ -2,7 +2,6 @@
{% load static %} {% load static %}
{% load i18n %} {% load i18n %}
{% load authentik_utils %}
{% block body %} {% block body %}
<div class="pf-c-background-image"> <div class="pf-c-background-image">

View file

@ -1,115 +0,0 @@
{% load authentik_utils %}
{% load i18n %}
{% csrf_token %}
{% for field in form %}
{% if field.field.widget|fieldtype == 'HiddenInput' %}
{{ field }}
{% else %}
<div class="pf-c-form__group {% if field.errors %} has-error {% endif %}">
{% if field.field.widget|fieldtype == 'RadioSelect' %}
<div class="pf-c-form__group-label">
<label class="pf-c-form__label" for="{{ field.name }}-{{ forloop.counter0 }}">
<span class="pf-c-form__label-text">{{ field.label }}</span>
{% if field.field.required %}
<span class="pf-c-form__label-required" aria-hidden="true">&#42;</span>
{% endif %}
</label>
</div>
<div class="pf-c-form__group-control">
{% for c in field %}
<div class="pf-c-radio">
<input class="pf-c-radio__input"
type="radio" id="{{ field.name }}-{{ forloop.counter0 }}"
name="{% if wizard %}{{ wizard.steps.current }}-{% endif %}{{ field.name }}"
value="{{ c.data.value }}"
{% if c.data.selected %} checked {% endif %}/>
<label class="pf-c-radio__label" for="{{ field.name }}-{{ forloop.counter0 }}">{{ c.choice_label }}</label>
</div>
{% endfor %}
{% if field.help_text %}
<p class="pf-c-form__helper-text">{{ field.help_text }}</p>
{% endif %}
</div>
{% elif field.field.widget|fieldtype == 'Select' or field.field.widget|fieldtype == "SelectMultiple" %}
<div class="pf-c-form__group-label">
<label class="pf-c-form__label" for="{{ field.name }}-{{ forloop.counter0 }}">
<span class="pf-c-form__label-text">{{ field.label }}</span>
{% if field.field.required %}
<span class="pf-c-form__label-required" aria-hidden="true">&#42;</span>
{% endif %}
</label>
</div>
<div class="pf-c-form__group-control">
<div class="pf-c-form__horizontal-group">
{{ field|css_class:"pf-c-form-control" }}
{% if field.help_text %}
<p class="pf-c-form__helper-text">{{ field.help_text|safe }}</p>
{% endif %}
{% if field.field.widget|fieldtype == 'SelectMultiple' %}
<p class="pf-c-form__helper-text">{% trans 'Hold control/command to select multiple items.' %}</p>
{% endif %}
</div>
</div>
{% elif field.field.widget|fieldtype == 'CheckboxInput' %}
<div class="pf-c-form__group-control">
<div class="pf-c-form__horizontal-group">
<div class="pf-c-check">
{{ field|css_class:"pf-c-check__input" }}
<label class="pf-c-check__label" for="{{ field.name }}-{{ forloop.counter0 }}">{{ field.label }}</label>
</div>
{% if field.help_text %}
<p class="pf-c-form__helper-text">{{ field.help_text|safe }}</p>
{% endif %}
</div>
</div>
{% elif field.field.widget|fieldtype == "FileInput" %}
<div class="pf-c-form__group-label">
<label class="pf-c-form__label" for="{{ field.name }}-{{ forloop.counter0 }}">
<span class="pf-c-form__label-text">{{ field.label }}</span>
{% if field.field.required %}
<span class="pf-c-form__label-required" aria-hidden="true">&#42;</span>
{% endif %}
</label>
</div>
<div class="pf-c-form__group-control">
<div class="c-form__horizontal-group">
{{ field|css_class:"pf-c-form-control" }}
{% if field.help_text %}
<p class="pf-c-form__helper-text">{{ field.help_text|safe }}</p>
{% endif %}
{% if field.value %}
<a target="_blank" href="{{ field.value.url }}" class="pf-c-form__helper-text">
{% blocktrans with current=field.value %}
Currently set to {{current}}.
{% endblocktrans %}
</a>
{% endif %}
</div>
</div>
{% else %}
<div class="pf-c-form__group-label">
<label class="pf-c-form__label" for="{{ field.name }}-{{ forloop.counter0 }}">
<span class="pf-c-form__label-text">{{ field.label }}</span>
{% if field.field.required %}
<span class="pf-c-form__label-required" aria-hidden="true">&#42;</span>
{% endif %}
</label>
</div>
<div class="pf-c-form__group-control">
<div class="c-form__horizontal-group">
{{ field|css_class:'pf-c-form-control' }}
{% if field.help_text %}
<p class="pf-c-form__helper-text">{{ field.help_text|safe }}</p>
{% endif %}
</div>
</div>
{% endif %}
{% for error in field.errors %}
<p class="pf-c-form__helper-text pf-m-error">
{{ error }}
</p>
{% endfor %}
</div>
{% endif %}
{% endfor %}

View file

@ -2,8 +2,10 @@
from json import dumps from json import dumps
from django.urls import reverse from django.urls import reverse
from rest_framework.serializers import ValidationError
from rest_framework.test import APITestCase from rest_framework.test import APITestCase
from authentik.core.api.propertymappings import PropertyMappingSerializer
from authentik.core.models import PropertyMapping, User from authentik.core.models import PropertyMapping, User
@ -19,7 +21,7 @@ class TestPropertyMappingAPI(APITestCase):
self.client.force_login(self.user) self.client.force_login(self.user)
def test_test_call(self): def test_test_call(self):
"""Test Policy's test endpoint""" """Test PropertMappings's test endpoint"""
response = self.client.post( response = self.client.post(
reverse( reverse(
"authentik_api:propertymapping-test", kwargs={"pk": self.mapping.pk} "authentik_api:propertymapping-test", kwargs={"pk": self.mapping.pk}
@ -32,3 +34,19 @@ class TestPropertyMappingAPI(APITestCase):
response.content.decode(), response.content.decode(),
{"result": dumps({"foo": "bar"}), "successful": True}, {"result": dumps({"foo": "bar"}), "successful": True},
) )
def test_validate(self):
"""Test PropertyMappings's validation"""
# Because the root property-mapping has no write operation, we just instantiate
# a serializer and test inline
expr = "return True"
self.assertEqual(PropertyMappingSerializer().validate_expression(expr), expr)
with self.assertRaises(ValidationError):
print(PropertyMappingSerializer().validate_expression("/"))
def test_types(self):
"""Test PropertyMappigns's types endpoint"""
response = self.client.get(
reverse("authentik_api:propertymapping-types"),
)
self.assertEqual(response.status_code, 200)

View file

@ -0,0 +1,24 @@
"""Test providers API"""
from django.urls import reverse
from rest_framework.test import APITestCase
from authentik.core.models import PropertyMapping, User
class TestProvidersAPI(APITestCase):
"""Test providers API"""
def setUp(self) -> None:
super().setUp()
self.mapping = PropertyMapping.objects.create(
name="dummy", expression="""return {'foo': 'bar'}"""
)
self.user = User.objects.get(username="akadmin")
self.client.force_login(self.user)
def test_types(self):
"""Test Providers's types endpoint"""
response = self.client.get(
reverse("authentik_api:provider-types"),
)
self.assertEqual(response.status_code, 200)

View file

@ -1,4 +1,5 @@
"""Crypto API Views""" """Crypto API Views"""
import django_filters
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.x509 import load_pem_x509_certificate from cryptography.x509 import load_pem_x509_certificate
@ -95,11 +96,29 @@ class CertificateGenerationSerializer(PassiveSerializer):
validity_days = IntegerField(initial=365) validity_days = IntegerField(initial=365)
class CertificateKeyPairFilter(django_filters.FilterSet):
"""Filter for certificates"""
has_key = django_filters.BooleanFilter(
label="Only return certificate-key pairs with keys", method="filter_has_key"
)
# pylint: disable=unused-argument
def filter_has_key(self, queryset, name, value): # pragma: no cover
"""Only return certificate-key pairs with keys"""
return queryset.exclude(key_data__exact="")
class Meta:
model = CertificateKeyPair
fields = ["name"]
class CertificateKeyPairViewSet(ModelViewSet): class CertificateKeyPairViewSet(ModelViewSet):
"""CertificateKeyPair Viewset""" """CertificateKeyPair Viewset"""
queryset = CertificateKeyPair.objects.all() queryset = CertificateKeyPair.objects.all()
serializer_class = CertificateKeyPairSerializer serializer_class = CertificateKeyPairSerializer
filterset_class = CertificateKeyPairFilter
@permission_required(None, ["authentik_crypto.add_certificatekeypair"]) @permission_required(None, ["authentik_crypto.add_certificatekeypair"])
@swagger_auto_schema( @swagger_auto_schema(
@ -125,7 +144,7 @@ class CertificateKeyPairViewSet(ModelViewSet):
return Response(serializer.data) return Response(serializer.data)
@swagger_auto_schema(responses={200: CertificateDataSerializer(many=False)}) @swagger_auto_schema(responses={200: CertificateDataSerializer(many=False)})
@action(detail=True) @action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=invalid-name, unused-argument # pylint: disable=invalid-name, unused-argument
def view_certificate(self, request: Request, pk: str) -> Response: def view_certificate(self, request: Request, pk: str) -> Response:
"""Return certificate-key pairs certificate and log access""" """Return certificate-key pairs certificate and log access"""
@ -140,7 +159,7 @@ class CertificateKeyPairViewSet(ModelViewSet):
) )
@swagger_auto_schema(responses={200: CertificateDataSerializer(many=False)}) @swagger_auto_schema(responses={200: CertificateDataSerializer(many=False)})
@action(detail=True) @action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=invalid-name, unused-argument # pylint: disable=invalid-name, unused-argument
def view_private_key(self, request: Request, pk: str) -> Response: def view_private_key(self, request: Request, pk: str) -> Response:
"""Return certificate-key pairs private key and log access""" """Return certificate-key pairs private key and log access"""

View file

@ -11,6 +11,7 @@ from rest_framework.response import Response
from rest_framework.serializers import ModelSerializer, Serializer from rest_framework.serializers import ModelSerializer, Serializer
from rest_framework.viewsets import ReadOnlyModelViewSet from rest_framework.viewsets import ReadOnlyModelViewSet
from authentik.core.api.utils import TypeCreateSerializer
from authentik.events.models import Event, EventAction from authentik.events.models import Event, EventAction
@ -144,3 +145,18 @@ class EventViewSet(ReadOnlyModelViewSet):
.values("unique_users", "application", "counted_events") .values("unique_users", "application", "counted_events")
.order_by("-counted_events")[:top_n] .order_by("-counted_events")[:top_n]
) )
@swagger_auto_schema(responses={200: TypeCreateSerializer(many=True)})
@action(detail=False, pagination_class=None, filter_backends=[])
def actions(self, request: Request) -> Response:
"""Get all actions"""
data = []
for value, name in EventAction.choices:
data.append(
{
"name": name,
"description": "",
"component": value,
}
)
return Response(TypeCreateSerializer(data, many=True).data)

View file

@ -63,7 +63,7 @@ class NotificationTransportViewSet(ModelViewSet):
responses={200: NotificationTransportTestSerializer(many=False)}, responses={200: NotificationTransportTestSerializer(many=False)},
request_body=no_body, request_body=no_body,
) )
@action(detail=True, methods=["post"]) @action(detail=True, pagination_class=None, filter_backends=[], methods=["post"])
# pylint: disable=invalid-name, unused-argument # pylint: disable=invalid-name, unused-argument
def test(self, request: Request, pk=None) -> Response: def test(self, request: Request, pk=None) -> Response:
"""Send example notification using selected transport. Requires """Send example notification using selected transport. Requires

View file

@ -10,11 +10,12 @@ from authentik.events.models import Event, EventAction
class TestEventsAPI(APITestCase): class TestEventsAPI(APITestCase):
"""Test Event API""" """Test Event API"""
def test_top_n(self): def setUp(self) -> None:
"""Test top_per_user"""
user = User.objects.get(username="akadmin") user = User.objects.get(username="akadmin")
self.client.force_login(user) self.client.force_login(user)
def test_top_n(self):
"""Test top_per_user"""
event = Event.new(EventAction.AUTHORIZE_APPLICATION) event = Event.new(EventAction.AUTHORIZE_APPLICATION)
event.save() # We save to ensure nothing is un-saveable event.save() # We save to ensure nothing is un-saveable
response = self.client.get( response = self.client.get(
@ -22,3 +23,10 @@ class TestEventsAPI(APITestCase):
data={"filter_action": EventAction.AUTHORIZE_APPLICATION}, data={"filter_action": EventAction.AUTHORIZE_APPLICATION},
) )
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
def test_actions(self):
"""Test actions"""
response = self.client.get(
reverse("authentik_api:event-actions"),
)
self.assertEqual(response.status_code, 200)

View file

@ -98,7 +98,7 @@ class FlowViewSet(ModelViewSet):
@permission_required(None, ["authentik_flows.view_flow_cache"]) @permission_required(None, ["authentik_flows.view_flow_cache"])
@swagger_auto_schema(responses={200: CacheSerializer(many=False)}) @swagger_auto_schema(responses={200: CacheSerializer(many=False)})
@action(detail=False) @action(detail=False, pagination_class=None, filter_backends=[])
def cache_info(self, request: Request) -> Response: def cache_info(self, request: Request) -> Response:
"""Info about cached flows""" """Info about cached flows"""
return Response(data={"count": len(cache.keys("flow_*"))}) return Response(data={"count": len(cache.keys("flow_*"))})
@ -178,7 +178,7 @@ class FlowViewSet(ModelViewSet):
), ),
}, },
) )
@action(detail=True) @action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=unused-argument # pylint: disable=unused-argument
def export(self, request: Request, slug: str) -> Response: def export(self, request: Request, slug: str) -> Response:
"""Export flow to .akflow file""" """Export flow to .akflow file"""
@ -189,7 +189,7 @@ class FlowViewSet(ModelViewSet):
return response return response
@swagger_auto_schema(responses={200: FlowDiagramSerializer()}) @swagger_auto_schema(responses={200: FlowDiagramSerializer()})
@action(detail=True, methods=["get"]) @action(detail=True, pagination_class=None, filter_backends=[], methods=["get"])
# pylint: disable=unused-argument # pylint: disable=unused-argument
def diagram(self, request: Request, slug: str) -> Response: def diagram(self, request: Request, slug: str) -> Response:
"""Return diagram for flow with slug `slug`, in the format used by flowchart.js""" """Return diagram for flow with slug `slug`, in the format used by flowchart.js"""
@ -270,7 +270,13 @@ class FlowViewSet(ModelViewSet):
], ],
responses={200: "Success"}, responses={200: "Success"},
) )
@action(detail=True, methods=["POST"], parser_classes=(MultiPartParser,)) @action(
detail=True,
pagination_class=None,
filter_backends=[],
methods=["POST"],
parser_classes=(MultiPartParser,),
)
# pylint: disable=unused-argument # pylint: disable=unused-argument
def set_background(self, request: Request, slug: str): def set_background(self, request: Request, slug: str):
"""Set Flow background""" """Set Flow background"""
@ -285,7 +291,7 @@ class FlowViewSet(ModelViewSet):
@swagger_auto_schema( @swagger_auto_schema(
responses={200: LinkSerializer(many=False)}, responses={200: LinkSerializer(many=False)},
) )
@action(detail=True) @action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=unused-argument # pylint: disable=unused-argument
def execute(self, request: Request, slug: str): def execute(self, request: Request, slug: str):
"""Execute flow for current user""" """Execute flow for current user"""

View file

@ -1,7 +1,6 @@
"""Flow Stage API Views""" """Flow Stage API Views"""
from typing import Iterable from typing import Iterable
from django.urls import reverse
from drf_yasg.utils import swagger_auto_schema from drf_yasg.utils import swagger_auto_schema
from rest_framework import mixins from rest_framework import mixins
from rest_framework.decorators import action from rest_framework.decorators import action
@ -15,7 +14,6 @@ from authentik.core.api.utils import MetaNameSerializer, TypeCreateSerializer
from authentik.core.types import UserSettingSerializer from authentik.core.types import UserSettingSerializer
from authentik.flows.api.flows import FlowSerializer from authentik.flows.api.flows import FlowSerializer
from authentik.flows.models import Stage from authentik.flows.models import Stage
from authentik.lib.templatetags.authentik_utils import verbose_name
from authentik.lib.utils.reflection import all_subclasses from authentik.lib.utils.reflection import all_subclasses
LOGGER = get_logger() LOGGER = get_logger()
@ -24,12 +22,15 @@ LOGGER = get_logger()
class StageSerializer(ModelSerializer, MetaNameSerializer): class StageSerializer(ModelSerializer, MetaNameSerializer):
"""Stage Serializer""" """Stage Serializer"""
object_type = SerializerMethodField() component = SerializerMethodField()
flow_set = FlowSerializer(many=True, required=False) flow_set = FlowSerializer(many=True, required=False)
def get_object_type(self, obj: Stage) -> str: def get_component(self, obj: Stage) -> str:
"""Get object type so that we know which API Endpoint to use to get the full object""" """Get object type so that we know how to edit the object"""
return obj._meta.object_name.lower().replace("stage", "") # pyright: reportGeneralTypeIssues=false
if obj.__class__ == Stage:
return ""
return obj.component
class Meta: class Meta:
@ -37,7 +38,7 @@ class StageSerializer(ModelSerializer, MetaNameSerializer):
fields = [ fields = [
"pk", "pk",
"name", "name",
"object_type", "component",
"verbose_name", "verbose_name",
"verbose_name_plural", "verbose_name_plural",
"flow_set", "flow_set",
@ -61,24 +62,24 @@ class StageViewSet(
return Stage.objects.select_subclasses() return Stage.objects.select_subclasses()
@swagger_auto_schema(responses={200: TypeCreateSerializer(many=True)}) @swagger_auto_schema(responses={200: TypeCreateSerializer(many=True)})
@action(detail=False) @action(detail=False, pagination_class=None, filter_backends=[])
def types(self, request: Request) -> Response: def types(self, request: Request) -> Response:
"""Get all creatable stage types""" """Get all creatable stage types"""
data = [] data = []
for subclass in all_subclasses(self.queryset.model, False): for subclass in all_subclasses(self.queryset.model, False):
subclass: Stage
data.append( data.append(
{ {
"name": verbose_name(subclass), "name": subclass._meta.verbose_name,
"description": subclass.__doc__, "description": subclass.__doc__,
"link": reverse("authentik_admin:stage-create") "component": subclass().component,
+ f"?type={subclass.__name__}",
} }
) )
data = sorted(data, key=lambda x: x["name"]) data = sorted(data, key=lambda x: x["name"])
return Response(TypeCreateSerializer(data, many=True).data) return Response(TypeCreateSerializer(data, many=True).data)
@swagger_auto_schema(responses={200: UserSettingSerializer(many=True)}) @swagger_auto_schema(responses={200: UserSettingSerializer(many=True)})
@action(detail=False) @action(detail=False, pagination_class=None, filter_backends=[])
def user_settings(self, request: Request) -> Response: def user_settings(self, request: Request) -> Response:
"""Get all stages the user can configure""" """Get all stages the user can configure"""
_all_stages: Iterable[Stage] = Stage.objects.all().select_subclasses() _all_stages: Iterable[Stage] = Stage.objects.all().select_subclasses()

View file

@ -3,7 +3,6 @@ from typing import TYPE_CHECKING, Optional, Type
from uuid import uuid4 from uuid import uuid4
from django.db import models from django.db import models
from django.forms import ModelForm
from django.http import HttpRequest from django.http import HttpRequest
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from model_utils.managers import InheritanceManager from model_utils.managers import InheritanceManager
@ -60,8 +59,8 @@ class Stage(SerializerModel):
raise NotImplementedError raise NotImplementedError
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
"""Return Form class used to edit this object""" """Return component used to edit this object"""
raise NotImplementedError raise NotImplementedError
@property @property

View file

@ -37,7 +37,7 @@ class TestFlowsAPI(APITestCase):
def test_api_serializer(self): def test_api_serializer(self):
"""Test that stage serializer returns the correct type""" """Test that stage serializer returns the correct type"""
obj = DummyStage() obj = DummyStage()
self.assertEqual(StageSerializer().get_object_type(obj), "dummy") self.assertEqual(StageSerializer().get_component(obj), "ak-stage-dummy-form")
self.assertEqual(StageSerializer().get_verbose_name(obj), "Dummy Stage") self.assertEqual(StageSerializer().get_verbose_name(obj), "Dummy Stage")
def test_api_viewset(self): def test_api_viewset(self):
@ -90,3 +90,13 @@ class TestFlowsAPI(APITestCase):
) )
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertJSONEqual(response.content, {"diagram": DIAGRAM_SHORT_EXPECTED}) self.assertJSONEqual(response.content, {"diagram": DIAGRAM_SHORT_EXPECTED})
def test_types(self):
"""Test Stage's types endpoint"""
user = User.objects.get(username="akadmin")
self.client.force_login(user)
response = self.client.get(
reverse("authentik_api:stage-types"),
)
self.assertEqual(response.status_code, 200)

View file

@ -1,31 +0,0 @@
"""flow model tests"""
from typing import Callable, Type
from django.forms import ModelForm
from django.test import TestCase
from authentik.flows.models import Stage
from authentik.flows.stage import StageView
class TestStageProperties(TestCase):
"""Generic model properties tests"""
def stage_tester_factory(model: Type[Stage]) -> Callable:
"""Test a form"""
def tester(self: TestStageProperties):
model_inst = model()
self.assertTrue(issubclass(model_inst.form, ModelForm))
self.assertTrue(issubclass(model_inst.type, StageView))
return tester
for stage_type in Stage.__subclasses__():
setattr(
TestStageProperties,
f"test_stage_{stage_type.__name__}",
stage_tester_factory(stage_type),
)

View file

@ -22,7 +22,7 @@ def get_attrs(obj: SerializerModel) -> dict[str, Any]:
"user", "user",
"verbose_name", "verbose_name",
"verbose_name_plural", "verbose_name_plural",
"object_type", "component",
"flow_set", "flow_set",
"promptstage_set", "promptstage_set",
) )

View file

@ -3,8 +3,8 @@ import re
from textwrap import indent from textwrap import indent
from typing import Any, Iterable, Optional from typing import Any, Iterable, Optional
from django.core.exceptions import ValidationError
from requests import Session from requests import Session
from rest_framework.serializers import ValidationError
from sentry_sdk.hub import Hub from sentry_sdk.hub import Hub
from sentry_sdk.tracing import Span from sentry_sdk.tracing import Span
from structlog.stdlib import get_logger from structlog.stdlib import get_logger

View file

@ -1,40 +0,0 @@
"""authentik lib Templatetags"""
from django import template
from django.db.models import Model
from structlog.stdlib import get_logger
register = template.Library()
LOGGER = get_logger()
@register.filter("fieldtype")
def fieldtype(field):
"""Return classname"""
if isinstance(field.__class__, Model) or issubclass(field.__class__, Model):
return verbose_name(field)
return field.__class__.__name__
@register.filter(name="css_class")
def css_class(field, css):
"""Add css class to form field"""
return field.as_widget(attrs={"class": css})
@register.filter
def verbose_name(obj) -> str:
"""Return Object's Verbose Name"""
if not obj:
return ""
if hasattr(obj, "verbose_name"):
return obj.verbose_name
return obj._meta.verbose_name
@register.filter
def form_verbose_name(obj) -> str:
"""Return ModelForm's Object's Verbose Name"""
if not obj:
return ""
return verbose_name(obj._meta.model)

View file

@ -1,26 +0,0 @@
"""Utility Widgets"""
from itertools import groupby
from django.forms.models import ModelChoiceField, ModelChoiceIterator
class GroupedModelChoiceIterator(ModelChoiceIterator):
"""ModelChoiceField which groups objects by their verbose_name"""
def __iter__(self):
if self.field.empty_label is not None:
yield ("", self.field.empty_label)
queryset = self.queryset
# Can't use iterator() when queryset uses prefetch_related()
if not queryset._prefetch_related_lookups:
queryset = queryset.iterator()
# We can't use DB-level sorting as we sort by subclass
queryset = sorted(queryset, key=lambda x: x._meta.verbose_name)
for group, objs in groupby(queryset, key=lambda x: x._meta.verbose_name):
yield (group, [self.choice(obj) for obj in objs])
class GroupedModelChoiceField(ModelChoiceField):
"""ModelChoiceField which groups objects by their verbose_name"""
iterator = GroupedModelChoiceIterator

8
authentik/managed/api.py Normal file
View file

@ -0,0 +1,8 @@
"""Serializer mixin for managed models"""
from rest_framework.fields import CharField
class ManagedSerializer:
"""Managed Serializer"""
managed = CharField(read_only=True, allow_null=True)

View file

@ -1,9 +1,12 @@
"""Outpost API Views""" """Outpost API Views"""
from dataclasses import asdict from dataclasses import asdict
from django.urls import reverse from django.utils.translation import gettext_lazy as _
from drf_yasg.utils import swagger_auto_schema from drf_yasg.utils import swagger_auto_schema
from rest_framework import mixins from kubernetes.client.configuration import Configuration
from kubernetes.config.config_exception import ConfigException
from kubernetes.config.kube_config import load_kube_config_from_dict
from rest_framework import mixins, serializers
from rest_framework.decorators import action from rest_framework.decorators import action
from rest_framework.fields import BooleanField, CharField, SerializerMethodField from rest_framework.fields import BooleanField, CharField, SerializerMethodField
from rest_framework.request import Request from rest_framework.request import Request
@ -16,7 +19,6 @@ from authentik.core.api.utils import (
PassiveSerializer, PassiveSerializer,
TypeCreateSerializer, TypeCreateSerializer,
) )
from authentik.lib.templatetags.authentik_utils import verbose_name
from authentik.lib.utils.reflection import all_subclasses from authentik.lib.utils.reflection import all_subclasses
from authentik.outposts.models import ( from authentik.outposts.models import (
DockerServiceConnection, DockerServiceConnection,
@ -28,11 +30,11 @@ from authentik.outposts.models import (
class ServiceConnectionSerializer(ModelSerializer, MetaNameSerializer): class ServiceConnectionSerializer(ModelSerializer, MetaNameSerializer):
"""ServiceConnection Serializer""" """ServiceConnection Serializer"""
object_type = SerializerMethodField() component = SerializerMethodField()
def get_object_type(self, obj: OutpostServiceConnection) -> str: def get_component(self, obj: OutpostServiceConnection) -> str:
"""Get object type so that we know which API Endpoint to use to get the full object""" """Get object component so that we know how to edit the object"""
return obj._meta.object_name.lower().replace("serviceconnection", "") return obj.component
class Meta: class Meta:
@ -41,7 +43,7 @@ class ServiceConnectionSerializer(ModelSerializer, MetaNameSerializer):
"pk", "pk",
"name", "name",
"local", "local",
"object_type", "component",
"verbose_name", "verbose_name",
"verbose_name_plural", "verbose_name_plural",
] ]
@ -68,23 +70,24 @@ class ServiceConnectionViewSet(
filterset_fields = ["name"] filterset_fields = ["name"]
@swagger_auto_schema(responses={200: TypeCreateSerializer(many=True)}) @swagger_auto_schema(responses={200: TypeCreateSerializer(many=True)})
@action(detail=False) @action(detail=False, pagination_class=None, filter_backends=[])
def types(self, request: Request) -> Response: def types(self, request: Request) -> Response:
"""Get all creatable service connection types""" """Get all creatable service connection types"""
data = [] data = []
for subclass in all_subclasses(self.queryset.model): for subclass in all_subclasses(self.queryset.model):
subclass: OutpostServiceConnection
# pyright: reportGeneralTypeIssues=false
data.append( data.append(
{ {
"name": verbose_name(subclass), "name": subclass._meta.verbose_name,
"description": subclass.__doc__, "description": subclass.__doc__,
"link": reverse("authentik_admin:outpost-service-connection-create") "component": subclass().component,
+ f"?type={subclass.__name__}",
} }
) )
return Response(TypeCreateSerializer(data, many=True).data) return Response(TypeCreateSerializer(data, many=True).data)
@swagger_auto_schema(responses={200: ServiceConnectionStateSerializer(many=False)}) @swagger_auto_schema(responses={200: ServiceConnectionStateSerializer(many=False)})
@action(detail=True) @action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=unused-argument, invalid-name # pylint: disable=unused-argument, invalid-name
def state(self, request: Request, pk: str) -> Response: def state(self, request: Request, pk: str) -> Response:
"""Get the service connection's state""" """Get the service connection's state"""
@ -115,6 +118,24 @@ class DockerServiceConnectionViewSet(ModelViewSet):
class KubernetesServiceConnectionSerializer(ServiceConnectionSerializer): class KubernetesServiceConnectionSerializer(ServiceConnectionSerializer):
"""KubernetesServiceConnection Serializer""" """KubernetesServiceConnection Serializer"""
def validate_kubeconfig(self, kubeconfig):
"""Validate kubeconfig by attempting to load it"""
if kubeconfig == {}:
if not self.validated_data["local"]:
raise serializers.ValidationError(
_(
"You can only use an empty kubeconfig when connecting to a local cluster."
)
)
# Empty kubeconfig is valid
return kubeconfig
config = Configuration()
try:
load_kube_config_from_dict(kubeconfig, client_configuration=config)
except ConfigException:
raise serializers.ValidationError(_("Invalid kubeconfig"))
return kubeconfig
class Meta: class Meta:
model = KubernetesServiceConnection model = KubernetesServiceConnection

View file

@ -1,75 +0,0 @@
"""Outpost forms"""
from django import forms
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _
from kubernetes.client.configuration import Configuration
from kubernetes.config.config_exception import ConfigException
from kubernetes.config.kube_config import load_kube_config_from_dict
from authentik.admin.fields import CodeMirrorWidget, YAMLField
from authentik.crypto.models import CertificateKeyPair
from authentik.outposts.models import (
DockerServiceConnection,
KubernetesServiceConnection,
)
class DockerServiceConnectionForm(forms.ModelForm):
"""Docker service-connection form"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields["tls_authentication"].queryset = CertificateKeyPair.objects.filter(
key_data__isnull=False
)
class Meta:
model = DockerServiceConnection
fields = ["name", "local", "url", "tls_verification", "tls_authentication"]
widgets = {
"name": forms.TextInput,
"url": forms.TextInput,
}
labels = {
"url": _("URL"),
"tls_verification": _("TLS Verification Certificate"),
"tls_authentication": _("TLS Authentication Certificate"),
}
class KubernetesServiceConnectionForm(forms.ModelForm):
"""Kubernetes service-connection form"""
def clean_kubeconfig(self):
"""Validate kubeconfig by attempting to load it"""
kubeconfig = self.cleaned_data["kubeconfig"]
if kubeconfig == {}:
if not self.cleaned_data["local"]:
raise ValidationError(
_("You can only use an empty kubeconfig when local is enabled.")
)
# Empty kubeconfig is valid
return kubeconfig
config = Configuration()
try:
load_kube_config_from_dict(kubeconfig, client_configuration=config)
except ConfigException:
raise ValidationError(_("Invalid kubeconfig"))
return kubeconfig
class Meta:
model = KubernetesServiceConnection
fields = [
"name",
"local",
"kubeconfig",
]
widgets = {
"name": forms.TextInput,
"kubeconfig": CodeMirrorWidget,
}
field_classes = {
"kubeconfig": YAMLField,
}

View file

@ -1,14 +1,13 @@
"""Outpost models""" """Outpost models"""
from dataclasses import asdict, dataclass, field from dataclasses import asdict, dataclass, field
from datetime import datetime from datetime import datetime
from typing import Iterable, Optional, Type, Union from typing import Iterable, Optional, Union
from uuid import uuid4 from uuid import uuid4
from dacite import from_dict from dacite import from_dict
from django.core.cache import cache from django.core.cache import cache
from django.db import models, transaction from django.db import models, transaction
from django.db.models.base import Model from django.db.models.base import Model
from django.forms.models import ModelForm
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from docker.client import DockerClient from docker.client import DockerClient
from docker.errors import DockerException from docker.errors import DockerException
@ -132,8 +131,8 @@ class OutpostServiceConnection(models.Model):
raise NotImplementedError raise NotImplementedError
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
"""Return Form class used to edit this object""" """Return component used to edit this object"""
raise NotImplementedError raise NotImplementedError
class Meta: class Meta:
@ -180,10 +179,8 @@ class DockerServiceConnection(OutpostServiceConnection):
) )
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
from authentik.outposts.forms import DockerServiceConnectionForm return "ak-service-connection-docker-form"
return DockerServiceConnectionForm
def __str__(self) -> str: def __str__(self) -> str:
return f"Docker Service-Connection {self.name}" return f"Docker Service-Connection {self.name}"
@ -237,10 +234,8 @@ class KubernetesServiceConnection(OutpostServiceConnection):
) )
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
from authentik.outposts.forms import KubernetesServiceConnectionForm return "ak-service-connection-kubernetes-form"
return KubernetesServiceConnectionForm
def __str__(self) -> str: def __str__(self) -> str:
return f"Kubernetes Service-Connection {self.name}" return f"Kubernetes Service-Connection {self.name}"

View file

@ -0,0 +1,24 @@
"""Test outpost service connection API"""
from django.urls import reverse
from rest_framework.test import APITestCase
from authentik.core.models import PropertyMapping, User
class TestOutpostServiceConnectionsAPI(APITestCase):
"""Test outpost service connection API"""
def setUp(self) -> None:
super().setUp()
self.mapping = PropertyMapping.objects.create(
name="dummy", expression="""return {'foo': 'bar'}"""
)
self.user = User.objects.get(username="akadmin")
self.client.force_login(self.user)
def test_types(self):
"""Test OutpostServiceConnections's types endpoint"""
response = self.client.get(
reverse("authentik_api:outpostserviceconnection-types"),
)
self.assertEqual(response.status_code, 200)

View file

@ -1,6 +1,5 @@
"""policy API Views""" """policy API Views"""
from django.core.cache import cache from django.core.cache import cache
from django.urls import reverse
from drf_yasg.utils import no_body, swagger_auto_schema from drf_yasg.utils import no_body, swagger_auto_schema
from guardian.shortcuts import get_objects_for_user from guardian.shortcuts import get_objects_for_user
from rest_framework import mixins from rest_framework import mixins
@ -19,7 +18,6 @@ from authentik.core.api.utils import (
MetaNameSerializer, MetaNameSerializer,
TypeCreateSerializer, TypeCreateSerializer,
) )
from authentik.lib.templatetags.authentik_utils import verbose_name
from authentik.lib.utils.reflection import all_subclasses from authentik.lib.utils.reflection import all_subclasses
from authentik.policies.api.exec import PolicyTestResultSerializer, PolicyTestSerializer from authentik.policies.api.exec import PolicyTestResultSerializer, PolicyTestSerializer
from authentik.policies.models import Policy, PolicyBinding from authentik.policies.models import Policy, PolicyBinding
@ -34,16 +32,16 @@ class PolicySerializer(ModelSerializer, MetaNameSerializer):
_resolve_inheritance: bool _resolve_inheritance: bool
object_type = SerializerMethodField() component = SerializerMethodField()
bound_to = SerializerMethodField() bound_to = SerializerMethodField()
def __init__(self, *args, resolve_inheritance: bool = True, **kwargs): def __init__(self, *args, resolve_inheritance: bool = True, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._resolve_inheritance = resolve_inheritance self._resolve_inheritance = resolve_inheritance
def get_object_type(self, obj: Policy) -> str: def get_component(self, obj: Policy) -> str:
"""Get object type so that we know which API Endpoint to use to get the full object""" """Get object component so that we know how to edit the object"""
return obj._meta.object_name.lower().replace("policy", "") return obj.component
def get_bound_to(self, obj: Policy) -> int: def get_bound_to(self, obj: Policy) -> int:
"""Return objects policy is bound to""" """Return objects policy is bound to"""
@ -66,7 +64,7 @@ class PolicySerializer(ModelSerializer, MetaNameSerializer):
"pk", "pk",
"name", "name",
"execution_logging", "execution_logging",
"object_type", "component",
"verbose_name", "verbose_name",
"verbose_name_plural", "verbose_name_plural",
"bound_to", "bound_to",
@ -96,24 +94,24 @@ class PolicyViewSet(
) )
@swagger_auto_schema(responses={200: TypeCreateSerializer(many=True)}) @swagger_auto_schema(responses={200: TypeCreateSerializer(many=True)})
@action(detail=False) @action(detail=False, pagination_class=None, filter_backends=[])
def types(self, request: Request) -> Response: def types(self, request: Request) -> Response:
"""Get all creatable policy types""" """Get all creatable policy types"""
data = [] data = []
for subclass in all_subclasses(self.queryset.model): for subclass in all_subclasses(self.queryset.model):
subclass: Policy
data.append( data.append(
{ {
"name": verbose_name(subclass), "name": subclass._meta.verbose_name,
"description": subclass.__doc__, "description": subclass.__doc__,
"link": reverse("authentik_admin:policy-create") "component": subclass().component,
+ f"?type={subclass.__name__}",
} }
) )
return Response(TypeCreateSerializer(data, many=True).data) return Response(TypeCreateSerializer(data, many=True).data)
@permission_required("authentik_policies.view_policy_cache") @permission_required("authentik_policies.view_policy_cache")
@swagger_auto_schema(responses={200: CacheSerializer(many=False)}) @swagger_auto_schema(responses={200: CacheSerializer(many=False)})
@action(detail=False) @action(detail=False, pagination_class=None, filter_backends=[])
def cache_info(self, request: Request) -> Response: def cache_info(self, request: Request) -> Response:
"""Info about cached policies""" """Info about cached policies"""
return Response(data={"count": len(cache.keys("policy_*"))}) return Response(data={"count": len(cache.keys("policy_*"))})
@ -139,7 +137,7 @@ class PolicyViewSet(
request_body=PolicyTestSerializer(), request_body=PolicyTestSerializer(),
responses={200: PolicyTestResultSerializer()}, responses={200: PolicyTestResultSerializer()},
) )
@action(detail=True, methods=["POST"]) @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
# pylint: disable=unused-argument, invalid-name # pylint: disable=unused-argument, invalid-name
def test(self, request: Request, pk: str) -> Response: def test(self, request: Request, pk: str) -> Response:
"""Test policy""" """Test policy"""

View file

@ -1,20 +0,0 @@
"""authentik Policy forms"""
from django import forms
from django.utils.translation import gettext as _
from authentik.policies.dummy.models import DummyPolicy
from authentik.policies.forms import PolicyForm
class DummyPolicyForm(PolicyForm):
"""DummyPolicyForm Form"""
class Meta:
model = DummyPolicy
fields = PolicyForm.Meta.fields + ["result", "wait_min", "wait_max"]
widgets = {
"name": forms.TextInput(),
}
labels = {"result": _("Allow user")}

View file

@ -1,10 +1,8 @@
"""Dummy policy""" """Dummy policy"""
from random import SystemRandom from random import SystemRandom
from time import sleep from time import sleep
from typing import Type
from django.db import models from django.db import models
from django.forms import ModelForm
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from rest_framework.serializers import BaseSerializer from rest_framework.serializers import BaseSerializer
from structlog.stdlib import get_logger from structlog.stdlib import get_logger
@ -32,10 +30,8 @@ class DummyPolicy(Policy):
return DummyPolicySerializer return DummyPolicySerializer
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str: # pragma: no cover
from authentik.policies.dummy.forms import DummyPolicyForm return "ak-policy-dummy-form"
return DummyPolicyForm
def passes(self, request: PolicyRequest) -> PolicyResult: def passes(self, request: PolicyRequest) -> PolicyResult:
"""Wait random time then return result""" """Wait random time then return result"""

View file

@ -2,7 +2,6 @@
from django.test import TestCase from django.test import TestCase
from guardian.shortcuts import get_anonymous_user from guardian.shortcuts import get_anonymous_user
from authentik.policies.dummy.forms import DummyPolicyForm
from authentik.policies.dummy.models import DummyPolicy from authentik.policies.dummy.models import DummyPolicy
from authentik.policies.engine import PolicyRequest from authentik.policies.engine import PolicyRequest
@ -22,18 +21,3 @@ class TestDummyPolicy(TestCase):
result = policy.passes(self.request) result = policy.passes(self.request)
self.assertFalse(result.passing) self.assertFalse(result.passing)
self.assertEqual(result.messages, ("dummy",)) self.assertEqual(result.messages, ("dummy",))
def test_form(self):
"""test form"""
form = DummyPolicyForm(
data={
"name": "dummy",
"negate": False,
"order": 0,
"timeout": 1,
"result": True,
"wait_min": 1,
"wait_max": 2,
}
)
self.assertTrue(form.is_valid())

View file

@ -1,25 +0,0 @@
"""authentik Event Matcher Policy forms"""
from django import forms
from django.utils.translation import gettext_lazy as _
from authentik.policies.event_matcher.models import EventMatcherPolicy
from authentik.policies.forms import PolicyForm
class EventMatcherPolicyForm(PolicyForm):
"""EventMatcherPolicy Form"""
class Meta:
model = EventMatcherPolicy
fields = PolicyForm.Meta.fields + [
"action",
"client_ip",
"app",
]
widgets = {
"name": forms.TextInput(),
"client_ip": forms.TextInput(),
}
labels = {"client_ip": _("Client IP")}

View file

@ -1,9 +1,6 @@
"""Event Matcher models""" """Event Matcher models"""
from typing import Type
from django.apps import apps from django.apps import apps
from django.db import models from django.db import models
from django.forms import ModelForm
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
from rest_framework.serializers import BaseSerializer from rest_framework.serializers import BaseSerializer
@ -63,10 +60,8 @@ class EventMatcherPolicy(Policy):
return EventMatcherPolicySerializer return EventMatcherPolicySerializer
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
from authentik.policies.event_matcher.forms import EventMatcherPolicyForm return "ak-policy-event-matcher-form"
return EventMatcherPolicyForm
def passes(self, request: PolicyRequest) -> PolicyResult: def passes(self, request: PolicyRequest) -> PolicyResult:
if "event" not in request.context: if "event" not in request.context:

View file

@ -1,22 +0,0 @@
"""authentik PasswordExpiry Policy forms"""
from django import forms
from django.utils.translation import gettext as _
from authentik.policies.expiry.models import PasswordExpiryPolicy
from authentik.policies.forms import PolicyForm
class PasswordExpiryPolicyForm(PolicyForm):
"""Edit PasswordExpiryPolicy instances"""
class Meta:
model = PasswordExpiryPolicy
fields = PolicyForm.Meta.fields + ["days", "deny_only"]
widgets = {
"name": forms.TextInput(),
"order": forms.NumberInput(),
"days": forms.NumberInput(),
}
labels = {"deny_only": _("Only fail the policy, don't set user's password.")}

View file

@ -1,9 +1,7 @@
"""authentik password_expiry_policy Models""" """authentik password_expiry_policy Models"""
from datetime import timedelta from datetime import timedelta
from typing import Type
from django.db import models from django.db import models
from django.forms import ModelForm
from django.utils.timezone import now from django.utils.timezone import now
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
from rest_framework.serializers import BaseSerializer from rest_framework.serializers import BaseSerializer
@ -29,10 +27,8 @@ class PasswordExpiryPolicy(Policy):
return PasswordExpiryPolicySerializer return PasswordExpiryPolicySerializer
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
from authentik.policies.expiry.forms import PasswordExpiryPolicyForm return "ak-policy-password-expiry-form"
return PasswordExpiryPolicyForm
def passes(self, request: PolicyRequest) -> PolicyResult: def passes(self, request: PolicyRequest) -> PolicyResult:
"""If password change date is more than x days in the past, call set_unusable_password """If password change date is more than x days in the past, call set_unusable_password

View file

@ -2,12 +2,19 @@
from rest_framework.viewsets import ModelViewSet from rest_framework.viewsets import ModelViewSet
from authentik.policies.api.policies import PolicySerializer from authentik.policies.api.policies import PolicySerializer
from authentik.policies.expression.evaluator import PolicyEvaluator
from authentik.policies.expression.models import ExpressionPolicy from authentik.policies.expression.models import ExpressionPolicy
class ExpressionPolicySerializer(PolicySerializer): class ExpressionPolicySerializer(PolicySerializer):
"""Group Membership Policy Serializer""" """Group Membership Policy Serializer"""
def validate_expression(self, expr: str) -> str:
"""validate the syntax of the expression"""
name = "temp-policy" if not self.instance else self.instance.name
PolicyEvaluator(name).validate(expr)
return expr
class Meta: class Meta:
model = ExpressionPolicy model = ExpressionPolicy
fields = PolicySerializer.Meta.fields + ["expression"] fields = PolicySerializer.Meta.fields + ["expression"]

View file

@ -1,31 +0,0 @@
"""authentik Expression Policy forms"""
from django import forms
from authentik.admin.fields import CodeMirrorWidget
from authentik.policies.expression.evaluator import PolicyEvaluator
from authentik.policies.expression.models import ExpressionPolicy
from authentik.policies.forms import PolicyForm
class ExpressionPolicyForm(PolicyForm):
"""ExpressionPolicy Form"""
template_name = "policy/expression/form.html"
def clean_expression(self):
"""Test Syntax"""
expression = self.cleaned_data.get("expression")
PolicyEvaluator(self.instance.name).validate(expression)
return expression
class Meta:
model = ExpressionPolicy
fields = PolicyForm.Meta.fields + [
"expression",
]
widgets = {
"name": forms.TextInput(),
"expression": CodeMirrorWidget(mode="python"),
}

View file

@ -1,8 +1,5 @@
"""authentik expression Policy Models""" """authentik expression Policy Models"""
from typing import Type
from django.db import models from django.db import models
from django.forms import ModelForm
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
from rest_framework.serializers import BaseSerializer from rest_framework.serializers import BaseSerializer
@ -23,10 +20,8 @@ class ExpressionPolicy(Policy):
return ExpressionPolicySerializer return ExpressionPolicySerializer
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
from authentik.policies.expression.forms import ExpressionPolicyForm return "ak-policy-expression-form"
return ExpressionPolicyForm
def passes(self, request: PolicyRequest) -> PolicyResult: def passes(self, request: PolicyRequest) -> PolicyResult:
"""Evaluate and render expression. Returns PolicyResult(false) on error.""" """Evaluate and render expression. Returns PolicyResult(false) on error."""

View file

@ -1,14 +0,0 @@
{% extends "generic/form.html" %}
{% load i18n %}
{% block beneath_form %}
<div class="pf-c-form__group ">
<label for="" class="pf-c-form__label"></label>
<div class="c-form__horizontal-group">
<p>
Expression using Python. See <a target="_blank" href="https://goauthentik.io/docs/policies/expression/">here</a> for a list of all variables.
</p>
</div>
</div>
{% endblock %}

View file

@ -1,9 +1,11 @@
"""evaluator tests""" """evaluator tests"""
from django.core.exceptions import ValidationError
from django.test import TestCase from django.test import TestCase
from guardian.shortcuts import get_anonymous_user from guardian.shortcuts import get_anonymous_user
from rest_framework.serializers import ValidationError
from rest_framework.test import APITestCase
from authentik.policies.exceptions import PolicyException from authentik.policies.exceptions import PolicyException
from authentik.policies.expression.api import ExpressionPolicySerializer
from authentik.policies.expression.evaluator import PolicyEvaluator from authentik.policies.expression.evaluator import PolicyEvaluator
from authentik.policies.expression.models import ExpressionPolicy from authentik.policies.expression.models import ExpressionPolicy
from authentik.policies.types import PolicyRequest from authentik.policies.types import PolicyRequest
@ -60,3 +62,16 @@ class TestEvaluator(TestCase):
evaluator = PolicyEvaluator("test") evaluator = PolicyEvaluator("test")
with self.assertRaises(ValidationError): with self.assertRaises(ValidationError):
evaluator.validate(template) evaluator.validate(template)
class TestExpressionPolicyAPI(APITestCase):
"""Test expression policy's API"""
def test_validate(self):
"""Test ExpressionPolicy's validation"""
# Because the root property-mapping has no write operation, we just instantiate
# a serializer and test inline
expr = "return True"
self.assertEqual(ExpressionPolicySerializer().validate_expression(expr), expr)
with self.assertRaises(ValidationError):
print(ExpressionPolicySerializer().validate_expression("/"))

View file

@ -1,42 +0,0 @@
"""General fields"""
from django import forms
from authentik.core.models import Group
from authentik.lib.widgets import GroupedModelChoiceField
from authentik.policies.models import Policy, PolicyBinding, PolicyBindingModel
class PolicyBindingForm(forms.ModelForm):
"""Form to edit Policy to PolicyBindingModel Binding"""
target = GroupedModelChoiceField(
queryset=PolicyBindingModel.objects.all().select_subclasses(),
to_field_name="pbm_uuid",
)
policy = GroupedModelChoiceField(
queryset=Policy.objects.all().order_by("name").select_subclasses(),
required=False,
)
group = forms.ModelChoiceField(
queryset=Group.objects.all().order_by("name"), required=False
)
def __init__(self, *args, **kwargs): # pragma: no cover
super().__init__(*args, **kwargs)
if "target" in self.initial:
self.fields["target"].widget = forms.HiddenInput()
class Meta:
model = PolicyBinding
fields = ["enabled", "policy", "group", "user", "target", "order", "timeout"]
class PolicyForm(forms.ModelForm):
"""Base Policy form"""
class Meta:
model = Policy
fields = ["name", "execution_logging"]

View file

@ -1,19 +0,0 @@
"""authentik HaveIBeenPwned Policy forms"""
from django import forms
from authentik.policies.forms import PolicyForm
from authentik.policies.hibp.models import HaveIBeenPwendPolicy
class HaveIBeenPwnedPolicyForm(PolicyForm):
"""Edit HaveIBeenPwendPolicy instances"""
class Meta:
model = HaveIBeenPwendPolicy
fields = PolicyForm.Meta.fields + ["password_field", "allowed_count"]
widgets = {
"name": forms.TextInput(),
"password_field": forms.TextInput(),
}

View file

@ -1,9 +1,7 @@
"""authentik HIBP Models""" """authentik HIBP Models"""
from hashlib import sha1 from hashlib import sha1
from typing import Type
from django.db import models from django.db import models
from django.forms import ModelForm
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
from requests import get from requests import get
from rest_framework.serializers import BaseSerializer from rest_framework.serializers import BaseSerializer
@ -35,10 +33,8 @@ class HaveIBeenPwendPolicy(Policy):
return HaveIBeenPwendPolicySerializer return HaveIBeenPwendPolicySerializer
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
from authentik.policies.hibp.forms import HaveIBeenPwnedPolicyForm return "ak-policy-hibp-form"
return HaveIBeenPwnedPolicyForm
def passes(self, request: PolicyRequest) -> PolicyResult: def passes(self, request: PolicyRequest) -> PolicyResult:
"""Check if password is in HIBP DB. Hashes given Password with SHA1, uses the first 5 """Check if password is in HIBP DB. Hashes given Password with SHA1, uses the first 5

View file

@ -1,9 +1,7 @@
"""Policy base models""" """Policy base models"""
from typing import Type
from uuid import uuid4 from uuid import uuid4
from django.db import models from django.db import models
from django.forms import ModelForm
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from model_utils.managers import InheritanceManager from model_utils.managers import InheritanceManager
from rest_framework.serializers import BaseSerializer from rest_framework.serializers import BaseSerializer
@ -147,8 +145,8 @@ class Policy(SerializerModel, CreatedUpdatedModel):
objects = InheritanceAutoManager() objects = InheritanceAutoManager()
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
"""Return Form class used to edit this object""" """Return component used to edit this object"""
raise NotImplementedError raise NotImplementedError
def __str__(self): def __str__(self):

View file

@ -1,36 +0,0 @@
"""authentik Policy forms"""
from django import forms
from django.utils.translation import gettext as _
from authentik.policies.forms import PolicyForm
from authentik.policies.password.models import PasswordPolicy
class PasswordPolicyForm(PolicyForm):
"""PasswordPolicy Form"""
class Meta:
model = PasswordPolicy
fields = PolicyForm.Meta.fields + [
"password_field",
"amount_uppercase",
"amount_lowercase",
"amount_symbols",
"length_min",
"symbol_charset",
"error_message",
]
widgets = {
"name": forms.TextInput(),
"password_field": forms.TextInput(),
"symbol_charset": forms.TextInput(),
"error_message": forms.TextInput(),
}
labels = {
"amount_uppercase": _("Minimum amount of Uppercase Characters"),
"amount_lowercase": _("Minimum amount of Lowercase Characters"),
"amount_symbols": _("Minimum amount of Symbols Characters"),
"length_min": _("Minimum Length"),
}

View file

@ -1,9 +1,7 @@
"""user field matcher models""" """user field matcher models"""
import re import re
from typing import Type
from django.db import models from django.db import models
from django.forms import ModelForm
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
from rest_framework.serializers import BaseSerializer from rest_framework.serializers import BaseSerializer
from structlog.stdlib import get_logger from structlog.stdlib import get_logger
@ -38,10 +36,8 @@ class PasswordPolicy(Policy):
return PasswordPolicySerializer return PasswordPolicySerializer
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
from authentik.policies.password.forms import PasswordPolicyForm return "ak-policy-password-form"
return PasswordPolicyForm
def passes(self, request: PolicyRequest) -> PolicyResult: def passes(self, request: PolicyRequest) -> PolicyResult:
if self.password_field not in request.context: if self.password_field not in request.context:

View file

@ -1,22 +0,0 @@
"""authentik reputation request forms"""
from django import forms
from django.utils.translation import gettext_lazy as _
from authentik.policies.forms import PolicyForm
from authentik.policies.reputation.models import ReputationPolicy
class ReputationPolicyForm(PolicyForm):
"""Form to edit ReputationPolicy"""
class Meta:
model = ReputationPolicy
fields = PolicyForm.Meta.fields + ["check_ip", "check_username", "threshold"]
widgets = {
"name": forms.TextInput(),
"value": forms.TextInput(),
}
labels = {
"check_ip": _("Check IP"),
}

View file

@ -1,9 +1,6 @@
"""authentik reputation request policy""" """authentik reputation request policy"""
from typing import Type
from django.core.cache import cache from django.core.cache import cache
from django.db import models from django.db import models
from django.forms import ModelForm
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
from rest_framework.serializers import BaseSerializer from rest_framework.serializers import BaseSerializer
@ -30,10 +27,8 @@ class ReputationPolicy(Policy):
return ReputationPolicySerializer return ReputationPolicySerializer
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
from authentik.policies.reputation.forms import ReputationPolicyForm return "ak-policy-reputation-form"
return ReputationPolicyForm
def passes(self, request: PolicyRequest) -> PolicyResult: def passes(self, request: PolicyRequest) -> PolicyResult:
remote_ip = get_client_ip(request.http_request) or "255.255.255.255" remote_ip = get_client_ip(request.http_request) or "255.255.255.255"

View file

@ -2,7 +2,6 @@
{% load static %} {% load static %}
{% load i18n %} {% load i18n %}
{% load authentik_utils %}
{% block card_title %} {% block card_title %}
{% trans 'Permission denied' %} {% trans 'Permission denied' %}

View file

@ -1,30 +0,0 @@
"""flow model tests"""
from typing import Callable, Type
from django.forms import ModelForm
from django.test import TestCase
from authentik.lib.utils.reflection import all_subclasses
from authentik.policies.models import Policy
class TestPolicyProperties(TestCase):
"""Generic model properties tests"""
def policy_tester_factory(model: Type[Policy]) -> Callable:
"""Test a form"""
def tester(self: TestPolicyProperties):
model_inst = model()
self.assertTrue(issubclass(model_inst.form, ModelForm))
return tester
for policy_type in all_subclasses(Policy):
setattr(
TestPolicyProperties,
f"test_policy_{policy_type.__name__}",
policy_tester_factory(policy_type),
)

View file

@ -26,3 +26,10 @@ class TestPoliciesAPI(APITestCase):
self.assertJSONEqual( self.assertJSONEqual(
response.content.decode(), {"passing": True, "messages": ["dummy"]} response.content.decode(), {"passing": True, "messages": ["dummy"]}
) )
def test_types(self):
"""Test Policy's types endpoint"""
response = self.client.get(
reverse("authentik_api:policy-types"),
)
self.assertEqual(response.status_code, 200)

View file

@ -1,22 +1,35 @@
"""OAuth2Provider API Views""" """OAuth2Provider API Views"""
from django.urls import reverse from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from drf_yasg.utils import swagger_auto_schema from drf_yasg.utils import swagger_auto_schema
from rest_framework.decorators import action from rest_framework.decorators import action
from rest_framework.fields import ReadOnlyField from rest_framework.fields import ReadOnlyField
from rest_framework.generics import get_object_or_404 from rest_framework.generics import get_object_or_404
from rest_framework.request import Request from rest_framework.request import Request
from rest_framework.response import Response from rest_framework.response import Response
from rest_framework.serializers import Serializer from rest_framework.serializers import ValidationError
from rest_framework.viewsets import ModelViewSet from rest_framework.viewsets import ModelViewSet
from authentik.core.api.providers import ProviderSerializer from authentik.core.api.providers import ProviderSerializer
from authentik.core.api.utils import PassiveSerializer
from authentik.core.models import Provider from authentik.core.models import Provider
from authentik.providers.oauth2.models import OAuth2Provider from authentik.providers.oauth2.models import JWTAlgorithms, OAuth2Provider
class OAuth2ProviderSerializer(ProviderSerializer): class OAuth2ProviderSerializer(ProviderSerializer):
"""OAuth2Provider Serializer""" """OAuth2Provider Serializer"""
def validate_jwt_alg(self, value):
"""Ensure that when RS256 is selected, a certificate-key-pair is selected"""
if (
self.initial_data.get("rsa_key", None) is None
and value == JWTAlgorithms.RS256
):
raise ValidationError(
_("RS256 requires a Certificate-Key-Pair to be selected.")
)
return value
class Meta: class Meta:
model = OAuth2Provider model = OAuth2Provider
@ -36,7 +49,7 @@ class OAuth2ProviderSerializer(ProviderSerializer):
] ]
class OAuth2ProviderSetupURLs(Serializer): class OAuth2ProviderSetupURLs(PassiveSerializer):
"""OAuth2 Provider Metadata serializer""" """OAuth2 Provider Metadata serializer"""
issuer = ReadOnlyField() issuer = ReadOnlyField()
@ -46,12 +59,6 @@ class OAuth2ProviderSetupURLs(Serializer):
provider_info = ReadOnlyField() provider_info = ReadOnlyField()
logout = ReadOnlyField() logout = ReadOnlyField()
def create(self, request: Request) -> Response:
raise NotImplementedError
def update(self, request: Request) -> Response:
raise NotImplementedError
class OAuth2ProviderViewSet(ModelViewSet): class OAuth2ProviderViewSet(ModelViewSet):
"""OAuth2Provider Viewset""" """OAuth2Provider Viewset"""

View file

@ -1,25 +1,19 @@
"""OAuth2Provider API Views""" """OAuth2Provider API Views"""
from rest_framework.serializers import ModelSerializer
from rest_framework.viewsets import ModelViewSet from rest_framework.viewsets import ModelViewSet
from authentik.core.api.utils import MetaNameSerializer from authentik.core.api.propertymappings import PropertyMappingSerializer
from authentik.providers.oauth2.models import ScopeMapping from authentik.providers.oauth2.models import ScopeMapping
class ScopeMappingSerializer(ModelSerializer, MetaNameSerializer): class ScopeMappingSerializer(PropertyMappingSerializer):
"""ScopeMapping Serializer""" """ScopeMapping Serializer"""
class Meta: class Meta:
model = ScopeMapping model = ScopeMapping
fields = [ fields = PropertyMappingSerializer.Meta.fields + [
"pk",
"name",
"scope_name", "scope_name",
"description", "description",
"expression",
"verbose_name",
"verbose_name_plural",
] ]

View file

@ -1,101 +0,0 @@
"""authentik OAuth2 Provider Forms"""
from django import forms
from django.core.exceptions import ValidationError
from django.utils.translation import gettext as _
from authentik.admin.fields import CodeMirrorWidget
from authentik.core.expression import PropertyMappingEvaluator
from authentik.crypto.models import CertificateKeyPair
from authentik.flows.models import Flow, FlowDesignation
from authentik.providers.oauth2.generators import (
generate_client_id,
generate_client_secret,
)
from authentik.providers.oauth2.models import (
JWTAlgorithms,
OAuth2Provider,
ScopeMapping,
)
class OAuth2ProviderForm(forms.ModelForm):
"""OAuth2 Provider form"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields["authorization_flow"].queryset = Flow.objects.filter(
designation=FlowDesignation.AUTHORIZATION
)
self.fields["client_id"].initial = generate_client_id()
self.fields["client_secret"].initial = generate_client_secret()
self.fields["rsa_key"].queryset = CertificateKeyPair.objects.exclude(
key_data__exact=""
)
self.fields["property_mappings"].queryset = ScopeMapping.objects.all()
def clean_jwt_alg(self):
"""Ensure that when RS256 is selected, a certificate-key-pair is selected"""
if (
self.data["rsa_key"] == ""
and self.cleaned_data["jwt_alg"] == JWTAlgorithms.RS256
):
raise ValidationError(
_("RS256 requires a Certificate-Key-Pair to be selected.")
)
return self.cleaned_data["jwt_alg"]
class Meta:
model = OAuth2Provider
fields = [
"name",
"authorization_flow",
"client_type",
"client_id",
"client_secret",
"token_validity",
"jwt_alg",
"property_mappings",
"rsa_key",
"redirect_uris",
"sub_mode",
"include_claims_in_id_token",
"issuer_mode",
]
widgets = {
"name": forms.TextInput(),
"token_validity": forms.TextInput(),
}
labels = {"property_mappings": _("Scopes")}
help_texts = {
"property_mappings": _(
(
"Select which scopes <b>can</b> be used by the client. "
"The client stil has to specify the scope to access the data."
)
)
}
class ScopeMappingForm(forms.ModelForm):
"""Form to edit ScopeMappings"""
template_name = "providers/oauth2/property_mapping_form.html"
def clean_expression(self):
"""Test Syntax"""
expression = self.cleaned_data.get("expression")
evaluator = PropertyMappingEvaluator()
evaluator.validate(expression)
return expression
class Meta:
model = ScopeMapping
fields = ["name", "scope_name", "description", "expression"]
widgets = {
"name": forms.TextInput(),
"scope_name": forms.TextInput(),
"description": forms.TextInput(),
"expression": CodeMirrorWidget(mode="python"),
}

View file

@ -13,7 +13,6 @@ from uuid import uuid4
from dacite import from_dict from dacite import from_dict
from django.conf import settings from django.conf import settings
from django.db import models from django.db import models
from django.forms import ModelForm
from django.http import HttpRequest from django.http import HttpRequest
from django.utils import dateformat, timezone from django.utils import dateformat, timezone
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
@ -112,10 +111,8 @@ class ScopeMapping(PropertyMapping):
) )
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
from authentik.providers.oauth2.forms import ScopeMappingForm return "ak-property-mapping-scope-form"
return ScopeMappingForm
@property @property
def serializer(self) -> Type[Serializer]: def serializer(self) -> Type[Serializer]:
@ -285,18 +282,16 @@ class OAuth2Provider(Provider):
launch_url = urlparse(main_url) launch_url = urlparse(main_url)
return main_url.replace(launch_url.path, "") return main_url.replace(launch_url.path, "")
@property
def component(self) -> str:
return "ak-provider-oauth2-form"
@property @property
def serializer(self) -> Type[Serializer]: def serializer(self) -> Type[Serializer]:
from authentik.providers.oauth2.api.provider import OAuth2ProviderSerializer from authentik.providers.oauth2.api.provider import OAuth2ProviderSerializer
return OAuth2ProviderSerializer return OAuth2ProviderSerializer
@property
def form(self) -> Type[ModelForm]:
from authentik.providers.oauth2.forms import OAuth2ProviderForm
return OAuth2ProviderForm
def __str__(self): def __str__(self):
return f"OAuth2 Provider {self.name}" return f"OAuth2 Provider {self.name}"

View file

@ -2,7 +2,6 @@
{% load static %} {% load static %}
{% load i18n %} {% load i18n %}
{% load authentik_utils %}
{% block head %} {% block head %}
{{ block.super }} {{ block.super }}

View file

@ -1,14 +0,0 @@
{% extends "generic/form.html" %}
{% load i18n %}
{% block beneath_form %}
<div class="pf-c-form__group ">
<label for="" class="pf-c-form__label"></label>
<div class="c-form__horizontal-group">
<p>
Expression using Python. See <a href="https://goauthentik.io/docs/property-mappings/expression/">here</a> for a list of all variables.
</p>
</div>
</div>
{% endblock %}

View file

@ -0,0 +1,37 @@
"""Test oauth2 provider API"""
from django.urls import reverse
from rest_framework.test import APITestCase
from authentik.core.models import User
from authentik.flows.models import Flow, FlowDesignation
from authentik.providers.oauth2.models import JWTAlgorithms
class TestOAuth2ProviderAPI(APITestCase):
"""Test oauth2 provider API"""
def setUp(self) -> None:
super().setUp()
self.user = User.objects.get(username="akadmin")
self.client.force_login(self.user)
def test_validate(self):
"""Test OAuth2 Provider validation"""
response = self.client.post(
reverse(
"authentik_api:oauth2provider-list",
),
data={
"name": "test",
"jwt_alg": str(JWTAlgorithms.RS256),
"authorization_flow": Flow.objects.filter(
designation=FlowDesignation.AUTHORIZATION
)
.first()
.pk,
},
)
self.assertJSONEqual(
response.content.decode(),
{"jwt_alg": ["RS256 requires a Certificate-Key-Pair to be selected."]},
)

View file

@ -1,17 +1,16 @@
"""ProxyProvider API Views""" """ProxyProvider API Views"""
from drf_yasg.utils import swagger_serializer_method from drf_yasg.utils import swagger_serializer_method
from rest_framework.fields import CharField, ListField, SerializerMethodField from rest_framework.fields import CharField, ListField, SerializerMethodField
from rest_framework.request import Request from rest_framework.serializers import ModelSerializer
from rest_framework.response import Response
from rest_framework.serializers import ModelSerializer, Serializer
from rest_framework.viewsets import ModelViewSet from rest_framework.viewsets import ModelViewSet
from authentik.core.api.providers import ProviderSerializer from authentik.core.api.providers import ProviderSerializer
from authentik.core.api.utils import PassiveSerializer
from authentik.providers.oauth2.views.provider import ProviderInfoView from authentik.providers.oauth2.views.provider import ProviderInfoView
from authentik.providers.proxy.models import ProxyProvider from authentik.providers.proxy.models import ProxyProvider
class OpenIDConnectConfigurationSerializer(Serializer): class OpenIDConnectConfigurationSerializer(PassiveSerializer):
"""rest_framework Serializer for OIDC Configuration""" """rest_framework Serializer for OIDC Configuration"""
issuer = CharField() issuer = CharField()
@ -27,12 +26,6 @@ class OpenIDConnectConfigurationSerializer(Serializer):
subject_types_supported = ListField(child=CharField()) subject_types_supported = ListField(child=CharField())
token_endpoint_auth_methods_supported = ListField(child=CharField()) token_endpoint_auth_methods_supported = ListField(child=CharField())
def create(self, request: Request) -> Response:
raise NotImplementedError
def update(self, request: Request) -> Response:
raise NotImplementedError
class ProxyProviderSerializer(ProviderSerializer): class ProxyProviderSerializer(ProviderSerializer):
"""ProxyProvider Serializer""" """ProxyProvider Serializer"""

View file

@ -1,50 +0,0 @@
"""authentik Proxy Provider Forms"""
from django import forms
from authentik.crypto.models import CertificateKeyPair
from authentik.flows.models import Flow, FlowDesignation
from authentik.providers.proxy.models import ProxyProvider
class ProxyProviderForm(forms.ModelForm):
"""Proxy Provider form"""
instance: ProxyProvider
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields["authorization_flow"].queryset = Flow.objects.filter(
designation=FlowDesignation.AUTHORIZATION
)
self.fields["certificate"].queryset = CertificateKeyPair.objects.filter(
key_data__isnull=False
).exclude(key_data="")
def save(self, *args, **kwargs):
actual_save = super().save(*args, **kwargs)
self.instance.set_oauth_defaults()
self.instance.save()
return actual_save
class Meta:
model = ProxyProvider
fields = [
"name",
"authorization_flow",
"internal_host",
"internal_host_ssl_validation",
"external_host",
"certificate",
"skip_path_regex",
"basic_auth_enabled",
"basic_auth_user_attribute",
"basic_auth_password_attribute",
]
widgets = {
"name": forms.TextInput(),
"internal_host": forms.TextInput(),
"external_host": forms.TextInput(),
"basic_auth_user_attribute": forms.TextInput(),
"basic_auth_password_attribute": forms.TextInput(),
}

View file

@ -5,7 +5,6 @@ from typing import Iterable, Optional, Type
from urllib.parse import urljoin from urllib.parse import urljoin
from django.db import models from django.db import models
from django.forms import ModelForm
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
from rest_framework.serializers import Serializer from rest_framework.serializers import Serializer
@ -102,10 +101,8 @@ class ProxyProvider(OutpostModel, OAuth2Provider):
cookie_secret = models.TextField(default=get_cookie_secret) cookie_secret = models.TextField(default=get_cookie_secret)
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
from authentik.providers.proxy.forms import ProxyProviderForm return "ak-provider-proxy-form"
return ProxyProviderForm
@property @property
def serializer(self) -> Type[Serializer]: def serializer(self) -> Type[Serializer]:

View file

@ -1,17 +1,35 @@
"""SAMLProvider API Views""" """SAMLProvider API Views"""
from xml.etree.ElementTree import ParseError # nosec
from defusedxml.ElementTree import fromstring
from django.http.response import HttpResponse
from django.shortcuts import get_object_or_404
from django.utils.translation import gettext_lazy as _
from drf_yasg.utils import swagger_auto_schema from drf_yasg.utils import swagger_auto_schema
from rest_framework.decorators import action from rest_framework.decorators import action
from rest_framework.fields import ReadOnlyField from rest_framework.fields import CharField, FileField, ReadOnlyField
from rest_framework.parsers import MultiPartParser
from rest_framework.permissions import AllowAny
from rest_framework.relations import SlugRelatedField
from rest_framework.request import Request from rest_framework.request import Request
from rest_framework.response import Response from rest_framework.response import Response
from rest_framework.serializers import ModelSerializer, Serializer from rest_framework.serializers import ValidationError
from rest_framework.viewsets import ModelViewSet from rest_framework.viewsets import ModelViewSet
from structlog.stdlib import get_logger
from authentik.api.decorators import permission_required
from authentik.core.api.propertymappings import PropertyMappingSerializer
from authentik.core.api.providers import ProviderSerializer from authentik.core.api.providers import ProviderSerializer
from authentik.core.api.utils import MetaNameSerializer from authentik.core.api.utils import PassiveSerializer
from authentik.core.models import Provider from authentik.core.models import Provider
from authentik.flows.models import Flow, FlowDesignation
from authentik.providers.saml.models import SAMLPropertyMapping, SAMLProvider from authentik.providers.saml.models import SAMLPropertyMapping, SAMLProvider
from authentik.providers.saml.views.metadata import DescriptorDownloadView from authentik.providers.saml.processors.metadata import MetadataProcessor
from authentik.providers.saml.processors.metadata_parser import (
ServiceProviderMetadataParser,
)
LOGGER = get_logger()
class SAMLProviderSerializer(ProviderSerializer): class SAMLProviderSerializer(ProviderSerializer):
@ -33,19 +51,26 @@ class SAMLProviderSerializer(ProviderSerializer):
"signature_algorithm", "signature_algorithm",
"signing_kp", "signing_kp",
"verification_kp", "verification_kp",
"sp_binding",
] ]
class SAMLMetadataSerializer(Serializer): class SAMLMetadataSerializer(PassiveSerializer):
"""SAML Provider Metadata serializer""" """SAML Provider Metadata serializer"""
metadata = ReadOnlyField() metadata = ReadOnlyField()
def create(self, request: Request) -> Response:
raise NotImplementedError
def update(self, request: Request) -> Response: class SAMLProviderImportSerializer(PassiveSerializer):
raise NotImplementedError """Import saml provider from XML Metadata"""
name = CharField(required=True)
# Using SlugField because https://github.com/OpenAPITools/openapi-generator/issues/3278
authorization_flow = SlugRelatedField(
queryset=Flow.objects.filter(designation=FlowDesignation.AUTHORIZATION),
slug_field="slug",
)
file = FileField()
class SAMLProviderViewSet(ModelViewSet): class SAMLProviderViewSet(ModelViewSet):
@ -55,32 +80,70 @@ class SAMLProviderViewSet(ModelViewSet):
serializer_class = SAMLProviderSerializer serializer_class = SAMLProviderSerializer
@swagger_auto_schema(responses={200: SAMLMetadataSerializer(many=False)}) @swagger_auto_schema(responses={200: SAMLMetadataSerializer(many=False)})
@action(methods=["GET"], detail=True) @action(methods=["GET"], detail=True, permission_classes=[AllowAny])
# pylint: disable=invalid-name, unused-argument # pylint: disable=invalid-name, unused-argument
def metadata(self, request: Request, pk: int) -> Response: def metadata(self, request: Request, pk: int) -> Response:
"""Return metadata as XML string""" """Return metadata as XML string"""
provider = self.get_object() # We don't use self.get_object() on purpose as this view is un-authenticated
provider = get_object_or_404(SAMLProvider, pk=pk)
try: try:
metadata = DescriptorDownloadView.get_metadata(request, provider) metadata = MetadataProcessor(provider, request).build_entity_descriptor()
if "download" in request._request.GET:
response = HttpResponse(metadata, content_type="application/xml")
response[
"Content-Disposition"
] = f'attachment; filename="{provider.name}_authentik_meta.xml"'
return response
return Response({"metadata": metadata}) return Response({"metadata": metadata})
except Provider.application.RelatedObjectDoesNotExist: # pylint: disable=no-member except Provider.application.RelatedObjectDoesNotExist: # pylint: disable=no-member
return Response({"metadata": ""}) return Response({"metadata": ""})
@permission_required(
None,
[
"authentik_providers_saml.add_samlprovider",
"authentik_crypto.add_certificatekeypair",
],
)
@swagger_auto_schema(
request_body=SAMLProviderImportSerializer(),
responses={204: "Successfully imported provider", 400: "Bad request"},
)
@action(detail=False, methods=["POST"], parser_classes=(MultiPartParser,))
def import_metadata(self, request: Request) -> Response:
"""Create provider from SAML Metadata"""
data = SAMLProviderImportSerializer(data=request.data)
if not data.is_valid():
raise ValidationError(data.errors)
file = data.validated_data["file"]
# Validate syntax first
try:
fromstring(file.read())
except ParseError:
raise ValidationError(_("Invalid XML Syntax"))
file.seek(0)
try:
metadata = ServiceProviderMetadataParser().parse(file.read().decode())
metadata.to_provider(
data.validated_data["name"], data.validated_data["authorization_flow"]
)
except ValueError as exc: # pragma: no cover
LOGGER.warning(str(exc))
return ValidationError(
_("Failed to import Metadata: %(message)s" % {"message": str(exc)}),
)
return Response(status=204)
class SAMLPropertyMappingSerializer(ModelSerializer, MetaNameSerializer):
class SAMLPropertyMappingSerializer(PropertyMappingSerializer):
"""SAMLPropertyMapping Serializer""" """SAMLPropertyMapping Serializer"""
class Meta: class Meta:
model = SAMLPropertyMapping model = SAMLPropertyMapping
fields = [ fields = PropertyMappingSerializer.Meta.fields + [
"pk",
"name",
"saml_name", "saml_name",
"friendly_name", "friendly_name",
"expression",
"verbose_name",
"verbose_name_plural",
] ]

View file

@ -1,115 +0,0 @@
"""authentik SAML IDP Forms"""
from xml.etree.ElementTree import ParseError # nosec
from defusedxml.ElementTree import fromstring
from django import forms
from django.core.exceptions import ValidationError
from django.core.validators import FileExtensionValidator
from django.utils.html import mark_safe
from django.utils.translation import gettext_lazy as _
from authentik.admin.fields import CodeMirrorWidget
from authentik.core.expression import PropertyMappingEvaluator
from authentik.crypto.models import CertificateKeyPair
from authentik.flows.models import Flow, FlowDesignation
from authentik.providers.saml.models import SAMLPropertyMapping, SAMLProvider
class SAMLProviderForm(forms.ModelForm):
"""SAML Provider form"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields["authorization_flow"].queryset = Flow.objects.filter(
designation=FlowDesignation.AUTHORIZATION
)
self.fields["property_mappings"].queryset = SAMLPropertyMapping.objects.all()
self.fields["signing_kp"].queryset = CertificateKeyPair.objects.exclude(
key_data__iexact=""
)
class Meta:
model = SAMLProvider
fields = [
"name",
"authorization_flow",
"acs_url",
"issuer",
"sp_binding",
"audience",
"signing_kp",
"verification_kp",
"property_mappings",
"name_id_mapping",
"assertion_valid_not_before",
"assertion_valid_not_on_or_after",
"session_valid_not_on_or_after",
"digest_algorithm",
"signature_algorithm",
]
widgets = {
"name": forms.TextInput(),
"audience": forms.TextInput(),
"issuer": forms.TextInput(),
"assertion_valid_not_before": forms.TextInput(),
"assertion_valid_not_on_or_after": forms.TextInput(),
"session_valid_not_on_or_after": forms.TextInput(),
}
class SAMLPropertyMappingForm(forms.ModelForm):
"""SAML Property Mapping form"""
template_name = "providers/saml/property_mapping_form.html"
def clean_expression(self):
"""Test Syntax"""
expression = self.cleaned_data.get("expression")
evaluator = PropertyMappingEvaluator()
evaluator.validate(expression)
return expression
class Meta:
model = SAMLPropertyMapping
fields = ["name", "saml_name", "friendly_name", "expression"]
widgets = {
"name": forms.TextInput(),
"saml_name": forms.TextInput(),
"friendly_name": forms.TextInput(),
"expression": CodeMirrorWidget(mode="python"),
}
help_texts = {
"saml_name": mark_safe(
_(
"URN OID used by SAML. This is optional. "
'<a href="https://www.rfc-editor.org/rfc/rfc2798.html#section-2">Reference</a>.'
" If this property mapping is used for NameID Property, "
"this field is discarded."
)
),
}
class SAMLProviderImportForm(forms.Form):
"""Create a SAML Provider from SP Metadata."""
provider_name = forms.CharField()
authorization_flow = forms.ModelChoiceField(
queryset=Flow.objects.filter(designation=FlowDesignation.AUTHORIZATION)
)
metadata = forms.FileField(
validators=[FileExtensionValidator(allowed_extensions=["xml"])]
)
def clean_metadata(self):
"""Check if the flow is valid XML"""
metadata = self.cleaned_data["metadata"].read()
try:
fromstring(metadata)
except ParseError:
raise ValidationError(_("Invalid XML Syntax"))
self.cleaned_data["metadata"].seek(0)
return self.cleaned_data["metadata"]

View file

@ -3,7 +3,6 @@ from typing import Optional, Type
from urllib.parse import urlparse from urllib.parse import urlparse
from django.db import models from django.db import models
from django.forms import ModelForm
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from rest_framework.serializers import Serializer from rest_framework.serializers import Serializer
from structlog.stdlib import get_logger from structlog.stdlib import get_logger
@ -171,10 +170,8 @@ class SAMLProvider(Provider):
return SAMLProviderSerializer return SAMLProviderSerializer
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
from authentik.providers.saml.forms import SAMLProviderForm return "ak-provider-saml-form"
return SAMLProviderForm
def __str__(self): def __str__(self):
return f"SAML Provider {self.name}" return f"SAML Provider {self.name}"
@ -192,10 +189,8 @@ class SAMLPropertyMapping(PropertyMapping):
friendly_name = models.TextField(default=None, blank=True, null=True) friendly_name = models.TextField(default=None, blank=True, null=True)
@property @property
def form(self) -> Type[ModelForm]: def component(self) -> str:
from authentik.providers.saml.forms import SAMLPropertyMappingForm return "ak-property-mapping-saml-form"
return SAMLPropertyMappingForm
@property @property
def serializer(self) -> Type[Serializer]: def serializer(self) -> Type[Serializer]:

View file

@ -1,13 +0,0 @@
{% extends base_template|default:"generic/form.html" %}
{% load i18n %}
{% block above_form %}
<h1>
{% trans 'Import SAML Metadata' %}
</h1>
{% endblock %}
{% block action %}
{% trans 'Import Metadata' %}
{% endblock %}

View file

@ -1,14 +0,0 @@
{% extends "generic/form.html" %}
{% load i18n %}
{% block beneath_form %}
<div class="pf-c-form__group ">
<label for="" class="pf-c-form__label"></label>
<div class="c-form__horizontal-group">
<p>
Expression using Python. See <a href="https://goauthentik.io/docs/property-mappings/expression/">here</a> for a list of all variables.
</p>
</div>
</div>
{% endblock %}

View file

@ -0,0 +1,115 @@
"""SAML Provider API Tests"""
from tempfile import TemporaryFile
from django.urls import reverse
from rest_framework.test import APITestCase
from authentik.core.models import Application, User
from authentik.flows.models import Flow, FlowDesignation
from authentik.providers.saml.models import SAMLProvider
from authentik.providers.saml.tests.test_metadata import METADATA_SIMPLE
class TestSAMLProviderAPI(APITestCase):
"""SAML Provider API Tests"""
def setUp(self) -> None:
super().setUp()
self.user = User.objects.get(username="akadmin")
self.client.force_login(self.user)
def test_metadata(self):
"""Test metadata export (normal)"""
provider = SAMLProvider.objects.create(
name="test",
authorization_flow=Flow.objects.get(
slug="default-provider-authorization-implicit-consent"
),
)
Application.objects.create(name="test", provider=provider, slug="test")
response = self.client.get(
reverse("authentik_api:samlprovider-metadata", kwargs={"pk": provider.pk}),
)
self.assertEqual(200, response.status_code)
def test_metadata_download(self):
"""Test metadata export (download)"""
provider = SAMLProvider.objects.create(
name="test",
authorization_flow=Flow.objects.get(
slug="default-provider-authorization-implicit-consent"
),
)
Application.objects.create(name="test", provider=provider, slug="test")
response = self.client.get(
reverse("authentik_api:samlprovider-metadata", kwargs={"pk": provider.pk})
+ "?download",
)
self.assertEqual(200, response.status_code)
self.assertIn("Content-Disposition", response)
def test_metadata_invalid(self):
"""Test metadata export (invalid)"""
# Provider without application
provider = SAMLProvider.objects.create(
name="test",
authorization_flow=Flow.objects.get(
slug="default-provider-authorization-implicit-consent"
),
)
response = self.client.get(
reverse("authentik_api:samlprovider-metadata", kwargs={"pk": provider.pk}),
)
self.assertEqual(200, response.status_code)
def test_import_success(self):
"""Test metadata import (success case)"""
with TemporaryFile() as metadata:
metadata.write(METADATA_SIMPLE.encode())
metadata.seek(0)
response = self.client.post(
reverse("authentik_api:samlprovider-import-metadata"),
{
"file": metadata,
"name": "test",
"authorization_flow": Flow.objects.filter(
designation=FlowDesignation.AUTHORIZATION
)
.first()
.slug,
},
format="multipart",
)
self.assertEqual(204, response.status_code)
# We don't test the actual object being created here, that has its own tests
def test_import_failed(self):
"""Test metadata import (invalid xml)"""
with TemporaryFile() as metadata:
metadata.write(b"invalid")
metadata.seek(0)
response = self.client.post(
reverse("authentik_api:samlprovider-import-metadata"),
{
"file": metadata,
"name": "test",
"authorization_flow": Flow.objects.filter(
designation=FlowDesignation.AUTHORIZATION
)
.first()
.slug,
},
format="multipart",
)
self.assertEqual(400, response.status_code)
def test_import_invalid(self):
"""Test metadata import (invalid input)"""
response = self.client.post(
reverse("authentik_api:samlprovider-import-metadata"),
{
"name": "test",
},
format="multipart",
)
self.assertEqual(400, response.status_code)

View file

@ -1,7 +1,7 @@
"""authentik SAML IDP URLs""" """authentik SAML IDP URLs"""
from django.urls import path from django.urls import path
from authentik.providers.saml.views import metadata, sso from authentik.providers.saml.views import sso
urlpatterns = [ urlpatterns = [
# SSO Bindings # SSO Bindings
@ -21,9 +21,4 @@ urlpatterns = [
sso.SAMLSSOBindingInitView.as_view(), sso.SAMLSSOBindingInitView.as_view(),
name="sso-init", name="sso-init",
), ),
path(
"<slug:application_slug>/metadata/",
metadata.DescriptorDownloadView.as_view(),
name="metadata",
),
] ]

View file

@ -1,81 +0,0 @@
"""authentik SAML IDP Views"""
from django.contrib import messages
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import HttpRequest, HttpResponse
from django.shortcuts import get_object_or_404
from django.utils.translation import gettext_lazy as _
from django.views import View
from django.views.generic.edit import FormView
from structlog.stdlib import get_logger
from authentik.core.models import Application, Provider
from authentik.lib.views import bad_request_message
from authentik.providers.saml.forms import SAMLProviderImportForm
from authentik.providers.saml.models import SAMLProvider
from authentik.providers.saml.processors.metadata import MetadataProcessor
from authentik.providers.saml.processors.metadata_parser import (
ServiceProviderMetadataParser,
)
LOGGER = get_logger()
class DescriptorDownloadView(View):
"""Replies with the XML Metadata IDSSODescriptor."""
@staticmethod
def get_metadata(request: HttpRequest, provider: SAMLProvider) -> str:
"""Return rendered XML Metadata"""
return MetadataProcessor(provider, request).build_entity_descriptor()
def get(self, request: HttpRequest, application_slug: str) -> HttpResponse:
"""Replies with the XML Metadata IDSSODescriptor."""
application = get_object_or_404(Application, slug=application_slug)
provider: SAMLProvider = get_object_or_404(
SAMLProvider, pk=application.provider_id
)
try:
metadata = DescriptorDownloadView.get_metadata(request, provider)
except Provider.application.RelatedObjectDoesNotExist: # pylint: disable=no-member
return bad_request_message(
request, "Provider is not assigned to an application."
)
else:
response = HttpResponse(metadata, content_type="application/xml")
response[
"Content-Disposition"
] = f'attachment; filename="{provider.name}_authentik_meta.xml"'
return response
class MetadataImportView(LoginRequiredMixin, FormView):
"""Import Metadata from XML, and create provider"""
form_class = SAMLProviderImportForm
template_name = "providers/saml/import.html"
success_url = "/"
def dispatch(self, request, *args, **kwargs):
if not request.user.is_superuser:
return self.handle_no_permission()
return super().dispatch(request, *args, **kwargs)
def form_valid(self, form: SAMLProviderImportForm) -> HttpResponse:
try:
metadata = ServiceProviderMetadataParser().parse(
form.cleaned_data["metadata"].read().decode()
)
metadata.to_provider(
form.cleaned_data["provider_name"],
form.cleaned_data["authorization_flow"],
)
messages.success(self.request, _("Successfully created Provider"))
except ValueError as exc:
LOGGER.warning(str(exc))
messages.error(
self.request,
_("Failed to import Metadata: %(message)s" % {"message": str(exc)}),
)
return super().form_invalid(form)
return super().form_valid(form)

View file

@ -78,7 +78,6 @@ AUTHENTICATION_BACKENDS = [
# Application definition # Application definition
INSTALLED_APPS = [ INSTALLED_APPS = [
"django.contrib.admin",
"django.contrib.auth", "django.contrib.auth",
"django.contrib.contenttypes", "django.contrib.contenttypes",
"django.contrib.sessions", "django.contrib.sessions",

View file

@ -1,9 +1,7 @@
"""authentik URL Configuration""" """authentik URL Configuration"""
from django.conf import settings from django.conf import settings
from django.conf.urls.static import static from django.conf.urls.static import static
from django.contrib import admin
from django.urls import include, path from django.urls import include, path
from django.views.generic import RedirectView
from structlog.stdlib import get_logger from structlog.stdlib import get_logger
from authentik.core.views import error from authentik.core.views import error
@ -11,13 +9,6 @@ from authentik.lib.utils.reflection import get_apps
from authentik.root.monitoring import LiveView, MetricsView, ReadyView from authentik.root.monitoring import LiveView, MetricsView, ReadyView
LOGGER = get_logger() LOGGER = get_logger()
admin.autodiscover()
admin.site.login = RedirectView.as_view(
pattern_name="authentik_flows:default-authentication"
)
admin.site.logout = RedirectView.as_view(
pattern_name="authentik_flows:default-invalidation"
)
handler400 = error.BadRequestView.as_view() handler400 = error.BadRequestView.as_view()
handler403 = error.ForbiddenView.as_view() handler403 = error.ForbiddenView.as_view()
@ -54,7 +45,6 @@ for _authentik_app in get_apps():
) )
urlpatterns += [ urlpatterns += [
path("administration/django/", admin.site.urls),
path("metrics/", MetricsView.as_view(), name="metrics"), path("metrics/", MetricsView.as_view(), name="metrics"),
path("-/health/live/", LiveView.as_view(), name="health-live"), path("-/health/live/", LiveView.as_view(), name="health-live"),
path("-/health/ready/", ReadyView.as_view(), name="health-ready"), path("-/health/ready/", ReadyView.as_view(), name="health-ready"),

View file

@ -8,11 +8,11 @@ from rest_framework.decorators import action
from rest_framework.fields import DateTimeField from rest_framework.fields import DateTimeField
from rest_framework.request import Request from rest_framework.request import Request
from rest_framework.response import Response from rest_framework.response import Response
from rest_framework.serializers import ModelSerializer
from rest_framework.viewsets import ModelViewSet from rest_framework.viewsets import ModelViewSet
from authentik.core.api.propertymappings import PropertyMappingSerializer
from authentik.core.api.sources import SourceSerializer from authentik.core.api.sources import SourceSerializer
from authentik.core.api.utils import MetaNameSerializer, PassiveSerializer from authentik.core.api.utils import PassiveSerializer
from authentik.sources.ldap.models import LDAPPropertyMapping, LDAPSource from authentik.sources.ldap.models import LDAPPropertyMapping, LDAPSource
@ -70,18 +70,13 @@ class LDAPSourceViewSet(ModelViewSet):
) )
class LDAPPropertyMappingSerializer(ModelSerializer, MetaNameSerializer): class LDAPPropertyMappingSerializer(PropertyMappingSerializer):
"""LDAP PropertyMapping Serializer""" """LDAP PropertyMapping Serializer"""
class Meta: class Meta:
model = LDAPPropertyMapping model = LDAPPropertyMapping
fields = [ fields = PropertyMappingSerializer.Meta.fields + [
"pk",
"name",
"expression",
"object_field", "object_field",
"verbose_name",
"verbose_name_plural",
] ]

Some files were not shown because too many files have changed in this diff Show more