Compare commits

..

6 commits

Author SHA1 Message Date
pedro 754820f631 Merge pull request 'upload_legacy_snapshot' (#40) from upload_legacy_snapshot into main
Reviewed-on: #40
2025-01-23 11:25:18 +00:00
pedro 01db6a3e8e add all snapshots that we support
coauthored with cayo
2025-01-23 12:24:18 +01:00
Cayo Puigdefabregas 9817d4eb45 fix credential snapshot 2025-01-23 12:24:18 +01:00
Cayo Puigdefabregas 0d60afaa35 split parse files for cases of use 2025-01-23 12:24:18 +01:00
Cayo Puigdefabregas 918cf73506 refactor legacy 2025-01-23 12:24:18 +01:00
pedro 9d963e7f54 docker: facilitate multiple instances in same host
start using namespace because more things are going to come in same
docker compose
2025-01-21 14:21:22 +01:00
36 changed files with 1284 additions and 683 deletions

View file

@ -1,8 +1,8 @@
DOMAIN=localhost
DH_DOMAIN=localhost
DH_PORT=8000
DEMO=true
# note that with DEBUG=true, logs are more verbose (include tracebacks)
DEBUG=true
ALLOWED_HOSTS=${DOMAIN},${DOMAIN}:8000,127.0.0.1,127.0.0.1:8000
DPP=false
STATIC_ROOT=/tmp/static/
@ -16,6 +16,7 @@ EMAIL_BACKEND="django.core.mail.backends.smtp.EmailBackend"
EMAIL_FILE_PATH="/tmp/app-messages"
ENABLE_EMAIL=false
PREDEFINED_TOKEN='5018dd65-9abd-4a62-8896-80f34ac66150'
DH_ALLOWED_HOSTS=${DH_DOMAIN},${DH_DOMAIN}:${DH_PORT},127.0.0.1,127.0.0.1:${DH_PORT}
# TODO review these vars
#SNAPSHOTS_DIR=/path/to/TODO
#EVIDENCES_DIR=/path/to/TODO

View file

@ -37,9 +37,6 @@
<li class="nav-item">
<a class="nav-link" href="{% url 'device:device_web' object.id %}" target="_blank">Web</a>
</li>
<li class="nav-item">
<a href="#environmental_impact" class="nav-link" data-bs-toggle="tab" data-bs-target="#environmental_impact">{% trans 'Environmental impact' %}</a>
</li>
</ul>
</div>
</div>
@ -245,87 +242,6 @@
</div>
</div>
<div class="tab-pane fade" id="environmental_impact">
<div class="container-fluid py-3">
<div class="d-flex justify-content-end mb-3">
<a class="btn btn-success">
<i class="bi bi-file-earmark-pdf"></i>
{% trans 'Export to PDF' %}
</a>
</div>
<div class="row g-4 mb-4">
<div class="col-md-4">
<div class="card h-100 border-success">
<div class="card-body text-center">
<div class="mb-3">
<i class="bi bi-arrow-down-circle text-success" style="font-size: 2rem;"></i>
</div>
<h5 class="card-title text-success">Carbon Reduction</h5>
<h2 class="mb-2">{{ impact.carbon_saved }}</h2>
<p class="card-text text-muted">kg CO₂e saved</p>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card h-100 border-danger">
<div class="card-body text-center">
<div class="mb-3">
<i class="bi bi-cloud-fill text-danger" style="font-size: 2rem;"></i>
</div>
<h5 class="card-title text-danger">Carbon Consumed</h5>
<h2 class="mb-2">{{ impact.co2_emissions }}</h2>
<p class="card-text text-muted">kg CO₂e consumed</p>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card h-100 border-success">
<div class="card-body text-center">
<div class="mb-3">
<i class="bi bi-recycle text-success" style="font-size: 2rem;"></i>
</div>
<h5 class="card-title text-success">Additional Impact Metric</h5>
<h2 class="mb-2">85%</h2>
<p class="card-text text-muted">whatever</p>
</div>
</div>
</div>
</div>
<div class="row">
<div class="col">
<div class="card">
<div class="card-body">
<h5 class="card-title">Impact Details</h5>
<div class="table-responsive">
<table class="table table-bordered">
<tbody>
<tr>
<th scope="row" class="bg-light" style="width: 30%;">Manufacturing Impact Avoided</th>
<td>
<span class="text-success">{{ impact.carbon_saved }}</span> kg CO₂e
<br />
<small class="text-muted">Based on average laptop manufacturing emissions</small>
</td>
</tr>
</tbody>
</table>
</div>
<div class="mt-3">
<h6>Calculation Method</h6>
<small class="text-muted">Based on industry standards X Y and Z</small>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
{% if dpps %}
<div class="tab-pane fade" id="dpps">
<h5 class="card-title">{% trans 'List of dpps' %}</h5>

View file

@ -9,5 +9,6 @@ urlpatterns = [
path("<str:pk>/", views.DetailsView.as_view(), name="details"),
path("<str:pk>/annotation/add", views.AddAnnotationView.as_view(), name="add_annotation"),
path("<str:pk>/document/add", views.AddDocumentView.as_view(), name="add_document"),
path("<str:pk>/public/", views.PublicDeviceWebView.as_view(), name="device_web")
path("<str:pk>/public/", views.PublicDeviceWebView.as_view(), name="device_web"),
]

View file

@ -14,7 +14,6 @@ from evidence.models import Annotation
from lot.models import LotTag
from device.models import Device
from device.forms import DeviceFormSet
from environmental_impact.algorithms.algorithm_factory import FactoryEnvironmentImpactAlgorithm
if settings.DPP:
from dpp.models import Proof
from dpp.api_dlt import PROOF_TYPE
@ -111,16 +110,10 @@ class DetailsView(DashboardView, TemplateView):
uuid__in=self.object.uuids,
type=PROOF_TYPE["IssueDPP"]
)
enviromental_impact_algorithm = FactoryEnvironmentImpactAlgorithm.run_environmental_impact_calculation(
"dummy_calc"
)
enviromental_impact = enviromental_impact_algorithm.get_device_environmental_impact(
self.object)
context.update({
'object': self.object,
'snapshot': self.object.get_last_evidence(),
'lot_tags': lot_tags,
'impact': enviromental_impact,
'dpps': dpps,
})
return context

View file

@ -89,7 +89,6 @@ INSTALLED_APPS = [
"dashboard",
"admin",
"api",
"environmental_impact"
]
DPP = config("DPP", default=False, cast=bool)

View file

@ -5,13 +5,14 @@ services:
dockerfile: docker/devicehub-django.Dockerfile
environment:
- DEBUG=${DEBUG:-false}
- DOMAIN=${DOMAIN:-localhost}
- ALLOWED_HOSTS=${ALLOWED_HOSTS:-$DOMAIN}
- DOMAIN=${DH_DOMAIN:-localhost}
- PORT=${DH_PORT:-8000}
- ALLOWED_HOSTS=${DH_ALLOWED_HOSTS:-$DH_DOMAIN}
- DEMO=${DEMO:-false}
- PREDEFINED_TOKEN=${PREDEFINED_TOKEN:-}
- DPP=${DPP:-false}
volumes:
- .:/opt/devicehub-django
ports:
- 8000:8000
- ${DH_PORT}:${DH_PORT}

View file

@ -1,3 +0,0 @@
from django.contrib import admin
# Register your models here.

View file

