*: fix mismatched task names for discovery, make output service connection task monitored (#4956)
Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
parent
86f9056d3f
commit
9310d4cdc0
|
@ -55,11 +55,11 @@ class AuthentikBlueprintsConfig(ManagedAppConfig):
|
||||||
"""Load v1 tasks"""
|
"""Load v1 tasks"""
|
||||||
self.import_module("authentik.blueprints.v1.tasks")
|
self.import_module("authentik.blueprints.v1.tasks")
|
||||||
|
|
||||||
def reconcile_blueprints_discover(self):
|
def reconcile_blueprints_discovery(self):
|
||||||
"""Run blueprint discovery"""
|
"""Run blueprint discovery"""
|
||||||
from authentik.blueprints.v1.tasks import blueprints_discover, clear_failed_blueprints
|
from authentik.blueprints.v1.tasks import blueprints_discovery, clear_failed_blueprints
|
||||||
|
|
||||||
blueprints_discover.delay()
|
blueprints_discovery.delay()
|
||||||
clear_failed_blueprints.delay()
|
clear_failed_blueprints.delay()
|
||||||
|
|
||||||
def import_models(self):
|
def import_models(self):
|
||||||
|
|
|
@ -5,7 +5,7 @@ from authentik.lib.utils.time import fqdn_rand
|
||||||
|
|
||||||
CELERY_BEAT_SCHEDULE = {
|
CELERY_BEAT_SCHEDULE = {
|
||||||
"blueprints_v1_discover": {
|
"blueprints_v1_discover": {
|
||||||
"task": "authentik.blueprints.v1.tasks.blueprints_discover",
|
"task": "authentik.blueprints.v1.tasks.blueprints_discovery",
|
||||||
"schedule": crontab(minute=fqdn_rand("blueprints_v1_discover"), hour="*"),
|
"schedule": crontab(minute=fqdn_rand("blueprints_v1_discover"), hour="*"),
|
||||||
"options": {"queue": "authentik_scheduled"},
|
"options": {"queue": "authentik_scheduled"},
|
||||||
},
|
},
|
||||||
|
|
|
@ -6,7 +6,7 @@ from django.test import TransactionTestCase
|
||||||
from yaml import dump
|
from yaml import dump
|
||||||
|
|
||||||
from authentik.blueprints.models import BlueprintInstance, BlueprintInstanceStatus
|
from authentik.blueprints.models import BlueprintInstance, BlueprintInstanceStatus
|
||||||
from authentik.blueprints.v1.tasks import apply_blueprint, blueprints_discover, blueprints_find
|
from authentik.blueprints.v1.tasks import apply_blueprint, blueprints_discovery, blueprints_find
|
||||||
from authentik.lib.config import CONFIG
|
from authentik.lib.config import CONFIG
|
||||||
from authentik.lib.generators import generate_id
|
from authentik.lib.generators import generate_id
|
||||||
|
|
||||||
|
@ -53,7 +53,7 @@ class TestBlueprintsV1Tasks(TransactionTestCase):
|
||||||
file.seek(0)
|
file.seek(0)
|
||||||
file_hash = sha512(file.read().encode()).hexdigest()
|
file_hash = sha512(file.read().encode()).hexdigest()
|
||||||
file.flush()
|
file.flush()
|
||||||
blueprints_discover() # pylint: disable=no-value-for-parameter
|
blueprints_discovery() # pylint: disable=no-value-for-parameter
|
||||||
instance = BlueprintInstance.objects.filter(name=blueprint_id).first()
|
instance = BlueprintInstance.objects.filter(name=blueprint_id).first()
|
||||||
self.assertEqual(instance.last_applied_hash, file_hash)
|
self.assertEqual(instance.last_applied_hash, file_hash)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
|
@ -81,7 +81,7 @@ class TestBlueprintsV1Tasks(TransactionTestCase):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
file.flush()
|
file.flush()
|
||||||
blueprints_discover() # pylint: disable=no-value-for-parameter
|
blueprints_discovery() # pylint: disable=no-value-for-parameter
|
||||||
blueprint = BlueprintInstance.objects.filter(name="foo").first()
|
blueprint = BlueprintInstance.objects.filter(name="foo").first()
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
blueprint.last_applied_hash,
|
blueprint.last_applied_hash,
|
||||||
|
@ -106,7 +106,7 @@ class TestBlueprintsV1Tasks(TransactionTestCase):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
file.flush()
|
file.flush()
|
||||||
blueprints_discover() # pylint: disable=no-value-for-parameter
|
blueprints_discovery() # pylint: disable=no-value-for-parameter
|
||||||
blueprint.refresh_from_db()
|
blueprint.refresh_from_db()
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
blueprint.last_applied_hash,
|
blueprint.last_applied_hash,
|
||||||
|
|
|
@ -76,7 +76,7 @@ class BlueprintEventHandler(FileSystemEventHandler):
|
||||||
return
|
return
|
||||||
if isinstance(event, FileCreatedEvent):
|
if isinstance(event, FileCreatedEvent):
|
||||||
LOGGER.debug("new blueprint file created, starting discovery")
|
LOGGER.debug("new blueprint file created, starting discovery")
|
||||||
blueprints_discover.delay()
|
blueprints_discovery.delay()
|
||||||
if isinstance(event, FileModifiedEvent):
|
if isinstance(event, FileModifiedEvent):
|
||||||
path = Path(event.src_path)
|
path = Path(event.src_path)
|
||||||
root = Path(CONFIG.y("blueprints_dir")).absolute()
|
root = Path(CONFIG.y("blueprints_dir")).absolute()
|
||||||
|
@ -134,7 +134,7 @@ def blueprints_find():
|
||||||
throws=(DatabaseError, ProgrammingError, InternalError), base=MonitoredTask, bind=True
|
throws=(DatabaseError, ProgrammingError, InternalError), base=MonitoredTask, bind=True
|
||||||
)
|
)
|
||||||
@prefill_task
|
@prefill_task
|
||||||
def blueprints_discover(self: MonitoredTask):
|
def blueprints_discovery(self: MonitoredTask):
|
||||||
"""Find blueprints and check if they need to be created in the database"""
|
"""Find blueprints and check if they need to be created in the database"""
|
||||||
count = 0
|
count = 0
|
||||||
for blueprint in blueprints_find():
|
for blueprint in blueprints_find():
|
||||||
|
|
|
@ -41,7 +41,7 @@ class TaskResult:
|
||||||
|
|
||||||
def with_error(self, exc: Exception) -> "TaskResult":
|
def with_error(self, exc: Exception) -> "TaskResult":
|
||||||
"""Since errors might not always be pickle-able, set the traceback"""
|
"""Since errors might not always be pickle-able, set the traceback"""
|
||||||
self.messages.append(str(exc))
|
self.messages.append(exception_to_string(exc))
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -19,9 +19,9 @@ CELERY_BEAT_SCHEDULE = {
|
||||||
"schedule": crontab(minute=fqdn_rand("outpost_token_ensurer"), hour="*/8"),
|
"schedule": crontab(minute=fqdn_rand("outpost_token_ensurer"), hour="*/8"),
|
||||||
"options": {"queue": "authentik_scheduled"},
|
"options": {"queue": "authentik_scheduled"},
|
||||||
},
|
},
|
||||||
"outpost_local_connection": {
|
"outpost_connection_discovery": {
|
||||||
"task": "authentik.outposts.tasks.outpost_local_connection",
|
"task": "authentik.outposts.tasks.outpost_connection_discovery",
|
||||||
"schedule": crontab(minute=fqdn_rand("outpost_local_connection"), hour="*/8"),
|
"schedule": crontab(minute=fqdn_rand("outpost_connection_discovery"), hour="*/8"),
|
||||||
"options": {"queue": "authentik_scheduled"},
|
"options": {"queue": "authentik_scheduled"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -236,28 +236,33 @@ def _outpost_single_update(outpost: Outpost):
|
||||||
async_to_sync(closing_send)(channel, {"type": "event.update"})
|
async_to_sync(closing_send)(channel, {"type": "event.update"})
|
||||||
|
|
||||||
|
|
||||||
@CELERY_APP.task()
|
@CELERY_APP.task(
|
||||||
def outpost_local_connection():
|
base=MonitoredTask,
|
||||||
|
bind=True,
|
||||||
|
)
|
||||||
|
def outpost_connection_discovery(self: MonitoredTask):
|
||||||
"""Checks the local environment and create Service connections."""
|
"""Checks the local environment and create Service connections."""
|
||||||
|
status = TaskResult(TaskResultStatus.SUCCESSFUL)
|
||||||
if not CONFIG.y_bool("outposts.discover"):
|
if not CONFIG.y_bool("outposts.discover"):
|
||||||
LOGGER.info("Outpost integration discovery is disabled")
|
status.messages.append("Outpost integration discovery is disabled")
|
||||||
|
self.set_status(status)
|
||||||
return
|
return
|
||||||
# Explicitly check against token filename, as that's
|
# Explicitly check against token filename, as that's
|
||||||
# only present when the integration is enabled
|
# only present when the integration is enabled
|
||||||
if Path(SERVICE_TOKEN_FILENAME).exists():
|
if Path(SERVICE_TOKEN_FILENAME).exists():
|
||||||
LOGGER.info("Detected in-cluster Kubernetes Config")
|
status.messages.append("Detected in-cluster Kubernetes Config")
|
||||||
if not KubernetesServiceConnection.objects.filter(local=True).exists():
|
if not KubernetesServiceConnection.objects.filter(local=True).exists():
|
||||||
LOGGER.debug("Created Service Connection for in-cluster")
|
status.messages.append("Created Service Connection for in-cluster")
|
||||||
KubernetesServiceConnection.objects.create(
|
KubernetesServiceConnection.objects.create(
|
||||||
name="Local Kubernetes Cluster", local=True, kubeconfig={}
|
name="Local Kubernetes Cluster", local=True, kubeconfig={}
|
||||||
)
|
)
|
||||||
# For development, check for the existence of a kubeconfig file
|
# For development, check for the existence of a kubeconfig file
|
||||||
kubeconfig_path = Path(KUBE_CONFIG_DEFAULT_LOCATION).expanduser()
|
kubeconfig_path = Path(KUBE_CONFIG_DEFAULT_LOCATION).expanduser()
|
||||||
if kubeconfig_path.exists():
|
if kubeconfig_path.exists():
|
||||||
LOGGER.info("Detected kubeconfig")
|
status.messages.append("Detected kubeconfig")
|
||||||
kubeconfig_local_name = f"k8s-{gethostname()}"
|
kubeconfig_local_name = f"k8s-{gethostname()}"
|
||||||
if not KubernetesServiceConnection.objects.filter(name=kubeconfig_local_name).exists():
|
if not KubernetesServiceConnection.objects.filter(name=kubeconfig_local_name).exists():
|
||||||
LOGGER.debug("Creating kubeconfig Service Connection")
|
status.messages.append("Creating kubeconfig Service Connection")
|
||||||
with kubeconfig_path.open("r", encoding="utf8") as _kubeconfig:
|
with kubeconfig_path.open("r", encoding="utf8") as _kubeconfig:
|
||||||
KubernetesServiceConnection.objects.create(
|
KubernetesServiceConnection.objects.create(
|
||||||
name=kubeconfig_local_name,
|
name=kubeconfig_local_name,
|
||||||
|
@ -266,11 +271,12 @@ def outpost_local_connection():
|
||||||
unix_socket_path = urlparse(DEFAULT_UNIX_SOCKET).path
|
unix_socket_path = urlparse(DEFAULT_UNIX_SOCKET).path
|
||||||
socket = Path(unix_socket_path)
|
socket = Path(unix_socket_path)
|
||||||
if socket.exists() and access(socket, R_OK):
|
if socket.exists() and access(socket, R_OK):
|
||||||
LOGGER.info("Detected local docker socket")
|
status.messages.append("Detected local docker socket")
|
||||||
if len(DockerServiceConnection.objects.filter(local=True)) == 0:
|
if len(DockerServiceConnection.objects.filter(local=True)) == 0:
|
||||||
LOGGER.debug("Created Service Connection for docker")
|
status.messages.append("Created Service Connection for docker")
|
||||||
DockerServiceConnection.objects.create(
|
DockerServiceConnection.objects.create(
|
||||||
name="Local Docker connection",
|
name="Local Docker connection",
|
||||||
local=True,
|
local=True,
|
||||||
url=unix_socket_path,
|
url=unix_socket_path,
|
||||||
)
|
)
|
||||||
|
self.set_status(status)
|
||||||
|
|
|
@ -73,12 +73,12 @@ def task_error_hook(task_id, exception: Exception, traceback, *args, **kwargs):
|
||||||
def _get_startup_tasks() -> list[Callable]:
|
def _get_startup_tasks() -> list[Callable]:
|
||||||
"""Get all tasks to be run on startup"""
|
"""Get all tasks to be run on startup"""
|
||||||
from authentik.admin.tasks import clear_update_notifications
|
from authentik.admin.tasks import clear_update_notifications
|
||||||
from authentik.outposts.tasks import outpost_controller_all, outpost_local_connection
|
from authentik.outposts.tasks import outpost_connection_discovery, outpost_controller_all
|
||||||
from authentik.providers.proxy.tasks import proxy_set_defaults
|
from authentik.providers.proxy.tasks import proxy_set_defaults
|
||||||
|
|
||||||
return [
|
return [
|
||||||
clear_update_notifications,
|
clear_update_notifications,
|
||||||
outpost_local_connection,
|
outpost_connection_discovery,
|
||||||
outpost_controller_all,
|
outpost_controller_all,
|
||||||
proxy_set_defaults,
|
proxy_set_defaults,
|
||||||
]
|
]
|
||||||
|
|
|
@ -17,7 +17,7 @@ from authentik.core.models import Application
|
||||||
from authentik.flows.models import Flow
|
from authentik.flows.models import Flow
|
||||||
from authentik.lib.generators import generate_id
|
from authentik.lib.generators import generate_id
|
||||||
from authentik.outposts.models import DockerServiceConnection, Outpost, OutpostConfig, OutpostType
|
from authentik.outposts.models import DockerServiceConnection, Outpost, OutpostConfig, OutpostType
|
||||||
from authentik.outposts.tasks import outpost_local_connection
|
from authentik.outposts.tasks import outpost_connection_discovery
|
||||||
from authentik.providers.proxy.models import ProxyProvider
|
from authentik.providers.proxy.models import ProxyProvider
|
||||||
from tests.e2e.utils import SeleniumTestCase, retry
|
from tests.e2e.utils import SeleniumTestCase, retry
|
||||||
|
|
||||||
|
@ -210,7 +210,7 @@ class TestProviderProxyConnect(ChannelsLiveServerTestCase):
|
||||||
@reconcile_app("authentik_crypto")
|
@reconcile_app("authentik_crypto")
|
||||||
def test_proxy_connectivity(self):
|
def test_proxy_connectivity(self):
|
||||||
"""Test proxy connectivity over websocket"""
|
"""Test proxy connectivity over websocket"""
|
||||||
outpost_local_connection()
|
outpost_connection_discovery() # pylint: disable=no-value-for-parameter
|
||||||
proxy: ProxyProvider = ProxyProvider.objects.create(
|
proxy: ProxyProvider = ProxyProvider.objects.create(
|
||||||
name="proxy_provider",
|
name="proxy_provider",
|
||||||
authorization_flow=Flow.objects.get(
|
authorization_flow=Flow.objects.get(
|
||||||
|
|
|
@ -19,7 +19,7 @@ from authentik.outposts.models import (
|
||||||
OutpostType,
|
OutpostType,
|
||||||
default_outpost_config,
|
default_outpost_config,
|
||||||
)
|
)
|
||||||
from authentik.outposts.tasks import outpost_local_connection
|
from authentik.outposts.tasks import outpost_connection_discovery
|
||||||
from authentik.providers.proxy.models import ProxyProvider
|
from authentik.providers.proxy.models import ProxyProvider
|
||||||
from tests.e2e.utils import get_docker_tag
|
from tests.e2e.utils import get_docker_tag
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ class OutpostDockerTests(ChannelsLiveServerTestCase):
|
||||||
self.ssl_folder = mkdtemp()
|
self.ssl_folder = mkdtemp()
|
||||||
self.container = self._start_container(self.ssl_folder)
|
self.container = self._start_container(self.ssl_folder)
|
||||||
# Ensure that local connection have been created
|
# Ensure that local connection have been created
|
||||||
outpost_local_connection()
|
outpost_connection_discovery() # pylint: disable=no-value-for-parameter
|
||||||
self.provider: ProxyProvider = ProxyProvider.objects.create(
|
self.provider: ProxyProvider = ProxyProvider.objects.create(
|
||||||
name="test",
|
name="test",
|
||||||
internal_host="http://localhost",
|
internal_host="http://localhost",
|
||||||
|
|
|
@ -10,7 +10,7 @@ from authentik.lib.config import CONFIG
|
||||||
from authentik.outposts.controllers.k8s.deployment import DeploymentReconciler
|
from authentik.outposts.controllers.k8s.deployment import DeploymentReconciler
|
||||||
from authentik.outposts.controllers.k8s.triggers import NeedsUpdate
|
from authentik.outposts.controllers.k8s.triggers import NeedsUpdate
|
||||||
from authentik.outposts.models import KubernetesServiceConnection, Outpost, OutpostType
|
from authentik.outposts.models import KubernetesServiceConnection, Outpost, OutpostType
|
||||||
from authentik.outposts.tasks import outpost_local_connection
|
from authentik.outposts.tasks import outpost_connection_discovery
|
||||||
from authentik.providers.proxy.controllers.kubernetes import ProxyKubernetesController
|
from authentik.providers.proxy.controllers.kubernetes import ProxyKubernetesController
|
||||||
from authentik.providers.proxy.models import ProxyProvider
|
from authentik.providers.proxy.models import ProxyProvider
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ class OutpostKubernetesTests(TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super().setUp()
|
super().setUp()
|
||||||
# Ensure that local connection have been created
|
# Ensure that local connection have been created
|
||||||
outpost_local_connection()
|
outpost_connection_discovery() # pylint: disable=no-value-for-parameter
|
||||||
self.provider: ProxyProvider = ProxyProvider.objects.create(
|
self.provider: ProxyProvider = ProxyProvider.objects.create(
|
||||||
name="test",
|
name="test",
|
||||||
internal_host="http://localhost",
|
internal_host="http://localhost",
|
||||||
|
|
|
@ -18,7 +18,7 @@ from authentik.outposts.models import (
|
||||||
OutpostType,
|
OutpostType,
|
||||||
default_outpost_config,
|
default_outpost_config,
|
||||||
)
|
)
|
||||||
from authentik.outposts.tasks import outpost_local_connection
|
from authentik.outposts.tasks import outpost_connection_discovery
|
||||||
from authentik.providers.proxy.controllers.docker import DockerController
|
from authentik.providers.proxy.controllers.docker import DockerController
|
||||||
from authentik.providers.proxy.models import ProxyProvider
|
from authentik.providers.proxy.models import ProxyProvider
|
||||||
from tests.e2e.utils import get_docker_tag
|
from tests.e2e.utils import get_docker_tag
|
||||||
|
@ -58,7 +58,7 @@ class TestProxyDocker(ChannelsLiveServerTestCase):
|
||||||
self.ssl_folder = mkdtemp()
|
self.ssl_folder = mkdtemp()
|
||||||
self.container = self._start_container(self.ssl_folder)
|
self.container = self._start_container(self.ssl_folder)
|
||||||
# Ensure that local connection have been created
|
# Ensure that local connection have been created
|
||||||
outpost_local_connection()
|
outpost_connection_discovery() # pylint: disable=no-value-for-parameter
|
||||||
self.provider: ProxyProvider = ProxyProvider.objects.create(
|
self.provider: ProxyProvider = ProxyProvider.objects.create(
|
||||||
name="test",
|
name="test",
|
||||||
internal_host="http://localhost",
|
internal_host="http://localhost",
|
||||||
|
|
|
@ -8,7 +8,7 @@ from structlog.stdlib import get_logger
|
||||||
from authentik.core.tests.utils import create_test_flow
|
from authentik.core.tests.utils import create_test_flow
|
||||||
from authentik.outposts.controllers.kubernetes import KubernetesController
|
from authentik.outposts.controllers.kubernetes import KubernetesController
|
||||||
from authentik.outposts.models import KubernetesServiceConnection, Outpost, OutpostType
|
from authentik.outposts.models import KubernetesServiceConnection, Outpost, OutpostType
|
||||||
from authentik.outposts.tasks import outpost_local_connection
|
from authentik.outposts.tasks import outpost_connection_discovery
|
||||||
from authentik.providers.proxy.controllers.k8s.ingress import IngressReconciler
|
from authentik.providers.proxy.controllers.k8s.ingress import IngressReconciler
|
||||||
from authentik.providers.proxy.controllers.kubernetes import ProxyKubernetesController
|
from authentik.providers.proxy.controllers.kubernetes import ProxyKubernetesController
|
||||||
from authentik.providers.proxy.models import ProxyMode, ProxyProvider
|
from authentik.providers.proxy.models import ProxyMode, ProxyProvider
|
||||||
|
@ -23,7 +23,7 @@ class TestProxyKubernetes(TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
# Ensure that local connection have been created
|
# Ensure that local connection have been created
|
||||||
outpost_local_connection()
|
outpost_connection_discovery() # pylint: disable=no-value-for-parameter
|
||||||
self.controller = None
|
self.controller = None
|
||||||
|
|
||||||
def tearDown(self) -> None:
|
def tearDown(self) -> None:
|
||||||
|
|
Reference in a new issue