Compare commits

..

84 commits

Author SHA1 Message Date
Sergio Giménez Antón bd4f6b7d56 f31: Initial implementation for environmental impact calculator 2025-01-07 08:06:29 +01:00
Sergio Giménez Antón f9c9c9dd7c Very initial impleentation of co2 consumption 2024-12-17 09:58:20 +01:00
Sergio Giménez Antón 60ccbec369 Merge branch 'main' into feature/f31-device-enviromental-impact 2024-12-17 08:03:31 +01:00
Sergio Giménez Antón 3fb0961815 Add both impact and dpp in the view context 2024-12-16 09:01:05 +01:00
Sergio Giménez Antón 447946a576 Merge branch 'inxi' into feature/f31-device-enviromental-impact 2024-12-16 08:55:00 +01:00
Cayo Puigdefabregas 5d190d07a3 fix rebase from main 2024-12-12 17:11:05 +01:00
Cayo Puigdefabregas d1abb206e8 add inxi in parsing and show in details of devs 2024-12-11 17:41:05 +01:00
pedro 85bae67189 docker entrypoint: bugfix when DPP env var unbound 2024-12-11 17:12:58 +01:00
pedro d429485651 docker entrypoint: adapt it to DPP env var 2024-12-11 17:12:58 +01:00
pedro 07c25f4a92 propagate DPP env var to docker 2024-12-11 17:12:58 +01:00
Cayo Puigdefabregas 14277c17cb activate/deactivate DPP from env 2024-12-11 17:12:56 +01:00
Cayo Puigdefabregas f7051c3130 convert jsonld in credentials for dpps 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas 09be1a2f74 drop loggers 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas a3dd5d9639 fix register dpp 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas 3f5460b81f fix did dpp 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas bf7975bc24 debug timestamp 2024-12-11 17:09:25 +01:00
pedro 8e128557c0 bugfix attempt verifyProof
co-authored with cayo
2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas 25e7e85548 more and more debug 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas ba126491be more debug 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas 81e7ba267d fix call to proofs 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas 1e08f0fc0c dpp for proofs 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas ebabb6b228 dpp for proofs 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas 4954199610 drop actions for dpp 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas e84b72c70b drop actions for dpp 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas 99435fff85 debug in proof call 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas 6c0e77891f fix cors origin 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas a2d859494b fix json call to chid 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas ea6d990e56 fix phid hash list 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas 612737d46c fix phid hash list 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas 30be57ee25 fix phid 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas 88bdabb64f fix 2024-12-11 17:09:25 +01:00
Cayo Puigdefabregas 96268c8caf new document and out device and components 2024-12-11 17:09:23 +01:00
pedro 7ed05f0932 comment logger trace when DEBUG
is it necessary?
2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas b652d7d452 remove flask sintax for django sintax 2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas 04ecb4f2f1 remove flask sintax for django sintax 2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas 1613eaaa44 add_services 2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas 06264558df add result for dpp and for chid 2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas 80b4c3b4ca fix new document json 2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas e2078c7bde fix get_result 2024-12-11 17:04:20 +01:00
pedro cfae9d4ec9 dhub settings: bugfix wrong DLT TOKEN 2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas 578fa73fe5 fix get_result for get correct document 2024-12-11 17:04:20 +01:00
pedro f1d57ff618 progress on making it work
still fails
2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas 3cf8ceb5d3 remove pdb 2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas b56dc0dfda get_result for json 2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas 1c58bff515 view dpp page 2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas e6c1ede93c fix 2024-12-11 17:04:20 +01:00
pedro 371845971c no sudo in docker-reset, all is with user 1000 2024-12-11 17:04:20 +01:00
pedro b4efcfb171 dh-django dockerfile: use uid 1000 (app)
at least temporarily
2024-12-11 17:04:20 +01:00
pedro ac0d36ea6f dh docker: bugfix wrong usage of up_snapshots 2024-12-11 17:04:20 +01:00
pedro 6a3a2b3a2b dh dockerfile: add time debpkg 2024-12-11 17:04:20 +01:00
pedro 850678fbe4 dh docker: bugfix wrong path in rm prev snapshots 2024-12-11 17:04:20 +01:00
pedro f43aaf6ac6 dh docker: create institution before first dlt usr 2024-12-11 17:04:20 +01:00
pedro 355ed08561 bugfix logger 2024-12-11 17:04:20 +01:00
pedro d0e46aa0b0 logger: bugfix function name changed for highlight 2024-12-11 17:04:20 +01:00
pedro 771b216a31 dh docker: cleanup other snapshots when dpp/dlt 2024-12-11 17:04:20 +01:00
pedro 263eacda99 add dpp 2024-12-11 17:04:20 +01:00
pedro 8fcd20f609 dh docker: first migrate, then config 2024-12-11 17:04:20 +01:00
pedro 15fb5d3739 utils/logger: ensure msgs are logged 2024-12-11 17:04:20 +01:00
pedro d7ff3c2798 logger: improve error handling 2024-12-11 17:04:20 +01:00
pedro 0e0ad400c2 dpp/dlt: fix typo 2024-12-11 17:04:20 +01:00
pedro 367d3a7f87 dpp/dlt: fix typo 2024-12-11 17:04:20 +01:00
pedro c90ed58ea0 dpp/dlt: fix typo 2024-12-11 17:04:20 +01:00
pedro 45629db102 dh docker entrypoint: use appropriate new env vars 2024-12-11 17:04:20 +01:00
pedro 1e29f9562d docker: remove unused vars in django
were used in the flask app devicehub-teal
2024-12-11 17:04:20 +01:00
pedro d0cac9d1d9 docker entrypoint: make DB_* optional 2024-12-11 17:04:20 +01:00
pedro 8b4d1f51f6 docker devicehub-django entrypoint 2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas 34ea4bedfc fix 2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas fe429e7db6 fix 2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas caf2606fd9 add memberFederated model 2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas 73d478f517 add did view 2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas 0f03171076 add commands for setup to dlt 2024-12-11 17:04:20 +01:00
pedro bfdcb33538 docker: add dpp python dep ereuseapitest 2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas 271ac83d71 . 2024-12-11 17:04:20 +01:00
Cayo Puigdefabregas f7b2687ca2 register device and dpp in dlt and dpp api 2024-12-11 17:04:11 +01:00
Cayo Puigdefabregas 1dad22c3d3 first base for dpp 2024-12-11 17:02:26 +01:00
Cayo Puigdefabregas 7de6d69a6c fix parsing with credentials 2024-12-05 19:23:53 +01:00
Sergio Giménez Antón fa5b9eec67 Merge branch 'inxi' into feature/f31-device-enviromental-impact 2024-12-05 09:18:03 +01:00
Cayo Puigdefabregas 7fd42db3e4 fix parsing 2024-12-03 16:37:56 +01:00
Cayo Puigdefabregas bed40d3ee0 fix get_hid 2024-11-20 18:41:59 +01:00
Cayo Puigdefabregas 9553ed6a4c fix component empty 2024-11-20 18:35:27 +01:00
Sergio Giménez Antón f3c9297ffd [WIP] Add button for exporting to PDF 2024-11-19 08:27:44 +01:00
Sergio Giménez cb6c7f6fda Merge branch 'main' into feature/f31-device-enviromental-impact 2024-11-16 16:37:30 +01:00
Cayo Puigdefabregas a0276f439e add inxi in parsing and show in details of devs 2024-11-15 12:47:08 +01:00
sergio_gimenez a4d361ff9b Initial view of the enviromental impact without calculations 2024-11-07 08:15:42 +01:00
36 changed files with 683 additions and 1284 deletions

View file

@ -1,8 +1,8 @@
DH_DOMAIN=localhost
DH_PORT=8000
DOMAIN=localhost
DEMO=true
# note that with DEBUG=true, logs are more verbose (include tracebacks)
DEBUG=true
ALLOWED_HOSTS=${DOMAIN},${DOMAIN}:8000,127.0.0.1,127.0.0.1:8000
DPP=false
STATIC_ROOT=/tmp/static/
@ -16,7 +16,6 @@ EMAIL_BACKEND="django.core.mail.backends.smtp.EmailBackend"
EMAIL_FILE_PATH="/tmp/app-messages"
ENABLE_EMAIL=false
PREDEFINED_TOKEN='5018dd65-9abd-4a62-8896-80f34ac66150'
DH_ALLOWED_HOSTS=${DH_DOMAIN},${DH_DOMAIN}:${DH_PORT},127.0.0.1,127.0.0.1:${DH_PORT}
# TODO review these vars
#SNAPSHOTS_DIR=/path/to/TODO
#EVIDENCES_DIR=/path/to/TODO

