events/batch: add event batching mechanism [AUTH-134]

This commit is contained in:
Samir Musali 2023-11-21 17:07:45 +02:00
parent 8831e1d946
commit 59ab41821d
6 changed files with 184 additions and 6 deletions

View File

@ -37,6 +37,7 @@ class EventSerializer(ModelSerializer):
"created",
"expires",
"tenant",
"batch_id",
]

View File

@ -52,6 +52,9 @@ class NotificationTransportSerializer(ModelSerializer):
"webhook_url",
"webhook_mapping",
"send_once",
"enable_batching",
"batch_timeout",
"max_batch_size",
]

View File

@ -161,6 +161,63 @@ class EventManager(Manager):
return self.get_queryset().get_events_per(time_since, extract, data_points)
class EventBatch(ExpiringModel):
"""Model to store information about batches of events."""
batch_id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
event_type = models.CharField(max_length=255)
event_app = models.CharField(max_length=255)
event_user = models.CharField(max_length=255)
start_time = models.DateTimeField(auto_now_add=True)
end_time = models.DateTimeField(null=True, blank=True)
event_count = models.IntegerField(default=0)
last_updated = models.DateTimeField(auto_now=True)
max_batch_size = models.IntegerField(default=10)
batch_timeout = models.IntegerField(default=60) # Timeout in seconds
sent = models.BooleanField(default=False)
def add_event_to_batch(self, event):
"""Add an event to the batch and check if it's ready to send."""
self.add_event(event)
if self.check_batch_limits():
self.process_batch()
@staticmethod
def get_or_create_batch(action, app, user):
"""Get or create a batch for a given action."""
return EventBatch.objects.filter(
event_type=action, event_app=app, event_user=user, end_time__isnull=True
).first() or EventBatch.objects.create(event_type=action, event_app=app, event_user=user)
def check_batch_limits(self):
"""Check if the batch has reached its size or timeout limits."""
time_elapsed = now() - self.start_time
return self.event_count >= self.max_batch_size or time_elapsed >= timedelta(
seconds=self.batch_timeout
)
def add_event(self, event):
"""Add an event to the batch."""
self.event_count += 1
self.save()
def create_batch_summary(self):
"""Create a summary message for the batch."""
return f"Batched Event Summary: {self.event_type} action \
on {self.event_app} app by {self.event_user} user \
occurred {self.event_count} times between {self.start_time} and {now()}"
def process_batch(self):
"""Process the batch and check if it's ready to send."""
summary_message = self.create_batch_summary()
return summary_message
def send_notification(self):
"""Send notification for this batch."""
# Implement the logic to send notification
pass
class Event(SerializerModel, ExpiringModel):
"""An individual Audit/Metrics/Notification/Error Event"""
@ -176,6 +233,8 @@ class Event(SerializerModel, ExpiringModel):
# Shadow the expires attribute from ExpiringModel to override the default duration
expires = models.DateTimeField(default=default_event_duration)
batch_id = models.UUIDField(null=True, blank=True)
objects = EventManager()
@staticmethod
@ -197,6 +256,7 @@ class Event(SerializerModel, ExpiringModel):
current = currentframe()
parent = current.f_back
app = parent.f_globals["__name__"]
cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs))
event = Event(action=action, app=app, context=cleaned_kwargs)
return event
@ -256,6 +316,9 @@ class Event(SerializerModel, ExpiringModel):
return self
def save(self, *args, **kwargs):
# Creating a batch for this event in the save method
batch = EventBatch.get_or_create_batch(self.action, self.user, self.app)
self.batch_id = batch.batch_id
if self._state.adding:
LOGGER.info(
"Created Event",
@ -315,7 +378,17 @@ class NotificationTransport(SerializerModel):
),
)
enable_batching = models.BooleanField(default=False)
batch_timeout = models.IntegerField(default=60) # Timeout in seconds
max_batch_size = models.IntegerField(default=10)
def send(self, notification: "Notification") -> list[str]:
"""Send a batched notification or a single notification"""
if self.enable_batching:
return self.process_batch(notification)
return self.send_notification(notification)
def send_notification(self, notification: "Notification") -> list[str]:
"""Send notification to user, called from async task"""
if self.mode == TransportMode.LOCAL:
return self.send_local(notification)

View File

