saml_idp: cleanup urls, codex -> utils, remove registry

This commit is contained in:
Jens Langhammer 2018-12-26 17:21:20 +01:00
parent 2eae37107d
commit 60d4a30992
No known key found for this signature in database
GPG key ID: BEBC05297D92821B
12 changed files with 166 additions and 111 deletions

View file

@ -7,7 +7,7 @@ from logging import getLogger
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from passbook.lib.config import CONFIG from passbook.lib.config import CONFIG
from passbook.saml_idp import codex, exceptions, xml_render from passbook.saml_idp import exceptions, utils, xml_render
MINUTES = 60 MINUTES = 60
HOURS = 60 * MINUTES HOURS = 60 * MINUTES
@ -110,7 +110,7 @@ class Processor:
def _decode_request(self): def _decode_request(self):
"""Decodes _request_xml from _saml_request.""" """Decodes _request_xml from _saml_request."""
self._request_xml = codex.decode_base64_and_inflate(self._saml_request).decode('utf-8') self._request_xml = utils.decode_base64_and_inflate(self._saml_request).decode('utf-8')
self._logger.debug('SAML request decoded') self._logger.debug('SAML request decoded')
@ -140,7 +140,7 @@ class Processor:
def _encode_response(self): def _encode_response(self):
"""Encodes _response_xml to _encoded_xml.""" """Encodes _response_xml to _encoded_xml."""
self._saml_response = codex.nice64(str.encode(self._response_xml)) self._saml_response = utils.nice64(str.encode(self._response_xml))
def _extract_saml_request(self): def _extract_saml_request(self):
"""Retrieves the _saml_request AuthnRequest from the _django_request.""" """Retrieves the _saml_request AuthnRequest from the _django_request."""
@ -187,7 +187,7 @@ class Processor:
'acs_url': self._request_params['ACS_URL'], 'acs_url': self._request_params['ACS_URL'],
'saml_response': self._saml_response, 'saml_response': self._saml_response,
'relay_state': self._relay_state, 'relay_state': self._relay_state,
'autosubmit': self._remote.application, 'autosubmit': self._remote.application.skip_authorization,
} }
def _parse_request(self): def _parse_request(self):
@ -295,11 +295,11 @@ class Processor:
# Return proper template params. # Return proper template params.
return self._get_django_response_params() return self._get_django_response_params()
def init_deep_link(self, request, sp_config, url): def init_deep_link(self, request, url):
"""Initialize this Processor to make an IdP-initiated call to the SP's """Initialize this Processor to make an IdP-initiated call to the SP's
deep-linked URL.""" deep-linked URL."""
self._reset(request, sp_config) self._reset(request)
acs_url = self._remote['acs_url'] acs_url = self._remote.acs_url
# NOTE: The following request params are made up. Some are blank, # NOTE: The following request params are made up. Some are blank,
# because they comes over in the AuthnRequest, but we don't have an # because they comes over in the AuthnRequest, but we don't have an
# AuthnRequest in this case: # AuthnRequest in this case:

View file

@ -1,22 +0,0 @@
"""Wrappers to de/encode and de/inflate strings"""
import base64
import zlib
def decode_base64_and_inflate(b64string):
"""Base64 decode and ZLib decompress b64string"""
decoded_data = base64.b64decode(b64string)
return zlib.decompress(decoded_data, -15)
def deflate_and_base64_encode(string_val):
"""Base64 and ZLib Compress b64string"""
zlibbed_str = zlib.compress(string_val)
compressed_string = zlibbed_str[2:-4]
return base64.b64encode(compressed_string)
def nice64(src):
""" Returns src base64-encoded and formatted nicely for our XML. """
return base64.b64encode(src).decode('utf-8').replace('\n', '')

View file

@ -3,6 +3,7 @@
from django import forms from django import forms
from passbook.saml_idp.models import SAMLProvider, get_provider_choices from passbook.saml_idp.models import SAMLProvider, get_provider_choices
from passbook.saml_idp.utils import CertificateBuilder
class SAMLProviderForm(forms.ModelForm): class SAMLProviderForm(forms.ModelForm):
@ -10,6 +11,13 @@ class SAMLProviderForm(forms.ModelForm):
processor_path = forms.ChoiceField(choices=get_provider_choices(), label='Processor') processor_path = forms.ChoiceField(choices=get_provider_choices(), label='Processor')
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
builder = CertificateBuilder()
builder.build()
self.fields['signing_cert'].initial = builder.certificate
self.fields['signing_key'].initial = builder.private_key
class Meta: class Meta:
model = SAMLProvider model = SAMLProvider

