Compare commits

...
This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.

13 commits

Author SHA1 Message Date
Cayo Puigdefabregas 9586873fa4 fix get sanitize 2023-07-26 16:33:23 +02:00
Cayo Puigdefabregas 765927cfd1 parse and encrypted erasure 2023-07-19 18:26:21 +02:00
Cayo Puigdefabregas 24b59a920e . 2023-07-18 16:31:21 +02:00
Cayo Puigdefabregas d8d2931720 add sanitize actions 2023-07-14 17:40:18 +02:00
Cayo Puigdefabregas 0d957ecc4a fix strict mode 2023-07-12 16:18:56 +02:00
Cayo Puigdefabregas a7f9b8b725 fix parsing 2023-07-12 14:18:11 +02:00
Cayo Puigdefabregas 89109bd1b4 add migration file 2023-07-12 14:17:16 +02:00
Cayo Puigdefabregas e19df352d2 . 2023-07-11 16:19:29 +02:00
Cayo Puigdefabregas 415bc65f4b fix strictversions 2023-07-11 16:19:05 +02:00
Cayo Puigdefabregas 765858ac1b fix snapshotParse for hwmd 2023-07-11 14:06:16 +02:00
Cayo Puigdefabregas 6337322ac5 parse ram 2023-07-05 09:21:29 +02:00
Cayo Puigdefabregas 383c2e3560 snapshotparser 2023-07-04 16:45:04 +02:00
Cayo Puigdefabregas ca4a995a27 add new parse for hwmd step1 scooter 2023-07-03 18:49:15 +02:00
12 changed files with 516 additions and 135 deletions

View file

@ -1,4 +1,5 @@
import json import json
import logging
from binascii import Error as asciiError from binascii import Error as asciiError
from flask import Blueprint from flask import Blueprint
@ -12,7 +13,7 @@ from werkzeug.exceptions import Unauthorized
from ereuse_devicehub.auth import Auth from ereuse_devicehub.auth import Auth
from ereuse_devicehub.db import db from ereuse_devicehub.db import db
from ereuse_devicehub.parser.models import SnapshotsLog from ereuse_devicehub.parser.models import SnapshotsLog
from ereuse_devicehub.parser.parser import ParseSnapshotLsHw from ereuse_devicehub.parser.parser import ParseSnapshot
from ereuse_devicehub.parser.schemas import Snapshot_lite from ereuse_devicehub.parser.schemas import Snapshot_lite
from ereuse_devicehub.resources.action.views.snapshot import ( from ereuse_devicehub.resources.action.views.snapshot import (
SnapshotMixin, SnapshotMixin,
@ -21,6 +22,8 @@ from ereuse_devicehub.resources.action.views.snapshot import (
) )
from ereuse_devicehub.resources.enums import Severity from ereuse_devicehub.resources.enums import Severity
logger = logging.getLogger(__name__)
api = Blueprint('api', __name__, url_prefix='/api') api = Blueprint('api', __name__, url_prefix='/api')
@ -55,7 +58,19 @@ class InventoryView(LoginMixin, SnapshotMixin):
if type(snapshot_json) == Response: if type(snapshot_json) == Response:
return snapshot_json return snapshot_json
self.snapshot_json = ParseSnapshotLsHw(snapshot_json).get_snapshot() try:
self.snapshot_json = ParseSnapshot(snapshot_json).get_snapshot()
raise 1 == 2
except Exception as err:
logger.error("Error: {} \n{}\n".format(err, self.snapshot_json))
self.response = jsonify(
{
'error': err,
}
)
self.response.status_code = 500
return self.response
snapshot = self.build() snapshot = self.build()
snapshot.device.set_hid() snapshot.device.set_hid()

View file

@ -1,4 +1,4 @@
from distutils.version import StrictVersion from distutils.version import LooseVersion
from itertools import chain from itertools import chain
from decouple import config from decouple import config
@ -59,11 +59,11 @@ class DevicehubConfig(Config):
SCHEMA = config('SCHEMA', 'dbtest') SCHEMA = config('SCHEMA', 'dbtest')
HOST = config('HOST', 'localhost') HOST = config('HOST', 'localhost')
API_HOST = config('API_HOST', 'localhost') API_HOST = config('API_HOST', 'localhost')
MIN_WORKBENCH = StrictVersion('11.0a1') # type: StrictVersion MIN_WORKBENCH = LooseVersion('11.0a1')
"""The minimum version of ereuse.org workbench that this devicehub """The minimum version of ereuse.org workbench that this devicehub
accepts. we recommend not changing this value. accepts. we recommend not changing this value.
""" """
SCHEMA_WORKBENCH = ["1.0.0"] SCHEMA_HWMD = ["1.0.0"]
TMP_SNAPSHOTS = config('TMP_SNAPSHOTS', '/tmp/snapshots') TMP_SNAPSHOTS = config('TMP_SNAPSHOTS', '/tmp/snapshots')
TMP_LIVES = config('TMP_LIVES', '/tmp/lives') TMP_LIVES = config('TMP_LIVES', '/tmp/lives')
@ -75,7 +75,7 @@ class DevicehubConfig(Config):
API_DOC_CLASS_DISCRIMINATOR = 'type' API_DOC_CLASS_DISCRIMINATOR = 'type'
PRICE_SOFTWARE = PriceSoftware.Ereuse PRICE_SOFTWARE = PriceSoftware.Ereuse
PRICE_VERSION = StrictVersion('1.0') PRICE_VERSION = LooseVersion('1.0')
PRICE_CURRENCY = Currency.EUR PRICE_CURRENCY = Currency.EUR
"""Official versions.""" """Official versions."""

View file

@ -39,7 +39,7 @@ from ereuse_devicehub.inventory.models import (
TransferCustomerDetails, TransferCustomerDetails,
) )
from ereuse_devicehub.parser.models import PlaceholdersLog, SnapshotsLog from ereuse_devicehub.parser.models import PlaceholdersLog, SnapshotsLog
from ereuse_devicehub.parser.parser import ParseSnapshotLsHw from ereuse_devicehub.parser.parser import ParseSnapshot # , ParseSnapshotLsHw
from ereuse_devicehub.parser.schemas import Snapshot_lite from ereuse_devicehub.parser.schemas import Snapshot_lite
from ereuse_devicehub.resources.action.models import Snapshot, Trade from ereuse_devicehub.resources.action.models import Snapshot, Trade
from ereuse_devicehub.resources.action.schemas import Snapshot as SnapshotSchema from ereuse_devicehub.resources.action.schemas import Snapshot as SnapshotSchema
@ -315,18 +315,12 @@ class UploadSnapshotForm(SnapshotMixin, FlaskForm):
return True return True
def is_wb_lite_snapshot(self, version: str) -> bool: def save(self, commit=True, user_trusts=True): # noqa: C901
is_lite = False
if version in app.config['SCHEMA_WORKBENCH']:
is_lite = True
return is_lite
def save(self, commit=True, user_trusts=True):
if any([x == 'Error' for x in self.result.values()]): if any([x == 'Error' for x in self.result.values()]):
return return
schema = SnapshotSchema() schema = SnapshotSchema()
schema_lite = Snapshot_lite() schema_lite = Snapshot_lite()
schemas_hwmd = {'1.0.0': Snapshot_lite()}
devices = [] devices = []
self.tmp_snapshots = app.config['TMP_SNAPSHOTS'] self.tmp_snapshots = app.config['TMP_SNAPSHOTS']
for filename, snapshot_json in self.snapshots: for filename, snapshot_json in self.snapshots:
@ -335,10 +329,19 @@ class UploadSnapshotForm(SnapshotMixin, FlaskForm):
self.version = snapshot_json.get('schema_api') self.version = snapshot_json.get('schema_api')
self.uuid = snapshot_json.get('uuid') self.uuid = snapshot_json.get('uuid')
self.sid = snapshot_json.get('sid') self.sid = snapshot_json.get('sid')
# import pdb; pdb.set_trace()
if snapshot_json.get('hwmd'):
schema_api = snapshot_json.get('schema_api')
schema_lite = schemas_hwmd.get(schema_api)
if not schema_lite:
txt = "Error: No there are schema_api in the snapshot {}"
txt = txt.format(self.uuid)
raise txt
if self.is_wb_lite_snapshot(self.version):
self.snapshot_json = schema_lite.load(snapshot_json) self.snapshot_json = schema_lite.load(snapshot_json)
snapshot_json = ParseSnapshotLsHw(self.snapshot_json).snapshot_json snapshot_json = ParseSnapshot(self.snapshot_json).snapshot_json
# snapshot_json = ParseSnapshotLsHw(self.snapshot_json).snapshot_json
else: else:
self.version = snapshot_json.get('version') self.version = snapshot_json.get('version')
system_uuid = self.get_uuid(debug) system_uuid = self.get_uuid(debug)

