start json parser
Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
parent
d811aabd38
commit
e2d18f6011
|
@ -15,6 +15,7 @@ from authentik.api.decorators import permission_required
|
|||
from authentik.blueprints.models import BlueprintInstance
|
||||
from authentik.blueprints.v1.common import Blueprint, BlueprintEntry, BlueprintEntryDesiredState
|
||||
from authentik.blueprints.v1.importer import StringImporter, is_model_allowed
|
||||
from authentik.blueprints.v1.json_parser import BlueprintJSONParser
|
||||
from authentik.blueprints.v1.oci import OCI_PREFIX
|
||||
from authentik.blueprints.v1.tasks import apply_blueprint, blueprints_find_dict
|
||||
from authentik.core.api.used_by import UsedByMixin
|
||||
|
@ -165,6 +166,7 @@ class BlueprintInstanceViewSet(UsedByMixin, ModelViewSet):
|
|||
filter_backends=[],
|
||||
methods=["PUT"],
|
||||
permission_classes=[IsAdminUser],
|
||||
parser_classes=[BlueprintJSONParser],
|
||||
)
|
||||
def procedural(self, request: Request) -> Response:
|
||||
blueprint = Blueprint()
|
||||
|
|
|
@ -555,21 +555,30 @@ class BlueprintDumper(SafeDumper):
|
|||
return super().represent(data)
|
||||
|
||||
|
||||
def yaml_key_map() -> dict[str, type[YAMLTag]]:
|
||||
"""get a dict of all yaml tags, key being the actual tag
|
||||
and the value is the class"""
|
||||
return {
|
||||
"!KeyOf": KeyOf,
|
||||
"!Find": Find,
|
||||
"!Context": Context,
|
||||
"!Format": Format,
|
||||
"!Condition": Condition,
|
||||
"!If": If,
|
||||
"!Env": Env,
|
||||
"!Enumerate": Enumerate,
|
||||
"!Value": Value,
|
||||
"!Index": Index,
|
||||
}
|
||||
|
||||
|
||||
class BlueprintLoader(SafeLoader):
|
||||
"""Loader for blueprints with custom tag support"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.add_constructor("!KeyOf", KeyOf)
|
||||
self.add_constructor("!Find", Find)
|
||||
self.add_constructor("!Context", Context)
|
||||
self.add_constructor("!Format", Format)
|
||||
self.add_constructor("!Condition", Condition)
|
||||
self.add_constructor("!If", If)
|
||||
self.add_constructor("!Env", Env)
|
||||
self.add_constructor("!Enumerate", Enumerate)
|
||||
self.add_constructor("!Value", Value)
|
||||
self.add_constructor("!Index", Index)
|
||||
for tag, cls in yaml_key_map().items():
|
||||
self.add_constructor(tag, cls)
|
||||
|
||||
|
||||
class EntryInvalidError(SentryIgnoredException):
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
"""Blueprint JSON decoder"""
|
||||
from rest_framework.parsers import JSONParser
|
||||
from json import JSONDecoder
|
||||
from typing import Any
|
||||
|
||||
from yaml import ScalarNode, SequenceNode
|
||||
|
||||
from authentik.blueprints.v1.common import BlueprintLoader, YAMLTag, yaml_key_map
|
||||
|
||||
TAG_KEY = "goauthentik.io/yaml-key"
|
||||
ARGS_KEY = "args"
|
||||
|
||||
|
||||
class BlueprintJSONDecoder(JSONDecoder):
|
||||
"""Blueprint JSON decoder, allows using tag logic
|
||||
when using JSON data (e.g. through the API)"""
|
||||
|
||||
dummy_loader: BlueprintLoader
|
||||
tag_map: dict[str, type[YAMLTag]]
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, object_hook=self.object_hook, **kwargs)
|
||||
self.dummy_loader = BlueprintLoader("")
|
||||
self.tag_map = yaml_key_map()
|
||||
|
||||
def parse_yaml_tag(self, data: dict) -> YAMLTag | None:
|
||||
"""parse the tag"""
|
||||
yaml_tag = data.get(TAG_KEY)
|
||||
tag_cls = self.tag_map.get(yaml_tag)
|
||||
if not tag_cls:
|
||||
return None
|
||||
return tag_cls
|
||||
|
||||
def parse_yaml_tag_args(self, data: Any) -> Any:
|
||||
"""Parse args into their yaml equivalent"""
|
||||
if data:
|
||||
if isinstance(data, list):
|
||||
return SequenceNode(
|
||||
"tag:yaml.org,2002:seq", [self.parse_yaml_tag_args(x) for x in data]
|
||||
)
|
||||
if isinstance(data, str):
|
||||
return ScalarNode("tag:yaml.org,2002:str", data)
|
||||
if isinstance(data, int):
|
||||
return ScalarNode("tag:yaml.org,2002:int", data)
|
||||
if isinstance(data, float):
|
||||
return ScalarNode("tag:yaml.org,2002:float", data)
|
||||
return None
|
||||
|
||||
def object_hook(self, data: dict) -> dict | Any:
|
||||
if TAG_KEY not in data:
|
||||
return data
|
||||
tag_cls = self.parse_yaml_tag(data)
|
||||
if not tag_cls:
|
||||
return data
|
||||
tag_args = self.parse_yaml_tag_args(data.get(ARGS_KEY, []))
|
||||
return tag_cls(self.dummy_loader, tag_args)
|
||||
|
||||
|
||||
class BlueprintJSONParser(JSONParser):
|
||||
"""Wrapper around the rest_framework JSON parser that uses the `BlueprintJSONDecoder`"""
|
||||
|
||||
def parse(self, stream, media_type=None, parser_context=None):
|
||||
parser_context = parser_context or {}
|
||||
parser_context["cls"] = BlueprintJSONDecoder
|
||||
return super().parse(stream, media_type, parser_context)
|
Reference in New Issue