View file

@ -1,10 +1,11 @@
"""passbook saml_idp Models""" """passbook saml_idp Models"""
from django.db import models from django.db import models
from django.shortcuts import reverse
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
from passbook.core.models import Provider from passbook.core.models import Provider
from passbook.lib.utils.reflection import class_to_path from passbook.lib.utils.reflection import class_to_path, path_to_class
from passbook.saml_idp.base import Processor from passbook.saml_idp.base import Processor
@ -21,14 +22,26 @@ class SAMLProvider(Provider):
signing_key = models.TextField() signing_key = models.TextField()
form = 'passbook.saml_idp.forms.SAMLProviderForm' form = 'passbook.saml_idp.forms.SAMLProviderForm'
_processor = None
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._meta.get_field('processor_path').choices = get_provider_choices() self._meta.get_field('processor_path').choices = get_provider_choices()
@property
def processor(self):
if not self._processor:
self._processor = path_to_class(self.processor_path)(self)
return self._processor
def __str__(self): def __str__(self):
return "SAMLProvider %s (processor=%s)" % (self.name, self.processor_path) return "SAMLProvider %s (processor=%s)" % (self.name, self.processor_path)
def link_download_metadata(self):
"""Get link to download XML metadata for admin interface"""
return reverse('passbook_saml_idp:metadata_xml',
kwargs={'provider_id': self.pk})
class Meta: class Meta:
verbose_name = _('SAML Provider') verbose_name = _('SAML Provider')

View file

@ -1,28 +0,0 @@
"""Registers and loads Processor classes from settings."""
from logging import getLogger
from passbook.lib.utils.reflection import path_to_class
from passbook.saml_idp.exceptions import CannotHandleAssertion
from passbook.saml_idp.models import SAMLProvider
LOGGER = getLogger(__name__)
def get_processor(remote):
"""Get an instance of the processor with config."""
proc = path_to_class(remote.processor_path)
return proc(remote)
def find_processor(request):
"""Returns the Processor instance that is willing to handle this request."""
for remote in SAMLProvider.objects.all():
proc = get_processor(remote)
try:
if proc.can_handle(request):
return proc, remote
except CannotHandleAssertion as exc:
# Log these, but keep looking.
LOGGER.debug('%s %s', proc, exc)
raise CannotHandleAssertion('No Processors to handle this request.')

View file

@ -1,8 +0,0 @@
{% extends "core/base.html" %}
{% comment %}
This is a placeholder template. You can override this saml2idp/base.html
template to make all the saml2idp templates fit better into your site's
look-and-feel. That may be easier than overriding all the saml2idp templates
individually.
{% endcomment %}

View file