@ -2,11 +2,13 @@
from typing import Optional
from django.db.models.query_utils import Q
from django.utils import timezone
from guardian.shortcuts import get_anonymous_user
from structlog.stdlib import get_logger
from authentik.core.exceptions import PropertyMappingExpressionException
from authentik.core.models import User
from authentik.events.models import EventBatch # Importing the EventBatch model
from authentik.events.models import (
Event,
Notification,
@ -19,6 +21,7 @@ from authentik.events.monitored_tasks import (
TaskResult,
TaskResultStatus,
prefill_task,
shared_task,
)
from authentik.policies.engine import PolicyEngine
from authentik.policies.models import PolicyBinding, PolicyEngineMode
@ -110,19 +113,36 @@ def notification_transport(
event = Event.objects.filter(pk=event_pk).first()
if not event:
return
user = User.objects.filter(pk=user_pk).first()
if not user:
return
trigger = NotificationRule.objects.filter(pk=trigger_pk).first()
if not trigger:
return
notification = Notification(
severity=trigger.severity, body=event.summary, event=event, user=user
)
# Check if batching is enabled and process accordingly
transport = NotificationTransport.objects.filter(pk=transport_pk).first()
if not transport:
return
transport.send(notification)
if transport and transport.enable_batching:
# Process the event for batching
batch = EventBatch.get_or_create_batch(event.action, event.app, event.user)
batch.add_event_to_batch(event)
# Check if the batch has reached its limits
if not batch.check_batch_limits():
return
batch_summary = batch.process_batch()
batch.delete()
notification = Notification(
severity=trigger.severity, body=batch_summary, event=event, user=user
)
else:
notification = Notification(
severity=trigger.severity, body=event.summary, event=event, user=user
)
transport.send_notification(notification)
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL))
except (NotificationTransportError, PropertyMappingExpressionException) as exc:
self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
@ -147,3 +167,16 @@ def notification_cleanup(self: MonitoredTask):
notification.delete()
LOGGER.debug("Expired notifications", amount=amount)
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, [f"Expired {amount} Notifications"]))
# Scheduled task to check and send pending batches
@CELERY_APP.task(base=MonitoredTask)
@shared_task
def check_and_send_pending_batches():
"""Check for pending batches that haven't been sent and have been idle for a specified time."""
idle_time = timezone.now() - timedelta(minutes=10) # Example idle time
pending_batches = EventBatch.objects.filter(sent=False, last_updated__lt=idle_time)
for batch in pending_batches:
batch.send_notification()
batch.sent = True
batch.save()

View File

@ -3022,6 +3022,11 @@
"type": "object",
"additionalProperties": true,
"title": "Tenant"
},
"batch_id": {
"type": "string",
"format": "uuid",
"title": "Batch id"
}
},
"required": []
@ -3056,6 +3061,22 @@
"type": "boolean",
"title": "Send once",
"description": "Only send notification once, for example when sending a webhook into a chat channel."
},
"enable_batching": {
"type": "boolean",
"title": "Enable batching"
},
"batch_timeout": {
"type": "integer",
"minimum": -2147483648,
"maximum": 2147483647,
"title": "Batch timeout"
},
"max_batch_size": {
"type": "integer",
"minimum": -2147483648,
"maximum": 2147483647,
"title": "Max batch size"
}
},
"required": []
@ -3131,6 +3152,11 @@
"type": "object",
"additionalProperties": true,
"title": "Tenant"
},
"batch_id": {
"type": "string",
"format": "uuid",
"title": "Batch id"
}
},
"required": [

View File

@ -31489,6 +31489,10 @@ components:
type: string
format: date-time
tenant: {}
batch_id:
type: string
format: uuid
nullable: true
required:
- action
- app
@ -31988,6 +31992,10 @@ components:
type: string
format: date-time
tenant: {}
batch_id:
type: string
format: uuid
nullable: true
required:
- action
- app
@ -34539,6 +34547,16 @@ components:
type: boolean
description: Only send notification once, for example when sending a webhook
into a chat channel.
enable_batching:
type: boolean
batch_timeout:
type: integer
maximum: 2147483647
minimum: -2147483648
max_batch_size:
type: integer
maximum: 2147483647
minimum: -2147483648
required:
- mode_verbose
- name
@ -34575,6 +34593,16 @@ components:
type: boolean
description: Only send notification once, for example when sending a webhook
into a chat channel.
enable_batching:
type: boolean
batch_timeout:
type: integer
maximum: 2147483647
minimum: -2147483648
max_batch_size:
type: integer
maximum: 2147483647
minimum: -2147483648
required:
- name
NotificationTransportTest:
@ -37521,6 +37549,10 @@ components:
type: string
format: date-time
tenant: {}
batch_id:
type: string
format: uuid
nullable: true
PatchedExpressionPolicyRequest:
type: object
description: Group Membership Policy Serializer
@ -38038,6 +38070,16 @@ components:
type: boolean
description: Only send notification once, for example when sending a webhook
into a chat channel.
enable_batching:
type: boolean
batch_timeout:
type: integer
maximum: 2147483647
minimum: -2147483648
max_batch_size:
type: integer
maximum: 2147483647
minimum: -2147483648
PatchedNotificationWebhookMappingRequest:
type: object
description: NotificationWebhookMapping Serializer