View file

@ -37,6 +37,9 @@
<li class="nav-item">
<a class="nav-link" href="{% url 'device:device_web' object.id %}" target="_blank">Web</a>
</li>
<li class="nav-item">
<a href="#environmental_impact" class="nav-link" data-bs-toggle="tab" data-bs-target="#environmental_impact">{% trans 'Environmental impact' %}</a>
</li>
</ul>
</div>
</div>
@ -242,6 +245,87 @@
</div>
</div>
<div class="tab-pane fade" id="environmental_impact">
<div class="container-fluid py-3">
<div class="d-flex justify-content-end mb-3">
<a class="btn btn-success">
<i class="bi bi-file-earmark-pdf"></i>
{% trans 'Export to PDF' %}
</a>
</div>
<div class="row g-4 mb-4">
<div class="col-md-4">
<div class="card h-100 border-success">
<div class="card-body text-center">
<div class="mb-3">
<i class="bi bi-arrow-down-circle text-success" style="font-size: 2rem;"></i>
</div>
<h5 class="card-title text-success">Carbon Reduction</h5>
<h2 class="mb-2">{{ impact.carbon_saved }}</h2>
<p class="card-text text-muted">kg CO₂e saved</p>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card h-100 border-danger">
<div class="card-body text-center">
<div class="mb-3">
<i class="bi bi-cloud-fill text-danger" style="font-size: 2rem;"></i>
</div>
<h5 class="card-title text-danger">Carbon Consumed</h5>
<h2 class="mb-2">{{ impact.co2_emissions }}</h2>
<p class="card-text text-muted">kg CO₂e consumed</p>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card h-100 border-success">
<div class="card-body text-center">
<div class="mb-3">
<i class="bi bi-recycle text-success" style="font-size: 2rem;"></i>
</div>
<h5 class="card-title text-success">Additional Impact Metric</h5>
<h2 class="mb-2">85%</h2>
<p class="card-text text-muted">whatever</p>
</div>
</div>
</div>
</div>
<div class="row">
<div class="col">
<div class="card">
<div class="card-body">
<h5 class="card-title">Impact Details</h5>
<div class="table-responsive">
<table class="table table-bordered">
<tbody>
<tr>
<th scope="row" class="bg-light" style="width: 30%;">Manufacturing Impact Avoided</th>
<td>
<span class="text-success">{{ impact.carbon_saved }}</span> kg CO₂e
<br />
<small class="text-muted">Based on average laptop manufacturing emissions</small>
</td>
</tr>
</tbody>
</table>
</div>
<div class="mt-3">
<h6>Calculation Method</h6>
<small class="text-muted">Based on industry standards X Y and Z</small>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
{% if dpps %}
<div class="tab-pane fade" id="dpps">
<h5 class="card-title">{% trans 'List of dpps' %}</h5>

View file

@ -9,6 +9,5 @@ urlpatterns = [
path("<str:pk>/", views.DetailsView.as_view(), name="details"),
path("<str:pk>/annotation/add", views.AddAnnotationView.as_view(), name="add_annotation"),
path("<str:pk>/document/add", views.AddDocumentView.as_view(), name="add_document"),
path("<str:pk>/public/", views.PublicDeviceWebView.as_view(), name="device_web"),
path("<str:pk>/public/", views.PublicDeviceWebView.as_view(), name="device_web")
]

View file

@ -14,6 +14,7 @@ from evidence.models import Annotation
from lot.models import LotTag
from device.models import Device
from device.forms import DeviceFormSet
from environmental_impact.algorithms.algorithm_factory import FactoryEnvironmentImpactAlgorithm
if settings.DPP:
from dpp.models import Proof
from dpp.api_dlt import PROOF_TYPE
@ -110,10 +111,16 @@ class DetailsView(DashboardView, TemplateView):
uuid__in=self.object.uuids,
type=PROOF_TYPE["IssueDPP"]
)
enviromental_impact_algorithm = FactoryEnvironmentImpactAlgorithm.run_environmental_impact_calculation(
"dummy_calc"
)
enviromental_impact = enviromental_impact_algorithm.get_device_environmental_impact(
self.object)
context.update({
'object': self.object,
'snapshot': self.object.get_last_evidence(),
'lot_tags': lot_tags,
'impact': enviromental_impact,
'dpps': dpps,
})
return context

View file

@ -89,6 +89,7 @@ INSTALLED_APPS = [
"dashboard",
"admin",
"api",
"environmental_impact"
]
DPP = config("DPP", default=False, cast=bool)

View file

@ -5,14 +5,13 @@ services:
dockerfile: docker/devicehub-django.Dockerfile
environment:
- DEBUG=${DEBUG:-false}
- DOMAIN=${DH_DOMAIN:-localhost}
- PORT=${DH_PORT:-8000}
- ALLOWED_HOSTS=${DH_ALLOWED_HOSTS:-$DH_DOMAIN}
- DOMAIN=${DOMAIN:-localhost}
- ALLOWED_HOSTS=${ALLOWED_HOSTS:-$DOMAIN}
- DEMO=${DEMO:-false}
- PREDEFINED_TOKEN=${PREDEFINED_TOKEN:-}
- DPP=${DPP:-false}
volumes:
- .:/opt/devicehub-django
ports:
- ${DH_PORT}:${DH_PORT}
- 8000:8000

View file

View file

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

View file

@ -0,0 +1,30 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from .dummy_calculator import DummyEnvironmentalImpactAlgorithm
if TYPE_CHECKING:
from .algorithm_interface import EnvironmentImpactAlgorithm
class AlgorithmNames():
"""
Enum class for the different types of algorithms.
"""
DUMMY_CALC = "dummy_calc"
algorithm_names = {
DUMMY_CALC: DummyEnvironmentalImpactAlgorithm()
}
class FactoryEnvironmentImpactAlgorithm():
@staticmethod
def run_environmental_impact_calculation(algorithm_name: str) -> EnvironmentImpactAlgorithm:
try:
return AlgorithmNames.algorithm_names[algorithm_name]
except KeyError:
raise ValueError("Invalid algorithm name. Valid options are: " +
", ".join(AlgorithmNames.algorithm_names.keys()))

View file

@ -0,0 +1,11 @@
from abc import ABC, abstractmethod
from functools import lru_cache
from device.models import Device
from environmental_impact.models import EnvironmentalImpact
class EnvironmentImpactAlgorithm(ABC):
@abstractmethod
def get_device_environmental_impact(self, device: Device) -> EnvironmentalImpact:
pass

View file

@ -0,0 +1,33 @@
from device.models import Device
from .algorithm_interface import EnvironmentImpactAlgorithm
from environmental_impact.models import EnvironmentalImpact
class DummyEnvironmentalImpactAlgorithm(EnvironmentImpactAlgorithm):
def get_device_environmental_impact(self, device: Device) -> EnvironmentalImpact:
# TODO Make a constants file / class
avg_watts = 40 # Arbitrary laptop average consumption
co2_per_kwh = 0.475
power_on_hours = self.get_power_on_hours_from(device)
energy_kwh = (power_on_hours * avg_watts) / 1000
co2_emissions = energy_kwh * co2_per_kwh
return EnvironmentalImpact(co2_emissions=co2_emissions)
def get_power_on_hours_from(self, device: Device) -> int:
# TODO how do I check if the device is a legacy workbench? Is there a better way?
is_legacy_workbench = False if device.last_evidence.inxi else True
if not is_legacy_workbench:
storage_components = device.components[9]
str_time = storage_components.get('time of used', -1)
else:
str_time = ""
uptime_in_hours = self.convert_str_time_to_hours(str_time, is_legacy_workbench)
return uptime_in_hours
def convert_str_time_to_hours(self, time_str: str, is_legacy_workbench: bool) -> int:
if is_legacy_workbench:
return -1 # TODO Power on hours not available in legacy workbench
else:
multipliers = {'y': 365 * 24, 'd': 24, 'h': 1}
return sum(int(part[:-1]) * multipliers[part[-1]] for part in time_str.split())

View file

