75 lines
2.8 KiB
Python
75 lines
2.8 KiB
Python
"""authentik admin Middleware to impersonate users"""
|
|
from contextvars import ContextVar
|
|
from typing import Callable, Optional
|
|
from uuid import uuid4
|
|
|
|
from django.http import HttpRequest, HttpResponse
|
|
from django.utils.translation import activate
|
|
from sentry_sdk.api import set_tag
|
|
from structlog.contextvars import STRUCTLOG_KEY_PREFIX
|
|
|
|
SESSION_KEY_IMPERSONATE_USER = "authentik/impersonate/user"
|
|
SESSION_KEY_IMPERSONATE_ORIGINAL_USER = "authentik/impersonate/original_user"
|
|
RESPONSE_HEADER_ID = "X-authentik-id"
|
|
KEY_AUTH_VIA = "auth_via"
|
|
KEY_USER = "user"
|
|
|
|
CTX_REQUEST_ID = ContextVar[Optional[str]](STRUCTLOG_KEY_PREFIX + "request_id", default=None)
|
|
CTX_HOST = ContextVar[Optional[str]](STRUCTLOG_KEY_PREFIX + "host", default=None)
|
|
CTX_AUTH_VIA = ContextVar[Optional[str]](STRUCTLOG_KEY_PREFIX + KEY_AUTH_VIA, default=None)
|
|
|
|
|
|
class ImpersonateMiddleware:
|
|
"""Middleware to impersonate users"""
|
|
|
|
get_response: Callable[[HttpRequest], HttpResponse]
|
|
|
|
def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
|
|
self.get_response = get_response
|
|
|
|
def __call__(self, request: HttpRequest) -> HttpResponse:
|
|
# No permission checks are done here, they need to be checked before
|
|
# SESSION_KEY_IMPERSONATE_USER is set.
|
|
if request.user.is_authenticated:
|
|
locale = request.user.locale(request)
|
|
if locale != "":
|
|
activate(locale)
|
|
|
|
if SESSION_KEY_IMPERSONATE_USER in request.session:
|
|
request.user = request.session[SESSION_KEY_IMPERSONATE_USER]
|
|
# Ensure that the user is active, otherwise nothing will work
|
|
request.user.is_active = True
|
|
|
|
return self.get_response(request)
|
|
|
|
|
|
class RequestIDMiddleware:
|
|
"""Add a unique ID to every request"""
|
|
|
|
get_response: Callable[[HttpRequest], HttpResponse]
|
|
|
|
def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
|
|
self.get_response = get_response
|
|
|
|
def __call__(self, request: HttpRequest) -> HttpResponse:
|
|
if not hasattr(request, "request_id"):
|
|
request_id = uuid4().hex
|
|
setattr(request, "request_id", request_id)
|
|
CTX_REQUEST_ID.set(request_id)
|
|
CTX_HOST.set(request.get_host())
|
|
set_tag("authentik.request_id", request_id)
|
|
if hasattr(request, "user") and getattr(request.user, "is_authenticated", False):
|
|
CTX_AUTH_VIA.set("session")
|
|
else:
|
|
CTX_AUTH_VIA.set("unauthenticated")
|
|
|
|
response = self.get_response(request)
|
|
|
|
response[RESPONSE_HEADER_ID] = request.request_id
|
|
setattr(response, "ak_context", {})
|
|
response.ak_context["request_id"] = CTX_REQUEST_ID.get()
|
|
response.ak_context["host"] = CTX_HOST.get()
|
|
response.ak_context[KEY_AUTH_VIA] = CTX_AUTH_VIA.get()
|
|
response.ak_context[KEY_USER] = request.user.username
|
|
return response
|