outposts: handle RuntimeError during websocket connect
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
parent
65d2eed82d
commit
8a0dd6be24
|
@ -9,7 +9,7 @@ from dacite import from_dict
|
||||||
from dacite.data import Data
|
from dacite.data import Data
|
||||||
from guardian.shortcuts import get_objects_for_user
|
from guardian.shortcuts import get_objects_for_user
|
||||||
from prometheus_client import Gauge
|
from prometheus_client import Gauge
|
||||||
from structlog.stdlib import get_logger
|
from structlog.stdlib import BoundLogger, get_logger
|
||||||
|
|
||||||
from authentik.core.channels import AuthJsonConsumer
|
from authentik.core.channels import AuthJsonConsumer
|
||||||
from authentik.outposts.models import OUTPOST_HELLO_INTERVAL, Outpost, OutpostState
|
from authentik.outposts.models import OUTPOST_HELLO_INTERVAL, Outpost, OutpostState
|
||||||
|
@ -23,8 +23,6 @@ GAUGE_OUTPOSTS_LAST_UPDATE = Gauge(
|
||||||
["outpost", "uid", "version"],
|
["outpost", "uid", "version"],
|
||||||
)
|
)
|
||||||
|
|
||||||
LOGGER = get_logger()
|
|
||||||
|
|
||||||
|
|
||||||
class WebsocketMessageInstruction(IntEnum):
|
class WebsocketMessageInstruction(IntEnum):
|
||||||
"""Commands which can be triggered over Websocket"""
|
"""Commands which can be triggered over Websocket"""
|
||||||
|
@ -51,6 +49,7 @@ class OutpostConsumer(AuthJsonConsumer):
|
||||||
"""Handler for Outposts that connect over websockets for health checks and live updates"""
|
"""Handler for Outposts that connect over websockets for health checks and live updates"""
|
||||||
|
|
||||||
outpost: Optional[Outpost] = None
|
outpost: Optional[Outpost] = None
|
||||||
|
logger: BoundLogger
|
||||||
|
|
||||||
last_uid: Optional[str] = None
|
last_uid: Optional[str] = None
|
||||||
|
|
||||||
|
@ -59,11 +58,20 @@ class OutpostConsumer(AuthJsonConsumer):
|
||||||
def connect(self):
|
def connect(self):
|
||||||
super().connect()
|
super().connect()
|
||||||
uuid = self.scope["url_route"]["kwargs"]["pk"]
|
uuid = self.scope["url_route"]["kwargs"]["pk"]
|
||||||
outpost = get_objects_for_user(self.user, "authentik_outposts.view_outpost").filter(pk=uuid)
|
outpost = (
|
||||||
if not outpost.exists():
|
get_objects_for_user(self.user, "authentik_outposts.view_outpost")
|
||||||
|
.filter(pk=uuid)
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not outpost:
|
||||||
raise DenyConnection()
|
raise DenyConnection()
|
||||||
|
self.logger = get_logger().bind(outpost=outpost)
|
||||||
|
try:
|
||||||
self.accept()
|
self.accept()
|
||||||
self.outpost = outpost.first()
|
except RuntimeError as exc:
|
||||||
|
self.logger.warning("runtime error during accept", exc=exc)
|
||||||
|
raise DenyConnection()
|
||||||
|
self.outpost = outpost
|
||||||
self.last_uid = self.channel_name
|
self.last_uid = self.channel_name
|
||||||
|
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
|
@ -78,9 +86,8 @@ class OutpostConsumer(AuthJsonConsumer):
|
||||||
uid=self.last_uid,
|
uid=self.last_uid,
|
||||||
expected=self.outpost.config.kubernetes_replicas,
|
expected=self.outpost.config.kubernetes_replicas,
|
||||||
).dec()
|
).dec()
|
||||||
LOGGER.debug(
|
self.logger.debug(
|
||||||
"removed outpost instance from cache",
|
"removed outpost instance from cache",
|
||||||
outpost=self.outpost,
|
|
||||||
instance_uuid=self.last_uid,
|
instance_uuid=self.last_uid,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -103,9 +110,8 @@ class OutpostConsumer(AuthJsonConsumer):
|
||||||
uid=self.last_uid,
|
uid=self.last_uid,
|
||||||
expected=self.outpost.config.kubernetes_replicas,
|
expected=self.outpost.config.kubernetes_replicas,
|
||||||
).inc()
|
).inc()
|
||||||
LOGGER.debug(
|
self.logger.debug(
|
||||||
"added outpost instance to cache",
|
"added outpost instance to cache",
|
||||||
outpost=self.outpost,
|
|
||||||
instance_uuid=self.last_uid,
|
instance_uuid=self.last_uid,
|
||||||
)
|
)
|
||||||
self.first_msg = True
|
self.first_msg = True
|
||||||
|
|
Reference in a new issue