*/saml: start implementing unittests, fix signing
This commit is contained in:
parent
2056b86ce7
commit
a393097504
|
@ -1,7 +1,6 @@
|
||||||
"""SAML Assertion generator"""
|
"""SAML Assertion generator"""
|
||||||
from types import GeneratorType
|
from types import GeneratorType
|
||||||
|
|
||||||
from defusedxml import ElementTree
|
|
||||||
from django.http import HttpRequest
|
from django.http import HttpRequest
|
||||||
from lxml import etree # nosec
|
from lxml import etree # nosec
|
||||||
from lxml.etree import Element, SubElement # nosec
|
from lxml.etree import Element, SubElement # nosec
|
||||||
|
@ -212,4 +211,4 @@ class AssertionProcessor:
|
||||||
signed, x509_cert=self.provider.signing_kp.certificate_data
|
signed, x509_cert=self.provider.signing_kp.certificate_data
|
||||||
)
|
)
|
||||||
return etree.tostring(signed).decode("utf-8") # nosec
|
return etree.tostring(signed).decode("utf-8") # nosec
|
||||||
return ElementTree.tostring(root_response).decode()
|
return etree.tostring(root_response).decode("utf-8") # nosec
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
"""SAML AuthNRequest Parser and dataclass"""
|
"""SAML AuthNRequest Parser and dataclass"""
|
||||||
from typing import Optional
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from cryptography.exceptions import InvalidSignature
|
from cryptography.exceptions import InvalidSignature
|
||||||
from defusedxml import ElementTree
|
from defusedxml import ElementTree
|
||||||
|
@ -9,7 +9,6 @@ from structlog import get_logger
|
||||||
|
|
||||||
from passbook.providers.saml.exceptions import CannotHandleAssertion
|
from passbook.providers.saml.exceptions import CannotHandleAssertion
|
||||||
from passbook.providers.saml.models import SAMLProvider
|
from passbook.providers.saml.models import SAMLProvider
|
||||||
from passbook.providers.saml.utils import get_random_id
|
|
||||||
from passbook.providers.saml.utils.encoding import decode_base64_and_inflate
|
from passbook.providers.saml.utils.encoding import decode_base64_and_inflate
|
||||||
|
|
||||||
LOGGER = get_logger()
|
LOGGER = get_logger()
|
||||||
|
|
|
@ -1,15 +1,20 @@
|
||||||
"""SAML AuthnRequest Processor"""
|
"""SAML AuthnRequest Processor"""
|
||||||
from defusedxml import ElementTree
|
from typing import Dict
|
||||||
|
|
||||||
from django.http import HttpRequest
|
from django.http import HttpRequest
|
||||||
|
from lxml import etree # nosec
|
||||||
from lxml.etree import Element # nosec
|
from lxml.etree import Element # nosec
|
||||||
|
from signxml import XMLSigner, methods
|
||||||
|
|
||||||
from passbook.providers.saml.utils import get_random_id
|
from passbook.providers.saml.utils import get_random_id
|
||||||
|
from passbook.providers.saml.utils.encoding import deflate_and_base64_encode
|
||||||
from passbook.providers.saml.utils.time import get_time_string
|
from passbook.providers.saml.utils.time import get_time_string
|
||||||
from passbook.sources.saml.models import SAMLSource
|
from passbook.sources.saml.models import SAMLSource
|
||||||
from passbook.sources.saml.processors.constants import (
|
from passbook.sources.saml.processors.constants import (
|
||||||
NS_MAP,
|
NS_MAP,
|
||||||
NS_SAML_ASSERTION,
|
NS_SAML_ASSERTION,
|
||||||
NS_SAML_PROTOCOL,
|
NS_SAML_PROTOCOL,
|
||||||
|
NS_SIGNATURE,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -19,9 +24,14 @@ class RequestProcessor:
|
||||||
source: SAMLSource
|
source: SAMLSource
|
||||||
http_request: HttpRequest
|
http_request: HttpRequest
|
||||||
|
|
||||||
|
request_id: str
|
||||||
|
issue_instant: str
|
||||||
|
|
||||||
def __init__(self, source: SAMLSource, request: HttpRequest):
|
def __init__(self, source: SAMLSource, request: HttpRequest):
|
||||||
self.source = source
|
self.source = source
|
||||||
self.http_request = request
|
self.http_request = request
|
||||||
|
self.request_id = get_random_id()
|
||||||
|
self.issue_instant = get_time_string()
|
||||||
|
|
||||||
def get_issuer(self) -> Element:
|
def get_issuer(self) -> Element:
|
||||||
"""Get Issuer Element"""
|
"""Get Issuer Element"""
|
||||||
|
@ -35,19 +45,60 @@ class RequestProcessor:
|
||||||
name_id_policy.text = self.source.name_id_policy
|
name_id_policy.text = self.source.name_id_policy
|
||||||
return name_id_policy
|
return name_id_policy
|
||||||
|
|
||||||
def build_auth_n(self) -> str:
|
def get_auth_n(self) -> Element:
|
||||||
"""Get full AuthnRequest"""
|
"""Get full AuthnRequest"""
|
||||||
auth_n_request = Element(f"{{{NS_SAML_PROTOCOL}}}AuthnRequest", nsmap=NS_MAP)
|
auth_n_request = Element(f"{{{NS_SAML_PROTOCOL}}}AuthnRequest", nsmap=NS_MAP)
|
||||||
auth_n_request.attrib[
|
auth_n_request.attrib[
|
||||||
"AssertionConsumerServiceURL"
|
"AssertionConsumerServiceURL"
|
||||||
] = self.source.build_full_url(self.http_request)
|
] = self.source.build_full_url(self.http_request)
|
||||||
auth_n_request.attrib["Destination"] = self.source.sso_url
|
auth_n_request.attrib["Destination"] = self.source.sso_url
|
||||||
auth_n_request.attrib["ID"] = get_random_id()
|
auth_n_request.attrib["ID"] = self.request_id
|
||||||
auth_n_request.attrib["IssueInstant"] = get_time_string()
|
auth_n_request.attrib["IssueInstant"] = self.issue_instant
|
||||||
auth_n_request.attrib["ProtocolBinding"] = self.source.binding_type
|
auth_n_request.attrib["ProtocolBinding"] = self.source.binding_type
|
||||||
auth_n_request.attrib["Version"] = "2.0"
|
auth_n_request.attrib["Version"] = "2.0"
|
||||||
# Create issuer object
|
# Create issuer object
|
||||||
auth_n_request.append(self.get_issuer())
|
auth_n_request.append(self.get_issuer())
|
||||||
# Create NameID Policy Object
|
# Create NameID Policy Object
|
||||||
auth_n_request.append(self.get_name_id_policy())
|
auth_n_request.append(self.get_name_id_policy())
|
||||||
return ElementTree.tostring(auth_n_request).decode()
|
return auth_n_request
|
||||||
|
|
||||||
|
def build_auth_n(self) -> str:
|
||||||
|
"""Get Signed string representation of AuthN Request
|
||||||
|
(used for POST Bindings)"""
|
||||||
|
auth_n_request = self.get_auth_n()
|
||||||
|
|
||||||
|
if self.source.signing_kp:
|
||||||
|
signed_request = XMLSigner().sign(
|
||||||
|
auth_n_request,
|
||||||
|
cert=self.source.signing_kp.certificate_data,
|
||||||
|
key=self.source.signing_kp.key_data,
|
||||||
|
)
|
||||||
|
return etree.tostring(signed_request).decode()
|
||||||
|
|
||||||
|
return etree.tostring(auth_n_request).decode()
|
||||||
|
|
||||||
|
def build_auth_n_detached(self) -> Dict[str, str]:
|
||||||
|
"""Get Dict AuthN Request for Redirect bindings, with detached
|
||||||
|
Signature"""
|
||||||
|
auth_n_request = self.get_auth_n()
|
||||||
|
|
||||||
|
response_dict = {
|
||||||
|
"SAMLRequest": deflate_and_base64_encode(
|
||||||
|
etree.tostring(auth_n_request).decode()
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.source.signing_kp:
|
||||||
|
signer = XMLSigner(methods.detached)
|
||||||
|
signature = signer.sign(
|
||||||
|
auth_n_request,
|
||||||
|
cert=self.source.signing_kp.certificate_data,
|
||||||
|
key=self.source.signing_kp.key_data,
|
||||||
|
)
|
||||||
|
signature_value = signature.find(
|
||||||
|
f".//{{{NS_SIGNATURE}}}SignatureValue"
|
||||||
|
).text
|
||||||
|
response_dict["Signature"] = signature_value
|
||||||
|
response_dict["SigAlg"] = signer.sign_alg
|
||||||
|
|
||||||
|
return response_dict
|
||||||
|
|
|
@ -11,7 +11,7 @@ from django.views.decorators.csrf import csrf_exempt
|
||||||
from signxml import InvalidSignature
|
from signxml import InvalidSignature
|
||||||
|
|
||||||
from passbook.lib.views import bad_request_message
|
from passbook.lib.views import bad_request_message
|
||||||
from passbook.providers.saml.utils.encoding import deflate_and_base64_encode, nice64
|
from passbook.providers.saml.utils.encoding import nice64
|
||||||
from passbook.sources.saml.exceptions import (
|
from passbook.sources.saml.exceptions import (
|
||||||
MissingSAMLResponse,
|
MissingSAMLResponse,
|
||||||
UnsupportedNameIDFormat,
|
UnsupportedNameIDFormat,
|
||||||
|
@ -32,16 +32,15 @@ class InitiateView(View):
|
||||||
raise Http404
|
raise Http404
|
||||||
relay_state = request.GET.get("next", "")
|
relay_state = request.GET.get("next", "")
|
||||||
request.session["sso_destination"] = relay_state
|
request.session["sso_destination"] = relay_state
|
||||||
auth_n_req = RequestProcessor(source, request).build_auth_n()
|
auth_n_req = RequestProcessor(source, request)
|
||||||
# If the source is configured for Redirect bindings, we can just redirect there
|
# If the source is configured for Redirect bindings, we can just redirect there
|
||||||
if source.binding_type == SAMLBindingTypes.Redirect:
|
if source.binding_type == SAMLBindingTypes.Redirect:
|
||||||
saml_request = deflate_and_base64_encode(auth_n_req)
|
url_params = auth_n_req.build_auth_n_detached()
|
||||||
url_args = urlencode(
|
url_params["RelayState"] = relay_state
|
||||||
{"SAMLRequest": saml_request, "RelayState": relay_state}
|
url_args = urlencode(url_params)
|
||||||
)
|
|
||||||
return redirect(f"{source.sso_url}?{url_args}")
|
return redirect(f"{source.sso_url}?{url_args}")
|
||||||
# As POST Binding we show a form
|
# As POST Binding we show a form
|
||||||
saml_request = nice64(auth_n_req)
|
saml_request = nice64(auth_n_req.build_auth_n())
|
||||||
if source.binding_type == SAMLBindingTypes.POST:
|
if source.binding_type == SAMLBindingTypes.POST:
|
||||||
return render(
|
return render(
|
||||||
request,
|
request,
|
||||||
|
|
Reference in New Issue