Merge branch 'testing' into feature/lot-unassigned

This commit is contained in:
Cayo Puigdefabregas 2021-08-16 11:15:30 +02:00
commit 16b6325c1b
19 changed files with 511 additions and 88 deletions

View File

@ -6,10 +6,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
ml).
## master
[1.0.7-beta]
[1.0.8-beta]
## testing
[1.0.8-beta]
[1.0.9-beta]
- [addend] #159 external document as proof of erase of disk
## [1.0.8-beta]
- [bugfix] #161 fixing DataStorage with bigInteger
## [1.0.7-beta]
- [addend] #158 support for encrypted snapshots data

View File

@ -1 +1 @@
__version__ = "1.0.8-beta"
__version__ = "1.0.9-beta"

View File

@ -0,0 +1,38 @@
"""TestDataStorage_bigIntegers
Revision ID: 0103a9c96b2d
Revises: 3a3601ac8224
Create Date: 2021-07-21 08:56:48.342503
"""
from alembic import op, context
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '0103a9c96b2d'
down_revision = '3a3601ac8224'
branch_labels = None
depends_on = None
def get_inv():
INV = context.get_x_argument(as_dictionary=True).get('inventory')
if not INV:
raise ValueError("Inventory value is not specified")
return INV
def upgrade():
op.alter_column('test_data_storage', 'reallocated_sector_count', type_=sa.BigInteger(), schema=f'{get_inv()}')
op.alter_column('test_data_storage', 'power_cycle_count', type_=sa.Integer(), schema=f'{get_inv()}')
op.alter_column('test_data_storage', 'reported_uncorrectable_errors', type_=sa.BigInteger(), schema=f'{get_inv()}')
op.alter_column('test_data_storage', 'current_pending_sector_count', type_=sa.BigInteger(), schema=f'{get_inv()}')
op.alter_column('test_data_storage', 'offline_uncorrectable', type_=sa.BigInteger(), schema=f'{get_inv()}')
def downgrade():
op.alter_column('test_data_storage', 'reallocated_sector_count', type_=sa.SmallInteger(), schema=f'{get_inv()}')
op.alter_column('test_data_storage', 'power_cycle_count', type_=sa.SmallInteger(), schema=f'{get_inv()}')
op.alter_column('test_data_storage', 'reported_uncorrectable_errors', type_=sa.Integer(), schema=f'{get_inv()}')
op.alter_column('test_data_storage', 'current_pending_sector_count', type_=sa.Integer(), schema=f'{get_inv()}')
op.alter_column('test_data_storage', 'offline_uncorrectable', type_=sa.Integer(), schema=f'{get_inv()}')

View File

