sources/saml: add mitigation for idp-initiated requests
This commit is contained in:
parent
7a4e8af1ae
commit
8e6bb48227
|
@ -1,6 +1,7 @@
|
||||||
"""test OAuth Source"""
|
"""test OAuth Source"""
|
||||||
from os.path import abspath
|
from os.path import abspath
|
||||||
from sys import platform
|
from sys import platform
|
||||||
|
from time import sleep
|
||||||
from typing import Any, Dict, Optional
|
from typing import Any, Dict, Optional
|
||||||
from unittest.case import skipUnless
|
from unittest.case import skipUnless
|
||||||
|
|
||||||
|
@ -200,10 +201,11 @@ class TestSourceOAuth(SeleniumTestCase):
|
||||||
(By.CLASS_NAME, "pf-c-login__main-footer-links-item-link")
|
(By.CLASS_NAME, "pf-c-login__main-footer-links-item-link")
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
sleep(1)
|
||||||
self.driver.find_element(
|
self.driver.find_element(
|
||||||
By.CLASS_NAME, "pf-c-login__main-footer-links-item-link"
|
By.CLASS_NAME, "pf-c-login__main-footer-links-item-link"
|
||||||
).click()
|
).click()
|
||||||
|
sleep(1)
|
||||||
# Now we should be at the IDP, wait for the login field
|
# Now we should be at the IDP, wait for the login field
|
||||||
self.wait.until(ec.presence_of_element_located((By.ID, "login")))
|
self.wait.until(ec.presence_of_element_located((By.ID, "login")))
|
||||||
self.driver.find_element(By.ID, "login").send_keys("admin@example.com")
|
self.driver.find_element(By.ID, "login").send_keys("admin@example.com")
|
||||||
|
|
|
@ -2,6 +2,8 @@
|
||||||
from typing import TYPE_CHECKING, Dict
|
from typing import TYPE_CHECKING, Dict
|
||||||
|
|
||||||
from defusedxml import ElementTree
|
from defusedxml import ElementTree
|
||||||
|
from django.core.cache import cache
|
||||||
|
from django.core.exceptions import SuspiciousOperation
|
||||||
from django.http import HttpRequest, HttpResponse
|
from django.http import HttpRequest, HttpResponse
|
||||||
from signxml import XMLVerifier
|
from signxml import XMLVerifier
|
||||||
from structlog import get_logger
|
from structlog import get_logger
|
||||||
|
@ -37,6 +39,8 @@ from passbook.stages.prompt.stage import PLAN_CONTEXT_PROMPT
|
||||||
LOGGER = get_logger()
|
LOGGER = get_logger()
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from xml.etree.ElementTree import Element # nosec
|
from xml.etree.ElementTree import Element # nosec
|
||||||
|
|
||||||
|
CACHE_SEEN_REQUEST_ID = "passbook_saml_seen_ids_%s"
|
||||||
DEFAULT_BACKEND = "django.contrib.auth.backends.ModelBackend"
|
DEFAULT_BACKEND = "django.contrib.auth.backends.ModelBackend"
|
||||||
|
|
||||||
|
|
||||||
|
@ -75,6 +79,13 @@ class ResponseProcessor:
|
||||||
|
|
||||||
def _verify_request_id(self, request: HttpRequest):
|
def _verify_request_id(self, request: HttpRequest):
|
||||||
if self._source.allow_idp_initiated:
|
if self._source.allow_idp_initiated:
|
||||||
|
# If IdP-initiated SSO flows are enabled, we want to cache the Response ID
|
||||||
|
# somewhat mitigate replay attacks
|
||||||
|
seen_ids = cache.get(CACHE_SEEN_REQUEST_ID % self._source.pk, [])
|
||||||
|
if self._root.attrib["ID"] in seen_ids:
|
||||||
|
raise SuspiciousOperation("Replay attack detected")
|
||||||
|
seen_ids.append(self._root.attrib["ID"])
|
||||||
|
cache.set(CACHE_SEEN_REQUEST_ID % self._source.pk, seen_ids)
|
||||||
return
|
return
|
||||||
if (
|
if (
|
||||||
SESSION_REQUEST_ID not in request.session
|
SESSION_REQUEST_ID not in request.session
|
||||||
|
|
Reference in a new issue