add sanitize actions

This commit is contained in:
Cayo Puigdefabregas 2023-07-14 17:40:18 +02:00
parent 0d957ecc4a
commit d8d2931720
2 changed files with 82 additions and 10 deletions

View File

@ -26,6 +26,7 @@ class ParseSnapshot:
self.hwinfo_raw = snapshot["hwmd"]["hwinfo"] self.hwinfo_raw = snapshot["hwmd"]["hwinfo"]
self.lshw_raw = snapshot["hwmd"]["lshw"] self.lshw_raw = snapshot["hwmd"]["lshw"]
self.lscpi_raw = snapshot["hwmd"]["lspci"] self.lscpi_raw = snapshot["hwmd"]["lspci"]
self.sanitize_raw = snapshot["sanitize"]
self.device = {"actions": []} self.device = {"actions": []}
self.components = [] self.components = []
self.monitors = [] self.monitors = []
@ -73,6 +74,7 @@ class ParseSnapshot:
self.get_display() self.get_display()
self.get_sound_card() self.get_sound_card()
self.get_networks() self.get_networks()
self.get_networks()
def get_cpu(self): def get_cpu(self):
for cpu in self.dmi.get('Processor'): for cpu in self.dmi.get('Processor'):
@ -169,7 +171,7 @@ class ParseSnapshot:
self.components.append( self.components.append(
{ {
"actions": [], "actions": self.sanitize(sm),
"type": self.get_data_storage_type(sm), "type": self.get_data_storage_type(sm),
"model": model, "model": model,
"manufacturer": manufacturer, "manufacturer": manufacturer,
@ -180,6 +182,41 @@ class ParseSnapshot:
} }
) )
def sanitize(self, disk):
disk_sanitize = None
for d in self.sanitize_raw:
s = d.get('device_info', {}).get('export_data', {})
s = s.get('block', {}).get('serial')
if s == disk.get('serial_number'):
disk_sanitize = d
break
if not disk_sanitize:
return []
steps = []
erase = {
'type': 'EraseBasic',
'severity': disk_sanitize['severity'].name,
'steps': steps,
'startTime': None,
'endTime': None,
}
for step in disk_sanitize.get('steps', []):
steps.append(
{
'severity': step['severity'].name,
'startTime': step['start_time'].isoformat(),
'endTime': step['end_time'].isoformat(),
'type': 'StepRandom',
}
)
erase['endTime'] = step['end_time'].isoformat()
if not erase['startTime']:
erase['startTime'] = step['start_time'].isoformat()
return [erase]
def get_networks(self): def get_networks(self):
nodes = get_nested_dicts_with_key_value(self.lshw, 'class', 'network') nodes = get_nested_dicts_with_key_value(self.lshw, 'class', 'network')
for c in nodes: for c in nodes:

View File

@ -1,8 +1,12 @@
from datetime import datetime
from flask import current_app as app from flask import current_app as app
from marshmallow import Schema as MarshmallowSchema from marshmallow import Schema as MarshmallowSchema
from marshmallow import ValidationError, validates_schema from marshmallow import ValidationError, pre_load, validates_schema
from marshmallow.fields import Dict, List, Nested, String from marshmallow.fields import DateTime, Dict, Integer, List, Nested, String
from marshmallow_enum import EnumField
from ereuse_devicehub.resources.enums import Severity, SnapshotSoftware
from ereuse_devicehub.resources.schemas import Thing from ereuse_devicehub.resources.schemas import Thing
# from marshmallow_enum import EnumField # from marshmallow_enum import EnumField
@ -21,20 +25,51 @@ class Test(MarshmallowSchema):
type = String(required=True) type = String(required=True)
class Steps(MarshmallowSchema):
num = Integer(data_key='step', required=True)
start_time = DateTime(data_key='date_init', required=True)
end_time = DateTime(data_key='date_end', required=True)
severity = EnumField(Severity)
@pre_load
def preload_datas(self, data: dict):
data['severity'] = Severity.Info.name
if not data.pop('success', False):
data['severity'] = Severity.Error.name
data.pop('duration', None)
data.pop('commands', None)
if data.get('date_init'):
data['date_init'] = datetime.fromtimestamp(data['date_init']).isoformat()
data['date_end'] = datetime.fromtimestamp(data['date_end']).isoformat()
class Sanitize(MarshmallowSchema): class Sanitize(MarshmallowSchema):
type = String(required=True) steps = Nested(Steps, many=True, required=True, data_key='erasure_steps')
validation = Dict()
device_info = Dict()
method = Dict(required=True)
sanitize_version = String()
severity = EnumField(Severity, required=True)
@pre_load
def preload_datas(self, data: dict):
data['severity'] = Severity.Info.name
if not data.pop('result', False):
data['severity'] = Severity.Error.name
class Snapshot_lite(Thing): class Snapshot_lite(Thing):
uuid = String(required=True) uuid = String(required=True)
version = String(required=True) version = String(required=True)
schema_api = String(required=True) schema_api = String(required=True)
software = String(required=True) software = EnumField(
# software = EnumField( SnapshotSoftware,
# SnapshotSoftware, required=True,
# required=True, description='The software that generated this Snapshot.',
# description='The software that generated this Snapshot.', )
# )
sid = String(required=True) sid = String(required=True)
timestamp = String(required=True) timestamp = String(required=True)
settings_version = String(required=False) settings_version = String(required=False)