@ -1,47 +1,49 @@
{% extends "core/skel.html" %} {% extends "login/base.html" %}
{% load utils %} {% load utils %}
{% load i18n %} {% load i18n %}
{% block title %} {% block title %}
{% title 'SSO - Authorize External Source' %} {% title 'Authorize Application' %}
{% endblock %} {% endblock %}
{% block body %} {% block card %}
<div class="login-wrapper"> <header class="login-pf-header">
<form class="login" method="post"> <h1>{% trans 'Authorize Application' %}</h1>
{% csrf_token %} </header>
<input type="hidden" name="ACSUrl" value="{{ acs_url }}"> {% include 'partials/messages.html' %}
<input type="hidden" name="RelayState" value="{{ relay_state }}" /> <form method="POST" action="{{ acs_url }}">>
<input type="hidden" name="SAMLResponse" value="{{ saml_response }}" /> {% csrf_token %}
<label class="title"> <input type="hidden" name="ACSUrl" value="{{ acs_url }}">
<clr-icon shape="passbook" class="is-info" size="48"></clr-icon> <input type="hidden" name="RelayState" value="{{ relay_state }}" />
{% config 'passbook.branding' %} <input type="hidden" name="SAMLResponse" value="{{ saml_response }}" />
</label> <label class="title">
<label class="subtitle"> <clr-icon shape="passbook" class="is-info" size="48"></clr-icon>
{% trans 'SSO - Authorize External Source' %} {% config 'passbook.branding' %}
</label> </label>
<div class="login-group"> <label class="subtitle">
<p class="subtitle"> {% trans 'SSO - Authorize External Source' %}
{% blocktrans with remote=remote.name %} </label>
You're about to sign into {{ remote }} <div class="login-group">
{% endblocktrans %} <p class="subtitle">
</p> {% blocktrans with remote=remote.name %}
<p> You're about to sign into {{ remote }}
{% blocktrans with user=user %} {% endblocktrans %}
You are logged in as {{ user }}. Not you? </p>
{% endblocktrans %} <p>
<a href="{% url 'account-logout' %}">{% trans 'Logout' %}</a> {% blocktrans with user=user %}
</p> You are logged in as {{ user }}. Not you?
<div class="row"> {% endblocktrans %}
<div class="col-md-6"> <a href="{% url 'passbook_core:auth-logout' %}">{% trans 'Logout' %}</a>
<input class="btn btn-success btn-block" type="submit" value="{% trans "Continue" %}" /> </p>
</div> <div class="row">
<div class="col-md-6"> <div class="col-md-6">
<a href="{% url 'common-index' %}" class="btn btn-outline btn-block">{% trans "Cancel" %}</a> <input class="btn btn-success btn-block" type="submit" value="{% trans "Continue" %}" />
</div> </div>
<div class="col-md-6">
<a href="{% url 'passbook_core:overview' %}" class="btn btn-outline btn-block">{% trans "Cancel" %}</a>
</div> </div>
</div> </div>
</form> </div>
</div> </form>
{% endblock %} {% endblock %}

View file

@ -0,0 +1,87 @@
"""Wrappers to de/encode and de/inflate strings"""
import base64
import datetime
import uuid
import zlib
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
def decode_base64_and_inflate(b64string):
"""Base64 decode and ZLib decompress b64string"""
decoded_data = base64.b64decode(b64string)
return zlib.decompress(decoded_data, -15)
def deflate_and_base64_encode(string_val):
"""Base64 and ZLib Compress b64string"""
zlibbed_str = zlib.compress(string_val)
compressed_string = zlibbed_str[2:-4]
return base64.b64encode(compressed_string)
def nice64(src):
""" Returns src base64-encoded and formatted nicely for our XML. """
return base64.b64encode(src).decode('utf-8').replace('\n', '')
class CertificateBuilder:
"""Build self-signed certificates"""
__public_key = None
__private_key = None
__builder = None
__certificate = None
def __init__(self):
self.__public_key = None
self.__private_key = None
self.__builder = None
self.__certificate = None
def build(self):
one_day = datetime.timedelta(1, 0, 0)
self.__private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
self.__public_key = self.__private_key.public_key()
self.__builder = \
x509.CertificateBuilder(). \
subject_name(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, u'passbook Self-signed SAML Certificate'),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'passbook'),
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'Self-signed'),
])). \
issuer_name(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, u'passbook Self-signed SAML Certificate'),
])). \
not_valid_before(datetime.datetime.today() - one_day). \
not_valid_after(datetime.datetime.today() + datetime.timedelta(days=365)). \
serial_number(int(uuid.uuid4())). \
public_key(self.__public_key)
self.__certificate = self.__builder.sign(
private_key=self.__private_key, algorithm=hashes.SHA256(),
backend=default_backend()
)
@property
def private_key(self):
"""Return private key in PEM format"""
return self.__private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
).decode('utf-8')
@property
def certificate(self):
"""Return certificate in PEM format"""
return self.__certificate.public_bytes(
encoding=serialization.Encoding.PEM,
).decode('utf-8')

View file

@ -82,6 +82,6 @@ def get_response_xml(parameters, saml_provider: 'SAMLProvider', assertion_id='')
# LOGGER.debug("Raw response: %s", raw_response) # LOGGER.debug("Raw response: %s", raw_response)
signed = sign_with_signxml( signed = sign_with_signxml(
saml_provider.signing_key, raw_response, [saml_provider.signing_cert], saml_provider.signing_key, raw_response, saml_provider.signing_cert,
reference_uri=assertion_id).decode("utf-8") reference_uri=assertion_id).decode("utf-8")
return signed return signed

View file

