reliable device

This commit is contained in:
Cayo Puigdefabregas 2023-01-13 17:42:42 +01:00
parent 5e7d95ba31
commit 520f1726be
5 changed files with 96 additions and 24 deletions

View File

@ -2,6 +2,7 @@ import copy
import csv import csv
import datetime import datetime
import json import json
import uuid
from json.decoder import JSONDecodeError from json.decoder import JSONDecodeError
import pandas as pd import pandas as pd
@ -249,6 +250,10 @@ class LotForm(FlaskForm):
class UploadSnapshotForm(SnapshotMixin, FlaskForm): class UploadSnapshotForm(SnapshotMixin, FlaskForm):
snapshot = MultipleFileField('Select a Snapshot File', [validators.DataRequired()]) snapshot = MultipleFileField('Select a Snapshot File', [validators.DataRequired()])
def __init__(self, *args, **kwargs):
self.create_new_devices = kwargs.pop('create_new_devices', False)
super().__init__(*args, **kwargs)
def validate(self, extra_validators=None): def validate(self, extra_validators=None):
is_valid = super().validate(extra_validators) is_valid = super().validate(extra_validators)
@ -319,7 +324,9 @@ class UploadSnapshotForm(SnapshotMixin, FlaskForm):
try: try:
snapshot_json = schema.load(snapshot_json) snapshot_json = schema.load(snapshot_json)
response = self.build(snapshot_json) response = self.build(
snapshot_json, create_new_device=self.create_new_devices
)
except ValidationError as err: except ValidationError as err:
txt = "{}".format(err) txt = "{}".format(err)
self.errors(txt=txt) self.errors(txt=txt)
@ -1769,8 +1776,6 @@ class UserTrustsForm(FlaskForm):
if self.snapshot_type.data == 'update' and not self.unic(): if self.snapshot_type.data == 'update' and not self.unic():
self.device.reliable() self.device.reliable()
txt = "This devices is assigned as reliable for the user."
self.error_log(txt)
if self.snapshot_type.data == 'new_device' and self.unic(): if self.snapshot_type.data == 'new_device' and self.unic():
self.device.unreliable() self.device.unreliable()

View File

@ -64,7 +64,8 @@ def move_json(tmp_snapshots, path_name, user, live=False):
class SnapshotMixin: class SnapshotMixin:
sync = Sync() sync = Sync()
def build(self, snapshot_json=None): # noqa: C901 def build(self, snapshot_json=None, create_new_device=False): # noqa: C901
self.create_new_device = create_new_device
if not snapshot_json: if not snapshot_json:
snapshot_json = self.snapshot_json snapshot_json = self.snapshot_json
device = snapshot_json.pop('device') # type: Computer device = snapshot_json.pop('device') # type: Computer
@ -87,7 +88,9 @@ class SnapshotMixin:
assert not device.actions_one assert not device.actions_one
assert all(not c.actions_one for c in components) if components else True assert all(not c.actions_one for c in components) if components else True
db_device, remove_actions = self.sync.run(device, components) db_device, remove_actions = self.sync.run(
device, components, self.create_new_device
)
del device # Do not use device anymore del device # Do not use device anymore
snapshot.device = db_device snapshot.device = db_device

View File

