update devices when there are a new action
This commit is contained in:
parent
e4cf093b4b
commit
76806299cd
|
@ -1,4 +1,5 @@
|
||||||
from distutils.version import StrictVersion
|
from distutils.version import StrictVersion
|
||||||
|
from datetime import datetime
|
||||||
from typing import List
|
from typing import List
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
|
@ -37,6 +38,8 @@ class ActionView(View):
|
||||||
return self.transfer_ownership()
|
return self.transfer_ownership()
|
||||||
Model = db.Model._decl_class_registry.data[json['type']]()
|
Model = db.Model._decl_class_registry.data[json['type']]()
|
||||||
action = Model(**a)
|
action = Model(**a)
|
||||||
|
for d in action.devices:
|
||||||
|
d.updated = datetime.now()
|
||||||
db.session.add(action)
|
db.session.add(action)
|
||||||
db.session().final_flush()
|
db.session().final_flush()
|
||||||
ret = self.schema.jsonify(action)
|
ret = self.schema.jsonify(action)
|
||||||
|
@ -109,6 +112,8 @@ class ActionView(View):
|
||||||
if snapshot.device.hid is None:
|
if snapshot.device.hid is None:
|
||||||
snapshot.severity = Severity.Warning
|
snapshot.severity = Severity.Warning
|
||||||
db.session.add(snapshot)
|
db.session.add(snapshot)
|
||||||
|
for action in snapshot.actions:
|
||||||
|
action.device.updated = datetime.now()
|
||||||
db.session().final_flush()
|
db.session().final_flush()
|
||||||
ret = self.schema.jsonify(snapshot) # transform it back
|
ret = self.schema.jsonify(snapshot) # transform it back
|
||||||
ret.status_code = 201
|
ret.status_code = 201
|
||||||
|
|
|
@ -238,6 +238,8 @@ def test_generic_action(action_model_state: Tuple[models.Action, states.Trading]
|
||||||
device, _ = user.get(res=Device, item=snapshot['device']['id'])
|
device, _ = user.get(res=Device, item=snapshot['device']['id'])
|
||||||
assert device['actions'][-1]['id'] == action['id']
|
assert device['actions'][-1]['id'] == action['id']
|
||||||
assert device['physical'] == state.name
|
assert device['physical'] == state.name
|
||||||
|
# Check if the update of device is changed
|
||||||
|
assert snapshot['device']['updated'] != device['updated']
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.mvp
|
@pytest.mark.mvp
|
||||||
|
|
|
@ -99,7 +99,7 @@ def test_snapshot_post(user: UserClient):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.mvp
|
@pytest.mark.mvp
|
||||||
def test_snapshot_update_timeupdated(user: UserClient):
|
def test_snapshot_update_timefield_updated(user: UserClient):
|
||||||
"""
|
"""
|
||||||
Tests for check if one computer have the time mark updated when one component of it is updated
|
Tests for check if one computer have the time mark updated when one component of it is updated
|
||||||
"""
|
"""
|
||||||
|
@ -110,9 +110,10 @@ def test_snapshot_update_timeupdated(user: UserClient):
|
||||||
RateComputer.t),
|
RateComputer.t),
|
||||||
perform_second_snapshot=False)
|
perform_second_snapshot=False)
|
||||||
computer2 = file('2-second-device-with-components-of-first.snapshot')
|
computer2 = file('2-second-device-with-components-of-first.snapshot')
|
||||||
|
pc1_id = snapshot['device']['id']
|
||||||
|
pc1, _ = user.get(res=m.Device, item=pc1_id)
|
||||||
snapshot_and_check(user, computer2, action_types=('Remove', 'RateComputer'),
|
snapshot_and_check(user, computer2, action_types=('Remove', 'RateComputer'),
|
||||||
perform_second_snapshot=False)
|
perform_second_snapshot=False)
|
||||||
# import pdb; pdb.set_trace()
|
|
||||||
pc1_id = snapshot['device']['id']
|
pc1_id = snapshot['device']['id']
|
||||||
pc1, _ = user.get(res=m.Device, item=pc1_id)
|
pc1, _ = user.get(res=m.Device, item=pc1_id)
|
||||||
assert pc1['updated'] != snapshot['updated']
|
assert pc1['updated'] != snapshot['updated']
|
||||||
|
@ -144,6 +145,7 @@ def test_snapshot_component_add_remove(user: UserClient):
|
||||||
perform_second_snapshot=False)
|
perform_second_snapshot=False)
|
||||||
pc1_id = snapshot1['device']['id']
|
pc1_id = snapshot1['device']['id']
|
||||||
pc1, _ = user.get(res=m.Device, item=pc1_id)
|
pc1, _ = user.get(res=m.Device, item=pc1_id)
|
||||||
|
update1_pc1 = pc1['updated']
|
||||||
# Parent contains components
|
# Parent contains components
|
||||||
assert tuple(c['serialNumber'] for c in pc1['components']) == ('p1c1s', 'p1c2s', 'p1c3s')
|
assert tuple(c['serialNumber'] for c in pc1['components']) == ('p1c1s', 'p1c2s', 'p1c3s')
|
||||||
# Components contain parent
|
# Components contain parent
|
||||||
|
@ -166,6 +168,10 @@ def test_snapshot_component_add_remove(user: UserClient):
|
||||||
pc2_id = snapshot2['device']['id']
|
pc2_id = snapshot2['device']['id']
|
||||||
pc1, _ = user.get(res=m.Device, item=pc1_id)
|
pc1, _ = user.get(res=m.Device, item=pc1_id)
|
||||||
pc2, _ = user.get(res=m.Device, item=pc2_id)
|
pc2, _ = user.get(res=m.Device, item=pc2_id)
|
||||||
|
# Check if the update_timestamp is updated
|
||||||
|
update1_pc2 = pc2['updated']
|
||||||
|
update2_pc1 = pc1['updated']
|
||||||
|
assert update1_pc1 != update2_pc1
|
||||||
# PC1
|
# PC1
|
||||||
assert tuple(c['serialNumber'] for c in pc1['components']) == ('p1c1s', 'p1c3s')
|
assert tuple(c['serialNumber'] for c in pc1['components']) == ('p1c1s', 'p1c3s')
|
||||||
assert all(c['parent'] == pc1_id for c in pc1['components'])
|
assert all(c['parent'] == pc1_id for c in pc1['components'])
|
||||||
|
@ -188,6 +194,12 @@ def test_snapshot_component_add_remove(user: UserClient):
|
||||||
snapshot_and_check(user, s3, ('Remove', 'RateComputer'), perform_second_snapshot=False)
|
snapshot_and_check(user, s3, ('Remove', 'RateComputer'), perform_second_snapshot=False)
|
||||||
pc1, _ = user.get(res=m.Device, item=pc1_id)
|
pc1, _ = user.get(res=m.Device, item=pc1_id)
|
||||||
pc2, _ = user.get(res=m.Device, item=pc2_id)
|
pc2, _ = user.get(res=m.Device, item=pc2_id)
|
||||||
|
# Check if the update_timestamp is updated
|
||||||
|
update2_pc2 = pc2['updated']
|
||||||
|
update3_pc1 = pc1['updated']
|
||||||
|
assert not update3_pc1 in [update1_pc1, update2_pc1]
|
||||||
|
assert update1_pc2 != update2_pc2
|
||||||
|
|
||||||
# PC1
|
# PC1
|
||||||
assert {c['serialNumber'] for c in pc1['components']} == {'p1c2s', 'p1c3s'}
|
assert {c['serialNumber'] for c in pc1['components']} == {'p1c2s', 'p1c3s'}
|
||||||
assert all(c['parent'] == pc1_id for c in pc1['components'])
|
assert all(c['parent'] == pc1_id for c in pc1['components'])
|
||||||
|
@ -228,6 +240,11 @@ def test_snapshot_component_add_remove(user: UserClient):
|
||||||
snapshot_and_check(user, s4, ('RateComputer',), perform_second_snapshot=False)
|
snapshot_and_check(user, s4, ('RateComputer',), perform_second_snapshot=False)
|
||||||
pc1, _ = user.get(res=m.Device, item=pc1_id)
|
pc1, _ = user.get(res=m.Device, item=pc1_id)
|
||||||
pc2, _ = user.get(res=m.Device, item=pc2_id)
|
pc2, _ = user.get(res=m.Device, item=pc2_id)
|
||||||
|
# Check if the update_timestamp is updated
|
||||||
|
update3_pc2 = pc2['updated']
|
||||||
|
update4_pc1 = pc1['updated']
|
||||||
|
assert not update4_pc1 in [update1_pc1, update2_pc1, update3_pc1]
|
||||||
|
assert update3_pc2 == update2_pc2
|
||||||
# PC 0: p1c3s, p1c4s. PC1: p2c1s
|
# PC 0: p1c3s, p1c4s. PC1: p2c1s
|
||||||
assert {c['serialNumber'] for c in pc1['components']} == {'p1c3s', 'p1c4s'}
|
assert {c['serialNumber'] for c in pc1['components']} == {'p1c3s', 'p1c4s'}
|
||||||
assert all(c['parent'] == pc1_id for c in pc1['components'])
|
assert all(c['parent'] == pc1_id for c in pc1['components'])
|
||||||
|
|
Reference in a new issue