@ -0,0 +1,6 @@
from django.apps import AppConfig
class EnvironmentalImpactConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "environmental_impact"

View file

@ -0,0 +1,8 @@
from dataclasses import dataclass
from django.db import models
@dataclass
class EnvironmentalImpact:
carbon_saved: float = 0.0
co2_emissions: float = 0.0

View file

View file

@ -0,0 +1,44 @@
from unittest.mock import patch
import uuid
from django.test import TestCase
from device.models import Device
from environmental_impact.models import EnvironmentalImpact
from environmental_impact.algorithms.dummy_calculator import DummyEnvironmentalImpactAlgorithm
from evidence.models import Evidence
class DummyEnvironmentalImpactAlgorithmTests(TestCase):
@patch('evidence.models.Evidence.get_doc', return_value={'credentialSubject': {}})
@patch('evidence.models.Evidence.get_time', return_value=None)
def setUp(self, mock_get_time, mock_get_doc):
self.device = Device(id='1')
evidence = self.device.last_evidence = Evidence(uuid=uuid.uuid4())
evidence.inxi = True
evidence.doc = {'credentialSubject': {}}
self.algorithm = DummyEnvironmentalImpactAlgorithm()
def test_get_power_on_hours_from_legacy_device(self):
# TODO is there a way to check that?
pass
@patch('evidence.models.Evidence.get_components', return_value=[0, 0, 0, 0, 0, 0, 0, 0, 0, {'time of used': '1y 2d 3h'}])
def test_get_power_on_hours_from_inxi_device(self, mock_get_components):
hours = self.algorithm.get_power_on_hours_from(self.device)
self.assertEqual(
hours, 8811, "Inxi-parsed devices should correctly compute power-on hours")
@patch('evidence.models.Evidence.get_components', return_value=[0, 0, 0, 0, 0, 0, 0, 0, 0, {'time of used': '1y 2d 3h'}])
def test_convert_str_time_to_hours(self, mock_get_components):
result = self.algorithm.convert_str_time_to_hours('1y 2d 3h', False)
self.assertEqual(
result, 8811, "String to hours conversion should match expected output")
@patch('evidence.models.Evidence.get_components', return_value=[0, 0, 0, 0, 0, 0, 0, 0, 0, {'time of used': '1y 2d 3h'}])
def test_environmental_impact_calculation(self, mock_get_components):
impact = self.algorithm.get_device_environmental_impact(self.device)
self.assertIsInstance(impact, EnvironmentalImpact,
"Output should be an EnvironmentalImpact instance")
expected_co2 = 8811 * 40 * 0.475 / 1000
self.assertAlmostEqual(impact.co2_emissions, expected_co2,
2, "CO2 emissions calculation should be accurate")

View file

@ -0,0 +1,17 @@
from environmental_impact.algorithms.algorithm_factory import FactoryEnvironmentImpactAlgorithm
from django.test import TestCase
from environmental_impact.algorithms.dummy_calculator import DummyEnvironmentalImpactAlgorithm
class FactoryEnvironmentImpactAlgorithmTests(TestCase):
def test_valid_algorithm_name(self):
algorithm = FactoryEnvironmentImpactAlgorithm.run_environmental_impact_calculation(
'dummy_calc')
self.assertIsInstance(algorithm, DummyEnvironmentalImpactAlgorithm,
"Factory should return a DummyEnvironmentalImpactAlgorithm instance")
def test_invalid_algorithm_name(self):
with self.assertRaises(ValueError):
FactoryEnvironmentImpactAlgorithm.run_environmental_impact_calculation(
'invalid_calc')

View file

@ -0,0 +1,3 @@
from django.shortcuts import render
# Create your views here.

View file

@ -29,8 +29,7 @@ class UploadForm(forms.Form):
try:
file_json = json.loads(file_data)
build = Build
snap = build(file_json, None, check=True)
snap = Build(file_json, None, check=True)
exist_annotation = Annotation.objects.filter(
uuid=snap.uuid
).first()
@ -58,9 +57,7 @@ class UploadForm(forms.Form):
for ev in self.evidences:
path_name = save_in_disk(ev[1], user.institution.name)
build = Build
file_json = ev[1]
build(file_json, user)
Build(ev[1], user)
move_json(path_name, user.institution.name)

View file

@ -1,69 +0,0 @@
import json
import logging
from dmidecode import DMIParse
from json_repair import repair_json
from evidence.mixin_parse import BuildMix
from evidence.legacy_parse_details import get_lshw_child, ParseSnapshot
from utils.constants import CHASSIS_DH
logger = logging.getLogger('django')
def get_mac(lshw):
try:
if type(lshw) is dict:
hw = lshw
else:
hw = json.loads(lshw)
except json.decoder.JSONDecodeError:
hw = json.loads(repair_json(lshw))
nets = []
get_lshw_child(hw, nets, 'network')
nets_sorted = sorted(nets, key=lambda x: x['businfo'])
if nets_sorted:
mac = nets_sorted[0]['serial']
logger.debug("The snapshot has the following MAC: %s" , mac)
return mac
class Build(BuildMix):
# This parse is for get info from snapshots created with
# workbench-script but builded for send to devicehub-teal
def get_details(self):
dmidecode_raw = self.json["data"]["dmidecode"]
self.dmi = DMIParse(dmidecode_raw)
self.manufacturer = self.dmi.manufacturer().strip()
self.model = self.dmi.model().strip()
self.chassis = self.get_chassis_dh()
self.serial_number = self.dmi.serial_number()
self.sku = self.get_sku()
self.typ = self.chassis
self.version = self.get_version()
def get_chassis_dh(self):
chassis = self.get_chassis()
lower_type = chassis.lower()
for k, v in CHASSIS_DH.items():
if lower_type in v:
return k
return self.default
def get_sku(self):
return self.dmi.get("System")[0].get("SKU Number", "n/a").strip()
def get_chassis(self):
return self.dmi.get("Chassis")[0].get("Type", '_virtual') #
def get_version(self):
return self.dmi.get("System")[0].get("Verson", '_virtual')
def _get_components(self):
data = ParseSnapshot(self.json)
self.components = data.components

View file