@ -1,30 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from .dummy_calculator import DummyEnvironmentalImpactAlgorithm
if TYPE_CHECKING:
from .algorithm_interface import EnvironmentImpactAlgorithm
class AlgorithmNames():
"""
Enum class for the different types of algorithms.
"""
DUMMY_CALC = "dummy_calc"
algorithm_names = {
DUMMY_CALC: DummyEnvironmentalImpactAlgorithm()
}
class FactoryEnvironmentImpactAlgorithm():
@staticmethod
def run_environmental_impact_calculation(algorithm_name: str) -> EnvironmentImpactAlgorithm:
try:
return AlgorithmNames.algorithm_names[algorithm_name]
except KeyError:
raise ValueError("Invalid algorithm name. Valid options are: " +
", ".join(AlgorithmNames.algorithm_names.keys()))

View file

@ -1,11 +0,0 @@
from abc import ABC, abstractmethod
from functools import lru_cache
from device.models import Device
from environmental_impact.models import EnvironmentalImpact
class EnvironmentImpactAlgorithm(ABC):
@abstractmethod
def get_device_environmental_impact(self, device: Device) -> EnvironmentalImpact:
pass

View file

@ -1,33 +0,0 @@
from device.models import Device
from .algorithm_interface import EnvironmentImpactAlgorithm
from environmental_impact.models import EnvironmentalImpact
class DummyEnvironmentalImpactAlgorithm(EnvironmentImpactAlgorithm):
def get_device_environmental_impact(self, device: Device) -> EnvironmentalImpact:
# TODO Make a constants file / class
avg_watts = 40 # Arbitrary laptop average consumption
co2_per_kwh = 0.475
power_on_hours = self.get_power_on_hours_from(device)
energy_kwh = (power_on_hours * avg_watts) / 1000
co2_emissions = energy_kwh * co2_per_kwh
return EnvironmentalImpact(co2_emissions=co2_emissions)
def get_power_on_hours_from(self, device: Device) -> int:
# TODO how do I check if the device is a legacy workbench? Is there a better way?
is_legacy_workbench = False if device.last_evidence.inxi else True
if not is_legacy_workbench:
storage_components = device.components[9]
str_time = storage_components.get('time of used', -1)
else:
str_time = ""
uptime_in_hours = self.convert_str_time_to_hours(str_time, is_legacy_workbench)
return uptime_in_hours
def convert_str_time_to_hours(self, time_str: str, is_legacy_workbench: bool) -> int:
if is_legacy_workbench:
return -1 # TODO Power on hours not available in legacy workbench
else:
multipliers = {'y': 365 * 24, 'd': 24, 'h': 1}
return sum(int(part[:-1]) * multipliers[part[-1]] for part in time_str.split())

View file

@ -1,6 +0,0 @@
from django.apps import AppConfig
class EnvironmentalImpactConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "environmental_impact"

View file

@ -1,8 +0,0 @@
from dataclasses import dataclass
from django.db import models
@dataclass
class EnvironmentalImpact:
carbon_saved: float = 0.0
co2_emissions: float = 0.0

View file

@ -1,44 +0,0 @@
from unittest.mock import patch
import uuid
from django.test import TestCase
from device.models import Device
from environmental_impact.models import EnvironmentalImpact
from environmental_impact.algorithms.dummy_calculator import DummyEnvironmentalImpactAlgorithm
from evidence.models import Evidence
class DummyEnvironmentalImpactAlgorithmTests(TestCase):
@patch('evidence.models.Evidence.get_doc', return_value={'credentialSubject': {}})
@patch('evidence.models.Evidence.get_time', return_value=None)
def setUp(self, mock_get_time, mock_get_doc):
self.device = Device(id='1')
evidence = self.device.last_evidence = Evidence(uuid=uuid.uuid4())
evidence.inxi = True
evidence.doc = {'credentialSubject': {}}
self.algorithm = DummyEnvironmentalImpactAlgorithm()
def test_get_power_on_hours_from_legacy_device(self):
# TODO is there a way to check that?
pass
@patch('evidence.models.Evidence.get_components', return_value=[0, 0, 0, 0, 0, 0, 0, 0, 0, {'time of used': '1y 2d 3h'}])
def test_get_power_on_hours_from_inxi_device(self, mock_get_components):
hours = self.algorithm.get_power_on_hours_from(self.device)
self.assertEqual(
hours, 8811, "Inxi-parsed devices should correctly compute power-on hours")
@patch('evidence.models.Evidence.get_components', return_value=[0, 0, 0, 0, 0, 0, 0, 0, 0, {'time of used': '1y 2d 3h'}])
def test_convert_str_time_to_hours(self, mock_get_components):
result = self.algorithm.convert_str_time_to_hours('1y 2d 3h', False)
self.assertEqual(
result, 8811, "String to hours conversion should match expected output")
@patch('evidence.models.Evidence.get_components', return_value=[0, 0, 0, 0, 0, 0, 0, 0, 0, {'time of used': '1y 2d 3h'}])
def test_environmental_impact_calculation(self, mock_get_components):
impact = self.algorithm.get_device_environmental_impact(self.device)
self.assertIsInstance(impact, EnvironmentalImpact,
"Output should be an EnvironmentalImpact instance")
expected_co2 = 8811 * 40 * 0.475 / 1000
self.assertAlmostEqual(impact.co2_emissions, expected_co2,
2, "CO2 emissions calculation should be accurate")

View file

@ -1,17 +0,0 @@
from environmental_impact.algorithms.algorithm_factory import FactoryEnvironmentImpactAlgorithm
from django.test import TestCase
from environmental_impact.algorithms.dummy_calculator import DummyEnvironmentalImpactAlgorithm
class FactoryEnvironmentImpactAlgorithmTests(TestCase):
def test_valid_algorithm_name(self):
algorithm = FactoryEnvironmentImpactAlgorithm.run_environmental_impact_calculation(
'dummy_calc')
self.assertIsInstance(algorithm, DummyEnvironmentalImpactAlgorithm,
"Factory should return a DummyEnvironmentalImpactAlgorithm instance")
def test_invalid_algorithm_name(self):
with self.assertRaises(ValueError):
FactoryEnvironmentImpactAlgorithm.run_environmental_impact_calculation(
'invalid_calc')

View file

@ -1,3 +0,0 @@
from django.shortcuts import render
# Create your views here.

View file

@ -29,7 +29,8 @@ class UploadForm(forms.Form):
try:
file_json = json.loads(file_data)
snap = Build(file_json, None, check=True)
build = Build
snap = build(file_json, None, check=True)
exist_annotation = Annotation.objects.filter(
uuid=snap.uuid
).first()
@ -57,7 +58,9 @@ class UploadForm(forms.Form):
for ev in self.evidences:
path_name = save_in_disk(ev[1], user.institution.name)
Build(ev[1], user)
build = Build
file_json = ev[1]
build(file_json, user)
move_json(path_name, user.institution.name)

69
evidence/legacy_parse.py Normal file
View file

