fix parsing with credentials

This commit is contained in:
Cayo Puigdefabregas 2024-12-05 19:23:53 +01:00
parent 7fd42db3e4
commit 7de6d69a6c
4 changed files with 60 additions and 30 deletions

View file

@ -84,8 +84,11 @@ class SearchView(InventaryMixin):
return devices, count return devices, count
def get_annotations(self, xp): def get_annotations(self, xp):
snap = xp.document.get_data() snap = json.loads(xp.document.get_data())
uuid = json.loads(snap).get('uuid') if snap.get("credentialSubject"):
uuid = snap["credentialSubject"]["uuid"]
else:
uuid = snap["uuid"]
return Device.get_annotation_from_uuid(uuid, self.request.user.institution) return Device.get_annotation_from_uuid(uuid, self.request.user.institution)
def search_hids(self, query, offset, limit): def search_hids(self, query, offset, limit):

View file

@ -71,25 +71,37 @@ class Evidence:
for xa in matches: for xa in matches:
self.doc = json.loads(xa.document.get_data()) self.doc = json.loads(xa.document.get_data())
if not self.is_legacy(): if self.is_legacy():
dmidecode_raw = self.doc["data"]["dmidecode"] return
inxi_raw = self.doc["data"]["inxi"]
self.dmi = DMIParse(dmidecode_raw)
try:
self.inxi = json.loads(inxi_raw)
machine = get_inxi_key(self.inxi, 'Machine')
for m in machine:
system = get_inxi(m, "System")
if system:
self.device_manufacturer = system
self.device_model = get_inxi(m, "product")
self.device_serial_number = get_inxi(m, "serial")
self.device_chassis = get_inxi(m, "Type")
self.device_version = get_inxi(m, "v")
if self.doc.get("credentialSubject"):
for ev in self.doc["evidence"]:
if "dmidecode" == ev.get("operation"):
dmidecode_raw = ev["output"]
if "inxi" == ev.get("operation"):
self.inxi = ev["output"]
else:
dmidecode_raw = self.doc["data"]["dmidecode"]
try:
self.inxi = json.loads(self.doc["data"]["inxi"])
except Exception: except Exception:
return return
self.dmi = DMIParse(dmidecode_raw)
try:
machine = get_inxi_key(self.inxi, 'Machine')
for m in machine:
system = get_inxi(m, "System")
if system:
self.device_manufacturer = system
self.device_model = get_inxi(m, "product")
self.device_serial_number = get_inxi(m, "serial")
self.device_chassis = get_inxi(m, "Type")
self.device_version = get_inxi(m, "v")
except Exception:
return
def get_time(self): def get_time(self):
if not self.doc: if not self.doc:
self.get_doc() self.get_doc()
@ -116,7 +128,7 @@ class Evidence:
if self.inxi: if self.inxi:
return self.device_manufacturer return self.device_manufacturer
return self.dmi.manufacturer().strip() return self.dmi.manufacturer().strip()
def get_model(self): def get_model(self):
@ -131,13 +143,13 @@ class Evidence:
if self.inxi: if self.inxi:
return self.device_model return self.device_model
return self.dmi.model().strip() return self.dmi.model().strip()
def get_chassis(self): def get_chassis(self):
if self.is_legacy(): if self.is_legacy():
return self.doc['device']['model'] return self.doc['device']['model']
if self.inxi: if self.inxi:
return self.device_chassis return self.device_chassis
@ -152,7 +164,7 @@ class Evidence:
def get_serial_number(self): def get_serial_number(self):
if self.is_legacy(): if self.is_legacy():
return self.doc['device']['serialNumber'] return self.doc['device']['serialNumber']
if self.inxi: if self.inxi:
return self.device_serial_number return self.device_serial_number
@ -178,6 +190,9 @@ class Evidence:
self.components = snapshot['components'] self.components = snapshot['components']
def is_legacy(self): def is_legacy(self):
if self.doc.get("credentialSubject"):
return False
return self.doc.get("software") != "workbench-script" return self.doc.get("software") != "workbench-script"
def is_web_snapshot(self): def is_web_snapshot(self):

View file

@ -25,6 +25,7 @@ class Build:
def __init__(self, evidence_json, user, check=False): def __init__(self, evidence_json, user, check=False):
self.evidence = evidence_json.copy() self.evidence = evidence_json.copy()
self.json = evidence_json.copy() self.json = evidence_json.copy()
if evidence_json.get("credentialSubject"): if evidence_json.get("credentialSubject"):
self.json.update(evidence_json["credentialSubject"]) self.json.update(evidence_json["credentialSubject"])
if evidence_json.get("evidence"): if evidence_json.get("evidence"):
@ -94,7 +95,9 @@ class Build:
def get_hid(self, snapshot): def get_hid(self, snapshot):
try: try:
self.inxi = json.loads(self.json["data"]["inxi"]) self.inxi = self.json["data"]["inxi"]
if isinstance(self.inxi, str):
self.inxi = json.loads(self.inxi)
except Exception: except Exception:
logger.error("No inxi in snapshot %s", self.uuid) logger.error("No inxi in snapshot %s", self.uuid)
return "" return ""

View file

@ -30,9 +30,20 @@ def get_inxi(n, name):
class ParseSnapshot: class ParseSnapshot:
def __init__(self, snapshot, default="n/a"): def __init__(self, snapshot, default="n/a"):
self.default = default self.default = default
self.dmidecode_raw = snapshot["data"].get("dmidecode", "{}") self.dmidecode_raw = snapshot.get("data", {}).get("dmidecode", "{}")
self.smart_raw = snapshot["data"].get("disks", []) self.smart_raw = snapshot.get("data", {}).get("smartctl", [])
self.inxi_raw = snapshot["data"].get("inxi", "") or "" self.inxi_raw = snapshot.get("data", {}).get("inxi", "") or ""
for ev in snapshot.get("evidence", []):
if "dmidecode" == ev.get("operation"):
self.dmidecode_raw = ev["output"]
if "inxi" == ev.get("operation"):
self.inxi_raw = ev["output"]
if "smartctl" == ev.get("operation"):
self.smart_raw = ev["output"]
data = snapshot
if snapshot.get("credentialSubject"):
data = snapshot["credentialSubject"]
self.device = {"actions": []} self.device = {"actions": []}
self.components = [] self.components = []
@ -45,11 +56,10 @@ class ParseSnapshot:
self.snapshot_json = { self.snapshot_json = {
"type": "Snapshot", "type": "Snapshot",
"device": self.device, "device": self.device,
"software": snapshot["software"], "software": data["software"],
"components": self.components, "components": self.components,
"uuid": snapshot['uuid'], "uuid": data['uuid'],
"version": snapshot['version'], "endTime": data["timestamp"],
"endTime": snapshot["timestamp"],
"elapsed": 1, "elapsed": 1,
} }
@ -267,7 +277,6 @@ class ParseSnapshot:
hd["read used"] = get_inxi(d, "read-units") hd["read used"] = get_inxi(d, "read-units")
hd["written used"] = get_inxi(d, "written-units") hd["written used"] = get_inxi(d, "written-units")
# import pdb; pdb.set_trace()
self.components.append(hd) self.components.append(hd)
continue continue