audit: create audit logs for model creation/updating/deletion
This commit is contained in:
parent
e2cc2843d8
commit
dbcdab05ff
|
@ -0,0 +1,87 @@
|
||||||
|
"""Audit middleware"""
|
||||||
|
from functools import partial
|
||||||
|
from typing import Callable
|
||||||
|
|
||||||
|
from django.contrib.auth.models import User
|
||||||
|
from django.db.models import Model
|
||||||
|
from django.db.models.signals import post_save, pre_delete
|
||||||
|
from django.http import HttpRequest, HttpResponse
|
||||||
|
|
||||||
|
from passbook.audit.models import Event, EventAction, model_to_dict
|
||||||
|
from passbook.audit.signals import EventNewThread
|
||||||
|
from passbook.core.middleware import LOCAL
|
||||||
|
|
||||||
|
|
||||||
|
class AuditMiddleware:
|
||||||
|
"""Register handlers for duration of request-response that log creation/update/deletion
|
||||||
|
of models"""
|
||||||
|
|
||||||
|
get_response: Callable[[HttpRequest], HttpResponse]
|
||||||
|
|
||||||
|
def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
|
||||||
|
self.get_response = get_response
|
||||||
|
|
||||||
|
def __call__(self, request: HttpRequest) -> HttpResponse:
|
||||||
|
# Connect signal for automatic logging
|
||||||
|
if hasattr(request, "user") and getattr(
|
||||||
|
request.user, "is_authenticated", False
|
||||||
|
):
|
||||||
|
post_save_handler = partial(
|
||||||
|
self.post_save_handler, user=request.user, request=request
|
||||||
|
)
|
||||||
|
pre_delete_handler = partial(
|
||||||
|
self.pre_delete_handler, user=request.user, request=request
|
||||||
|
)
|
||||||
|
post_save.connect(
|
||||||
|
post_save_handler,
|
||||||
|
dispatch_uid=LOCAL.passbook["request_id"],
|
||||||
|
weak=False,
|
||||||
|
)
|
||||||
|
pre_delete.connect(
|
||||||
|
pre_delete_handler,
|
||||||
|
dispatch_uid=LOCAL.passbook["request_id"],
|
||||||
|
weak=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
response = self.get_response(request)
|
||||||
|
|
||||||
|
post_save.disconnect(dispatch_uid=LOCAL.passbook["request_id"])
|
||||||
|
pre_delete.disconnect(dispatch_uid=LOCAL.passbook["request_id"])
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
# pylint: disable=unused-argument
|
||||||
|
def process_exception(self, request: HttpRequest, exception: Exception):
|
||||||
|
"""Unregister handlers in case of exception"""
|
||||||
|
post_save.disconnect(dispatch_uid=LOCAL.passbook["request_id"])
|
||||||
|
pre_delete.disconnect(dispatch_uid=LOCAL.passbook["request_id"])
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
# pylint: disable=unused-argument
|
||||||
|
def post_save_handler(
|
||||||
|
user: User, request: HttpRequest, sender, instance: Model, created: bool, **_
|
||||||
|
):
|
||||||
|
"""Signal handler for all object's post_save"""
|
||||||
|
if isinstance(instance, Event):
|
||||||
|
return
|
||||||
|
|
||||||
|
action = EventAction.MODEL_CREATED if created else EventAction.MODEL_UPDATED
|
||||||
|
EventNewThread(
|
||||||
|
action, request, user=user, kwargs={"model": model_to_dict(instance)}
|
||||||
|
).run()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
# pylint: disable=unused-argument
|
||||||
|
def pre_delete_handler(
|
||||||
|
user: User, request: HttpRequest, sender, instance: Model, **_
|
||||||
|
):
|
||||||
|
"""Signal handler for all object's pre_delete"""
|
||||||
|
if isinstance(instance, Event):
|
||||||
|
return
|
||||||
|
|
||||||
|
EventNewThread(
|
||||||
|
EventAction.MODEL_DELETED,
|
||||||
|
request,
|
||||||
|
user=user,
|
||||||
|
kwargs={"model": model_to_dict(instance)},
|
||||||
|
).run()
|
|
@ -1,7 +1,6 @@
|
||||||
"""passbook audit models"""
|
"""passbook audit models"""
|
||||||
from enum import Enum
|
|
||||||
from inspect import getmodule, stack
|
from inspect import getmodule, stack
|
||||||
from typing import Any, Dict, Optional
|
from typing import Any, Dict, Optional, Union
|
||||||
from uuid import UUID, uuid4
|
from uuid import UUID, uuid4
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
@ -22,7 +21,7 @@ from passbook.core.middleware import (
|
||||||
from passbook.core.models import User
|
from passbook.core.models import User
|
||||||
from passbook.lib.utils.http import get_client_ip
|
from passbook.lib.utils.http import get_client_ip
|
||||||
|
|
||||||
LOGGER = get_logger()
|
LOGGER = get_logger("passbook.audit")
|
||||||
|
|
||||||
|
|
||||||
def cleanse_dict(source: Dict[Any, Any]) -> Dict[Any, Any]:
|
def cleanse_dict(source: Dict[Any, Any]) -> Dict[Any, Any]:
|
||||||
|
@ -90,28 +89,29 @@ def sanitize_dict(source: Dict[Any, Any]) -> Dict[Any, Any]:
|
||||||
return final_dict
|
return final_dict
|
||||||
|
|
||||||
|
|
||||||
class EventAction(Enum):
|
class EventAction(models.TextChoices):
|
||||||
"""All possible actions to save into the audit log"""
|
"""All possible actions to save into the audit log"""
|
||||||
|
|
||||||
LOGIN = "login"
|
LOGIN = "login"
|
||||||
LOGIN_FAILED = "login_failed"
|
LOGIN_FAILED = "login_failed"
|
||||||
LOGOUT = "logout"
|
LOGOUT = "logout"
|
||||||
|
|
||||||
|
SIGN_UP = "sign_up"
|
||||||
AUTHORIZE_APPLICATION = "authorize_application"
|
AUTHORIZE_APPLICATION = "authorize_application"
|
||||||
SUSPICIOUS_REQUEST = "suspicious_request"
|
SUSPICIOUS_REQUEST = "suspicious_request"
|
||||||
SIGN_UP = "sign_up"
|
PASSWORD_SET = "password_set" # noqa # nosec
|
||||||
PASSWORD_RESET = "password_reset" # noqa # nosec
|
|
||||||
INVITE_CREATED = "invitation_created"
|
INVITE_CREATED = "invitation_created"
|
||||||
INVITE_USED = "invitation_used"
|
INVITE_USED = "invitation_used"
|
||||||
|
|
||||||
IMPERSONATION_STARTED = "impersonation_started"
|
IMPERSONATION_STARTED = "impersonation_started"
|
||||||
IMPERSONATION_ENDED = "impersonation_ended"
|
IMPERSONATION_ENDED = "impersonation_ended"
|
||||||
CUSTOM = "custom"
|
|
||||||
|
|
||||||
@staticmethod
|
MODEL_CREATED = "model_created"
|
||||||
def as_choices():
|
MODEL_UPDATED = "model_updated"
|
||||||
"""Generate choices of actions used for database"""
|
MODEL_DELETED = "model_deleted"
|
||||||
return tuple(
|
|
||||||
(x, y.value) for x, y in getattr(EventAction, "__members__").items()
|
CUSTOM_PREFIX = "custom_"
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class Event(models.Model):
|
class Event(models.Model):
|
||||||
|
@ -119,7 +119,7 @@ class Event(models.Model):
|
||||||
|
|
||||||
event_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
|
event_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
|
||||||
user = models.JSONField(default=dict)
|
user = models.JSONField(default=dict)
|
||||||
action = models.TextField(choices=EventAction.as_choices())
|
action = models.TextField(choices=EventAction.choices)
|
||||||
date = models.DateTimeField(auto_now_add=True)
|
date = models.DateTimeField(auto_now_add=True)
|
||||||
app = models.TextField()
|
app = models.TextField()
|
||||||
context = models.JSONField(default=dict, blank=True)
|
context = models.JSONField(default=dict, blank=True)
|
||||||
|
@ -134,20 +134,18 @@ class Event(models.Model):
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def new(
|
def new(
|
||||||
action: EventAction,
|
action: Union[str, EventAction],
|
||||||
app: Optional[str] = None,
|
app: Optional[str] = None,
|
||||||
_inspect_offset: int = 1,
|
_inspect_offset: int = 1,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) -> "Event":
|
) -> "Event":
|
||||||
"""Create new Event instance from arguments. Instance is NOT saved."""
|
"""Create new Event instance from arguments. Instance is NOT saved."""
|
||||||
if not isinstance(action, EventAction):
|
if not isinstance(action, EventAction):
|
||||||
raise ValueError(
|
action = EventAction.CUSTOM_PREFIX + action
|
||||||
f"action must be EventAction instance but was {type(action)}"
|
|
||||||
)
|
|
||||||
if not app:
|
if not app:
|
||||||
app = getmodule(stack()[_inspect_offset][0]).__name__
|
app = getmodule(stack()[_inspect_offset][0]).__name__
|
||||||
cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs))
|
cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs))
|
||||||
event = Event(action=action.value, app=app, context=cleaned_kwargs)
|
event = Event(action=action, app=app, context=cleaned_kwargs)
|
||||||
return event
|
return event
|
||||||
|
|
||||||
def from_http(
|
def from_http(
|
||||||
|
|
|
@ -20,12 +20,12 @@ from passbook.stages.user_write.signals import user_write
|
||||||
class EventNewThread(Thread):
|
class EventNewThread(Thread):
|
||||||
"""Create Event in background thread"""
|
"""Create Event in background thread"""
|
||||||
|
|
||||||
action: EventAction
|
action: str
|
||||||
request: HttpRequest
|
request: HttpRequest
|
||||||
kwargs: Dict[str, Any]
|
kwargs: Dict[str, Any]
|
||||||
user: Optional[User] = None
|
user: Optional[User] = None
|
||||||
|
|
||||||
def __init__(self, action: EventAction, request: HttpRequest, **kwargs):
|
def __init__(self, action: str, request: HttpRequest, **kwargs):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.action = action
|
self.action = action
|
||||||
self.request = request
|
self.request = request
|
||||||
|
|
|
@ -13,7 +13,7 @@ class TestAuditEvent(TestCase):
|
||||||
|
|
||||||
def test_new_with_model(self):
|
def test_new_with_model(self):
|
||||||
"""Create a new Event passing a model as kwarg"""
|
"""Create a new Event passing a model as kwarg"""
|
||||||
event = Event.new(EventAction.CUSTOM, test={"model": get_anonymous_user()})
|
event = Event.new("unittest", test={"model": get_anonymous_user()})
|
||||||
event.save() # We save to ensure nothing is un-saveable
|
event.save() # We save to ensure nothing is un-saveable
|
||||||
model_content_type = ContentType.objects.get_for_model(get_anonymous_user())
|
model_content_type = ContentType.objects.get_for_model(get_anonymous_user())
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
|
@ -24,7 +24,7 @@ class TestAuditEvent(TestCase):
|
||||||
def test_new_with_uuid_model(self):
|
def test_new_with_uuid_model(self):
|
||||||
"""Create a new Event passing a model (with UUID PK) as kwarg"""
|
"""Create a new Event passing a model (with UUID PK) as kwarg"""
|
||||||
temp_model = DummyPolicy.objects.create(name="test", result=True)
|
temp_model = DummyPolicy.objects.create(name="test", result=True)
|
||||||
event = Event.new(EventAction.CUSTOM, model=temp_model)
|
event = Event.new("unittest", model=temp_model)
|
||||||
event.save() # We save to ensure nothing is un-saveable
|
event.save() # We save to ensure nothing is un-saveable
|
||||||
model_content_type = ContentType.objects.get_for_model(temp_model)
|
model_content_type = ContentType.objects.get_for_model(temp_model)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
|
|
|
@ -177,6 +177,7 @@ MIDDLEWARE = [
|
||||||
"django.contrib.sessions.middleware.SessionMiddleware",
|
"django.contrib.sessions.middleware.SessionMiddleware",
|
||||||
"django.contrib.auth.middleware.AuthenticationMiddleware",
|
"django.contrib.auth.middleware.AuthenticationMiddleware",
|
||||||
"passbook.core.middleware.RequestIDMiddleware",
|
"passbook.core.middleware.RequestIDMiddleware",
|
||||||
|
"passbook.audit.middleware.AuditMiddleware",
|
||||||
"django.middleware.security.SecurityMiddleware",
|
"django.middleware.security.SecurityMiddleware",
|
||||||
"django.middleware.common.CommonMiddleware",
|
"django.middleware.common.CommonMiddleware",
|
||||||
"django.middleware.csrf.CsrfViewMiddleware",
|
"django.middleware.csrf.CsrfViewMiddleware",
|
||||||
|
|
Reference in New Issue