@ -0,0 +1,69 @@
import json
import logging
from dmidecode import DMIParse
from json_repair import repair_json
from evidence.mixin_parse import BuildMix
from evidence.legacy_parse_details import get_lshw_child, ParseSnapshot
from utils.constants import CHASSIS_DH
logger = logging.getLogger('django')
def get_mac(lshw):
try:
if type(lshw) is dict:
hw = lshw
else:
hw = json.loads(lshw)
except json.decoder.JSONDecodeError:
hw = json.loads(repair_json(lshw))
nets = []
get_lshw_child(hw, nets, 'network')
nets_sorted = sorted(nets, key=lambda x: x['businfo'])
if nets_sorted:
mac = nets_sorted[0]['serial']
logger.debug("The snapshot has the following MAC: %s" , mac)
return mac
class Build(BuildMix):
# This parse is for get info from snapshots created with
# workbench-script but builded for send to devicehub-teal
def get_details(self):
dmidecode_raw = self.json["data"]["dmidecode"]
self.dmi = DMIParse(dmidecode_raw)
self.manufacturer = self.dmi.manufacturer().strip()
self.model = self.dmi.model().strip()
self.chassis = self.get_chassis_dh()
self.serial_number = self.dmi.serial_number()
self.sku = self.get_sku()
self.typ = self.chassis
self.version = self.get_version()
def get_chassis_dh(self):
chassis = self.get_chassis()
lower_type = chassis.lower()
for k, v in CHASSIS_DH.items():
if lower_type in v:
return k
return self.default
def get_sku(self):
return self.dmi.get("System")[0].get("SKU Number", "n/a").strip()
def get_chassis(self):
return self.dmi.get("Chassis")[0].get("Type", '_virtual') #
def get_version(self):
return self.dmi.get("System")[0].get("Verson", '_virtual')
def _get_components(self):
data = ParseSnapshot(self.json)
self.components = data.components

View file

@ -0,0 +1,503 @@
import json
import logging
import numpy as np
from datetime import datetime
from dmidecode import DMIParse
from json_repair import repair_json
from utils.constants import CHASSIS_DH, DATASTORAGEINTERFACE
logger = logging.getLogger('django')
def get_lshw_child(child, nets, component):
try:
if child.get('id') == component:
nets.append(child)
if child.get('children'):
[get_lshw_child(x, nets, component) for x in child['children']]
except Exception:
return []
class ParseSnapshot:
def __init__(self, snapshot, default="n/a"):
self.default = default
self.dmidecode_raw = snapshot["data"].get("dmidecode", "{}")
self.smart_raw = snapshot["data"].get("disks", [])
self.hwinfo_raw = snapshot["data"].get("hwinfo", "")
self.lshw_raw = snapshot["data"].get("lshw", {}) or {}
self.lscpi_raw = snapshot["data"].get("lspci", "")
self.device = {"actions": []}
self.components = []
self.monitors = []
self.dmi = DMIParse(self.dmidecode_raw)
self.smart = self.loads(self.smart_raw)
self.lshw = self.loads(self.lshw_raw)
self.hwinfo = self.parse_hwinfo()
self.set_computer()
self.get_hwinfo_monitors()
self.set_components()
self.snapshot_json = {
"type": "Snapshot",
"device": self.device,
"software": snapshot["software"],
"components": self.components,
"uuid": snapshot['uuid'],
"version": snapshot['version'],
"endTime": snapshot["timestamp"],
"elapsed": 1,
}
def set_computer(self):
self.device['manufacturer'] = self.dmi.manufacturer().strip()
self.device['model'] = self.dmi.model().strip()
self.device['serialNumber'] = self.dmi.serial_number()
self.device['type'] = self.get_type()
self.device['sku'] = self.get_sku()
self.device['version'] = self.get_version()
self.device['system_uuid'] = self.get_uuid()
self.device['family'] = self.get_family()
self.device['chassis'] = self.get_chassis_dh()
def set_components(self):
self.get_cpu()
self.get_ram()
self.get_mother_board()
self.get_graphic()
self.get_data_storage()
self.get_display()
self.get_sound_card()
self.get_networks()
def get_cpu(self):
for cpu in self.dmi.get('Processor'):
serial = cpu.get('Serial Number')
if serial == 'Not Specified' or not serial:
serial = cpu.get('ID').replace(' ', '')
self.components.append(
{
"actions": [],
"type": "Processor",
"speed": self.get_cpu_speed(cpu),
"cores": int(cpu.get('Core Count', 1)),
"model": cpu.get('Version'),
"threads": int(cpu.get('Thread Count', 1)),
"manufacturer": cpu.get('Manufacturer'),
"serialNumber": serial,
"brand": cpu.get('Family'),
"address": self.get_cpu_address(cpu),
"bogomips": self.get_bogomips(),
}
)
def get_ram(self):
for ram in self.dmi.get("Memory Device"):
if ram.get('size') == 'No Module Installed':
continue
if not ram.get("Speed"):
continue
self.components.append(
{
"actions": [],
"type": "RamModule",
"size": self.get_ram_size(ram),
"speed": self.get_ram_speed(ram),
"manufacturer": ram.get("Manufacturer", self.default),
"serialNumber": ram.get("Serial Number", self.default),
"interface": ram.get("Type", "DDR"),
"format": ram.get("Form Factor", "DIMM"),
"model": ram.get("Part Number", self.default),
}
)
def get_mother_board(self):
for moder_board in self.dmi.get("Baseboard"):
self.components.append(
{
"actions": [],
"type": "Motherboard",
"version": moder_board.get("Version"),
"serialNumber": moder_board.get("Serial Number", "").strip(),
"manufacturer": moder_board.get("Manufacturer", "").strip(),
"biosDate": self.get_bios_date(),
"ramMaxSize": self.get_max_ram_size(),
"ramSlots": len(self.dmi.get("Memory Device")),
"slots": self.get_ram_slots(),
"model": moder_board.get("Product Name", "").strip(),
"firewire": self.get_firmware_num(),
"pcmcia": self.get_pcmcia_num(),
"serial": self.get_serial_num(),
"usb": self.get_usb_num(),
}
)
def get_graphic(self):
displays = []
get_lshw_child(self.lshw, displays, 'display')
for c in displays:
if not c['configuration'].get('driver', None):
continue
self.components.append(
{
"actions": [],
"type": "GraphicCard",
"memory": "",
"manufacturer": c.get("vendor", self.default),
"model": c.get("product", self.default),
"serialNumber": c.get("serial", self.default),
}
)
def get_data_storage(self):
for sm in self.smart:
if sm.get('smartctl', {}).get('exit_status') == 1:
continue
model = sm.get('model_name')
manufacturer = None
hours = sm.get("power_on_time", {}).get("hours", 0)
if model and len(model.split(" ")) > 1:
mm = model.split(" ")
model = mm[-1]
manufacturer = " ".join(mm[:-1])
self.components.append(
{
"actions": self.sanitize(sm),
"type": self.get_data_storage_type(sm),
"model": model,
"manufacturer": manufacturer,
"serialNumber": sm.get('serial_number'),
"size": self.get_data_storage_size(sm),
"variant": sm.get("firmware_version"),
"interface": self.get_data_storage_interface(sm),
"hours": hours,
}
)
def sanitize(self, action):
return []
def get_bogomips(self):
if not self.hwinfo:
return self.default
bogomips = 0
for row in self.hwinfo:
for cel in row:
if 'BogoMips' in cel:
try:
bogomips += float(cel.split(":")[-1])
except Exception:
pass
return bogomips
def get_networks(self):
networks = []
get_lshw_child(self.lshw, networks, 'network')
for c in networks:
capacity = c.get('capacity')
wireless = bool(c.get('configuration', {}).get('wireless', False))
self.components.append(
{
"actions": [],
"type": "NetworkAdapter",
"model": c.get('product'),
"manufacturer": c.get('vendor'),
"serialNumber": c.get('serial'),
"speed": capacity,
"variant": c.get('version', 1),
"wireless": wireless or False,
"integrated": "PCI:0000:00" in c.get("businfo", ""),
}
)
def get_sound_card(self):
multimedias = []
get_lshw_child(self.lshw, multimedias, 'multimedia')
for c in multimedias:
self.components.append(
{
"actions": [],
"type": "SoundCard",
"model": c.get('product'),
"manufacturer": c.get('vendor'),
"serialNumber": c.get('serial'),
}
)
def get_display(self): # noqa: C901
TECHS = 'CRT', 'TFT', 'LED', 'PDP', 'LCD', 'OLED', 'AMOLED'
for c in self.monitors:
resolution_width, resolution_height = (None,) * 2
refresh, serial, model, manufacturer, size = (None,) * 5
year, week, production_date = (None,) * 3
for x in c:
if "Vendor: " in x:
manufacturer = x.split('Vendor: ')[-1].strip()
if "Model: " in x:
model = x.split('Model: ')[-1].strip()
if "Serial ID: " in x:
serial = x.split('Serial ID: ')[-1].strip()
if " Resolution: " in x:
rs = x.split(' Resolution: ')[-1].strip()
if 'x' in rs:
resolution_width, resolution_height = [
int(r) for r in rs.split('x')
]
if "Frequencies: " in x:
try:
refresh = int(float(x.split(',')[-1].strip()[:-3]))
except Exception:
pass
if 'Year of Manufacture' in x:
year = x.split(': ')[1]
if 'Week of Manufacture' in x:
week = x.split(': ')[1]
if "Size: " in x:
size = self.get_size_monitor(x)
technology = next((t for t in TECHS if t in c[0]), None)
if year and week:
d = '{} {} 0'.format(year, week)
production_date = datetime.strptime(d, '%Y %W %w').isoformat()
self.components.append(
{
"actions": [],
"type": "Display",
"model": model,
"manufacturer": manufacturer,
"serialNumber": serial,
'size': size,
'resolutionWidth': resolution_width,
'resolutionHeight': resolution_height,
"productionDate": production_date,
'technology': technology,
'refreshRate': refresh,
}
)
def get_hwinfo_monitors(self):
for c in self.hwinfo:
monitor = None
external = None
for x in c:
if 'Hardware Class: monitor' in x:
monitor = c
if 'Driver Info' in x:
external = c
if monitor and not external:
self.monitors.append(c)
def get_size_monitor(self, x):
i = 1 / 25.4
t = x.split('Size: ')[-1].strip()
tt = t.split('mm')
if not tt:
return 0
sizes = tt[0].strip()
if 'x' not in sizes:
return 0
w, h = [int(x) for x in sizes.split('x')]
return "{:.2f}".format(np.sqrt(w**2 + h**2) * i)
def get_cpu_address(self, cpu):
default = 64
try:
for ch in self.lshw.get('children', []):
for c in ch.get('children', []):
if c['class'] == 'processor':
return c.get('width', default)
except:
return default
return default
def get_usb_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "USB" in u.get("Port Type", "").upper()
]
)
def get_serial_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "SERIAL" in u.get("Port Type", "").upper()
]
)
def get_firmware_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "FIRMWARE" in u.get("Port Type", "").upper()
]
)
def get_pcmcia_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "PCMCIA" in u.get("Port Type", "").upper()
]
)
def get_bios_date(self):
return self.dmi.get("BIOS")[0].get("Release Date", self.default)
def get_firmware(self):
return self.dmi.get("BIOS")[0].get("Firmware Revision", '1')
def get_max_ram_size(self):
size = 0
for slot in self.dmi.get("Physical Memory Array"):
capacity = slot.get("Maximum Capacity", '0').split(" ")[0]
size += int(capacity)
return size
def get_ram_slots(self):
slots = 0
for x in self.dmi.get("Physical Memory Array"):
slots += int(x.get("Number Of Devices", 0))
return slots
def get_ram_size(self, ram):
memory = ram.get("Size", "0")
return memory
def get_ram_speed(self, ram):
size = ram.get("Speed", "0")
return size
def get_cpu_speed(self, cpu):
speed = cpu.get('Max Speed', "0")
return speed
def get_sku(self):
return self.dmi.get("System")[0].get("SKU Number", self.default).strip()
def get_version(self):
return self.dmi.get("System")[0].get("Version", self.default).strip()
def get_uuid(self):
return self.dmi.get("System")[0].get("UUID", '').strip()
def get_family(self):
return self.dmi.get("System")[0].get("Family", '')
def get_chassis(self):
return self.dmi.get("Chassis")[0].get("Type", '_virtual')
def get_type(self):
chassis_type = self.get_chassis()
return self.translation_to_devicehub(chassis_type)
def translation_to_devicehub(self, original_type):
lower_type = original_type.lower()
CHASSIS_TYPE = {
'Desktop': [
'desktop',
'low-profile',
'tower',
'docking',
'all-in-one',
'pizzabox',
'mini-tower',
'space-saving',
'lunchbox',
'mini',
'stick',
],
'Laptop': [
'portable',
'laptop',
'convertible',
'tablet',
'detachable',
'notebook',
'handheld',
'sub-notebook',
],
'Server': ['server'],
'Computer': ['_virtual'],
}
for k, v in CHASSIS_TYPE.items():
if lower_type in v:
return k
return self.default
def get_chassis_dh(self):
chassis = self.get_chassis()
lower_type = chassis.lower()
for k, v in CHASSIS_DH.items():
if lower_type in v:
return k
return self.default
def get_data_storage_type(self, x):
# TODO @cayop add more SSDS types
SSDS = ["nvme"]
SSD = 'SolidStateDrive'
HDD = 'HardDrive'
type_dev = x.get('device', {}).get('type')
trim = x.get('trim', {}).get("supported") in [True, "true"]
return SSD if type_dev in SSDS or trim else HDD
def get_data_storage_interface(self, x):
interface = x.get('device', {}).get('protocol', 'ATA')
if interface.upper() in DATASTORAGEINTERFACE:
return interface.upper()
txt = "Sid: {}, interface {} is not in DataStorageInterface Enum".format(
self.sid, interface
)
self.errors("{}".format(txt))
def get_data_storage_size(self, x):
return x.get('user_capacity', {}).get('bytes')
def parse_hwinfo(self):
hw_blocks = self.hwinfo_raw.split("\n\n")
return [x.split("\n") for x in hw_blocks]
def loads(self, x):
if isinstance(x, str):
try:
try:
hw = json.loads(x)
except json.decoder.JSONDecodeError:
hw = json.loads(repair_json(x))
return hw
except Exception as ss:
logger.warning("%s", ss)
return {}
return x
def errors(self, txt=None):
if not txt:
return self._errors
logger.error(txt)
self._errors.append("%s", txt)

