diff --git a/ereuse_devicehub/resources/device/models.py b/ereuse_devicehub/resources/device/models.py index 028a8bf6..682327bb 100644 --- a/ereuse_devicehub/resources/device/models.py +++ b/ereuse_devicehub/resources/device/models.py @@ -28,7 +28,7 @@ from teal.resource import url_for_resource from ereuse_devicehub.db import db from ereuse_devicehub.resources.enums import BatteryTechnology, CameraFacing, ComputerChassis, \ DataStorageInterface, DisplayTech, PrinterTechnology, RamFormat, RamInterface, Severity, TransferState -from ereuse_devicehub.resources.models import STR_SM_SIZE, Thing +from ereuse_devicehub.resources.models import STR_SM_SIZE, Thing, listener_reset_field_updated_in_actual_time from ereuse_devicehub.resources.user.models import User @@ -881,3 +881,6 @@ class Manufacturer(db.Model): 'COPY common.manufacturer FROM STDIN (FORMAT csv)', f ) + + +listener_reset_field_updated_in_actual_time(Device) diff --git a/ereuse_devicehub/resources/models.py b/ereuse_devicehub/resources/models.py index 721e9258..e079269f 100644 --- a/ereuse_devicehub/resources/models.py +++ b/ereuse_devicehub/resources/models.py @@ -1,4 +1,5 @@ from datetime import datetime, timezone +from flask_sqlalchemy import event from ereuse_devicehub.db import db @@ -34,3 +35,12 @@ class Thing(db.Model): # to be able to use sorted containers self.created = kwargs.get('created', datetime.now(timezone.utc)) super().__init__(**kwargs) + + +def update_object_timestamp(mapper, connection, thing_obj): + """ This function update the stamptime of field updated """ + thing_obj.updated = datetime.now(timezone.utc) + +def listener_reset_field_updated_in_actual_time(thing_obj): + """ This function launch a event than listen like a signal when some object is saved """ + event.listen(thing_obj, 'before_update', update_object_timestamp, propagate=True) diff --git a/tests/test_action.py b/tests/test_action.py index ba99996d..ba095685 100644 --- a/tests/test_action.py +++ b/tests/test_action.py @@ -238,6 +238,8 @@ def test_generic_action(action_model_state: Tuple[models.Action, states.Trading] device, _ = user.get(res=Device, item=snapshot['device']['id']) assert device['actions'][-1]['id'] == action['id'] assert device['physical'] == state.name + # Check if the update of device is changed + assert snapshot['device']['updated'] != device['updated'] @pytest.mark.mvp diff --git a/tests/test_device.py b/tests/test_device.py index 685b08b0..60389d8f 100644 --- a/tests/test_device.py +++ b/tests/test_device.py @@ -53,7 +53,7 @@ def test_device_model(): # Removing a component from pc doesn't delete the component pc.components.remove(net) db.session.commit() - pc = d.Device.query.first() # this is the same as querying for d.Desktop directly + pc = d.Device.query.filter_by(id=pc.id).first() # this is the same as querying for d.Desktop directly assert pc.components == {graphic} network_adapter = d.NetworkAdapter.query.one() assert network_adapter not in pc.components diff --git a/tests/test_snapshot.py b/tests/test_snapshot.py index 42792c2f..ead63703 100644 --- a/tests/test_snapshot.py +++ b/tests/test_snapshot.py @@ -103,6 +103,25 @@ def test_snapshot_post(user: UserClient): assert rate['snapshot']['id'] == snapshot['id'] +@pytest.mark.mvp +def test_snapshot_update_timefield_updated(user: UserClient): + """ + Tests for check if one computer have the time mark updated when one component of it is updated + """ + computer1 = file('1-device-with-components.snapshot') + snapshot = snapshot_and_check(user, + computer1, + action_types=(BenchmarkProcessor.t, + RateComputer.t), + perform_second_snapshot=False) + computer2 = file('2-second-device-with-components-of-first.snapshot') + snapshot_and_check(user, computer2, action_types=('Remove', 'RateComputer'), + perform_second_snapshot=False) + pc1_id = snapshot['device']['id'] + pc1, _ = user.get(res=m.Device, item=pc1_id) + assert pc1['updated'] != snapshot['device']['updated'] + + @pytest.mark.mvp def test_snapshot_component_add_remove(user: UserClient): """Tests adding and removing components and some don't generate HID. @@ -129,6 +148,7 @@ def test_snapshot_component_add_remove(user: UserClient): perform_second_snapshot=False) pc1_id = snapshot1['device']['id'] pc1, _ = user.get(res=m.Device, item=pc1_id) + update1_pc1 = pc1['updated'] # Parent contains components assert tuple(c['serialNumber'] for c in pc1['components']) == ('p1c1s', 'p1c2s', 'p1c3s') # Components contain parent @@ -151,6 +171,10 @@ def test_snapshot_component_add_remove(user: UserClient): pc2_id = snapshot2['device']['id'] pc1, _ = user.get(res=m.Device, item=pc1_id) pc2, _ = user.get(res=m.Device, item=pc2_id) + # Check if the update_timestamp is updated + update1_pc2 = pc2['updated'] + update2_pc1 = pc1['updated'] + assert update1_pc1 != update2_pc1 # PC1 assert tuple(c['serialNumber'] for c in pc1['components']) == ('p1c1s', 'p1c3s') assert all(c['parent'] == pc1_id for c in pc1['components']) @@ -173,6 +197,12 @@ def test_snapshot_component_add_remove(user: UserClient): snapshot_and_check(user, s3, ('Remove', 'RateComputer'), perform_second_snapshot=False) pc1, _ = user.get(res=m.Device, item=pc1_id) pc2, _ = user.get(res=m.Device, item=pc2_id) + # Check if the update_timestamp is updated + update2_pc2 = pc2['updated'] + update3_pc1 = pc1['updated'] + assert not update3_pc1 in [update1_pc1, update2_pc1] + assert update1_pc2 != update2_pc2 + # PC1 assert {c['serialNumber'] for c in pc1['components']} == {'p1c2s', 'p1c3s'} assert all(c['parent'] == pc1_id for c in pc1['components']) @@ -213,6 +243,11 @@ def test_snapshot_component_add_remove(user: UserClient): snapshot_and_check(user, s4, ('RateComputer',), perform_second_snapshot=False) pc1, _ = user.get(res=m.Device, item=pc1_id) pc2, _ = user.get(res=m.Device, item=pc2_id) + # Check if the update_timestamp is updated + update3_pc2 = pc2['updated'] + update4_pc1 = pc1['updated'] + assert not update4_pc1 in [update1_pc1, update2_pc1, update3_pc1] + assert update3_pc2 == update2_pc2 # PC 0: p1c3s, p1c4s. PC1: p2c1s assert {c['serialNumber'] for c in pc1['components']} == {'p1c3s', 'p1c4s'} assert all(c['parent'] == pc1_id for c in pc1['components'])