View file

@ -0,0 +1,30 @@
"""add usody in enum software
Revision ID: be6847b24846
Revises: 5169765e2653
Create Date: 2023-07-11 14:07:08.887104
"""
from alembic import context, op
# revision identifiers, used by Alembic.
revision = 'be6847b24846'
down_revision = '5169765e2653'
branch_labels = None
depends_on = None
def get_inv():
INV = context.get_x_argument(as_dictionary=True).get('inventory')
if not INV:
raise ValueError("Inventory value is not specified")
return INV
def upgrade():
op.execute("ALTER TYPE snapshotsoftware ADD VALUE 'UsodyOS'")
def downgrade():
# "select e.enumlabel FROM pg_enum e JOIN pg_type t ON e.enumtypid = t.oid WHERE t.typname = 'snapshotsoftware'"
pass

View file

@ -1,12 +1,15 @@
import json import json
import logging import logging
import uuid import uuid
from datetime import datetime
import numpy
from dmidecode import DMIParse from dmidecode import DMIParse
from flask import request from flask import request
from marshmallow.exceptions import ValidationError from marshmallow.exceptions import ValidationError
from ereuse_devicehub.parser import base2 from ereuse_devicehub.ereuse_utils.nested_lookup import get_nested_dicts_with_key_value
from ereuse_devicehub.parser import base2, unit
from ereuse_devicehub.parser.computer import Computer from ereuse_devicehub.parser.computer import Computer
from ereuse_devicehub.parser.models import SnapshotsLog from ereuse_devicehub.parser.models import SnapshotsLog
from ereuse_devicehub.resources.action.schemas import Snapshot from ereuse_devicehub.resources.action.schemas import Snapshot
@ -18,25 +21,31 @@ logger = logging.getLogger(__name__)
class ParseSnapshot: class ParseSnapshot:
def __init__(self, snapshot, default="n/a"): def __init__(self, snapshot, default="n/a"):
self.default = default self.default = default
self.dmidecode_raw = snapshot["data"]["dmidecode"] self.dmidecode_raw = snapshot["hwmd"]["dmidecode"]
self.smart_raw = snapshot["data"]["smart"] self.smart_raw = snapshot["hwmd"]["smart"]
self.hwinfo_raw = snapshot["data"]["hwinfo"] self.hwinfo_raw = snapshot["hwmd"]["hwinfo"]
self.lshw_raw = snapshot["hwmd"]["lshw"]
self.lscpi_raw = snapshot["hwmd"]["lspci"]
self.sanitize_raw = snapshot.get("sanitize", [])
self.device = {"actions": []} self.device = {"actions": []}
self.components = [] self.components = []
self.monitors = []
self.dmi = DMIParse(self.dmidecode_raw) self.dmi = DMIParse(self.dmidecode_raw)
self.smart = self.loads(self.smart_raw) self.smart = self.loads(self.smart_raw)
self.lshw = self.loads(self.lshw_raw)
self.hwinfo = self.parse_hwinfo() self.hwinfo = self.parse_hwinfo()
self.set_basic_datas() self.set_computer()
self.get_hwinfo_monitors()
self.set_components() self.set_components()
self.snapshot_json = { self.snapshot_json = {
"device": self.device, "device": self.device,
"software": "Workbench", "software": "UsodyOS",
"components": self.components, "components": self.components,
"uuid": snapshot['uuid'], "uuid": snapshot['uuid'],
"type": snapshot['type'], "version": snapshot['version'],
"version": "14.0.0", "settings_version": snapshot['settings_version'],
"endTime": snapshot["timestamp"], "endTime": snapshot["timestamp"],
"elapsed": 1, "elapsed": 1,
"sid": snapshot["sid"], "sid": snapshot["sid"],
@ -45,7 +54,7 @@ class ParseSnapshot:
def get_snapshot(self): def get_snapshot(self):
return Snapshot().load(self.snapshot_json) return Snapshot().load(self.snapshot_json)
def set_basic_datas(self): def set_computer(self):
self.device['manufacturer'] = self.dmi.manufacturer() self.device['manufacturer'] = self.dmi.manufacturer()
self.device['model'] = self.dmi.model() self.device['model'] = self.dmi.model()
self.device['serialNumber'] = self.dmi.serial_number() self.device['serialNumber'] = self.dmi.serial_number()
@ -53,17 +62,24 @@ class ParseSnapshot:
self.device['sku'] = self.get_sku() self.device['sku'] = self.get_sku()
self.device['version'] = self.get_version() self.device['version'] = self.get_version()
self.device['system_uuid'] = self.get_uuid() self.device['system_uuid'] = self.get_uuid()
self.device['family'] = self.get_family()
self.device['chassis'] = self.get_chassis_dh()
def set_components(self): def set_components(self):
self.get_cpu() self.get_cpu()
self.get_ram() self.get_ram()
self.get_mother_board() self.get_mother_board()
self.get_graphic()
self.get_data_storage() self.get_data_storage()
self.get_display()
self.get_sound_card()
self.get_networks() self.get_networks()
def get_cpu(self): def get_cpu(self):
# TODO @cayop generation, brand and address not exist in dmidecode
for cpu in self.dmi.get('Processor'): for cpu in self.dmi.get('Processor'):
serial = cpu.get('Serial Number')
if serial == 'Not Specified' or not serial:
serial = cpu.get('ID').replace(' ', '')
self.components.append( self.components.append(
{ {
"actions": [], "actions": [],
@ -73,16 +89,20 @@ class ParseSnapshot:
"model": cpu.get('Version'), "model": cpu.get('Version'),
"threads": int(cpu.get('Thread Count', 1)), "threads": int(cpu.get('Thread Count', 1)),
"manufacturer": cpu.get('Manufacturer'), "manufacturer": cpu.get('Manufacturer'),
"serialNumber": cpu.get('Serial Number'), "serialNumber": serial,
"generation": cpu.get('Generation'), "generation": None,
"brand": cpu.get('Brand'), "brand": cpu.get('Family'),
"address": cpu.get('Address'), "address": self.get_cpu_address(cpu),
} }
) )
def get_ram(self): def get_ram(self):
# TODO @cayop format and model not exist in dmidecode
for ram in self.dmi.get("Memory Device"): for ram in self.dmi.get("Memory Device"):
if ram.get('size') == 'No Module Installed':
continue
if not ram.get("Speed"):
continue
self.components.append( self.components.append(
{ {
"actions": [], "actions": [],
@ -91,14 +111,13 @@ class ParseSnapshot:
"speed": self.get_ram_speed(ram), "speed": self.get_ram_speed(ram),
"manufacturer": ram.get("Manufacturer", self.default), "manufacturer": ram.get("Manufacturer", self.default),
"serialNumber": ram.get("Serial Number", self.default), "serialNumber": ram.get("Serial Number", self.default),
"interface": self.get_ram_type(ram), "interface": ram.get("Type", "DDR"),
"format": self.get_ram_format(ram), "format": ram.get("Form Factor", "DIMM"),
"model": ram.get("Part Number", self.default), "model": ram.get("Part Number", self.default),
} }
) )
def get_mother_board(self): def get_mother_board(self):
# TODO @cayop model, not exist in dmidecode
for moder_board in self.dmi.get("Baseboard"): for moder_board in self.dmi.get("Baseboard"):
self.components.append( self.components.append(
{ {
@ -108,17 +127,232 @@ class ParseSnapshot:
"serialNumber": moder_board.get("Serial Number"), "serialNumber": moder_board.get("Serial Number"),
"manufacturer": moder_board.get("Manufacturer"), "manufacturer": moder_board.get("Manufacturer"),
"biosDate": self.get_bios_date(), "biosDate": self.get_bios_date(),
# "firewire": self.get_firmware(),
"ramMaxSize": self.get_max_ram_size(), "ramMaxSize": self.get_max_ram_size(),
"ramSlots": len(self.dmi.get("Memory Device")), "ramSlots": len(self.dmi.get("Memory Device")),
"slots": self.get_ram_slots(), "slots": self.get_ram_slots(),
"model": moder_board.get("Product Name"), # ?? "model": moder_board.get("Product Name"),
"pcmcia": self.get_pcmcia_num(), # ?? "firewire": self.get_firmware_num(),
"serial": self.get_serial_num(), # ?? "pcmcia": self.get_pcmcia_num(),
"serial": self.get_serial_num(),
"usb": self.get_usb_num(), "usb": self.get_usb_num(),
} }
) )
def get_graphic(self):
nodes = get_nested_dicts_with_key_value(self.lshw, 'class', 'display')
for c in nodes:
if not c['configuration'].get('driver', None):
continue
self.components.append(
{
"actions": [],
"type": "GraphicCard",
"memory": self.get_memory_video(c),
"manufacturer": c.get("vendor", self.default),
"model": c.get("product", self.default),
"serialNumber": c.get("serial", self.default),
}
)
def get_memory_video(self, c):
# get info of lspci
# pci_id = c['businfo'].split('@')[1]
# lspci.get(pci_id) | grep size
# lspci -v -s 00:02.0
return None
def get_data_storage(self):
for sm in self.smart:
if sm.get('smartctl', {}).get('exit_status') == 1:
continue
model = sm.get('model_name')
manufacturer = None
if model and len(model.split(" ")) > 1:
mm = model.split(" ")
model = mm[-1]
manufacturer = " ".join(mm[:-1])
self.components.append(
{
"actions": self.sanitize(sm),
"type": self.get_data_storage_type(sm),
"model": model,
"manufacturer": manufacturer,
"serialNumber": sm.get('serial_number'),
"size": self.get_data_storage_size(sm),
"variant": sm.get("firmware_version"),
"interface": self.get_data_storage_interface(sm),
}
)
def sanitize(self, disk):
disk_sanitize = None
# import pdb; pdb.set_trace()
for d in self.sanitize_raw:
s = d.get('device_info', {}).get('export_data', {})
s = s.get('block', {}).get('serial')
if s == disk.get('serial_number'):
disk_sanitize = d
break
if not disk_sanitize:
return []
steps = []
step_type = 'EraseBasic'
if disk.get('name') == 'Baseline Cryptographic':
step_type = 'EraseCrypto'
if disk.get('type') == 'EraseCrypto':
step_type = 'EraseCrypto'
erase = {
'type': step_type,
'severity': disk_sanitize['severity'].name,
'steps': steps,
'startTime': None,
'endTime': None,
}
for step in disk_sanitize.get('steps', []):
steps.append(
{
'severity': step['severity'].name,
'startTime': step['start_time'].isoformat(),
'endTime': step['end_time'].isoformat(),
'type': 'StepRandom',
}
)
erase['endTime'] = step['end_time'].isoformat()
if not erase['startTime']:
erase['startTime'] = step['start_time'].isoformat()
return [erase]
def get_networks(self):
nodes = get_nested_dicts_with_key_value(self.lshw, 'class', 'network')
for c in nodes:
capacity = c.get('capacity')
units = c.get('units')
speed = None
if capacity and units:
speed = unit.Quantity(capacity, units).to('Mbit/s').m
wireless = bool(c.get('configuration', {}).get('wireless', False))
self.components.append(
{
"actions": [],
"type": "NetworkAdapter",
"model": c.get('product'),
"manufacturer": c.get('vendor'),
"serialNumber": c.get('serial'),
"speed": speed,
"variant": c.get('version', 1),
"wireless": wireless,
}
)
def get_sound_card(self):
nodes = get_nested_dicts_with_key_value(self.lshw, 'class', 'multimedia')
for c in nodes:
self.components.append(
{
"actions": [],
"type": "SoundCard",
"model": c.get('product'),
"manufacturer": c.get('vendor'),
"serialNumber": c.get('serial'),
}
)
def get_display(self): # noqa: C901
TECHS = 'CRT', 'TFT', 'LED', 'PDP', 'LCD', 'OLED', 'AMOLED'
for c in self.monitors:
resolution_width, resolution_height = (None,) * 2
refresh, serial, model, manufacturer, size = (None,) * 5
year, week, production_date = (None,) * 3
for x in c:
if "Vendor: " in x:
manufacturer = x.split('Vendor: ')[-1].strip()
if "Model: " in x:
model = x.split('Model: ')[-1].strip()
if "Serial ID: " in x:
serial = x.split('Serial ID: ')[-1].strip()
if " Resolution: " in x:
rs = x.split(' Resolution: ')[-1].strip()
if 'x' in rs:
resolution_width, resolution_height = [
int(r) for r in rs.split('x')
]
if "Frequencies: " in x:
try:
refresh = int(float(x.split(',')[-1].strip()[:-3]))
except Exception:
pass
if 'Year of Manufacture' in x:
year = x.split(': ')[1]
if 'Week of Manufacture' in x:
week = x.split(': ')[1]
if "Size: " in x:
size = self.get_size_monitor(x)
technology = next((t for t in TECHS if t in c[0]), None)
if year and week:
d = '{} {} 0'.format(year, week)
production_date = datetime.strptime(d, '%Y %W %w').isoformat()
self.components.append(
{
"actions": [],
"type": "Display",
"model": model,
"manufacturer": manufacturer,
"serialNumber": serial,
'size': size,
'resolutionWidth': resolution_width,
'resolutionHeight': resolution_height,
"productionDate": production_date,
'technology': technology,
'refreshRate': refresh,
}
)
def get_hwinfo_monitors(self):
for c in self.hwinfo:
monitor = None
external = None
for x in c:
if 'Hardware Class: monitor' in x:
monitor = c
if 'Driver Info' in x:
external = c
if monitor and not external:
self.monitors.append(c)
def get_size_monitor(self, x):
i = 1 / 25.4
t = x.split('Size: ')[-1].strip()
tt = t.split('mm')
if not tt:
return 0
sizes = tt[0].strip()
if 'x' not in sizes:
return 0
w, h = [int(x) for x in sizes.split('x')]
return numpy.sqrt(w**2 + h**2) * i
def get_cpu_address(self, cpu):
default = 64
for ch in self.lshw.get('children', []):
for c in ch.get('children', []):
if c['class'] == 'processor':
return c.get('width', default)
return default
def get_usb_num(self): def get_usb_num(self):
return len( return len(
[ [
@ -137,6 +371,15 @@ class ParseSnapshot:
] ]
) )
def get_firmware_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "FIRMWARE" in u.get("Port Type", "").upper()
]
)
def get_pcmcia_num(self): def get_pcmcia_num(self):
return len( return len(
[ [
@ -167,23 +410,22 @@ class ParseSnapshot:
return slots return slots
def get_ram_size(self, ram): def get_ram_size(self, ram):
size = ram.get("Size", "0") try:
memory = ram.get("Size", "0")
memory = memory.split(' ')
if len(memory) > 1:
size = int(memory[0])
units = memory[1]
return base2.Quantity(size, units).to('MiB').m
return int(size.split(" ")[0]) return int(size.split(" ")[0])
except Exception as err:
logger.error("get_ram_size error: {}".format(err))
return 0
def get_ram_speed(self, ram): def get_ram_speed(self, ram):
size = ram.get("Speed", "0") size = ram.get("Speed", "0")
return int(size.split(" ")[0]) return int(size.split(" ")[0])
def get_ram_type(self, ram):
TYPES = {'ddr', 'sdram', 'sodimm'}
for t in TYPES:
if t in ram.get("Type", "DDR"):
return t
def get_ram_format(self, ram):
channel = ram.get("Locator", "DIMM")
return 'SODIMM' if 'SODIMM' in channel else 'DIMM'
def get_cpu_speed(self, cpu): def get_cpu_speed(self, cpu):
speed = cpu.get('Max Speed', "0") speed = cpu.get('Max Speed', "0")
return float(speed.split(" ")[0]) / 1024 return float(speed.split(" ")[0]) / 1024
@ -195,10 +437,13 @@ class ParseSnapshot:
return self.dmi.get("System")[0].get("Version", self.default) return self.dmi.get("System")[0].get("Version", self.default)
def get_uuid(self): def get_uuid(self):
return self.dmi.get("System")[0].get("UUID", self.default) return self.dmi.get("System")[0].get("UUID", '')
def get_family(self):
return self.dmi.get("System")[0].get("Family", '')
def get_chassis(self): def get_chassis(self):
return self.dmi.get("Chassis")[0].get("Type", self.default) return self.dmi.get("Chassis")[0].get("Type", '_virtual')
def get_type(self): def get_type(self):
chassis_type = self.get_chassis() chassis_type = self.get_chassis()
@ -238,26 +483,30 @@ class ParseSnapshot:
return k return k
return self.default return self.default
def get_data_storage(self): def get_chassis_dh(self):
CHASSIS_DH = {
for sm in self.smart: 'Tower': {'desktop', 'low-profile', 'tower', 'server'},
model = sm.get('model_name') 'Docking': {'docking'},
manufacturer = None 'AllInOne': {'all-in-one'},
if len(model.split(" ")) == 2: 'Microtower': {'mini-tower', 'space-saving', 'mini'},
manufacturer, model = model.split(" ") 'PizzaBox': {'pizzabox'},
'Lunchbox': {'lunchbox'},
self.components.append( 'Stick': {'stick'},
{ 'Netbook': {'notebook', 'sub-notebook'},
"actions": [], 'Handheld': {'handheld'},
"type": self.get_data_storage_type(sm), 'Laptop': {'portable', 'laptop'},
"model": model, 'Convertible': {'convertible'},
"manufacturer": manufacturer, 'Detachable': {'detachable'},
"serialNumber": sm.get('serial_number'), 'Tablet': {'tablet'},
"size": self.get_data_storage_size(sm), 'Virtual': {'_virtual'},
"variant": sm.get("firmware_version"),
"interface": self.get_data_storage_interface(sm),
} }
)
chassis = self.get_chassis()
lower_type = chassis.lower()
for k, v in CHASSIS_DH.items():
if lower_type in v:
return k
return self.default
def get_data_storage_type(self, x): def get_data_storage_type(self, x):
# TODO @cayop add more SSDS types # TODO @cayop add more SSDS types
@ -265,45 +514,27 @@ class ParseSnapshot:
SSD = 'SolidStateDrive' SSD = 'SolidStateDrive'
HDD = 'HardDrive' HDD = 'HardDrive'
type_dev = x.get('device', {}).get('type') type_dev = x.get('device', {}).get('type')
return SSD if type_dev in SSDS else HDD trim = x.get('trim', {}).get("supported") in [True, "true"]
return SSD if type_dev in SSDS or trim else HDD
def get_data_storage_interface(self, x): def get_data_storage_interface(self, x):
return x.get('device', {}).get('protocol', 'ATA') interface = x.get('device', {}).get('protocol', 'ATA')
try:
DataStorageInterface(interface.upper())
except ValueError as err:
txt = "Sid: {}, interface {} is not in DataStorageInterface Enum".format(
self.sid, interface
)
self.errors("{}".format(err))
self.errors(txt, severity=Severity.Warning)
return "ATA"
def get_data_storage_size(self, x): def get_data_storage_size(self, x):
type_dev = x.get('device', {}).get('type') total_capacity = x.get('user_capacity', {}).get('bytes')
total_capacity = "{type}_total_capacity".format(type=type_dev) if not total_capacity:
return 1
# convert bytes to Mb # convert bytes to Mb
return x.get(total_capacity) / 1024**2 return total_capacity / 1024**2
def get_networks(self):
hw_class = " Hardware Class: "
mac = " Permanent HW Address: "
model = " Model: "
wireless = "wireless"
for line in self.hwinfo:
iface = {
"variant": "1",
"actions": [],
"speed": 100.0,
"type": "NetworkAdapter",
"wireless": False,
"manufacturer": "Ethernet",
}
for y in line:
if hw_class in y and not y.split(hw_class)[1] == 'network':
break
if mac in y:
iface["serialNumber"] = y.split(mac)[1]
if model in y:
iface["model"] = y.split(model)[1]
if wireless in y:
iface["wireless"] = True
if iface.get("serialNumber"):
self.components.append(iface)
def parse_hwinfo(self): def parse_hwinfo(self):
hw_blocks = self.hwinfo_raw.split("\n\n") hw_blocks = self.hwinfo_raw.split("\n\n")
@ -314,6 +545,21 @@ class ParseSnapshot:
return json.loads(x) return json.loads(x)
return x return x
def errors(self, txt=None, severity=Severity.Error):
if not txt:
return self._errors
logger.error(txt)
self._errors.append(txt)
error = SnapshotsLog(
description=txt,
snapshot_uuid=self.uuid,
severity=severity,
sid=self.sid,
version=self.version,
)
error.save()
class ParseSnapshotLsHw: class ParseSnapshotLsHw:
def __init__(self, snapshot, default="n/a"): def __init__(self, snapshot, default="n/a"):
@ -321,10 +567,10 @@ class ParseSnapshotLsHw:
self.uuid = snapshot.get("uuid") self.uuid = snapshot.get("uuid")
self.sid = snapshot.get("sid") self.sid = snapshot.get("sid")
self.version = str(snapshot.get("version")) self.version = str(snapshot.get("version"))
self.dmidecode_raw = snapshot["data"]["dmidecode"] self.dmidecode_raw = snapshot["hwmd"]["dmidecode"]
self.smart = snapshot["data"]["smart"] self.smart = snapshot["hwmd"]["smart"]
self.hwinfo_raw = snapshot["data"]["hwinfo"] self.hwinfo_raw = snapshot["hwmd"]["hwinfo"]
self.lshw = snapshot["data"]["lshw"] self.lshw = snapshot["hwmd"]["lshw"]
self.device = {"actions": []} self.device = {"actions": []}
self.components = [] self.components = []
self.components_obj = [] self.components_obj = []
@ -401,6 +647,11 @@ class ParseSnapshotLsHw:
def get_ram(self): def get_ram(self):
for ram in self.dmi.get("Memory Device"): for ram in self.dmi.get("Memory Device"):
if ram.get('size') == 'No Module Installed':
continue
if not ram.get("Speed"):
continue
self.components.append( self.components.append(
{ {
"actions": [], "actions": [],
@ -474,7 +725,6 @@ class ParseSnapshotLsHw:
return dmi_uuid return dmi_uuid
def get_data_storage(self): def get_data_storage(self):
for sm in self.smart: for sm in self.smart:
if sm.get('smartctl', {}).get('exit_status') == 1: if sm.get('smartctl', {}).get('exit_status') == 1:
continue continue
@ -487,7 +737,7 @@ class ParseSnapshotLsHw:
self.components.append( self.components.append(
{ {
"actions": [self.get_test_data_storage(sm)], "actions": [],
"type": self.get_data_storage_type(sm), "type": self.get_data_storage_type(sm),
"model": model, "model": model,
"manufacturer": manufacturer, "manufacturer": manufacturer,

View file

@ -1,37 +1,94 @@
from datetime import datetime
from flask import current_app as app from flask import current_app as app
from marshmallow import Schema as MarshmallowSchema from marshmallow import Schema as MarshmallowSchema
from marshmallow import ValidationError, validates_schema from marshmallow import ValidationError, pre_load, validates_schema
from marshmallow.fields import Dict, List, Nested, String from marshmallow.fields import DateTime, Dict, Integer, List, Nested, String
from marshmallow_enum import EnumField
from ereuse_devicehub.resources.enums import Severity, SnapshotSoftware
from ereuse_devicehub.resources.schemas import Thing from ereuse_devicehub.resources.schemas import Thing
# from marshmallow_enum import EnumField
class Snapshot_lite_data(MarshmallowSchema): class Snapshot_lite_data(MarshmallowSchema):
dmidecode = String(required=True) hwmd_version = String(required=True)
hwinfo = String(required=True)
smart = List(Dict(), required=True)
lshw = Dict(required=True) lshw = Dict(required=True)
dmidecode = String(required=True)
lspci = String(required=True) lspci = String(required=True)
hwinfo = String(required=True)
smart = List(Dict(), required=False)
class Test(MarshmallowSchema):
type = String(required=True)
class Steps(MarshmallowSchema):
num = Integer(data_key='step', required=True)
start_time = DateTime(data_key='date_init', required=False)
end_time = DateTime(data_key='date_end', required=False)
severity = EnumField(Severity)
@pre_load
def preload_datas(self, data: dict):
# import pdb; pdb.set_trace()
data['severity'] = Severity.Info.name
data.pop('duration', None)
data.pop('commands', None)
if not data.pop('success', False):
data['severity'] = Severity.Error.name
if data.get('date_init'):
data['date_init'] = datetime.fromtimestamp(data['date_init']).isoformat()
if data.get('date_end'):
data['date_end'] = datetime.fromtimestamp(data['date_end']).isoformat()
else:
data['date_end'] = data['date_init']
class Sanitize(MarshmallowSchema):
steps = Nested(Steps, many=True, required=True, data_key='erasure_steps')
validation = Dict()
device_info = Dict()
method = Dict(required=True)
sanitize_version = String()
severity = EnumField(Severity, required=True)
@pre_load
def preload_datas(self, data: dict):
data['severity'] = Severity.Info.name
if not data.pop('result', False):
data['severity'] = Severity.Error.name
class Snapshot_lite(Thing): class Snapshot_lite(Thing):
uuid = String(required=True) uuid = String(required=True)
version = String(required=True) version = String(required=True)
schema_api = String(required=True) schema_api = String(required=True)
software = String(required=True) software = EnumField(
SnapshotSoftware,
required=True,
description='The software that generated this Snapshot.',
)
sid = String(required=True) sid = String(required=True)
type = String(required=True)
timestamp = String(required=True) timestamp = String(required=True)
settings_version = String(required=False) settings_version = String(required=False)
data = Nested(Snapshot_lite_data, required=True) hwmd = Nested(Snapshot_lite_data, required=True)
tests = Nested(Test, many=True, collection_class=list, required=False)
sanitize = Nested(Sanitize, many=True, collection_class=list, required=False)
@validates_schema @validates_schema
def validate_workbench_version(self, data: dict): def validate_workbench_version(self, data: dict):
if data['schema_api'] not in app.config['SCHEMA_WORKBENCH']: if data['schema_api'] not in app.config['SCHEMA_HWMD']:
raise ValidationError( raise ValidationError(
'Min. supported Workbench version is ' 'Min. supported Workbench version is '
'{} but yours is {}.'.format( '{} but yours is {}.'.format(
app.config['SCHEMA_WORKBENCH'][0], data['version'] app.config['SCHEMA_HWMD'][0], data['version']
), ),
field_names=['version'], field_names=['version'],
) )

View file

@ -523,11 +523,14 @@ class EraseSectors(EraseBasic):
def get_public_name(self): def get_public_name(self):
steps_random = 0 steps_random = 0
steps_zeros = 0 steps_zeros = 0
steps_encrypted = 0
for s in self.steps: for s in self.steps:
if s.type == 'StepRandom': if s.type == 'StepRandom':
steps_random += 1 steps_random += 1
if s.type == 'StepZero': if s.type == 'StepZero':
steps_zeros += 1 steps_zeros += 1
if s.type == 'StepEncrypted':
steps_encrypted += 1
if steps_zeros == 0 and steps_random == 1: if steps_zeros == 0 and steps_random == 1:
return "Basic" return "Basic"
@ -651,6 +654,10 @@ class StepRandom(Step):
pass pass
class StepEncrypted(Step):
pass
class Snapshot(JoinedWithOneDeviceMixin, ActionWithOneDevice): class Snapshot(JoinedWithOneDeviceMixin, ActionWithOneDevice):
"""The Snapshot sets the physical information of the device (S/N, model...) """The Snapshot sets the physical information of the device (S/N, model...)
and updates it with erasures, benchmarks, ratings, and tests; updates the and updates it with erasures, benchmarks, ratings, and tests; updates the
@ -740,7 +747,7 @@ class Snapshot(JoinedWithOneDeviceMixin, ActionWithOneDevice):
""" """
uuid = Column(UUID(as_uuid=True), unique=True) uuid = Column(UUID(as_uuid=True), unique=True)
version = Column(StrictVersionType(STR_SM_SIZE), nullable=False) version = Column(Unicode(STR_SM_SIZE), nullable=False)
software = Column(DBEnum(SnapshotSoftware), nullable=False) software = Column(DBEnum(SnapshotSoftware), nullable=False)
elapsed = Column(Interval) elapsed = Column(Interval)
elapsed.comment = """For Snapshots made with Workbench, the total amount elapsed.comment = """For Snapshots made with Workbench, the total amount

View file

@ -442,7 +442,7 @@ class Snapshot(ActionWithOneDevice):
required=True, required=True,
description='The software that generated this Snapshot.', description='The software that generated this Snapshot.',
) )
version = Version(required=True, description='The version of the software.') version = String(required=True, description='The version of the software.')
actions = NestedOn(Action, many=True, dump_only=True) actions = NestedOn(Action, many=True, dump_only=True)
elapsed = TimeDelta(precision=TimeDelta.SECONDS) elapsed = TimeDelta(precision=TimeDelta.SECONDS)
components = NestedOn( components = NestedOn(
@ -469,9 +469,12 @@ class Snapshot(ActionWithOneDevice):
@validates_schema @validates_schema
def validate_components_only_workbench(self, data: dict): def validate_components_only_workbench(self, data: dict):
if (data['software'] != SnapshotSoftware.Workbench) and ( software = [
data['software'] != SnapshotSoftware.WorkbenchAndroid SnapshotSoftware.Workbench,
): SnapshotSoftware.WorkbenchAndroid,
SnapshotSoftware.UsodyOS,
]
if data['software'] not in software:
if data.get('components', None) is not None: if data.get('components', None) is not None:
raise ValidationError( raise ValidationError(
'Only Workbench can add component info', field_names=['components'] 'Only Workbench can add component info', field_names=['components']
@ -481,7 +484,11 @@ class Snapshot(ActionWithOneDevice):
def validate_only_workbench_fields(self, data: dict): def validate_only_workbench_fields(self, data: dict):
"""Ensures workbench has ``elapsed`` and ``uuid`` and no others.""" """Ensures workbench has ``elapsed`` and ``uuid`` and no others."""
# todo test # todo test
if data['software'] == SnapshotSoftware.Workbench: software = [
SnapshotSoftware.Workbench,
SnapshotSoftware.UsodyOS,
]
if data['software'] in software:
if not data.get('uuid', None): if not data.get('uuid', None):
raise ValidationError( raise ValidationError(
'Snapshots from Workbench and WorkbenchAndroid must have uuid', 'Snapshots from Workbench and WorkbenchAndroid must have uuid',

View file

@ -70,9 +70,12 @@ class SnapshotMixin:
snapshot_json = self.snapshot_json snapshot_json = self.snapshot_json
device = snapshot_json.pop('device') # type: Computer device = snapshot_json.pop('device') # type: Computer
components = None components = None
if snapshot_json['software'] == ( software = [
SnapshotSoftware.Workbench or SnapshotSoftware.WorkbenchAndroid SnapshotSoftware.Workbench,
): SnapshotSoftware.WorkbenchAndroid,
SnapshotSoftware.UsodyOS,
]
if snapshot_json['software'] in software:
components = snapshot_json.pop('components', None) components = snapshot_json.pop('components', None)
snapshot = Snapshot(**snapshot_json) snapshot = Snapshot(**snapshot_json)

View file

@ -1747,7 +1747,7 @@ class Processor(JoinedComponentTableMixin, Component):
class RamModule(JoinedComponentTableMixin, Component): class RamModule(JoinedComponentTableMixin, Component):
"""A stick of RAM.""" """A stick of RAM."""
size = Column(SmallInteger, check_range('size', min=128, max=17000)) size = Column(SmallInteger, check_range('size', min=128, max=70000))
size.comment = """The capacity of the RAM stick.""" size.comment = """The capacity of the RAM stick."""
speed = Column(SmallInteger, check_range('speed', min=100, max=10000)) speed = Column(SmallInteger, check_range('speed', min=100, max=10000))
interface = Column(DBEnum(RamInterface)) interface = Column(DBEnum(RamInterface))

View file

@ -15,6 +15,7 @@ class SnapshotSoftware(Enum):
Web = 'Web' Web = 'Web'
DesktopApp = 'DesktopApp' DesktopApp = 'DesktopApp'
WorkbenchDesktop = 'WorkbenchDesktop' WorkbenchDesktop = 'WorkbenchDesktop'
UsodyOS = 'UsodyOS'
def __str__(self): def __str__(self):
return self.name return self.name
@ -378,6 +379,14 @@ class ErasureStandards(Enum):
And be an :class:`ereuse_devicehub.resources.action.models.EraseSectors`. And be an :class:`ereuse_devicehub.resources.action.models.EraseSectors`.
""" """
NIST = "Infosec HGM Baseline"
"""Method for securely erasing data in compliance with HMG Infosec Standard 5
guidelines includes a single step of a random write process on the full disk.
This process overwrites all data with a randomized pattern, ensuring that
it cannot be recovered. Built-in validation confirms that the data has been
written correctly, and a final validation confirms that all data has been deleted.
"""
def __str__(self): def __str__(self):
return self.value return self.value

View file

@ -1,10 +1,9 @@
import ipaddress import ipaddress
from distutils.version import StrictVersion from distutils.version import LooseVersion
from typing import Type, Union from typing import Type, Union
import colour import colour
from boltons import strutils, urlutils from boltons import strutils, urlutils
from ereuse_devicehub.ereuse_utils import if_none_return_none
from flask import current_app as app from flask import current_app as app
from flask import g from flask import g
from marshmallow import utils from marshmallow import utils
@ -17,6 +16,7 @@ from marshmallow.validate import Validator
from marshmallow_enum import EnumField as _EnumField from marshmallow_enum import EnumField as _EnumField
from sqlalchemy_utils import PhoneNumber from sqlalchemy_utils import PhoneNumber
from ereuse_devicehub.ereuse_utils import if_none_return_none
from ereuse_devicehub.teal import db as tealdb from ereuse_devicehub.teal import db as tealdb
from ereuse_devicehub.teal.resource import Schema from ereuse_devicehub.teal.resource import Schema
@ -30,7 +30,7 @@ class Version(Field):
@if_none_return_none @if_none_return_none
def _deserialize(self, value, attr, data): def _deserialize(self, value, attr, data):
return StrictVersion(value) return LooseVersion(value)
class Color(Field): class Color(Field):