58
evidence/mixin_parse.py Normal file
View file

@ -0,0 +1,58 @@
import logging
from django.conf import settings
from utils.constants import ALGOS
logger = logging.getLogger('django')
class BuildMix:
def __init__(self, evidence_json):
self.json = evidence_json
self.uuid = self.json.get('uuid')
self.manufacturer = ""
self.model = ""
self.serial_number = ""
self.chassis = ""
self.sku = ""
self.mac = ""
self.tpy = ""
self.version = ""
self.get_details()
self.generate_chids()
def get_hid(self, algo):
algorithm = ALGOS.get(algo, [])
hid = ""
for f in algorithm:
if hasattr(self, f):
hid += getattr(self, f)
return hid
def generate_chids(self):
self.algorithms = {
'hidalgo1': self.get_hid('hidalgo1'),
}
if settings.DPP:
self.algorithms["legacy_dpp"] = self.get_hid("legacy_dpp")
def get_doc(self):
self._get_components()
for c in self.components:
c.pop("actions", None)
components = sorted(self.components, key=lambda x: x.get("type"))
device = self.algorithms.get('legacy_dpp')
doc = [("computer", device)]
for c in components:
doc.append((c.get("type"), self.get_id_hw_dpp(c)))
def get_id_hw_dpp(self, d):
algorithm = ALGOS.get("legacy_dpp", [])
hid = ""
for f in algorithm:
hid += d.get(f, '')
return hid