@ -894,15 +894,13 @@ class Device(Thing):
snapshot1 = ac snapshot1 = ac
if i > 0: if i > 0:
snapshots[ac] = self.get_snapshot_file(ac) snapshots[ac] = self.get_snapshot_file(ac)
# ac.active = False
i += 1 i += 1
if not snapshot1: if not snapshot1:
return return
self.create_new_device(snapshots.values()) self.create_new_device(snapshots.values())
# [self.remove_snapshot(ac) for ac in snapshots.keys()] self.remove_snapshot(snapshots.keys())
self.reset_components(snapshot1)
return return
@ -930,10 +928,42 @@ class Device(Thing):
form = UploadSnapshotForm() form = UploadSnapshotForm()
form.result = {} form.result = {}
form.snapshots = new_snapshots form.snapshots = new_snapshots
form.create_new_devices = True
form.save(commit=False) form.save(commit=False)
def remove_snapshot(self, snapshots):
from ereuse_devicehub.parser.models import SnapshotsLog
for ac in snapshots:
for slog in SnapshotsLog.query.filter_by(snapshot=ac):
slog.snapshot_id = None
slog.snapshot_uuid = None
db.session.delete(ac)
def remove_devices(self, devices):
from ereuse_devicehub.parser.models import SnapshotsLog
for dev in devices:
for ac in dev.actions:
if ac.type != 'Snapshot':
continue
for slog in SnapshotsLog.query.filter_by(snapshot=ac):
slog.snapshot_id = None
slog.snapshot_uuid = None
for c in dev.components:
c.parent_id = None
for tag in dev.tags:
tag.device_id = None
placeholder = dev.binding or dev.placeholder
if placeholder:
db.session.delete(placeholder.binding)
db.session.delete(placeholder.device)
db.session.delete(placeholder)
def reliable(self): def reliable(self):
# self.user_trusts = True
computers = Computer.query.filter_by( computers = Computer.query.filter_by(
hid=self.hid, hid=self.hid,
owner_id=g.user.id, owner_id=g.user.id,
@ -943,6 +973,7 @@ class Device(Thing):
i = 0 i = 0
computer1 = None computer1 = None
computers_to_remove = []
for d in computers: for d in computers:
if i == 0: if i == 0:
d.user_trusts = True d.user_trusts = True
@ -950,16 +981,9 @@ class Device(Thing):
i += 1 i += 1
continue continue
d.user_trusts = True computers_to_remove.append(d)
d.active = False
d.binding.device.active = False
for ac in d.actions:
if ac.type == 'Snapshot':
ac.active = False
for c in d.components:
c.parent = None
self.remove_devices(computers_to_remove)
if not computer1: if not computer1:
return return
@ -981,8 +1005,6 @@ class Device(Thing):
if c.parent is None: if c.parent is None:
c.parent = snapshot.device c.parent = snapshot.device
snapshot.device.components = snapshot.components
def __lt__(self, other): def __lt__(self, other):
return self.id < other.id return self.id < other.id

View File

@ -35,7 +35,10 @@ class Sync:
"""Synchronizes the device and components with the database.""" """Synchronizes the device and components with the database."""
def run( def run(
self, device: Device, components: Iterable[Component] or None self,
device: Device,
components: Iterable[Component] or None,
create_new_device=False,
) -> (Device, OrderedSet): ) -> (Device, OrderedSet):
"""Synchronizes the device and components with the database. """Synchronizes the device and components with the database.
@ -71,7 +74,7 @@ class Sync:
device.components = OrderedSet(components) device.components = OrderedSet(components)
device.set_hid() device.set_hid()
device.components = OrderedSet() device.components = OrderedSet()
db_device = self.execute_register(device) db_device = self.execute_register(device, create_new_device)
db_components, actions = OrderedSet(), OrderedSet() db_components, actions = OrderedSet(), OrderedSet()
if components is not None: # We have component info (see above) if components is not None: # We have component info (see above)
@ -134,7 +137,7 @@ class Sync:
is_new = True is_new = True
return db_component, is_new return db_component, is_new
def execute_register(self, device: Device) -> Device: def execute_register(self, device: Device, create_new_device=False) -> Device:
"""Synchronizes one device to the DB. """Synchronizes one device to the DB.
This method tries to get an existing device using the HID This method tries to get an existing device using the HID
@ -166,7 +169,7 @@ class Sync:
if db_device and db_device.allocated: if db_device and db_device.allocated:
raise ResourceNotFound('device is actually allocated {}'.format(device)) raise ResourceNotFound('device is actually allocated {}'.format(device))
if not db_device: if not db_device or create_new_device:
device.tags.clear() # We don't want to add the transient dummy tags device.tags.clear() # We don't want to add the transient dummy tags
db.session.add(device) db.session.add(device)
db_device = device db_device = device

View File

@ -2672,3 +2672,42 @@ def test_system_uuid_motherboard(user3: UserClientFlask):
for c in snapshot.device.components: for c in snapshot.device.components:
if c.type == 'Motherboard': if c.type == 'Motherboard':
assert c.serial_number == 'abee0123456720' assert c.serial_number == 'abee0123456720'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_unreliable_device(user3: UserClientFlask):
snapshot = create_device(user3, 'real-eee-1001pxd.snapshot.12.json')
uri = '/inventory/upload-snapshot/'
file_name = 'real-eee-1001pxd.snapshot.12'
snapshot_json = conftest.yaml2json(file_name)
snapshot_json['uuid'] = 'c058e8d2-fb92-47cb-a4b7-522b75561136'
b_snapshot = bytes(json.dumps(snapshot_json), 'utf-8')
file_snap = (BytesIO(b_snapshot), file_name)
user3.get(uri)
data = {
'snapshot': file_snap,
'csrf_token': generate_csrf(),
}
user3.post(uri, data=data, content_type="multipart/form-data")
snapshot2 = Snapshot.query.filter_by(uuid=snapshot_json['uuid']).first()
assert snapshot2.device == snapshot.device
uuid2 = snapshot2.uuid
uri = f"/inventory/snapshots/{uuid2}/"
user3.get(uri)
data = {
'snapshot_type': "new_device",
'csrf_token': generate_csrf(),
}
assert Device.query.filter_by(hid=snapshot.device.hid).count() == 2
user3.post(uri, data=data)
assert Device.query.filter_by(hid=snapshot.device.hid).count() == 4
assert Snapshot.query.count() == 3
import pdb
pdb.set_trace()