diff --git a/.env.example b/.env.example index d8fed15..6fb20c9 100644 --- a/.env.example +++ b/.env.example @@ -2,7 +2,7 @@ DOMAIN=localhost DEMO=true # note that with DEBUG=true, logs are more verbose (include tracebacks) DEBUG=true -ALLOWED_HOSTS=localhost,localhost:8000,127.0.0.1, +ALLOWED_HOSTS=${DOMAIN},${DOMAIN}:8000,127.0.0.1,127.0.0.1:8000 DPP=false STATIC_ROOT=/tmp/static/ diff --git a/api/views.py b/api/views.py index 0de3e5a..87177fb 100644 --- a/api/views.py +++ b/api/views.py @@ -85,17 +85,21 @@ class NewSnapshotView(ApiMixing): # except Exception: # return JsonResponse({'error': 'Invalid Snapshot'}, status=400) - if not data.get("uuid"): + ev_uuid = data.get("uuid") + if data.get("credentialSubject"): + ev_uuid = data["credentialSubject"].get("uuid") + + if not ev_uuid: txt = "error: the snapshot not have uuid" logger.error("%s", txt) return JsonResponse({'status': txt}, status=500) exist_annotation = Annotation.objects.filter( - uuid=data['uuid'] + uuid=ev_uuid ).first() if exist_annotation: - txt = "error: the snapshot {} exist".format(data['uuid']) + txt = "error: the snapshot {} exist".format(ev_uuid) logger.warning("%s", txt) return JsonResponse({'status': txt}, status=500) @@ -105,14 +109,14 @@ class NewSnapshotView(ApiMixing): except Exception as err: if settings.DEBUG: logger.exception("%s", err) - snapshot_id = data.get("uuid", "") + snapshot_id = ev_uuid txt = "It is not possible to parse snapshot: %s." logger.error(txt, snapshot_id) text = "fail: It is not possible to parse snapshot" return JsonResponse({'status': text}, status=500) annotation = Annotation.objects.filter( - uuid=data['uuid'], + uuid=ev_uuid, type=Annotation.Type.SYSTEM, # TODO this is hardcoded, it should select the user preferred algorithm key="hidalgo1", @@ -121,7 +125,7 @@ class NewSnapshotView(ApiMixing): if not annotation: - logger.error("Error: No annotation for uuid: %s", data["uuid"]) + logger.error("Error: No annotation for uuid: %s", ev_uuid) return JsonResponse({'status': 'fail'}, status=500) url_args = reverse_lazy("device:details", args=(annotation.value,)) diff --git a/dashboard/templates/unassigned_devices.html b/dashboard/templates/unassigned_devices.html index 369edd2..d54e450 100644 --- a/dashboard/templates/unassigned_devices.html +++ b/dashboard/templates/unassigned_devices.html @@ -69,7 +69,11 @@ {{ dev.manufacturer }} + {% if dev.version %} + {{dev.version}} {{ dev.model }} + {% else %} {{ dev.model }} + {% endif %} diff --git a/dashboard/views.py b/dashboard/views.py index 8d91732..2ec12a0 100644 --- a/dashboard/views.py +++ b/dashboard/views.py @@ -84,8 +84,11 @@ class SearchView(InventaryMixin): return devices, count def get_annotations(self, xp): - snap = xp.document.get_data() - uuid = json.loads(snap).get('uuid') + snap = json.loads(xp.document.get_data()) + if snap.get("credentialSubject"): + uuid = snap["credentialSubject"]["uuid"] + else: + uuid = snap["uuid"] return Device.get_annotation_from_uuid(uuid, self.request.user.institution) def search_hids(self, query, offset, limit): diff --git a/device/models.py b/device/models.py index cd77389..7ca2a1f 100644 --- a/device/models.py +++ b/device/models.py @@ -305,6 +305,12 @@ class Device: self.get_last_evidence() return self.last_evidence.get_model() + @property + def version(self): + if not self.last_evidence: + self.get_last_evidence() + return self.last_evidence.get_version() + @property def components(self): self.get_last_evidence() diff --git a/device/templates/details.html b/device/templates/details.html index 99346c9..af6ce7c 100644 --- a/device/templates/details.html +++ b/device/templates/details.html @@ -58,34 +58,41 @@ {% endif %} -
+
Type
{{ object.type }}
{% if object.is_websnapshot and object.last_user_evidence %} {% for k, v in object.last_user_evidence %} -
+
{{ k }}
{{ v|default:'' }}
{% endfor %} {% else %} -
+
{% trans 'Manufacturer' %}
{{ object.manufacturer|default:'' }}
-
+
{% trans 'Model' %}
{{ object.model|default:'' }}
-
+
+
+ {% trans 'Version' %} +
+
{{ object.version|default:'' }}
+
+ +
{% trans 'Serial Number' %}
diff --git a/docker-reset.sh b/docker-reset.sh index 825e91c..8bbc229 100755 --- a/docker-reset.sh +++ b/docker-reset.sh @@ -21,6 +21,8 @@ main() { fi # remove old database rm -vfr ./db/* + # deactivate configured flag + rm -vfr ./already_configured docker compose down -v docker compose build docker compose up ${detach_arg:-} diff --git a/docker/devicehub-django.Dockerfile b/docker/devicehub-django.Dockerfile index 6f5ec95..0f0d408 100644 --- a/docker/devicehub-django.Dockerfile +++ b/docker/devicehub-django.Dockerfile @@ -28,9 +28,6 @@ compile = no no-cache-dir = True END -# upgrade pip, which might fail on lxc, then remove the "corrupted file" -RUN python -m pip install --upgrade pip || (rm -rf /usr/local/lib/python3.11/site-packages/pip-*.dist-info && python -m pip install --upgrade pip) - COPY ./requirements.txt /opt/devicehub-django RUN pip install -r requirements.txt # TODO hardcoded, is ignored in requirements.txt diff --git a/docker/devicehub-django.entrypoint.sh b/docker/devicehub-django.entrypoint.sh index d6d7bb2..1b9f548 100644 --- a/docker/devicehub-django.entrypoint.sh +++ b/docker/devicehub-django.entrypoint.sh @@ -50,13 +50,12 @@ API_RESOLVER=${API_RESOLVER} ID_FEDERATED=${ID_FEDERATED} END )" - fi - # generate config using env vars from docker # TODO rethink if this is needed because now this is django, not flask cat > .env < 1: - mm = model.split(" ") - model = mm[-1] - manufacturer = " ".join(mm[:-1]) - self.components.append( - { - "actions": self.sanitize(sm), - "type": self.get_data_storage_type(sm), - "model": model, - "manufacturer": manufacturer, - "serialNumber": sm.get('serial_number'), - "size": self.get_data_storage_size(sm), - "variant": sm.get("firmware_version"), - "interface": self.get_data_storage_interface(sm), - "hours": hours, + serial = get_inxi(d, "serial") + if serial: + hd = { + "type": "Storage", + "manufacturer": get_inxi(d, "vendor"), + "model": get_inxi(d, "model"), + "serialNumber": get_inxi(d, "serial"), + "size": get_inxi(d, "size"), + "speed": get_inxi(d, "speed"), + "interface": get_inxi(d, "tech"), + "firmware": get_inxi(d, "fw-rev") } - ) + rpm = get_inxi(d, "rpm") + if rpm: + hd["rpm"] = rpm + + family = get_inxi(d, "family") + if family: + hd["family"] = family + + sata = get_inxi(d, "sata") + if sata: + hd["sata"] = sata + + continue + + + cycles = get_inxi(d, "cycles") + if cycles: + hd['cycles'] = cycles + hd["health"] = get_inxi(d, "health") + hd["time of used"] = get_inxi(d, "on") + hd["read used"] = get_inxi(d, "read-units") + hd["written used"] = get_inxi(d, "written-units") + + self.components.append(hd) + continue + + hd = {} def sanitize(self, action): return [] - def get_bogomips(self): - if not self.hwinfo: - return self.default - - bogomips = 0 - for row in self.hwinfo: - for cel in row: - if 'BogoMips' in cel: - try: - bogomips += float(cel.split(":")[-1]) - except: - pass - return bogomips - def get_networks(self): - networks = [] - get_lshw_child(self.lshw, networks, 'network') - - for c in networks: - capacity = c.get('capacity') - wireless = bool(c.get('configuration', {}).get('wireless', False)) + nets = get_inxi_key(self.inxi, "Network") or [] + networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)] + + for n, iface in networks: + model = get_inxi(n, "Device") + if not model: + continue + + interface = '' + for k in n.keys(): + if "port" in k: + interface = "Integrated" + if "pcie" in k: + interface = "PciExpress" + if get_inxi(n, "type") == "USB": + interface = "USB" + self.components.append( { - "actions": [], "type": "NetworkAdapter", - "model": c.get('product'), - "manufacturer": c.get('vendor'), - "serialNumber": c.get('serial'), - "speed": capacity, - "variant": c.get('version', 1), - "wireless": wireless or False, - "integrated": "PCI:0000:00" in c.get("businfo", ""), + "model": model, + "manufacturer": get_inxi(n, 'vendor'), + "serialNumber": get_inxi(iface, 'mac'), + "speed": get_inxi(n, "speed"), + "interface": interface, } ) def get_sound_card(self): - multimedias = [] - get_lshw_child(self.lshw, multimedias, 'multimedia') - - for c in multimedias: + audio = get_inxi_key(self.inxi, "Audio") or [] + + for c in audio: + model = get_inxi(c, "Device") + if not model: + continue + self.components.append( { - "actions": [], "type": "SoundCard", - "model": c.get('product'), - "manufacturer": c.get('vendor'), - "serialNumber": c.get('serial'), + "model": model, + "manufacturer": get_inxi(c, 'vendor'), + "serialNumber": get_inxi(c, 'serial'), } ) - def get_display(self): # noqa: C901 - TECHS = 'CRT', 'TFT', 'LED', 'PDP', 'LCD', 'OLED', 'AMOLED' - - for c in self.monitors: - resolution_width, resolution_height = (None,) * 2 - refresh, serial, model, manufacturer, size = (None,) * 5 - year, week, production_date = (None,) * 3 - - for x in c: - if "Vendor: " in x: - manufacturer = x.split('Vendor: ')[-1].strip() - if "Model: " in x: - model = x.split('Model: ')[-1].strip() - if "Serial ID: " in x: - serial = x.split('Serial ID: ')[-1].strip() - if " Resolution: " in x: - rs = x.split(' Resolution: ')[-1].strip() - if 'x' in rs: - resolution_width, resolution_height = [ - int(r) for r in rs.split('x') - ] - if "Frequencies: " in x: - try: - refresh = int(float(x.split(',')[-1].strip()[:-3])) - except Exception: - pass - if 'Year of Manufacture' in x: - year = x.split(': ')[1] - - if 'Week of Manufacture' in x: - week = x.split(': ')[1] - - if "Size: " in x: - size = self.get_size_monitor(x) - technology = next((t for t in TECHS if t in c[0]), None) - - if year and week: - d = '{} {} 0'.format(year, week) - production_date = datetime.strptime(d, '%Y %W %w').isoformat() + def get_display(self): + graphics = get_inxi_key(self.inxi, "Graphics") or [] + for c in graphics: + if not get_inxi(c, "Monitor"): + continue self.components.append( { - "actions": [], "type": "Display", - "model": model, - "manufacturer": manufacturer, - "serialNumber": serial, - 'size': size, - 'resolutionWidth': resolution_width, - 'resolutionHeight': resolution_height, - "productionDate": production_date, - 'technology': technology, - 'refreshRate': refresh, + "model": get_inxi(c, "model"), + "manufacturer": get_inxi(c, "vendor"), + "serialNumber": get_inxi(c, "serial"), + 'size': get_inxi(c, "size"), + 'diagonal': get_inxi(c, "diag"), + 'resolution': get_inxi(c, "res"), + "date": get_inxi(c, "built"), + 'ratio': get_inxi(c, "ratio"), } ) - def get_hwinfo_monitors(self): - for c in self.hwinfo: - monitor = None - external = None - for x in c: - if 'Hardware Class: monitor' in x: - monitor = c - if 'Driver Info' in x: - external = c - - if monitor and not external: - self.monitors.append(c) - - def get_size_monitor(self, x): - i = 1 / 25.4 - t = x.split('Size: ')[-1].strip() - tt = t.split('mm') - if not tt: - return 0 - sizes = tt[0].strip() - if 'x' not in sizes: - return 0 - w, h = [int(x) for x in sizes.split('x')] - return "{:.2f}".format(np.sqrt(w**2 + h**2) * i) - - def get_cpu_address(self, cpu): - default = 64 - for ch in self.lshw.get('children', []): - for c in ch.get('children', []): - if c['class'] == 'processor': - return c.get('width', default) - return default - def get_usb_num(self): return len( [ @@ -364,133 +387,13 @@ class ParseSnapshot: ] ) - def get_bios_date(self): - return self.dmi.get("BIOS")[0].get("Release Date", self.default) - - def get_firmware(self): - return self.dmi.get("BIOS")[0].get("Firmware Revision", '1') - - def get_max_ram_size(self): - size = 0 - for slot in self.dmi.get("Physical Memory Array"): - capacity = slot.get("Maximum Capacity", '0').split(" ")[0] - size += int(capacity) - - return size - - def get_ram_slots(self): - slots = 0 - for x in self.dmi.get("Physical Memory Array"): - slots += int(x.get("Number Of Devices", 0)) - return slots - - def get_ram_size(self, ram): - memory = ram.get("Size", "0") - return memory - - def get_ram_speed(self, ram): - size = ram.get("Speed", "0") - return size - - def get_cpu_speed(self, cpu): - speed = cpu.get('Max Speed', "0") - return speed - - def get_sku(self): - return self.dmi.get("System")[0].get("SKU Number", self.default).strip() - - def get_version(self): - return self.dmi.get("System")[0].get("Version", self.default).strip() - - def get_uuid(self): - return self.dmi.get("System")[0].get("UUID", '').strip() - - def get_family(self): - return self.dmi.get("System")[0].get("Family", '') - - def get_chassis(self): - return self.dmi.get("Chassis")[0].get("Type", '_virtual') - - def get_type(self): - chassis_type = self.get_chassis() - return self.translation_to_devicehub(chassis_type) - - def translation_to_devicehub(self, original_type): - lower_type = original_type.lower() - CHASSIS_TYPE = { - 'Desktop': [ - 'desktop', - 'low-profile', - 'tower', - 'docking', - 'all-in-one', - 'pizzabox', - 'mini-tower', - 'space-saving', - 'lunchbox', - 'mini', - 'stick', - ], - 'Laptop': [ - 'portable', - 'laptop', - 'convertible', - 'tablet', - 'detachable', - 'notebook', - 'handheld', - 'sub-notebook', - ], - 'Server': ['server'], - 'Computer': ['_virtual'], - } - for k, v in CHASSIS_TYPE.items(): - if lower_type in v: - return k - return self.default - - def get_chassis_dh(self): - chassis = self.get_chassis() - lower_type = chassis.lower() - for k, v in CHASSIS_DH.items(): - if lower_type in v: - return k - return self.default - - def get_data_storage_type(self, x): - # TODO @cayop add more SSDS types - SSDS = ["nvme"] - SSD = 'SolidStateDrive' - HDD = 'HardDrive' - type_dev = x.get('device', {}).get('type') - trim = x.get('trim', {}).get("supported") in [True, "true"] - return SSD if type_dev in SSDS or trim else HDD - - def get_data_storage_interface(self, x): - interface = x.get('device', {}).get('protocol', 'ATA') - if interface.upper() in DATASTORAGEINTERFACE: - return interface.upper() - - txt = "Sid: {}, interface {} is not in DataStorageInterface Enum".format( - self.sid, interface - ) - self.errors("{}".format(err)) - - def get_data_storage_size(self, x): - return x.get('user_capacity', {}).get('bytes') - - def parse_hwinfo(self): - hw_blocks = self.hwinfo_raw.split("\n\n") - return [x.split("\n") for x in hw_blocks] + def get_bios_version(self): + return self.dmi.get("BIOS")[0].get("BIOS Revision", '1') def loads(self, x): if isinstance(x, str): try: - try: - hw = json.loads(x) - except json.decoder.JSONDecodeError: - hw = json.loads(repair_json(x)) - return hw + return json.loads(x) except Exception as ss: logger.warning("%s", ss) return {} @@ -502,4 +405,3 @@ class ParseSnapshot: logger.error(txt) self._errors.append("%s", txt) - diff --git a/requirements.txt b/requirements.txt index 52409e6..f12a20c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,7 @@ xlrd==2.0.1 odfpy==1.4.1 pytz==2024.2 json-repair==0.30.0 -setuptools==75.5.0 +setuptools==65.5.1 requests==2.32.3 -wheel==0.45.0 +wheel==0.45.1 + diff --git a/utils/save_snapshots.py b/utils/save_snapshots.py index 8c02f06..efd1fe9 100644 --- a/utils/save_snapshots.py +++ b/utils/save_snapshots.py @@ -19,7 +19,10 @@ def move_json(path_name, user, place="snapshots"): def save_in_disk(data, user, place="snapshots"): - uuid = data.get('uuid', '') + uuid = data.get("uuid") + if data.get("credentialSubject"): + uuid = data["credentialSubject"].get("uuid") + now = datetime.now() year = now.year month = now.month