View file

@ -6,7 +6,8 @@ from django.db import models
from utils.constants import STR_EXTEND_SIZE, CHASSIS_DH
from evidence.xapian import search
from evidence.parse_details import ParseSnapshot, get_inxi, get_inxi_key
from evidence.parse_details import ParseSnapshot
from evidence.normal_parse_details import get_inxi, get_inxi_key
from user.models import User, Institution
@ -92,7 +93,7 @@ class Evidence:
self.inxi = ev["output"]
else:
dmidecode_raw = self.doc["data"]["dmidecode"]
inxi_raw = self.doc["data"]["inxi"]
inxi_raw = self.doc.get("data", {}).get("inxi")
self.dmi = DMIParse(dmidecode_raw)
try:
self.inxi = json.loads(inxi_raw)
@ -134,7 +135,7 @@ class Evidence:
return list(self.doc.get('kv').values())[0]
if self.is_legacy():
return self.doc['device']['manufacturer']
return self.doc.get('device', {}).get('manufacturer', '')
if self.inxi:
return self.device_manufacturer
@ -149,7 +150,7 @@ class Evidence:
return list(self.doc.get('kv').values())[1]
if self.is_legacy():
return self.doc['device']['model']
return self.doc.get('device', {}).get('model', '')
if self.inxi:
return self.device_model
@ -158,7 +159,7 @@ class Evidence:
def get_chassis(self):
if self.is_legacy():
return self.doc['device']['model']
return self.doc.get('device', {}).get('model', '')
if self.inxi:
return self.device_chassis
@ -173,7 +174,7 @@ class Evidence:
def get_serial_number(self):
if self.is_legacy():
return self.doc['device']['serialNumber']
return self.doc.get('device', {}).get('serialNumber', '')
if self.inxi:
return self.device_serial_number
@ -195,8 +196,7 @@ class Evidence:
).order_by("-created").values_list("uuid", "created").distinct()
def set_components(self):
snapshot = ParseSnapshot(self.doc).snapshot_json
self.components = snapshot['components']
self.components = ParseSnapshot(self.doc).components
def is_legacy(self):
if self.doc.get("credentialSubject"):

64
evidence/normal_parse.py Normal file
View file

@ -0,0 +1,64 @@
import json
import logging
from evidence.mixin_parse import BuildMix
from evidence.normal_parse_details import get_inxi_key, get_inxi, ParseSnapshot
logger = logging.getLogger('django')
def get_mac(inxi):
nets = get_inxi_key(inxi, "Network")
networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
for n, iface in networks:
if get_inxi(n, "port"):
return get_inxi(iface, 'mac')
class Build(BuildMix):
def get_details(self):
self.from_credential()
try:
self.inxi = self.json["data"]["inxi"]
if isinstance(self.inxi, str):
self.inxi = json.loads(self.inxi)
except Exception:
logger.error("No inxi in snapshot %s", self.uuid)
return ""
machine = get_inxi_key(self.inxi, 'Machine')
for m in machine:
system = get_inxi(m, "System")
if system:
self.manufacturer = system
self.model = get_inxi(m, "product")
self.serial_number = get_inxi(m, "serial")
self.chassis = get_inxi(m, "Type")
else:
self.sku = get_inxi(m, "part-nu")
self.mac = get_mac(self.inxi) or ""
if not self.mac:
txt = "Could not retrieve MAC address in snapshot %s"
logger.warning(txt, self.uuid)
def from_credential(self):
if not self.json.get("credentialSubject"):
return
self.uuid = self.json.get("credentialSubject", {}).get("uuid")
self.json.update(self.json["credentialSubject"])
if self.json.get("evidence"):
self.json["data"] = {}
for ev in self.json["evidence"]:
k = ev.get("operation")
if not k:
continue
self.json["data"][k] = ev.get("output")
def _get_components(self):
data = ParseSnapshot(self.json)
self.components = data.components

View file

