Merge pull request #428 from eReuse/bugfix/2476-onlyhardrive

only sync DataStorages
This commit is contained in:
cayop 2023-02-13 12:15:12 +01:00 committed by GitHub
commit c58dc367e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 38 additions and 60 deletions

View File

@ -16,19 +16,20 @@ from ereuse_devicehub.resources.action.models import Remove
from ereuse_devicehub.resources.device.models import ( from ereuse_devicehub.resources.device.models import (
Component, Component,
Computer, Computer,
DataStorage,
Device, Device,
Placeholder, Placeholder,
) )
from ereuse_devicehub.resources.tag.model import Tag from ereuse_devicehub.resources.tag.model import Tag
DEVICES_ALLOW_DUPLICITY = [ # DEVICES_ALLOW_DUPLICITY = [
'RamModule', # 'RamModule',
'Display', # 'Display',
'SoundCard', # 'SoundCard',
'Battery', # 'Battery',
'Camera', # 'Camera',
'GraphicCard', # 'GraphicCard',
] # ]
class Sync: class Sync:
@ -119,7 +120,7 @@ class Sync:
""" """
assert inspect(component).transient, 'Component should not be synced from DB' assert inspect(component).transient, 'Component should not be synced from DB'
# if not is a DataStorage, then need build a new one # if not is a DataStorage, then need build a new one
if component.t in DEVICES_ALLOW_DUPLICITY: if not isinstance(component, DataStorage):
db.session.add(component) db.session.add(component)
is_new = True is_new = True
return component, is_new return component, is_new

View File

@ -2941,7 +2941,7 @@ def test_delete_devices_check_sync(user: UserClient):
in [y.device.id for y in x.actions if hasattr(y, 'device')] in [y.device.id for y in x.actions if hasattr(y, 'device')]
] ]
) )
== 2 == 0
) )

View File

@ -2718,7 +2718,7 @@ def test_unreliable_device(user3: UserClientFlask):
snapshots = Snapshot.query.all() snapshots = Snapshot.query.all()
assert snapshot2 not in snapshots assert snapshot2 not in snapshots
assert snapshots[0].device != snapshots[1].device assert snapshots[0].device != snapshots[1].device
assert len(snapshots[0].device.components) == 4 assert len(snapshots[0].device.components) == 8
assert len(snapshots[1].device.components) == 9 assert len(snapshots[1].device.components) == 9
assert len(snapshots[0].device.actions) == 11 assert len(snapshots[0].device.actions) == 11
assert len(snapshots[1].device.actions) == 10 assert len(snapshots[1].device.actions) == 10
@ -2772,5 +2772,5 @@ def test_reliable_device(user3: UserClientFlask):
assert Device.query.filter_by(hid=snapshot.device.hid).count() == 2 assert Device.query.filter_by(hid=snapshot.device.hid).count() == 2
assert Snapshot.query.count() == 1 assert Snapshot.query.count() == 1
assert Snapshot.query.first() == snapshot assert Snapshot.query.first() == snapshot
assert len(snapshot.device.components) == 4 assert len(snapshot.device.components) == 8
assert len(snapshot.device.actions) == 4 assert len(snapshot.device.actions) == 7

View File

