providers/saml: disallow idp-initiated SSO by default and validate Request ID

This commit is contained in:
Jens Langhammer 2020-09-12 00:53:38 +02:00
parent c2ebaa7f64
commit ca0ba85023
10 changed files with 138 additions and 47 deletions

View file

@ -1,10 +1,9 @@
"""test SAML Source"""
from sys import platform
from time import sleep
from typing import Any, Dict, Optional
from unittest.case import skipUnless
from docker import DockerClient, from_env
from docker.models.containers import Container
from docker.types import Healthcheck
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
@ -74,41 +73,27 @@ Sm75WXsflOxuTn08LbgGc4s=
class TestSourceSAML(SeleniumTestCase):
"""test SAML Source flow"""
def setUp(self):
self.container = self.setup_client()
super().setUp()
def setup_client(self) -> Container:
"""Setup test IdP container"""
client: DockerClient = from_env()
container = client.containers.run(
image="kristophjunge/test-saml-idp:1.15",
detach=True,
network_mode="host",
auto_remove=True,
healthcheck=Healthcheck(
def get_container_specs(self) -> Optional[Dict[str, Any]]:
return {
"image": "kristophjunge/test-saml-idp:1.15",
"detach": True,
"network_mode": "host",
"auto_remove": True,
"healthcheck": Healthcheck(
test=["CMD", "curl", "http://localhost:8080"],
interval=5 * 100 * 1000000,
start_period=1 * 100 * 1000000,
),
environment={
"environment": {
"SIMPLESAMLPHP_SP_ENTITY_ID": "entity-id",
"SIMPLESAMLPHP_SP_ASSERTION_CONSUMER_SERVICE": (
f"{self.live_server_url}/source/saml/saml-idp-test/acs/"
),
},
)
while True:
container.reload()
status = container.attrs.get("State", {}).get("Health", {}).get("Status")
if status == "healthy":
return container
LOGGER.info("Container failed healthcheck")
sleep(1)
}
def test_idp_redirect(self):
"""test SAML Source With redirect binding"""
sleep(1)
# Bootstrap all needed objects
authentication_flow = Flow.objects.get(slug="default-source-authentication")
enrollment_flow = Flow.objects.get(slug="default-source-enrollment")
@ -160,7 +145,6 @@ class TestSourceSAML(SeleniumTestCase):
def test_idp_post(self):
"""test SAML Source With post binding"""
sleep(1)
# Bootstrap all needed objects
authentication_flow = Flow.objects.get(slug="default-source-authentication")
enrollment_flow = Flow.objects.get(slug="default-source-enrollment")
@ -214,7 +198,6 @@ class TestSourceSAML(SeleniumTestCase):
def test_idp_post_auto(self):
"""test SAML Source With post binding (auto redirect)"""
sleep(1)
# Bootstrap all needed objects
authentication_flow = Flow.objects.get(slug="default-source-authentication")
enrollment_flow = Flow.objects.get(slug="default-source-enrollment")

View file

@ -1,13 +1,22 @@
"""Test AuthN Request generator and parser"""
from django.contrib.sessions.middleware import SessionMiddleware
from django.http.request import QueryDict
from django.test import RequestFactory, TestCase
from guardian.utils import get_anonymous_user
from passbook.crypto.models import CertificateKeyPair
from passbook.flows.models import Flow
from passbook.providers.saml.models import SAMLProvider
from passbook.providers.saml.processors.assertion import AssertionProcessor
from passbook.providers.saml.processors.request_parser import AuthNRequestParser
from passbook.providers.saml.utils.encoding import deflate_and_base64_encode
from passbook.sources.saml.exceptions import MismatchedRequestID
from passbook.sources.saml.models import SAMLSource
from passbook.sources.saml.processors.request import RequestProcessor
from passbook.sources.saml.processors.request import (
SESSION_REQUEST_ID,
RequestProcessor,
)
from passbook.sources.saml.processors.response import ResponseProcessor
class TestAuthNRequest(TestCase):
@ -31,6 +40,11 @@ class TestAuthNRequest(TestCase):
def test_signed_valid(self):
"""Test generated AuthNRequest with valid signature"""
http_request = self.factory.get("/")
middleware = SessionMiddleware()
middleware.process_request(http_request)
http_request.session.save()
# First create an AuthNRequest
request_proc = RequestProcessor(self.source, http_request, "test_state")
request = request_proc.build_auth_n()
@ -44,6 +58,11 @@ class TestAuthNRequest(TestCase):
def test_signed_valid_detached(self):
"""Test generated AuthNRequest with valid signature (detached)"""
http_request = self.factory.get("/")
middleware = SessionMiddleware()
middleware.process_request(http_request)
http_request.session.save()
# First create an AuthNRequest
request_proc = RequestProcessor(self.source, http_request, "test_state")
params = request_proc.build_auth_n_detached()
@ -53,3 +72,37 @@ class TestAuthNRequest(TestCase):
)
self.assertEqual(parsed_request.id, request_proc.request_id)
self.assertEqual(parsed_request.relay_state, "test_state")
def test_request_id_invalid(self):
"""Test generated AuthNRequest with invalid request ID"""
http_request = self.factory.get("/")
http_request.user = get_anonymous_user()
middleware = SessionMiddleware()
middleware.process_request(http_request)
http_request.session.save()
# First create an AuthNRequest
request_proc = RequestProcessor(self.source, http_request, "test_state")
request = request_proc.build_auth_n()
# change the request ID
http_request.session[SESSION_REQUEST_ID] = "test"
http_request.session.save()
# To get an assertion we need a parsed request (parsed by provider)
parsed_request = AuthNRequestParser(self.provider).parse(
deflate_and_base64_encode(request), "test_state"
)
# Now create a response and convert it to string (provider)
response_proc = AssertionProcessor(self.provider, http_request, parsed_request)
response = response_proc.build_response()
# Now parse the response (source)
http_request.POST = QueryDict(mutable=True)
http_request.POST["SAMLResponse"] = deflate_and_base64_encode(response)
response_parser = ResponseProcessor(self.source)
with self.assertRaises(MismatchedRequestID):
response_parser.parse(http_request)

View file

@ -15,9 +15,10 @@ class SAMLSourceSerializer(ModelSerializer):
fields = SOURCE_FORM_FIELDS + [
"issuer",
"sso_url",
"slo_url",
"allow_idp_initiated",
"name_id_policy",
"binding_type",
"slo_url",
"temporary_user_delete_after",
"signing_kp",
]

View file

@ -8,3 +8,7 @@ class MissingSAMLResponse(SentryIgnoredException):
class UnsupportedNameIDFormat(SentryIgnoredException):
"""Exception raised when SAML Response contains NameID Format not supported."""
class MismatchedRequestID(SentryIgnoredException):
"""Exception raised when the returned request ID doesn't match the saved ID."""

View file

@ -30,9 +30,10 @@ class SAMLSourceForm(forms.ModelForm):
fields = SOURCE_FORM_FIELDS + [
"issuer",
"sso_url",
"name_id_policy",
"binding_type",
"slo_url",
"binding_type",
"name_id_policy",
"allow_idp_initiated",
"temporary_user_delete_after",
"signing_kp",
]

View file

@ -0,0 +1,21 @@
# Generated by Django 3.1.1 on 2020-09-11 22:14
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("passbook_sources_saml", "0005_samlsource_name_id_policy"),
]
operations = [
migrations.AddField(
model_name="samlsource",
name="allow_idp_initiated",
field=models.BooleanField(
default=False,
help_text="Allows authentication flows initiated by the IdP. This can be a security risk, as no validation of the request ID is done.",
),
),
]

