fix snapshotParse for hwmd

This commit is contained in:
Cayo Puigdefabregas 2023-07-11 14:06:16 +02:00
parent 6337322ac5
commit 765858ac1b
7 changed files with 215 additions and 95 deletions

View file

@ -13,7 +13,7 @@ from werkzeug.exceptions import Unauthorized
from ereuse_devicehub.auth import Auth from ereuse_devicehub.auth import Auth
from ereuse_devicehub.db import db from ereuse_devicehub.db import db
from ereuse_devicehub.parser.models import SnapshotsLog from ereuse_devicehub.parser.models import SnapshotsLog
from ereuse_devicehub.parser.parser import ParseSnapshotLsHw from ereuse_devicehub.parser.parser import ParseSnapshot
from ereuse_devicehub.parser.schemas import Snapshot_lite from ereuse_devicehub.parser.schemas import Snapshot_lite
from ereuse_devicehub.resources.action.views.snapshot import ( from ereuse_devicehub.resources.action.views.snapshot import (
SnapshotMixin, SnapshotMixin,
@ -59,7 +59,7 @@ class InventoryView(LoginMixin, SnapshotMixin):
return snapshot_json return snapshot_json
try: try:
self.snapshot_json = ParseSnapshotLsHw(snapshot_json).get_snapshot() self.snapshot_json = ParseSnapshot(snapshot_json).get_snapshot()
raise 1 == 2 raise 1 == 2
except Exception as err: except Exception as err:
logger.error("Error: {} \n{}\n".format(err, self.snapshot_json)) logger.error("Error: {} \n{}\n".format(err, self.snapshot_json))

View file

@ -39,7 +39,7 @@ from ereuse_devicehub.inventory.models import (
TransferCustomerDetails, TransferCustomerDetails,
) )
from ereuse_devicehub.parser.models import PlaceholdersLog, SnapshotsLog from ereuse_devicehub.parser.models import PlaceholdersLog, SnapshotsLog
from ereuse_devicehub.parser.parser import ParseSnapshot, ParseSnapshotLsHw from ereuse_devicehub.parser.parser import ParseSnapshot
from ereuse_devicehub.parser.schemas import Snapshot_lite from ereuse_devicehub.parser.schemas import Snapshot_lite
from ereuse_devicehub.resources.action.models import Snapshot, Trade from ereuse_devicehub.resources.action.models import Snapshot, Trade
from ereuse_devicehub.resources.action.schemas import Snapshot as SnapshotSchema from ereuse_devicehub.resources.action.schemas import Snapshot as SnapshotSchema
@ -315,7 +315,7 @@ class UploadSnapshotForm(SnapshotMixin, FlaskForm):
return True return True
def save(self, commit=True, user_trusts=True): def save(self, commit=True, user_trusts=True): # noqa: C901
if any([x == 'Error' for x in self.result.values()]): if any([x == 'Error' for x in self.result.values()]):
return return
schema = SnapshotSchema() schema = SnapshotSchema()

View file

@ -2,6 +2,7 @@ import json
import logging import logging
import uuid import uuid
import numpy
from dmidecode import DMIParse from dmidecode import DMIParse
from flask import request from flask import request
from marshmallow.exceptions import ValidationError from marshmallow.exceptions import ValidationError
@ -25,6 +26,7 @@ class ParseSnapshot:
self.lscpi_raw = snapshot["hwmd"]["lspci"] self.lscpi_raw = snapshot["hwmd"]["lspci"]
self.device = {"actions": []} self.device = {"actions": []}
self.components = [] self.components = []
self.monitors = []
self.dmi = DMIParse(self.dmidecode_raw) self.dmi = DMIParse(self.dmidecode_raw)
self.smart = self.loads(self.smart_raw) self.smart = self.loads(self.smart_raw)
@ -32,14 +34,15 @@ class ParseSnapshot:
self.hwinfo = self.parse_hwinfo() self.hwinfo = self.parse_hwinfo()
self.set_computer() self.set_computer()
self.get_hwinfo_monitors()
self.set_components() self.set_components()
self.snapshot_json = { self.snapshot_json = {
"device": self.device, "device": self.device,
"software": "UsodyOS", "software": "UsodyOS",
"components": self.components, "components": self.components,
"uuid": snapshot['uuid'], "uuid": snapshot['uuid'],
"type": snapshot['type'], "version": snapshot['version'],
"version": "1.0.0", "settings_version": snapshot['settings_version'],
"endTime": snapshot["timestamp"], "endTime": snapshot["timestamp"],
"elapsed": 1, "elapsed": 1,
"sid": snapshot["sid"], "sid": snapshot["sid"],
@ -63,7 +66,10 @@ class ParseSnapshot:
self.get_cpu() self.get_cpu()
self.get_ram() self.get_ram()
self.get_mother_board() self.get_mother_board()
self.get_graphic()
self.get_data_storage() self.get_data_storage()
self.get_display()
self.get_sound_card()
self.get_networks() self.get_networks()
def get_cpu(self): def get_cpu(self):
@ -87,21 +93,6 @@ class ParseSnapshot:
} }
) )
def get_cpu_address(self, cpu):
default = 64
for ch in self.lshw.get('children', []):
for c in ch.get('children', []):
if c['class'] == 'processor':
return c.get('width', default)
return default
def get_ram_model(self, ram):
for ch in self.lshw.get('children', []):
for c in ch.get('children', []):
if c['class'] == 'memory':
if c.get('serial') == ram.get('Serial Number'):
return c.get('Vendor')
def get_ram(self): def get_ram(self):
for ram in self.dmi.get("Memory Device"): for ram in self.dmi.get("Memory Device"):
self.components.append( self.components.append(
@ -114,14 +105,11 @@ class ParseSnapshot:
"serialNumber": ram.get("Serial Number", self.default), "serialNumber": ram.get("Serial Number", self.default),
"interface": self.get_ram_type(ram), "interface": self.get_ram_type(ram),
"format": ram.get("Form Factor", "DIMM"), "format": ram.get("Form Factor", "DIMM"),
"partNumber": ram.get("Part Number", self.default), "model": ram.get("Part Number", self.default),
"model": self.get_ram_model(ram),
} }
) )
def get_mother_board(self): def get_mother_board(self):
# TODO @cayop model, not exist in dmidecode
# import pdb; pdb.set_trace()
for moder_board in self.dmi.get("Baseboard"): for moder_board in self.dmi.get("Baseboard"):
self.components.append( self.components.append(
{ {
@ -131,17 +119,170 @@ class ParseSnapshot:
"serialNumber": moder_board.get("Serial Number"), "serialNumber": moder_board.get("Serial Number"),
"manufacturer": moder_board.get("Manufacturer"), "manufacturer": moder_board.get("Manufacturer"),
"biosDate": self.get_bios_date(), "biosDate": self.get_bios_date(),
# "firewire": self.get_firmware(),
"ramMaxSize": self.get_max_ram_size(), "ramMaxSize": self.get_max_ram_size(),
"ramSlots": len(self.dmi.get("Memory Device")), "ramSlots": len(self.dmi.get("Memory Device")),
"slots": self.get_ram_slots(), "slots": self.get_ram_slots(),
"model": moder_board.get("Product Name"), # ?? "model": moder_board.get("Product Name"),
"pcmcia": self.get_pcmcia_num(), # ?? "firewire": self.get_firmware_num(),
"serial": self.get_serial_num(), # ?? "pcmcia": self.get_pcmcia_num(),
"serial": self.get_serial_num(),
"usb": self.get_usb_num(), "usb": self.get_usb_num(),
} }
) )
def get_graphic(self):
for ch in self.lshw.get('children', []):
for c in ch.get('children', []):
if c['class'] != 'display':
continue
self.components.append(
{
"actions": [],
"type": "GraphicCard",
"manufacturer": c.get("vendor", self.default),
"model": c.get("product", self.default),
"serialNumber": c.get("serial", self.default),
}
)
def get_data_storage(self):
for sm in self.smart:
if sm.get('smartctl', {}).get('exit_status') == 1:
continue
model = sm.get('model_name')
manufacturer = None
if model and len(model.split(" ")) > 1:
mm = model.split(" ")
model = mm[-1]
manufacturer = " ".join(mm[:-1])
self.components.append(
{
"actions": [],
"type": self.get_data_storage_type(sm),
"model": model,
"manufacturer": manufacturer,
"serialNumber": sm.get('serial_number'),
"size": self.get_data_storage_size(sm),
"variant": sm.get("firmware_version"),
"interface": self.get_data_storage_interface(sm),
}
)
def get_networks(self):
for ch in self.lshw.get('children', []):
for c in ch.get('children', []):
if c['class'] == 'networks':
capacity = c.get('capacity')
units = c.get('units')
speed = None
if capacity and units:
speed = base2.unit.Quantity(capacity, units).to('Mbit/s').m
wireless = bool(c.get('configuration', {}).get('wireless', False))
self.components.append(
{
"actions": [],
"type": "NetworkAdapter",
"model": c.get('product'),
"manufacturer": c.get('vendor'),
"serialNumber": c.get('serial'),
"speed": speed,
"variant": c.get('version', 1),
"wireless": wireless,
}
)
def get_sound_card(self):
for ch in self.lshw.get('children', []):
for c in ch.get('children', []):
if c['class'] == 'multimedia':
self.components.append(
{
"actions": [],
"type": "SoundCard",
"model": c.get('product'),
"manufacturer": c.get('vendor'),
"serialNumber": c.get('serial'),
}
)
def get_display(self): # noqa: C901
TECHS = 'CRT', 'TFT', 'LED', 'PDP', 'LCD', 'OLED', 'AMOLED'
for c in self.monitors:
resolution_width, resolution_height = (None,) * 2
refresh, serial, model, manufacturer, size = (None,) * 5
for x in c:
if "Vendor: " in x:
manufacturer = x.split('Vendor: ')[-1].strip()
if "Model: " in x:
model = x.split('Model: ')[-1].strip()
if "Serial ID: " in x:
serial = x.split('Serial ID: ')[-1].strip()
if " Resolution: " in x:
rs = x.split(' Resolution: ')[-1].strip()
if 'x' in rs:
resolution_width, resolution_height = [
int(r) for r in rs.split('x')
]
if "Frequencies: " in x:
try:
refresh = int(float(x.split(',')[-1].strip()[:-3]))
except Exception:
pass
if "Size: " in x:
size = self.get_size_monitor(x)
technology = next((t for t in TECHS if t in c[0]), None)
self.components.append(
{
"actions": [],
"type": "Display",
"model": model,
"manufacturer": manufacturer,
"serialNumber": serial,
'size': size,
'resolution_width': resolution_width,
'resolution_height': resolution_height,
'technology': technology,
'refresh_rate': refresh,
}
)
def get_hwinfo_monitors(self):
for c in self.hwinfo:
monitor = None
external = None
for x in c:
if 'Hardware Class: monitor' in x:
monitor = c
if 'Driver Info' in x:
external = c
if monitor and not external:
self.monitors.append(c)
def get_size_monitor(self, x):
i = 1 / 25.4
t = x.split('Size: ')[-1].strip()
tt = t.split('mm')
if not tt:
return 0
sizes = tt[0].strip()
if 'x' not in sizes:
return 0
w, h = [int(x) for x in sizes.split('x')]
return numpy.sqrt(w**2 + h**2) * i
def get_cpu_address(self, cpu):
default = 64
for ch in self.lshw.get('children', []):
for c in ch.get('children', []):
if c['class'] == 'processor':
return c.get('width', default)
return default
def get_usb_num(self): def get_usb_num(self):
return len( return len(
[ [
@ -160,6 +301,15 @@ class ParseSnapshot:
] ]
) )
def get_firmware_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "FIRMWARE" in u.get("Port Type", "").upper()
]
)
def get_pcmcia_num(self): def get_pcmcia_num(self):
return len( return len(
[ [
@ -290,72 +440,33 @@ class ParseSnapshot:
return k return k
return self.default return self.default
def get_data_storage(self):
for sm in self.smart:
model = sm.get('model_name')
manufacturer = None
if len(model.split(" ")) == 2:
manufacturer, model = model.split(" ")
self.components.append(
{
"actions": [],
"type": self.get_data_storage_type(sm),
"model": model,
"manufacturer": manufacturer,
"serialNumber": sm.get('serial_number'),
"size": self.get_data_storage_size(sm),
"variant": sm.get("firmware_version"),
"interface": self.get_data_storage_interface(sm),
}
)
def get_data_storage_type(self, x): def get_data_storage_type(self, x):
# TODO @cayop add more SSDS types # TODO @cayop add more SSDS types
SSDS = ["nvme"] SSDS = ["nvme"]
SSD = 'SolidStateDrive' SSD = 'SolidStateDrive'
HDD = 'HardDrive' HDD = 'HardDrive'
type_dev = x.get('device', {}).get('type') type_dev = x.get('device', {}).get('type')
return SSD if type_dev in SSDS else HDD trim = x.get('trim', {}).get("supported") in [True, "true"]
return SSD if type_dev in SSDS or trim else HDD
def get_data_storage_interface(self, x): def get_data_storage_interface(self, x):
return x.get('device', {}).get('protocol', 'ATA') interface = x.get('device', {}).get('protocol', 'ATA')
try:
DataStorageInterface(interface.upper())
except ValueError as err:
txt = "Sid: {}, interface {} is not in DataStorageInterface Enum".format(
self.sid, interface
)
self.errors("{}".format(err))
self.errors(txt, severity=Severity.Warning)
return "ATA"
def get_data_storage_size(self, x): def get_data_storage_size(self, x):
type_dev = x.get('device', {}).get('type') total_capacity = x.get('user_capacity', {}).get('bytes')
total_capacity = "{type}_total_capacity".format(type=type_dev) if not total_capacity:
return 1
# convert bytes to Mb # convert bytes to Mb
return x.get(total_capacity) / 1024**2 return total_capacity / 1024**2
def get_networks(self):
hw_class = " Hardware Class: "
mac = " Permanent HW Address: "
model = " Model: "
wireless = "wireless"
for line in self.hwinfo:
iface = {
"variant": "1",
"actions": [],
"speed": 100.0,
"type": "NetworkAdapter",
"wireless": False,
"manufacturer": "Ethernet",
}
for y in line:
if hw_class in y and not y.split(hw_class)[1] == 'network':
break
if mac in y:
iface["serialNumber"] = y.split(mac)[1]
if model in y:
iface["model"] = y.split(model)[1]
if wireless in y:
iface["wireless"] = True
if iface.get("serialNumber"):
self.components.append(iface)
def parse_hwinfo(self): def parse_hwinfo(self):
hw_blocks = self.hwinfo_raw.split("\n\n") hw_blocks = self.hwinfo_raw.split("\n\n")

View file

@ -36,7 +36,6 @@ class Snapshot_lite(Thing):
# description='The software that generated this Snapshot.', # description='The software that generated this Snapshot.',
# ) # )
sid = String(required=True) sid = String(required=True)
type = String(required=True)
timestamp = String(required=True) timestamp = String(required=True)
settings_version = String(required=False) settings_version = String(required=False)
hwmd = Nested(Snapshot_lite_data, required=True) hwmd = Nested(Snapshot_lite_data, required=True)

View file

@ -469,9 +469,12 @@ class Snapshot(ActionWithOneDevice):
@validates_schema @validates_schema
def validate_components_only_workbench(self, data: dict): def validate_components_only_workbench(self, data: dict):
if (data['software'] != SnapshotSoftware.Workbench) and ( software = [
data['software'] != SnapshotSoftware.WorkbenchAndroid SnapshotSoftware.Workbench,
): SnapshotSoftware.WorkbenchAndroid,
SnapshotSoftware.UsodyOS,
]
if data['software'] not in software:
if data.get('components', None) is not None: if data.get('components', None) is not None:
raise ValidationError( raise ValidationError(
'Only Workbench can add component info', field_names=['components'] 'Only Workbench can add component info', field_names=['components']
@ -481,7 +484,11 @@ class Snapshot(ActionWithOneDevice):
def validate_only_workbench_fields(self, data: dict): def validate_only_workbench_fields(self, data: dict):
"""Ensures workbench has ``elapsed`` and ``uuid`` and no others.""" """Ensures workbench has ``elapsed`` and ``uuid`` and no others."""
# todo test # todo test
if data['software'] == SnapshotSoftware.Workbench: software = [
SnapshotSoftware.Workbench,
SnapshotSoftware.UsodyOS,
]
if data['software'] in software:
if not data.get('uuid', None): if not data.get('uuid', None):
raise ValidationError( raise ValidationError(
'Snapshots from Workbench and WorkbenchAndroid must have uuid', 'Snapshots from Workbench and WorkbenchAndroid must have uuid',

View file

@ -70,9 +70,12 @@ class SnapshotMixin:
snapshot_json = self.snapshot_json snapshot_json = self.snapshot_json
device = snapshot_json.pop('device') # type: Computer device = snapshot_json.pop('device') # type: Computer
components = None components = None
if snapshot_json['software'] == ( software = [
SnapshotSoftware.Workbench or SnapshotSoftware.WorkbenchAndroid SnapshotSoftware.Workbench,
): SnapshotSoftware.WorkbenchAndroid,
SnapshotSoftware.UsodyOS,
]
if snapshot_json['software'] in software:
components = snapshot_json.pop('components', None) components = snapshot_json.pop('components', None)
snapshot = Snapshot(**snapshot_json) snapshot = Snapshot(**snapshot_json)

View file

@ -1,10 +1,9 @@
import ipaddress import ipaddress
from distutils.version import StrictVersion from distutils.version import LooseVersion
from typing import Type, Union from typing import Type, Union
import colour import colour
from boltons import strutils, urlutils from boltons import strutils, urlutils
from ereuse_devicehub.ereuse_utils import if_none_return_none
from flask import current_app as app from flask import current_app as app
from flask import g from flask import g
from marshmallow import utils from marshmallow import utils
@ -17,6 +16,7 @@ from marshmallow.validate import Validator
from marshmallow_enum import EnumField as _EnumField from marshmallow_enum import EnumField as _EnumField
from sqlalchemy_utils import PhoneNumber from sqlalchemy_utils import PhoneNumber
from ereuse_devicehub.ereuse_utils import if_none_return_none
from ereuse_devicehub.teal import db as tealdb from ereuse_devicehub.teal import db as tealdb
from ereuse_devicehub.teal.resource import Schema from ereuse_devicehub.teal.resource import Schema
@ -30,7 +30,7 @@ class Version(Field):
@if_none_return_none @if_none_return_none
def _deserialize(self, value, attr, data): def _deserialize(self, value, attr, data):
return StrictVersion(value) return LooseVersion(value)
class Color(Field): class Color(Field):