@ -1,503 +0,0 @@
import json
import logging
import numpy as np
from datetime import datetime
from dmidecode import DMIParse
from json_repair import repair_json
from utils.constants import CHASSIS_DH, DATASTORAGEINTERFACE
logger = logging.getLogger('django')
def get_lshw_child(child, nets, component):
try:
if child.get('id') == component:
nets.append(child)
if child.get('children'):
[get_lshw_child(x, nets, component) for x in child['children']]
except Exception:
return []
class ParseSnapshot:
def __init__(self, snapshot, default="n/a"):
self.default = default
self.dmidecode_raw = snapshot["data"].get("dmidecode", "{}")
self.smart_raw = snapshot["data"].get("disks", [])
self.hwinfo_raw = snapshot["data"].get("hwinfo", "")
self.lshw_raw = snapshot["data"].get("lshw", {}) or {}
self.lscpi_raw = snapshot["data"].get("lspci", "")
self.device = {"actions": []}
self.components = []
self.monitors = []
self.dmi = DMIParse(self.dmidecode_raw)
self.smart = self.loads(self.smart_raw)
self.lshw = self.loads(self.lshw_raw)
self.hwinfo = self.parse_hwinfo()
self.set_computer()
self.get_hwinfo_monitors()
self.set_components()
self.snapshot_json = {
"type": "Snapshot",
"device": self.device,
"software": snapshot["software"],
"components": self.components,
"uuid": snapshot['uuid'],
"version": snapshot['version'],
"endTime": snapshot["timestamp"],
"elapsed": 1,
}
def set_computer(self):
self.device['manufacturer'] = self.dmi.manufacturer().strip()
self.device['model'] = self.dmi.model().strip()
self.device['serialNumber'] = self.dmi.serial_number()
self.device['type'] = self.get_type()
self.device['sku'] = self.get_sku()
self.device['version'] = self.get_version()
self.device['system_uuid'] = self.get_uuid()
self.device['family'] = self.get_family()
self.device['chassis'] = self.get_chassis_dh()
def set_components(self):
self.get_cpu()
self.get_ram()
self.get_mother_board()
self.get_graphic()
self.get_data_storage()
self.get_display()
self.get_sound_card()
self.get_networks()
def get_cpu(self):
for cpu in self.dmi.get('Processor'):
serial = cpu.get('Serial Number')
if serial == 'Not Specified' or not serial:
serial = cpu.get('ID').replace(' ', '')
self.components.append(
{
"actions": [],
"type": "Processor",
"speed": self.get_cpu_speed(cpu),
"cores": int(cpu.get('Core Count', 1)),
"model": cpu.get('Version'),
"threads": int(cpu.get('Thread Count', 1)),
"manufacturer": cpu.get('Manufacturer'),
"serialNumber": serial,
"brand": cpu.get('Family'),
"address": self.get_cpu_address(cpu),
"bogomips": self.get_bogomips(),
}
)
def get_ram(self):
for ram in self.dmi.get("Memory Device"):
if ram.get('size') == 'No Module Installed':
continue
if not ram.get("Speed"):
continue
self.components.append(
{
"actions": [],
"type": "RamModule",
"size": self.get_ram_size(ram),
"speed": self.get_ram_speed(ram),
"manufacturer": ram.get("Manufacturer", self.default),
"serialNumber": ram.get("Serial Number", self.default),
"interface": ram.get("Type", "DDR"),
"format": ram.get("Form Factor", "DIMM"),
"model": ram.get("Part Number", self.default),
}
)
def get_mother_board(self):
for moder_board in self.dmi.get("Baseboard"):
self.components.append(
{
"actions": [],
"type": "Motherboard",
"version": moder_board.get("Version"),
"serialNumber": moder_board.get("Serial Number", "").strip(),
"manufacturer": moder_board.get("Manufacturer", "").strip(),
"biosDate": self.get_bios_date(),
"ramMaxSize": self.get_max_ram_size(),
"ramSlots": len(self.dmi.get("Memory Device")),
"slots": self.get_ram_slots(),
"model": moder_board.get("Product Name", "").strip(),
"firewire": self.get_firmware_num(),
"pcmcia": self.get_pcmcia_num(),
"serial": self.get_serial_num(),
"usb": self.get_usb_num(),
}
)
def get_graphic(self):
displays = []
get_lshw_child(self.lshw, displays, 'display')
for c in displays:
if not c['configuration'].get('driver', None):
continue
self.components.append(
{
"actions": [],
"type": "GraphicCard",
"memory": "",
"manufacturer": c.get("vendor", self.default),
"model": c.get("product", self.default),
"serialNumber": c.get("serial", self.default),
}
)
def get_data_storage(self):
for sm in self.smart:
if sm.get('smartctl', {}).get('exit_status') == 1:
continue
model = sm.get('model_name')
manufacturer = None
hours = sm.get("power_on_time", {}).get("hours", 0)
if model and len(model.split(" ")) > 1:
mm = model.split(" ")
model = mm[-1]
manufacturer = " ".join(mm[:-1])
self.components.append(
{
"actions": self.sanitize(sm),
"type": self.get_data_storage_type(sm),
"model": model,
"manufacturer": manufacturer,
"serialNumber": sm.get('serial_number'),
"size": self.get_data_storage_size(sm),
"variant": sm.get("firmware_version"),
"interface": self.get_data_storage_interface(sm),
"hours": hours,
}
)
def sanitize(self, action):
return []
def get_bogomips(self):
if not self.hwinfo:
return self.default
bogomips = 0
for row in self.hwinfo:
for cel in row:
if 'BogoMips' in cel:
try:
bogomips += float(cel.split(":")[-1])
except Exception:
pass
return bogomips
def get_networks(self):
networks = []
get_lshw_child(self.lshw, networks, 'network')
for c in networks:
capacity = c.get('capacity')
wireless = bool(c.get('configuration', {}).get('wireless', False))
self.components.append(
{
"actions": [],
"type": "NetworkAdapter",
"model": c.get('product'),
"manufacturer": c.get('vendor'),
"serialNumber": c.get('serial'),
"speed": capacity,
"variant": c.get('version', 1),
"wireless": wireless or False,
"integrated": "PCI:0000:00" in c.get("businfo", ""),
}
)
def get_sound_card(self):
multimedias = []
get_lshw_child(self.lshw, multimedias, 'multimedia')
for c in multimedias:
self.components.append(
{
"actions": [],
"type": "SoundCard",
"model": c.get('product'),
"manufacturer": c.get('vendor'),
"serialNumber": c.get('serial'),
}
)
def get_display(self): # noqa: C901
TECHS = 'CRT', 'TFT', 'LED', 'PDP', 'LCD', 'OLED', 'AMOLED'
for c in self.monitors:
resolution_width, resolution_height = (None,) * 2
refresh, serial, model, manufacturer, size = (None,) * 5
year, week, production_date = (None,) * 3
for x in c:
if "Vendor: " in x:
manufacturer = x.split('Vendor: ')[-1].strip()
if "Model: " in x:
model = x.split('Model: ')[-1].strip()
if "Serial ID: " in x:
serial = x.split('Serial ID: ')[-1].strip()
if " Resolution: " in x:
rs = x.split(' Resolution: ')[-1].strip()
if 'x' in rs:
resolution_width, resolution_height = [
int(r) for r in rs.split('x')
]
if "Frequencies: " in x:
try:
refresh = int(float(x.split(',')[-1].strip()[:-3]))
except Exception:
pass
if 'Year of Manufacture' in x:
year = x.split(': ')[1]
if 'Week of Manufacture' in x:
week = x.split(': ')[1]
if "Size: " in x:
size = self.get_size_monitor(x)
technology = next((t for t in TECHS if t in c[0]), None)
if year and week:
d = '{} {} 0'.format(year, week)
production_date = datetime.strptime(d, '%Y %W %w').isoformat()
self.components.append(
{
"actions": [],
"type": "Display",
"model": model,
"manufacturer": manufacturer,
"serialNumber": serial,
'size': size,
'resolutionWidth': resolution_width,
'resolutionHeight': resolution_height,
"productionDate": production_date,
'technology': technology,
'refreshRate': refresh,
}
)
def get_hwinfo_monitors(self):
for c in self.hwinfo:
monitor = None
external = None
for x in c:
if 'Hardware Class: monitor' in x:
monitor = c
if 'Driver Info' in x:
external = c
if monitor and not external:
self.monitors.append(c)
def get_size_monitor(self, x):
i = 1 / 25.4
t = x.split('Size: ')[-1].strip()
tt = t.split('mm')
if not tt:
return 0
sizes = tt[0].strip()
if 'x' not in sizes:
return 0
w, h = [int(x) for x in sizes.split('x')]
return "{:.2f}".format(np.sqrt(w**2 + h**2) * i)
def get_cpu_address(self, cpu):
default = 64
try:
for ch in self.lshw.get('children', []):
for c in ch.get('children', []):
if c['class'] == 'processor':
return c.get('width', default)
except:
return default
return default
def get_usb_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "USB" in u.get("Port Type", "").upper()
]
)
def get_serial_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "SERIAL" in u.get("Port Type", "").upper()
]
)
def get_firmware_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "FIRMWARE" in u.get("Port Type", "").upper()
]
)
def get_pcmcia_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "PCMCIA" in u.get("Port Type", "").upper()
]
)
def get_bios_date(self):
return self.dmi.get("BIOS")[0].get("Release Date", self.default)
def get_firmware(self):
return self.dmi.get("BIOS")[0].get("Firmware Revision", '1')
def get_max_ram_size(self):
size = 0
for slot in self.dmi.get("Physical Memory Array"):
capacity = slot.get("Maximum Capacity", '0').split(" ")[0]
size += int(capacity)
return size
def get_ram_slots(self):
slots = 0
for x in self.dmi.get("Physical Memory Array"):
slots += int(x.get("Number Of Devices", 0))
return slots
def get_ram_size(self, ram):
memory = ram.get("Size", "0")
return memory
def get_ram_speed(self, ram):
size = ram.get("Speed", "0")
return size
def get_cpu_speed(self, cpu):
speed = cpu.get('Max Speed', "0")
return speed
def get_sku(self):
return self.dmi.get("System")[0].get("SKU Number", self.default).strip()
def get_version(self):
return self.dmi.get("System")[0].get("Version", self.default).strip()
def get_uuid(self):
return self.dmi.get("System")[0].get("UUID", '').strip()
def get_family(self):
return self.dmi.get("System")[0].get("Family", '')
def get_chassis(self):
return self.dmi.get("Chassis")[0].get("Type", '_virtual')
def get_type(self):
chassis_type = self.get_chassis()
return self.translation_to_devicehub(chassis_type)
def translation_to_devicehub(self, original_type):
lower_type = original_type.lower()
CHASSIS_TYPE = {
'Desktop': [
'desktop',
'low-profile',
'tower',
'docking',
'all-in-one',
'pizzabox',
'mini-tower',
'space-saving',
'lunchbox',
'mini',
'stick',
],
'Laptop': [
'portable',
'laptop',
'convertible',
'tablet',
'detachable',
'notebook',
'handheld',
'sub-notebook',
],
'Server': ['server'],
'Computer': ['_virtual'],
}
for k, v in CHASSIS_TYPE.items():
if lower_type in v:
return k
return self.default
def get_chassis_dh(self):
chassis = self.get_chassis()
lower_type = chassis.lower()
for k, v in CHASSIS_DH.items():
if lower_type in v:
return k
return self.default
def get_data_storage_type(self, x):
# TODO @cayop add more SSDS types
SSDS = ["nvme"]
SSD = 'SolidStateDrive'
HDD = 'HardDrive'
type_dev = x.get('device', {}).get('type')
trim = x.get('trim', {}).get("supported") in [True, "true"]
return SSD if type_dev in SSDS or trim else HDD
def get_data_storage_interface(self, x):
interface = x.get('device', {}).get('protocol', 'ATA')
if interface.upper() in DATASTORAGEINTERFACE:
return interface.upper()
txt = "Sid: {}, interface {} is not in DataStorageInterface Enum".format(
self.sid, interface
)
self.errors("{}".format(txt))
def get_data_storage_size(self, x):
return x.get('user_capacity', {}).get('bytes')
def parse_hwinfo(self):
hw_blocks = self.hwinfo_raw.split("\n\n")
return [x.split("\n") for x in hw_blocks]
def loads(self, x):
if isinstance(x, str):
try:
try:
hw = json.loads(x)
except json.decoder.JSONDecodeError:
hw = json.loads(repair_json(x))
return hw
except Exception as ss:
logger.warning("%s", ss)
return {}
return x
def errors(self, txt=None):
if not txt:
return self._errors
logger.error(txt)
self._errors.append("%s", txt)

