root: Restructure broker / cache / channel / result configuration (#7097)

* Initial commit

* Remove any remaining mentions of Redis URL

This is handled in https://github.com/goauthentik/authentik/pull/5395

* Allow setting broker transport options

This enables usage of other brokers that require additional settings

* Remove remaining reference to Redis URL

This functionality is not part of this PR

* Reset default TLS requirements to none

* Fix linter errors

* Move dict from base64 encoded json to config.py

Additionally add tests

* Replace ast.literal_eval with json.loads

* Use default channel and cache backend configuration

If more customization is desired users shall look at goauthentik.io/docs/installation/configuration#custom-python-settings

* Send config deprecation notification to all superusers

* Remove duplicate method

* Add configuration explanation

For channel layer settings

* Use Event for deprecation warning

* Fix remove duplicated method

* Add missing comma

* Update authentik/lib/config.py

Signed-off-by: Jens L. <jens@beryju.org>

* Fix Event deprecation handling

---------

Signed-off-by: Jens L. <jens@beryju.org>
Co-authored-by: Jens L <jens@beryju.org>
This commit is contained in:
Philipp Kolberg 2023-11-10 15:44:37 +01:00 committed by GitHub
parent 11dcda77fa
commit 9db9ad3d66
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 233 additions and 47 deletions

View file

@ -93,10 +93,10 @@ class ConfigView(APIView):
"traces_sample_rate": float(CONFIG.get("error_reporting.sample_rate", 0.4)),
},
"capabilities": self.get_capabilities(),
"cache_timeout": CONFIG.get_int("redis.cache_timeout"),
"cache_timeout_flows": CONFIG.get_int("redis.cache_timeout_flows"),
"cache_timeout_policies": CONFIG.get_int("redis.cache_timeout_policies"),
"cache_timeout_reputation": CONFIG.get_int("redis.cache_timeout_reputation"),
"cache_timeout": CONFIG.get_int("cache.timeout"),
"cache_timeout_flows": CONFIG.get_int("cache.timeout_flows"),
"cache_timeout_policies": CONFIG.get_int("cache.timeout_policies"),
"cache_timeout_reputation": CONFIG.get_int("cache.timeout_reputation"),
}
)

View file

@ -33,7 +33,7 @@ PLAN_CONTEXT_SOURCE = "source"
# Is set by the Flow Planner when a FlowToken was used, and the currently active flow plan
# was restored.
PLAN_CONTEXT_IS_RESTORED = "is_restored"
CACHE_TIMEOUT = CONFIG.get_int("redis.cache_timeout_flows")
CACHE_TIMEOUT = CONFIG.get_int("cache.timeout_flows")
CACHE_PREFIX = "goauthentik.io/flows/planner/"

View file