@ -4,6 +4,7 @@ from logging import getLogger
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
from defusedxml import ElementTree from defusedxml import ElementTree
from lxml import etree
from signxml import XMLSigner from signxml import XMLSigner
from passbook.lib.utils.template import render_to_string from passbook.lib.utils.template import render_to_string
@ -16,9 +17,11 @@ def sign_with_signxml(private_key, data, cert, reference_uri=None):
key = serialization.load_pem_private_key( key = serialization.load_pem_private_key(
str.encode('\n'.join([x.strip() for x in private_key.split('\n')])), str.encode('\n'.join([x.strip() for x in private_key.split('\n')])),
password=None, backend=default_backend()) password=None, backend=default_backend())
root = ElementTree.fromstring(data) root = etree.fromstring(data)
# root = ElementTree.fromstring(data, forbid_entities=False)
signer = XMLSigner(c14n_algorithm='http://www.w3.org/2001/10/xml-exc-c14n#') signer = XMLSigner(c14n_algorithm='http://www.w3.org/2001/10/xml-exc-c14n#')
return ElementTree.tostring(signer.sign(root, key=key, cert=cert, reference_uri=reference_uri)) signed = signer.sign(root, key=key, cert=cert, reference_uri=reference_uri)
return ElementTree.tostring(signed)
def get_signature_xml(): def get_signature_xml():

View file

@ -17,7 +17,7 @@ def totp_force_verify(get_response):
user_has_device(request.user) and \ user_has_device(request.user) and \
not request.user.is_verified() and \ not request.user.is_verified() and \
request.path != reverse('passbook_totp:totp-verify') and \ request.path != reverse('passbook_totp:totp-verify') and \
request.path != reverse('account-logout') and \ request.path != reverse('passbook_core:auth-logout') and \
not request.META.get('HTTP_AUTHORIZATION', '').startswith('Bearer'): not request.META.get('HTTP_AUTHORIZATION', '').startswith('Bearer'):
# User has TOTP set up but is not verified # User has TOTP set up but is not verified

View file

@ -50,7 +50,7 @@ def verify(request: HttpRequest) -> HttpResponse:
if 'next' in request.GET: if 'next' in request.GET:
return redirect(request.GET.get('next')) return redirect(request.GET.get('next'))
# Otherwise just index # Otherwise just index
return redirect(reverse('common-index')) return redirect(reverse('passbook_core:overview'))
messages.error(request, _('Invalid 2-Factor Token.')) messages.error(request, _('Invalid 2-Factor Token.'))
else: else:
form = TOTPVerifyForm() form = TOTPVerifyForm()
@ -60,7 +60,7 @@ def verify(request: HttpRequest) -> HttpResponse:
'title': _("SSO - Two-factor verification"), 'title': _("SSO - Two-factor verification"),
'primary_action': _("Verify"), 'primary_action': _("Verify"),
'extra_links': { 'extra_links': {
'account-logout': 'Logout', 'passbook_core:auth-logout': 'Logout',
} }
}) })
@ -100,7 +100,7 @@ def disable(request: HttpRequest) -> HttpResponse:
# current=True, # current=True,
# request=request, # request=request,
# send_notification=True) # send_notification=True)
return redirect(reverse('common-index')) return redirect(reverse('passbook_core:overview'))
# # pylint: disable=too-many-ancestors # # pylint: disable=too-many-ancestors
@ -126,7 +126,7 @@ def disable(request: HttpRequest) -> HttpResponse:
# finished_static_devices = StaticDevice.objects.filter(user=request.user, confirmed=True) # finished_static_devices = StaticDevice.objects.filter(user=request.user, confirmed=True)
# if finished_totp_devices.exists() or finished_static_devices.exists(): # if finished_totp_devices.exists() or finished_static_devices.exists():
# messages.error(request, _('You already have TOTP enabled!')) # messages.error(request, _('You already have TOTP enabled!'))
# return redirect(reverse('common-index')) # return redirect(reverse('passbook_core:overview'))
# # Check if there's an unconfirmed device left to set up # # Check if there's an unconfirmed device left to set up
# totp_devices = TOTPDevice.objects.filter(user=request.user, confirmed=False) # totp_devices = TOTPDevice.objects.filter(user=request.user, confirmed=False)
# if not totp_devices.exists(): # if not totp_devices.exists():