View file

@ -1,58 +0,0 @@
import logging
from django.conf import settings
from utils.constants import ALGOS
logger = logging.getLogger('django')
class BuildMix:
def __init__(self, evidence_json):
self.json = evidence_json
self.uuid = self.json.get('uuid')
self.manufacturer = ""
self.model = ""
self.serial_number = ""
self.chassis = ""
self.sku = ""
self.mac = ""
self.tpy = ""
self.version = ""
self.get_details()
self.generate_chids()
def get_hid(self, algo):
algorithm = ALGOS.get(algo, [])
hid = ""
for f in algorithm:
if hasattr(self, f):
hid += getattr(self, f)
return hid
def generate_chids(self):
self.algorithms = {
'hidalgo1': self.get_hid('hidalgo1'),
}
if settings.DPP:
self.algorithms["legacy_dpp"] = self.get_hid("legacy_dpp")
def get_doc(self):
self._get_components()
for c in self.components:
c.pop("actions", None)
components = sorted(self.components, key=lambda x: x.get("type"))
device = self.algorithms.get('legacy_dpp')
doc = [("computer", device)]
for c in components:
doc.append((c.get("type"), self.get_id_hw_dpp(c)))
def get_id_hw_dpp(self, d):
algorithm = ALGOS.get("legacy_dpp", [])
hid = ""
for f in algorithm:
hid += d.get(f, '')
return hid

View file

@ -6,8 +6,7 @@ from django.db import models
from utils.constants import STR_EXTEND_SIZE, CHASSIS_DH
from evidence.xapian import search
from evidence.parse_details import ParseSnapshot
from evidence.normal_parse_details import get_inxi, get_inxi_key
from evidence.parse_details import ParseSnapshot, get_inxi, get_inxi_key
from user.models import User, Institution
@ -93,7 +92,7 @@ class Evidence:
self.inxi = ev["output"]
else:
dmidecode_raw = self.doc["data"]["dmidecode"]
inxi_raw = self.doc.get("data", {}).get("inxi")
inxi_raw = self.doc["data"]["inxi"]
self.dmi = DMIParse(dmidecode_raw)
try:
self.inxi = json.loads(inxi_raw)
@ -135,7 +134,7 @@ class Evidence:
return list(self.doc.get('kv').values())[0]
if self.is_legacy():
return self.doc.get('device', {}).get('manufacturer', '')
return self.doc['device']['manufacturer']
if self.inxi:
return self.device_manufacturer
@ -150,7 +149,7 @@ class Evidence:
return list(self.doc.get('kv').values())[1]
if self.is_legacy():
return self.doc.get('device', {}).get('model', '')
return self.doc['device']['model']
if self.inxi:
return self.device_model
@ -159,7 +158,7 @@ class Evidence:
def get_chassis(self):
if self.is_legacy():
return self.doc.get('device', {}).get('model', '')
return self.doc['device']['model']
if self.inxi:
return self.device_chassis
@ -174,7 +173,7 @@ class Evidence:
def get_serial_number(self):
if self.is_legacy():
return self.doc.get('device', {}).get('serialNumber', '')
return self.doc['device']['serialNumber']
if self.inxi:
return self.device_serial_number
@ -196,7 +195,8 @@ class Evidence:
).order_by("-created").values_list("uuid", "created").distinct()
def set_components(self):
self.components = ParseSnapshot(self.doc).components
snapshot = ParseSnapshot(self.doc).snapshot_json
self.components = snapshot['components']
def is_legacy(self):
if self.doc.get("credentialSubject"):

View file

@ -1,64 +0,0 @@
import json
import logging
from evidence.mixin_parse import BuildMix
from evidence.normal_parse_details import get_inxi_key, get_inxi, ParseSnapshot
logger = logging.getLogger('django')
def get_mac(inxi):
nets = get_inxi_key(inxi, "Network")
networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
for n, iface in networks:
if get_inxi(n, "port"):
return get_inxi(iface, 'mac')
class Build(BuildMix):
def get_details(self):
self.from_credential()
try:
self.inxi = self.json["data"]["inxi"]
if isinstance(self.inxi, str):
self.inxi = json.loads(self.inxi)
except Exception:
logger.error("No inxi in snapshot %s", self.uuid)
return ""
machine = get_inxi_key(self.inxi, 'Machine')
for m in machine:
system = get_inxi(m, "System")
if system:
self.manufacturer = system
self.model = get_inxi(m, "product")
self.serial_number = get_inxi(m, "serial")
self.chassis = get_inxi(m, "Type")
else:
self.sku = get_inxi(m, "part-nu")
self.mac = get_mac(self.inxi) or ""
if not self.mac:
txt = "Could not retrieve MAC address in snapshot %s"
logger.warning(txt, self.uuid)
def from_credential(self):
if not self.json.get("credentialSubject"):
return
self.uuid = self.json.get("credentialSubject", {}).get("uuid")
self.json.update(self.json["credentialSubject"])
if self.json.get("evidence"):
self.json["data"] = {}
for ev in self.json["evidence"]:
k = ev.get("operation")
if not k:
continue
self.json["data"][k] = ev.get("output")
def _get_components(self):
data = ParseSnapshot(self.json)
self.components = data.components

View file

