inxi (take 2) #38

Merged
pedro merged 12 commits from inxi into main 2024-12-16 18:48:42 +00:00
15 changed files with 426 additions and 468 deletions

View file

@ -2,7 +2,7 @@ DOMAIN=localhost
DEMO=true DEMO=true
# note that with DEBUG=true, logs are more verbose (include tracebacks) # note that with DEBUG=true, logs are more verbose (include tracebacks)
DEBUG=true DEBUG=true
ALLOWED_HOSTS=localhost,localhost:8000,127.0.0.1, ALLOWED_HOSTS=${DOMAIN},${DOMAIN}:8000,127.0.0.1,127.0.0.1:8000
DPP=false DPP=false
STATIC_ROOT=/tmp/static/ STATIC_ROOT=/tmp/static/

View file

@ -85,17 +85,21 @@ class NewSnapshotView(ApiMixing):
# except Exception: # except Exception:
# return JsonResponse({'error': 'Invalid Snapshot'}, status=400) # return JsonResponse({'error': 'Invalid Snapshot'}, status=400)
if not data.get("uuid"): ev_uuid = data.get("uuid")
if data.get("credentialSubject"):
ev_uuid = data["credentialSubject"].get("uuid")
if not ev_uuid:
txt = "error: the snapshot not have uuid" txt = "error: the snapshot not have uuid"
logger.error("%s", txt) logger.error("%s", txt)
return JsonResponse({'status': txt}, status=500) return JsonResponse({'status': txt}, status=500)
exist_annotation = Annotation.objects.filter( exist_annotation = Annotation.objects.filter(
uuid=data['uuid'] uuid=ev_uuid
).first() ).first()
if exist_annotation: if exist_annotation:
txt = "error: the snapshot {} exist".format(data['uuid']) txt = "error: the snapshot {} exist".format(ev_uuid)
logger.warning("%s", txt) logger.warning("%s", txt)
return JsonResponse({'status': txt}, status=500) return JsonResponse({'status': txt}, status=500)
@ -105,14 +109,14 @@ class NewSnapshotView(ApiMixing):
except Exception as err: except Exception as err:
if settings.DEBUG: if settings.DEBUG:
logger.exception("%s", err) logger.exception("%s", err)
snapshot_id = data.get("uuid", "") snapshot_id = ev_uuid
txt = "It is not possible to parse snapshot: %s." txt = "It is not possible to parse snapshot: %s."
logger.error(txt, snapshot_id) logger.error(txt, snapshot_id)
text = "fail: It is not possible to parse snapshot" text = "fail: It is not possible to parse snapshot"
return JsonResponse({'status': text}, status=500) return JsonResponse({'status': text}, status=500)
annotation = Annotation.objects.filter( annotation = Annotation.objects.filter(
uuid=data['uuid'], uuid=ev_uuid,
type=Annotation.Type.SYSTEM, type=Annotation.Type.SYSTEM,
# TODO this is hardcoded, it should select the user preferred algorithm # TODO this is hardcoded, it should select the user preferred algorithm
key="hidalgo1", key="hidalgo1",
@ -121,7 +125,7 @@ class NewSnapshotView(ApiMixing):
if not annotation: if not annotation:
logger.error("Error: No annotation for uuid: %s", data["uuid"]) logger.error("Error: No annotation for uuid: %s", ev_uuid)
return JsonResponse({'status': 'fail'}, status=500) return JsonResponse({'status': 'fail'}, status=500)
url_args = reverse_lazy("device:details", args=(annotation.value,)) url_args = reverse_lazy("device:details", args=(annotation.value,))

View file

@ -69,7 +69,11 @@
{{ dev.manufacturer }} {{ dev.manufacturer }}
</td> </td>
<td> <td>
{% if dev.version %}
{{dev.version}} {{ dev.model }}
{% else %}
{{ dev.model }} {{ dev.model }}
{% endif %}
</td> </td>
</tr> </tr>
</tbody> </tbody>

View file

@ -84,8 +84,11 @@ class SearchView(InventaryMixin):
return devices, count return devices, count
def get_annotations(self, xp): def get_annotations(self, xp):
snap = xp.document.get_data() snap = json.loads(xp.document.get_data())
uuid = json.loads(snap).get('uuid') if snap.get("credentialSubject"):
uuid = snap["credentialSubject"]["uuid"]
else:
uuid = snap["uuid"]
return Device.get_annotation_from_uuid(uuid, self.request.user.institution) return Device.get_annotation_from_uuid(uuid, self.request.user.institution)
def search_hids(self, query, offset, limit): def search_hids(self, query, offset, limit):

View file

@ -305,6 +305,12 @@ class Device:
self.get_last_evidence() self.get_last_evidence()
return self.last_evidence.get_model() return self.last_evidence.get_model()
@property
def version(self):
if not self.last_evidence:
self.get_last_evidence()
return self.last_evidence.get_version()
@property @property
def components(self): def components(self):
self.get_last_evidence() self.get_last_evidence()

View file

@ -58,34 +58,41 @@
</div> </div>
{% endif %} {% endif %}
<div class="row mb-3"> <div class="row mb-1">
<div class="col-lg-3 col-md-4 label">Type</div> <div class="col-lg-3 col-md-4 label">Type</div>
<div class="col-lg-9 col-md-8">{{ object.type }}</div> <div class="col-lg-9 col-md-8">{{ object.type }}</div>
</div> </div>
{% if object.is_websnapshot and object.last_user_evidence %} {% if object.is_websnapshot and object.last_user_evidence %}
{% for k, v in object.last_user_evidence %} {% for k, v in object.last_user_evidence %}
<div class="row mb-3"> <div class="row mb-1">
<div class="col-lg-3 col-md-4 label">{{ k }}</div> <div class="col-lg-3 col-md-4 label">{{ k }}</div>
<div class="col-lg-9 col-md-8">{{ v|default:'' }}</div> <div class="col-lg-9 col-md-8">{{ v|default:'' }}</div>
</div> </div>
{% endfor %} {% endfor %}
{% else %} {% else %}
<div class="row mb-3"> <div class="row mb-1">
<div class="col-lg-3 col-md-4 label"> <div class="col-lg-3 col-md-4 label">
{% trans 'Manufacturer' %} {% trans 'Manufacturer' %}
</div> </div>
<div class="col-lg-9 col-md-8">{{ object.manufacturer|default:'' }}</div> <div class="col-lg-9 col-md-8">{{ object.manufacturer|default:'' }}</div>
</div> </div>
<div class="row mb-3"> <div class="row mb-1">
<div class="col-lg-3 col-md-4 label"> <div class="col-lg-3 col-md-4 label">
{% trans 'Model' %} {% trans 'Model' %}
</div> </div>
<div class="col-lg-9 col-md-8">{{ object.model|default:'' }}</div> <div class="col-lg-9 col-md-8">{{ object.model|default:'' }}</div>
</div> </div>
<div class="row mb-3"> <div class="row mb-1">
<div class="col-lg-3 col-md-4 label">
{% trans 'Version' %}
</div>
<div class="col-lg-9 col-md-8">{{ object.version|default:'' }}</div>
</div>
<div class="row mb-1">
<div class="col-lg-3 col-md-4 label"> <div class="col-lg-3 col-md-4 label">
{% trans 'Serial Number' %} {% trans 'Serial Number' %}
</div> </div>

View file

@ -21,6 +21,8 @@ main() {
fi fi
# remove old database # remove old database
rm -vfr ./db/* rm -vfr ./db/*
# deactivate configured flag
rm -vfr ./already_configured
docker compose down -v docker compose down -v
docker compose build docker compose build
docker compose up ${detach_arg:-} docker compose up ${detach_arg:-}

View file

@ -28,9 +28,6 @@ compile = no
no-cache-dir = True no-cache-dir = True
END END
# upgrade pip, which might fail on lxc, then remove the "corrupted file"
RUN python -m pip install --upgrade pip || (rm -rf /usr/local/lib/python3.11/site-packages/pip-*.dist-info && python -m pip install --upgrade pip)
COPY ./requirements.txt /opt/devicehub-django COPY ./requirements.txt /opt/devicehub-django
RUN pip install -r requirements.txt RUN pip install -r requirements.txt
# TODO hardcoded, is ignored in requirements.txt # TODO hardcoded, is ignored in requirements.txt

View file

@ -50,13 +50,12 @@ API_RESOLVER=${API_RESOLVER}
ID_FEDERATED=${ID_FEDERATED} ID_FEDERATED=${ID_FEDERATED}
END END
)" )"
fi
# generate config using env vars from docker # generate config using env vars from docker
# TODO rethink if this is needed because now this is django, not flask # TODO rethink if this is needed because now this is django, not flask
cat > .env <<END cat > .env <<END
${dpp_env_vars:-} ${dpp_env_vars:-}
END END
fi
} }
handle_federated_id() { handle_federated_id() {

View file

@ -29,17 +29,17 @@ class UploadForm(forms.Form):
try: try:
file_json = json.loads(file_data) file_json = json.loads(file_data)
Build(file_json, None, check=True) snap = Build(file_json, None, check=True)
exist_annotation = Annotation.objects.filter( exist_annotation = Annotation.objects.filter(
uuid=file_json['uuid'] uuid=snap.uuid
).first() ).first()
if exist_annotation: if exist_annotation:
raise ValidationError( raise ValidationError(
_("The snapshot already exists"), _("The snapshot already exists"),
code="duplicate_snapshot", code="duplicate_snapshot",
) )
#Catch any error and display it as Validation Error so the Form handles it #Catch any error and display it as Validation Error so the Form handles it
except Exception as e: except Exception as e:
raise ValidationError( raise ValidationError(
@ -221,7 +221,7 @@ class EraseServerForm(forms.Form):
if self.instance: if self.instance:
return return
Annotation.objects.create( Annotation.objects.create(
uuid=self.uuid, uuid=self.uuid,
type=Annotation.Type.ERASE_SERVER, type=Annotation.Type.ERASE_SERVER,

View file

@ -6,7 +6,7 @@ from django.db import models
from utils.constants import STR_EXTEND_SIZE, CHASSIS_DH from utils.constants import STR_EXTEND_SIZE, CHASSIS_DH
from evidence.xapian import search from evidence.xapian import search
from evidence.parse_details import ParseSnapshot from evidence.parse_details import ParseSnapshot, get_inxi, get_inxi_key
from user.models import User, Institution from user.models import User, Institution
@ -40,6 +40,7 @@ class Evidence:
self.doc = None self.doc = None
self.created = None self.created = None
self.dmi = None self.dmi = None
self.inxi = None
self.annotations = [] self.annotations = []
self.components = [] self.components = []
self.default = "n/a" self.default = "n/a"
@ -62,13 +63,16 @@ class Evidence:
def get_phid(self): def get_phid(self):
if not self.doc: if not self.doc:
self.get_doc() self.get_doc()
return hashlib.sha3_256(json.dumps(self.doc)).hexdigest() return hashlib.sha3_256(json.dumps(self.doc)).hexdigest()
def get_doc(self): def get_doc(self):
self.doc = {} self.doc = {}
self.inxi = None
if not self.owner: if not self.owner:
self.get_owner() self.get_owner()
qry = 'uuid:"{}"'.format(self.uuid) qry = 'uuid:"{}"'.format(self.uuid)
matches = search(self.owner, qry, limit=1) matches = search(self.owner, qry, limit=1)
if matches and matches.size() < 0: if matches and matches.size() < 0:
@ -77,9 +81,36 @@ class Evidence:
for xa in matches: for xa in matches:
self.doc = json.loads(xa.document.get_data()) self.doc = json.loads(xa.document.get_data())
if not self.is_legacy(): if self.is_legacy():
return
if self.doc.get("credentialSubject"):
for ev in self.doc["evidence"]:
if "dmidecode" == ev.get("operation"):
dmidecode_raw = ev["output"]
if "inxi" == ev.get("operation"):
self.inxi = ev["output"]
else:
dmidecode_raw = self.doc["data"]["dmidecode"] dmidecode_raw = self.doc["data"]["dmidecode"]
inxi_raw = self.doc["data"]["inxi"]
self.dmi = DMIParse(dmidecode_raw) self.dmi = DMIParse(dmidecode_raw)
try:
self.inxi = json.loads(inxi_raw)
except Exception:
pass
if self.inxi:
try:
machine = get_inxi_key(self.inxi, 'Machine')
for m in machine:
system = get_inxi(m, "System")
if system:
self.device_manufacturer = system
self.device_model = get_inxi(m, "product")
self.device_serial_number = get_inxi(m, "serial")
self.device_chassis = get_inxi(m, "Type")
self.device_version = get_inxi(m, "v")
except Exception:
return
def get_time(self): def get_time(self):
if not self.doc: if not self.doc:
@ -105,6 +136,9 @@ class Evidence:
if self.is_legacy(): if self.is_legacy():
return self.doc['device']['manufacturer'] return self.doc['device']['manufacturer']
if self.inxi:
return self.device_manufacturer
return self.dmi.manufacturer().strip() return self.dmi.manufacturer().strip()
def get_model(self): def get_model(self):
@ -117,12 +151,18 @@ class Evidence:
if self.is_legacy(): if self.is_legacy():
return self.doc['device']['model'] return self.doc['device']['model']
if self.inxi:
return self.device_model
return self.dmi.model().strip() return self.dmi.model().strip()
def get_chassis(self): def get_chassis(self):
if self.is_legacy(): if self.is_legacy():
return self.doc['device']['model'] return self.doc['device']['model']
if self.inxi:
return self.device_chassis
chassis = self.dmi.get("Chassis")[0].get("Type", '_virtual') chassis = self.dmi.get("Chassis")[0].get("Type", '_virtual')
lower_type = chassis.lower() lower_type = chassis.lower()
@ -134,8 +174,18 @@ class Evidence:
def get_serial_number(self): def get_serial_number(self):
if self.is_legacy(): if self.is_legacy():
return self.doc['device']['serialNumber'] return self.doc['device']['serialNumber']
if self.inxi:
return self.device_serial_number
return self.dmi.serial_number().strip() return self.dmi.serial_number().strip()
def get_version(self):
if self.inxi:
return self.device_version
return ""
@classmethod @classmethod
def get_all(cls, user): def get_all(cls, user):
return Annotation.objects.filter( return Annotation.objects.filter(
@ -149,6 +199,9 @@ class Evidence:
self.components = snapshot['components'] self.components = snapshot['components']
def is_legacy(self): def is_legacy(self):
if self.doc.get("credentialSubject"):
return False
return self.doc.get("software") != "workbench-script" return self.doc.get("software") != "workbench-script"
def is_web_snapshot(self): def is_web_snapshot(self):

View file

@ -3,44 +3,43 @@ import hashlib
import logging import logging
from dmidecode import DMIParse from dmidecode import DMIParse
from json_repair import repair_json from evidence.parse_details import ParseSnapshot
from django.conf import settings
from evidence.parse_details import get_lshw_child, ParseSnapshot
from evidence.models import Annotation from evidence.models import Annotation
from evidence.xapian import index from evidence.xapian import index
from utils.constants import CHASSIS_DH from evidence.parse_details import get_inxi_key, get_inxi
from django.conf import settings
if settings.DPP: if settings.DPP:
from dpp.api_dlt import register_device_dlt, register_passport_dlt from dpp.api_dlt import register_device_dlt, register_passport_dlt
logger = logging.getLogger('django') logger = logging.getLogger('django')
def get_mac(lshw): def get_mac(inxi):
try: nets = get_inxi_key(inxi, "Network")
if type(lshw) is dict: networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
hw = lshw
else:
hw = json.loads(lshw)
except json.decoder.JSONDecodeError:
hw = json.loads(repair_json(lshw))
nets = []
get_lshw_child(hw, nets, 'network')
nets_sorted = sorted(nets, key=lambda x: x['businfo'])
if nets_sorted:
mac = nets_sorted[0]['serial']
logger.debug("The snapshot has the following MAC: %s" , mac)
return mac
for n, iface in networks:
if get_inxi(n, "port"):
return get_inxi(iface, 'mac')
class Build: class Build:
def __init__(self, evidence_json, user, check=False): def __init__(self, evidence_json, user, check=False):
self.json = evidence_json self.evidence = evidence_json.copy()
self.json = evidence_json.copy()
if evidence_json.get("credentialSubject"):
self.json.update(evidence_json["credentialSubject"])
if evidence_json.get("evidence"):
self.json["data"] = {}
for ev in evidence_json["evidence"]:
k = ev.get("operation")
if not k:
continue
self.json["data"][k] = ev.get("output")
self.uuid = self.json['uuid'] self.uuid = self.json['uuid']
self.user = user self.user = user
self.hid = None self.hid = None
@ -57,7 +56,7 @@ class Build:
self.register_device_dlt() self.register_device_dlt()
def index(self): def index(self):
snap = json.dumps(self.json) snap = json.dumps(self.evidence)
index(self.user.institution, self.uuid, snap) index(self.user.institution, self.uuid, snap)
def generate_chids(self): def generate_chids(self):
@ -78,27 +77,16 @@ class Build:
sku = device.get("sku", '') sku = device.get("sku", '')
hid = f"{manufacturer}{model}{chassis}{serial_number}{sku}" hid = f"{manufacturer}{model}{chassis}{serial_number}{sku}"
self.chid = hashlib.sha3_256(hid.encode()).hexdigest() self.chid = hashlib.sha3_256(hid.encode()).hexdigest()
return self.chid return self.chid
def get_chid_dpp(self): def get_chid_dpp(self):
if self.json.get("software") == "workbench-script": if self.json.get("software") == "workbench-script":
dmidecode_raw = self.json["data"]["dmidecode"] device = ParseSnapshot(self.json).device
dmi = DMIParse(dmidecode_raw)
manufacturer = dmi.manufacturer().strip()
model = dmi.model().strip()
chassis = self.get_chassis_dh()
serial_number = dmi.serial_number()
sku = self.get_sku()
typ = chassis
version = self.get_version()
hid = f"{manufacturer}{model}{chassis}{serial_number}{sku}{typ}{version}"
else: else:
device = self.json['device'] device = self.json['device']
hid = self.get_id_hw_dpp(device)
hid = self.get_id_hw_dpp(device)
self.chid = hashlib.sha3_256(hid.encode("utf-8")).hexdigest() self.chid = hashlib.sha3_256(hid.encode("utf-8")).hexdigest()
return self.chid return self.chid
@ -157,42 +145,31 @@ class Build:
value=v value=v
) )
def get_chassis_dh(self):
chassis = self.get_chassis()
lower_type = chassis.lower()
for k, v in CHASSIS_DH.items():
if lower_type in v:
return k
return self.default
def get_sku(self):
return self.dmi.get("System")[0].get("SKU Number", "n/a").strip()
def get_chassis(self):
return self.dmi.get("Chassis")[0].get("Type", '_virtual') #
def get_version(self):
return self.dmi.get("System")[0].get("Verson", '_virtual')
def get_hid(self, snapshot): def get_hid(self, snapshot):
dmidecode_raw = snapshot["data"]["dmidecode"] try:
self.dmi = DMIParse(dmidecode_raw) self.inxi = self.json["data"]["inxi"]
if isinstance(self.inxi, str):
self.inxi = json.loads(self.inxi)
except Exception:
logger.error("No inxi in snapshot %s", self.uuid)
return ""
manufacturer = self.dmi.manufacturer().strip() machine = get_inxi_key(self.inxi, 'Machine')
model = self.dmi.model().strip() for m in machine:
chassis = self.get_chassis_dh() system = get_inxi(m, "System")
serial_number = self.dmi.serial_number() if system:
sku = self.get_sku() manufacturer = system
model = get_inxi(m, "product")
serial_number = get_inxi(m, "serial")
chassis = get_inxi(m, "Type")
else:
sku = get_inxi(m, "part-nu")
if not snapshot["data"].get('lshw'): mac = get_mac(self.inxi) or ""
return f"{manufacturer}{model}{chassis}{serial_number}{sku}"
lshw = snapshot["data"]["lshw"]
# mac = get_mac2(hwinfo_raw) or ""
mac = get_mac(lshw) or ""
if not mac: if not mac:
txt = "Could not retrieve MAC address in snapshot %s" txt = "Could not retrieve MAC address in snapshot %s"
logger.warning(txt, snapshot['uuid']) logger.warning(txt, snapshot['uuid'])
return f"{manufacturer}{model}{chassis}{serial_number}{sku}"
return f"{manufacturer}{model}{chassis}{serial_number}{sku}{mac}" return f"{manufacturer}{model}{chassis}{serial_number}{sku}{mac}"

View file

@ -1,10 +1,10 @@
import re
import json import json
import logging import logging
import numpy as np import numpy as np
from datetime import datetime from datetime import datetime
from dmidecode import DMIParse from dmidecode import DMIParse
from json_repair import repair_json
from utils.constants import CHASSIS_DH, DATASTORAGEINTERFACE from utils.constants import CHASSIS_DH, DATASTORAGEINTERFACE
@ -12,322 +12,345 @@ from utils.constants import CHASSIS_DH, DATASTORAGEINTERFACE
logger = logging.getLogger('django') logger = logging.getLogger('django')
def get_lshw_child(child, nets, component): def get_inxi_key(inxi, component):
if child.get('id') == component: for n in inxi:
nets.append(child) for k, v in n.items():
if child.get('children'): if component in k:
[get_lshw_child(x, nets, component) for x in child['children']] return v
def get_inxi(n, name):
for k, v in n.items():
if f"#{name}" in k:
return v
return ""
class ParseSnapshot: class ParseSnapshot:
def __init__(self, snapshot, default="n/a"): def __init__(self, snapshot, default="n/a"):
self.default = default self.default = default
self.dmidecode_raw = snapshot["data"].get("dmidecode", "{}") self.dmidecode_raw = snapshot.get("data", {}).get("dmidecode", "{}")
self.smart_raw = snapshot["data"].get("disks", []) self.smart_raw = snapshot.get("data", {}).get("smartctl", [])
self.hwinfo_raw = snapshot["data"].get("hwinfo", "") self.inxi_raw = snapshot.get("data", {}).get("inxi", "") or ""
self.lshw_raw = snapshot["data"].get("lshw", {}) or {} for ev in snapshot.get("evidence", []):
self.lscpi_raw = snapshot["data"].get("lspci", "") if "dmidecode" == ev.get("operation"):
self.dmidecode_raw = ev["output"]
if "inxi" == ev.get("operation"):
self.inxi_raw = ev["output"]
if "smartctl" == ev.get("operation"):
self.smart_raw = ev["output"]
data = snapshot
if snapshot.get("credentialSubject"):
data = snapshot["credentialSubject"]
self.device = {"actions": []} self.device = {"actions": []}
self.components = [] self.components = []
self.monitors = []
self.dmi = DMIParse(self.dmidecode_raw) self.dmi = DMIParse(self.dmidecode_raw)
self.smart = self.loads(self.smart_raw) self.smart = self.loads(self.smart_raw)
self.lshw = self.loads(self.lshw_raw) self.inxi = self.loads(self.inxi_raw)
self.hwinfo = self.parse_hwinfo()
self.set_computer() self.set_computer()
self.get_hwinfo_monitors()
self.set_components() self.set_components()
self.snapshot_json = { self.snapshot_json = {
"type": "Snapshot", "type": "Snapshot",
"device": self.device, "device": self.device,
"software": snapshot["software"], "software": data["software"],
"components": self.components, "components": self.components,
"uuid": snapshot['uuid'], "uuid": data['uuid'],
"version": snapshot['version'], "endTime": data["timestamp"],
"endTime": snapshot["timestamp"],
"elapsed": 1, "elapsed": 1,
} }
def set_computer(self): def set_computer(self):
self.device['manufacturer'] = self.dmi.manufacturer().strip() machine = get_inxi_key(self.inxi, 'Machine') or []
self.device['model'] = self.dmi.model().strip() for m in machine:
self.device['serialNumber'] = self.dmi.serial_number() system = get_inxi(m, "System")
self.device['type'] = self.get_type() if system:
self.device['sku'] = self.get_sku() self.device['manufacturer'] = system
self.device['version'] = self.get_version() self.device['model'] = get_inxi(m, "product")
self.device['system_uuid'] = self.get_uuid() self.device['serialNumber'] = get_inxi(m, "serial")
self.device['family'] = self.get_family() self.device['type'] = get_inxi(m, "Type")
self.device['chassis'] = self.get_chassis_dh() self.device['chassis'] = self.device['type']
self.device['version'] = get_inxi(m, "v")
else:
self.device['system_uuid'] = get_inxi(m, "uuid")
self.device['sku'] = get_inxi(m, "part-nu")
def set_components(self): def set_components(self):
self.get_mother_board()
self.get_cpu() self.get_cpu()
self.get_ram() self.get_ram()
self.get_mother_board()
self.get_graphic() self.get_graphic()
self.get_data_storage()
self.get_display() self.get_display()
self.get_sound_card()
self.get_networks() self.get_networks()
self.get_sound_card()
def get_cpu(self): self.get_data_storage()
for cpu in self.dmi.get('Processor'): self.get_battery()
serial = cpu.get('Serial Number')
if serial == 'Not Specified' or not serial:
serial = cpu.get('ID').replace(' ', '')
self.components.append(
{
"actions": [],
"type": "Processor",
"speed": self.get_cpu_speed(cpu),
"cores": int(cpu.get('Core Count', 1)),
"model": cpu.get('Version'),
"threads": int(cpu.get('Thread Count', 1)),
"manufacturer": cpu.get('Manufacturer'),
"serialNumber": serial,
"brand": cpu.get('Family'),
"address": self.get_cpu_address(cpu),
"bogomips": self.get_bogomips(),
}
)
def get_ram(self):
for ram in self.dmi.get("Memory Device"):
if ram.get('size') == 'No Module Installed':
continue
if not ram.get("Speed"):
continue
self.components.append(
{
"actions": [],
"type": "RamModule",
"size": self.get_ram_size(ram),
"speed": self.get_ram_speed(ram),
"manufacturer": ram.get("Manufacturer", self.default),
"serialNumber": ram.get("Serial Number", self.default),
"interface": ram.get("Type", "DDR"),
"format": ram.get("Form Factor", "DIMM"),
"model": ram.get("Part Number", self.default),
}
)
def get_mother_board(self): def get_mother_board(self):
for moder_board in self.dmi.get("Baseboard"): machine = get_inxi_key(self.inxi, 'Machine') or []
self.components.append( mb = {"type": "Motherboard",}
{ for m in machine:
"actions": [], bios_date = get_inxi(m, "date")
"type": "Motherboard", if not bios_date:
"version": moder_board.get("Version"), continue
"serialNumber": moder_board.get("Serial Number", "").strip(), mb["manufacturer"] = get_inxi(m, "Mobo")
"manufacturer": moder_board.get("Manufacturer", "").strip(), mb["model"] = get_inxi(m, "model")
"biosDate": self.get_bios_date(), mb["serialNumber"] = get_inxi(m, "serial")
"ramMaxSize": self.get_max_ram_size(), mb["version"] = get_inxi(m, "v")
"ramSlots": len(self.dmi.get("Memory Device")), mb["biosDate"] = bios_date
"slots": self.get_ram_slots(), mb["biosVersion"] = self.get_bios_version()
"model": moder_board.get("Product Name", "").strip(), mb["firewire"]: self.get_firmware_num()
"firewire": self.get_firmware_num(), mb["pcmcia"]: self.get_pcmcia_num()
"pcmcia": self.get_pcmcia_num(), mb["serial"]: self.get_serial_num()
"serial": self.get_serial_num(), mb["usb"]: self.get_usb_num()
"usb": self.get_usb_num(),
} self.get_ram_slots(mb)
)
self.components.append(mb)
def get_ram_slots(self, mb):
memory = get_inxi_key(self.inxi, 'Memory') or []
for m in memory:
slots = get_inxi(m, "slots")
if not slots:
continue
mb["slots"] = slots
mb["ramSlots"] = get_inxi(m, "modules")
mb["ramMaxSize"] = get_inxi(m, "capacity")
def get_cpu(self):
cpu = get_inxi_key(self.inxi, 'CPU') or []
cp = {"type": "Processor"}
vulnerabilities = []
for c in cpu:
base = get_inxi(c, "model")
if base:
cp["model"] = get_inxi(c, "model")
cp["arch"] = get_inxi(c, "arch")
cp["bits"] = get_inxi(c, "bits")
cp["gen"] = get_inxi(c, "gen")
cp["family"] = get_inxi(c, "family")
cp["date"] = get_inxi(c, "built")
continue
des = get_inxi(c, "L1")
if des:
cp["L1"] = des
cp["L2"] = get_inxi(c, "L2")
cp["L3"] = get_inxi(c, "L3")
cp["cpus"] = get_inxi(c, "cpus")
cp["cores"] = get_inxi(c, "cores")
cp["threads"] = get_inxi(c, "threads")
continue
bogo = get_inxi(c, "bogomips")
if bogo:
cp["bogomips"] = bogo
cp["base/boost"] = get_inxi(c, "base/boost")
cp["min/max"] = get_inxi(c, "min/max")
cp["ext-clock"] = get_inxi(c, "ext-clock")
cp["volts"] = get_inxi(c, "volts")
continue
ctype = get_inxi(c, "Type")
if ctype:
v = {"Type": ctype}
status = get_inxi(c, "status")
if status:
v["status"] = status
mitigation = get_inxi(c, "mitigation")
if mitigation:
v["mitigation"] = mitigation
vulnerabilities.append(v)
self.components.append(cp)
def get_ram(self):
memory = get_inxi_key(self.inxi, 'Memory') or []
mem = {"type": "RamModule"}
for m in memory:
base = get_inxi(m, "System RAM")
if base:
mem["size"] = get_inxi(m, "total")
slot = get_inxi(m, "manufacturer")
if slot:
mem["manufacturer"] = slot
mem["model"] = get_inxi(m, "part-no")
mem["serialNumber"] = get_inxi(m, "serial")
mem["speed"] = get_inxi(m, "speed")
mem["bits"] = get_inxi(m, "data")
mem["interface"] = get_inxi(m, "type")
module = get_inxi(m, "modules")
if module:
mem["modules"] = module
self.components.append(mem)
def get_graphic(self): def get_graphic(self):
displays = [] graphics = get_inxi_key(self.inxi, 'Graphics') or []
get_lshw_child(self.lshw, displays, 'display')
for c in graphics:
for c in displays: if not get_inxi(c, "Device") or not get_inxi(c, "vendor"):
if not c['configuration'].get('driver', None):
continue continue
self.components.append( self.components.append(
{ {
"actions": [],
"type": "GraphicCard", "type": "GraphicCard",
"memory": self.get_memory_video(c), "memory": self.get_memory_video(c),
"manufacturer": c.get("vendor", self.default), "manufacturer": get_inxi(c, "vendor"),
"model": c.get("product", self.default), "model": get_inxi(c, "Device"),
"serialNumber": c.get("serial", self.default), "arch": get_inxi(c, "arch"),
"serialNumber": get_inxi(c, "serial"),
"integrated": True if get_inxi(c, "port") else False
}
)
def get_battery(self):
bats = get_inxi_key(self.inxi, 'Battery') or []
for b in bats:
self.components.append(
{
"type": "Battery",
"model": get_inxi(b, "model"),
"serialNumber": get_inxi(b, "serial"),
"condition": get_inxi(b, "condition"),
"cycles": get_inxi(b, "cycles"),
"volts": get_inxi(b, "volts")
} }
) )
def get_memory_video(self, c): def get_memory_video(self, c):
# get info of lspci memory = get_inxi_key(self.inxi, 'Memory') or []
# pci_id = c['businfo'].split('@')[1]
# lspci.get(pci_id) | grep size for m in memory:
# lspci -v -s 00:02.0 igpu = get_inxi(m, "igpu")
return None agpu = get_inxi(m, "agpu")
ngpu = get_inxi(m, "ngpu")
gpu = get_inxi(m, "gpu")
if igpu or agpu or gpu or ngpu:
return igpu or agpu or gpu or ngpu
return self.default
def get_data_storage(self): def get_data_storage(self):
for sm in self.smart: hdds= get_inxi_key(self.inxi, 'Drives') or []
if sm.get('smartctl', {}).get('exit_status') == 1: for d in hdds:
usb = get_inxi(d, "type")
if usb == "USB":
continue continue
model = sm.get('model_name')
manufacturer = None
hours = sm.get("power_on_time", {}).get("hours", 0)
if model and len(model.split(" ")) > 1:
mm = model.split(" ")
model = mm[-1]
manufacturer = " ".join(mm[:-1])
self.components.append( serial = get_inxi(d, "serial")
{ if serial:
"actions": self.sanitize(sm), hd = {
"type": self.get_data_storage_type(sm), "type": "Storage",
"model": model, "manufacturer": get_inxi(d, "vendor"),
"manufacturer": manufacturer, "model": get_inxi(d, "model"),
"serialNumber": sm.get('serial_number'), "serialNumber": get_inxi(d, "serial"),
"size": self.get_data_storage_size(sm), "size": get_inxi(d, "size"),
"variant": sm.get("firmware_version"), "speed": get_inxi(d, "speed"),
"interface": self.get_data_storage_interface(sm), "interface": get_inxi(d, "tech"),
"hours": hours, "firmware": get_inxi(d, "fw-rev")
} }
) rpm = get_inxi(d, "rpm")
if rpm:
hd["rpm"] = rpm
family = get_inxi(d, "family")
if family:
hd["family"] = family
sata = get_inxi(d, "sata")
if sata:
hd["sata"] = sata
continue
cycles = get_inxi(d, "cycles")
if cycles:
hd['cycles'] = cycles
hd["health"] = get_inxi(d, "health")
hd["time of used"] = get_inxi(d, "on")
hd["read used"] = get_inxi(d, "read-units")
hd["written used"] = get_inxi(d, "written-units")
self.components.append(hd)
continue
hd = {}
def sanitize(self, action): def sanitize(self, action):
return [] return []
def get_bogomips(self):
if not self.hwinfo:
return self.default
bogomips = 0
for row in self.hwinfo:
for cel in row:
if 'BogoMips' in cel:
try:
bogomips += float(cel.split(":")[-1])
except:
pass
return bogomips
def get_networks(self): def get_networks(self):
networks = [] nets = get_inxi_key(self.inxi, "Network") or []
get_lshw_child(self.lshw, networks, 'network') networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
for c in networks: for n, iface in networks:
capacity = c.get('capacity') model = get_inxi(n, "Device")
wireless = bool(c.get('configuration', {}).get('wireless', False)) if not model:
continue
interface = ''
for k in n.keys():
if "port" in k:
interface = "Integrated"
if "pcie" in k:
interface = "PciExpress"
if get_inxi(n, "type") == "USB":
interface = "USB"
self.components.append( self.components.append(
{ {
"actions": [],
"type": "NetworkAdapter", "type": "NetworkAdapter",
"model": c.get('product'), "model": model,
"manufacturer": c.get('vendor'), "manufacturer": get_inxi(n, 'vendor'),
"serialNumber": c.get('serial'), "serialNumber": get_inxi(iface, 'mac'),
"speed": capacity, "speed": get_inxi(n, "speed"),
"variant": c.get('version', 1), "interface": interface,
"wireless": wireless or False,
"integrated": "PCI:0000:00" in c.get("businfo", ""),
} }
) )
def get_sound_card(self): def get_sound_card(self):
multimedias = [] audio = get_inxi_key(self.inxi, "Audio") or []
get_lshw_child(self.lshw, multimedias, 'multimedia')
for c in audio:
for c in multimedias: model = get_inxi(c, "Device")
if not model:
continue
self.components.append( self.components.append(
{ {
"actions": [],
"type": "SoundCard", "type": "SoundCard",
"model": c.get('product'), "model": model,
"manufacturer": c.get('vendor'), "manufacturer": get_inxi(c, 'vendor'),
"serialNumber": c.get('serial'), "serialNumber": get_inxi(c, 'serial'),
} }
) )
def get_display(self): # noqa: C901 def get_display(self):
TECHS = 'CRT', 'TFT', 'LED', 'PDP', 'LCD', 'OLED', 'AMOLED' graphics = get_inxi_key(self.inxi, "Graphics") or []
for c in graphics:
for c in self.monitors: if not get_inxi(c, "Monitor"):
resolution_width, resolution_height = (None,) * 2 continue
refresh, serial, model, manufacturer, size = (None,) * 5
year, week, production_date = (None,) * 3
for x in c:
if "Vendor: " in x:
manufacturer = x.split('Vendor: ')[-1].strip()
if "Model: " in x:
model = x.split('Model: ')[-1].strip()
if "Serial ID: " in x:
serial = x.split('Serial ID: ')[-1].strip()
if " Resolution: " in x:
rs = x.split(' Resolution: ')[-1].strip()
if 'x' in rs:
resolution_width, resolution_height = [
int(r) for r in rs.split('x')
]
if "Frequencies: " in x:
try:
refresh = int(float(x.split(',')[-1].strip()[:-3]))
except Exception:
pass
if 'Year of Manufacture' in x:
year = x.split(': ')[1]
if 'Week of Manufacture' in x:
week = x.split(': ')[1]
if "Size: " in x:
size = self.get_size_monitor(x)
technology = next((t for t in TECHS if t in c[0]), None)
if year and week:
d = '{} {} 0'.format(year, week)
production_date = datetime.strptime(d, '%Y %W %w').isoformat()
self.components.append( self.components.append(
{ {
"actions": [],
"type": "Display", "type": "Display",
"model": model, "model": get_inxi(c, "model"),
"manufacturer": manufacturer, "manufacturer": get_inxi(c, "vendor"),
"serialNumber": serial, "serialNumber": get_inxi(c, "serial"),
'size': size, 'size': get_inxi(c, "size"),
'resolutionWidth': resolution_width, 'diagonal': get_inxi(c, "diag"),
'resolutionHeight': resolution_height, 'resolution': get_inxi(c, "res"),
"productionDate": production_date, "date": get_inxi(c, "built"),
'technology': technology, 'ratio': get_inxi(c, "ratio"),
'refreshRate': refresh,
} }
) )
def get_hwinfo_monitors(self):
for c in self.hwinfo:
monitor = None
external = None
for x in c:
if 'Hardware Class: monitor' in x:
monitor = c
if 'Driver Info' in x:
external = c
if monitor and not external:
self.monitors.append(c)
def get_size_monitor(self, x):
i = 1 / 25.4
t = x.split('Size: ')[-1].strip()
tt = t.split('mm')
if not tt:
return 0
sizes = tt[0].strip()
if 'x' not in sizes:
return 0
w, h = [int(x) for x in sizes.split('x')]
return "{:.2f}".format(np.sqrt(w**2 + h**2) * i)
def get_cpu_address(self, cpu):
default = 64
for ch in self.lshw.get('children', []):
for c in ch.get('children', []):
if c['class'] == 'processor':
return c.get('width', default)
return default
def get_usb_num(self): def get_usb_num(self):
return len( return len(
[ [
@ -364,133 +387,13 @@ class ParseSnapshot:
] ]
) )
def get_bios_date(self): def get_bios_version(self):
return self.dmi.get("BIOS")[0].get("Release Date", self.default) return self.dmi.get("BIOS")[0].get("BIOS Revision", '1')
def get_firmware(self):
return self.dmi.get("BIOS")[0].get("Firmware Revision", '1')
def get_max_ram_size(self):
size = 0
for slot in self.dmi.get("Physical Memory Array"):
capacity = slot.get("Maximum Capacity", '0').split(" ")[0]
size += int(capacity)
return size
def get_ram_slots(self):
slots = 0
for x in self.dmi.get("Physical Memory Array"):
slots += int(x.get("Number Of Devices", 0))
return slots
def get_ram_size(self, ram):
memory = ram.get("Size", "0")
return memory
def get_ram_speed(self, ram):
size = ram.get("Speed", "0")
return size
def get_cpu_speed(self, cpu):
speed = cpu.get('Max Speed', "0")
return speed
def get_sku(self):
return self.dmi.get("System")[0].get("SKU Number", self.default).strip()
def get_version(self):
return self.dmi.get("System")[0].get("Version", self.default).strip()
def get_uuid(self):
return self.dmi.get("System")[0].get("UUID", '').strip()
def get_family(self):
return self.dmi.get("System")[0].get("Family", '')
def get_chassis(self):
return self.dmi.get("Chassis")[0].get("Type", '_virtual')
def get_type(self):
chassis_type = self.get_chassis()
return self.translation_to_devicehub(chassis_type)
def translation_to_devicehub(self, original_type):
lower_type = original_type.lower()
CHASSIS_TYPE = {
'Desktop': [
'desktop',
'low-profile',
'tower',
'docking',
'all-in-one',
'pizzabox',
'mini-tower',
'space-saving',
'lunchbox',
'mini',
'stick',
],
'Laptop': [
'portable',
'laptop',
'convertible',
'tablet',
'detachable',
'notebook',
'handheld',
'sub-notebook',
],
'Server': ['server'],
'Computer': ['_virtual'],
}
for k, v in CHASSIS_TYPE.items():
if lower_type in v:
return k
return self.default
def get_chassis_dh(self):
chassis = self.get_chassis()
lower_type = chassis.lower()
for k, v in CHASSIS_DH.items():
if lower_type in v:
return k
return self.default
def get_data_storage_type(self, x):
# TODO @cayop add more SSDS types
SSDS = ["nvme"]
SSD = 'SolidStateDrive'
HDD = 'HardDrive'
type_dev = x.get('device', {}).get('type')
trim = x.get('trim', {}).get("supported") in [True, "true"]
return SSD if type_dev in SSDS or trim else HDD
def get_data_storage_interface(self, x):
interface = x.get('device', {}).get('protocol', 'ATA')
if interface.upper() in DATASTORAGEINTERFACE:
return interface.upper()
txt = "Sid: {}, interface {} is not in DataStorageInterface Enum".format(
self.sid, interface
)
self.errors("{}".format(err))
def get_data_storage_size(self, x):
return x.get('user_capacity', {}).get('bytes')
def parse_hwinfo(self):
hw_blocks = self.hwinfo_raw.split("\n\n")
return [x.split("\n") for x in hw_blocks]
def loads(self, x): def loads(self, x):
if isinstance(x, str): if isinstance(x, str):
try: try:
try: return json.loads(x)
hw = json.loads(x)
except json.decoder.JSONDecodeError:
hw = json.loads(repair_json(x))
return hw
except Exception as ss: except Exception as ss:
logger.warning("%s", ss) logger.warning("%s", ss)
return {} return {}
@ -502,4 +405,3 @@ class ParseSnapshot:
logger.error(txt) logger.error(txt)
self._errors.append("%s", txt) self._errors.append("%s", txt)

View file

@ -11,6 +11,7 @@ xlrd==2.0.1
odfpy==1.4.1 odfpy==1.4.1
pytz==2024.2 pytz==2024.2
json-repair==0.30.0 json-repair==0.30.0
setuptools==75.5.0 setuptools==65.5.1
requests==2.32.3 requests==2.32.3
wheel==0.45.0 wheel==0.45.1

View file

@ -19,7 +19,10 @@ def move_json(path_name, user, place="snapshots"):
def save_in_disk(data, user, place="snapshots"): def save_in_disk(data, user, place="snapshots"):
uuid = data.get('uuid', '') uuid = data.get("uuid")
if data.get("credentialSubject"):
uuid = data["credentialSubject"].get("uuid")
now = datetime.now() now = datetime.now()
year = now.year year = now.year
month = now.month month = now.month