policy(minor): add unittests for policy engine
This commit is contained in:
parent
a691ee529c
commit
65a065c4ee
|
@ -21,6 +21,7 @@ exclude_lines =
|
||||||
def __str__
|
def __str__
|
||||||
def __repr__
|
def __repr__
|
||||||
if self\.debug
|
if self\.debug
|
||||||
|
if TYPE_CHECKING
|
||||||
|
|
||||||
# Don't complain if tests don't hit defensive assertion code:
|
# Don't complain if tests don't hit defensive assertion code:
|
||||||
raise AssertionError
|
raise AssertionError
|
||||||
|
|
|
@ -80,9 +80,9 @@ pylint:
|
||||||
- redis:latest
|
- redis:latest
|
||||||
coverage:
|
coverage:
|
||||||
script:
|
script:
|
||||||
- coverage run manage.py test
|
- coverage run --concurrency=multiprocessing manage.py test
|
||||||
|
- coverage combine
|
||||||
- coverage report
|
- coverage report
|
||||||
- coverage html
|
|
||||||
stage: test
|
stage: test
|
||||||
services:
|
services:
|
||||||
- postgres:latest
|
- postgres:latest
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
"""passbook policy engine"""
|
"""passbook policy engine"""
|
||||||
from multiprocessing import Pipe
|
from multiprocessing import Pipe
|
||||||
from multiprocessing.connection import Connection
|
from multiprocessing.connection import Connection
|
||||||
from typing import List, Tuple
|
from typing import List, Optional, Tuple
|
||||||
|
|
||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
from django.http import HttpRequest
|
from django.http import HttpRequest
|
||||||
|
@ -19,13 +19,14 @@ class PolicyProcessInfo:
|
||||||
|
|
||||||
process: PolicyProcess
|
process: PolicyProcess
|
||||||
connection: Connection
|
connection: Connection
|
||||||
result: PolicyResult
|
result: Optional[PolicyResult]
|
||||||
policy: Policy
|
policy: Policy
|
||||||
|
|
||||||
def __init__(self, process: PolicyProcess, connection: Connection, policy: Policy):
|
def __init__(self, process: PolicyProcess, connection: Connection, policy: Policy):
|
||||||
self.process = process
|
self.process = process
|
||||||
self.connection = connection
|
self.connection = connection
|
||||||
self.policy = policy
|
self.policy = policy
|
||||||
|
self.result = None
|
||||||
|
|
||||||
class PolicyEngine:
|
class PolicyEngine:
|
||||||
"""Orchestrate policy checking, launch tasks and return result"""
|
"""Orchestrate policy checking, launch tasks and return result"""
|
||||||
|
|
0
passbook/policies/tests/__init__.py
Normal file
0
passbook/policies/tests/__init__.py
Normal file
64
passbook/policies/tests/test_engine.py
Normal file
64
passbook/policies/tests/test_engine.py
Normal file
|
@ -0,0 +1,64 @@
|
||||||
|
"""policy engine tests"""
|
||||||
|
from django.core.cache import cache
|
||||||
|
from django.test import TestCase
|
||||||
|
|
||||||
|
from passbook.core.models import DebugPolicy, Policy, User
|
||||||
|
from passbook.policies.engine import PolicyEngine
|
||||||
|
|
||||||
|
|
||||||
|
class PolicyTestEngine(TestCase):
|
||||||
|
"""PolicyEngine tests"""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
cache.clear()
|
||||||
|
self.user = User.objects.create_user(
|
||||||
|
username="policyuser")
|
||||||
|
self.policy_false = DebugPolicy.objects.create(
|
||||||
|
result=False,
|
||||||
|
wait_min=0,
|
||||||
|
wait_max=1)
|
||||||
|
self.policy_true = DebugPolicy.objects.create(
|
||||||
|
result=True,
|
||||||
|
wait_min=0,
|
||||||
|
wait_max=1)
|
||||||
|
self.policy_negate = DebugPolicy.objects.create(
|
||||||
|
negate=True,
|
||||||
|
result=True,
|
||||||
|
wait_min=0,
|
||||||
|
wait_max=1)
|
||||||
|
self.policy_raises = Policy.objects.create(
|
||||||
|
name='raises')
|
||||||
|
|
||||||
|
def test_engine_empty(self):
|
||||||
|
"""Ensure empty policy list passes"""
|
||||||
|
engine = PolicyEngine([], self.user)
|
||||||
|
self.assertEqual(engine.build().passing, True)
|
||||||
|
|
||||||
|
def test_engine_without_user(self):
|
||||||
|
"""Ensure engine raises error if no user is set"""
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
PolicyEngine([]).build()
|
||||||
|
|
||||||
|
def test_engine(self):
|
||||||
|
"""Ensure all policies passes (Mix of false and true -> false)"""
|
||||||
|
engine = PolicyEngine(DebugPolicy.objects.filter(negate__exact=False), self.user)
|
||||||
|
self.assertEqual(engine.build().passing, False)
|
||||||
|
|
||||||
|
def test_engine_negate(self):
|
||||||
|
"""Test negate flag"""
|
||||||
|
engine = PolicyEngine(DebugPolicy.objects.filter(negate__exact=True), self.user)
|
||||||
|
self.assertEqual(engine.build().passing, False)
|
||||||
|
|
||||||
|
def test_engine_policy_error(self):
|
||||||
|
"""Test negate flag"""
|
||||||
|
engine = PolicyEngine(Policy.objects.filter(name='raises'), self.user)
|
||||||
|
self.assertEqual(engine.build().passing, False)
|
||||||
|
|
||||||
|
def test_engine_cache(self):
|
||||||
|
"""Ensure empty policy list passes"""
|
||||||
|
engine = PolicyEngine(DebugPolicy.objects.filter(negate__exact=False), self.user)
|
||||||
|
self.assertEqual(len(cache.keys('policy_*')), 0)
|
||||||
|
self.assertEqual(engine.build().passing, False)
|
||||||
|
self.assertEqual(len(cache.keys('policy_*')), 2)
|
||||||
|
self.assertEqual(engine.build().passing, False)
|
||||||
|
self.assertEqual(len(cache.keys('policy_*')), 2)
|
Reference in a new issue