@ -1,402 +0,0 @@
import json
import logging
from dmidecode import DMIParse
logger = logging.getLogger('django')
def get_inxi_key(inxi, component):
for n in inxi:
for k, v in n.items():
if component in k:
return v
def get_inxi(n, name):
for k, v in n.items():
if f"#{name}" in k:
return v
return ""
class ParseSnapshot:
def __init__(self, snapshot, default="n/a"):
self.default = default
self.dmidecode_raw = snapshot.get("data", {}).get("dmidecode", "{}")
self.smart_raw = snapshot.get("data", {}).get("smartctl", [])
self.inxi_raw = snapshot.get("data", {}).get("inxi", "") or ""
for ev in snapshot.get("evidence", []):
if "dmidecode" == ev.get("operation"):
self.dmidecode_raw = ev["output"]
if "inxi" == ev.get("operation"):
self.inxi_raw = ev["output"]
if "smartctl" == ev.get("operation"):
self.smart_raw = ev["output"]
data = snapshot
if snapshot.get("credentialSubject"):
data = snapshot["credentialSubject"]
self.device = {"actions": []}
self.components = []
self.dmi = DMIParse(self.dmidecode_raw)
self.smart = self.loads(self.smart_raw)
self.inxi = self.loads(self.inxi_raw)
self.set_computer()
self.set_components()
self.snapshot_json = {
"type": "Snapshot",
"device": self.device,
"software": data["software"],
"components": self.components,
"uuid": data['uuid'],
"endTime": data["timestamp"],
"elapsed": 1,
}
def set_computer(self):
machine = get_inxi_key(self.inxi, 'Machine') or []
for m in machine:
system = get_inxi(m, "System")
if system:
self.device['manufacturer'] = system
self.device['model'] = get_inxi(m, "product")
self.device['serialNumber'] = get_inxi(m, "serial")
self.device['type'] = get_inxi(m, "Type")
self.device['chassis'] = self.device['type']
self.device['version'] = get_inxi(m, "v")
else:
self.device['system_uuid'] = get_inxi(m, "uuid")
self.device['sku'] = get_inxi(m, "part-nu")
def set_components(self):
self.get_mother_board()
self.get_cpu()
self.get_ram()
self.get_graphic()
self.get_display()
self.get_networks()
self.get_sound_card()
self.get_data_storage()
self.get_battery()
def get_mother_board(self):
machine = get_inxi_key(self.inxi, 'Machine') or []
mb = {"type": "Motherboard",}
for m in machine:
bios_date = get_inxi(m, "date")
if not bios_date:
continue
mb["manufacturer"] = get_inxi(m, "Mobo")
mb["model"] = get_inxi(m, "model")
mb["serialNumber"] = get_inxi(m, "serial")
mb["version"] = get_inxi(m, "v")
mb["biosDate"] = bios_date
mb["biosVersion"] = self.get_bios_version()
mb["firewire"]: self.get_firmware_num()
mb["pcmcia"]: self.get_pcmcia_num()
mb["serial"]: self.get_serial_num()
mb["usb"]: self.get_usb_num()
self.get_ram_slots(mb)
self.components.append(mb)
def get_ram_slots(self, mb):
memory = get_inxi_key(self.inxi, 'Memory') or []
for m in memory:
slots = get_inxi(m, "slots")
if not slots:
continue
mb["slots"] = slots
mb["ramSlots"] = get_inxi(m, "modules")
mb["ramMaxSize"] = get_inxi(m, "capacity")
def get_cpu(self):
cpu = get_inxi_key(self.inxi, 'CPU') or []
cp = {"type": "Processor"}
vulnerabilities = []
for c in cpu:
base = get_inxi(c, "model")
if base:
cp["model"] = get_inxi(c, "model")
cp["arch"] = get_inxi(c, "arch")
cp["bits"] = get_inxi(c, "bits")
cp["gen"] = get_inxi(c, "gen")
cp["family"] = get_inxi(c, "family")
cp["date"] = get_inxi(c, "built")
continue
des = get_inxi(c, "L1")
if des:
cp["L1"] = des
cp["L2"] = get_inxi(c, "L2")
cp["L3"] = get_inxi(c, "L3")
cp["cpus"] = get_inxi(c, "cpus")
cp["cores"] = get_inxi(c, "cores")
cp["threads"] = get_inxi(c, "threads")
continue
bogo = get_inxi(c, "bogomips")
if bogo:
cp["bogomips"] = bogo
cp["base/boost"] = get_inxi(c, "base/boost")
cp["min/max"] = get_inxi(c, "min/max")
cp["ext-clock"] = get_inxi(c, "ext-clock")
cp["volts"] = get_inxi(c, "volts")
continue
ctype = get_inxi(c, "Type")
if ctype:
v = {"Type": ctype}
status = get_inxi(c, "status")
if status:
v["status"] = status
mitigation = get_inxi(c, "mitigation")
if mitigation:
v["mitigation"] = mitigation
vulnerabilities.append(v)
self.components.append(cp)
def get_ram(self):
memory = get_inxi_key(self.inxi, 'Memory') or []
mem = {"type": "RamModule"}
for m in memory:
base = get_inxi(m, "System RAM")
if base:
mem["size"] = get_inxi(m, "total")
slot = get_inxi(m, "manufacturer")
if slot:
mem["manufacturer"] = slot
mem["model"] = get_inxi(m, "part-no")
mem["serialNumber"] = get_inxi(m, "serial")
mem["speed"] = get_inxi(m, "speed")
mem["bits"] = get_inxi(m, "data")
mem["interface"] = get_inxi(m, "type")
module = get_inxi(m, "modules")
if module:
mem["modules"] = module
self.components.append(mem)
def get_graphic(self):
graphics = get_inxi_key(self.inxi, 'Graphics') or []
for c in graphics:
if not get_inxi(c, "Device") or not get_inxi(c, "vendor"):
continue
self.components.append(
{
"type": "GraphicCard",
"memory": self.get_memory_video(c),
"manufacturer": get_inxi(c, "vendor"),
"model": get_inxi(c, "Device"),
"arch": get_inxi(c, "arch"),
"serialNumber": get_inxi(c, "serial"),
"integrated": True if get_inxi(c, "port") else False
}
)
def get_battery(self):
bats = get_inxi_key(self.inxi, 'Battery') or []
for b in bats:
self.components.append(
{
"type": "Battery",
"model": get_inxi(b, "model"),
"serialNumber": get_inxi(b, "serial"),
"condition": get_inxi(b, "condition"),
"cycles": get_inxi(b, "cycles"),
"volts": get_inxi(b, "volts")
}
)
def get_memory_video(self, c):
memory = get_inxi_key(self.inxi, 'Memory') or []
for m in memory:
igpu = get_inxi(m, "igpu")
agpu = get_inxi(m, "agpu")
ngpu = get_inxi(m, "ngpu")
gpu = get_inxi(m, "gpu")
if igpu or agpu or gpu or ngpu:
return igpu or agpu or gpu or ngpu
return self.default
def get_data_storage(self):
hdds= get_inxi_key(self.inxi, 'Drives') or []
for d in hdds:
usb = get_inxi(d, "type")
if usb == "USB":
continue
serial = get_inxi(d, "serial")
if serial:
hd = {
"type": "Storage",
"manufacturer": get_inxi(d, "vendor"),
"model": get_inxi(d, "model"),
"serialNumber": get_inxi(d, "serial"),
"size": get_inxi(d, "size"),
"speed": get_inxi(d, "speed"),
"interface": get_inxi(d, "tech"),
"firmware": get_inxi(d, "fw-rev")
}
rpm = get_inxi(d, "rpm")
if rpm:
hd["rpm"] = rpm
family = get_inxi(d, "family")
if family:
hd["family"] = family
sata = get_inxi(d, "sata")
if sata:
hd["sata"] = sata
continue
cycles = get_inxi(d, "cycles")
if cycles:
hd['cycles'] = cycles
hd["health"] = get_inxi(d, "health")
hd["time of used"] = get_inxi(d, "on")
hd["read used"] = get_inxi(d, "read-units")
hd["written used"] = get_inxi(d, "written-units")
self.components.append(hd)
continue
hd = {}
def sanitize(self, action):
return []
def get_networks(self):
nets = get_inxi_key(self.inxi, "Network") or []
networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
for n, iface in networks:
model = get_inxi(n, "Device")
if not model:
continue
interface = ''
for k in n.keys():
if "port" in k:
interface = "Integrated"
if "pcie" in k:
interface = "PciExpress"
if get_inxi(n, "type") == "USB":
interface = "USB"
self.components.append(
{
"type": "NetworkAdapter",
"model": model,
"manufacturer": get_inxi(n, 'vendor'),
"serialNumber": get_inxi(iface, 'mac'),
"speed": get_inxi(n, "speed"),
"interface": interface,
}
)
def get_sound_card(self):
audio = get_inxi_key(self.inxi, "Audio") or []
for c in audio:
model = get_inxi(c, "Device")
if not model:
continue
self.components.append(
{
"type": "SoundCard",
"model": model,
"manufacturer": get_inxi(c, 'vendor'),
"serialNumber": get_inxi(c, 'serial'),
}
)
def get_display(self):
graphics = get_inxi_key(self.inxi, "Graphics") or []
for c in graphics:
if not get_inxi(c, "Monitor"):
continue
self.components.append(
{
"type": "Display",
"model": get_inxi(c, "model"),
"manufacturer": get_inxi(c, "vendor"),
"serialNumber": get_inxi(c, "serial"),
'size': get_inxi(c, "size"),
'diagonal': get_inxi(c, "diag"),
'resolution': get_inxi(c, "res"),
"date": get_inxi(c, "built"),
'ratio': get_inxi(c, "ratio"),
}
)
def get_usb_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "USB" in u.get("Port Type", "").upper()
]
)
def get_serial_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "SERIAL" in u.get("Port Type", "").upper()
]
)
def get_firmware_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "FIRMWARE" in u.get("Port Type", "").upper()
]
)
def get_pcmcia_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "PCMCIA" in u.get("Port Type", "").upper()
]
)
def get_bios_version(self):
return self.dmi.get("BIOS")[0].get("BIOS Revision", '1')
def loads(self, x):
if isinstance(x, str):
try:
return json.loads(x)
except Exception as ss:
logger.warning("%s", ss)
return {}
return x
def errors(self, txt=None):
if not txt:
return self._errors
logger.error(txt)
self._errors.append("%s", txt)