View file

@ -53,6 +53,21 @@ class SAMLSource(Source):
verbose_name=_("SSO URL"),
help_text=_("URL that the initial Login request is sent to."),
)
slo_url = models.URLField(
default=None,
blank=True,
null=True,
verbose_name=_("SLO URL"),
help_text=_("Optional URL if your IDP supports Single-Logout."),
)
allow_idp_initiated = models.BooleanField(
default=False,
help_text=_(
"Allows authentication flows initiated by the IdP. This can be a security risk, "
"as no validation of the request ID is done."
),
)
name_id_policy = models.TextField(
choices=SAMLNameIDPolicy.choices,
default=SAMLNameIDPolicy.TRANSIENT,
@ -66,14 +81,6 @@ class SAMLSource(Source):
default=SAMLBindingTypes.Redirect,
)
slo_url = models.URLField(
default=None,
blank=True,
null=True,
verbose_name=_("SLO URL"),
help_text=_("Optional URL if your IDP supports Single-Logout."),
)
temporary_user_delete_after = models.TextField(
default="days=1",
verbose_name=_("Delete temporary users after"),

View file

@ -20,6 +20,8 @@ from passbook.sources.saml.processors.constants import (
NS_SAML_PROTOCOL,
)
SESSION_REQUEST_ID = "passbook_source_saml_request_id"
class RequestProcessor:
"""SAML AuthnRequest Processor"""
@ -37,6 +39,7 @@ class RequestProcessor:
self.http_request = request
self.relay_state = relay_state
self.request_id = get_random_id()
self.http_request.session[SESSION_REQUEST_ID] = self.request_id
self.issue_instant = get_time_string()
def get_issuer(self) -> Element:

View file

@ -18,6 +18,7 @@ from passbook.lib.utils.urls import redirect_with_qs
from passbook.policies.utils import delete_none_keys
from passbook.providers.saml.utils.encoding import decode_base64_and_inflate
from passbook.sources.saml.exceptions import (
MismatchedRequestID,
MissingSAMLResponse,
UnsupportedNameIDFormat,
)
@ -29,6 +30,7 @@ from passbook.sources.saml.processors.constants import (
SAML_NAME_ID_FORMAT_WINDOWS,
SAML_NAME_ID_FORMAT_X509,
)
from passbook.sources.saml.processors.request import SESSION_REQUEST_ID
from passbook.stages.password.stage import PLAN_CONTEXT_AUTHENTICATION_BACKEND
from passbook.stages.prompt.stage import PLAN_CONTEXT_PROMPT
@ -59,8 +61,9 @@ class ResponseProcessor:
# Check if response is compressed, b64 decode it
self._root_xml = decode_base64_and_inflate(raw_response)
self._root = ElementTree.fromstring(self._root_xml)
# Verify signed XML
self._verify_signed()
self._verify_request_id(request)
def _verify_signed(self):
"""Verify SAML Response's Signature"""
@ -70,6 +73,16 @@ class ResponseProcessor:
)
LOGGER.debug("Successfully verified signautre")
def _verify_request_id(self, request: HttpRequest):
if self._source.allow_idp_initiated:
return
if SESSION_REQUEST_ID not in request.session or "ID" not in self._root.attrib:
raise MismatchedRequestID(
"Missing request ID and IdP-initiated Logins are not allowed"
)
if request.session[SESSION_REQUEST_ID] != self._root.attrib["ID"]:
raise MismatchedRequestID("Mismatched request ID")
def _handle_name_id_transient(self, request: HttpRequest) -> HttpResponse:
"""Handle a NameID with the Format of Transient. This is a bit more complex than other
formats, as we need to create a temporary User that is used in the session. This

View file

@ -7061,6 +7061,18 @@ definitions:
format: uri
maxLength: 200
minLength: 1
slo_url:
title: SLO URL
description: Optional URL if your IDP supports Single-Logout.
type: string
format: uri
maxLength: 200
x-nullable: true
allow_idp_initiated:
title: Allow idp initiated
description: Allows authentication flows initiated by the IdP. This can be
a security risk, as no validation of the request ID is done.
type: boolean
name_id_policy:
title: Name id policy
description: NameID Policy sent to the IdP. Can be unset, in which case no
@ -7079,13 +7091,6 @@ definitions:
- REDIRECT
- POST
- POST_AUTO
slo_url:
title: SLO URL
description: Optional URL if your IDP supports Single-Logout.
type: string
format: uri
maxLength: 200
x-nullable: true
temporary_user_delete_after:
title: Delete temporary users after
description: "Time offset when temporary users should be deleted. This only\