From 59ab41821d0e103a8838ece95dd6b0654269a4d4 Mon Sep 17 00:00:00 2001 From: Samir Musali Date: Tue, 21 Nov 2023 17:07:45 +0200 Subject: [PATCH] events/batch: add event batching mechanism [AUTH-134] --- authentik/events/api/events.py | 1 + .../events/api/notification_transports.py | 3 + authentik/events/models.py | 73 +++++++++++++++++++ authentik/events/tasks.py | 44 ++++++++++- blueprints/schema.json | 26 +++++++ schema.yml | 42 +++++++++++ 6 files changed, 185 insertions(+), 4 deletions(-) diff --git a/authentik/events/api/events.py b/authentik/events/api/events.py index 18b60be2c..b27d03aa5 100644 --- a/authentik/events/api/events.py +++ b/authentik/events/api/events.py @@ -37,6 +37,7 @@ class EventSerializer(ModelSerializer): "created", "expires", "tenant", + "batch_id", ] diff --git a/authentik/events/api/notification_transports.py b/authentik/events/api/notification_transports.py index 30cd88d17..5f9d66e61 100644 --- a/authentik/events/api/notification_transports.py +++ b/authentik/events/api/notification_transports.py @@ -52,6 +52,9 @@ class NotificationTransportSerializer(ModelSerializer): "webhook_url", "webhook_mapping", "send_once", + "enable_batching", + "batch_timeout", + "max_batch_size", ] diff --git a/authentik/events/models.py b/authentik/events/models.py index 240ba11de..c36488cc4 100644 --- a/authentik/events/models.py +++ b/authentik/events/models.py @@ -161,6 +161,63 @@ class EventManager(Manager): return self.get_queryset().get_events_per(time_since, extract, data_points) +class EventBatch(ExpiringModel): + """Model to store information about batches of events.""" + + batch_id = models.UUIDField(primary_key=True, default=uuid4, editable=False) + event_type = models.CharField(max_length=255) + event_app = models.CharField(max_length=255) + event_user = models.CharField(max_length=255) + start_time = models.DateTimeField(auto_now_add=True) + end_time = models.DateTimeField(null=True, blank=True) + event_count = models.IntegerField(default=0) + 
last_updated = models.DateTimeField(auto_now=True) + max_batch_size = models.IntegerField(default=10) + batch_timeout = models.IntegerField(default=60) # Timeout in seconds + sent = models.BooleanField(default=False) + + def add_event_to_batch(self, event): + """Add an event to the batch and check if it's ready to send.""" + self.add_event(event) + if self.check_batch_limits(): + self.process_batch() + + @staticmethod + def get_or_create_batch(action, app, user): + """Return the open batch for this action/app/user, creating one if none exists.""" + return EventBatch.objects.filter( + event_type=action, event_app=app, event_user=user, end_time__isnull=True + ).first() or EventBatch.objects.create(event_type=action, event_app=app, event_user=user) + + def check_batch_limits(self): + """Check if the batch has reached its size or timeout limits.""" + time_elapsed = now() - self.start_time + return self.event_count >= self.max_batch_size or time_elapsed >= timedelta( + seconds=self.batch_timeout + ) + + def add_event(self, event): + """Add an event to the batch.""" + self.event_count += 1 + self.save() + + def create_batch_summary(self): + """Create a summary message for the batch.""" + return f"Batched Event Summary: {self.event_type} action " \ + f"on {self.event_app} app by {self.event_user} user " \ + f"occurred {self.event_count} times between {self.start_time} and {now()}" + + def process_batch(self): + """Build and return the summary message for this batch.""" + summary_message = self.create_batch_summary() + return summary_message + + def send_notification(self): + """Send notification for this batch.""" + # Implement the logic to send notification + pass + + class Event(SerializerModel, ExpiringModel): """An individual Audit/Metrics/Notification/Error Event""" @@ -176,6 +233,8 @@ class Event(SerializerModel, ExpiringModel): # Shadow the expires attribute from ExpiringModel to override the default duration expires = models.DateTimeField(default=default_event_duration) + batch_id = 
models.UUIDField(null=True, blank=True) + objects = EventManager() @staticmethod @@ -197,6 +256,7 @@ class Event(SerializerModel, ExpiringModel): current = currentframe() parent = current.f_back app = parent.f_globals["__name__"] + cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs)) event = Event(action=action, app=app, context=cleaned_kwargs) return event @@ -256,6 +316,9 @@ class Event(SerializerModel, ExpiringModel): return self def save(self, *args, **kwargs): + # Attach the event to its open batch; arguments are (action, app, user) + batch = EventBatch.get_or_create_batch(self.action, self.app, self.user) + self.batch_id = batch.batch_id if self._state.adding: LOGGER.info( "Created Event", @@ -315,7 +378,17 @@ class NotificationTransport(SerializerModel): ), ) + enable_batching = models.BooleanField(default=False) + batch_timeout = models.IntegerField(default=60) # Timeout in seconds + max_batch_size = models.IntegerField(default=10) + def send(self, notification: "Notification") -> list[str]: + """Send a batched notification or a single notification""" + if self.enable_batching: + return self.process_batch(notification) # NOTE(review): process_batch is not defined on NotificationTransport in this patch - confirm + return self.send_notification(notification) + + def send_notification(self, notification: "Notification") -> list[str]: """Send notification to user, called from async task""" if self.mode == TransportMode.LOCAL: return self.send_local(notification) diff --git a/authentik/events/tasks.py b/authentik/events/tasks.py index 9c00860f8..b9e041b8b 100644 --- a/authentik/events/tasks.py +++ b/authentik/events/tasks.py @@ -2,11 +2,14 @@ +from datetime import timedelta from typing import Optional from django.db.models.query_utils import Q +from django.utils import timezone from guardian.shortcuts import get_anonymous_user from structlog.stdlib import get_logger from authentik.core.exceptions import PropertyMappingExpressionException from authentik.core.models import User +from authentik.events.models import EventBatch # Importing the EventBatch model from authentik.events.models import ( Event, 
Notification, @@ -19,6 +22,7 @@ from authentik.events.monitored_tasks import ( TaskResult, TaskResultStatus, prefill_task, + shared_task, ) from authentik.policies.engine import PolicyEngine from authentik.policies.models import PolicyBinding, PolicyEngineMode @@ -110,19 +114,38 @@ def notification_transport( event = Event.objects.filter(pk=event_pk).first() if not event: return + user = User.objects.filter(pk=user_pk).first() if not user: return trigger = NotificationRule.objects.filter(pk=trigger_pk).first() if not trigger: return - notification = Notification( - severity=trigger.severity, body=event.summary, event=event, user=user - ) + + # Check if batching is enabled and process accordingly transport = NotificationTransport.objects.filter(pk=transport_pk).first() if not transport: return - transport.send(notification) + if transport.enable_batching: + # Process the event for batching + batch = EventBatch.get_or_create_batch(event.action, event.app, event.user) + batch.add_event_to_batch(event) + # Check if the batch has reached its limits + if not batch.check_batch_limits(): + return + + batch_summary = batch.process_batch() + batch.delete() + notification = Notification( + severity=trigger.severity, body=batch_summary, event=event, user=user + ) + else: + notification = Notification( + severity=trigger.severity, body=event.summary, event=event, user=user + ) + + transport.send_notification(notification) + self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL)) except (NotificationTransportError, PropertyMappingExpressionException) as exc: self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc)) @@ -147,3 +170,16 @@ def notification_cleanup(self: MonitoredTask): notification.delete() LOGGER.debug("Expired notifications", amount=amount) self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, [f"Expired {amount} Notifications"])) + +# Scheduled task to check and send pending batches (NOTE(review): two task decorators are stacked below - confirm intended) +@CELERY_APP.task(base=MonitoredTask) 
+@shared_task +def check_and_send_pending_batches(): + """Check for pending batches that haven't been sent and have been idle for a specified time.""" + idle_time = timezone.now() - timedelta(minutes=10) # Example idle time + pending_batches = EventBatch.objects.filter(sent=False, last_updated__lt=idle_time) + for batch in pending_batches: + batch.send_notification() + batch.sent = True + batch.save() + diff --git a/blueprints/schema.json b/blueprints/schema.json index 213cb1673..fbf2028e5 100644 --- a/blueprints/schema.json +++ b/blueprints/schema.json @@ -3022,6 +3022,11 @@ "type": "object", "additionalProperties": true, "title": "Tenant" + }, + "batch_id": { + "type": "string", + "format": "uuid", + "title": "Batch id" } }, "required": [] @@ -3056,6 +3061,22 @@ "type": "boolean", "title": "Send once", "description": "Only send notification once, for example when sending a webhook into a chat channel." + }, + "enable_batching": { + "type": "boolean", + "title": "Enable batching" + }, + "batch_timeout": { + "type": "integer", + "minimum": -2147483648, + "maximum": 2147483647, + "title": "Batch timeout" + }, + "max_batch_size": { + "type": "integer", + "minimum": -2147483648, + "maximum": 2147483647, + "title": "Max batch size" } }, "required": [] @@ -3131,6 +3152,11 @@ "type": "object", "additionalProperties": true, "title": "Tenant" + }, + "batch_id": { + "type": "string", + "format": "uuid", + "title": "Batch id" } }, "required": [ diff --git a/schema.yml b/schema.yml index ef149443f..9332a8fa4 100644 --- a/schema.yml +++ b/schema.yml @@ -31489,6 +31489,10 @@ components: type: string format: date-time tenant: {} + batch_id: + type: string + format: uuid + nullable: true required: - action - app @@ -31988,6 +31992,10 @@ components: type: string format: date-time tenant: {} + batch_id: + type: string + format: uuid + nullable: true required: - action - app @@ -34539,6 +34547,16 @@ components: type: boolean description: Only send notification once, for example when 
sending a webhook into a chat channel. + enable_batching: + type: boolean + batch_timeout: + type: integer + maximum: 2147483647 + minimum: -2147483648 + max_batch_size: + type: integer + maximum: 2147483647 + minimum: -2147483648 required: - mode_verbose - name @@ -34575,6 +34593,16 @@ components: type: boolean description: Only send notification once, for example when sending a webhook into a chat channel. + enable_batching: + type: boolean + batch_timeout: + type: integer + maximum: 2147483647 + minimum: -2147483648 + max_batch_size: + type: integer + maximum: 2147483647 + minimum: -2147483648 required: - name NotificationTransportTest: @@ -37521,6 +37549,10 @@ components: type: string format: date-time tenant: {} + batch_id: + type: string + format: uuid + nullable: true PatchedExpressionPolicyRequest: type: object description: Group Membership Policy Serializer @@ -38038,6 +38070,16 @@ components: type: boolean description: Only send notification once, for example when sending a webhook into a chat channel. + enable_batching: + type: boolean + batch_timeout: + type: integer + maximum: 2147483647 + minimum: -2147483648 + max_batch_size: + type: integer + maximum: 2147483647 + minimum: -2147483648 PatchedNotificationWebhookMappingRequest: type: object description: NotificationWebhookMapping Serializer