View file

@ -1,22 +0,0 @@
import logging
from evidence.mixin_parse import BuildMix
logger = logging.getLogger('django')
class Build(BuildMix):
# This parse is for get info from snapshots created with old workbench
# normaly is worbench 11
def get_details(self):
device = self.json.get('device', {})
self.manufacturer = device.get("manufacturer", '')
self.model = device.get("model", '')
self.chassis = device.get("chassis", '')
self.serial_number = device.get("serialNumber", '')
self.sku = device.get("sku", '')
def _get_components(self):
self.components = self.json.get("components", [])

View file

@ -1,13 +0,0 @@
import logging
logger = logging.getLogger('django')
class ParseSnapshot:
def __init__(self, snapshot, default="n/a"):
self.default = default
self.snapshot_json = snapshot
self.device = snapshot.get("device")
self.components = snapshot.get("components")

View file

@ -2,14 +2,12 @@ import json
import hashlib
import logging
from evidence import legacy_parse
from evidence import old_parse
from evidence import normal_parse
from dmidecode import DMIParse
from evidence.parse_details import ParseSnapshot
from evidence.models import Annotation
from evidence.xapian import index
from evidence.normal_parse_details import get_inxi_key, get_inxi
from evidence.parse_details import get_inxi_key, get_inxi
from django.conf import settings
if settings.DPP:
@ -26,83 +24,9 @@ def get_mac(inxi):
if get_inxi(n, "port"):
return get_inxi(iface, 'mac')
class Build:
def __init__(self, evidence_json, user, check=False):
"""
This Build do the save in xapian as document, in Annotations and do
register in dlt if is configured for that.
We have 4 cases for parser diferents snapshots than come from workbench.
1) worbench 11 is old_parse.
2) legacy is the worbench-script when create a snapshot for devicehub-teal
3) some snapshots come as a credential. In this case is parsed as normal_parse
4) normal snapshot from worbench-script is the most basic and is parsed as normal_parse
"""
self.evidence = evidence_json.copy()
self.uuid = self.evidence.get('uuid')
self.user = user
if evidence_json.get("credentialSubject"):
self.build = normal_parse.Build(evidence_json)
self.uuid = evidence_json.get("credentialSubject", {}).get("uuid")
elif evidence_json.get("software") != "workbench-script":
self.build = old_parse.Build(evidence_json)
elif evidence_json.get("data",{}).get("lshw"):
self.build = legacy_parse.Build(evidence_json)
else:
self.build = normal_parse.Build(evidence_json)
if check:
return
self.index()
self.create_annotations()
if settings.DPP:
self.register_device_dlt()
def index(self):
snap = json.dumps(self.evidence)
index(self.user.institution, self.uuid, snap)
def create_annotations(self):
annotation = Annotation.objects.filter(
uuid=self.uuid,
owner=self.user.institution,
type=Annotation.Type.SYSTEM,
)
if annotation:
txt = "Warning: Snapshot %s already registered (annotation exists)"
logger.warning(txt, self.uuid)
return
for k, v in self.build.algorithms.items():
Annotation.objects.create(
uuid=self.uuid,
owner=self.user.institution,
user=self.user,
type=Annotation.Type.SYSTEM,
key=k,
value=self.sign(v)
)
def sign(self, doc):
return hashlib.sha3_256(doc.encode()).hexdigest()
def register_device_dlt(self):
legacy_dpp = self.build.algorithms.get('legacy_dpp')
chid = self.sign(legacy_dpp)
phid = self.sign(json.dumps(self.build.get_doc()))
register_device_dlt(chid, phid, self.uuid, self.user)
register_passport_dlt(chid, phid, self.uuid, self.user)
class Build2:
def __init__(self, evidence_json, user, check=False):
if evidence_json.get("data",{}).get("lshw"):
if evidence_json.get("software") == "workbench-script":
return legacy_parse.Build(evidence_json, user, check=check)
self.evidence = evidence_json.copy()
self.json = evidence_json.copy()

View file