@ -0,0 +1,85 @@
"""documents
Revision ID: 7ecb8ff7abad
Revises: 3a3601ac8224
Create Date: 2021-07-19 14:46:42.375331
"""
from alembic import op
import sqlalchemy as sa
import sqlalchemy_utils
import citext
import teal
from alembic import op
from alembic import context
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = '7ecb8ff7abad'
down_revision = '0103a9c96b2d'
branch_labels = None
depends_on = None
def get_inv():
INV = context.get_x_argument(as_dictionary=True).get('inventory')
if not INV:
raise ValueError("Inventory value is not specified")
return INV
def upgrade():
# Document table
op.create_table('document',
sa.Column('id', sa.BigInteger(), nullable=False),
sa.Column('updated', sa.TIMESTAMP(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'),
nullable=False,
comment='The last time Document recorded a change for \n this thing.\n '),
sa.Column('created', sa.TIMESTAMP(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'),
nullable=False, comment='When Document created this.'),
sa.Column('document_type', sa.Unicode(), nullable=False),
sa.Column('date', sa.TIMESTAMP(timezone=True), nullable=True),
sa.Column('id_document', sa.Unicode(), nullable=True),
sa.Column('owner_id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('file_name', sa.Unicode(), nullable=False),
sa.Column('file_hash', sa.Unicode(), nullable=False),
sa.Column('url', sa.Unicode(), nullable=True),
sa.ForeignKeyConstraint(['owner_id'], ['common.user.id'], ),
sa.PrimaryKeyConstraint('id'),
schema=f'{get_inv()}'
)
op.create_index('generic_document_id', 'document', ['id'], unique=False, postgresql_using='hash', schema=f'{get_inv()}')
op.create_index(op.f('ix_document_created'), 'document', ['created'], unique=False, schema=f'{get_inv()}')
op.create_index(op.f('ix_document_updated'), 'document', ['updated'], unique=False, schema=f'{get_inv()}')
op.create_index('document_type_index', 'document', ['document_type'], unique=False, postgresql_using='hash', schema=f'{get_inv()}')
# DataWipeDocument table
op.create_table('data_wipe_document',
sa.Column('id', sa.BigInteger(), nullable=False),
sa.Column('software', sa.Unicode(), nullable=True),
sa.Column('success', sa.Boolean(), nullable=False),
sa.ForeignKeyConstraint(['id'], [f'{get_inv()}.document.id'], ),
sa.PrimaryKeyConstraint('id'),
schema=f'{get_inv()}'
)
# DataWipe table
op.create_table('data_wipe',
sa.Column('document_id', sa.BigInteger(), nullable=False),
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
sa.ForeignKeyConstraint(['document_id'], [f'{get_inv()}.document.id'], ),
sa.ForeignKeyConstraint(['id'], [f'{get_inv()}.action.id'], ),
sa.PrimaryKeyConstraint('id'),
schema=f'{get_inv()}'
)
def downgrade():
op.drop_table('data_wipe', schema=f'{get_inv()}')
op.drop_table('data_wipe_document', schema=f'{get_inv()}')
op.drop_table('document', schema=f'{get_inv()}')

View File

@ -199,6 +199,11 @@ class ToPrepareDef(ActionDef):
SCHEMA = schemas.ToPrepare
class DataWipeDef(ActionDef):
VIEW = None
SCHEMA = schemas.DataWipe
class AllocateDef(ActionDef):
VIEW = AllocateView
SCHEMA = schemas.Allocate

View File

@ -754,12 +754,12 @@ class TestDataStorage(TestMixin, Test):
status = Column(Unicode(), check_lower('status'), nullable=False)
lifetime = Column(Interval)
assessment = Column(Boolean)
reallocated_sector_count = Column(SmallInteger)
power_cycle_count = Column(SmallInteger)
_reported_uncorrectable_errors = Column('reported_uncorrectable_errors', Integer)
reallocated_sector_count = Column(BigInteger)
power_cycle_count = Column(Integer)
_reported_uncorrectable_errors = Column('reported_uncorrectable_errors', BigInteger)
command_timeout = Column(BigInteger)
current_pending_sector_count = Column(Integer)
offline_uncorrectable = Column(Integer)
current_pending_sector_count = Column(BigInteger)
offline_uncorrectable = Column(BigInteger)
remaining_lifetime_percentage = Column(SmallInteger)
elapsed = Column(Interval, nullable=False)
@ -1327,6 +1327,20 @@ class ToPrepare(ActionWithMultipleDevices):
pass
class DataWipe(JoinedTableMixin, ActionWithMultipleDevices):
"""The device has been selected for insert one proof of erease disk.
"""
document_comment = """The user that gets the device due this deal."""
document_id = db.Column(BigInteger,
db.ForeignKey('data_wipe_document.id'),
nullable=False)
document = db.relationship('DataWipeDocument',
backref=backref('actions',
lazy=True,
cascade=CASCADE_OWN),
primaryjoin='DataWipe.document_id == DataWipeDocument.id')
class Prepare(ActionWithMultipleDevices):
"""Work has been performed to the device to a defined point of
acceptance.

View File

@ -17,6 +17,7 @@ from ereuse_devicehub.resources.action import models as m
from ereuse_devicehub.resources.agent import schemas as s_agent
from ereuse_devicehub.resources.device import schemas as s_device
from ereuse_devicehub.resources.tradedocument import schemas as s_document
from ereuse_devicehub.resources.documents import schemas as s_generic_document
from ereuse_devicehub.resources.enums import AppearanceRange, BiosAccessRange, FunctionalityRange, \
PhysicalErasureMethod, R_POSITIVE, RatingRange, \
Severity, SnapshotSoftware, TestDataStorageLength
@ -430,6 +431,11 @@ class Prepare(ActionWithMultipleDevices):
__doc__ = m.Prepare.__doc__
class DataWipe(ActionWithMultipleDevices):
__doc__ = m.DataWipe.__doc__
document = NestedOn(s_generic_document.DataWipeDocument, only_query='id')
class Live(ActionWithOneDevice):
__doc__ = m.Live.__doc__
"""

View File

@ -0,0 +1,52 @@
import copy
from ereuse_devicehub.db import db
from ereuse_devicehub.resources.action.models import DataWipe
from ereuse_devicehub.resources.documents.models import DataWipeDocument
from ereuse_devicehub.resources.device.models import DataStorage
from ereuse_devicehub.resources.documents.schemas import DataWipeDocument as sh_document
from ereuse_devicehub.resources.hash_reports import ReportHash
class ErasedView():
"""Handler for manager the action register for add to a device one proof of erase
"""
def __init__(self, data, schema):
self.schema = schema
self.insert_document(copy.copy(data))
self.insert_action(copy.copy(data))
def post(self):
db.session().final_flush()
from flask import jsonify
ret = jsonify(self.erased)
ret.status_code = 201
db.session.commit()
return ret
def insert_document(self, data):
schema = sh_document()
[data.pop(x, None) for x in ['severity', 'devices', 'name', 'description']]
doc_data = schema.load(data)
self.document = DataWipeDocument(**doc_data)
db.session.add(self.document)
db_hash = ReportHash(hash3=self.document.file_hash)
db.session.add(db_hash)
def insert_action(self, data):
[data.pop(x, None) for x in ['url', 'documentId', 'filename', 'hash', 'software', 'success']]
self.data = self.schema.load(data)
for dev in self.data['devices']:
if not hasattr(dev, 'components'):
continue
for component in dev.components:
if isinstance(component, DataStorage):
self.data['devices'].add(component)
self.data['document'] = self.document
self.erased = DataWipe(**self.data)
db.session.add(self.erased)

View File

@ -18,6 +18,7 @@ from ereuse_devicehub.resources.action.models import (Action, Snapshot, VisualTe
Trade, Confirm, ConfirmRevoke, Revoke)
from ereuse_devicehub.resources.action.views import trade as trade_view
from ereuse_devicehub.resources.action.views.snapshot import SnapshotView, save_json, move_json
from ereuse_devicehub.resources.action.views.documents import ErasedView
from ereuse_devicehub.resources.device.models import Device, Computer, DataStorage
from ereuse_devicehub.resources.enums import Severity
@ -250,6 +251,10 @@ class ActionView(View):
confirm_revoke = trade_view.ConfirmRevokeDocumentView(json, resource_def, self.schema)
return confirm_revoke.post()
if json['type'] == 'DataWipe':
erased = ErasedView(json, resource_def.schema)
return erased.post()
a = resource_def.schema.load(json)
Model = db.Model._decl_class_registry.data[json['type']]()
action = Model(**a)

View File

@ -696,6 +696,25 @@ class Computer(Device):
if privacy
)
@property
def external_document_erasure(self):
"""Returns the external ``DataStorage`` proof of erasure.
"""
from ereuse_devicehub.resources.action.models import DataWipe
urls = set()
try:
ev = self.last_action_of(DataWipe)
urls.add(ev.document.url.to_text())
except LookupError:
pass
for comp in self.components:
if isinstance(comp, DataStorage):
doc = comp.external_document_erasure
if doc:
urls.add(doc)
return urls
def add_mac_to_hid(self, components_snap=None):
"""Returns the Naming.hid with the first mac of network adapter,
following an alphabetical order.
@ -879,6 +898,17 @@ class DataStorage(JoinedComponentTableMixin, Component):
v += ' {} GB'.format(self.size // 1000 if self.size else '?')
return v
@property
def external_document_erasure(self):
"""Returns the external ``DataStorage`` proof of erasure.
"""
from ereuse_devicehub.resources.action.models import DataWipe
try:
ev = self.last_action_of(DataWipe)
return ev.document.url.to_text()
except LookupError:
return None
class HardDrive(DataStorage):
pass

View File

@ -52,7 +52,8 @@ class DeviceRow(OrderedDict):
self['Tag 2 Type'] = self['Tag 2 ID'] = self['Tag 2 Organization'] = ''
self['Tag 3 Type'] = self['Tag 3 ID'] = self['Tag 3 Organization'] = ''
for i, tag in zip(range(1, 3), device.tags):
self['Tag {} Type'.format(i)] = 'unamed' if tag.provider else 'named'
self['Tag {} Type'.format(
i)] = 'unamed' if tag.provider else 'named'
self['Tag {} ID'.format(i)] = tag.id
self['Tag {} Organization'.format(i)] = tag.org.name
@ -70,11 +71,13 @@ class DeviceRow(OrderedDict):
self['Updated in (web)'] = ''
try:
self['Physical state'] = device.last_action_of(*states.Physical.actions()).t
self['Physical state'] = device.last_action_of(
*states.Physical.actions()).t
except LookupError:
self['Physical state'] = ''
try:
self['Trading state'] = device.last_action_of(*states.Trading.actions()).t
self['Trading state'] = device.last_action_of(
*states.Trading.actions()).t
except LookupError:
self['Trading state'] = ''
if isinstance(device, d.Computer):
@ -108,7 +111,7 @@ class DeviceRow(OrderedDict):
self['RAM Range'] = ''
self['Data Storage Rate'] = ''
self['Data Storage Range'] = ''
self['Price'] = none2str(device.price)
benchram = get_action(device, 'BenchmarkRamSysbench')
@ -120,7 +123,7 @@ class DeviceRow(OrderedDict):
def components(self):
"""Function to get all components information of a device."""
assert isinstance(self.device, d.Computer)
for ctype in self.ORDER_COMPONENTS: # ctype: str
for ctype in self.ORDER_COMPONENTS: # ctype: str
cmax = self.NUMS.get(ctype, 4)
i = 1
l_ctype = [ctype]
@ -148,9 +151,11 @@ class DeviceRow(OrderedDict):
self['{} {} Model'.format(ctype, i)] = ''
self['{} {} Serial Number'.format(ctype, i)] = ''
else:
self['{} {} Manufacturer'.format(ctype, i)] = none2str(component.manufacturer)
self['{} {} Manufacturer'.format(ctype, i)] = none2str(
component.manufacturer)
self['{} {} Model'.format(ctype, i)] = none2str(component.model)
self['{} {} Serial Number'.format(ctype, i)] = none2str(component.serial_number)
self['{} {} Serial Number'.format(ctype, i)] = none2str(
component.serial_number)
if ctype == d.Processor.t:
self.get_processor(ctype, i, component)
@ -170,10 +175,12 @@ class DeviceRow(OrderedDict):
self['{} {} Number of cores'.format(ctype, i)] = ''
self['{} {} Speed (GHz)'.format(ctype, i)] = ''
self['Benchmark {} {} (points)'.format(ctype, i)] = ''
self['Benchmark ProcessorSysbench {} {} (points)'.format(ctype, i)] = ''
self['Benchmark ProcessorSysbench {} {} (points)'.format(
ctype, i)] = ''
return
self['{} {} Number of cores'.format(ctype, i)] = none2str(component.cores)
self['{} {} Number of cores'.format(
ctype, i)] = none2str(component.cores)
self['{} {} Speed (GHz)'.format(ctype, i)] = none2str(component.speed)
benchmark = get_action(component, 'BenchmarkProcessor')
@ -184,9 +191,11 @@ class DeviceRow(OrderedDict):
sysbench = get_action(component, 'BenchmarkProcessorSysbench')
if not sysbench:
self['Benchmark ProcessorSysbench {} {} (points)'.format(ctype, i)] = ''
self['Benchmark ProcessorSysbench {} {} (points)'.format(
ctype, i)] = ''
return
self['Benchmark ProcessorSysbench {} {} (points)'.format(ctype, i)] = sysbench.rate
self['Benchmark ProcessorSysbench {} {} (points)'.format(
ctype, i)] = sysbench.rate
def get_ram(self, ctype, i, component):
"""Particular fields for component Ram Module."""
@ -202,6 +211,7 @@ class DeviceRow(OrderedDict):
"""Particular fields for component DataStorage.
A DataStorage can be HardDrive or SolidStateDrive.
"""
if component is None:
self['{} {} Size (MB)'.format(ctype, i)] = ''
self['Erasure {} {}'.format(ctype, i)] = ''
@ -209,6 +219,7 @@ class DeviceRow(OrderedDict):
self['Erasure {} {} Size (MB)'.format(ctype, i)] = ''
self['Erasure {} {} Software'.format(ctype, i)] = ''
self['Erasure {} {} Result'.format(ctype, i)] = ''
self['Erasure {} {} Certificate URL'.format(ctype, i)] = ''
self['Erasure {} {} Type'.format(ctype, i)] = ''
self['Erasure {} {} Method'.format(ctype, i)] = ''
self['Erasure {} {} Elapsed (hours)'.format(ctype, i)] = ''
@ -222,7 +233,8 @@ class DeviceRow(OrderedDict):
self['Test {} {} Type'.format(ctype, i)] = ''
self['Test {} {} Result'.format(ctype, i)] = ''
self['Test {} {} Power on (hours used)'.format(ctype, i)] = ''
self['Test {} {} Lifetime remaining (percentage)'.format(ctype, i)] = ''
self['Test {} {} Lifetime remaining (percentage)'.format(
ctype, i)] = ''
return
snapshot = get_action(component, 'Snapshot')
@ -233,15 +245,38 @@ class DeviceRow(OrderedDict):
self['{} {} Size (MB)'.format(ctype, i)] = none2str(component.size)
erasures = [a for a in component.actions if a.type in ['EraseBasic', 'EraseSectors']]
erasures = [a for a in component.actions if a.type in [
'EraseBasic', 'EraseSectors', 'DataWipe']]
erasure = erasures[-1] if erasures else None
if not erasure:
self['Erasure {} {}'.format(ctype, i)] = none2str(component.hid)
serial_number = none2str(component.serial_number)
self['Erasure {} {} Serial Number'.format(ctype, i)] = serial_number
self['Erasure {} {} Size (MB)'.format(ctype, i)] = none2str(component.size)
self['Erasure {} {} Serial Number'.format(
ctype, i)] = serial_number
self['Erasure {} {} Size (MB)'.format(
ctype, i)] = none2str(component.size)
self['Erasure {} {} Software'.format(ctype, i)] = ''
self['Erasure {} {} Result'.format(ctype, i)] = ''
self['Erasure {} {} Certificate URL'.format(ctype, i)] = ''
self['Erasure {} {} Type'.format(ctype, i)] = ''
self['Erasure {} {} Method'.format(ctype, i)] = ''
self['Erasure {} {} Elapsed (hours)'.format(ctype, i)] = ''
self['Erasure {} {} Date'.format(ctype, i)] = ''
self['Erasure {} {} Steps'.format(ctype, i)] = ''
self['Erasure {} {} Steps Start Time'.format(ctype, i)] = ''
self['Erasure {} {} Steps End Time'.format(ctype, i)] = ''
elif hasattr(erasure, 'type') and erasure.type == 'DataWipe':
self['Erasure {} {}'.format(ctype, i)] = none2str(component.hid)
serial_number = none2str(component.serial_number)
self['Erasure {} {} Serial Number'.format(
ctype, i)] = serial_number
self['Erasure {} {} Size (MB)'.format(
ctype, i)] = none2str(component.size)
self['Erasure {} {} Software'.format(
ctype, i)] = erasure.document.software
self['Erasure {} {} Result'.format(ctype, i)] = get_result(erasure)
self['Erasure {} {} Certificate URL'.format(
ctype, i)] = erasure.document.url.to_text()
self['Erasure {} {} Type'.format(ctype, i)] = ''
self['Erasure {} {} Method'.format(ctype, i)] = ''
self['Erasure {} {} Elapsed (hours)'.format(ctype, i)] = ''
@ -252,22 +287,31 @@ class DeviceRow(OrderedDict):
else:
self['Erasure {} {}'.format(ctype, i)] = none2str(component.hid)
serial_number = none2str(component.serial_number)
self['Erasure {} {} Serial Number'.format(ctype, i)] = serial_number
self['Erasure {} {} Size (MB)'.format(ctype, i)] = none2str(component.size)
self['Erasure {} {} Serial Number'.format(
ctype, i)] = serial_number
self['Erasure {} {} Size (MB)'.format(
ctype, i)] = none2str(component.size)
self['Erasure {} {} Software'.format(ctype, i)] = software
result = get_result(erasure.severity)
result = get_result(erasure)
self['Erasure {} {} Result'.format(ctype, i)] = result
self['Erasure {} {} Certificate URL'.format(ctype, i)] = ''
self['Erasure {} {} Type'.format(ctype, i)] = erasure.type
self['Erasure {} {} Method'.format(ctype, i)] = erasure.method
self['Erasure {} {} Elapsed (hours)'.format(ctype, i)] = format(erasure.elapsed)
self['Erasure {} {} Date'.format(ctype, i)] = format(erasure.created)
self['Erasure {} {} Elapsed (hours)'.format(
ctype, i)] = format(erasure.elapsed)
self['Erasure {} {} Date'.format(
ctype, i)] = format(erasure.created)
steps = ','.join((format(x) for x in erasure.steps))
self['Erasure {} {} Steps'.format(ctype, i)] = steps
steps_start_time = ','.join((format(x.start_time) for x in erasure.steps))
self['Erasure {} {} Steps Start Time'.format(ctype, i)] = steps_start_time
steps_end_time = ','.join((format(x.end_time) for x in erasure.steps))
self['Erasure {} {} Steps End Time'.format(ctype, i)] = steps_end_time
steps_start_time = ','.join(
(format(x.start_time) for x in erasure.steps))
self['Erasure {} {} Steps Start Time'.format(
ctype, i)] = steps_start_time
steps_end_time = ','.join((format(x.end_time)
for x in erasure.steps))
self['Erasure {} {} Steps End Time'.format(
ctype, i)] = steps_end_time
benchmark = get_action(component, 'BenchmarkDataStorage')
if not benchmark:
@ -285,12 +329,14 @@ class DeviceRow(OrderedDict):
self['Test {} {} Type'.format(ctype, i)] = ''
self['Test {} {} Result'.format(ctype, i)] = ''
self['Test {} {} Power on (hours used)'.format(ctype, i)] = ''
self['Test {} {} Lifetime remaining (percentage)'.format(ctype, i)] = ''
self['Test {} {} Lifetime remaining (percentage)'.format(
ctype, i)] = ''
return
self['Test {} {} Software'.format(ctype, i)] = software
self['Test {} {} Type'.format(ctype, i)] = test_storage.length.value
self['Test {} {} Result'.format(ctype, i)] = get_result(test_storage.severity)
self['Test {} {} Result'.format(ctype, i)] = get_result(
test_storage)
self['Test {} {} Power on (hours used)'.format(ctype, i)] = none2str(
test_storage.power_cycle_count)
self['Test {} {} Lifetime remaining (percentage)'.format(ctype, i)] = none2str(
@ -319,7 +365,8 @@ class StockRow(OrderedDict):
self['Manufacturer'] = none2str(device.manufacturer)
self['Registered in'] = format(device.created, '%c')
try:
self['Physical state'] = device.last_action_of(*states.Physical.actions()).t
self['Physical state'] = device.last_action_of(
*states.Physical.actions()).t
except LookupError:
self['Physical state'] = ''
try:
@ -343,15 +390,21 @@ class StockRow(OrderedDict):
self['Data Storage Range'] = rate.data_storage_range
def get_result(severity):
def get_result(erasure):
""" For the csv is necessary simplify the message of results """
if hasattr(erasure, 'type') and erasure.type == 'DataWipe':
if erasure.document.success:
return 'Success'
return 'Failure'
type_of_results = {
Severity.Error: 'Failure',
Severity.Warning: 'Success with Warnings',
Severity.Notice: 'Success',
Severity.Info: 'Success'
}
return type_of_results[severity]
return type_of_results[erasure.severity]
def none2str(string):

View File

@ -90,6 +90,7 @@ class DocumentView(DeviceView):
res = flask.make_response(template)
return res
@staticmethod
def erasure(query: db.Query):
def erasures():
@ -274,7 +275,13 @@ class StampsView(View):
ok = '100% coincidence. The attached file contains data 100% existing in \
to our backend'
result = ('Bad', bad)
if file_check.mimetype in ['text/csv', 'application/pdf']:
mime = ['text/csv', 'application/pdf', 'text/plain','text/markdown',
'image/jpeg', 'image/png', 'text/html',
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
'application/vnd.oasis.opendocument.spreadsheet',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'application/msword']
if file_check.mimetype in mime:
if verify_hash(file_check):
result = ('Ok', ok)
@ -291,7 +298,6 @@ class InternalStatsView(DeviceView):
evs.Action.type.in_(('Snapshot', 'Live', 'Allocate', 'Deallocate')))
return self.generate_post_csv(query)
def generate_post_csv(self, query):
d = {}
for ac in query:

View File

@ -0,0 +1,57 @@
from citext import CIText
from flask import g
from sqlalchemy import BigInteger, Column, Sequence, Unicode, Boolean, ForeignKey
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.dialects.postgresql import UUID
from teal.db import URL
from ereuse_devicehub.db import db
from ereuse_devicehub.resources.user.models import User
from ereuse_devicehub.resources.models import Thing, STR_SM_SIZE
class Document(Thing):
"""This represent a generic document."""
id = Column(BigInteger, Sequence('device_seq'), primary_key=True)
id.comment = """The identifier of the device for this database. Used only
internally for software; users should not use this.
"""
document_type = Column(Unicode(STR_SM_SIZE), nullable=False)
date = Column(db.DateTime, nullable=True)
date.comment = """The date of document, some documents need to have one date
"""
id_document = Column(CIText(), nullable=True)
id_document.comment = """The id of one document like invoice so they can be linked."""
owner_id = db.Column(UUID(as_uuid=True),
db.ForeignKey(User.id),
nullable=False,
default=lambda: g.user.id)
owner = db.relationship(User, primaryjoin=owner_id == User.id)
file_name = Column(db.CIText(), nullable=False)
file_name.comment = """This is the name of the file when user up the document."""
file_hash = Column(db.CIText(), nullable=False)
file_hash.comment = """This is the hash of the file produced from frontend."""
url = db.Column(URL(), nullable=True)
url.comment = """This is the url where resides the document."""
def __str__(self) -> str:
return '{0.file_name}'.format(self)
class JoinedTableMixin:
# noinspection PyMethodParameters
@declared_attr
def id(cls):
return Column(BigInteger, ForeignKey(Document.id), primary_key=True)
class DataWipeDocument(JoinedTableMixin, Document):
"""This represent a generic document."""
software = Column(CIText(), nullable=True)
software.comment = """Which software is used"""
success = Column(Boolean, default=False)
success.comment = """If the erase was success"""
def __str__(self) -> str:
return '{0.file_name}'.format(self)

View File

@ -0,0 +1,32 @@
from marshmallow.fields import DateTime, Integer, validate, Boolean
from marshmallow import post_load
from teal.marshmallow import SanitizedStr, URL
from ereuse_devicehub.resources.schemas import Thing
from ereuse_devicehub.resources.documents import models as m
class DataWipeDocument(Thing):
__doc__ = m.DataWipeDocument.__doc__
id = Integer(description=m.DataWipeDocument.id.comment, dump_only=True)
url = URL(required= False, description=m.DataWipeDocument.url.comment)
success = Boolean(required=False, default=False, description=m.DataWipeDocument.success.comment)
software = SanitizedStr(description=m.DataWipeDocument.software.comment)
date = DateTime(data_key='endTime',
required=False,
description=m.DataWipeDocument.date.comment)
id_document = SanitizedStr(data_key='documentId',
required=False,
default='',
description=m.DataWipeDocument.id_document.comment)
file_name = SanitizedStr(data_key='filename',
default='',
description=m.DataWipeDocument.file_name.comment,
validate=validate.Length(max=100))
file_hash = SanitizedStr(data_key='hash',
default='',
description=m.DataWipeDocument.file_hash.comment,
validate=validate.Length(max=64))
@post_load
def get_trade_document(self, data):
data['document_type'] = 'DataWipeDocument'

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -6,6 +6,7 @@ import copy
import pytest
from datetime import datetime, timedelta
from io import BytesIO
from dateutil.tz import tzutc
from decimal import Decimal
from typing import Tuple, Type
@ -2408,3 +2409,32 @@ def test_trade_case14(user: UserClient, user2: UserClient):
assert device.actions[-4].user == trade.user_from
assert device.actions[-5].t == 'Trade'
assert device.actions[-5].author == trade.user_to
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_action_web_erase(user: UserClient, client: Client):
import hashlib
from ereuse_devicehub.resources.documents import documents
bfile = BytesIO(b'abc')
hash3 = hashlib.sha3_256(bfile.read()).hexdigest()
snap, _ = user.post(file('acer.happy.battery.snapshot'), res=models.Snapshot)
request = {'type': 'DataWipe', 'devices': [snap['device']['id']], 'name': 'borrado universal', 'severity': 'Info', 'description': 'nada que describir', 'url': 'http://www.google.com/', 'documentId': '33', 'endTime': '2021-07-07T22:00:00.000Z', 'filename': 'Certificado de borrado1.pdf', 'hash': hash3, 'success': 1, 'software': "Blanco"}
user.post(res=models.Action, data=request)
action = models.DataWipe.query.one()
for dev in action.devices:
assert action in dev.actions
assert action.document.file_hash == request['hash']
bfile = BytesIO(b'abc')
response, _ = client.post(res=documents.DocumentDef.t,
item='stamps/',
content_type='multipart/form-data',
accept='text/html',
data={'docUpload': [(bfile, 'example.csv')]},
status=200)
assert "alert alert-info" in response
assert "100% coincidence." in response
assert not "alert alert-danger" in response

View File

@ -122,4 +122,4 @@ def test_api_docs(client: Client):
'scheme': 'basic',
'name': 'Authorization'
}
assert len(docs['definitions']) == 125
assert len(docs['definitions']) == 126

View File

@ -33,19 +33,22 @@ def test_erasure_certificate_public_one(user: UserClient, client: Client):
snapshot, _ = user.post(s, res=Snapshot)
doc, response = user.get(res=documents.DocumentDef.t,
item='erasures/{}'.format(snapshot['device']['id']),
accept=ANY)
item='erasures/{}'.format(
snapshot['device']['id']),
accept=ANY)
assert 'html' in response.content_type
assert '<html' in doc
assert '2018' in doc
doc, response = client.get(res=documents.DocumentDef.t,
item='erasures/{}'.format(snapshot['device']['id']),
item='erasures/{}'.format(
snapshot['device']['id']),
query=[('format', 'PDF')],
accept='application/pdf')
assert 'application/pdf' == response.content_type
erasure = next(e for e in snapshot['actions'] if e['type'] == 'EraseSectors')
erasure = next(e for e in snapshot['actions']
if e['type'] == 'EraseSectors')
doc, response = client.get(res=documents.DocumentDef.t,
item='erasures/{}'.format(erasure['id']),
@ -65,7 +68,8 @@ def test_erasure_certificate_private_query(user: UserClient):
doc, response = user.get(res=documents.DocumentDef.t,
item='erasures/',
query=[('filter', {'id': [snapshot['device']['id']]})],
query=[
('filter', {'id': [snapshot['device']['id']]})],
accept=ANY)
assert 'html' in response.content_type
assert '<html' in doc
@ -74,7 +78,8 @@ def test_erasure_certificate_private_query(user: UserClient):
doc, response = user.get(res=documents.DocumentDef.t,
item='erasures/',
query=[
('filter', {'id': [snapshot['device']['id']]}),
('filter', {
'id': [snapshot['device']['id']]}),
('format', 'PDF')
],
accept='application/pdf')
@ -92,19 +97,19 @@ def test_export_csv_permitions(user: UserClient, user2: UserClient, client: Clie
"""test export device information in a csv file with others users."""
snapshot, _ = user.post(file('basic.snapshot'), res=Snapshot)
csv_user, _ = user.get(res=documents.DocumentDef.t,
item='devices/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})])
item='devices/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})])
csv_user2, _ = user2.get(res=documents.DocumentDef.t,
item='devices/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})])
item='devices/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})])
_, res = client.get(res=documents.DocumentDef.t,
item='devices/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})], status=401)
item='devices/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})], status=401)
assert res.status_code == 401
assert len(csv_user) > 0
@ -122,30 +127,31 @@ def test_export_csv_actions(user: UserClient, user2: UserClient, client: Client)
"finalUserCode": "abcdefjhi",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
}
user.post(res=Allocate, data=post_request)
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
hdd_action = [a for a in hdd['actions']
if a['type'] == 'TestDataStorage'][0]
hdd_action['lifetime'] += 1000
acer.pop('elapsed')
acer['licence_version'] = '1.0.0'
snapshot, _ = client.post(acer, res=Live)
csv_user, _ = user.get(res=documents.DocumentDef.t,
item='actions/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})])
item='actions/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})])
csv_user2, _ = user2.get(res=documents.DocumentDef.t,
item='actions/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})])
item='actions/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})])
_, res = client.get(res=documents.DocumentDef.t,
item='actions/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})], status=401)
item='actions/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})], status=401)
assert res.status_code == 401
assert len(csv_user) > 0
@ -164,21 +170,22 @@ def test_live_export_csv2(user: UserClient, client: Client, app: Devicehub):
"finalUserCode": "abcdefjhi",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
}
user.post(res=Allocate, data=post_request)
acer = yaml2json('acer-happy.live-test1')
live, _ = client.post(acer, res=Live)
csv_user, _ = user.get(res=documents.DocumentDef.t,
item='actions/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})])
item='actions/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})])
assert "4692" in csv_user
assert "8692" in csv_user
assert "DevicehubID" in csv_user
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_live_example2(user: UserClient, client: Client, app: Devicehub):
@ -211,6 +218,7 @@ def test_export_basic_snapshot(user: UserClient):
item='devices/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})])
f = StringIO(csv_str)
obj_csv = csv.reader(f, f, delimiter=';', quotechar='"')
export_csv = list(obj_csv)
@ -275,9 +283,6 @@ def test_export_extended(app: Devicehub, user: UserClient):
obj_csv = csv.reader(f, f, delimiter=';', quotechar='"')
export_csv = list(obj_csv)
ff= open('ba.csv', 'w')
ff.write(csv_str)
ff.close()
# Open fixture csv and transform to list
with Path(__file__).parent.joinpath('files').joinpath(
'proposal_extended_csv_report.csv').open() as csv_file:
@ -290,18 +295,18 @@ def test_export_extended(app: Devicehub, user: UserClient):
assert fixture_csv[0] == export_csv[0], 'Headers are not equal'
assert fixture_csv[1][:19] == export_csv[1][:19], 'Computer information are not equal'
assert fixture_csv[1][20] == export_csv[1][20], 'Computer information are not equal'
assert fixture_csv[1][22:81] == export_csv[1][22:81], 'Computer information are not equal'
assert fixture_csv[1][82] == export_csv[1][82], 'Computer information are not equal'
assert fixture_csv[1][85:] == export_csv[1][85:], 'Computer information are not equal'
assert fixture_csv[1][22:82] == export_csv[1][22:82], 'Computer information are not equal'
assert fixture_csv[1][83] == export_csv[1][83], 'Computer information are not equal'
assert fixture_csv[1][86:] == export_csv[1][86:], 'Computer information are not equal'
assert fixture_csv[2][:19] == export_csv[2][:19], 'Computer information are not equal'
assert fixture_csv[2][20] == export_csv[2][20], 'Computer information are not equal'
assert fixture_csv[2][22:81] == export_csv[2][22:81], 'Computer information are not equal'
assert fixture_csv[2][82] == export_csv[2][82], 'Computer information are not equal'
assert fixture_csv[2][85:105] == export_csv[2][85:105], 'Computer information are not equal'
assert fixture_csv[2][106] == export_csv[2][106], 'Computer information are not equal'
assert fixture_csv[2][109:129] == export_csv[2][109:129], 'Computer information are not equal'
assert fixture_csv[2][130] == export_csv[2][130], 'Computer information are not equal'
assert fixture_csv[2][133:] == export_csv[2][133:], 'Computer information are not equal'
assert fixture_csv[2][22:82] == export_csv[2][22:82], 'Computer information are not equal'
assert fixture_csv[2][83] == export_csv[2][83], 'Computer information are not equal'
assert fixture_csv[2][86:106] == export_csv[2][86:106], 'Computer information are not equal'
assert fixture_csv[2][108] == export_csv[2][108], 'Computer information are not equal'
assert fixture_csv[2][111:131] == export_csv[2][111:131], 'Computer information are not equal'
assert fixture_csv[2][131] == export_csv[2][131], 'Computer information are not equal'
assert fixture_csv[2][136:] == export_csv[2][136:], 'Computer information are not equal'
@pytest.mark.mvp