From c05240afbfb5279709649e6a8688a191a4ca11df Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Thu, 22 Jul 2021 10:10:25 +0200 Subject: [PATCH] lib: fix outpost fake-ip not working, add tests Signed-off-by: Jens Langhammer --- authentik/lib/tests/test_http.py | 67 ++++++++++++++++++++++++++++++++ authentik/lib/utils/http.py | 24 +++++++----- 2 files changed, 82 insertions(+), 9 deletions(-) create mode 100644 authentik/lib/tests/test_http.py diff --git a/authentik/lib/tests/test_http.py b/authentik/lib/tests/test_http.py new file mode 100644 index 000000000..0c6399950 --- /dev/null +++ b/authentik/lib/tests/test_http.py @@ -0,0 +1,67 @@ +"""Test HTTP Helpers""" +from django.test import RequestFactory, TestCase + +from authentik.core.models import ( + USER_ATTRIBUTE_CAN_OVERRIDE_IP, + Token, + TokenIntents, + User, +) +from authentik.lib.utils.http import ( + OUTPOST_REMOTE_IP_HEADER, + OUTPOST_TOKEN_HEADER, + get_client_ip, +) + + +class TestHTTP(TestCase): + """Test HTTP Helpers""" + + def setUp(self) -> None: + self.user = User.objects.get(username="akadmin") + self.factory = RequestFactory() + + def test_normal(self): + """Test normal request""" + request = self.factory.get("/") + self.assertEqual(get_client_ip(request), "127.0.0.1") + + def test_forward_for(self): + """Test x-forwarded-for request""" + request = self.factory.get("/", HTTP_X_FORWARDED_FOR="127.0.0.2") + self.assertEqual(get_client_ip(request), "127.0.0.2") + + def test_fake_outpost(self): + """Test faked IP which is overridden by an outpost""" + token = Token.objects.create( + identifier="test", user=self.user, intent=TokenIntents.INTENT_API + ) + # Invalid, non-existant token + request = self.factory.get( + "/", + **{ + OUTPOST_REMOTE_IP_HEADER: "1.2.3.4", + OUTPOST_TOKEN_HEADER: "abc", + }, + ) + self.assertEqual(get_client_ip(request), "127.0.0.1") + # Invalid, user doesn't have permisions + request = self.factory.get( + "/", + **{ + OUTPOST_REMOTE_IP_HEADER: "1.2.3.4", + OUTPOST_TOKEN_HEADER: token.key, + }, + ) + self.assertEqual(get_client_ip(request), "127.0.0.1") + # Valid + self.user.attributes[USER_ATTRIBUTE_CAN_OVERRIDE_IP] = True + self.user.save() + request = self.factory.get( + "/", + **{ + OUTPOST_REMOTE_IP_HEADER: "1.2.3.4", + OUTPOST_TOKEN_HEADER: token.key, + }, + ) + self.assertEqual(get_client_ip(request), "1.2.3.4") diff --git a/authentik/lib/utils/http.py b/authentik/lib/utils/http.py index 779b3626c..ac4b9b5dd 100644 --- a/authentik/lib/utils/http.py +++ b/authentik/lib/utils/http.py @@ -40,24 +40,30 @@ def _get_outpost_override_ip(request: HttpRequest) -> Optional[str]: or OUTPOST_TOKEN_HEADER not in request.META ): return None + fake_ip = request.META[OUTPOST_REMOTE_IP_HEADER] tokens = Token.filter_not_expired( key=request.META.get(OUTPOST_TOKEN_HEADER), intent=TokenIntents.INTENT_API ) if not tokens.exists(): - LOGGER.warning("Attempted remote-ip override without token") + LOGGER.warning("Attempted remote-ip override without token", fake_ip=fake_ip) return None user = tokens.first().user - if user.group_attributes().get(USER_ATTRIBUTE_CAN_OVERRIDE_IP, False): + if not user.group_attributes().get(USER_ATTRIBUTE_CAN_OVERRIDE_IP, False): + LOGGER.warning( + "Remote-IP override: user doesn't have permission", + user=user, + fake_ip=fake_ip, + ) return None - return request.META[OUTPOST_REMOTE_IP_HEADER] + return fake_ip def get_client_ip(request: Optional[HttpRequest]) -> str: """Attempt to get the client's IP by checking common HTTP Headers. Returns none if no IP Could be found""" - if request: - override = _get_outpost_override_ip(request) - if override: - return override - return _get_client_ip_from_meta(request.META) - return DEFAULT_IP + if not request: + return DEFAULT_IP + override = _get_outpost_override_ip(request) + if override: + return override + return _get_client_ip_from_meta(request.META)