@ -0,0 +1,402 @@
import json
import logging
from dmidecode import DMIParse
logger = logging.getLogger('django')
def get_inxi_key(inxi, component):
for n in inxi:
for k, v in n.items():
if component in k:
return v
def get_inxi(n, name):
for k, v in n.items():
if f"#{name}" in k:
return v
return ""
class ParseSnapshot:
def __init__(self, snapshot, default="n/a"):
self.default = default
self.dmidecode_raw = snapshot.get("data", {}).get("dmidecode", "{}")
self.smart_raw = snapshot.get("data", {}).get("smartctl", [])
self.inxi_raw = snapshot.get("data", {}).get("inxi", "") or ""
for ev in snapshot.get("evidence", []):
if "dmidecode" == ev.get("operation"):
self.dmidecode_raw = ev["output"]
if "inxi" == ev.get("operation"):
self.inxi_raw = ev["output"]
if "smartctl" == ev.get("operation"):
self.smart_raw = ev["output"]
data = snapshot
if snapshot.get("credentialSubject"):
data = snapshot["credentialSubject"]
self.device = {"actions": []}
self.components = []
self.dmi = DMIParse(self.dmidecode_raw)
self.smart = self.loads(self.smart_raw)
self.inxi = self.loads(self.inxi_raw)
self.set_computer()
self.set_components()
self.snapshot_json = {
"type": "Snapshot",
"device": self.device,
"software": data["software"],
"components": self.components,
"uuid": data['uuid'],
"endTime": data["timestamp"],
"elapsed": 1,
}
def set_computer(self):
machine = get_inxi_key(self.inxi, 'Machine') or []
for m in machine:
system = get_inxi(m, "System")
if system:
self.device['manufacturer'] = system
self.device['model'] = get_inxi(m, "product")
self.device['serialNumber'] = get_inxi(m, "serial")
self.device['type'] = get_inxi(m, "Type")
self.device['chassis'] = self.device['type']
self.device['version'] = get_inxi(m, "v")
else:
self.device['system_uuid'] = get_inxi(m, "uuid")
self.device['sku'] = get_inxi(m, "part-nu")
def set_components(self):
self.get_mother_board()
self.get_cpu()
self.get_ram()
self.get_graphic()
self.get_display()
self.get_networks()
self.get_sound_card()
self.get_data_storage()
self.get_battery()
def get_mother_board(self):
machine = get_inxi_key(self.inxi, 'Machine') or []
mb = {"type": "Motherboard",}
for m in machine:
bios_date = get_inxi(m, "date")
if not bios_date:
continue
mb["manufacturer"] = get_inxi(m, "Mobo")
mb["model"] = get_inxi(m, "model")
mb["serialNumber"] = get_inxi(m, "serial")
mb["version"] = get_inxi(m, "v")
mb["biosDate"] = bios_date
mb["biosVersion"] = self.get_bios_version()
mb["firewire"]: self.get_firmware_num()
mb["pcmcia"]: self.get_pcmcia_num()
mb["serial"]: self.get_serial_num()
mb["usb"]: self.get_usb_num()
self.get_ram_slots(mb)
self.components.append(mb)
def get_ram_slots(self, mb):
memory = get_inxi_key(self.inxi, 'Memory') or []
for m in memory:
slots = get_inxi(m, "slots")
if not slots:
continue
mb["slots"] = slots
mb["ramSlots"] = get_inxi(m, "modules")
mb["ramMaxSize"] = get_inxi(m, "capacity")
def get_cpu(self):
cpu = get_inxi_key(self.inxi, 'CPU') or []
cp = {"type": "Processor"}
vulnerabilities = []
for c in cpu:
base = get_inxi(c, "model")
if base:
cp["model"] = get_inxi(c, "model")
cp["arch"] = get_inxi(c, "arch")
cp["bits"] = get_inxi(c, "bits")
cp["gen"] = get_inxi(c, "gen")
cp["family"] = get_inxi(c, "family")
cp["date"] = get_inxi(c, "built")
continue
des = get_inxi(c, "L1")
if des:
cp["L1"] = des
cp["L2"] = get_inxi(c, "L2")
cp["L3"] = get_inxi(c, "L3")
cp["cpus"] = get_inxi(c, "cpus")
cp["cores"] = get_inxi(c, "cores")
cp["threads"] = get_inxi(c, "threads")
continue
bogo = get_inxi(c, "bogomips")
if bogo:
cp["bogomips"] = bogo
cp["base/boost"] = get_inxi(c, "base/boost")
cp["min/max"] = get_inxi(c, "min/max")
cp["ext-clock"] = get_inxi(c, "ext-clock")
cp["volts"] = get_inxi(c, "volts")
continue
ctype = get_inxi(c, "Type")
if ctype:
v = {"Type": ctype}
status = get_inxi(c, "status")
if status:
v["status"] = status
mitigation = get_inxi(c, "mitigation")
if mitigation:
v["mitigation"] = mitigation
vulnerabilities.append(v)
self.components.append(cp)
def get_ram(self):
memory = get_inxi_key(self.inxi, 'Memory') or []
mem = {"type": "RamModule"}
for m in memory:
base = get_inxi(m, "System RAM")
if base:
mem["size"] = get_inxi(m, "total")
slot = get_inxi(m, "manufacturer")
if slot:
mem["manufacturer"] = slot
mem["model"] = get_inxi(m, "part-no")
mem["serialNumber"] = get_inxi(m, "serial")
mem["speed"] = get_inxi(m, "speed")
mem["bits"] = get_inxi(m, "data")
mem["interface"] = get_inxi(m, "type")
module = get_inxi(m, "modules")
if module:
mem["modules"] = module
self.components.append(mem)
def get_graphic(self):
graphics = get_inxi_key(self.inxi, 'Graphics') or []
for c in graphics:
if not get_inxi(c, "Device") or not get_inxi(c, "vendor"):
continue
self.components.append(
{
"type": "GraphicCard",
"memory": self.get_memory_video(c),
"manufacturer": get_inxi(c, "vendor"),
"model": get_inxi(c, "Device"),
"arch": get_inxi(c, "arch"),
"serialNumber": get_inxi(c, "serial"),
"integrated": True if get_inxi(c, "port") else False
}
)
def get_battery(self):
bats = get_inxi_key(self.inxi, 'Battery') or []
for b in bats:
self.components.append(
{
"type": "Battery",
"model": get_inxi(b, "model"),
"serialNumber": get_inxi(b, "serial"),
"condition": get_inxi(b, "condition"),
"cycles": get_inxi(b, "cycles"),
"volts": get_inxi(b, "volts")
}
)
def get_memory_video(self, c):
memory = get_inxi_key(self.inxi, 'Memory') or []
for m in memory:
igpu = get_inxi(m, "igpu")
agpu = get_inxi(m, "agpu")
ngpu = get_inxi(m, "ngpu")
gpu = get_inxi(m, "gpu")
if igpu or agpu or gpu or ngpu:
return igpu or agpu or gpu or ngpu
return self.default
def get_data_storage(self):
hdds= get_inxi_key(self.inxi, 'Drives') or []
for d in hdds:
usb = get_inxi(d, "type")
if usb == "USB":
continue
serial = get_inxi(d, "serial")
if serial:
hd = {
"type": "Storage",
"manufacturer": get_inxi(d, "vendor"),
"model": get_inxi(d, "model"),
"serialNumber": get_inxi(d, "serial"),
"size": get_inxi(d, "size"),
"speed": get_inxi(d, "speed"),
"interface": get_inxi(d, "tech"),
"firmware": get_inxi(d, "fw-rev")
}
rpm = get_inxi(d, "rpm")
if rpm:
hd["rpm"] = rpm
family = get_inxi(d, "family")
if family:
hd["family"] = family
sata = get_inxi(d, "sata")
if sata:
hd["sata"] = sata
continue
cycles = get_inxi(d, "cycles")
if cycles:
hd['cycles'] = cycles
hd["health"] = get_inxi(d, "health")
hd["time of used"] = get_inxi(d, "on")
hd["read used"] = get_inxi(d, "read-units")
hd["written used"] = get_inxi(d, "written-units")
self.components.append(hd)
continue
hd = {}
def sanitize(self, action):
return []
def get_networks(self):
nets = get_inxi_key(self.inxi, "Network") or []
networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
for n, iface in networks:
model = get_inxi(n, "Device")
if not model:
continue
interface = ''
for k in n.keys():
if "port" in k:
interface = "Integrated"
if "pcie" in k:
interface = "PciExpress"
if get_inxi(n, "type") == "USB":
interface = "USB"
self.components.append(
{
"type": "NetworkAdapter",
"model": model,
"manufacturer": get_inxi(n, 'vendor'),
"serialNumber": get_inxi(iface, 'mac'),
"speed": get_inxi(n, "speed"),
"interface": interface,
}
)
def get_sound_card(self):
audio = get_inxi_key(self.inxi, "Audio") or []
for c in audio:
model = get_inxi(c, "Device")
if not model:
continue
self.components.append(
{
"type": "SoundCard",
"model": model,
"manufacturer": get_inxi(c, 'vendor'),
"serialNumber": get_inxi(c, 'serial'),
}
)
def get_display(self):
graphics = get_inxi_key(self.inxi, "Graphics") or []
for c in graphics:
if not get_inxi(c, "Monitor"):
continue
self.components.append(
{
"type": "Display",
"model": get_inxi(c, "model"),
"manufacturer": get_inxi(c, "vendor"),
"serialNumber": get_inxi(c, "serial"),
'size': get_inxi(c, "size"),
'diagonal': get_inxi(c, "diag"),
'resolution': get_inxi(c, "res"),
"date": get_inxi(c, "built"),
'ratio': get_inxi(c, "ratio"),
}
)
def get_usb_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "USB" in u.get("Port Type", "").upper()
]
)
def get_serial_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "SERIAL" in u.get("Port Type", "").upper()
]
)
def get_firmware_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "FIRMWARE" in u.get("Port Type", "").upper()
]
)
def get_pcmcia_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "PCMCIA" in u.get("Port Type", "").upper()
]
)
def get_bios_version(self):
return self.dmi.get("BIOS")[0].get("BIOS Revision", '1')
def loads(self, x):
if isinstance(x, str):
try:
return json.loads(x)
except Exception as ss:
logger.warning("%s", ss)
return {}
return x
def errors(self, txt=None):
if not txt:
return self._errors
logger.error(txt)
self._errors.append("%s", txt)

22
evidence/old_parse.py Normal file
View file

@ -0,0 +1,22 @@
import logging
from evidence.mixin_parse import BuildMix
logger = logging.getLogger('django')
class Build(BuildMix):
# This parse is for get info from snapshots created with old workbench
# normaly is worbench 11
def get_details(self):
device = self.json.get('device', {})
self.manufacturer = device.get("manufacturer", '')
self.model = device.get("model", '')
self.chassis = device.get("chassis", '')
self.serial_number = device.get("serialNumber", '')
self.sku = device.get("sku", '')
def _get_components(self):
self.components = self.json.get("components", [])

