Compare commits
84 commits
main
...
feature/f3
Author | SHA1 | Date | |
---|---|---|---|
Sergio Giménez Antón | bd4f6b7d56 | ||
Sergio Giménez Antón | f9c9c9dd7c | ||
Sergio Giménez Antón | 60ccbec369 | ||
Sergio Giménez Antón | 3fb0961815 | ||
Sergio Giménez Antón | 447946a576 | ||
Cayo Puigdefabregas | 5d190d07a3 | ||
Cayo Puigdefabregas | d1abb206e8 | ||
85bae67189 | |||
d429485651 | |||
07c25f4a92 | |||
Cayo Puigdefabregas | 14277c17cb | ||
Cayo Puigdefabregas | f7051c3130 | ||
Cayo Puigdefabregas | 09be1a2f74 | ||
Cayo Puigdefabregas | a3dd5d9639 | ||
Cayo Puigdefabregas | 3f5460b81f | ||
Cayo Puigdefabregas | bf7975bc24 | ||
8e128557c0 | |||
Cayo Puigdefabregas | 25e7e85548 | ||
Cayo Puigdefabregas | ba126491be | ||
Cayo Puigdefabregas | 81e7ba267d | ||
Cayo Puigdefabregas | 1e08f0fc0c | ||
Cayo Puigdefabregas | ebabb6b228 | ||
Cayo Puigdefabregas | 4954199610 | ||
Cayo Puigdefabregas | e84b72c70b | ||
Cayo Puigdefabregas | 99435fff85 | ||
Cayo Puigdefabregas | 6c0e77891f | ||
Cayo Puigdefabregas | a2d859494b | ||
Cayo Puigdefabregas | ea6d990e56 | ||
Cayo Puigdefabregas | 612737d46c | ||
Cayo Puigdefabregas | 30be57ee25 | ||
Cayo Puigdefabregas | 88bdabb64f | ||
Cayo Puigdefabregas | 96268c8caf | ||
7ed05f0932 | |||
Cayo Puigdefabregas | b652d7d452 | ||
Cayo Puigdefabregas | 04ecb4f2f1 | ||
Cayo Puigdefabregas | 1613eaaa44 | ||
Cayo Puigdefabregas | 06264558df | ||
Cayo Puigdefabregas | 80b4c3b4ca | ||
Cayo Puigdefabregas | e2078c7bde | ||
cfae9d4ec9 | |||
Cayo Puigdefabregas | 578fa73fe5 | ||
f1d57ff618 | |||
Cayo Puigdefabregas | 3cf8ceb5d3 | ||
Cayo Puigdefabregas | b56dc0dfda | ||
Cayo Puigdefabregas | 1c58bff515 | ||
Cayo Puigdefabregas | e6c1ede93c | ||
371845971c | |||
b4efcfb171 | |||
ac0d36ea6f | |||
6a3a2b3a2b | |||
850678fbe4 | |||
f43aaf6ac6 | |||
355ed08561 | |||
d0e46aa0b0 | |||
771b216a31 | |||
263eacda99 | |||
8fcd20f609 | |||
15fb5d3739 | |||
d7ff3c2798 | |||
0e0ad400c2 | |||
367d3a7f87 | |||
c90ed58ea0 | |||
45629db102 | |||
1e29f9562d | |||
d0cac9d1d9 | |||
8b4d1f51f6 | |||
Cayo Puigdefabregas | 34ea4bedfc | ||
Cayo Puigdefabregas | fe429e7db6 | ||
Cayo Puigdefabregas | caf2606fd9 | ||
Cayo Puigdefabregas | 73d478f517 | ||
Cayo Puigdefabregas | 0f03171076 | ||
bfdcb33538 | |||
Cayo Puigdefabregas | 271ac83d71 | ||
Cayo Puigdefabregas | f7b2687ca2 | ||
Cayo Puigdefabregas | 1dad22c3d3 | ||
Cayo Puigdefabregas | 7de6d69a6c | ||
Sergio Giménez Antón | fa5b9eec67 | ||
Cayo Puigdefabregas | 7fd42db3e4 | ||
Cayo Puigdefabregas | bed40d3ee0 | ||
Cayo Puigdefabregas | 9553ed6a4c | ||
Sergio Giménez Antón | f3c9297ffd | ||
cb6c7f6fda | |||
Cayo Puigdefabregas | a0276f439e | ||
a4d361ff9b |
|
@ -1,8 +1,8 @@
|
|||
DH_DOMAIN=localhost
|
||||
DH_PORT=8000
|
||||
DOMAIN=localhost
|
||||
DEMO=true
|
||||
# note that with DEBUG=true, logs are more verbose (include tracebacks)
|
||||
DEBUG=true
|
||||
ALLOWED_HOSTS=${DOMAIN},${DOMAIN}:8000,127.0.0.1,127.0.0.1:8000
|
||||
DPP=false
|
||||
|
||||
STATIC_ROOT=/tmp/static/
|
||||
|
@ -16,7 +16,6 @@ EMAIL_BACKEND="django.core.mail.backends.smtp.EmailBackend"
|
|||
EMAIL_FILE_PATH="/tmp/app-messages"
|
||||
ENABLE_EMAIL=false
|
||||
PREDEFINED_TOKEN='5018dd65-9abd-4a62-8896-80f34ac66150'
|
||||
DH_ALLOWED_HOSTS=${DH_DOMAIN},${DH_DOMAIN}:${DH_PORT},127.0.0.1,127.0.0.1:${DH_PORT}
|
||||
# TODO review these vars
|
||||
#SNAPSHOTS_DIR=/path/to/TODO
|
||||
#EVIDENCES_DIR=/path/to/TODO
|
||||
|
|
|
@ -37,6 +37,9 @@
|
|||
<li class="nav-item">
|
||||
<a class="nav-link" href="{% url 'device:device_web' object.id %}" target="_blank">Web</a>
|
||||
</li>
|
||||
<li class="nav-item">
|
||||
<a href="#environmental_impact" class="nav-link" data-bs-toggle="tab" data-bs-target="#environmental_impact">{% trans 'Environmental impact' %}</a>
|
||||
</li>
|
||||
</ul>
|
||||
</div>
|
||||
</div>
|
||||
|
@ -242,6 +245,87 @@
|
|||
</div>
|
||||
</div>
|
||||
|
||||
<div class="tab-pane fade" id="environmental_impact">
|
||||
<div class="container-fluid py-3">
|
||||
<div class="d-flex justify-content-end mb-3">
|
||||
<a class="btn btn-success">
|
||||
<i class="bi bi-file-earmark-pdf"></i>
|
||||
{% trans 'Export to PDF' %}
|
||||
</a>
|
||||
</div>
|
||||
|
||||
<div class="row g-4 mb-4">
|
||||
<div class="col-md-4">
|
||||
<div class="card h-100 border-success">
|
||||
<div class="card-body text-center">
|
||||
<div class="mb-3">
|
||||
<i class="bi bi-arrow-down-circle text-success" style="font-size: 2rem;"></i>
|
||||
</div>
|
||||
<h5 class="card-title text-success">Carbon Reduction</h5>
|
||||
<h2 class="mb-2">{{ impact.carbon_saved }}</h2>
|
||||
<p class="card-text text-muted">kg CO₂e saved</p>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="col-md-4">
|
||||
<div class="card h-100 border-danger">
|
||||
<div class="card-body text-center">
|
||||
<div class="mb-3">
|
||||
<i class="bi bi-cloud-fill text-danger" style="font-size: 2rem;"></i>
|
||||
</div>
|
||||
<h5 class="card-title text-danger">Carbon Consumed</h5>
|
||||
<h2 class="mb-2">{{ impact.co2_emissions }}</h2>
|
||||
<p class="card-text text-muted">kg CO₂e consumed</p>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="col-md-4">
|
||||
<div class="card h-100 border-success">
|
||||
<div class="card-body text-center">
|
||||
<div class="mb-3">
|
||||
<i class="bi bi-recycle text-success" style="font-size: 2rem;"></i>
|
||||
</div>
|
||||
<h5 class="card-title text-success">Additional Impact Metric</h5>
|
||||
<h2 class="mb-2">85%</h2>
|
||||
<p class="card-text text-muted">whatever</p>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="row">
|
||||
<div class="col">
|
||||
<div class="card">
|
||||
<div class="card-body">
|
||||
<h5 class="card-title">Impact Details</h5>
|
||||
<div class="table-responsive">
|
||||
<table class="table table-bordered">
|
||||
<tbody>
|
||||
<tr>
|
||||
<th scope="row" class="bg-light" style="width: 30%;">Manufacturing Impact Avoided</th>
|
||||
<td>
|
||||
<span class="text-success">{{ impact.carbon_saved }}</span> kg CO₂e
|
||||
<br />
|
||||
<small class="text-muted">Based on average laptop manufacturing emissions</small>
|
||||
</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
|
||||
<div class="mt-3">
|
||||
<h6>Calculation Method</h6>
|
||||
<small class="text-muted">Based on industry standards X Y and Z</small>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{% if dpps %}
|
||||
<div class="tab-pane fade" id="dpps">
|
||||
<h5 class="card-title">{% trans 'List of dpps' %}</h5>
|
||||
|
|
|
@ -9,6 +9,5 @@ urlpatterns = [
|
|||
path("<str:pk>/", views.DetailsView.as_view(), name="details"),
|
||||
path("<str:pk>/annotation/add", views.AddAnnotationView.as_view(), name="add_annotation"),
|
||||
path("<str:pk>/document/add", views.AddDocumentView.as_view(), name="add_document"),
|
||||
path("<str:pk>/public/", views.PublicDeviceWebView.as_view(), name="device_web"),
|
||||
|
||||
path("<str:pk>/public/", views.PublicDeviceWebView.as_view(), name="device_web")
|
||||
]
|
||||
|
|
|
@ -14,6 +14,7 @@ from evidence.models import Annotation
|
|||
from lot.models import LotTag
|
||||
from device.models import Device
|
||||
from device.forms import DeviceFormSet
|
||||
from environmental_impact.algorithms.algorithm_factory import FactoryEnvironmentImpactAlgorithm
|
||||
if settings.DPP:
|
||||
from dpp.models import Proof
|
||||
from dpp.api_dlt import PROOF_TYPE
|
||||
|
@ -110,10 +111,16 @@ class DetailsView(DashboardView, TemplateView):
|
|||
uuid__in=self.object.uuids,
|
||||
type=PROOF_TYPE["IssueDPP"]
|
||||
)
|
||||
enviromental_impact_algorithm = FactoryEnvironmentImpactAlgorithm.run_environmental_impact_calculation(
|
||||
"dummy_calc"
|
||||
)
|
||||
enviromental_impact = enviromental_impact_algorithm.get_device_environmental_impact(
|
||||
self.object)
|
||||
context.update({
|
||||
'object': self.object,
|
||||
'snapshot': self.object.get_last_evidence(),
|
||||
'lot_tags': lot_tags,
|
||||
'impact': enviromental_impact,
|
||||
'dpps': dpps,
|
||||
})
|
||||
return context
|
||||
|
|
|
@ -89,6 +89,7 @@ INSTALLED_APPS = [
|
|||
"dashboard",
|
||||
"admin",
|
||||
"api",
|
||||
"environmental_impact"
|
||||
]
|
||||
|
||||
DPP = config("DPP", default=False, cast=bool)
|
||||
|
|
|
@ -5,14 +5,13 @@ services:
|
|||
dockerfile: docker/devicehub-django.Dockerfile
|
||||
environment:
|
||||
- DEBUG=${DEBUG:-false}
|
||||
- DOMAIN=${DH_DOMAIN:-localhost}
|
||||
- PORT=${DH_PORT:-8000}
|
||||
- ALLOWED_HOSTS=${DH_ALLOWED_HOSTS:-$DH_DOMAIN}
|
||||
- DOMAIN=${DOMAIN:-localhost}
|
||||
- ALLOWED_HOSTS=${ALLOWED_HOSTS:-$DOMAIN}
|
||||
- DEMO=${DEMO:-false}
|
||||
- PREDEFINED_TOKEN=${PREDEFINED_TOKEN:-}
|
||||
- DPP=${DPP:-false}
|
||||
volumes:
|
||||
- .:/opt/devicehub-django
|
||||
ports:
|
||||
- ${DH_PORT}:${DH_PORT}
|
||||
- 8000:8000
|
||||
|
||||
|
|
0
environmental_impact/__init__.py
Normal file
0
environmental_impact/__init__.py
Normal file
3
environmental_impact/admin.py
Normal file
3
environmental_impact/admin.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
from django.contrib import admin
|
||||
|
||||
# Register your models here.
|
0
environmental_impact/algorithms/__init__.py
Normal file
0
environmental_impact/algorithms/__init__.py
Normal file
30
environmental_impact/algorithms/algorithm_factory.py
Normal file
30
environmental_impact/algorithms/algorithm_factory.py
Normal file
|
@ -0,0 +1,30 @@
|
|||
from __future__ import annotations
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from .dummy_calculator import DummyEnvironmentalImpactAlgorithm
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .algorithm_interface import EnvironmentImpactAlgorithm
|
||||
|
||||
|
||||
class AlgorithmNames():
|
||||
"""
|
||||
Enum class for the different types of algorithms.
|
||||
"""
|
||||
|
||||
DUMMY_CALC = "dummy_calc"
|
||||
|
||||
algorithm_names = {
|
||||
DUMMY_CALC: DummyEnvironmentalImpactAlgorithm()
|
||||
}
|
||||
|
||||
|
||||
class FactoryEnvironmentImpactAlgorithm():
|
||||
|
||||
@staticmethod
|
||||
def run_environmental_impact_calculation(algorithm_name: str) -> EnvironmentImpactAlgorithm:
|
||||
try:
|
||||
return AlgorithmNames.algorithm_names[algorithm_name]
|
||||
except KeyError:
|
||||
raise ValueError("Invalid algorithm name. Valid options are: " +
|
||||
", ".join(AlgorithmNames.algorithm_names.keys()))
|
11
environmental_impact/algorithms/algorithm_interface.py
Normal file
11
environmental_impact/algorithms/algorithm_interface.py
Normal file
|
@ -0,0 +1,11 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from functools import lru_cache
|
||||
from device.models import Device
|
||||
from environmental_impact.models import EnvironmentalImpact
|
||||
|
||||
|
||||
class EnvironmentImpactAlgorithm(ABC):
|
||||
|
||||
@abstractmethod
|
||||
def get_device_environmental_impact(self, device: Device) -> EnvironmentalImpact:
|
||||
pass
|
33
environmental_impact/algorithms/dummy_calculator.py
Normal file
33
environmental_impact/algorithms/dummy_calculator.py
Normal file
|
@ -0,0 +1,33 @@
|
|||
from device.models import Device
|
||||
from .algorithm_interface import EnvironmentImpactAlgorithm
|
||||
from environmental_impact.models import EnvironmentalImpact
|
||||
|
||||
|
||||
class DummyEnvironmentalImpactAlgorithm(EnvironmentImpactAlgorithm):
|
||||
|
||||
def get_device_environmental_impact(self, device: Device) -> EnvironmentalImpact:
|
||||
# TODO Make a constants file / class
|
||||
avg_watts = 40 # Arbitrary laptop average consumption
|
||||
co2_per_kwh = 0.475
|
||||
power_on_hours = self.get_power_on_hours_from(device)
|
||||
energy_kwh = (power_on_hours * avg_watts) / 1000
|
||||
co2_emissions = energy_kwh * co2_per_kwh
|
||||
return EnvironmentalImpact(co2_emissions=co2_emissions)
|
||||
|
||||
def get_power_on_hours_from(self, device: Device) -> int:
|
||||
# TODO how do I check if the device is a legacy workbench? Is there a better way?
|
||||
is_legacy_workbench = False if device.last_evidence.inxi else True
|
||||
if not is_legacy_workbench:
|
||||
storage_components = device.components[9]
|
||||
str_time = storage_components.get('time of used', -1)
|
||||
else:
|
||||
str_time = ""
|
||||
uptime_in_hours = self.convert_str_time_to_hours(str_time, is_legacy_workbench)
|
||||
return uptime_in_hours
|
||||
|
||||
def convert_str_time_to_hours(self, time_str: str, is_legacy_workbench: bool) -> int:
|
||||
if is_legacy_workbench:
|
||||
return -1 # TODO Power on hours not available in legacy workbench
|
||||
else:
|
||||
multipliers = {'y': 365 * 24, 'd': 24, 'h': 1}
|
||||
return sum(int(part[:-1]) * multipliers[part[-1]] for part in time_str.split())
|
6
environmental_impact/apps.py
Normal file
6
environmental_impact/apps.py
Normal file
|
@ -0,0 +1,6 @@
|
|||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class EnvironmentalImpactConfig(AppConfig):
|
||||
default_auto_field = "django.db.models.BigAutoField"
|
||||
name = "environmental_impact"
|
0
environmental_impact/migrations/__init__.py
Normal file
0
environmental_impact/migrations/__init__.py
Normal file
8
environmental_impact/models.py
Normal file
8
environmental_impact/models.py
Normal file
|
@ -0,0 +1,8 @@
|
|||
from dataclasses import dataclass
|
||||
from django.db import models
|
||||
|
||||
|
||||
@dataclass
|
||||
class EnvironmentalImpact:
|
||||
carbon_saved: float = 0.0
|
||||
co2_emissions: float = 0.0
|
0
environmental_impact/tests/__init__.py
Normal file
0
environmental_impact/tests/__init__.py
Normal file
44
environmental_impact/tests/test_dummy_calculator.py
Normal file
44
environmental_impact/tests/test_dummy_calculator.py
Normal file
|
@ -0,0 +1,44 @@
|
|||
from unittest.mock import patch
|
||||
import uuid
|
||||
from django.test import TestCase
|
||||
from device.models import Device
|
||||
from environmental_impact.models import EnvironmentalImpact
|
||||
from environmental_impact.algorithms.dummy_calculator import DummyEnvironmentalImpactAlgorithm
|
||||
from evidence.models import Evidence
|
||||
|
||||
|
||||
class DummyEnvironmentalImpactAlgorithmTests(TestCase):
|
||||
|
||||
@patch('evidence.models.Evidence.get_doc', return_value={'credentialSubject': {}})
|
||||
@patch('evidence.models.Evidence.get_time', return_value=None)
|
||||
def setUp(self, mock_get_time, mock_get_doc):
|
||||
self.device = Device(id='1')
|
||||
evidence = self.device.last_evidence = Evidence(uuid=uuid.uuid4())
|
||||
evidence.inxi = True
|
||||
evidence.doc = {'credentialSubject': {}}
|
||||
self.algorithm = DummyEnvironmentalImpactAlgorithm()
|
||||
|
||||
def test_get_power_on_hours_from_legacy_device(self):
|
||||
# TODO is there a way to check that?
|
||||
pass
|
||||
|
||||
@patch('evidence.models.Evidence.get_components', return_value=[0, 0, 0, 0, 0, 0, 0, 0, 0, {'time of used': '1y 2d 3h'}])
|
||||
def test_get_power_on_hours_from_inxi_device(self, mock_get_components):
|
||||
hours = self.algorithm.get_power_on_hours_from(self.device)
|
||||
self.assertEqual(
|
||||
hours, 8811, "Inxi-parsed devices should correctly compute power-on hours")
|
||||
|
||||
@patch('evidence.models.Evidence.get_components', return_value=[0, 0, 0, 0, 0, 0, 0, 0, 0, {'time of used': '1y 2d 3h'}])
|
||||
def test_convert_str_time_to_hours(self, mock_get_components):
|
||||
result = self.algorithm.convert_str_time_to_hours('1y 2d 3h', False)
|
||||
self.assertEqual(
|
||||
result, 8811, "String to hours conversion should match expected output")
|
||||
|
||||
@patch('evidence.models.Evidence.get_components', return_value=[0, 0, 0, 0, 0, 0, 0, 0, 0, {'time of used': '1y 2d 3h'}])
|
||||
def test_environmental_impact_calculation(self, mock_get_components):
|
||||
impact = self.algorithm.get_device_environmental_impact(self.device)
|
||||
self.assertIsInstance(impact, EnvironmentalImpact,
|
||||
"Output should be an EnvironmentalImpact instance")
|
||||
expected_co2 = 8811 * 40 * 0.475 / 1000
|
||||
self.assertAlmostEqual(impact.co2_emissions, expected_co2,
|
||||
2, "CO2 emissions calculation should be accurate")
|
|
@ -0,0 +1,17 @@
|
|||
from environmental_impact.algorithms.algorithm_factory import FactoryEnvironmentImpactAlgorithm
|
||||
from django.test import TestCase
|
||||
from environmental_impact.algorithms.dummy_calculator import DummyEnvironmentalImpactAlgorithm
|
||||
|
||||
|
||||
class FactoryEnvironmentImpactAlgorithmTests(TestCase):
|
||||
|
||||
def test_valid_algorithm_name(self):
|
||||
algorithm = FactoryEnvironmentImpactAlgorithm.run_environmental_impact_calculation(
|
||||
'dummy_calc')
|
||||
self.assertIsInstance(algorithm, DummyEnvironmentalImpactAlgorithm,
|
||||
"Factory should return a DummyEnvironmentalImpactAlgorithm instance")
|
||||
|
||||
def test_invalid_algorithm_name(self):
|
||||
with self.assertRaises(ValueError):
|
||||
FactoryEnvironmentImpactAlgorithm.run_environmental_impact_calculation(
|
||||
'invalid_calc')
|
3
environmental_impact/views.py
Normal file
3
environmental_impact/views.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
from django.shortcuts import render
|
||||
|
||||
# Create your views here.
|
|
@ -29,8 +29,7 @@ class UploadForm(forms.Form):
|
|||
|
||||
try:
|
||||
file_json = json.loads(file_data)
|
||||
build = Build
|
||||
snap = build(file_json, None, check=True)
|
||||
snap = Build(file_json, None, check=True)
|
||||
exist_annotation = Annotation.objects.filter(
|
||||
uuid=snap.uuid
|
||||
).first()
|
||||
|
@ -58,9 +57,7 @@ class UploadForm(forms.Form):
|
|||
|
||||
for ev in self.evidences:
|
||||
path_name = save_in_disk(ev[1], user.institution.name)
|
||||
build = Build
|
||||
file_json = ev[1]
|
||||
build(file_json, user)
|
||||
Build(ev[1], user)
|
||||
move_json(path_name, user.institution.name)
|
||||
|
||||
|
||||
|
|
|
@ -1,69 +0,0 @@
|
|||
import json
|
||||
import logging
|
||||
|
||||
from dmidecode import DMIParse
|
||||
from json_repair import repair_json
|
||||
from evidence.mixin_parse import BuildMix
|
||||
from evidence.legacy_parse_details import get_lshw_child, ParseSnapshot
|
||||
from utils.constants import CHASSIS_DH
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
def get_mac(lshw):
|
||||
try:
|
||||
if type(lshw) is dict:
|
||||
hw = lshw
|
||||
else:
|
||||
hw = json.loads(lshw)
|
||||
except json.decoder.JSONDecodeError:
|
||||
hw = json.loads(repair_json(lshw))
|
||||
|
||||
nets = []
|
||||
get_lshw_child(hw, nets, 'network')
|
||||
|
||||
nets_sorted = sorted(nets, key=lambda x: x['businfo'])
|
||||
|
||||
if nets_sorted:
|
||||
mac = nets_sorted[0]['serial']
|
||||
logger.debug("The snapshot has the following MAC: %s" , mac)
|
||||
return mac
|
||||
|
||||
|
||||
class Build(BuildMix):
|
||||
# This parse is for get info from snapshots created with
|
||||
# workbench-script but builded for send to devicehub-teal
|
||||
|
||||
def get_details(self):
|
||||
dmidecode_raw = self.json["data"]["dmidecode"]
|
||||
self.dmi = DMIParse(dmidecode_raw)
|
||||
|
||||
self.manufacturer = self.dmi.manufacturer().strip()
|
||||
self.model = self.dmi.model().strip()
|
||||
self.chassis = self.get_chassis_dh()
|
||||
self.serial_number = self.dmi.serial_number()
|
||||
self.sku = self.get_sku()
|
||||
self.typ = self.chassis
|
||||
self.version = self.get_version()
|
||||
|
||||
def get_chassis_dh(self):
|
||||
chassis = self.get_chassis()
|
||||
lower_type = chassis.lower()
|
||||
for k, v in CHASSIS_DH.items():
|
||||
if lower_type in v:
|
||||
return k
|
||||
return self.default
|
||||
|
||||
def get_sku(self):
|
||||
return self.dmi.get("System")[0].get("SKU Number", "n/a").strip()
|
||||
|
||||
def get_chassis(self):
|
||||
return self.dmi.get("Chassis")[0].get("Type", '_virtual') #
|
||||
|
||||
def get_version(self):
|
||||
return self.dmi.get("System")[0].get("Verson", '_virtual')
|
||||
|
||||
def _get_components(self):
|
||||
data = ParseSnapshot(self.json)
|
||||
self.components = data.components
|
|
@ -1,503 +0,0 @@
|
|||
import json
|
||||
import logging
|
||||
import numpy as np
|
||||
|
||||
from datetime import datetime
|
||||
from dmidecode import DMIParse
|
||||
from json_repair import repair_json
|
||||
|
||||
from utils.constants import CHASSIS_DH, DATASTORAGEINTERFACE
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
def get_lshw_child(child, nets, component):
|
||||
try:
|
||||
if child.get('id') == component:
|
||||
nets.append(child)
|
||||
if child.get('children'):
|
||||
[get_lshw_child(x, nets, component) for x in child['children']]
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
class ParseSnapshot:
|
||||
def __init__(self, snapshot, default="n/a"):
|
||||
self.default = default
|
||||
self.dmidecode_raw = snapshot["data"].get("dmidecode", "{}")
|
||||
self.smart_raw = snapshot["data"].get("disks", [])
|
||||
self.hwinfo_raw = snapshot["data"].get("hwinfo", "")
|
||||
self.lshw_raw = snapshot["data"].get("lshw", {}) or {}
|
||||
self.lscpi_raw = snapshot["data"].get("lspci", "")
|
||||
self.device = {"actions": []}
|
||||
self.components = []
|
||||
self.monitors = []
|
||||
|
||||
self.dmi = DMIParse(self.dmidecode_raw)
|
||||
self.smart = self.loads(self.smart_raw)
|
||||
self.lshw = self.loads(self.lshw_raw)
|
||||
self.hwinfo = self.parse_hwinfo()
|
||||
|
||||
self.set_computer()
|
||||
self.get_hwinfo_monitors()
|
||||
self.set_components()
|
||||
self.snapshot_json = {
|
||||
"type": "Snapshot",
|
||||
"device": self.device,
|
||||
"software": snapshot["software"],
|
||||
"components": self.components,
|
||||
"uuid": snapshot['uuid'],
|
||||
"version": snapshot['version'],
|
||||
"endTime": snapshot["timestamp"],
|
||||
"elapsed": 1,
|
||||
}
|
||||
|
||||
def set_computer(self):
|
||||
self.device['manufacturer'] = self.dmi.manufacturer().strip()
|
||||
self.device['model'] = self.dmi.model().strip()
|
||||
self.device['serialNumber'] = self.dmi.serial_number()
|
||||
self.device['type'] = self.get_type()
|
||||
self.device['sku'] = self.get_sku()
|
||||
self.device['version'] = self.get_version()
|
||||
self.device['system_uuid'] = self.get_uuid()
|
||||
self.device['family'] = self.get_family()
|
||||
self.device['chassis'] = self.get_chassis_dh()
|
||||
|
||||
def set_components(self):
|
||||
self.get_cpu()
|
||||
self.get_ram()
|
||||
self.get_mother_board()
|
||||
self.get_graphic()
|
||||
self.get_data_storage()
|
||||
self.get_display()
|
||||
self.get_sound_card()
|
||||
self.get_networks()
|
||||
|
||||
def get_cpu(self):
|
||||
for cpu in self.dmi.get('Processor'):
|
||||
serial = cpu.get('Serial Number')
|
||||
if serial == 'Not Specified' or not serial:
|
||||
serial = cpu.get('ID').replace(' ', '')
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "Processor",
|
||||
"speed": self.get_cpu_speed(cpu),
|
||||
"cores": int(cpu.get('Core Count', 1)),
|
||||
"model": cpu.get('Version'),
|
||||
"threads": int(cpu.get('Thread Count', 1)),
|
||||
"manufacturer": cpu.get('Manufacturer'),
|
||||
"serialNumber": serial,
|
||||
"brand": cpu.get('Family'),
|
||||
"address": self.get_cpu_address(cpu),
|
||||
"bogomips": self.get_bogomips(),
|
||||
}
|
||||
)
|
||||
|
||||
def get_ram(self):
|
||||
for ram in self.dmi.get("Memory Device"):
|
||||
if ram.get('size') == 'No Module Installed':
|
||||
continue
|
||||
if not ram.get("Speed"):
|
||||
continue
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "RamModule",
|
||||
"size": self.get_ram_size(ram),
|
||||
"speed": self.get_ram_speed(ram),
|
||||
"manufacturer": ram.get("Manufacturer", self.default),
|
||||
"serialNumber": ram.get("Serial Number", self.default),
|
||||
"interface": ram.get("Type", "DDR"),
|
||||
"format": ram.get("Form Factor", "DIMM"),
|
||||
"model": ram.get("Part Number", self.default),
|
||||
}
|
||||
)
|
||||
|
||||
def get_mother_board(self):
|
||||
for moder_board in self.dmi.get("Baseboard"):
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "Motherboard",
|
||||
"version": moder_board.get("Version"),
|
||||
"serialNumber": moder_board.get("Serial Number", "").strip(),
|
||||
"manufacturer": moder_board.get("Manufacturer", "").strip(),
|
||||
"biosDate": self.get_bios_date(),
|
||||
"ramMaxSize": self.get_max_ram_size(),
|
||||
"ramSlots": len(self.dmi.get("Memory Device")),
|
||||
"slots": self.get_ram_slots(),
|
||||
"model": moder_board.get("Product Name", "").strip(),
|
||||
"firewire": self.get_firmware_num(),
|
||||
"pcmcia": self.get_pcmcia_num(),
|
||||
"serial": self.get_serial_num(),
|
||||
"usb": self.get_usb_num(),
|
||||
}
|
||||
)
|
||||
|
||||
def get_graphic(self):
|
||||
displays = []
|
||||
get_lshw_child(self.lshw, displays, 'display')
|
||||
|
||||
for c in displays:
|
||||
if not c['configuration'].get('driver', None):
|
||||
continue
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "GraphicCard",
|
||||
"memory": "",
|
||||
"manufacturer": c.get("vendor", self.default),
|
||||
"model": c.get("product", self.default),
|
||||
"serialNumber": c.get("serial", self.default),
|
||||
}
|
||||
)
|
||||
|
||||
def get_data_storage(self):
|
||||
for sm in self.smart:
|
||||
if sm.get('smartctl', {}).get('exit_status') == 1:
|
||||
continue
|
||||
model = sm.get('model_name')
|
||||
manufacturer = None
|
||||
hours = sm.get("power_on_time", {}).get("hours", 0)
|
||||
if model and len(model.split(" ")) > 1:
|
||||
mm = model.split(" ")
|
||||
model = mm[-1]
|
||||
manufacturer = " ".join(mm[:-1])
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"actions": self.sanitize(sm),
|
||||
"type": self.get_data_storage_type(sm),
|
||||
"model": model,
|
||||
"manufacturer": manufacturer,
|
||||
"serialNumber": sm.get('serial_number'),
|
||||
"size": self.get_data_storage_size(sm),
|
||||
"variant": sm.get("firmware_version"),
|
||||
"interface": self.get_data_storage_interface(sm),
|
||||
"hours": hours,
|
||||
}
|
||||
)
|
||||
|
||||
def sanitize(self, action):
|
||||
return []
|
||||
|
||||
def get_bogomips(self):
|
||||
if not self.hwinfo:
|
||||
return self.default
|
||||
|
||||
bogomips = 0
|
||||
for row in self.hwinfo:
|
||||
for cel in row:
|
||||
if 'BogoMips' in cel:
|
||||
try:
|
||||
bogomips += float(cel.split(":")[-1])
|
||||
except Exception:
|
||||
pass
|
||||
return bogomips
|
||||
|
||||
def get_networks(self):
|
||||
networks = []
|
||||
get_lshw_child(self.lshw, networks, 'network')
|
||||
|
||||
for c in networks:
|
||||
capacity = c.get('capacity')
|
||||
wireless = bool(c.get('configuration', {}).get('wireless', False))
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "NetworkAdapter",
|
||||
"model": c.get('product'),
|
||||
"manufacturer": c.get('vendor'),
|
||||
"serialNumber": c.get('serial'),
|
||||
"speed": capacity,
|
||||
"variant": c.get('version', 1),
|
||||
"wireless": wireless or False,
|
||||
"integrated": "PCI:0000:00" in c.get("businfo", ""),
|
||||
}
|
||||
)
|
||||
|
||||
def get_sound_card(self):
|
||||
multimedias = []
|
||||
get_lshw_child(self.lshw, multimedias, 'multimedia')
|
||||
|
||||
for c in multimedias:
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "SoundCard",
|
||||
"model": c.get('product'),
|
||||
"manufacturer": c.get('vendor'),
|
||||
"serialNumber": c.get('serial'),
|
||||
}
|
||||
)
|
||||
|
||||
def get_display(self): # noqa: C901
|
||||
TECHS = 'CRT', 'TFT', 'LED', 'PDP', 'LCD', 'OLED', 'AMOLED'
|
||||
|
||||
for c in self.monitors:
|
||||
resolution_width, resolution_height = (None,) * 2
|
||||
refresh, serial, model, manufacturer, size = (None,) * 5
|
||||
year, week, production_date = (None,) * 3
|
||||
|
||||
for x in c:
|
||||
if "Vendor: " in x:
|
||||
manufacturer = x.split('Vendor: ')[-1].strip()
|
||||
if "Model: " in x:
|
||||
model = x.split('Model: ')[-1].strip()
|
||||
if "Serial ID: " in x:
|
||||
serial = x.split('Serial ID: ')[-1].strip()
|
||||
if " Resolution: " in x:
|
||||
rs = x.split(' Resolution: ')[-1].strip()
|
||||
if 'x' in rs:
|
||||
resolution_width, resolution_height = [
|
||||
int(r) for r in rs.split('x')
|
||||
]
|
||||
if "Frequencies: " in x:
|
||||
try:
|
||||
refresh = int(float(x.split(',')[-1].strip()[:-3]))
|
||||
except Exception:
|
||||
pass
|
||||
if 'Year of Manufacture' in x:
|
||||
year = x.split(': ')[1]
|
||||
|
||||
if 'Week of Manufacture' in x:
|
||||
week = x.split(': ')[1]
|
||||
|
||||
if "Size: " in x:
|
||||
size = self.get_size_monitor(x)
|
||||
technology = next((t for t in TECHS if t in c[0]), None)
|
||||
|
||||
if year and week:
|
||||
d = '{} {} 0'.format(year, week)
|
||||
production_date = datetime.strptime(d, '%Y %W %w').isoformat()
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "Display",
|
||||
"model": model,
|
||||
"manufacturer": manufacturer,
|
||||
"serialNumber": serial,
|
||||
'size': size,
|
||||
'resolutionWidth': resolution_width,
|
||||
'resolutionHeight': resolution_height,
|
||||
"productionDate": production_date,
|
||||
'technology': technology,
|
||||
'refreshRate': refresh,
|
||||
}
|
||||
)
|
||||
|
||||
def get_hwinfo_monitors(self):
|
||||
for c in self.hwinfo:
|
||||
monitor = None
|
||||
external = None
|
||||
for x in c:
|
||||
if 'Hardware Class: monitor' in x:
|
||||
monitor = c
|
||||
if 'Driver Info' in x:
|
||||
external = c
|
||||
|
||||
if monitor and not external:
|
||||
self.monitors.append(c)
|
||||
|
||||
def get_size_monitor(self, x):
|
||||
i = 1 / 25.4
|
||||
t = x.split('Size: ')[-1].strip()
|
||||
tt = t.split('mm')
|
||||
if not tt:
|
||||
return 0
|
||||
sizes = tt[0].strip()
|
||||
if 'x' not in sizes:
|
||||
return 0
|
||||
w, h = [int(x) for x in sizes.split('x')]
|
||||
return "{:.2f}".format(np.sqrt(w**2 + h**2) * i)
|
||||
|
||||
def get_cpu_address(self, cpu):
|
||||
default = 64
|
||||
|
||||
try:
|
||||
for ch in self.lshw.get('children', []):
|
||||
for c in ch.get('children', []):
|
||||
if c['class'] == 'processor':
|
||||
return c.get('width', default)
|
||||
except:
|
||||
return default
|
||||
return default
|
||||
|
||||
def get_usb_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "USB" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_serial_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "SERIAL" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_firmware_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "FIRMWARE" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_pcmcia_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "PCMCIA" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_bios_date(self):
|
||||
return self.dmi.get("BIOS")[0].get("Release Date", self.default)
|
||||
|
||||
def get_firmware(self):
|
||||
return self.dmi.get("BIOS")[0].get("Firmware Revision", '1')
|
||||
|
||||
def get_max_ram_size(self):
|
||||
size = 0
|
||||
for slot in self.dmi.get("Physical Memory Array"):
|
||||
capacity = slot.get("Maximum Capacity", '0').split(" ")[0]
|
||||
size += int(capacity)
|
||||
|
||||
return size
|
||||
|
||||
def get_ram_slots(self):
|
||||
slots = 0
|
||||
for x in self.dmi.get("Physical Memory Array"):
|
||||
slots += int(x.get("Number Of Devices", 0))
|
||||
return slots
|
||||
|
||||
def get_ram_size(self, ram):
|
||||
memory = ram.get("Size", "0")
|
||||
return memory
|
||||
|
||||
def get_ram_speed(self, ram):
|
||||
size = ram.get("Speed", "0")
|
||||
return size
|
||||
|
||||
def get_cpu_speed(self, cpu):
|
||||
speed = cpu.get('Max Speed', "0")
|
||||
return speed
|
||||
|
||||
def get_sku(self):
|
||||
return self.dmi.get("System")[0].get("SKU Number", self.default).strip()
|
||||
|
||||
def get_version(self):
|
||||
return self.dmi.get("System")[0].get("Version", self.default).strip()
|
||||
|
||||
def get_uuid(self):
|
||||
return self.dmi.get("System")[0].get("UUID", '').strip()
|
||||
|
||||
def get_family(self):
|
||||
return self.dmi.get("System")[0].get("Family", '')
|
||||
|
||||
def get_chassis(self):
|
||||
return self.dmi.get("Chassis")[0].get("Type", '_virtual')
|
||||
|
||||
def get_type(self):
|
||||
chassis_type = self.get_chassis()
|
||||
return self.translation_to_devicehub(chassis_type)
|
||||
|
||||
def translation_to_devicehub(self, original_type):
|
||||
lower_type = original_type.lower()
|
||||
CHASSIS_TYPE = {
|
||||
'Desktop': [
|
||||
'desktop',
|
||||
'low-profile',
|
||||
'tower',
|
||||
'docking',
|
||||
'all-in-one',
|
||||
'pizzabox',
|
||||
'mini-tower',
|
||||
'space-saving',
|
||||
'lunchbox',
|
||||
'mini',
|
||||
'stick',
|
||||
],
|
||||
'Laptop': [
|
||||
'portable',
|
||||
'laptop',
|
||||
'convertible',
|
||||
'tablet',
|
||||
'detachable',
|
||||
'notebook',
|
||||
'handheld',
|
||||
'sub-notebook',
|
||||
],
|
||||
'Server': ['server'],
|
||||
'Computer': ['_virtual'],
|
||||
}
|
||||
for k, v in CHASSIS_TYPE.items():
|
||||
if lower_type in v:
|
||||
return k
|
||||
return self.default
|
||||
|
||||
def get_chassis_dh(self):
|
||||
chassis = self.get_chassis()
|
||||
lower_type = chassis.lower()
|
||||
for k, v in CHASSIS_DH.items():
|
||||
if lower_type in v:
|
||||
return k
|
||||
return self.default
|
||||
|
||||
def get_data_storage_type(self, x):
|
||||
# TODO @cayop add more SSDS types
|
||||
SSDS = ["nvme"]
|
||||
SSD = 'SolidStateDrive'
|
||||
HDD = 'HardDrive'
|
||||
type_dev = x.get('device', {}).get('type')
|
||||
trim = x.get('trim', {}).get("supported") in [True, "true"]
|
||||
return SSD if type_dev in SSDS or trim else HDD
|
||||
|
||||
def get_data_storage_interface(self, x):
|
||||
interface = x.get('device', {}).get('protocol', 'ATA')
|
||||
if interface.upper() in DATASTORAGEINTERFACE:
|
||||
return interface.upper()
|
||||
|
||||
txt = "Sid: {}, interface {} is not in DataStorageInterface Enum".format(
|
||||
self.sid, interface
|
||||
)
|
||||
self.errors("{}".format(txt))
|
||||
|
||||
def get_data_storage_size(self, x):
|
||||
return x.get('user_capacity', {}).get('bytes')
|
||||
|
||||
def parse_hwinfo(self):
|
||||
hw_blocks = self.hwinfo_raw.split("\n\n")
|
||||
return [x.split("\n") for x in hw_blocks]
|
||||
|
||||
def loads(self, x):
|
||||
if isinstance(x, str):
|
||||
try:
|
||||
try:
|
||||
hw = json.loads(x)
|
||||
except json.decoder.JSONDecodeError:
|
||||
hw = json.loads(repair_json(x))
|
||||
return hw
|
||||
except Exception as ss:
|
||||
logger.warning("%s", ss)
|
||||
return {}
|
||||
return x
|
||||
|
||||
def errors(self, txt=None):
|
||||
if not txt:
|
||||
return self._errors
|
||||
|
||||
logger.error(txt)
|
||||
self._errors.append("%s", txt)
|
|
@ -1,58 +0,0 @@
|
|||
import logging
|
||||
from django.conf import settings
|
||||
|
||||
from utils.constants import ALGOS
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
class BuildMix:
|
||||
def __init__(self, evidence_json):
|
||||
self.json = evidence_json
|
||||
self.uuid = self.json.get('uuid')
|
||||
self.manufacturer = ""
|
||||
self.model = ""
|
||||
self.serial_number = ""
|
||||
self.chassis = ""
|
||||
self.sku = ""
|
||||
self.mac = ""
|
||||
self.tpy = ""
|
||||
self.version = ""
|
||||
self.get_details()
|
||||
self.generate_chids()
|
||||
|
||||
def get_hid(self, algo):
|
||||
algorithm = ALGOS.get(algo, [])
|
||||
hid = ""
|
||||
for f in algorithm:
|
||||
if hasattr(self, f):
|
||||
hid += getattr(self, f)
|
||||
return hid
|
||||
|
||||
def generate_chids(self):
|
||||
self.algorithms = {
|
||||
'hidalgo1': self.get_hid('hidalgo1'),
|
||||
}
|
||||
if settings.DPP:
|
||||
self.algorithms["legacy_dpp"] = self.get_hid("legacy_dpp")
|
||||
|
||||
def get_doc(self):
|
||||
self._get_components()
|
||||
for c in self.components:
|
||||
c.pop("actions", None)
|
||||
|
||||
components = sorted(self.components, key=lambda x: x.get("type"))
|
||||
device = self.algorithms.get('legacy_dpp')
|
||||
|
||||
doc = [("computer", device)]
|
||||
|
||||
for c in components:
|
||||
doc.append((c.get("type"), self.get_id_hw_dpp(c)))
|
||||
|
||||
def get_id_hw_dpp(self, d):
|
||||
algorithm = ALGOS.get("legacy_dpp", [])
|
||||
hid = ""
|
||||
for f in algorithm:
|
||||
hid += d.get(f, '')
|
||||
return hid
|
|
@ -6,8 +6,7 @@ from django.db import models
|
|||
|
||||
from utils.constants import STR_EXTEND_SIZE, CHASSIS_DH
|
||||
from evidence.xapian import search
|
||||
from evidence.parse_details import ParseSnapshot
|
||||
from evidence.normal_parse_details import get_inxi, get_inxi_key
|
||||
from evidence.parse_details import ParseSnapshot, get_inxi, get_inxi_key
|
||||
from user.models import User, Institution
|
||||
|
||||
|
||||
|
@ -93,7 +92,7 @@ class Evidence:
|
|||
self.inxi = ev["output"]
|
||||
else:
|
||||
dmidecode_raw = self.doc["data"]["dmidecode"]
|
||||
inxi_raw = self.doc.get("data", {}).get("inxi")
|
||||
inxi_raw = self.doc["data"]["inxi"]
|
||||
self.dmi = DMIParse(dmidecode_raw)
|
||||
try:
|
||||
self.inxi = json.loads(inxi_raw)
|
||||
|
@ -135,7 +134,7 @@ class Evidence:
|
|||
return list(self.doc.get('kv').values())[0]
|
||||
|
||||
if self.is_legacy():
|
||||
return self.doc.get('device', {}).get('manufacturer', '')
|
||||
return self.doc['device']['manufacturer']
|
||||
|
||||
if self.inxi:
|
||||
return self.device_manufacturer
|
||||
|
@ -150,7 +149,7 @@ class Evidence:
|
|||
return list(self.doc.get('kv').values())[1]
|
||||
|
||||
if self.is_legacy():
|
||||
return self.doc.get('device', {}).get('model', '')
|
||||
return self.doc['device']['model']
|
||||
|
||||
if self.inxi:
|
||||
return self.device_model
|
||||
|
@ -159,7 +158,7 @@ class Evidence:
|
|||
|
||||
def get_chassis(self):
|
||||
if self.is_legacy():
|
||||
return self.doc.get('device', {}).get('model', '')
|
||||
return self.doc['device']['model']
|
||||
|
||||
if self.inxi:
|
||||
return self.device_chassis
|
||||
|
@ -174,7 +173,7 @@ class Evidence:
|
|||
|
||||
def get_serial_number(self):
|
||||
if self.is_legacy():
|
||||
return self.doc.get('device', {}).get('serialNumber', '')
|
||||
return self.doc['device']['serialNumber']
|
||||
|
||||
if self.inxi:
|
||||
return self.device_serial_number
|
||||
|
@ -196,7 +195,8 @@ class Evidence:
|
|||
).order_by("-created").values_list("uuid", "created").distinct()
|
||||
|
||||
def set_components(self):
|
||||
self.components = ParseSnapshot(self.doc).components
|
||||
snapshot = ParseSnapshot(self.doc).snapshot_json
|
||||
self.components = snapshot['components']
|
||||
|
||||
def is_legacy(self):
|
||||
if self.doc.get("credentialSubject"):
|
||||
|
|
|
@ -1,64 +0,0 @@
|
|||
import json
|
||||
import logging
|
||||
|
||||
from evidence.mixin_parse import BuildMix
|
||||
from evidence.normal_parse_details import get_inxi_key, get_inxi, ParseSnapshot
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
def get_mac(inxi):
|
||||
nets = get_inxi_key(inxi, "Network")
|
||||
networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
|
||||
|
||||
for n, iface in networks:
|
||||
if get_inxi(n, "port"):
|
||||
return get_inxi(iface, 'mac')
|
||||
|
||||
|
||||
class Build(BuildMix):
|
||||
|
||||
def get_details(self):
|
||||
self.from_credential()
|
||||
try:
|
||||
self.inxi = self.json["data"]["inxi"]
|
||||
if isinstance(self.inxi, str):
|
||||
self.inxi = json.loads(self.inxi)
|
||||
except Exception:
|
||||
logger.error("No inxi in snapshot %s", self.uuid)
|
||||
return ""
|
||||
|
||||
machine = get_inxi_key(self.inxi, 'Machine')
|
||||
for m in machine:
|
||||
system = get_inxi(m, "System")
|
||||
if system:
|
||||
self.manufacturer = system
|
||||
self.model = get_inxi(m, "product")
|
||||
self.serial_number = get_inxi(m, "serial")
|
||||
self.chassis = get_inxi(m, "Type")
|
||||
else:
|
||||
self.sku = get_inxi(m, "part-nu")
|
||||
|
||||
self.mac = get_mac(self.inxi) or ""
|
||||
if not self.mac:
|
||||
txt = "Could not retrieve MAC address in snapshot %s"
|
||||
logger.warning(txt, self.uuid)
|
||||
|
||||
def from_credential(self):
|
||||
if not self.json.get("credentialSubject"):
|
||||
return
|
||||
|
||||
self.uuid = self.json.get("credentialSubject", {}).get("uuid")
|
||||
self.json.update(self.json["credentialSubject"])
|
||||
if self.json.get("evidence"):
|
||||
self.json["data"] = {}
|
||||
for ev in self.json["evidence"]:
|
||||
k = ev.get("operation")
|
||||
if not k:
|
||||
continue
|
||||
self.json["data"][k] = ev.get("output")
|
||||
|
||||
def _get_components(self):
|
||||
data = ParseSnapshot(self.json)
|
||||
self.components = data.components
|
|
@ -1,402 +0,0 @@
|
|||
import json
|
||||
import logging
|
||||
|
||||
from dmidecode import DMIParse
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
def get_inxi_key(inxi, component):
|
||||
for n in inxi:
|
||||
for k, v in n.items():
|
||||
if component in k:
|
||||
return v
|
||||
|
||||
|
||||
def get_inxi(n, name):
|
||||
for k, v in n.items():
|
||||
if f"#{name}" in k:
|
||||
return v
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
class ParseSnapshot:
|
||||
def __init__(self, snapshot, default="n/a"):
|
||||
self.default = default
|
||||
self.dmidecode_raw = snapshot.get("data", {}).get("dmidecode", "{}")
|
||||
self.smart_raw = snapshot.get("data", {}).get("smartctl", [])
|
||||
self.inxi_raw = snapshot.get("data", {}).get("inxi", "") or ""
|
||||
for ev in snapshot.get("evidence", []):
|
||||
if "dmidecode" == ev.get("operation"):
|
||||
self.dmidecode_raw = ev["output"]
|
||||
if "inxi" == ev.get("operation"):
|
||||
self.inxi_raw = ev["output"]
|
||||
if "smartctl" == ev.get("operation"):
|
||||
self.smart_raw = ev["output"]
|
||||
data = snapshot
|
||||
if snapshot.get("credentialSubject"):
|
||||
data = snapshot["credentialSubject"]
|
||||
|
||||
self.device = {"actions": []}
|
||||
self.components = []
|
||||
|
||||
self.dmi = DMIParse(self.dmidecode_raw)
|
||||
self.smart = self.loads(self.smart_raw)
|
||||
self.inxi = self.loads(self.inxi_raw)
|
||||
|
||||
self.set_computer()
|
||||
self.set_components()
|
||||
self.snapshot_json = {
|
||||
"type": "Snapshot",
|
||||
"device": self.device,
|
||||
"software": data["software"],
|
||||
"components": self.components,
|
||||
"uuid": data['uuid'],
|
||||
"endTime": data["timestamp"],
|
||||
"elapsed": 1,
|
||||
}
|
||||
|
||||
def set_computer(self):
|
||||
machine = get_inxi_key(self.inxi, 'Machine') or []
|
||||
for m in machine:
|
||||
system = get_inxi(m, "System")
|
||||
if system:
|
||||
self.device['manufacturer'] = system
|
||||
self.device['model'] = get_inxi(m, "product")
|
||||
self.device['serialNumber'] = get_inxi(m, "serial")
|
||||
self.device['type'] = get_inxi(m, "Type")
|
||||
self.device['chassis'] = self.device['type']
|
||||
self.device['version'] = get_inxi(m, "v")
|
||||
else:
|
||||
self.device['system_uuid'] = get_inxi(m, "uuid")
|
||||
self.device['sku'] = get_inxi(m, "part-nu")
|
||||
|
||||
def set_components(self):
|
||||
self.get_mother_board()
|
||||
self.get_cpu()
|
||||
self.get_ram()
|
||||
self.get_graphic()
|
||||
self.get_display()
|
||||
self.get_networks()
|
||||
self.get_sound_card()
|
||||
self.get_data_storage()
|
||||
self.get_battery()
|
||||
|
||||
def get_mother_board(self):
|
||||
machine = get_inxi_key(self.inxi, 'Machine') or []
|
||||
mb = {"type": "Motherboard",}
|
||||
for m in machine:
|
||||
bios_date = get_inxi(m, "date")
|
||||
if not bios_date:
|
||||
continue
|
||||
mb["manufacturer"] = get_inxi(m, "Mobo")
|
||||
mb["model"] = get_inxi(m, "model")
|
||||
mb["serialNumber"] = get_inxi(m, "serial")
|
||||
mb["version"] = get_inxi(m, "v")
|
||||
mb["biosDate"] = bios_date
|
||||
mb["biosVersion"] = self.get_bios_version()
|
||||
mb["firewire"]: self.get_firmware_num()
|
||||
mb["pcmcia"]: self.get_pcmcia_num()
|
||||
mb["serial"]: self.get_serial_num()
|
||||
mb["usb"]: self.get_usb_num()
|
||||
|
||||
self.get_ram_slots(mb)
|
||||
|
||||
self.components.append(mb)
|
||||
|
||||
def get_ram_slots(self, mb):
|
||||
memory = get_inxi_key(self.inxi, 'Memory') or []
|
||||
for m in memory:
|
||||
slots = get_inxi(m, "slots")
|
||||
if not slots:
|
||||
continue
|
||||
mb["slots"] = slots
|
||||
mb["ramSlots"] = get_inxi(m, "modules")
|
||||
mb["ramMaxSize"] = get_inxi(m, "capacity")
|
||||
|
||||
|
||||
def get_cpu(self):
|
||||
cpu = get_inxi_key(self.inxi, 'CPU') or []
|
||||
cp = {"type": "Processor"}
|
||||
vulnerabilities = []
|
||||
for c in cpu:
|
||||
base = get_inxi(c, "model")
|
||||
if base:
|
||||
cp["model"] = get_inxi(c, "model")
|
||||
cp["arch"] = get_inxi(c, "arch")
|
||||
cp["bits"] = get_inxi(c, "bits")
|
||||
cp["gen"] = get_inxi(c, "gen")
|
||||
cp["family"] = get_inxi(c, "family")
|
||||
cp["date"] = get_inxi(c, "built")
|
||||
continue
|
||||
des = get_inxi(c, "L1")
|
||||
if des:
|
||||
cp["L1"] = des
|
||||
cp["L2"] = get_inxi(c, "L2")
|
||||
cp["L3"] = get_inxi(c, "L3")
|
||||
cp["cpus"] = get_inxi(c, "cpus")
|
||||
cp["cores"] = get_inxi(c, "cores")
|
||||
cp["threads"] = get_inxi(c, "threads")
|
||||
continue
|
||||
bogo = get_inxi(c, "bogomips")
|
||||
if bogo:
|
||||
cp["bogomips"] = bogo
|
||||
cp["base/boost"] = get_inxi(c, "base/boost")
|
||||
cp["min/max"] = get_inxi(c, "min/max")
|
||||
cp["ext-clock"] = get_inxi(c, "ext-clock")
|
||||
cp["volts"] = get_inxi(c, "volts")
|
||||
continue
|
||||
ctype = get_inxi(c, "Type")
|
||||
if ctype:
|
||||
v = {"Type": ctype}
|
||||
status = get_inxi(c, "status")
|
||||
if status:
|
||||
v["status"] = status
|
||||
mitigation = get_inxi(c, "mitigation")
|
||||
if mitigation:
|
||||
v["mitigation"] = mitigation
|
||||
vulnerabilities.append(v)
|
||||
|
||||
self.components.append(cp)
|
||||
|
||||
|
||||
def get_ram(self):
|
||||
memory = get_inxi_key(self.inxi, 'Memory') or []
|
||||
mem = {"type": "RamModule"}
|
||||
|
||||
for m in memory:
|
||||
base = get_inxi(m, "System RAM")
|
||||
if base:
|
||||
mem["size"] = get_inxi(m, "total")
|
||||
slot = get_inxi(m, "manufacturer")
|
||||
if slot:
|
||||
mem["manufacturer"] = slot
|
||||
mem["model"] = get_inxi(m, "part-no")
|
||||
mem["serialNumber"] = get_inxi(m, "serial")
|
||||
mem["speed"] = get_inxi(m, "speed")
|
||||
mem["bits"] = get_inxi(m, "data")
|
||||
mem["interface"] = get_inxi(m, "type")
|
||||
module = get_inxi(m, "modules")
|
||||
if module:
|
||||
mem["modules"] = module
|
||||
|
||||
self.components.append(mem)
|
||||
|
||||
def get_graphic(self):
|
||||
graphics = get_inxi_key(self.inxi, 'Graphics') or []
|
||||
|
||||
for c in graphics:
|
||||
if not get_inxi(c, "Device") or not get_inxi(c, "vendor"):
|
||||
continue
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"type": "GraphicCard",
|
||||
"memory": self.get_memory_video(c),
|
||||
"manufacturer": get_inxi(c, "vendor"),
|
||||
"model": get_inxi(c, "Device"),
|
||||
"arch": get_inxi(c, "arch"),
|
||||
"serialNumber": get_inxi(c, "serial"),
|
||||
"integrated": True if get_inxi(c, "port") else False
|
||||
}
|
||||
)
|
||||
|
||||
def get_battery(self):
|
||||
bats = get_inxi_key(self.inxi, 'Battery') or []
|
||||
for b in bats:
|
||||
self.components.append(
|
||||
{
|
||||
"type": "Battery",
|
||||
"model": get_inxi(b, "model"),
|
||||
"serialNumber": get_inxi(b, "serial"),
|
||||
"condition": get_inxi(b, "condition"),
|
||||
"cycles": get_inxi(b, "cycles"),
|
||||
"volts": get_inxi(b, "volts")
|
||||
}
|
||||
)
|
||||
|
||||
def get_memory_video(self, c):
|
||||
memory = get_inxi_key(self.inxi, 'Memory') or []
|
||||
|
||||
for m in memory:
|
||||
igpu = get_inxi(m, "igpu")
|
||||
agpu = get_inxi(m, "agpu")
|
||||
ngpu = get_inxi(m, "ngpu")
|
||||
gpu = get_inxi(m, "gpu")
|
||||
if igpu or agpu or gpu or ngpu:
|
||||
return igpu or agpu or gpu or ngpu
|
||||
|
||||
return self.default
|
||||
|
||||
def get_data_storage(self):
|
||||
hdds= get_inxi_key(self.inxi, 'Drives') or []
|
||||
for d in hdds:
|
||||
usb = get_inxi(d, "type")
|
||||
if usb == "USB":
|
||||
continue
|
||||
|
||||
serial = get_inxi(d, "serial")
|
||||
if serial:
|
||||
hd = {
|
||||
"type": "Storage",
|
||||
"manufacturer": get_inxi(d, "vendor"),
|
||||
"model": get_inxi(d, "model"),
|
||||
"serialNumber": get_inxi(d, "serial"),
|
||||
"size": get_inxi(d, "size"),
|
||||
"speed": get_inxi(d, "speed"),
|
||||
"interface": get_inxi(d, "tech"),
|
||||
"firmware": get_inxi(d, "fw-rev")
|
||||
}
|
||||
rpm = get_inxi(d, "rpm")
|
||||
if rpm:
|
||||
hd["rpm"] = rpm
|
||||
|
||||
family = get_inxi(d, "family")
|
||||
if family:
|
||||
hd["family"] = family
|
||||
|
||||
sata = get_inxi(d, "sata")
|
||||
if sata:
|
||||
hd["sata"] = sata
|
||||
|
||||
continue
|
||||
|
||||
|
||||
cycles = get_inxi(d, "cycles")
|
||||
if cycles:
|
||||
hd['cycles'] = cycles
|
||||
hd["health"] = get_inxi(d, "health")
|
||||
hd["time of used"] = get_inxi(d, "on")
|
||||
hd["read used"] = get_inxi(d, "read-units")
|
||||
hd["written used"] = get_inxi(d, "written-units")
|
||||
|
||||
self.components.append(hd)
|
||||
continue
|
||||
|
||||
hd = {}
|
||||
|
||||
def sanitize(self, action):
|
||||
return []
|
||||
|
||||
def get_networks(self):
|
||||
nets = get_inxi_key(self.inxi, "Network") or []
|
||||
networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
|
||||
|
||||
for n, iface in networks:
|
||||
model = get_inxi(n, "Device")
|
||||
if not model:
|
||||
continue
|
||||
|
||||
interface = ''
|
||||
for k in n.keys():
|
||||
if "port" in k:
|
||||
interface = "Integrated"
|
||||
if "pcie" in k:
|
||||
interface = "PciExpress"
|
||||
if get_inxi(n, "type") == "USB":
|
||||
interface = "USB"
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"type": "NetworkAdapter",
|
||||
"model": model,
|
||||
"manufacturer": get_inxi(n, 'vendor'),
|
||||
"serialNumber": get_inxi(iface, 'mac'),
|
||||
"speed": get_inxi(n, "speed"),
|
||||
"interface": interface,
|
||||
}
|
||||
)
|
||||
|
||||
def get_sound_card(self):
|
||||
audio = get_inxi_key(self.inxi, "Audio") or []
|
||||
|
||||
for c in audio:
|
||||
model = get_inxi(c, "Device")
|
||||
if not model:
|
||||
continue
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"type": "SoundCard",
|
||||
"model": model,
|
||||
"manufacturer": get_inxi(c, 'vendor'),
|
||||
"serialNumber": get_inxi(c, 'serial'),
|
||||
}
|
||||
)
|
||||
|
||||
def get_display(self):
|
||||
graphics = get_inxi_key(self.inxi, "Graphics") or []
|
||||
for c in graphics:
|
||||
if not get_inxi(c, "Monitor"):
|
||||
continue
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"type": "Display",
|
||||
"model": get_inxi(c, "model"),
|
||||
"manufacturer": get_inxi(c, "vendor"),
|
||||
"serialNumber": get_inxi(c, "serial"),
|
||||
'size': get_inxi(c, "size"),
|
||||
'diagonal': get_inxi(c, "diag"),
|
||||
'resolution': get_inxi(c, "res"),
|
||||
"date": get_inxi(c, "built"),
|
||||
'ratio': get_inxi(c, "ratio"),
|
||||
}
|
||||
)
|
||||
|
||||
def get_usb_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "USB" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_serial_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "SERIAL" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_firmware_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "FIRMWARE" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_pcmcia_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "PCMCIA" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_bios_version(self):
|
||||
return self.dmi.get("BIOS")[0].get("BIOS Revision", '1')
|
||||
|
||||
def loads(self, x):
|
||||
if isinstance(x, str):
|
||||
try:
|
||||
return json.loads(x)
|
||||
except Exception as ss:
|
||||
logger.warning("%s", ss)
|
||||
return {}
|
||||
return x
|
||||
|
||||
def errors(self, txt=None):
|
||||
if not txt:
|
||||
return self._errors
|
||||
|
||||
logger.error(txt)
|
||||
self._errors.append("%s", txt)
|
|
@ -1,22 +0,0 @@
|
|||
import logging
|
||||
|
||||
from evidence.mixin_parse import BuildMix
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
class Build(BuildMix):
|
||||
# This parse is for get info from snapshots created with old workbench
|
||||
# normaly is worbench 11
|
||||
|
||||
def get_details(self):
|
||||
device = self.json.get('device', {})
|
||||
self.manufacturer = device.get("manufacturer", '')
|
||||
self.model = device.get("model", '')
|
||||
self.chassis = device.get("chassis", '')
|
||||
self.serial_number = device.get("serialNumber", '')
|
||||
self.sku = device.get("sku", '')
|
||||
|
||||
def _get_components(self):
|
||||
self.components = self.json.get("components", [])
|
|
@ -1,13 +0,0 @@
|
|||
import logging
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
class ParseSnapshot:
|
||||
def __init__(self, snapshot, default="n/a"):
|
||||
self.default = default
|
||||
self.snapshot_json = snapshot
|
||||
|
||||
self.device = snapshot.get("device")
|
||||
self.components = snapshot.get("components")
|
|
@ -2,14 +2,12 @@ import json
|
|||
import hashlib
|
||||
import logging
|
||||
|
||||
from evidence import legacy_parse
|
||||
from evidence import old_parse
|
||||
from evidence import normal_parse
|
||||
from dmidecode import DMIParse
|
||||
from evidence.parse_details import ParseSnapshot
|
||||
|
||||
from evidence.models import Annotation
|
||||
from evidence.xapian import index
|
||||
from evidence.normal_parse_details import get_inxi_key, get_inxi
|
||||
from evidence.parse_details import get_inxi_key, get_inxi
|
||||
from django.conf import settings
|
||||
|
||||
if settings.DPP:
|
||||
|
@ -26,83 +24,9 @@ def get_mac(inxi):
|
|||
if get_inxi(n, "port"):
|
||||
return get_inxi(iface, 'mac')
|
||||
|
||||
|
||||
class Build:
|
||||
def __init__(self, evidence_json, user, check=False):
|
||||
"""
|
||||
This Build do the save in xapian as document, in Annotations and do
|
||||
register in dlt if is configured for that.
|
||||
|
||||
We have 4 cases for parser diferents snapshots than come from workbench.
|
||||
1) worbench 11 is old_parse.
|
||||
2) legacy is the worbench-script when create a snapshot for devicehub-teal
|
||||
3) some snapshots come as a credential. In this case is parsed as normal_parse
|
||||
4) normal snapshot from worbench-script is the most basic and is parsed as normal_parse
|
||||
"""
|
||||
self.evidence = evidence_json.copy()
|
||||
self.uuid = self.evidence.get('uuid')
|
||||
self.user = user
|
||||
|
||||
if evidence_json.get("credentialSubject"):
|
||||
self.build = normal_parse.Build(evidence_json)
|
||||
self.uuid = evidence_json.get("credentialSubject", {}).get("uuid")
|
||||
elif evidence_json.get("software") != "workbench-script":
|
||||
self.build = old_parse.Build(evidence_json)
|
||||
elif evidence_json.get("data",{}).get("lshw"):
|
||||
self.build = legacy_parse.Build(evidence_json)
|
||||
else:
|
||||
self.build = normal_parse.Build(evidence_json)
|
||||
|
||||
if check:
|
||||
return
|
||||
|
||||
self.index()
|
||||
self.create_annotations()
|
||||
if settings.DPP:
|
||||
self.register_device_dlt()
|
||||
|
||||
def index(self):
|
||||
snap = json.dumps(self.evidence)
|
||||
index(self.user.institution, self.uuid, snap)
|
||||
|
||||
def create_annotations(self):
|
||||
annotation = Annotation.objects.filter(
|
||||
uuid=self.uuid,
|
||||
owner=self.user.institution,
|
||||
type=Annotation.Type.SYSTEM,
|
||||
)
|
||||
|
||||
if annotation:
|
||||
txt = "Warning: Snapshot %s already registered (annotation exists)"
|
||||
logger.warning(txt, self.uuid)
|
||||
return
|
||||
|
||||
for k, v in self.build.algorithms.items():
|
||||
Annotation.objects.create(
|
||||
uuid=self.uuid,
|
||||
owner=self.user.institution,
|
||||
user=self.user,
|
||||
type=Annotation.Type.SYSTEM,
|
||||
key=k,
|
||||
value=self.sign(v)
|
||||
)
|
||||
|
||||
def sign(self, doc):
|
||||
return hashlib.sha3_256(doc.encode()).hexdigest()
|
||||
|
||||
def register_device_dlt(self):
|
||||
legacy_dpp = self.build.algorithms.get('legacy_dpp')
|
||||
chid = self.sign(legacy_dpp)
|
||||
phid = self.sign(json.dumps(self.build.get_doc()))
|
||||
register_device_dlt(chid, phid, self.uuid, self.user)
|
||||
register_passport_dlt(chid, phid, self.uuid, self.user)
|
||||
|
||||
|
||||
class Build2:
|
||||
def __init__(self, evidence_json, user, check=False):
|
||||
if evidence_json.get("data",{}).get("lshw"):
|
||||
if evidence_json.get("software") == "workbench-script":
|
||||
return legacy_parse.Build(evidence_json, user, check=check)
|
||||
|
||||
self.evidence = evidence_json.copy()
|
||||
self.json = evidence_json.copy()
|
||||
|
||||
|
|
|
@ -1,38 +1,407 @@
|
|||
import re
|
||||
import json
|
||||
import logging
|
||||
import numpy as np
|
||||
|
||||
from evidence import (
|
||||
legacy_parse_details,
|
||||
normal_parse_details,
|
||||
old_parse_details
|
||||
)
|
||||
from datetime import datetime
|
||||
from dmidecode import DMIParse
|
||||
|
||||
from utils.constants import CHASSIS_DH, DATASTORAGEINTERFACE
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
def get_inxi_key(inxi, component):
|
||||
for n in inxi:
|
||||
for k, v in n.items():
|
||||
if component in k:
|
||||
return v
|
||||
|
||||
|
||||
def get_inxi(n, name):
|
||||
for k, v in n.items():
|
||||
if f"#{name}" in k:
|
||||
return v
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
class ParseSnapshot:
|
||||
def __init__(self, snapshot, default="n/a"):
|
||||
self.default = default
|
||||
self.dmidecode_raw = snapshot.get("data", {}).get("dmidecode", "{}")
|
||||
self.smart_raw = snapshot.get("data", {}).get("smartctl", [])
|
||||
self.inxi_raw = snapshot.get("data", {}).get("inxi", "") or ""
|
||||
for ev in snapshot.get("evidence", []):
|
||||
if "dmidecode" == ev.get("operation"):
|
||||
self.dmidecode_raw = ev["output"]
|
||||
if "inxi" == ev.get("operation"):
|
||||
self.inxi_raw = ev["output"]
|
||||
if "smartctl" == ev.get("operation"):
|
||||
self.smart_raw = ev["output"]
|
||||
data = snapshot
|
||||
if snapshot.get("credentialSubject"):
|
||||
self.build = normal_parse_details.ParseSnapshot(
|
||||
snapshot,
|
||||
default=default
|
||||
)
|
||||
elif snapshot.get("software") != "workbench-script":
|
||||
self.build = old_parse_details.ParseSnapshot(
|
||||
snapshot,
|
||||
default=default
|
||||
)
|
||||
elif snapshot.get("data",{}).get("lshw"):
|
||||
self.build = legacy_parse_details.ParseSnapshot(
|
||||
snapshot,
|
||||
default=default
|
||||
)
|
||||
data = snapshot["credentialSubject"]
|
||||
|
||||
self.device = {"actions": []}
|
||||
self.components = []
|
||||
|
||||
self.dmi = DMIParse(self.dmidecode_raw)
|
||||
self.smart = self.loads(self.smart_raw)
|
||||
self.inxi = self.loads(self.inxi_raw)
|
||||
|
||||
self.set_computer()
|
||||
self.set_components()
|
||||
self.snapshot_json = {
|
||||
"type": "Snapshot",
|
||||
"device": self.device,
|
||||
"software": data["software"],
|
||||
"components": self.components,
|
||||
"uuid": data['uuid'],
|
||||
"endTime": data["timestamp"],
|
||||
"elapsed": 1,
|
||||
}
|
||||
|
||||
def set_computer(self):
|
||||
machine = get_inxi_key(self.inxi, 'Machine') or []
|
||||
for m in machine:
|
||||
system = get_inxi(m, "System")
|
||||
if system:
|
||||
self.device['manufacturer'] = system
|
||||
self.device['model'] = get_inxi(m, "product")
|
||||
self.device['serialNumber'] = get_inxi(m, "serial")
|
||||
self.device['type'] = get_inxi(m, "Type")
|
||||
self.device['chassis'] = self.device['type']
|
||||
self.device['version'] = get_inxi(m, "v")
|
||||
else:
|
||||
self.build = normal_parse_details.ParseSnapshot(
|
||||
snapshot,
|
||||
default=default
|
||||
self.device['system_uuid'] = get_inxi(m, "uuid")
|
||||
self.device['sku'] = get_inxi(m, "part-nu")
|
||||
|
||||
def set_components(self):
|
||||
self.get_mother_board()
|
||||
self.get_cpu()
|
||||
self.get_ram()
|
||||
self.get_graphic()
|
||||
self.get_display()
|
||||
self.get_networks()
|
||||
self.get_sound_card()
|
||||
self.get_data_storage()
|
||||
self.get_battery()
|
||||
|
||||
def get_mother_board(self):
|
||||
machine = get_inxi_key(self.inxi, 'Machine') or []
|
||||
mb = {"type": "Motherboard",}
|
||||
for m in machine:
|
||||
bios_date = get_inxi(m, "date")
|
||||
if not bios_date:
|
||||
continue
|
||||
mb["manufacturer"] = get_inxi(m, "Mobo")
|
||||
mb["model"] = get_inxi(m, "model")
|
||||
mb["serialNumber"] = get_inxi(m, "serial")
|
||||
mb["version"] = get_inxi(m, "v")
|
||||
mb["biosDate"] = bios_date
|
||||
mb["biosVersion"] = self.get_bios_version()
|
||||
mb["firewire"]: self.get_firmware_num()
|
||||
mb["pcmcia"]: self.get_pcmcia_num()
|
||||
mb["serial"]: self.get_serial_num()
|
||||
mb["usb"]: self.get_usb_num()
|
||||
|
||||
self.get_ram_slots(mb)
|
||||
|
||||
self.components.append(mb)
|
||||
|
||||
def get_ram_slots(self, mb):
|
||||
memory = get_inxi_key(self.inxi, 'Memory') or []
|
||||
for m in memory:
|
||||
slots = get_inxi(m, "slots")
|
||||
if not slots:
|
||||
continue
|
||||
mb["slots"] = slots
|
||||
mb["ramSlots"] = get_inxi(m, "modules")
|
||||
mb["ramMaxSize"] = get_inxi(m, "capacity")
|
||||
|
||||
|
||||
def get_cpu(self):
|
||||
cpu = get_inxi_key(self.inxi, 'CPU') or []
|
||||
cp = {"type": "Processor"}
|
||||
vulnerabilities = []
|
||||
for c in cpu:
|
||||
base = get_inxi(c, "model")
|
||||
if base:
|
||||
cp["model"] = get_inxi(c, "model")
|
||||
cp["arch"] = get_inxi(c, "arch")
|
||||
cp["bits"] = get_inxi(c, "bits")
|
||||
cp["gen"] = get_inxi(c, "gen")
|
||||
cp["family"] = get_inxi(c, "family")
|
||||
cp["date"] = get_inxi(c, "built")
|
||||
continue
|
||||
des = get_inxi(c, "L1")
|
||||
if des:
|
||||
cp["L1"] = des
|
||||
cp["L2"] = get_inxi(c, "L2")
|
||||
cp["L3"] = get_inxi(c, "L3")
|
||||
cp["cpus"] = get_inxi(c, "cpus")
|
||||
cp["cores"] = get_inxi(c, "cores")
|
||||
cp["threads"] = get_inxi(c, "threads")
|
||||
continue
|
||||
bogo = get_inxi(c, "bogomips")
|
||||
if bogo:
|
||||
cp["bogomips"] = bogo
|
||||
cp["base/boost"] = get_inxi(c, "base/boost")
|
||||
cp["min/max"] = get_inxi(c, "min/max")
|
||||
cp["ext-clock"] = get_inxi(c, "ext-clock")
|
||||
cp["volts"] = get_inxi(c, "volts")
|
||||
continue
|
||||
ctype = get_inxi(c, "Type")
|
||||
if ctype:
|
||||
v = {"Type": ctype}
|
||||
status = get_inxi(c, "status")
|
||||
if status:
|
||||
v["status"] = status
|
||||
mitigation = get_inxi(c, "mitigation")
|
||||
if mitigation:
|
||||
v["mitigation"] = mitigation
|
||||
vulnerabilities.append(v)
|
||||
|
||||
self.components.append(cp)
|
||||
|
||||
|
||||
def get_ram(self):
|
||||
memory = get_inxi_key(self.inxi, 'Memory') or []
|
||||
mem = {"type": "RamModule"}
|
||||
|
||||
for m in memory:
|
||||
base = get_inxi(m, "System RAM")
|
||||
if base:
|
||||
mem["size"] = get_inxi(m, "total")
|
||||
slot = get_inxi(m, "manufacturer")
|
||||
if slot:
|
||||
mem["manufacturer"] = slot
|
||||
mem["model"] = get_inxi(m, "part-no")
|
||||
mem["serialNumber"] = get_inxi(m, "serial")
|
||||
mem["speed"] = get_inxi(m, "speed")
|
||||
mem["bits"] = get_inxi(m, "data")
|
||||
mem["interface"] = get_inxi(m, "type")
|
||||
module = get_inxi(m, "modules")
|
||||
if module:
|
||||
mem["modules"] = module
|
||||
|
||||
self.components.append(mem)
|
||||
|
||||
def get_graphic(self):
|
||||
graphics = get_inxi_key(self.inxi, 'Graphics') or []
|
||||
|
||||
for c in graphics:
|
||||
if not get_inxi(c, "Device") or not get_inxi(c, "vendor"):
|
||||
continue
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"type": "GraphicCard",
|
||||
"memory": self.get_memory_video(c),
|
||||
"manufacturer": get_inxi(c, "vendor"),
|
||||
"model": get_inxi(c, "Device"),
|
||||
"arch": get_inxi(c, "arch"),
|
||||
"serialNumber": get_inxi(c, "serial"),
|
||||
"integrated": True if get_inxi(c, "port") else False
|
||||
}
|
||||
)
|
||||
|
||||
self.default = default
|
||||
self.device = self.build.snapshot_json.get("device")
|
||||
self.components = self.build.snapshot_json.get("components")
|
||||
def get_battery(self):
|
||||
bats = get_inxi_key(self.inxi, 'Battery') or []
|
||||
for b in bats:
|
||||
self.components.append(
|
||||
{
|
||||
"type": "Battery",
|
||||
"model": get_inxi(b, "model"),
|
||||
"serialNumber": get_inxi(b, "serial"),
|
||||
"condition": get_inxi(b, "condition"),
|
||||
"cycles": get_inxi(b, "cycles"),
|
||||
"volts": get_inxi(b, "volts")
|
||||
}
|
||||
)
|
||||
|
||||
def get_memory_video(self, c):
|
||||
memory = get_inxi_key(self.inxi, 'Memory') or []
|
||||
|
||||
for m in memory:
|
||||
igpu = get_inxi(m, "igpu")
|
||||
agpu = get_inxi(m, "agpu")
|
||||
ngpu = get_inxi(m, "ngpu")
|
||||
gpu = get_inxi(m, "gpu")
|
||||
if igpu or agpu or gpu or ngpu:
|
||||
return igpu or agpu or gpu or ngpu
|
||||
|
||||
return self.default
|
||||
|
||||
def get_data_storage(self):
|
||||
hdds= get_inxi_key(self.inxi, 'Drives') or []
|
||||
for d in hdds:
|
||||
usb = get_inxi(d, "type")
|
||||
if usb == "USB":
|
||||
continue
|
||||
|
||||
serial = get_inxi(d, "serial")
|
||||
if serial:
|
||||
hd = {
|
||||
"type": "Storage",
|
||||
"manufacturer": get_inxi(d, "vendor"),
|
||||
"model": get_inxi(d, "model"),
|
||||
"serialNumber": get_inxi(d, "serial"),
|
||||
"size": get_inxi(d, "size"),
|
||||
"speed": get_inxi(d, "speed"),
|
||||
"interface": get_inxi(d, "tech"),
|
||||
"firmware": get_inxi(d, "fw-rev")
|
||||
}
|
||||
rpm = get_inxi(d, "rpm")
|
||||
if rpm:
|
||||
hd["rpm"] = rpm
|
||||
|
||||
family = get_inxi(d, "family")
|
||||
if family:
|
||||
hd["family"] = family
|
||||
|
||||
sata = get_inxi(d, "sata")
|
||||
if sata:
|
||||
hd["sata"] = sata
|
||||
|
||||
continue
|
||||
|
||||
|
||||
cycles = get_inxi(d, "cycles")
|
||||
if cycles:
|
||||
hd['cycles'] = cycles
|
||||
hd["health"] = get_inxi(d, "health")
|
||||
hd["time of used"] = get_inxi(d, "on")
|
||||
hd["read used"] = get_inxi(d, "read-units")
|
||||
hd["written used"] = get_inxi(d, "written-units")
|
||||
|
||||
self.components.append(hd)
|
||||
continue
|
||||
|
||||
hd = {}
|
||||
|
||||
def sanitize(self, action):
|
||||
return []
|
||||
|
||||
def get_networks(self):
|
||||
nets = get_inxi_key(self.inxi, "Network") or []
|
||||
networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
|
||||
|
||||
for n, iface in networks:
|
||||
model = get_inxi(n, "Device")
|
||||
if not model:
|
||||
continue
|
||||
|
||||
interface = ''
|
||||
for k in n.keys():
|
||||
if "port" in k:
|
||||
interface = "Integrated"
|
||||
if "pcie" in k:
|
||||
interface = "PciExpress"
|
||||
if get_inxi(n, "type") == "USB":
|
||||
interface = "USB"
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"type": "NetworkAdapter",
|
||||
"model": model,
|
||||
"manufacturer": get_inxi(n, 'vendor'),
|
||||
"serialNumber": get_inxi(iface, 'mac'),
|
||||
"speed": get_inxi(n, "speed"),
|
||||
"interface": interface,
|
||||
}
|
||||
)
|
||||
|
||||
def get_sound_card(self):
|
||||
audio = get_inxi_key(self.inxi, "Audio") or []
|
||||
|
||||
for c in audio:
|
||||
model = get_inxi(c, "Device")
|
||||
if not model:
|
||||
continue
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"type": "SoundCard",
|
||||
"model": model,
|
||||
"manufacturer": get_inxi(c, 'vendor'),
|
||||
"serialNumber": get_inxi(c, 'serial'),
|
||||
}
|
||||
)
|
||||
|
||||
def get_display(self):
|
||||
graphics = get_inxi_key(self.inxi, "Graphics") or []
|
||||
for c in graphics:
|
||||
if not get_inxi(c, "Monitor"):
|
||||
continue
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"type": "Display",
|
||||
"model": get_inxi(c, "model"),
|
||||
"manufacturer": get_inxi(c, "vendor"),
|
||||
"serialNumber": get_inxi(c, "serial"),
|
||||
'size': get_inxi(c, "size"),
|
||||
'diagonal': get_inxi(c, "diag"),
|
||||
'resolution': get_inxi(c, "res"),
|
||||
"date": get_inxi(c, "built"),
|
||||
'ratio': get_inxi(c, "ratio"),
|
||||
}
|
||||
)
|
||||
|
||||
def get_usb_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "USB" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_serial_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "SERIAL" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_firmware_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "FIRMWARE" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_pcmcia_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "PCMCIA" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_bios_version(self):
|
||||
return self.dmi.get("BIOS")[0].get("BIOS Revision", '1')
|
||||
|
||||
def loads(self, x):
|
||||
if isinstance(x, str):
|
||||
try:
|
||||
return json.loads(x)
|
||||
except Exception as ss:
|
||||
logger.warning("%s", ss)
|
||||
return {}
|
||||
return x
|
||||
|
||||
def errors(self, txt=None):
|
||||
if not txt:
|
||||
return self._errors
|
||||
|
||||
logger.error(txt)
|
||||
self._errors.append("%s", txt)
|
||||
|
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
@ -13,7 +13,7 @@ HID_ALGO1 = [
|
|||
"manufacturer",
|
||||
"model",
|
||||
"chassis",
|
||||
"serial_number",
|
||||
"serialNumber",
|
||||
"sku"
|
||||
]
|
||||
|
||||
|
@ -21,7 +21,7 @@ LEGACY_DPP = [
|
|||
"manufacturer",
|
||||
"model",
|
||||
"chassis",
|
||||
"serial_number",
|
||||
"serialNumber",
|
||||
"sku",
|
||||
"type",
|
||||
"version"
|
||||
|
|
Loading…
Reference in a new issue