outpost: fix outpost update signal only being sent to outposts connected to the same passbook instance
This commit is contained in:
parent
0161205c82
commit
7a4e8af1ae
|
@ -9,6 +9,7 @@ from dacite import from_dict
|
||||||
from django.contrib.postgres.fields import ArrayField
|
from django.contrib.postgres.fields import ArrayField
|
||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
from django.db import models
|
from django.db import models
|
||||||
|
from django.db.models.base import Model
|
||||||
from django.utils.translation import gettext_lazy as _
|
from django.utils.translation import gettext_lazy as _
|
||||||
from guardian.shortcuts import assign_perm
|
from guardian.shortcuts import assign_perm
|
||||||
|
|
||||||
|
@ -30,13 +31,17 @@ class OutpostConfig:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class OutpostModel:
|
class OutpostModel(Model):
|
||||||
"""Base model for providers that need more objects than just themselves"""
|
"""Base model for providers that need more objects than just themselves"""
|
||||||
|
|
||||||
def get_required_objects(self) -> Iterable[models.Model]:
|
def get_required_objects(self) -> Iterable[models.Model]:
|
||||||
"""Return a list of all required objects"""
|
"""Return a list of all required objects"""
|
||||||
return [self]
|
return [self]
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
|
||||||
|
abstract = True
|
||||||
|
|
||||||
|
|
||||||
class OutpostType(models.TextChoices):
|
class OutpostType(models.TextChoices):
|
||||||
"""Outpost types, currently only the reverse proxy is available"""
|
"""Outpost types, currently only the reverse proxy is available"""
|
||||||
|
|
|
@ -1,31 +1,31 @@
|
||||||
"""passbook outpost signals"""
|
"""passbook outpost signals"""
|
||||||
from asgiref.sync import async_to_sync
|
|
||||||
from channels.layers import get_channel_layer
|
|
||||||
from django.db.models import Model
|
from django.db.models import Model
|
||||||
from django.db.models.signals import post_save
|
from django.db.models.signals import post_save
|
||||||
from django.dispatch import receiver
|
from django.dispatch import receiver
|
||||||
from structlog import get_logger
|
from structlog import get_logger
|
||||||
|
|
||||||
|
from passbook.lib.utils.reflection import class_to_path
|
||||||
from passbook.outposts.models import Outpost, OutpostModel
|
from passbook.outposts.models import Outpost, OutpostModel
|
||||||
|
from passbook.outposts.tasks import outpost_send_update
|
||||||
|
|
||||||
LOGGER = get_logger()
|
LOGGER = get_logger()
|
||||||
|
|
||||||
|
|
||||||
@receiver(post_save, sender=Outpost)
|
@receiver(post_save, sender=Outpost)
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
def ensure_user_and_token(sender, instance, **_):
|
def ensure_user_and_token(sender, instance: Model, **_):
|
||||||
"""Ensure that token is created/updated on save"""
|
"""Ensure that token is created/updated on save"""
|
||||||
_ = instance.token
|
_ = instance.token
|
||||||
|
|
||||||
|
|
||||||
@receiver(post_save)
|
@receiver(post_save)
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
def post_save_update(sender, instance, **_):
|
def post_save_update(sender, instance: Model, **_):
|
||||||
"""If an OutpostModel, or a model that is somehow connected to an OutpostModel is saved,
|
"""If an OutpostModel, or a model that is somehow connected to an OutpostModel is saved,
|
||||||
we send a message down the relevant OutpostModels WS connection to trigger an update"""
|
we send a message down the relevant OutpostModels WS connection to trigger an update"""
|
||||||
if isinstance(instance, OutpostModel):
|
if isinstance(instance, OutpostModel):
|
||||||
LOGGER.debug("triggering outpost update from outpostmodel", instance=instance)
|
LOGGER.debug("triggering outpost update from outpostmodel", instance=instance)
|
||||||
_send_update(instance)
|
outpost_send_update.delay(class_to_path(instance.__class__), instance.pk)
|
||||||
return
|
return
|
||||||
|
|
||||||
for field in instance._meta.get_fields():
|
for field in instance._meta.get_fields():
|
||||||
|
@ -46,13 +46,4 @@ def post_save_update(sender, instance, **_):
|
||||||
# Because the Outpost Model has an M2M to Provider,
|
# Because the Outpost Model has an M2M to Provider,
|
||||||
# we have to iterate over the entire QS
|
# we have to iterate over the entire QS
|
||||||
for reverse in getattr(instance, field_name).all():
|
for reverse in getattr(instance, field_name).all():
|
||||||
_send_update(reverse)
|
outpost_send_update(class_to_path(reverse.__class__), reverse.pk)
|
||||||
|
|
||||||
|
|
||||||
def _send_update(outpost_model: Model):
|
|
||||||
"""Send update trigger for each channel of an outpost model"""
|
|
||||||
for outpost in outpost_model.outpost_set.all():
|
|
||||||
channel_layer = get_channel_layer()
|
|
||||||
for channel in outpost.channels:
|
|
||||||
LOGGER.debug("sending update", channel=channel)
|
|
||||||
async_to_sync(channel_layer.send)(channel, {"type": "event.update"})
|
|
||||||
|
|
|
@ -1,8 +1,22 @@
|
||||||
"""outpost tasks"""
|
"""outpost tasks"""
|
||||||
from passbook.outposts.models import Outpost, OutpostDeploymentType, OutpostType
|
from typing import Any
|
||||||
|
|
||||||
|
from asgiref.sync import async_to_sync
|
||||||
|
from channels.layers import get_channel_layer
|
||||||
|
from structlog import get_logger
|
||||||
|
|
||||||
|
from passbook.lib.utils.reflection import path_to_class
|
||||||
|
from passbook.outposts.models import (
|
||||||
|
Outpost,
|
||||||
|
OutpostDeploymentType,
|
||||||
|
OutpostModel,
|
||||||
|
OutpostType,
|
||||||
|
)
|
||||||
from passbook.providers.proxy.controllers.kubernetes import ProxyKubernetesController
|
from passbook.providers.proxy.controllers.kubernetes import ProxyKubernetesController
|
||||||
from passbook.root.celery import CELERY_APP
|
from passbook.root.celery import CELERY_APP
|
||||||
|
|
||||||
|
LOGGER = get_logger()
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task(bind=True)
|
@CELERY_APP.task(bind=True)
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
|
@ -20,3 +34,16 @@ def outpost_k8s_controller_single(self, outpost: str, outpost_type: str):
|
||||||
"""Launch Kubernetes manager and reconcile deployment/service/etc"""
|
"""Launch Kubernetes manager and reconcile deployment/service/etc"""
|
||||||
if outpost_type == OutpostType.PROXY:
|
if outpost_type == OutpostType.PROXY:
|
||||||
ProxyKubernetesController(outpost).run()
|
ProxyKubernetesController(outpost).run()
|
||||||
|
|
||||||
|
|
||||||
|
@CELERY_APP.task()
|
||||||
|
def outpost_send_update(model_class: str, model_pk: Any):
|
||||||
|
"""Send outpost update to all registered outposts, irregardless to which passbook
|
||||||
|
instance they are connected"""
|
||||||
|
model = path_to_class(model_class)
|
||||||
|
outpost_model: OutpostModel = model.objects.get(model_pk)
|
||||||
|
for outpost in outpost_model.outpost_set.all():
|
||||||
|
channel_layer = get_channel_layer()
|
||||||
|
for channel in outpost.channels:
|
||||||
|
LOGGER.debug("sending update", channel=channel)
|
||||||
|
async_to_sync(channel_layer.send)(channel, {"type": "event.update"})
|
||||||
|
|
Reference in New Issue