@ -1,38 +1,407 @@
import re
import json
import logging
import numpy as np
from evidence import (
legacy_parse_details,
normal_parse_details,
old_parse_details
)
from datetime import datetime
from dmidecode import DMIParse
from utils.constants import CHASSIS_DH, DATASTORAGEINTERFACE
logger = logging.getLogger('django')
def get_inxi_key(inxi, component):
for n in inxi:
for k, v in n.items():
if component in k:
return v
def get_inxi(n, name):
for k, v in n.items():
if f"#{name}" in k:
return v
return ""
class ParseSnapshot:
def __init__(self, snapshot, default="n/a"):
self.default = default
self.dmidecode_raw = snapshot.get("data", {}).get("dmidecode", "{}")
self.smart_raw = snapshot.get("data", {}).get("smartctl", [])
self.inxi_raw = snapshot.get("data", {}).get("inxi", "") or ""
for ev in snapshot.get("evidence", []):
if "dmidecode" == ev.get("operation"):
self.dmidecode_raw = ev["output"]
if "inxi" == ev.get("operation"):
self.inxi_raw = ev["output"]
if "smartctl" == ev.get("operation"):
self.smart_raw = ev["output"]
data = snapshot
if snapshot.get("credentialSubject"):
self.build = normal_parse_details.ParseSnapshot(
snapshot,
default=default
)
elif snapshot.get("software") != "workbench-script":
self.build = old_parse_details.ParseSnapshot(
snapshot,
default=default
)
elif snapshot.get("data",{}).get("lshw"):
self.build = legacy_parse_details.ParseSnapshot(
snapshot,
default=default
)
data = snapshot["credentialSubject"]
self.device = {"actions": []}
self.components = []
self.dmi = DMIParse(self.dmidecode_raw)
self.smart = self.loads(self.smart_raw)
self.inxi = self.loads(self.inxi_raw)
self.set_computer()
self.set_components()
self.snapshot_json = {
"type": "Snapshot",
"device": self.device,
"software": data["software"],
"components": self.components,
"uuid": data['uuid'],
"endTime": data["timestamp"],
"elapsed": 1,
}
def set_computer(self):
machine = get_inxi_key(self.inxi, 'Machine') or []
for m in machine:
system = get_inxi(m, "System")
if system:
self.device['manufacturer'] = system
self.device['model'] = get_inxi(m, "product")
self.device['serialNumber'] = get_inxi(m, "serial")
self.device['type'] = get_inxi(m, "Type")
self.device['chassis'] = self.device['type']
self.device['version'] = get_inxi(m, "v")
else:
self.build = normal_parse_details.ParseSnapshot(
snapshot,
default=default
self.device['system_uuid'] = get_inxi(m, "uuid")
self.device['sku'] = get_inxi(m, "part-nu")
def set_components(self):
self.get_mother_board()
self.get_cpu()
self.get_ram()
self.get_graphic()
self.get_display()
self.get_networks()
self.get_sound_card()
self.get_data_storage()
self.get_battery()
def get_mother_board(self):
machine = get_inxi_key(self.inxi, 'Machine') or []
mb = {"type": "Motherboard",}
for m in machine:
bios_date = get_inxi(m, "date")
if not bios_date:
continue
mb["manufacturer"] = get_inxi(m, "Mobo")
mb["model"] = get_inxi(m, "model")
mb["serialNumber"] = get_inxi(m, "serial")
mb["version"] = get_inxi(m, "v")
mb["biosDate"] = bios_date
mb["biosVersion"] = self.get_bios_version()
mb["firewire"]: self.get_firmware_num()
mb["pcmcia"]: self.get_pcmcia_num()
mb["serial"]: self.get_serial_num()
mb["usb"]: self.get_usb_num()
self.get_ram_slots(mb)
self.components.append(mb)
def get_ram_slots(self, mb):
memory = get_inxi_key(self.inxi, 'Memory') or []
for m in memory:
slots = get_inxi(m, "slots")
if not slots:
continue
mb["slots"] = slots
mb["ramSlots"] = get_inxi(m, "modules")
mb["ramMaxSize"] = get_inxi(m, "capacity")
def get_cpu(self):
cpu = get_inxi_key(self.inxi, 'CPU') or []
cp = {"type": "Processor"}
vulnerabilities = []
for c in cpu:
base = get_inxi(c, "model")
if base:
cp["model"] = get_inxi(c, "model")
cp["arch"] = get_inxi(c, "arch")
cp["bits"] = get_inxi(c, "bits")
cp["gen"] = get_inxi(c, "gen")
cp["family"] = get_inxi(c, "family")
cp["date"] = get_inxi(c, "built")
continue
des = get_inxi(c, "L1")
if des:
cp["L1"] = des
cp["L2"] = get_inxi(c, "L2")
cp["L3"] = get_inxi(c, "L3")
cp["cpus"] = get_inxi(c, "cpus")
cp["cores"] = get_inxi(c, "cores")
cp["threads"] = get_inxi(c, "threads")
continue
bogo = get_inxi(c, "bogomips")
if bogo:
cp["bogomips"] = bogo
cp["base/boost"] = get_inxi(c, "base/boost")
cp["min/max"] = get_inxi(c, "min/max")
cp["ext-clock"] = get_inxi(c, "ext-clock")
cp["volts"] = get_inxi(c, "volts")
continue
ctype = get_inxi(c, "Type")
if ctype:
v = {"Type": ctype}
status = get_inxi(c, "status")
if status:
v["status"] = status
mitigation = get_inxi(c, "mitigation")
if mitigation:
v["mitigation"] = mitigation
vulnerabilities.append(v)
self.components.append(cp)
def get_ram(self):
memory = get_inxi_key(self.inxi, 'Memory') or []
mem = {"type": "RamModule"}
for m in memory:
base = get_inxi(m, "System RAM")
if base:
mem["size"] = get_inxi(m, "total")
slot = get_inxi(m, "manufacturer")
if slot:
mem["manufacturer"] = slot
mem["model"] = get_inxi(m, "part-no")
mem["serialNumber"] = get_inxi(m, "serial")
mem["speed"] = get_inxi(m, "speed")
mem["bits"] = get_inxi(m, "data")
mem["interface"] = get_inxi(m, "type")
module = get_inxi(m, "modules")
if module:
mem["modules"] = module
self.components.append(mem)
def get_graphic(self):
graphics = get_inxi_key(self.inxi, 'Graphics') or []
for c in graphics:
if not get_inxi(c, "Device") or not get_inxi(c, "vendor"):
continue
self.components.append(
{
"type": "GraphicCard",
"memory": self.get_memory_video(c),
"manufacturer": get_inxi(c, "vendor"),
"model": get_inxi(c, "Device"),
"arch": get_inxi(c, "arch"),
"serialNumber": get_inxi(c, "serial"),
"integrated": True if get_inxi(c, "port") else False
}
)
self.default = default
self.device = self.build.snapshot_json.get("device")
self.components = self.build.snapshot_json.get("components")
def get_battery(self):
bats = get_inxi_key(self.inxi, 'Battery') or []
for b in bats:
self.components.append(
{
"type": "Battery",
"model": get_inxi(b, "model"),
"serialNumber": get_inxi(b, "serial"),
"condition": get_inxi(b, "condition"),
"cycles": get_inxi(b, "cycles"),
"volts": get_inxi(b, "volts")
}
)
def get_memory_video(self, c):
memory = get_inxi_key(self.inxi, 'Memory') or []
for m in memory:
igpu = get_inxi(m, "igpu")
agpu = get_inxi(m, "agpu")
ngpu = get_inxi(m, "ngpu")
gpu = get_inxi(m, "gpu")
if igpu or agpu or gpu or ngpu:
return igpu or agpu or gpu or ngpu
return self.default
def get_data_storage(self):
hdds= get_inxi_key(self.inxi, 'Drives') or []
for d in hdds:
usb = get_inxi(d, "type")
if usb == "USB":
continue
serial = get_inxi(d, "serial")
if serial:
hd = {
"type": "Storage",
"manufacturer": get_inxi(d, "vendor"),
"model": get_inxi(d, "model"),
"serialNumber": get_inxi(d, "serial"),
"size": get_inxi(d, "size"),
"speed": get_inxi(d, "speed"),
"interface": get_inxi(d, "tech"),
"firmware": get_inxi(d, "fw-rev")
}
rpm = get_inxi(d, "rpm")
if rpm:
hd["rpm"] = rpm
family = get_inxi(d, "family")
if family:
hd["family"] = family
sata = get_inxi(d, "sata")
if sata:
hd["sata"] = sata
continue
cycles = get_inxi(d, "cycles")
if cycles:
hd['cycles'] = cycles
hd["health"] = get_inxi(d, "health")
hd["time of used"] = get_inxi(d, "on")
hd["read used"] = get_inxi(d, "read-units")
hd["written used"] = get_inxi(d, "written-units")
self.components.append(hd)
continue
hd = {}
def sanitize(self, action):
return []
def get_networks(self):
nets = get_inxi_key(self.inxi, "Network") or []
networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
for n, iface in networks:
model = get_inxi(n, "Device")
if not model:
continue
interface = ''
for k in n.keys():
if "port" in k:
interface = "Integrated"
if "pcie" in k:
interface = "PciExpress"
if get_inxi(n, "type") == "USB":
interface = "USB"
self.components.append(
{
"type": "NetworkAdapter",
"model": model,
"manufacturer": get_inxi(n, 'vendor'),
"serialNumber": get_inxi(iface, 'mac'),
"speed": get_inxi(n, "speed"),
"interface": interface,
}
)
def get_sound_card(self):
audio = get_inxi_key(self.inxi, "Audio") or []
for c in audio:
model = get_inxi(c, "Device")
if not model:
continue
self.components.append(
{
"type": "SoundCard",
"model": model,
"manufacturer": get_inxi(c, 'vendor'),
"serialNumber": get_inxi(c, 'serial'),
}
)
def get_display(self):
graphics = get_inxi_key(self.inxi, "Graphics") or []
for c in graphics:
if not get_inxi(c, "Monitor"):
continue
self.components.append(
{
"type": "Display",
"model": get_inxi(c, "model"),
"manufacturer": get_inxi(c, "vendor"),
"serialNumber": get_inxi(c, "serial"),
'size': get_inxi(c, "size"),
'diagonal': get_inxi(c, "diag"),
'resolution': get_inxi(c, "res"),
"date": get_inxi(c, "built"),
'ratio': get_inxi(c, "ratio"),
}
)
def get_usb_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "USB" in u.get("Port Type", "").upper()
]
)
def get_serial_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "SERIAL" in u.get("Port Type", "").upper()
]
)
def get_firmware_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "FIRMWARE" in u.get("Port Type", "").upper()
]
)
def get_pcmcia_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "PCMCIA" in u.get("Port Type", "").upper()
]
)
def get_bios_version(self):
return self.dmi.get("BIOS")[0].get("BIOS Revision", '1')
def loads(self, x):
if isinstance(x, str):
try:
return json.loads(x)
except Exception as ss:
logger.warning("%s", ss)
return {}
return x
def errors(self, txt=None):
if not txt:
return self._errors
logger.error(txt)
self._errors.append("%s", txt)

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -13,7 +13,7 @@ HID_ALGO1 = [
"manufacturer",
"model",
"chassis",
"serial_number",
"serialNumber",
"sku"
]
@ -21,7 +21,7 @@ LEGACY_DPP = [
"manufacturer",
"model",
"chassis",
"serial_number",
"serialNumber",
"sku",
"type",
"version"