From 0d60afaa35bbd6cbb1478948e0aa816a616bc168 Mon Sep 17 00:00:00 2001 From: Cayo Puigdefabregas Date: Sat, 11 Jan 2025 20:44:21 +0100 Subject: [PATCH] split parse files for cases of use --- evidence/forms.py | 9 - evidence/legacy_parse.py | 147 ++--------- evidence/legacy_parse_details.py | 23 +- evidence/mixin_parse.py | 58 +++++ evidence/models.py | 20 +- evidence/normal_parse.py | 63 +++++ evidence/normal_parse_details.py | 402 +++++++++++++++++++++++++++++ evidence/old_parse.py | 22 ++ evidence/old_parse_details.py | 13 + evidence/parse.py | 75 +++++- evidence/parse_details.py | 426 ++----------------------------- utils/constants.py | 4 +- 12 files changed, 699 insertions(+), 563 deletions(-) create mode 100644 evidence/mixin_parse.py create mode 100644 evidence/normal_parse.py create mode 100644 evidence/normal_parse_details.py create mode 100644 evidence/old_parse.py create mode 100644 evidence/old_parse_details.py diff --git a/evidence/forms.py b/evidence/forms.py index 117a9f3..1f3e322 100644 --- a/evidence/forms.py +++ b/evidence/forms.py @@ -8,7 +8,6 @@ from utils.device import create_annotation, create_doc, create_index from utils.forms import MultipleFileField from device.models import Device from evidence.parse import Build -from evidence.legacy_parse import Build as legacy_build from evidence.models import Annotation from utils.save_snapshots import move_json, save_in_disk @@ -31,10 +30,6 @@ class UploadForm(forms.Form): try: file_json = json.loads(file_data) build = Build - if file_json.get("data",{}).get("lshw"): - if file_json.get("software") == "workbench-script": - build = legacy_build - snap = build(file_json, None, check=True) exist_annotation = Annotation.objects.filter( uuid=snap.uuid @@ -65,10 +60,6 @@ class UploadForm(forms.Form): path_name = save_in_disk(ev[1], user.institution.name) build = Build file_json = ev[1] - if file_json.get("data",{}).get("lshw"): - if file_json.get("software") == "workbench-script": - build = legacy_build - build(file_json, user) move_json(path_name, user.institution.name) diff --git a/evidence/legacy_parse.py b/evidence/legacy_parse.py index 640abbb..1d9f6a0 100644 --- a/evidence/legacy_parse.py +++ b/evidence/legacy_parse.py @@ -1,17 +1,11 @@ import json -import hashlib import logging from dmidecode import DMIParse from json_repair import repair_json -from django.conf import settings +from evidence.mixin_parse import BuildMix from evidence.legacy_parse_details import get_lshw_child, ParseSnapshot - -from evidence.models import Annotation -from evidence.xapian import index from utils.constants import CHASSIS_DH -if settings.DPP: - from dpp.api_dlt import register_device_dlt, register_passport_dlt logger = logging.getLogger('django') @@ -37,100 +31,21 @@ def get_mac(lshw): return mac -class Build: - def __init__(self, evidence_json, user, check=False): - self.json = evidence_json - self.uuid = self.json['uuid'] - self.user = user - self.hid = None - self.chid = None - self.phid = self.get_signature(self.json) - self.generate_chids() +class Build(BuildMix): + # This parse is for get info from snapshots created with + # workbench-script but builded for send to devicehub-teal - if check: - return - - self.index() - self.create_annotations() - if settings.DPP: - self.register_device_dlt() - - def index(self): - snap = json.dumps(self.json) - index(self.user.institution, self.uuid, snap) - - def generate_chids(self): - self.algorithms = { - 'hidalgo1': self.get_hid(), - 'legacy_dpp': self.get_chid_dpp(), - } - - def get_chid_dpp(self): + def get_details(self): dmidecode_raw = self.json["data"]["dmidecode"] - dmi = DMIParse(dmidecode_raw) + self.dmi = DMIParse(dmidecode_raw) - manufacturer = dmi.manufacturer().strip() - model = dmi.model().strip() - chassis = self.get_chassis_dh() - serial_number = dmi.serial_number() - sku = self.get_sku() - typ = chassis - version = self.get_version() - hid = f"{manufacturer}{model}{chassis}{serial_number}{sku}{typ}{version}" - - self.chid = hashlib.sha3_256(hid.encode("utf-8")).hexdigest() - return self.chid - - def get_id_hw_dpp(self, d): - manufacturer = d.get("manufacturer", '') - model = d.get("model", '') - chassis = d.get("chassis", '') - serial_number = d.get("serialNumber", '') - sku = d.get("sku", '') - typ = d.get("type", '') - version = d.get("version", '') - - return f"{manufacturer}{model}{chassis}{serial_number}{sku}{typ}{version}" - - def get_phid(self): - data = ParseSnapshot(self.json) - self.device = data.device - self.components = data.components - - self.device.pop("actions", None) - for c in self.components: - c.pop("actions", None) - - device = self.get_id_hw_dpp(self.device) - components = sorted(self.components, key=lambda x: x.get("type")) - doc = [("computer", device)] - - for c in components: - doc.append((c.get("type"), self.get_id_hw_dpp(c))) - - return doc - - def create_annotations(self): - annotation = Annotation.objects.filter( - uuid=self.uuid, - owner=self.user.institution, - type=Annotation.Type.SYSTEM, - ) - - if annotation: - txt = "Warning: Snapshot %s already registered (annotation exists)" - logger.warning(txt, self.uuid) - return - - for k, v in self.algorithms.items(): - Annotation.objects.create( - uuid=self.uuid, - owner=self.user.institution, - user=self.user, - type=Annotation.Type.SYSTEM, - key=k, - value=v - ) + self.manufacturer = self.dmi.manufacturer().strip() + self.model = self.dmi.model().strip() + self.chassis = self.get_chassis_dh() + self.serial_number = self.dmi.serial_number() + self.sku = self.get_sku() + self.typ = self.chassis + self.version = self.get_version() def get_chassis_dh(self): chassis = self.get_chassis() @@ -149,36 +64,6 @@ class Build: def get_version(self): return self.dmi.get("System")[0].get("Verson", '_virtual') - def get_hid(self): - snapshot = self.json - dmidecode_raw = snapshot["data"]["dmidecode"] - self.dmi = DMIParse(dmidecode_raw) - - manufacturer = self.dmi.manufacturer().strip() - model = self.dmi.model().strip() - chassis = self.get_chassis_dh() - serial_number = self.dmi.serial_number() - sku = self.get_sku() - - if not snapshot["data"].get('lshw'): - return f"{manufacturer}{model}{chassis}{serial_number}{sku}" - - lshw = snapshot["data"]["lshw"] - # mac = get_mac2(hwinfo_raw) or "" - mac = get_mac(lshw) or "" - if not mac: - txt = "Could not retrieve MAC address in snapshot %s" - logger.warning(txt, snapshot['uuid']) - - hid = f"{manufacturer}{model}{chassis}{serial_number}{sku}{mac}" - self.chid = hashlib.sha3_256(hid.encode()).hexdigest() - return self.chid - - def get_signature(self, doc): - return hashlib.sha3_256(json.dumps(doc).encode()).hexdigest() - - def register_device_dlt(self): - chid = self.algorithms.get('legacy_dpp') - phid = self.get_signature(self.get_phid()) - register_device_dlt(chid, phid, self.uuid, self.user) - register_passport_dlt(chid, phid, self.uuid, self.user) + def _get_components(self): + data = ParseSnapshot(self.json) + self.components = data.components diff --git a/evidence/legacy_parse_details.py b/evidence/legacy_parse_details.py index 0edb7e5..c8df532 100644 --- a/evidence/legacy_parse_details.py +++ b/evidence/legacy_parse_details.py @@ -13,10 +13,13 @@ logger = logging.getLogger('django') def get_lshw_child(child, nets, component): - if child.get('id') == component: - nets.append(child) - if child.get('children'): - [get_lshw_child(x, nets, component) for x in child['children']] + try: + if child.get('id') == component: + nets.append(child) + if child.get('children'): + [get_lshw_child(x, nets, component) for x in child['children']] + except Exception: + return [] class ParseSnapshot: def __init__(self, snapshot, default="n/a"): @@ -314,10 +317,14 @@ class ParseSnapshot: def get_cpu_address(self, cpu): default = 64 - for ch in self.lshw.get('children', []): - for c in ch.get('children', []): - if c['class'] == 'processor': - return c.get('width', default) + + try: + for ch in self.lshw.get('children', []): + for c in ch.get('children', []): + if c['class'] == 'processor': + return c.get('width', default) + except: + return default return default def get_usb_num(self): diff --git a/evidence/mixin_parse.py b/evidence/mixin_parse.py new file mode 100644 index 0000000..18dcf76 --- /dev/null +++ b/evidence/mixin_parse.py @@ -0,0 +1,58 @@ +import logging +from django.conf import settings + +from utils.constants import ALGOS + + +logger = logging.getLogger('django') + + +class BuildMix: + def __init__(self, evidence_json): + self.json = evidence_json + self.uuid = self.json['uuid'] + self.manufacturer = "" + self.model = "" + self.serial_number = "" + self.chassis = "" + self.sku = "" + self.mac = "" + self.tpy = "" + self.version = "" + self.get_details() + self.generate_chids() + + def get_hid(self, algo): + algorithm = ALGOS.get(algo, []) + hid = "" + for f in algorithm: + if hasattr(self, f): + hid += getattr(self, f) + return hid + + def generate_chids(self): + self.algorithms = { + 'hidalgo1': self.get_hid('hidalgo1'), + } + if settings.DPP: + self.algorithms["legacy_dpp"] = self.get_hid("legacy_dpp") + + def get_doc(self): + self._get_components() + for c in self.components: + c.pop("actions", None) + + components = sorted(self.components, key=lambda x: x.get("type")) + device = self.algorithms.get('legacy_dpp') + + doc = [("computer", device)] + + for c in components: + doc.append((c.get("type"), self.get_id_hw_dpp(c))) + + def get_id_hw_dpp(self, d): + algorithm = ALGOS.get("legacy_dpp", []) + hid = "" + for f in algorithm: + hid += d.get(f, '') + return hid diff --git a/evidence/models.py b/evidence/models.py index f6a9131..024031d 100644 --- a/evidence/models.py +++ b/evidence/models.py @@ -6,8 +6,8 @@ from django.db import models from utils.constants import STR_EXTEND_SIZE, CHASSIS_DH from evidence.xapian import search -from evidence.parse_details import ParseSnapshot, get_inxi, get_inxi_key -from evidence.legacy_parse_details import ParseSnapshot as legacy_ParseSnapshot +from evidence.parse_details import ParseSnapshot +from evidence.normal_parse_details import get_inxi, get_inxi_key from user.models import User, Institution @@ -135,7 +135,7 @@ class Evidence: return list(self.doc.get('kv').values())[0] if self.is_legacy(): - return self.doc['device']['manufacturer'] + return self.doc.get('device', {}).get('manufacturer', '') if self.inxi: return self.device_manufacturer @@ -150,7 +150,7 @@ class Evidence: return list(self.doc.get('kv').values())[1] if self.is_legacy(): - return self.doc['device']['model'] + return self.doc.get('device', {}).get('model', '') if self.inxi: return self.device_model @@ -159,7 +159,7 @@ class Evidence: def get_chassis(self): if self.is_legacy(): - return self.doc['device']['model'] + return self.doc.get('device', {}).get('model', '') if self.inxi: return self.device_chassis @@ -174,7 +174,7 @@ class Evidence: def get_serial_number(self): if self.is_legacy(): - return self.doc['device']['serialNumber'] + return self.doc.get('device', {}).get('serialNumber', '') if self.inxi: return self.device_serial_number @@ -196,13 +196,7 @@ class Evidence: ).order_by("-created").values_list("uuid", "created").distinct() def set_components(self): - parse = ParseSnapshot - if self.doc.get("software") == "workbench-script": - if self.doc.get("data", {}).get("lshw"): - parse = legacy_ParseSnapshot - - snapshot = parse(self.doc).snapshot_json - self.components = snapshot['components'] + self.components = ParseSnapshot(self.doc).components def is_legacy(self): if self.doc.get("credentialSubject"): diff --git a/evidence/normal_parse.py b/evidence/normal_parse.py new file mode 100644 index 0000000..4ad901b --- /dev/null +++ b/evidence/normal_parse.py @@ -0,0 +1,63 @@ +import json +import logging + +from evidence.mixin_parse import BuildMix +from evidence.normal_parse_details import get_inxi_key, get_inxi, ParseSnapshot + + +logger = logging.getLogger('django') + + +def get_mac(inxi): + nets = get_inxi_key(inxi, "Network") + networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)] + + for n, iface in networks: + if get_inxi(n, "port"): + return get_inxi(iface, 'mac') + + +class Build(BuildMix): + + def get_details(self): + self.from_credential() + try: + self.inxi = self.json["data"]["inxi"] + if isinstance(self.inxi, str): + self.inxi = json.loads(self.inxi) + except Exception: + logger.error("No inxi in snapshot %s", self.uuid) + return "" + + machine = get_inxi_key(self.inxi, 'Machine') + for m in machine: + system = get_inxi(m, "System") + if system: + self.manufacturer = system + self.model = get_inxi(m, "product") + self.serial_number = get_inxi(m, "serial") + self.chassis = get_inxi(m, "Type") + else: + self.sku = get_inxi(m, "part-nu") + + self.mac = get_mac(self.inxi) or "" + if not self.mac: + txt = "Could not retrieve MAC address in snapshot %s" + logger.warning(txt, self.uuid) + + def from_credential(self): + if not self.json.get("credentialSubject"): + return + + self.json.update(self.json["credentialSubject"]) + if self.json.get("evidence"): + self.json["data"] = {} + for ev in self.json["evidence"]: + k = ev.get("operation") + if not k: + continue + self.json["data"][k] = ev.get("output") + + def _get_components(self): + data = ParseSnapshot(self.json) + self.components = data.components diff --git a/evidence/normal_parse_details.py b/evidence/normal_parse_details.py new file mode 100644 index 0000000..9c24b70 --- /dev/null +++ b/evidence/normal_parse_details.py @@ -0,0 +1,402 @@ +import json +import logging + +from dmidecode import DMIParse + + +logger = logging.getLogger('django') + + +def get_inxi_key(inxi, component): + for n in inxi: + for k, v in n.items(): + if component in k: + return v + + +def get_inxi(n, name): + for k, v in n.items(): + if f"#{name}" in k: + return v + + return "" + + +class ParseSnapshot: + def __init__(self, snapshot, default="n/a"): + self.default = default + self.dmidecode_raw = snapshot.get("data", {}).get("dmidecode", "{}") + self.smart_raw = snapshot.get("data", {}).get("smartctl", []) + self.inxi_raw = snapshot.get("data", {}).get("inxi", "") or "" + for ev in snapshot.get("evidence", []): + if "dmidecode" == ev.get("operation"): + self.dmidecode_raw = ev["output"] + if "inxi" == ev.get("operation"): + self.inxi_raw = ev["output"] + if "smartctl" == ev.get("operation"): + self.smart_raw = ev["output"] + data = snapshot + if snapshot.get("credentialSubject"): + data = snapshot["credentialSubject"] + + self.device = {"actions": []} + self.components = [] + + self.dmi = DMIParse(self.dmidecode_raw) + self.smart = self.loads(self.smart_raw) + self.inxi = self.loads(self.inxi_raw) + + self.set_computer() + self.set_components() + self.snapshot_json = { + "type": "Snapshot", + "device": self.device, + "software": data["software"], + "components": self.components, + "uuid": data['uuid'], + "endTime": data["timestamp"], + "elapsed": 1, + } + + def set_computer(self): + machine = get_inxi_key(self.inxi, 'Machine') or [] + for m in machine: + system = get_inxi(m, "System") + if system: + self.device['manufacturer'] = system + self.device['model'] = get_inxi(m, "product") + self.device['serialNumber'] = get_inxi(m, "serial") + self.device['type'] = get_inxi(m, "Type") + self.device['chassis'] = self.device['type'] + self.device['version'] = get_inxi(m, "v") + else: + self.device['system_uuid'] = get_inxi(m, "uuid") + self.device['sku'] = get_inxi(m, "part-nu") + + def set_components(self): + self.get_mother_board() + self.get_cpu() + self.get_ram() + self.get_graphic() + self.get_display() + self.get_networks() + self.get_sound_card() + self.get_data_storage() + self.get_battery() + + def get_mother_board(self): + machine = get_inxi_key(self.inxi, 'Machine') or [] + mb = {"type": "Motherboard",} + for m in machine: + bios_date = get_inxi(m, "date") + if not bios_date: + continue + mb["manufacturer"] = get_inxi(m, "Mobo") + mb["model"] = get_inxi(m, "model") + mb["serialNumber"] = get_inxi(m, "serial") + mb["version"] = get_inxi(m, "v") + mb["biosDate"] = bios_date + mb["biosVersion"] = self.get_bios_version() + mb["firewire"]: self.get_firmware_num() + mb["pcmcia"]: self.get_pcmcia_num() + mb["serial"]: self.get_serial_num() + mb["usb"]: self.get_usb_num() + + self.get_ram_slots(mb) + + self.components.append(mb) + + def get_ram_slots(self, mb): + memory = get_inxi_key(self.inxi, 'Memory') or [] + for m in memory: + slots = get_inxi(m, "slots") + if not slots: + continue + mb["slots"] = slots + mb["ramSlots"] = get_inxi(m, "modules") + mb["ramMaxSize"] = get_inxi(m, "capacity") + + + def get_cpu(self): + cpu = get_inxi_key(self.inxi, 'CPU') or [] + cp = {"type": "Processor"} + vulnerabilities = [] + for c in cpu: + base = get_inxi(c, "model") + if base: + cp["model"] = get_inxi(c, "model") + cp["arch"] = get_inxi(c, "arch") + cp["bits"] = get_inxi(c, "bits") + cp["gen"] = get_inxi(c, "gen") + cp["family"] = get_inxi(c, "family") + cp["date"] = get_inxi(c, "built") + continue + des = get_inxi(c, "L1") + if des: + cp["L1"] = des + cp["L2"] = get_inxi(c, "L2") + cp["L3"] = get_inxi(c, "L3") + cp["cpus"] = get_inxi(c, "cpus") + cp["cores"] = get_inxi(c, "cores") + cp["threads"] = get_inxi(c, "threads") + continue + bogo = get_inxi(c, "bogomips") + if bogo: + cp["bogomips"] = bogo + cp["base/boost"] = get_inxi(c, "base/boost") + cp["min/max"] = get_inxi(c, "min/max") + cp["ext-clock"] = get_inxi(c, "ext-clock") + cp["volts"] = get_inxi(c, "volts") + continue + ctype = get_inxi(c, "Type") + if ctype: + v = {"Type": ctype} + status = get_inxi(c, "status") + if status: + v["status"] = status + mitigation = get_inxi(c, "mitigation") + if mitigation: + v["mitigation"] = mitigation + vulnerabilities.append(v) + + self.components.append(cp) + + + def get_ram(self): + memory = get_inxi_key(self.inxi, 'Memory') or [] + mem = {"type": "RamModule"} + + for m in memory: + base = get_inxi(m, "System RAM") + if base: + mem["size"] = get_inxi(m, "total") + slot = get_inxi(m, "manufacturer") + if slot: + mem["manufacturer"] = slot + mem["model"] = get_inxi(m, "part-no") + mem["serialNumber"] = get_inxi(m, "serial") + mem["speed"] = get_inxi(m, "speed") + mem["bits"] = get_inxi(m, "data") + mem["interface"] = get_inxi(m, "type") + module = get_inxi(m, "modules") + if module: + mem["modules"] = module + + self.components.append(mem) + + def get_graphic(self): + graphics = get_inxi_key(self.inxi, 'Graphics') or [] + + for c in graphics: + if not get_inxi(c, "Device") or not get_inxi(c, "vendor"): + continue + + self.components.append( + { + "type": "GraphicCard", + "memory": self.get_memory_video(c), + "manufacturer": get_inxi(c, "vendor"), + "model": get_inxi(c, "Device"), + "arch": get_inxi(c, "arch"), + "serialNumber": get_inxi(c, "serial"), + "integrated": True if get_inxi(c, "port") else False + } + ) + + def get_battery(self): + bats = get_inxi_key(self.inxi, 'Battery') or [] + for b in bats: + self.components.append( + { + "type": "Battery", + "model": get_inxi(b, "model"), + "serialNumber": get_inxi(b, "serial"), + "condition": get_inxi(b, "condition"), + "cycles": get_inxi(b, "cycles"), + "volts": get_inxi(b, "volts") + } + ) + + def get_memory_video(self, c): + memory = get_inxi_key(self.inxi, 'Memory') or [] + + for m in memory: + igpu = get_inxi(m, "igpu") + agpu = get_inxi(m, "agpu") + ngpu = get_inxi(m, "ngpu") + gpu = get_inxi(m, "gpu") + if igpu or agpu or gpu or ngpu: + return igpu or agpu or gpu or ngpu + + return self.default + + def get_data_storage(self): + hdds= get_inxi_key(self.inxi, 'Drives') or [] + for d in hdds: + usb = get_inxi(d, "type") + if usb == "USB": + continue + + serial = get_inxi(d, "serial") + if serial: + hd = { + "type": "Storage", + "manufacturer": get_inxi(d, "vendor"), + "model": get_inxi(d, "model"), + "serialNumber": get_inxi(d, "serial"), + "size": get_inxi(d, "size"), + "speed": get_inxi(d, "speed"), + "interface": get_inxi(d, "tech"), + "firmware": get_inxi(d, "fw-rev") + } + rpm = get_inxi(d, "rpm") + if rpm: + hd["rpm"] = rpm + + family = get_inxi(d, "family") + if family: + hd["family"] = family + + sata = get_inxi(d, "sata") + if sata: + hd["sata"] = sata + + continue + + + cycles = get_inxi(d, "cycles") + if cycles: + hd['cycles'] = cycles + hd["health"] = get_inxi(d, "health") + hd["time of used"] = get_inxi(d, "on") + hd["read used"] = get_inxi(d, "read-units") + hd["written used"] = get_inxi(d, "written-units") + + self.components.append(hd) + continue + + hd = {} + + def sanitize(self, action): + return [] + + def get_networks(self): + nets = get_inxi_key(self.inxi, "Network") or [] + networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)] + + for n, iface in networks: + model = get_inxi(n, "Device") + if not model: + continue + + interface = '' + for k in n.keys(): + if "port" in k: + interface = "Integrated" + if "pcie" in k: + interface = "PciExpress" + if get_inxi(n, "type") == "USB": + interface = "USB" + + self.components.append( + { + "type": "NetworkAdapter", + "model": model, + "manufacturer": get_inxi(n, 'vendor'), + "serialNumber": get_inxi(iface, 'mac'), + "speed": get_inxi(n, "speed"), + "interface": interface, + } + ) + + def get_sound_card(self): + audio = get_inxi_key(self.inxi, "Audio") or [] + + for c in audio: + model = get_inxi(c, "Device") + if not model: + continue + + self.components.append( + { + "type": "SoundCard", + "model": model, + "manufacturer": get_inxi(c, 'vendor'), + "serialNumber": get_inxi(c, 'serial'), + } + ) + + def get_display(self): + graphics = get_inxi_key(self.inxi, "Graphics") or [] + for c in graphics: + if not get_inxi(c, "Monitor"): + continue + + self.components.append( + { + "type": "Display", + "model": get_inxi(c, "model"), + "manufacturer": get_inxi(c, "vendor"), + "serialNumber": get_inxi(c, "serial"), + 'size': get_inxi(c, "size"), + 'diagonal': get_inxi(c, "diag"), + 'resolution': get_inxi(c, "res"), + "date": get_inxi(c, "built"), + 'ratio': get_inxi(c, "ratio"), + } + ) + + def get_usb_num(self): + return len( + [ + u + for u in self.dmi.get("Port Connector") + if "USB" in u.get("Port Type", "").upper() + ] + ) + + def get_serial_num(self): + return len( + [ + u + for u in self.dmi.get("Port Connector") + if "SERIAL" in u.get("Port Type", "").upper() + ] + ) + + def get_firmware_num(self): + return len( + [ + u + for u in self.dmi.get("Port Connector") + if "FIRMWARE" in u.get("Port Type", "").upper() + ] + ) + + def get_pcmcia_num(self): + return len( + [ + u + for u in self.dmi.get("Port Connector") + if "PCMCIA" in u.get("Port Type", "").upper() + ] + ) + + def get_bios_version(self): + return self.dmi.get("BIOS")[0].get("BIOS Revision", '1') + + def loads(self, x): + if isinstance(x, str): + try: + return json.loads(x) + except Exception as ss: + logger.warning("%s", ss) + return {} + return x + + def errors(self, txt=None): + if not txt: + return self._errors + + logger.error(txt) + self._errors.append("%s", txt) diff --git a/evidence/old_parse.py b/evidence/old_parse.py new file mode 100644 index 0000000..8a3ac29 --- /dev/null +++ b/evidence/old_parse.py @@ -0,0 +1,22 @@ +import logging + +from evidence.mixin_parse import BuildMix + + +logger = logging.getLogger('django') + + +class Build(BuildMix): + # This parse is for get info from snapshots created with old workbench + # normaly is worbench 11 + + def get_details(self): + device = self.json.get('device', {}) + self.manufacturer = device.get("manufacturer", '') + self.model = device.get("model", '') + self.chassis = device.get("chassis", '') + self.serial_number = device.get("serialNumber", '') + self.sku = device.get("sku", '') + + def _get_components(self): + self.components = self.json.get("components", []) diff --git a/evidence/old_parse_details.py b/evidence/old_parse_details.py new file mode 100644 index 0000000..6e3750e --- /dev/null +++ b/evidence/old_parse_details.py @@ -0,0 +1,13 @@ +import logging + + +logger = logging.getLogger('django') + + +class ParseSnapshot: + def __init__(self, snapshot, default="n/a"): + self.default = default + self.snapshot_json = snapshot + + self.device = snapshot.get("device") + self.components = snapshot.get("components") diff --git a/evidence/parse.py b/evidence/parse.py index 374bd3a..fd04e2d 100644 --- a/evidence/parse.py +++ b/evidence/parse.py @@ -3,11 +3,13 @@ import hashlib import logging from evidence import legacy_parse +from evidence import old_parse +from evidence import normal_parse from evidence.parse_details import ParseSnapshot from evidence.models import Annotation from evidence.xapian import index -from evidence.parse_details import get_inxi_key, get_inxi +from evidence.normal_parse_details import get_inxi_key, get_inxi from django.conf import settings if settings.DPP: @@ -24,8 +26,77 @@ def get_mac(inxi): if get_inxi(n, "port"): return get_inxi(iface, 'mac') - class Build: + def __init__(self, evidence_json, user, check=False): + """ + This Build do the save in xapian as document, in Annotations and do + register in dlt if is configured for that. + + We have 4 cases for parser diferents snapshots than come from workbench. + 1) worbench 11 is old_parse. + 2) legacy is the worbench-script when create a snapshot for devicehub-teal + 3) some snapshots come as a credential. In this case is parsed as normal_parse + 4) normal snapshot from worbench-script is the most basic and is parsed as normal_parse + """ + self.evidence = evidence_json.copy() + self.uuid = self.evidence['uuid'] + self.user = user + + if evidence_json.get("credentialSubject"): + self.build = normal_parse.Build(evidence_json) + elif evidence_json.get("software") != "workbench-script": + self.build = old_parse.Build(evidence_json) + elif evidence_json.get("data",{}).get("lshw"): + self.build = legacy_parse.Build(evidence_json) + else: + self.build = normal_parse.Build(evidence_json) + + if check: + return + + self.index() + self.create_annotations() + if settings.DPP: + self.register_device_dlt() + + def index(self): + snap = json.dumps(self.evidence) + index(self.user.institution, self.uuid, snap) + + def create_annotations(self): + annotation = Annotation.objects.filter( + uuid=self.uuid, + owner=self.user.institution, + type=Annotation.Type.SYSTEM, + ) + + if annotation: + txt = "Warning: Snapshot %s already registered (annotation exists)" + logger.warning(txt, self.uuid) + return + + for k, v in self.build.algorithms.items(): + Annotation.objects.create( + uuid=self.uuid, + owner=self.user.institution, + user=self.user, + type=Annotation.Type.SYSTEM, + key=k, + value=self.sign(v) + ) + + def sign(self, doc): + return hashlib.sha3_256(doc.encode()).hexdigest() + + def register_device_dlt(self): + legacy_dpp = self.build.algorithms.get('legacy_dpp') + chid = self.sign(legacy_dpp) + phid = self.sign(json.dumps(self.build.get_doc())) + register_device_dlt(chid, phid, self.uuid, self.user) + register_passport_dlt(chid, phid, self.uuid, self.user) + + +class Build2: def __init__(self, evidence_json, user, check=False): if evidence_json.get("data",{}).get("lshw"): if evidence_json.get("software") == "workbench-script": diff --git a/evidence/parse_details.py b/evidence/parse_details.py index 7bf95a7..c329c1d 100644 --- a/evidence/parse_details.py +++ b/evidence/parse_details.py @@ -1,408 +1,38 @@ -import json import logging -from dmidecode import DMIParse - -from evidence import legacy_parse_details +from evidence import ( + legacy_parse_details, + normal_parse_details, + old_parse_details +) logger = logging.getLogger('django') -def get_inxi_key(inxi, component): - for n in inxi: - for k, v in n.items(): - if component in k: - return v - - -def get_inxi(n, name): - for k, v in n.items(): - if f"#{name}" in k: - return v - - return "" - - class ParseSnapshot: def __init__(self, snapshot, default="n/a"): - if snapshot.get("data",{}).get("lshw"): - if snapshot.get("software") == "workbench-script": - return legacy_parse_details.ParseSnapshot(snapshot, default=default) + if snapshot.get("credentialSubject"): + self.build = normal_parse_details.ParseSnapshot( + snapshot, + default=default + ) + elif snapshot.get("software") != "workbench-script": + self.build = old_parse_details.ParseSnapshot( + snapshot, + default=default + ) + elif snapshot.get("data",{}).get("lshw"): + self.build = legacy_parse_details.ParseSnapshot( + snapshot, + default=default + ) + else: + self.build = normal_parse_details.ParseSnapshot( + snapshot, + default=default + ) - self.default = default - self.dmidecode_raw = snapshot.get("data", {}).get("dmidecode", "{}") - self.smart_raw = snapshot.get("data", {}).get("smartctl", []) - self.inxi_raw = snapshot.get("data", {}).get("inxi", "") or "" - for ev in snapshot.get("evidence", []): - if "dmidecode" == ev.get("operation"): - self.dmidecode_raw = ev["output"] - if "inxi" == ev.get("operation"): - self.inxi_raw = ev["output"] - if "smartctl" == ev.get("operation"): - self.smart_raw = ev["output"] - data = snapshot - if snapshot.get("credentialSubject"): - data = snapshot["credentialSubject"] - - self.device = {"actions": []} - self.components = [] - - self.dmi = DMIParse(self.dmidecode_raw) - self.smart = self.loads(self.smart_raw) - self.inxi = self.loads(self.inxi_raw) - - self.set_computer() - self.set_components() - self.snapshot_json = { - "type": "Snapshot", - "device": self.device, - "software": data["software"], - "components": self.components, - "uuid": data['uuid'], - "endTime": data["timestamp"], - "elapsed": 1, - } - - def set_computer(self): - machine = get_inxi_key(self.inxi, 'Machine') or [] - for m in machine: - system = get_inxi(m, "System") - if system: - self.device['manufacturer'] = system - self.device['model'] = get_inxi(m, "product") - self.device['serialNumber'] = get_inxi(m, "serial") - self.device['type'] = get_inxi(m, "Type") - self.device['chassis'] = self.device['type'] - self.device['version'] = get_inxi(m, "v") - else: - self.device['system_uuid'] = get_inxi(m, "uuid") - self.device['sku'] = get_inxi(m, "part-nu") - - def set_components(self): - self.get_mother_board() - self.get_cpu() - self.get_ram() - self.get_graphic() - self.get_display() - self.get_networks() - self.get_sound_card() - self.get_data_storage() - self.get_battery() - - def get_mother_board(self): - machine = get_inxi_key(self.inxi, 'Machine') or [] - mb = {"type": "Motherboard",} - for m in machine: - bios_date = get_inxi(m, "date") - if not bios_date: - continue - mb["manufacturer"] = get_inxi(m, "Mobo") - mb["model"] = get_inxi(m, "model") - mb["serialNumber"] = get_inxi(m, "serial") - mb["version"] = get_inxi(m, "v") - mb["biosDate"] = bios_date - mb["biosVersion"] = self.get_bios_version() - mb["firewire"]: self.get_firmware_num() - mb["pcmcia"]: self.get_pcmcia_num() - mb["serial"]: self.get_serial_num() - mb["usb"]: self.get_usb_num() - - self.get_ram_slots(mb) - - self.components.append(mb) - - def get_ram_slots(self, mb): - memory = get_inxi_key(self.inxi, 'Memory') or [] - for m in memory: - slots = get_inxi(m, "slots") - if not slots: - continue - mb["slots"] = slots - mb["ramSlots"] = get_inxi(m, "modules") - mb["ramMaxSize"] = get_inxi(m, "capacity") - - - def get_cpu(self): - cpu = get_inxi_key(self.inxi, 'CPU') or [] - cp = {"type": "Processor"} - vulnerabilities = [] - for c in cpu: - base = get_inxi(c, "model") - if base: - cp["model"] = get_inxi(c, "model") - cp["arch"] = get_inxi(c, "arch") - cp["bits"] = get_inxi(c, "bits") - cp["gen"] = get_inxi(c, "gen") - cp["family"] = get_inxi(c, "family") - cp["date"] = get_inxi(c, "built") - continue - des = get_inxi(c, "L1") - if des: - cp["L1"] = des - cp["L2"] = get_inxi(c, "L2") - cp["L3"] = get_inxi(c, "L3") - cp["cpus"] = get_inxi(c, "cpus") - cp["cores"] = get_inxi(c, "cores") - cp["threads"] = get_inxi(c, "threads") - continue - bogo = get_inxi(c, "bogomips") - if bogo: - cp["bogomips"] = bogo - cp["base/boost"] = get_inxi(c, "base/boost") - cp["min/max"] = get_inxi(c, "min/max") - cp["ext-clock"] = get_inxi(c, "ext-clock") - cp["volts"] = get_inxi(c, "volts") - continue - ctype = get_inxi(c, "Type") - if ctype: - v = {"Type": ctype} - status = get_inxi(c, "status") - if status: - v["status"] = status - mitigation = get_inxi(c, "mitigation") - if mitigation: - v["mitigation"] = mitigation - vulnerabilities.append(v) - - self.components.append(cp) - - - def get_ram(self): - memory = get_inxi_key(self.inxi, 'Memory') or [] - mem = {"type": "RamModule"} - - for m in memory: - base = get_inxi(m, "System RAM") - if base: - mem["size"] = get_inxi(m, "total") - slot = get_inxi(m, "manufacturer") - if slot: - mem["manufacturer"] = slot - mem["model"] = get_inxi(m, "part-no") - mem["serialNumber"] = get_inxi(m, "serial") - mem["speed"] = get_inxi(m, "speed") - mem["bits"] = get_inxi(m, "data") - mem["interface"] = get_inxi(m, "type") - module = get_inxi(m, "modules") - if module: - mem["modules"] = module - - self.components.append(mem) - - def get_graphic(self): - graphics = get_inxi_key(self.inxi, 'Graphics') or [] - - for c in graphics: - if not get_inxi(c, "Device") or not get_inxi(c, "vendor"): - continue - - self.components.append( - { - "type": "GraphicCard", - "memory": self.get_memory_video(c), - "manufacturer": get_inxi(c, "vendor"), - "model": get_inxi(c, "Device"), - "arch": get_inxi(c, "arch"), - "serialNumber": get_inxi(c, "serial"), - "integrated": True if get_inxi(c, "port") else False - } - ) - - def get_battery(self): - bats = get_inxi_key(self.inxi, 'Battery') or [] - for b in bats: - self.components.append( - { - "type": "Battery", - "model": get_inxi(b, "model"), - "serialNumber": get_inxi(b, "serial"), - "condition": get_inxi(b, "condition"), - "cycles": get_inxi(b, "cycles"), - "volts": get_inxi(b, "volts") - } - ) - - def get_memory_video(self, c): - memory = get_inxi_key(self.inxi, 'Memory') or [] - - for m in memory: - igpu = get_inxi(m, "igpu") - agpu = get_inxi(m, "agpu") - ngpu = get_inxi(m, "ngpu") - gpu = get_inxi(m, "gpu") - if igpu or agpu or gpu or ngpu: - return igpu or agpu or gpu or ngpu - - return self.default - - def get_data_storage(self): - hdds= get_inxi_key(self.inxi, 'Drives') or [] - for d in hdds: - usb = get_inxi(d, "type") - if usb == "USB": - continue - - serial = get_inxi(d, "serial") - if serial: - hd = { - "type": "Storage", - "manufacturer": get_inxi(d, "vendor"), - "model": get_inxi(d, "model"), - "serialNumber": get_inxi(d, "serial"), - "size": get_inxi(d, "size"), - "speed": get_inxi(d, "speed"), - "interface": get_inxi(d, "tech"), - "firmware": get_inxi(d, "fw-rev") - } - rpm = get_inxi(d, "rpm") - if rpm: - hd["rpm"] = rpm - - family = get_inxi(d, "family") - if family: - hd["family"] = family - - sata = get_inxi(d, "sata") - if sata: - hd["sata"] = sata - - continue - - - cycles = get_inxi(d, "cycles") - if cycles: - hd['cycles'] = cycles - hd["health"] = get_inxi(d, "health") - hd["time of used"] = get_inxi(d, "on") - hd["read used"] = get_inxi(d, "read-units") - hd["written used"] = get_inxi(d, "written-units") - - self.components.append(hd) - continue - - hd = {} - - def sanitize(self, action): - return [] - - def get_networks(self): - nets = get_inxi_key(self.inxi, "Network") or [] - networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)] - - for n, iface in networks: - model = get_inxi(n, "Device") - if not model: - continue - - interface = '' - for k in n.keys(): - if "port" in k: - interface = "Integrated" - if "pcie" in k: - interface = "PciExpress" - if get_inxi(n, "type") == "USB": - interface = "USB" - - self.components.append( - { - "type": "NetworkAdapter", - "model": model, - "manufacturer": get_inxi(n, 'vendor'), - "serialNumber": get_inxi(iface, 'mac'), - "speed": get_inxi(n, "speed"), - "interface": interface, - } - ) - - def get_sound_card(self): - audio = get_inxi_key(self.inxi, "Audio") or [] - - for c in audio: - model = get_inxi(c, "Device") - if not model: - continue - - self.components.append( - { - "type": "SoundCard", - "model": model, - "manufacturer": get_inxi(c, 'vendor'), - "serialNumber": get_inxi(c, 'serial'), - } - ) - - def get_display(self): - graphics = get_inxi_key(self.inxi, "Graphics") or [] - for c in graphics: - if not get_inxi(c, "Monitor"): - continue - - self.components.append( - { - "type": "Display", - "model": get_inxi(c, "model"), - "manufacturer": get_inxi(c, "vendor"), - "serialNumber": get_inxi(c, "serial"), - 'size': get_inxi(c, "size"), - 'diagonal': get_inxi(c, "diag"), - 'resolution': get_inxi(c, "res"), - "date": get_inxi(c, "built"), - 'ratio': get_inxi(c, "ratio"), - } - ) - - def get_usb_num(self): - return len( - [ - u - for u in self.dmi.get("Port Connector") - if "USB" in u.get("Port Type", "").upper() - ] - ) - - def get_serial_num(self): - return len( - [ - u - for u in self.dmi.get("Port Connector") - if "SERIAL" in u.get("Port Type", "").upper() - ] - ) - - def get_firmware_num(self): - return len( - [ - u - for u in self.dmi.get("Port Connector") - if "FIRMWARE" in u.get("Port Type", "").upper() - ] - ) - - def get_pcmcia_num(self): - return len( - [ - u - for u in self.dmi.get("Port Connector") - if "PCMCIA" in u.get("Port Type", "").upper() - ] - ) - - def get_bios_version(self): - return self.dmi.get("BIOS")[0].get("BIOS Revision", '1') - - def loads(self, x): - if isinstance(x, str): - try: - return json.loads(x) - except Exception as ss: - logger.warning("%s", ss) - return {} - return x - - def errors(self, txt=None): - if not txt: - return self._errors - - logger.error(txt) - self._errors.append("%s", txt) + self.default = default + self.device = self.build.snapshot_json.get("device") + self.components = self.build.snapshot_json.get("components") diff --git a/utils/constants.py b/utils/constants.py index 75c5dc5..3104f59 100644 --- a/utils/constants.py +++ b/utils/constants.py @@ -13,7 +13,7 @@ HID_ALGO1 = [ "manufacturer", "model", "chassis", - "serialNumber", + "serial_number", "sku" ] @@ -21,7 +21,7 @@ LEGACY_DPP = [ "manufacturer", "model", "chassis", - "serialNumber", + "serial_number", "sku", "type", "version"