View file

@ -0,0 +1,13 @@
import logging
logger = logging.getLogger('django')
class ParseSnapshot:
def __init__(self, snapshot, default="n/a"):
self.default = default
self.snapshot_json = snapshot
self.device = snapshot.get("device")
self.components = snapshot.get("components")

View file

@ -2,12 +2,14 @@ import json
import hashlib
import logging
from dmidecode import DMIParse
from evidence import legacy_parse
from evidence import old_parse
from evidence import normal_parse
from evidence.parse_details import ParseSnapshot
from evidence.models import Annotation
from evidence.xapian import index
from evidence.parse_details import get_inxi_key, get_inxi
from evidence.normal_parse_details import get_inxi_key, get_inxi
from django.conf import settings
if settings.DPP:
@ -24,9 +26,83 @@ def get_mac(inxi):
if get_inxi(n, "port"):
return get_inxi(iface, 'mac')
class Build:
def __init__(self, evidence_json, user, check=False):
"""
This Build do the save in xapian as document, in Annotations and do
register in dlt if is configured for that.
We have 4 cases for parser diferents snapshots than come from workbench.
1) worbench 11 is old_parse.
2) legacy is the worbench-script when create a snapshot for devicehub-teal
3) some snapshots come as a credential. In this case is parsed as normal_parse
4) normal snapshot from worbench-script is the most basic and is parsed as normal_parse
"""
self.evidence = evidence_json.copy()
self.uuid = self.evidence.get('uuid')
self.user = user
if evidence_json.get("credentialSubject"):
self.build = normal_parse.Build(evidence_json)
self.uuid = evidence_json.get("credentialSubject", {}).get("uuid")
elif evidence_json.get("software") != "workbench-script":
self.build = old_parse.Build(evidence_json)
elif evidence_json.get("data",{}).get("lshw"):
self.build = legacy_parse.Build(evidence_json)
else:
self.build = normal_parse.Build(evidence_json)
if check:
return
self.index()
self.create_annotations()
if settings.DPP:
self.register_device_dlt()
def index(self):
snap = json.dumps(self.evidence)
index(self.user.institution, self.uuid, snap)
def create_annotations(self):
annotation = Annotation.objects.filter(
uuid=self.uuid,
owner=self.user.institution,
type=Annotation.Type.SYSTEM,
)
if annotation:
txt = "Warning: Snapshot %s already registered (annotation exists)"
logger.warning(txt, self.uuid)
return
for k, v in self.build.algorithms.items():
Annotation.objects.create(
uuid=self.uuid,
owner=self.user.institution,
user=self.user,
type=Annotation.Type.SYSTEM,
key=k,
value=self.sign(v)
)
def sign(self, doc):
return hashlib.sha3_256(doc.encode()).hexdigest()
def register_device_dlt(self):
legacy_dpp = self.build.algorithms.get('legacy_dpp')
chid = self.sign(legacy_dpp)
phid = self.sign(json.dumps(self.build.get_doc()))
register_device_dlt(chid, phid, self.uuid, self.user)
register_passport_dlt(chid, phid, self.uuid, self.user)
class Build2:
def __init__(self, evidence_json, user, check=False):
if evidence_json.get("data",{}).get("lshw"):
if evidence_json.get("software") == "workbench-script":
return legacy_parse.Build(evidence_json, user, check=check)
self.evidence = evidence_json.copy()
self.json = evidence_json.copy()

View file

