devicehub-django/evidence/parse.py

246 lines
7.7 KiB
Python
Raw Normal View History

2024-07-11 15:40:45 +00:00
import json
import hashlib
2024-10-25 15:36:13 +00:00
import logging
2024-07-01 10:17:23 +00:00
2024-09-18 16:01:46 +00:00
from dmidecode import DMIParse
2024-10-21 16:39:31 +00:00
from evidence.models import Annotation
from evidence.xapian import index
2024-12-19 11:27:47 +00:00
from evidence.parse_details import ParseSnapshot, get_inxi_key, get_inxi, get_lshw_child
from utils.constants import CHASSIS_DH
2024-12-16 17:12:02 +00:00
from django.conf import settings
2024-12-05 09:54:41 +00:00
if settings.DPP:
from dpp.api_dlt import register_device_dlt, register_passport_dlt
2024-10-25 15:36:13 +00:00
logger = logging.getLogger('django')
2024-11-11 06:41:08 +00:00
2024-11-09 19:52:26 +00:00
def get_mac(inxi):
nets = get_inxi_key(inxi, "Network")
networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
2024-11-09 19:52:26 +00:00
for n, iface in networks:
if get_inxi(n, "port"):
return get_inxi(iface, 'mac')
2024-07-11 15:40:45 +00:00
2024-09-20 16:05:29 +00:00
2024-12-19 11:27:47 +00:00
def get_legacy_mac(lshw):
try:
if type(lshw) is dict:
hw = lshw
else:
hw = json.loads(lshw)
except json.decoder.JSONDecodeError:
return ""
nets = []
get_lshw_child(hw, nets, 'network')
nets_sorted = sorted(nets, key=lambda x: x['businfo'])
if nets_sorted:
mac = nets_sorted[0]['serial']
logger.debug("The snapshot has the following MAC: %s" , mac)
return mac
2024-07-11 15:40:45 +00:00
class Build:
2024-07-31 11:28:46 +00:00
def __init__(self, evidence_json, user, check=False):
2024-12-03 15:37:56 +00:00
self.evidence = evidence_json.copy()
self.json = evidence_json.copy()
2024-12-05 18:23:53 +00:00
2024-12-03 15:37:56 +00:00
if evidence_json.get("credentialSubject"):
self.json.update(evidence_json["credentialSubject"])
if evidence_json.get("evidence"):
self.json["data"] = {}
for ev in evidence_json["evidence"]:
k = ev.get("operation")
if not k:
continue
self.json["data"][k] = ev.get("output")
2024-07-18 15:21:22 +00:00
self.uuid = self.json['uuid']
2024-07-11 15:40:45 +00:00
self.user = user
self.hid = None
self.chid = None
self.phid = self.get_signature(self.json)
2024-07-31 11:28:46 +00:00
self.generate_chids()
if check:
return
2024-07-11 15:40:45 +00:00
2024-07-15 14:23:14 +00:00
self.index()
2024-07-18 15:21:22 +00:00
self.create_annotations()
2024-12-05 09:54:41 +00:00
if settings.DPP:
self.register_device_dlt()
2024-07-11 15:40:45 +00:00
2024-07-15 14:23:14 +00:00
def index(self):
2024-12-03 15:37:56 +00:00
snap = json.dumps(self.evidence)
index(self.user.institution, self.uuid, snap)
2024-07-15 14:23:14 +00:00
2024-07-31 11:28:46 +00:00
def generate_chids(self):
self.algorithms = {
'hidalgo1': self.get_hid_14(),
'legacy_dpp': self.get_chid_dpp(),
2024-07-31 11:28:46 +00:00
}
2024-07-15 14:23:14 +00:00
def get_hid_14(self):
if self.json.get("software") == "workbench-script":
2024-12-19 11:27:47 +00:00
hid = self.get_hid()
2024-09-18 16:01:46 +00:00
else:
device = self.json['device']
manufacturer = device.get("manufacturer", '')
model = device.get("model", '')
chassis = device.get("chassis", '')
serial_number = device.get("serialNumber", '')
sku = device.get("sku", '')
hid = f"{manufacturer}{model}{chassis}{serial_number}{sku}"
2024-10-25 15:36:13 +00:00
self.chid = hashlib.sha3_256(hid.encode()).hexdigest()
return self.chid
2024-07-15 14:23:14 +00:00
def get_chid_dpp(self):
if self.json.get("software") == "workbench-script":
2024-12-16 17:12:02 +00:00
device = ParseSnapshot(self.json).device
else:
device = self.json['device']
2024-12-16 17:12:02 +00:00
hid = self.get_id_hw_dpp(device)
self.chid = hashlib.sha3_256(hid.encode("utf-8")).hexdigest()
return self.chid
def get_id_hw_dpp(self, d):
manufacturer = d.get("manufacturer", '')
model = d.get("model", '')
chassis = d.get("chassis", '')
serial_number = d.get("serialNumber", '')
sku = d.get("sku", '')
typ = d.get("type", '')
version = d.get("version", '')
return f"{manufacturer}{model}{chassis}{serial_number}{sku}{typ}{version}"
def get_phid(self):
if self.json.get("software") == "workbench-script":
data = ParseSnapshot(self.json)
self.device = data.device
self.components = data.components
else:
self.device = self.json.get("device")
2024-11-28 14:25:57 +00:00
self.components = self.json.get("components", [])
2024-11-28 16:13:21 +00:00
self.device.pop("actions", None)
2024-11-28 16:01:44 +00:00
for c in self.components:
c.pop("actions", None)
device = self.get_id_hw_dpp(self.device)
2024-11-28 14:47:03 +00:00
components = sorted(self.components, key=lambda x: x.get("type"))
doc = [("computer", device)]
for c in components:
doc.append((c.get("type"), self.get_id_hw_dpp(c)))
return doc
2024-07-18 15:21:22 +00:00
def create_annotations(self):
2024-10-25 15:36:13 +00:00
annotation = Annotation.objects.filter(
uuid=self.uuid,
owner=self.user.institution,
type=Annotation.Type.SYSTEM,
)
if annotation:
txt = "Warning: Snapshot %s already registered (annotation exists)"
logger.warning(txt, self.uuid)
2024-10-25 15:36:13 +00:00
return
2024-07-18 15:21:22 +00:00
2024-07-31 11:28:46 +00:00
for k, v in self.algorithms.items():
2024-07-18 15:21:22 +00:00
Annotation.objects.create(
uuid=self.uuid,
2024-09-18 16:01:46 +00:00
owner=self.user.institution,
user=self.user,
2024-07-18 15:21:22 +00:00
type=Annotation.Type.SYSTEM,
key=k,
value=v
)
2024-09-18 16:01:46 +00:00
2024-12-19 11:27:47 +00:00
def get_hid(self):
if not self.json.get("data", {}).get("inxi"):
return self.get_legacy_hid()
try:
2024-12-05 18:23:53 +00:00
self.inxi = self.json["data"]["inxi"]
if isinstance(self.inxi, str):
self.inxi = json.loads(self.inxi)
except Exception:
logger.error("No inxi in snapshot %s", self.uuid)
return ""
machine = get_inxi_key(self.inxi, 'Machine')
for m in machine:
system = get_inxi(m, "System")
if system:
manufacturer = system
model = get_inxi(m, "product")
serial_number = get_inxi(m, "serial")
chassis = get_inxi(m, "Type")
else:
sku = get_inxi(m, "part-nu")
mac = get_mac(self.inxi) or ""
2024-09-20 16:22:27 +00:00
if not mac:
2024-10-31 09:40:53 +00:00
txt = "Could not retrieve MAC address in snapshot %s"
2024-12-19 11:27:47 +00:00
logger.warning(txt, self.json.get('uuid', "n/a"))
return f"{manufacturer}{model}{chassis}{serial_number}{sku}"
return f"{manufacturer}{model}{chassis}{serial_number}{sku}{mac}"
def get_chassis_dh(self):
chassis = self.get_chassis()
lower_type = chassis.lower()
for k, v in CHASSIS_DH.items():
if lower_type in v:
return k
return self.default
def get_sku(self):
return self.dmi.get("System")[0].get("SKU Number", "n/a").strip()
def get_chassis(self):
return self.dmi.get("Chassis")[0].get("Type", '_virtual') #
def get_version(self):
return self.dmi.get("System")[0].get("Verson", '_virtual')
def get_legacy_hid(self):
dmidecode_raw = self.json.get("data", {}).get("dmidecode")
self.dmi = DMIParse(dmidecode_raw)
manufacturer = self.dmi.manufacturer().strip()
model = self.dmi.model().strip()
chassis = self.get_chassis_dh()
serial_number = self.dmi.serial_number()
sku = self.get_sku()
if not self.json.get("data", {}).get('lshw'):
return f"{manufacturer}{model}{chassis}{serial_number}{sku}"
2024-09-18 16:01:46 +00:00
2024-12-19 11:27:47 +00:00
lshw = self.json.get("data", {}).get("lshw")
# mac = get_mac2(hwinfo_raw) or ""
mac = get_legacy_mac(lshw) or ""
if not mac:
txt = "Could not retrieve MAC address in snapshot %s"
logger.warning(txt, self.json.get('uuid', "n/a"))
2024-09-20 16:05:29 +00:00
return f"{manufacturer}{model}{chassis}{serial_number}{sku}{mac}"
def get_signature(self, doc):
return hashlib.sha3_256(json.dumps(doc).encode()).hexdigest()
def register_device_dlt(self):
chid = self.algorithms.get('legacy_dpp')
phid = self.get_signature(self.get_phid())
register_device_dlt(chid, phid, self.uuid, self.user)
register_passport_dlt(chid, phid, self.uuid, self.user)