blueprints: fixed bug causing filtering with an empty query (#4106)

* Fixed bug causing filtering with an empty query

Fixed bug allowing blueprint import to filter for existing models using an empty query.

The code only checks if the `identifiers` dict is empty, but `__query_from_identifier` skips identifier member values of type `dict` or keys == `pk`, so it is possible to produce an empty query if an `identifier` consists of just `dict` type members or "pk" key. 

Signed-off-by: sdimovv <36302090+sdimovv@users.noreply.github.com>

* Added test case

* Added support for using dict fields as blueprint entry identifiers

* Disabled pylint too-many-locals for _validate_single

Signed-off-by: sdimovv <36302090+sdimovv@users.noreply.github.com>
This commit is contained in:
sdimovv 2022-12-06 11:06:25 +00:00 committed by GitHub
parent 5ef5c70490
commit 8d13235b74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 68 additions and 7 deletions

View File

@ -27,6 +27,61 @@ class TestBlueprintsV1(TransactionTestCase):
)
)
self.assertFalse(importer.validate()[0])
importer = Importer(
(
'{"version": 1, "entries": [{"attrs": {"name": "test"}, '
'"identifiers": {}, '
'"model": "authentik_core.Group"}]}'
)
)
self.assertFalse(importer.validate()[0])
def test_validated_import_dict_identifiers(self):
"""Test importing blueprints with dict identifiers."""
Group.objects.filter(name__istartswith="test").delete()
Group.objects.create(
name="test1",
attributes={
"key": ["value"],
"other_key": ["a_value", "other_value"],
},
)
Group.objects.create(
name="test2",
attributes={
"key": ["value"],
"other_key": ["diff_value", "other_diff_value"],
},
)
importer = Importer(
(
'{"version": 1, "entries": [{"attrs": {"name": "test999", "attributes": '
'{"key": ["updated_value"]}}, "identifiers": {"attributes": {"other_key": '
'["other_value"]}}, "model": "authentik_core.Group"}]}'
)
)
self.assertTrue(importer.validate()[0])
self.assertTrue(importer.apply())
self.assertTrue(
Group.objects.filter(
name="test2",
attributes={
"key": ["value"],
"other_key": ["diff_value", "other_diff_value"],
},
)
)
self.assertTrue(
Group.objects.filter(
name="test999",
# All attributes used as identifiers are kept and merged with the
# new attributes declared in the blueprint
attributes={"key": ["updated_value"], "other_key": ["other_value"]},
)
)
self.assertFalse(Group.objects.filter(name="test1"))
def test_export_validate_import(self):
"""Test export and validate it"""

View File

@ -132,13 +132,16 @@ class Importer:
main_query = Q(pk=attrs["pk"])
sub_query = Q()
for identifier, value in attrs.items():
if isinstance(value, dict):
continue
if identifier == "pk":
continue
sub_query &= Q(**{identifier: value})
if isinstance(value, dict):
sub_query &= Q(**{f"{identifier}__contains": value})
else:
sub_query &= Q(**{identifier: value})
return main_query | sub_query
# pylint: disable-msg=too-many-locals
def _validate_single(self, entry: BlueprintEntry) -> Optional[BaseSerializer]:
"""Validate a single entry"""
model_app_label, model_name = entry.model.split(".")
@ -156,8 +159,6 @@ class Importer:
f"Serializer errors {serializer.errors}", serializer_errors=serializer.errors
) from exc
return serializer
if entry.identifiers == {}:
raise EntryInvalidError("No identifiers")
# If we try to validate without referencing a possible instance
# we'll get a duplicate error, hence we load the model here and return
@ -169,7 +170,12 @@ class Importer:
if isinstance(value, dict) and "pk" in value:
del updated_identifiers[key]
updated_identifiers[f"{key}"] = value["pk"]
existing_models = model.objects.filter(self.__query_from_identifier(updated_identifiers))
query = self.__query_from_identifier(updated_identifiers)
if not query:
raise EntryInvalidError("No or invalid identifiers")
existing_models = model.objects.filter(query)
serializer_kwargs = {}
model_instance = existing_models.first()
@ -198,7 +204,7 @@ class Importer:
full_data = self.__update_pks_for_attrs(entry.get_attrs(self.__import))
except ValueError as exc:
raise EntryInvalidError(exc) from exc
full_data.update(updated_identifiers)
always_merger.merge(full_data, updated_identifiers)
serializer_kwargs["data"] = full_data
serializer: Serializer = model().serializer(**serializer_kwargs)