devicehub-django/device/models.py

338 lines
9.9 KiB
Python

import json
from django.db import models, connection
from utils.constants import ALGOS
from evidence.models import Annotation, Evidence
from lot.models import DeviceLot
class Device:
class Types(models.TextChoices):
DESKTOP = "Desktop"
LAPTOP = "Laptop"
SERVER = "Server"
GRAPHICCARD = "GraphicCard"
HARDDRIVE = "HardDrive"
SOLIDSTATEDRIVE = "SolidStateDrive"
MOTHERBOARD = "Motherboard"
NETWORKADAPTER = "NetworkAdapter"
PROCESSOR = "Processor"
RAMMODULE = "RamModule"
SOUNDCARD = "SoundCard"
DISPLAY = "Display"
BATTERY = "Battery"
CAMERA = "Camera"
def __init__(self, *args, **kwargs):
# the id is the chid of the device
self.id = kwargs["id"]
self.pk = self.id
self.shortid = self.pk[:6].upper()
self.algorithm = None
self.owner = None
self.annotations = []
self.hids = []
self.uuids = []
self.evidences = []
self.lots = []
self.last_evidence = None
self.get_last_evidence()
def initial(self):
self.get_annotations()
self.get_uuids()
self.get_hids()
self.get_evidences()
self.get_lots()
def get_annotations(self):
if self.annotations:
return self.annotations
self.annotations = Annotation.objects.filter(
type=Annotation.Type.SYSTEM,
value=self.id
).order_by("-created")
if self.annotations.count():
self.algorithm = self.annotations[0].key
self.owner = self.annotations[0].owner
return self.annotations
def get_user_annotations(self):
if not self.uuids:
self.get_uuids()
annotations = Annotation.objects.filter(
uuid__in=self.uuids,
owner=self.owner,
type=Annotation.Type.USER
)
return annotations
def get_user_documents(self):
if not self.uuids:
self.get_uuids()
annotations = Annotation.objects.filter(
uuid__in=self.uuids,
owner=self.owner,
type=Annotation.Type.DOCUMENT
)
return annotations
def get_uuids(self):
for a in self.get_annotations():
if a.uuid not in self.uuids:
self.uuids.append(a.uuid)
def get_hids(self):
annotations = self.get_annotations()
algos = list(ALGOS.keys())
algos.append('CUSTOM_ID')
self.hids = list(set(annotations.filter(
type=Annotation.Type.SYSTEM,
key__in=algos,
).values_list("value", flat=True)))
def get_evidences(self):
if not self.uuids:
self.get_uuids()
self.evidences = [Evidence(u) for u in self.uuids]
def get_last_evidence(self):
annotations = self.get_annotations()
if not annotations.count():
return
annotation = annotations.first()
self.last_evidence = Evidence(annotation.uuid)
def is_eraseserver(self):
if not self.uuids:
self.get_uuids()
if not self.uuids:
return False
annotation = Annotation.objects.filter(
uuid__in=self.uuids,
owner=self.owner,
type=Annotation.Type.ERASE_SERVER
).first()
if annotation:
return True
return False
def last_uuid(self):
return self.uuids[0]
def get_lots(self):
self.lots = [
x.lot for x in DeviceLot.objects.filter(device_id=self.id)]
@classmethod
def get_unassigned(cls, institution, offset=0, limit=None):
sql = """
WITH RankedAnnotations AS (
SELECT
t1.value,
t1.key,
ROW_NUMBER() OVER (
PARTITION BY t1.uuid
ORDER BY
CASE
WHEN t1.key = 'CUSTOM_ID' THEN 1
WHEN t1.key = 'hidalgo1' THEN 2
ELSE 3
END,
t1.created DESC
) AS row_num
FROM evidence_annotation AS t1
LEFT JOIN lot_devicelot AS t2 ON t1.value = t2.device_id
WHERE t2.device_id IS NULL
AND t1.owner_id = {institution}
AND t1.type = {type}
)
SELECT DISTINCT
value
FROM
RankedAnnotations
WHERE
row_num = 1
""".format(
institution=institution.id,
type=Annotation.Type.SYSTEM,
)
if limit:
sql += " limit {} offset {}".format(int(limit), int(offset))
sql += ";"
annotations = []
with connection.cursor() as cursor:
cursor.execute(sql)
annotations = cursor.fetchall()
devices = [cls(id=x[0]) for x in annotations]
count = cls.get_unassigned_count(institution)
return devices, count
@classmethod
def get_unassigned_count(cls, institution):
sql = """
WITH RankedAnnotations AS (
SELECT
t1.value,
t1.key,
ROW_NUMBER() OVER (
PARTITION BY t1.uuid
ORDER BY
CASE
WHEN t1.key = 'CUSTOM_ID' THEN 1
WHEN t1.key = 'hidalgo1' THEN 2
ELSE 3
END,
t1.created DESC
) AS row_num
FROM evidence_annotation AS t1
LEFT JOIN lot_devicelot AS t2 ON t1.value = t2.device_id
WHERE t2.device_id IS NULL
AND t1.owner_id = {institution}
AND t1.type = {type}
)
SELECT
COUNT(DISTINCT value)
FROM
RankedAnnotations
WHERE
row_num = 1
""".format(
institution=institution.id,
type=Annotation.Type.SYSTEM,
)
with connection.cursor() as cursor:
cursor.execute(sql)
return cursor.fetchall()[0][0]
@classmethod
def get_annotation_from_uuid(cls, uuid, institution):
sql = """
WITH RankedAnnotations AS (
SELECT
t1.value,
t1.key,
ROW_NUMBER() OVER (
PARTITION BY t1.uuid
ORDER BY
CASE
WHEN t1.key = 'CUSTOM_ID' THEN 1
WHEN t1.key = 'hidalgo1' THEN 2
ELSE 3
END,
t1.created DESC
) AS row_num
FROM evidence_annotation AS t1
LEFT JOIN lot_devicelot AS t2 ON t1.value = t2.device_id
WHERE t2.device_id IS NULL
AND t1.owner_id = {institution}
AND t1.type = {type}
AND t1.uuid = '{uuid}'
)
SELECT DISTINCT
value
FROM
RankedAnnotations
WHERE
row_num = 1;
""".format(
uuid=uuid.replace("-", ""),
institution=institution.id,
type=Annotation.Type.SYSTEM,
)
annotations = []
with connection.cursor() as cursor:
cursor.execute(sql)
annotations = cursor.fetchall()
return cls(id=annotations[0][0])
@property
def is_websnapshot(self):
if not self.last_evidence:
self.get_last_evidence()
return self.last_evidence.doc['type'] == "WebSnapshot"
@property
def last_user_evidence(self):
if not self.last_evidence:
self.get_last_evidence()
return self.last_evidence.doc['kv'].items()
@property
def manufacturer(self):
if not self.last_evidence:
self.get_last_evidence()
return self.last_evidence.get_manufacturer()
@property
def serial_number(self):
if not self.last_evidence:
self.get_last_evidence()
return self.last_evidence.get_serial_number()
@property
def type(self):
if self.last_evidence.doc['type'] == "WebSnapshot":
return self.last_evidence.doc.get("device", {}).get("type", "")
if not self.last_evidence:
self.get_last_evidence()
return self.last_evidence.get_chassis()
@property
def model(self):
if not self.last_evidence:
self.get_last_evidence()
return self.last_evidence.get_model()
@property
def components(self):
if not self.last_evidence:
self.get_last_evidence()
return self.last_evidence.get_components()
def get_components_data(self, is_user_authenticated):
if is_user_authenticated:
return self.components
public_components = [component.copy() for component in self.components]
self.remove_sensitive_data_from(public_components)
return public_components
def remove_sensitive_data_from(self, components):
for component in components:
component.pop('SerialNumber', None)
component.pop('serial_number', None)
def get_device_data(self, should_include_sensitive_fields):
data = {
'id': self.id,
'shortid': self.shortid,
'uuids': self.uuids,
'hids': self.hids,
'components': self.get_components_data(should_include_sensitive_fields),
}
if should_include_sensitive_fields:
data.update({
'serial_number': self.serial_number,
})
return data