@ -1,4 +1,6 @@
"""authentik core config loader"""
import base64
import json
import os
from collections.abc import Mapping
from contextlib import contextmanager
@ -22,6 +24,25 @@ SEARCH_PATHS = ["authentik/lib/default.yml", "/etc/authentik/config.yml", ""] +
ENV_PREFIX = "AUTHENTIK"
ENVIRONMENT = os.getenv(f"{ENV_PREFIX}_ENV", "local")
REDIS_ENV_KEYS = [
f"{ENV_PREFIX}_REDIS__HOST",
f"{ENV_PREFIX}_REDIS__PORT",
f"{ENV_PREFIX}_REDIS__DB",
f"{ENV_PREFIX}_REDIS__USERNAME",
f"{ENV_PREFIX}_REDIS__PASSWORD",
f"{ENV_PREFIX}_REDIS__TLS",
f"{ENV_PREFIX}_REDIS__TLS_REQS",
]
DEPRECATIONS = {
"redis.broker_url": "broker.url",
"redis.broker_transport_options": "broker.transport_options",
"redis.cache_timeout": "cache.timeout",
"redis.cache_timeout_flows": "cache.timeout_flows",
"redis.cache_timeout_policies": "cache.timeout_policies",
"redis.cache_timeout_reputation": "cache.timeout_reputation",
}
def get_path_from_dict(root: dict, path: str, sep=".", default=None) -> Any:
"""Recursively walk through `root`, checking each part of `path` separated by `sep`.
@ -81,6 +102,10 @@ class AttrEncoder(JSONEncoder):
return super().default(o)
class UNSET:
"""Used to test whether configuration key has not been set."""
class ConfigLoader:
"""Search through SEARCH_PATHS and load configuration. Environment variables starting with
`ENV_PREFIX` are also applied.
@ -113,6 +138,40 @@ class ConfigLoader:
self.update_from_file(env_file)
self.update_from_env()
self.update(self.__config, kwargs)
self.check_deprecations()
def check_deprecations(self):
"""Warn if any deprecated configuration options are used"""
def _pop_deprecated_key(current_obj, dot_parts, index):
"""Recursive function to remove deprecated keys in configuration"""
dot_part = dot_parts[index]
if index == len(dot_parts) - 1:
return current_obj.pop(dot_part)
value = _pop_deprecated_key(current_obj[dot_part], dot_parts, index + 1)
if not current_obj[dot_part]:
current_obj.pop(dot_part)
return value
for deprecation, replacement in DEPRECATIONS.items():
if self.get(deprecation, default=UNSET) is not UNSET:
message = (
f"'{deprecation}' has been deprecated in favor of '{replacement}'! "
+ "Please update your configuration."
)
self.log(
"warning",
message,
)
try:
from authentik.events.models import Event, EventAction
Event.new(EventAction.CONFIGURATION_ERROR, message=message).save()
except ImportError:
continue
deprecated_attr = _pop_deprecated_key(self.__config, deprecation.split("."), 0)
self.set(replacement, deprecated_attr.value)
def log(self, level: str, message: str, **kwargs):
"""Custom Log method, we want to ensure ConfigLoader always logs JSON even when
@ -180,6 +239,10 @@ class ConfigLoader:
error=str(exc),
)
def update_from_dict(self, update: dict):
"""Update config from dict"""
self.__config.update(update)
def update_from_env(self):
"""Check environment variables"""
outer = {}
@ -188,19 +251,13 @@ class ConfigLoader:
if not key.startswith(ENV_PREFIX):
continue
relative_key = key.replace(f"{ENV_PREFIX}_", "", 1).replace("__", ".").lower()
# Recursively convert path from a.b.c into outer[a][b][c]
current_obj = outer
dot_parts = relative_key.split(".")
for dot_part in dot_parts[:-1]:
if dot_part not in current_obj:
current_obj[dot_part] = {}
current_obj = current_obj[dot_part]
# Check if the value is json, and try to load it
try:
value = loads(value)
except JSONDecodeError:
pass
current_obj[dot_parts[-1]] = Attr(value, Attr.Source.ENV, key)
attr_value = Attr(value, Attr.Source.ENV, relative_key)
set_path_in_dict(outer, relative_key, attr_value)
idx += 1
if idx > 0:
self.log("debug", "Loaded environment variables", count=idx)
@ -241,6 +298,23 @@ class ConfigLoader:
"""Wrapper for get that converts value into boolean"""
return str(self.get(path, default)).lower() == "true"
def get_dict_from_b64_json(self, path: str, default=None) -> dict:
"""Wrapper for get that converts value from Base64 encoded string into dictionary"""
config_value = self.get(path)
if config_value is None:
return {}
try:
b64decoded_str = base64.b64decode(config_value).decode("utf-8")
b64decoded_str = b64decoded_str.strip().lstrip("{").rstrip("}")
b64decoded_str = "{" + b64decoded_str + "}"
return json.loads(b64decoded_str)
except (JSONDecodeError, TypeError, ValueError) as exc:
self.log(
"warning",
f"Ignored invalid configuration for '{path}' due to exception: {str(exc)}",
)
return default if isinstance(default, dict) else {}
def set(self, path: str, value: Any, sep="."):
"""Set value using same syntax as get()"""
set_path_in_dict(self.raw, path, Attr(value), sep=sep)

View file

@ -28,14 +28,28 @@ listen:
redis:
host: localhost
port: 6379
db: 0
username: ""
password: ""
tls: false
tls_reqs: "none"
db: 0
cache_timeout: 300
cache_timeout_flows: 300
cache_timeout_policies: 300
cache_timeout_reputation: 300
# broker:
# url: ""
# transport_options: ""
cache:
# url: ""
timeout: 300
timeout_flows: 300
timeout_policies: 300
timeout_reputation: 300
# channel:
# url: ""
# result_backend:
# url: ""
paths:
media: ./media

View file

@ -1,20 +1,32 @@
"""Test config loader"""
import base64
from json import dumps
from os import chmod, environ, unlink, write
from tempfile import mkstemp
from unittest import mock
from django.conf import ImproperlyConfigured
from django.test import TestCase
from authentik.lib.config import ENV_PREFIX, ConfigLoader
from authentik.lib.config import ENV_PREFIX, UNSET, Attr, AttrEncoder, ConfigLoader
class TestConfig(TestCase):
"""Test config loader"""
check_deprecations_env_vars = {
ENV_PREFIX + "_REDIS__BROKER_URL": "redis://myredis:8327/43",
ENV_PREFIX + "_REDIS__BROKER_TRANSPORT_OPTIONS": "bWFzdGVybmFtZT1teW1hc3Rlcg==",
ENV_PREFIX + "_REDIS__CACHE_TIMEOUT": "124s",
ENV_PREFIX + "_REDIS__CACHE_TIMEOUT_FLOWS": "32m",
ENV_PREFIX + "_REDIS__CACHE_TIMEOUT_POLICIES": "3920ns",
ENV_PREFIX + "_REDIS__CACHE_TIMEOUT_REPUTATION": "298382us",
}
@mock.patch.dict(environ, {ENV_PREFIX + "_test__test": "bar"})
def test_env(self):
"""Test simple instance"""
config = ConfigLoader()
environ[ENV_PREFIX + "_test__test"] = "bar"
config.update_from_env()
self.assertEqual(config.get("test.test"), "bar")
@ -27,12 +39,20 @@ class TestConfig(TestCase):
self.assertEqual(config.get("foo.bar"), "baz")
self.assertEqual(config.get("foo.bar"), "bar")
@mock.patch.dict(environ, {"foo": "bar"})
def test_uri_env(self):
"""Test URI parsing (environment)"""
config = ConfigLoader()
environ["foo"] = "bar"
self.assertEqual(config.parse_uri("env://foo").value, "bar")
self.assertEqual(config.parse_uri("env://foo?bar").value, "bar")
foo_uri = "env://foo"
foo_parsed = config.parse_uri(foo_uri)
self.assertEqual(foo_parsed.value, "bar")
self.assertEqual(foo_parsed.source_type, Attr.Source.URI)
self.assertEqual(foo_parsed.source, foo_uri)
foo_bar_uri = "env://foo?bar"
foo_bar_parsed = config.parse_uri(foo_bar_uri)
self.assertEqual(foo_bar_parsed.value, "bar")
self.assertEqual(foo_bar_parsed.source_type, Attr.Source.URI)
self.assertEqual(foo_bar_parsed.source, foo_bar_uri)
def test_uri_file(self):
"""Test URI parsing (file load)"""
@ -91,3 +111,60 @@ class TestConfig(TestCase):
config = ConfigLoader()
config.set("foo", "bar")
self.assertEqual(config.get_int("foo", 1234), 1234)
def test_get_dict_from_b64_json(self):
"""Test get_dict_from_b64_json"""
config = ConfigLoader()
test_value = ' { "foo": "bar" } '.encode("utf-8")
b64_value = base64.b64encode(test_value)
config.set("foo", b64_value)
self.assertEqual(config.get_dict_from_b64_json("foo"), {"foo": "bar"})
def test_get_dict_from_b64_json_missing_brackets(self):
"""Test get_dict_from_b64_json with missing brackets"""
config = ConfigLoader()
test_value = ' "foo": "bar" '.encode("utf-8")
b64_value = base64.b64encode(test_value)
config.set("foo", b64_value)
self.assertEqual(config.get_dict_from_b64_json("foo"), {"foo": "bar"})
def test_get_dict_from_b64_json_invalid(self):
"""Test get_dict_from_b64_json with invalid value"""
config = ConfigLoader()
config.set("foo", "bar")
self.assertEqual(config.get_dict_from_b64_json("foo"), {})
def test_attr_json_encoder(self):
"""Test AttrEncoder"""
test_attr = Attr("foo", Attr.Source.ENV, "AUTHENTIK_REDIS__USERNAME")
json_attr = dumps(test_attr, indent=4, cls=AttrEncoder)
self.assertEqual(json_attr, '"foo"')
def test_attr_json_encoder_no_attr(self):
"""Test AttrEncoder if no Attr is passed"""
class Test:
"""Non Attr class"""
with self.assertRaises(TypeError):
test_obj = Test()
dumps(test_obj, indent=4, cls=AttrEncoder)
@mock.patch.dict(environ, check_deprecations_env_vars)
def test_check_deprecations(self):
"""Test config key re-write for deprecated env vars"""
config = ConfigLoader()
config.update_from_env()
config.check_deprecations()
self.assertEqual(config.get("redis.broker_url", UNSET), UNSET)
self.assertEqual(config.get("redis.broker_transport_options", UNSET), UNSET)
self.assertEqual(config.get("redis.cache_timeout", UNSET), UNSET)
self.assertEqual(config.get("redis.cache_timeout_flows", UNSET), UNSET)
self.assertEqual(config.get("redis.cache_timeout_policies", UNSET), UNSET)
self.assertEqual(config.get("redis.cache_timeout_reputation", UNSET), UNSET)
self.assertEqual(config.get("broker.url"), "redis://myredis:8327/43")
self.assertEqual(config.get("broker.transport_options"), "bWFzdGVybmFtZT1teW1hc3Rlcg==")
self.assertEqual(config.get("cache.timeout"), "124s")
self.assertEqual(config.get("cache.timeout_flows"), "32m")
self.assertEqual(config.get("cache.timeout_policies"), "3920ns")
self.assertEqual(config.get("cache.timeout_reputation"), "298382us")

View file

@ -93,7 +93,7 @@ class OutpostConsumer(AuthJsonConsumer):
expected=self.outpost.config.kubernetes_replicas,
).dec()
def receive_json(self, content: Data):
def receive_json(self, content: Data, **kwargs):
msg = from_dict(WebsocketMessage, content)
uid = msg.args.get("uuid", self.channel_name)
self.last_uid = uid

View file

@ -20,7 +20,7 @@ from authentik.policies.types import CACHE_PREFIX, PolicyRequest, PolicyResult
LOGGER = get_logger()
FORK_CTX = get_context("fork")
CACHE_TIMEOUT = CONFIG.get_int("redis.cache_timeout_policies")
CACHE_TIMEOUT = CONFIG.get_int("cache.timeout_policies")
PROCESS_CLASS = FORK_CTX.Process

View file

@ -13,7 +13,7 @@ from authentik.policies.reputation.tasks import save_reputation
from authentik.stages.identification.signals import identification_failed
LOGGER = get_logger()
CACHE_TIMEOUT = CONFIG.get_int("redis.cache_timeout_reputation")
CACHE_TIMEOUT = CONFIG.get_int("cache.timeout_reputation")
def update_score(request: HttpRequest, identifier: str, amount: int):

View file

@ -1,5 +1,4 @@
"""root settings for authentik"""
import importlib
import os
from hashlib import sha512
@ -195,8 +194,8 @@ _redis_url = (
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": f"{_redis_url}/{CONFIG.get('redis.db')}",
"TIMEOUT": CONFIG.get_int("redis.cache_timeout", 300),
"LOCATION": CONFIG.get("cache.url") or f"{_redis_url}/{CONFIG.get('redis.db')}",
"TIMEOUT": CONFIG.get_int("cache.timeout", 300),
"OPTIONS": {"CLIENT_CLASS": "django_redis.client.DefaultClient"},
"KEY_PREFIX": "authentik_cache",
}
@ -256,7 +255,7 @@ CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.pubsub.RedisPubSubChannelLayer",
"CONFIG": {
"hosts": [f"{_redis_url}/{CONFIG.get('redis.db')}"],
"hosts": [CONFIG.get("channel.url", f"{_redis_url}/{CONFIG.get('redis.db')}")],
"prefix": "authentik_channels_",
},
},
@ -349,8 +348,11 @@ CELERY = {
},
"task_create_missing_queues": True,
"task_default_queue": "authentik",
"broker_url": f"{_redis_url}/{CONFIG.get('redis.db')}{_redis_celery_tls_requirements}",
"result_backend": f"{_redis_url}/{CONFIG.get('redis.db')}{_redis_celery_tls_requirements}",
"broker_url": CONFIG.get("broker.url")
or f"{_redis_url}/{CONFIG.get('redis.db')}{_redis_celery_tls_requirements}",
"broker_transport_options": CONFIG.get_dict_from_b64_json("broker.transport_options"),
"result_backend": CONFIG.get("result_backend.url")
or f"{_redis_url}/{CONFIG.get('redis.db')}{_redis_celery_tls_requirements}",
}
# Sentry integration

View file

@ -27,14 +27,11 @@ type Config struct {
type RedisConfig struct {
Host string `yaml:"host" env:"AUTHENTIK_REDIS__HOST"`
Port int `yaml:"port" env:"AUTHENTIK_REDIS__PORT"`
DB int `yaml:"db" env:"AUTHENTIK_REDIS__DB"`
Username string `yaml:"username" env:"AUTHENTIK_REDIS__USERNAME"`
Password string `yaml:"password" env:"AUTHENTIK_REDIS__PASSWORD"`
TLS bool `yaml:"tls" env:"AUTHENTIK_REDIS__TLS"`
TLSReqs string `yaml:"tls_reqs" env:"AUTHENTIK_REDIS__TLS_REQS"`
DB int `yaml:"cache_db" env:"AUTHENTIK_REDIS__DB"`
CacheTimeout int `yaml:"cache_timeout" env:"AUTHENTIK_REDIS__CACHE_TIMEOUT"`
CacheTimeoutFlows int `yaml:"cache_timeout_flows" env:"AUTHENTIK_REDIS__CACHE_TIMEOUT_FLOWS"`
CacheTimeoutPolicies int `yaml:"cache_timeout_policies" env:"AUTHENTIK_REDIS__CACHE_TIMEOUT_POLICIES"`
CacheTimeoutReputation int `yaml:"cache_timeout_reputation" env:"AUTHENTIK_REDIS__CACHE_TIMEOUT_REPUTATION"`
}
type ListenConfig struct {

View file

@ -71,16 +71,38 @@ To check if your config has been applied correctly, you can run the following co
## Redis Settings
- `AUTHENTIK_REDIS__HOST`: Hostname of your Redis Server
- `AUTHENTIK_REDIS__PORT`: Redis port, defaults to 6379
- `AUTHENTIK_REDIS__PASSWORD`: Password for your Redis Server
- `AUTHENTIK_REDIS__TLS`: Use TLS to connect to Redis, defaults to false
- `AUTHENTIK_REDIS__TLS_REQS`: Redis TLS requirements, defaults to "none"
- `AUTHENTIK_REDIS__DB`: Database, defaults to 0
- `AUTHENTIK_REDIS__CACHE_TIMEOUT`: Timeout for cached data until it expires in seconds, defaults to 300
- `AUTHENTIK_REDIS__CACHE_TIMEOUT_FLOWS`: Timeout for cached flow plans until they expire in seconds, defaults to 300
- `AUTHENTIK_REDIS__CACHE_TIMEOUT_POLICIES`: Timeout for cached policies until they expire in seconds, defaults to 300
- `AUTHENTIK_REDIS__CACHE_TIMEOUT_REPUTATION`: Timeout for cached reputation until they expire in seconds, defaults to 300
- `AUTHENTIK_REDIS__HOST`: Redis server host when not using configuration URL
- `AUTHENTIK_REDIS__PORT`: Redis server port when not using configuration URL
- `AUTHENTIK_REDIS__DB`: Redis server database when not using configuration URL
- `AUTHENTIK_REDIS__USERNAME`: Redis server username when not using configuration URL
- `AUTHENTIK_REDIS__PASSWORD`: Redis server password when not using configuration URL
- `AUTHENTIK_REDIS__TLS`: Redis server connection using TLS when not using configuration URL
- `AUTHENTIK_REDIS__TLS_REQS`: Redis server TLS connection requirements when not using configuration URL
## Result Backend Settings
- `AUTHENTIK_RESULT_BACKEND__URL`: Result backend configuration URL, uses [the Redis Settings](#redis-settings) by default
## Cache Settings
- `AUTHENTIK_CACHE__URL`: Cache configuration URL, uses [the Redis Settings](#redis-settings) by default
- `AUTHENTIK_CACHE__TIMEOUT`: Timeout for cached data until it expires in seconds, defaults to 300
- `AUTHENTIK_CACHE__TIMEOUT_FLOWS`: Timeout for cached flow plans until they expire in seconds, defaults to 300
- `AUTHENTIK_CACHE__TIMEOUT_POLICIES`: Timeout for cached policies until they expire in seconds, defaults to 300
- `AUTHENTIK_CACHE__TIMEOUT_REPUTATION`: Timeout for cached reputation until they expire in seconds, defaults to 300
:::info
`AUTHENTIK_CACHE__TIMEOUT_REPUTATION` only applies to the cache expiry, see [`AUTHENTIK_REPUTATION__EXPIRY`](#authentik_reputation__expiry) to control how long reputation is persisted for.
:::
## Channel Layer Settings (inter-instance communication)
- `AUTHENTIK_CHANNEL__URL`: Channel layers configuration URL, uses [the Redis Settings](#redis-settings) by default
## Broker Settings
- `AUTHENTIK_BROKER__URL`: Broker configuration URL, defaults to Redis using [the respective settings](#redis-settings)
- `AUTHENTIK_BROKER__TRANSPORT_OPTIONS`: Base64 encoded broker transport options
:::info
`AUTHENTIK_REDIS__CACHE_TIMEOUT_REPUTATION` only applies to the cache expiry, see [`AUTHENTIK_REPUTATION__EXPIRY`](#authentik_reputation__expiry) to control how long reputation is persisted for.