This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/passbook/core/models.py

357 lines
11 KiB
Python
Raw Normal View History

2018-11-11 12:41:48 +00:00
"""passbook core models"""
from datetime import timedelta
2020-09-24 13:45:58 +00:00
from typing import Any, Dict, Optional, Type
from uuid import uuid4
2018-11-11 12:41:48 +00:00
from django.contrib.auth.models import AbstractUser
from django.contrib.auth.models import UserManager as DjangoUserManager
2018-11-11 12:41:48 +00:00
from django.db import models
2020-07-05 21:14:57 +00:00
from django.db.models import Q, QuerySet
from django.forms import ModelForm
from django.http import HttpRequest
2020-09-25 21:59:06 +00:00
from django.utils.functional import cached_property
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from guardian.mixins import GuardianUserMixin
2018-11-16 08:10:35 +00:00
from model_utils.managers import InheritanceManager
2019-10-01 08:24:10 +00:00
from structlog import get_logger
2018-11-11 12:41:48 +00:00
from passbook.core.exceptions import PropertyMappingExpressionException
from passbook.core.signals import password_changed
2020-02-20 16:23:05 +00:00
from passbook.core.types import UILoginButton, UIUserSettings
WIP Use Flows for Sources and Providers (#32) * core: start migrating to flows for authorisation * sources/oauth: start type-hinting * core: create default user * core: only show user delete button if an unenrollment flow exists * flows: Correctly check initial policies on flow with context * policies: add more verbosity to engine * sources/oauth: migrate to flows * sources/oauth: fix typing errors * flows: add more tests * sources/oauth: start implementing unittests * sources/ldap: add option to disable user sync, move connection init to model * sources/ldap: re-add default PropertyMappings * providers/saml: re-add default PropertyMappings * admin: fix missing stage count * stages/identification: fix sources not being shown * crypto: fix being unable to save with private key * crypto: re-add default self-signed keypair * policies: rewrite cache_key to prevent wrong cache * sources/saml: migrate to flows for auth and enrollment * stages/consent: add new stage * admin: fix PropertyMapping widget not rendering properly * core: provider.authorization_flow is mandatory * flows: add support for "autosubmit" attribute on form * flows: add InMemoryStage for dynamic stages * flows: optionally allow empty flows from FlowPlanner * providers/saml: update to authorization_flow * sources/*: fix flow executor URL * flows: fix pylint error * flows: wrap responses in JSON object to easily handle redirects * flow: dont cache plan's context * providers/oauth: rewrite OAuth2 Provider to use flows * providers/*: update docstrings of models * core: fix forms not passing help_text through safe * flows: fix HttpResponses not being converted to JSON * providers/oidc: rewrite to use flows * flows: fix linting
2020-06-07 14:35:08 +00:00
from passbook.flows.models import Flow
from passbook.lib.models import CreatedUpdatedModel
from passbook.policies.models import PolicyBindingModel
2018-11-11 12:41:48 +00:00
LOGGER = get_logger()
PASSBOOK_USER_DEBUG = "passbook_user_debug"
2018-11-11 12:41:48 +00:00
2020-05-16 14:11:53 +00:00
def default_token_duration():
    """Return the default expiry timestamp: the current time plus 30 minutes."""
    validity = timedelta(minutes=30)
    return now() + validity
2019-12-31 11:51:16 +00:00
class Group(models.Model):
    """Group with an optional parent Group, giving a basic hierarchy.

    A group can be flagged as a superuser group and carries a free-form
    JSON `attributes` dictionary.
    """

    group_uuid = models.UUIDField(default=uuid4, editable=False, primary_key=True)

    name = models.CharField(_("name"), max_length=80)
    is_superuser = models.BooleanField(
        help_text=_("Users added to this group will be superusers."),
        default=False,
    )

    # Self-referencing relation; child groups are reachable via `children`.
    parent = models.ForeignKey(
        "Group",
        null=True,
        blank=True,
        related_name="children",
        on_delete=models.SET_NULL,
    )
    attributes = models.JSONField(blank=True, default=dict)

    def __str__(self):
        return f"Group {self.name}"

    class Meta:

        # A name may only be used once per parent.
        unique_together = (("name", "parent"),)
2019-12-31 11:51:16 +00:00
class UserManager(DjangoUserManager):
    """User manager whose `create_user` does not assign is_superuser or is_staff."""

    def create_user(self, username, email=None, password=None, **extra_fields):
        """Create a user by forwarding all arguments straight to `_create_user`.

        `extra_fields` is passed through unmodified; no is_staff/is_superuser
        values are added here.
        """
        created = self._create_user(username, email, password, **extra_fields)
        return created
class User(GuardianUserMixin, AbstractUser):
    """Custom User model to allow easier adding of user-based settings."""

    uuid = models.UUIDField(editable=False, default=uuid4)
    name = models.TextField(help_text=_("User's display name."))

    sources = models.ManyToManyField("Source", through="UserSourceConnection")
    pb_groups = models.ManyToManyField("Group", related_name="users")
    password_change_date = models.DateTimeField(auto_now_add=True)

    attributes = models.JSONField(blank=True, default=dict)

    objects = UserManager()

    def group_attributes(self) -> Dict[str, Any]:
        """Merge the attributes of all of the user's groups with the user's own.

        Groups are applied in order of their name, so a later group overrides
        keys of an earlier one; the user's own attributes are applied last and
        override any group value.
        """
        merged: Dict[str, Any] = {}
        groups_by_name = self.pb_groups.all().order_by("name")
        for member_group in groups_by_name:
            merged.update(member_group.attributes)
        merged.update(self.attributes)
        return merged

    @cached_property
    def is_superuser(self) -> bool:
        """Whether the user belongs to at least one group flagged is_superuser."""
        superuser_groups = self.pb_groups.filter(is_superuser=True)
        return superuser_groups.exists()

    @property
    def is_staff(self) -> bool:
        """Staff status simply mirrors superuser status."""
        return self.is_superuser  # type: ignore

    def set_password(self, password, signal=True):
        """Set the password and record the time of the change.

        When the user has a primary key and `signal` is true, the
        `password_changed` signal is sent before the password is set.
        """
        is_saved = bool(self.pk)
        if is_saved and signal:
            password_changed.send(sender=self, user=self, password=password)
        self.password_change_date = now()
        return super().set_password(password)

    class Meta:

        verbose_name = _("User")
        verbose_name_plural = _("Users")
        permissions = (
            ("reset_user_password", "Reset Password"),
            ("impersonate", "Can impersonate other users"),
        )
2019-12-31 11:51:16 +00:00
class Provider(models.Model):
    """Application-independent Provider instance, e.g. SAML2 Remote or OAuth2 Application.

    This base class defines the common fields; `form` raises
    NotImplementedError and `launch_url` returns None here.
    """

    name = models.TextField()

    authorization_flow = models.ForeignKey(
        Flow,
        related_name="provider_authorization",
        help_text=_("Flow used when authorizing this provider."),
        on_delete=models.CASCADE,
    )

    property_mappings = models.ManyToManyField(
        "PropertyMapping", blank=True, default=None
    )

    objects = InheritanceManager()

    @property
    def launch_url(self) -> Optional[str]:
        """URL which opens this provider and starts authorization for the user.

        May be None for providers that are not URL-based; this base
        implementation always returns None.
        """
        return None

    @property
    def form(self) -> Type[ModelForm]:
        """Form class used to edit this object; not implemented on the base class."""
        raise NotImplementedError

    def __str__(self):
        return self.name
class Application(PolicyBindingModel):
    """Record for an application that uses passbook for
    authentication/identification/authorization.

    Holds display metadata and an optional one-to-one link to a Provider.
    Other authentication types can subclass this model to add custom fields
    and other properties.
    """

    name = models.TextField(help_text=_("Application's display Name."))
    slug = models.SlugField(help_text=_("Internal application name, used in URLs."))
    provider = models.OneToOneField(
        "Provider", on_delete=models.SET_DEFAULT, default=None, blank=True, null=True
    )

    meta_launch_url = models.URLField(blank=True, default="")
    meta_icon_url = models.TextField(blank=True, default="")
    meta_description = models.TextField(blank=True, default="")
    meta_publisher = models.TextField(blank=True, default="")

    def get_launch_url(self) -> Optional[str]:
        """Return `meta_launch_url` if set, else the provider's launch URL, else None."""
        explicit_url = self.meta_launch_url
        if explicit_url:
            return explicit_url
        if not self.provider:
            return None
        return self.get_provider().launch_url

    def get_provider(self) -> Optional[Provider]:
        """Return the linked provider cast to its concrete subclass, or None if unset."""
        linked = self.provider
        if linked:
            return Provider.objects.get_subclass(pk=linked.pk)
        return None

    def __str__(self):
        return self.name

    class Meta:

        verbose_name = _("Application")
        verbose_name_plural = _("Applications")
2019-10-07 14:33:48 +00:00
class Source(PolicyBindingModel):
    """Base authentication source, i.e. an OAuth Provider, SAML Remote or LDAP Server.

    The UI-related properties all return None here, and `form` raises
    NotImplementedError.
    """

    name = models.TextField(help_text=_("Source's display Name."))
    slug = models.SlugField(
        unique=True, help_text=_("Internal source name, used in URLs.")
    )

    enabled = models.BooleanField(default=True)
    property_mappings = models.ManyToManyField(
        "PropertyMapping", blank=True, default=None
    )

    authentication_flow = models.ForeignKey(
        Flow,
        null=True,
        blank=True,
        default=None,
        related_name="source_authentication",
        help_text=_("Flow to use when authenticating existing users."),
        on_delete=models.SET_NULL,
    )
    enrollment_flow = models.ForeignKey(
        Flow,
        null=True,
        blank=True,
        default=None,
        related_name="source_enrollment",
        help_text=_("Flow to use when enrolling new users."),
        on_delete=models.SET_NULL,
    )

    objects = InheritanceManager()

    @property
    def form(self) -> Type[ModelForm]:
        """Form class used to edit this object; not implemented on the base class."""
        raise NotImplementedError

    @property
    def ui_login_button(self) -> Optional[UILoginButton]:
        """UI information about the login button for sources using a http-based flow.

        Sources that don't use a http-based flow return None, as this base
        implementation does.
        """
        return None

    @property
    def ui_additional_info(self) -> Optional[str]:
        """Additional info, such as a callback URL, shown in the administration interface."""
        return None

    @property
    def ui_user_settings(self) -> Optional[UIUserSettings]:
        """Entrypoint to integrate with User settings.

        Returns either None when no user settings are available, or an
        instance of UIUserSettings. This base implementation returns None.
        """
        return None

    def __str__(self):
        return self.name
2019-10-07 14:33:48 +00:00
class UserSourceConnection(CreatedUpdatedModel):
    """Link between a User and a Source; each (user, source) pair may exist only once."""

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    source = models.ForeignKey(Source, on_delete=models.CASCADE)

    class Meta:

        unique_together = (
            (
                "user",
                "source",
            ),
        )
class ExpiringModel(models.Model):
    """Abstract base model for objects that can expire.

    `expires` holds the expiry timestamp and `expiring` states whether the
    object expires at all; objects with `expiring=False` never count as
    expired. Cleanup of expired objects is not implemented in this class.
    """

    expires = models.DateTimeField(default=default_token_duration)
    expiring = models.BooleanField(default=True)

    @classmethod
    def filter_not_expired(cls, **kwargs) -> QuerySet:
        """Return objects matching `kwargs` which are still valid.

        An object is still valid when it either does not expire at all
        (`expiring=False`) or its expiry timestamp lies in the future.
        """
        matches_kwargs = Q(**kwargs)
        # Not expired yet means the expiry timestamp is still ahead of now,
        # hence `__gt`; `__lt` would select the already-expired objects.
        not_expired_yet = Q(expires__gt=now(), expiring=True)
        never_expires = Q(expiring=False)
        return cls.objects.filter(matches_kwargs & (not_expired_yet | never_expires))

    @property
    def is_expired(self) -> bool:
        """Check whether this object has expired.

        Non-expiring objects are never expired, regardless of `expires`,
        which keeps this property consistent with `filter_not_expired`.
        """
        if not self.expiring:
            return False
        return now() > self.expires

    class Meta:

        abstract = True
2020-07-05 21:14:57 +00:00
class TokenIntents(models.TextChoices):
    """The intents for which a Token can be created."""

    # Single user token
    INTENT_VERIFICATION = "verification"

    # Allow access to API
    INTENT_API = "api"
class Token(ExpiringModel):
    """Token used to authenticate the User for API Access or confirm another Stage like Email."""

    token_uuid = models.UUIDField(default=uuid4, editable=False, primary_key=True)
    intent = models.TextField(
        default=TokenIntents.INTENT_VERIFICATION, choices=TokenIntents.choices
    )
    # related_name="+" disables the reverse relation from User to Token.
    user = models.ForeignKey("User", related_name="+", on_delete=models.CASCADE)
    description = models.TextField(blank=True, default="")

    def __str__(self):
        label = (
            f"Token {self.token_uuid.hex} {self.description} (expires={self.expires})"
        )
        return label

    class Meta:

        verbose_name = _("Token")
        verbose_name_plural = _("Tokens")
class PropertyMapping(models.Model):
    """User-defined key -> x mapping which can be used by providers to expose extra data."""

    pm_uuid = models.UUIDField(default=uuid4, editable=False, primary_key=True)
    name = models.TextField()
    expression = models.TextField()

    objects = InheritanceManager()

    @property
    def form(self) -> Type[ModelForm]:
        """Form class used to edit this object; not implemented on the base class."""
        raise NotImplementedError

    def evaluate(
        self, user: Optional[User], request: Optional[HttpRequest], **kwargs
    ) -> Any:
        """Evaluate `self.expression` with `user`, `request` and `**kwargs` as context.

        Raises PropertyMappingExpressionException (chained to the original
        error) when evaluation raises ValueError or SyntaxError.
        """
        # Imported at call time rather than module level
        # (presumably to avoid a circular import -- TODO confirm).
        from passbook.core.expression import PropertyMappingEvaluator

        expression_evaluator = PropertyMappingEvaluator()
        expression_evaluator.set_context(user, request, **kwargs)
        try:
            outcome = expression_evaluator.evaluate(self.expression)
        except (ValueError, SyntaxError) as exc:
            raise PropertyMappingExpressionException from exc
        return outcome

    def __str__(self):
        return f"Property Mapping {self.name}"

    class Meta:

        verbose_name = _("Property Mapping")
        verbose_name_plural = _("Property Mappings")