@ -1,407 +1,38 @@
import re
import json
import logging
import numpy as np
from datetime import datetime
from dmidecode import DMIParse
from utils.constants import CHASSIS_DH, DATASTORAGEINTERFACE
from evidence import (
legacy_parse_details,
normal_parse_details,
old_parse_details
)
logger = logging.getLogger('django')
def get_inxi_key(inxi, component):
for n in inxi:
for k, v in n.items():
if component in k:
return v
def get_inxi(n, name):
for k, v in n.items():
if f"#{name}" in k:
return v
return ""
class ParseSnapshot:
def __init__(self, snapshot, default="n/a"):
self.default = default
self.dmidecode_raw = snapshot.get("data", {}).get("dmidecode", "{}")
self.smart_raw = snapshot.get("data", {}).get("smartctl", [])
self.inxi_raw = snapshot.get("data", {}).get("inxi", "") or ""
for ev in snapshot.get("evidence", []):
if "dmidecode" == ev.get("operation"):
self.dmidecode_raw = ev["output"]
if "inxi" == ev.get("operation"):
self.inxi_raw = ev["output"]
if "smartctl" == ev.get("operation"):
self.smart_raw = ev["output"]
data = snapshot
if snapshot.get("credentialSubject"):
data = snapshot["credentialSubject"]
self.device = {"actions": []}
self.components = []
self.dmi = DMIParse(self.dmidecode_raw)
self.smart = self.loads(self.smart_raw)
self.inxi = self.loads(self.inxi_raw)
self.set_computer()
self.set_components()
self.snapshot_json = {
"type": "Snapshot",
"device": self.device,
"software": data["software"],
"components": self.components,
"uuid": data['uuid'],
"endTime": data["timestamp"],
"elapsed": 1,
}
def set_computer(self):
machine = get_inxi_key(self.inxi, 'Machine') or []
for m in machine:
system = get_inxi(m, "System")
if system:
self.device['manufacturer'] = system
self.device['model'] = get_inxi(m, "product")
self.device['serialNumber'] = get_inxi(m, "serial")
self.device['type'] = get_inxi(m, "Type")
self.device['chassis'] = self.device['type']
self.device['version'] = get_inxi(m, "v")
self.build = normal_parse_details.ParseSnapshot(
snapshot,
default=default
)
elif snapshot.get("software") != "workbench-script":
self.build = old_parse_details.ParseSnapshot(
snapshot,
default=default
)
elif snapshot.get("data",{}).get("lshw"):
self.build = legacy_parse_details.ParseSnapshot(
snapshot,
default=default
)
else:
self.device['system_uuid'] = get_inxi(m, "uuid")
self.device['sku'] = get_inxi(m, "part-nu")
def set_components(self):
self.get_mother_board()
self.get_cpu()
self.get_ram()
self.get_graphic()
self.get_display()
self.get_networks()
self.get_sound_card()
self.get_data_storage()
self.get_battery()
def get_mother_board(self):
machine = get_inxi_key(self.inxi, 'Machine') or []
mb = {"type": "Motherboard",}
for m in machine:
bios_date = get_inxi(m, "date")
if not bios_date:
continue
mb["manufacturer"] = get_inxi(m, "Mobo")
mb["model"] = get_inxi(m, "model")
mb["serialNumber"] = get_inxi(m, "serial")
mb["version"] = get_inxi(m, "v")
mb["biosDate"] = bios_date
mb["biosVersion"] = self.get_bios_version()
mb["firewire"]: self.get_firmware_num()
mb["pcmcia"]: self.get_pcmcia_num()
mb["serial"]: self.get_serial_num()
mb["usb"]: self.get_usb_num()
self.get_ram_slots(mb)
self.components.append(mb)
def get_ram_slots(self, mb):
memory = get_inxi_key(self.inxi, 'Memory') or []
for m in memory:
slots = get_inxi(m, "slots")
if not slots:
continue
mb["slots"] = slots
mb["ramSlots"] = get_inxi(m, "modules")
mb["ramMaxSize"] = get_inxi(m, "capacity")
def get_cpu(self):
cpu = get_inxi_key(self.inxi, 'CPU') or []
cp = {"type": "Processor"}
vulnerabilities = []
for c in cpu:
base = get_inxi(c, "model")
if base:
cp["model"] = get_inxi(c, "model")
cp["arch"] = get_inxi(c, "arch")
cp["bits"] = get_inxi(c, "bits")
cp["gen"] = get_inxi(c, "gen")
cp["family"] = get_inxi(c, "family")
cp["date"] = get_inxi(c, "built")
continue
des = get_inxi(c, "L1")
if des:
cp["L1"] = des
cp["L2"] = get_inxi(c, "L2")
cp["L3"] = get_inxi(c, "L3")
cp["cpus"] = get_inxi(c, "cpus")
cp["cores"] = get_inxi(c, "cores")
cp["threads"] = get_inxi(c, "threads")
continue
bogo = get_inxi(c, "bogomips")
if bogo:
cp["bogomips"] = bogo
cp["base/boost"] = get_inxi(c, "base/boost")
cp["min/max"] = get_inxi(c, "min/max")
cp["ext-clock"] = get_inxi(c, "ext-clock")
cp["volts"] = get_inxi(c, "volts")
continue
ctype = get_inxi(c, "Type")
if ctype:
v = {"Type": ctype}
status = get_inxi(c, "status")
if status:
v["status"] = status
mitigation = get_inxi(c, "mitigation")
if mitigation:
v["mitigation"] = mitigation
vulnerabilities.append(v)
self.components.append(cp)
def get_ram(self):
memory = get_inxi_key(self.inxi, 'Memory') or []
mem = {"type": "RamModule"}
for m in memory:
base = get_inxi(m, "System RAM")
if base:
mem["size"] = get_inxi(m, "total")
slot = get_inxi(m, "manufacturer")
if slot:
mem["manufacturer"] = slot
mem["model"] = get_inxi(m, "part-no")
mem["serialNumber"] = get_inxi(m, "serial")
mem["speed"] = get_inxi(m, "speed")
mem["bits"] = get_inxi(m, "data")
mem["interface"] = get_inxi(m, "type")
module = get_inxi(m, "modules")
if module:
mem["modules"] = module
self.components.append(mem)
def get_graphic(self):
graphics = get_inxi_key(self.inxi, 'Graphics') or []
for c in graphics:
if not get_inxi(c, "Device") or not get_inxi(c, "vendor"):
continue
self.components.append(
{
"type": "GraphicCard",
"memory": self.get_memory_video(c),
"manufacturer": get_inxi(c, "vendor"),
"model": get_inxi(c, "Device"),
"arch": get_inxi(c, "arch"),
"serialNumber": get_inxi(c, "serial"),
"integrated": True if get_inxi(c, "port") else False
}
self.build = normal_parse_details.ParseSnapshot(
snapshot,
default=default
)
def get_battery(self):
bats = get_inxi_key(self.inxi, 'Battery') or []
for b in bats:
self.components.append(
{
"type": "Battery",
"model": get_inxi(b, "model"),
"serialNumber": get_inxi(b, "serial"),
"condition": get_inxi(b, "condition"),
"cycles": get_inxi(b, "cycles"),
"volts": get_inxi(b, "volts")
}
)
def get_memory_video(self, c):
memory = get_inxi_key(self.inxi, 'Memory') or []
for m in memory:
igpu = get_inxi(m, "igpu")
agpu = get_inxi(m, "agpu")
ngpu = get_inxi(m, "ngpu")
gpu = get_inxi(m, "gpu")
if igpu or agpu or gpu or ngpu:
return igpu or agpu or gpu or ngpu
return self.default
def get_data_storage(self):
hdds= get_inxi_key(self.inxi, 'Drives') or []
for d in hdds:
usb = get_inxi(d, "type")
if usb == "USB":
continue
serial = get_inxi(d, "serial")
if serial:
hd = {
"type": "Storage",
"manufacturer": get_inxi(d, "vendor"),
"model": get_inxi(d, "model"),
"serialNumber": get_inxi(d, "serial"),
"size": get_inxi(d, "size"),
"speed": get_inxi(d, "speed"),
"interface": get_inxi(d, "tech"),
"firmware": get_inxi(d, "fw-rev")
}
rpm = get_inxi(d, "rpm")
if rpm:
hd["rpm"] = rpm
family = get_inxi(d, "family")
if family:
hd["family"] = family
sata = get_inxi(d, "sata")
if sata:
hd["sata"] = sata
continue
cycles = get_inxi(d, "cycles")
if cycles:
hd['cycles'] = cycles
hd["health"] = get_inxi(d, "health")
hd["time of used"] = get_inxi(d, "on")
hd["read used"] = get_inxi(d, "read-units")
hd["written used"] = get_inxi(d, "written-units")
self.components.append(hd)
continue
hd = {}
def sanitize(self, action):
return []
def get_networks(self):
nets = get_inxi_key(self.inxi, "Network") or []
networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
for n, iface in networks:
model = get_inxi(n, "Device")
if not model:
continue
interface = ''
for k in n.keys():
if "port" in k:
interface = "Integrated"
if "pcie" in k:
interface = "PciExpress"
if get_inxi(n, "type") == "USB":
interface = "USB"
self.components.append(
{
"type": "NetworkAdapter",
"model": model,
"manufacturer": get_inxi(n, 'vendor'),
"serialNumber": get_inxi(iface, 'mac'),
"speed": get_inxi(n, "speed"),
"interface": interface,
}
)
def get_sound_card(self):
audio = get_inxi_key(self.inxi, "Audio") or []
for c in audio:
model = get_inxi(c, "Device")
if not model:
continue
self.components.append(
{
"type": "SoundCard",
"model": model,
"manufacturer": get_inxi(c, 'vendor'),
"serialNumber": get_inxi(c, 'serial'),
}
)
def get_display(self):
graphics = get_inxi_key(self.inxi, "Graphics") or []
for c in graphics:
if not get_inxi(c, "Monitor"):
continue
self.components.append(
{
"type": "Display",
"model": get_inxi(c, "model"),
"manufacturer": get_inxi(c, "vendor"),
"serialNumber": get_inxi(c, "serial"),
'size': get_inxi(c, "size"),
'diagonal': get_inxi(c, "diag"),
'resolution': get_inxi(c, "res"),
"date": get_inxi(c, "built"),
'ratio': get_inxi(c, "ratio"),
}
)
def get_usb_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "USB" in u.get("Port Type", "").upper()
]
)
def get_serial_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "SERIAL" in u.get("Port Type", "").upper()
]
)
def get_firmware_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "FIRMWARE" in u.get("Port Type", "").upper()
]
)
def get_pcmcia_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "PCMCIA" in u.get("Port Type", "").upper()
]
)
def get_bios_version(self):
return self.dmi.get("BIOS")[0].get("BIOS Revision", '1')
def loads(self, x):
if isinstance(x, str):
try:
return json.loads(x)
except Exception as ss:
logger.warning("%s", ss)
return {}
return x
def errors(self, txt=None):
if not txt:
return self._errors
logger.error(txt)
self._errors.append("%s", txt)
self.default = default
self.device = self.build.snapshot_json.get("device")
self.components = self.build.snapshot_json.get("components")

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -13,7 +13,7 @@ HID_ALGO1 = [
"manufacturer",
"model",
"chassis",
"serialNumber",
"serial_number",
"sku"
]
@ -21,7 +21,7 @@ LEGACY_DPP = [
"manufacturer",
"model",
"chassis",
"serialNumber",
"serial_number",
"sku",
"type",
"version"