@ -158,12 +158,6 @@ def test_snapshot_update_timefield_updated(user: UserClient):
perform_second_snapshot=False, perform_second_snapshot=False,
) )
computer2 = yaml2json('2-second-device-with-components-of-first.snapshot') computer2 = yaml2json('2-second-device-with-components-of-first.snapshot')
snapshot_and_check(
user,
computer2,
action_types=('Remove',),
perform_second_snapshot=False,
)
pc1_devicehub_id = snapshot['device']['devicehubID'] pc1_devicehub_id = snapshot['device']['devicehubID']
pc1, _ = user.get(res=m.Device, item=pc1_devicehub_id) pc1, _ = user.get(res=m.Device, item=pc1_devicehub_id)
assert pc1['updated'] != snapshot['device']['updated'] assert pc1['updated'] != snapshot['device']['updated']
@ -264,30 +258,25 @@ def test_snapshot_component_add_remove(user: UserClient):
pc1, _ = user.get(res=m.Device, item=pc1_devicehub_id) pc1, _ = user.get(res=m.Device, item=pc1_devicehub_id)
pc2, _ = user.get(res=m.Device, item=pc2_devicehub_id) pc2, _ = user.get(res=m.Device, item=pc2_devicehub_id)
# Check if the update_timestamp is updated # Check if the update_timestamp is updated
update1_pc2 = pc2['updated']
update2_pc1 = pc1['updated']
assert update1_pc1 != update2_pc1
# PC1 # PC1
assert tuple(c['serialNumber'] for c in pc1['components']) == ('p1c1s', 'p1c3s') assert tuple(c['serialNumber'] for c in pc1['components']) == (
'p1c1s',
'p1c2s',
'p1c3s',
)
assert all(c['parent'] == pc1_id for c in pc1['components']) assert all(c['parent'] == pc1_id for c in pc1['components'])
assert tuple(e['type'] for e in pc1['actions']) == ( assert tuple(e['type'] for e in pc1['actions']) == (
'BenchmarkProcessor', 'BenchmarkProcessor',
'Snapshot', 'Snapshot',
'Remove',
) )
# PC2 # PC2
assert tuple(c['serialNumber'] for c in pc2['components']) == ('p1c2s', 'p2c1s') assert tuple(c['serialNumber'] for c in pc2['components']) == ('p2c1s', 'p1c2s')
assert all(c['parent'] == pc2_id for c in pc2['components']) assert all(c['parent'] == pc2_id for c in pc2['components'])
assert tuple(e['type'] for e in pc2['actions']) == ('Snapshot',) assert tuple(e['type'] for e in pc2['actions']) == ('Snapshot',)
# p1c2s has two Snapshots, a Remove and an Add # p1c2s has two Snapshots, a Remove and an Add
p1c2s_dev = m.Device.query.filter_by(id=pc2['components'][0]['id']).one() p1c2s_dev = m.Device.query.filter_by(id=pc2['components'][1]['id']).one()
p1c2s, _ = user.get(res=m.Device, item=p1c2s_dev.devicehub_id) p1c2s, _ = user.get(res=m.Device, item=p1c2s_dev.devicehub_id)
assert tuple(e['type'] for e in p1c2s['actions']) == ( assert tuple(e['type'] for e in p1c2s['actions']) == ('Snapshot',)
'BenchmarkProcessor',
'Snapshot',
'Snapshot',
'Remove',
)
# We register the first device again, but removing motherboard # We register the first device again, but removing motherboard
# and moving processor from the second device to the first. # and moving processor from the second device to the first.
@ -296,42 +285,29 @@ def test_snapshot_component_add_remove(user: UserClient):
s3 = yaml2json( s3 = yaml2json(
'3-first-device-but-removing-motherboard-and-adding-processor-from-2.snapshot' '3-first-device-but-removing-motherboard-and-adding-processor-from-2.snapshot'
) )
snapshot_and_check(user, s3, ('Remove',), perform_second_snapshot=False)
pc1, _ = user.get(res=m.Device, item=pc1_devicehub_id) pc1, _ = user.get(res=m.Device, item=pc1_devicehub_id)
pc2, _ = user.get(res=m.Device, item=pc2_devicehub_id) pc2, _ = user.get(res=m.Device, item=pc2_devicehub_id)
# Check if the update_timestamp is updated # Check if the update_timestamp is updated
update2_pc2 = pc2['updated'] update2_pc2 = pc2['updated']
update3_pc1 = pc1['updated'] update3_pc1 = pc1['updated']
assert not update3_pc1 in [update1_pc1, update2_pc1]
assert update1_pc2 != update2_pc2
# PC1 # PC1
assert {c['serialNumber'] for c in pc1['components']} == {'p1c2s', 'p1c3s'} assert {c['serialNumber'] for c in pc1['components']} == {'p1c1s', 'p1c3s', 'p1c2s'}
assert all(c['parent'] == pc1_id for c in pc1['components']) assert all(c['parent'] == pc1['id'] for c in pc1['components'])
assert tuple(get_actions_info(pc1['actions'])) == ( assert tuple(get_actions_info(pc1['actions'])) == (
# id, type, components, snapshot # id, type, components, snapshot
('BenchmarkProcessor', []), # first BenchmarkProcessor ('BenchmarkProcessor', []), # first BenchmarkProcessor
('Snapshot', ['p1c1s', 'p1c2s', 'p1c3s', 'p1c2s']), # first Snapshot1 ('Snapshot', ['p1c1s', 'p1c2s', 'p1c3s']), # first Snapshot1
('Remove', ['p1c2s', 'p1c2s']), # Remove Processor in Snapshot2
('Snapshot', ['p1c2s', 'p1c3s']), # This Snapshot3
) )
# PC2 # PC2
assert tuple(c['serialNumber'] for c in pc2['components']) == ('p2c1s',) assert tuple(c['serialNumber'] for c in pc2['components']) == ('p2c1s', 'p1c2s')
assert all(c['parent'] == pc2_id for c in pc2['components']) assert all(c['parent'] == pc2['id'] for c in pc2['components'])
assert tuple(e['type'] for e in pc2['actions']) == ( assert tuple(e['type'] for e in pc2['actions']) == ('Snapshot',) # Second Snapshot
'Snapshot', # Second Snapshot
'Remove', # the processor we added in 2.
)
# p1c2s has Snapshot, Remove and Add # p1c2s has Snapshot, Remove and Add
p1c2s_dev = m.Device.query.filter_by(id=pc1['components'][0]['id']).one() p1c2s_dev = m.Device.query.filter_by(id=pc1['components'][0]['id']).one()
p1c2s, _ = user.get(res=m.Device, item=p1c2s_dev.devicehub_id) p1c2s, _ = user.get(res=m.Device, item=p1c2s_dev.devicehub_id)
assert tuple(get_actions_info(p1c2s['actions'])) == ( assert tuple(get_actions_info(p1c2s['actions'])) == (
('BenchmarkProcessor', []), # first BenchmarkProcessor ('Snapshot', ['p1c1s', 'p1c2s', 'p1c3s']), # First Snapshot to PC1
('Snapshot', ['p1c1s', 'p1c2s', 'p1c3s', 'p1c2s']), # First Snapshot to PC1
('Snapshot', ['p1c2s', 'p2c1s']), # Second Snapshot to PC2
('Remove', ['p1c2s', 'p1c2s']), # ...which caused p1c2s to be removed form PC1
('Snapshot', ['p1c2s', 'p1c3s']), # The third Snapshot to PC1
('Remove', ['p1c2s']), # ...which caused p1c2 to be removed from PC2
) )
# We register the first device but without the processor, # We register the first device but without the processor,
@ -344,16 +320,15 @@ def test_snapshot_component_add_remove(user: UserClient):
# Check if the update_timestamp is updated # Check if the update_timestamp is updated
update3_pc2 = pc2['updated'] update3_pc2 = pc2['updated']
update4_pc1 = pc1['updated'] update4_pc1 = pc1['updated']
assert update4_pc1 in [update1_pc1, update2_pc1, update3_pc1]
assert update3_pc2 == update2_pc2 assert update3_pc2 == update2_pc2
# PC 0: p1c3s, p1c4s. PC1: p2c1s # PC 0: p1c3s, p1c4s. PC1: p2c1s
assert {c['serialNumber'] for c in pc1['components']} == {'p1c2s', 'p1c3s'} assert {c['serialNumber'] for c in pc1['components']} == {'p1c1s', 'p1c2s', 'p1c3s'}
assert all(c['parent'] == pc1_id for c in pc1['components']) assert all(c['parent'] == pc1['id'] for c in pc1['components'])
# This last Action only # This last Action only
# PC2 # PC2
# We haven't changed PC2 # We haven't changed PC2
assert tuple(c['serialNumber'] for c in pc2['components']) == ('p2c1s',) assert tuple(c['serialNumber'] for c in pc2['components']) == ('p2c1s', 'p1c2s')
assert all(c['parent'] == pc2_id for c in pc2['components']) assert all(c['parent'] == pc2['id'] for c in pc2['components'])
@pytest.mark.mvp @pytest.mark.mvp
@ -454,7 +429,7 @@ def test_ram_remove(user: UserClient):
dev1 = m.Device.query.filter_by(id=snap1['device']['id']).one() dev1 = m.Device.query.filter_by(id=snap1['device']['id']).one()
dev2 = m.Device.query.filter_by(id=snap2['device']['id']).one() dev2 = m.Device.query.filter_by(id=snap2['device']['id']).one()
assert len(dev1.components) == 1 assert len(dev1.components) == 2
assert len(dev2.components) == 3 assert len(dev2.components) == 3
ssd = [x for x in dev2.components if x.t == 'SolidStateDrive'][0] ssd = [x for x in dev2.components if x.t == 'SolidStateDrive'][0]
remove = [x for x in ssd.actions if x.t == 'Remove'][0] remove = [x for x in ssd.actions if x.t == 'Remove'][0]
@ -685,7 +660,7 @@ def test_erase_changing_hdd_between_pcs(user: UserClient):
db.session.commit() db.session.commit()
assert dev2.components[2].parent == dev2 assert dev2.components[2].parent == dev2
assert dev2.components[2].actions[-1].device == dev1 assert dev2.components[2].actions[-1].device == dev2.components[2]
doc1, response = user.get( doc1, response = user.get(
res=documents.DocumentDef.t, item='erasures/{}'.format(dev1.id), accept=ANY res=documents.DocumentDef.t, item='erasures/{}'.format(dev1.id), accept=ANY
) )
@ -1343,6 +1318,7 @@ def test_placeholder(user: UserClient):
bodyLite, res = user.post(snapshot_lite, uri="/api/inventory/") bodyLite, res = user.post(snapshot_lite, uri="/api/inventory/")
assert res.status_code == 201 assert res.status_code == 201
dev = m.Device.query.filter_by(devicehub_id=bodyLite['dhid']).one() dev = m.Device.query.filter_by(devicehub_id=bodyLite['dhid']).one()
dev = dev.placeholder.binding
assert dev.placeholder is None assert dev.placeholder is None
assert dev.binding.phid == '12' assert dev.binding.phid == '12'
assert len(dev.binding.device.components) == 11 assert len(dev.binding.device.components) == 11
@ -1380,6 +1356,7 @@ def test_system_uuid_motherboard(user: UserClient):
if c['type'] == 'Motherboard': if c['type'] == 'Motherboard':
c['serialNumber'] = 'ABee0123456720' c['serialNumber'] = 'ABee0123456720'
s['uuid'] = str(uuid.uuid4())
snap2, _ = user.post(s, res=Snapshot, status=422) snap2, _ = user.post(s, res=Snapshot, status=422)
txt = "We have detected that a there is a device in your inventory" txt = "We have detected that a there is a device in your inventory"
assert txt in snap2['message'][0] assert txt in snap2['message'][0]
@ -1407,7 +1384,7 @@ def test_bug_4028_components(user: UserClient):
assert '' not in [c.phid() for c in components1] assert '' not in [c.phid() for c in components1]
assert '' not in [c.phid() for c in components2] assert '' not in [c.phid() for c in components2]
assert len(components1) == len(components2) assert len(components1) == len(components2)
assert m.Placeholder.query.count() == 15 assert m.Placeholder.query.count() == 19
assert m.Placeholder.query.count() * 2 == m.Device.query.count() assert m.Placeholder.query.count() * 2 == m.Device.query.count()
for c in m.Placeholder.query.filter(): for c in m.Placeholder.query.filter():
assert c.binding assert c.binding