Merge branch 'feature/document-erased' into feature/container
This commit is contained in:
commit
9217f7392a
|
@ -10,6 +10,7 @@ ml).
|
||||||
|
|
||||||
## testing
|
## testing
|
||||||
[1.0.9-beta]
|
[1.0.9-beta]
|
||||||
|
- [addend] #159 external document as proof of erase of disk
|
||||||
|
|
||||||
|
|
||||||
## [1.0.8-beta]
|
## [1.0.8-beta]
|
||||||
|
|
|
@ -0,0 +1,85 @@
|
||||||
|
"""documents
|
||||||
|
|
||||||
|
Revision ID: 7ecb8ff7abad
|
||||||
|
Revises: 3a3601ac8224
|
||||||
|
Create Date: 2021-07-19 14:46:42.375331
|
||||||
|
|
||||||
|
"""
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
import sqlalchemy_utils
|
||||||
|
import citext
|
||||||
|
import teal
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
from alembic import context
|
||||||
|
from sqlalchemy.dialects import postgresql
|
||||||
|
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '7ecb8ff7abad'
|
||||||
|
down_revision = '0103a9c96b2d'
|
||||||
|
branch_labels = None
|
||||||
|
depends_on = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_inv():
|
||||||
|
INV = context.get_x_argument(as_dictionary=True).get('inventory')
|
||||||
|
if not INV:
|
||||||
|
raise ValueError("Inventory value is not specified")
|
||||||
|
return INV
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
# Document table
|
||||||
|
op.create_table('document',
|
||||||
|
sa.Column('id', sa.BigInteger(), nullable=False),
|
||||||
|
sa.Column('updated', sa.TIMESTAMP(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'),
|
||||||
|
nullable=False,
|
||||||
|
comment='The last time Document recorded a change for \n this thing.\n '),
|
||||||
|
sa.Column('created', sa.TIMESTAMP(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'),
|
||||||
|
nullable=False, comment='When Document created this.'),
|
||||||
|
sa.Column('type', sa.Unicode(), nullable=False),
|
||||||
|
sa.Column('date', sa.TIMESTAMP(timezone=True), nullable=True),
|
||||||
|
sa.Column('id_document', sa.Unicode(), nullable=True),
|
||||||
|
sa.Column('owner_id', postgresql.UUID(as_uuid=True), nullable=False),
|
||||||
|
sa.Column('file_name', sa.Unicode(), nullable=False),
|
||||||
|
sa.Column('file_hash', sa.Unicode(), nullable=False),
|
||||||
|
sa.Column('url', sa.Unicode(), nullable=True),
|
||||||
|
|
||||||
|
sa.ForeignKeyConstraint(['owner_id'], ['common.user.id'], ),
|
||||||
|
sa.PrimaryKeyConstraint('id'),
|
||||||
|
schema=f'{get_inv()}'
|
||||||
|
)
|
||||||
|
op.create_index('generic_document_id', 'document', ['id'], unique=False, postgresql_using='hash', schema=f'{get_inv()}')
|
||||||
|
op.create_index(op.f('ix_document_created'), 'document', ['created'], unique=False, schema=f'{get_inv()}')
|
||||||
|
op.create_index(op.f('ix_document_updated'), 'document', ['updated'], unique=False, schema=f'{get_inv()}')
|
||||||
|
op.create_index('document_type_index', 'document', ['type'], unique=False, postgresql_using='hash', schema=f'{get_inv()}')
|
||||||
|
|
||||||
|
|
||||||
|
# DataWipeDocument table
|
||||||
|
op.create_table('data_wipe_document',
|
||||||
|
sa.Column('id', sa.BigInteger(), nullable=False),
|
||||||
|
sa.Column('software', sa.Unicode(), nullable=True),
|
||||||
|
sa.Column('success', sa.Boolean(), nullable=False),
|
||||||
|
sa.ForeignKeyConstraint(['id'], [f'{get_inv()}.document.id'], ),
|
||||||
|
sa.PrimaryKeyConstraint('id'),
|
||||||
|
schema=f'{get_inv()}'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# DataWipe table
|
||||||
|
op.create_table('data_wipe',
|
||||||
|
sa.Column('document_id', sa.BigInteger(), nullable=False),
|
||||||
|
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
|
||||||
|
sa.ForeignKeyConstraint(['document_id'], [f'{get_inv()}.document.id'], ),
|
||||||
|
sa.ForeignKeyConstraint(['id'], [f'{get_inv()}.action.id'], ),
|
||||||
|
sa.PrimaryKeyConstraint('id'),
|
||||||
|
schema=f'{get_inv()}'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade():
|
||||||
|
op.drop_table('data_wipe', schema=f'{get_inv()}')
|
||||||
|
op.drop_table('data_wipe_document', schema=f'{get_inv()}')
|
||||||
|
op.drop_table('document', schema=f'{get_inv()}')
|
|
@ -199,6 +199,11 @@ class ToPrepareDef(ActionDef):
|
||||||
SCHEMA = schemas.ToPrepare
|
SCHEMA = schemas.ToPrepare
|
||||||
|
|
||||||
|
|
||||||
|
class DataWipeDef(ActionDef):
|
||||||
|
VIEW = None
|
||||||
|
SCHEMA = schemas.DataWipe
|
||||||
|
|
||||||
|
|
||||||
class AllocateDef(ActionDef):
|
class AllocateDef(ActionDef):
|
||||||
VIEW = AllocateView
|
VIEW = AllocateView
|
||||||
SCHEMA = schemas.Allocate
|
SCHEMA = schemas.Allocate
|
||||||
|
|
|
@ -1327,6 +1327,20 @@ class ToPrepare(ActionWithMultipleDevices):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class DataWipe(JoinedTableMixin, ActionWithMultipleDevices):
|
||||||
|
"""The device has been selected for insert one proof of erease disk.
|
||||||
|
"""
|
||||||
|
document_comment = """The user that gets the device due this deal."""
|
||||||
|
document_id = db.Column(BigInteger,
|
||||||
|
db.ForeignKey('data_wipe_document.id'),
|
||||||
|
nullable=False)
|
||||||
|
document = db.relationship('DataWipeDocument',
|
||||||
|
backref=backref('actions',
|
||||||
|
lazy=True,
|
||||||
|
cascade=CASCADE_OWN),
|
||||||
|
primaryjoin='DataWipe.document_id == DataWipeDocument.id')
|
||||||
|
|
||||||
|
|
||||||
class Prepare(ActionWithMultipleDevices):
|
class Prepare(ActionWithMultipleDevices):
|
||||||
"""Work has been performed to the device to a defined point of
|
"""Work has been performed to the device to a defined point of
|
||||||
acceptance.
|
acceptance.
|
||||||
|
|
|
@ -17,6 +17,7 @@ from ereuse_devicehub.resources.action import models as m
|
||||||
from ereuse_devicehub.resources.agent import schemas as s_agent
|
from ereuse_devicehub.resources.agent import schemas as s_agent
|
||||||
from ereuse_devicehub.resources.device import schemas as s_device
|
from ereuse_devicehub.resources.device import schemas as s_device
|
||||||
from ereuse_devicehub.resources.tradedocument import schemas as s_document
|
from ereuse_devicehub.resources.tradedocument import schemas as s_document
|
||||||
|
from ereuse_devicehub.resources.documents import schemas as s_generic_document
|
||||||
from ereuse_devicehub.resources.enums import AppearanceRange, BiosAccessRange, FunctionalityRange, \
|
from ereuse_devicehub.resources.enums import AppearanceRange, BiosAccessRange, FunctionalityRange, \
|
||||||
PhysicalErasureMethod, R_POSITIVE, RatingRange, \
|
PhysicalErasureMethod, R_POSITIVE, RatingRange, \
|
||||||
Severity, SnapshotSoftware, TestDataStorageLength
|
Severity, SnapshotSoftware, TestDataStorageLength
|
||||||
|
@ -430,6 +431,11 @@ class Prepare(ActionWithMultipleDevices):
|
||||||
__doc__ = m.Prepare.__doc__
|
__doc__ = m.Prepare.__doc__
|
||||||
|
|
||||||
|
|
||||||
|
class DataWipe(ActionWithMultipleDevices):
|
||||||
|
__doc__ = m.DataWipe.__doc__
|
||||||
|
document = NestedOn(s_generic_document.DataWipeDocument, only_query='id')
|
||||||
|
|
||||||
|
|
||||||
class Live(ActionWithOneDevice):
|
class Live(ActionWithOneDevice):
|
||||||
__doc__ = m.Live.__doc__
|
__doc__ = m.Live.__doc__
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
import copy
|
||||||
|
|
||||||
|
from ereuse_devicehub.db import db
|
||||||
|
from ereuse_devicehub.resources.action.models import DataWipe
|
||||||
|
from ereuse_devicehub.resources.documents.models import DataWipeDocument
|
||||||
|
from ereuse_devicehub.resources.device.models import DataStorage
|
||||||
|
from ereuse_devicehub.resources.documents.schemas import DataWipeDocument as sh_document
|
||||||
|
from ereuse_devicehub.resources.hash_reports import ReportHash
|
||||||
|
|
||||||
|
|
||||||
|
class ErasedView():
|
||||||
|
"""Handler for manager the action register for add to a device one proof of erase
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, data, schema):
|
||||||
|
self.schema = schema
|
||||||
|
self.insert_document(copy.copy(data))
|
||||||
|
self.insert_action(copy.copy(data))
|
||||||
|
|
||||||
|
def post(self):
|
||||||
|
db.session().final_flush()
|
||||||
|
from flask import jsonify
|
||||||
|
ret = jsonify(self.erased)
|
||||||
|
ret.status_code = 201
|
||||||
|
db.session.commit()
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def insert_document(self, data):
|
||||||
|
schema = sh_document()
|
||||||
|
[data.pop(x, None) for x in ['severity', 'devices', 'name', 'description']]
|
||||||
|
doc_data = schema.load(data)
|
||||||
|
doc_data['type'] = 'DataWipe'
|
||||||
|
self.document = DataWipeDocument(**doc_data)
|
||||||
|
db.session.add(self.document)
|
||||||
|
|
||||||
|
db_hash = ReportHash(hash3=self.document.file_hash)
|
||||||
|
db.session.add(db_hash)
|
||||||
|
|
||||||
|
def insert_action(self, data):
|
||||||
|
[data.pop(x, None) for x in ['url', 'documentId', 'filename', 'hash', 'software', 'success']]
|
||||||
|
self.data = self.schema.load(data)
|
||||||
|
|
||||||
|
for dev in self.data['devices']:
|
||||||
|
if not hasattr(dev, 'components'):
|
||||||
|
continue
|
||||||
|
|
||||||
|
for component in dev.components:
|
||||||
|
if isinstance(component, DataStorage):
|
||||||
|
self.data['devices'].add(component)
|
||||||
|
|
||||||
|
self.data['document'] = self.document
|
||||||
|
self.erased = DataWipe(**self.data)
|
||||||
|
db.session.add(self.erased)
|
|
@ -18,6 +18,7 @@ from ereuse_devicehub.resources.action.models import (Action, Snapshot, VisualTe
|
||||||
Trade, Confirm, ConfirmRevoke, Revoke)
|
Trade, Confirm, ConfirmRevoke, Revoke)
|
||||||
from ereuse_devicehub.resources.action.views import trade as trade_view
|
from ereuse_devicehub.resources.action.views import trade as trade_view
|
||||||
from ereuse_devicehub.resources.action.views.snapshot import SnapshotView, save_json, move_json
|
from ereuse_devicehub.resources.action.views.snapshot import SnapshotView, save_json, move_json
|
||||||
|
from ereuse_devicehub.resources.action.views.documents import ErasedView
|
||||||
from ereuse_devicehub.resources.device.models import Device, Computer, DataStorage
|
from ereuse_devicehub.resources.device.models import Device, Computer, DataStorage
|
||||||
from ereuse_devicehub.resources.enums import Severity
|
from ereuse_devicehub.resources.enums import Severity
|
||||||
|
|
||||||
|
@ -250,6 +251,10 @@ class ActionView(View):
|
||||||
confirm_revoke = trade_view.ConfirmRevokeDocumentView(json, resource_def, self.schema)
|
confirm_revoke = trade_view.ConfirmRevokeDocumentView(json, resource_def, self.schema)
|
||||||
return confirm_revoke.post()
|
return confirm_revoke.post()
|
||||||
|
|
||||||
|
if json['type'] == 'DataWipe':
|
||||||
|
erased = ErasedView(json, resource_def.schema)
|
||||||
|
return erased.post()
|
||||||
|
|
||||||
a = resource_def.schema.load(json)
|
a = resource_def.schema.load(json)
|
||||||
Model = db.Model._decl_class_registry.data[json['type']]()
|
Model = db.Model._decl_class_registry.data[json['type']]()
|
||||||
action = Model(**a)
|
action = Model(**a)
|
||||||
|
|
|
@ -696,6 +696,25 @@ class Computer(Device):
|
||||||
if privacy
|
if privacy
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def external_document_erasure(self):
|
||||||
|
"""Returns the external ``DataStorage`` proof of erasure.
|
||||||
|
"""
|
||||||
|
from ereuse_devicehub.resources.action.models import DataWipe
|
||||||
|
urls = set()
|
||||||
|
try:
|
||||||
|
ev = self.last_action_of(DataWipe)
|
||||||
|
urls.add(ev.document.url.to_text())
|
||||||
|
except LookupError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
for comp in self.components:
|
||||||
|
if isinstance(comp, DataStorage):
|
||||||
|
doc = comp.external_document_erasure
|
||||||
|
if doc:
|
||||||
|
urls.add(doc)
|
||||||
|
return urls
|
||||||
|
|
||||||
def add_mac_to_hid(self, components_snap=None):
|
def add_mac_to_hid(self, components_snap=None):
|
||||||
"""Returns the Naming.hid with the first mac of network adapter,
|
"""Returns the Naming.hid with the first mac of network adapter,
|
||||||
following an alphabetical order.
|
following an alphabetical order.
|
||||||
|
@ -879,6 +898,17 @@ class DataStorage(JoinedComponentTableMixin, Component):
|
||||||
v += ' – {} GB'.format(self.size // 1000 if self.size else '?')
|
v += ' – {} GB'.format(self.size // 1000 if self.size else '?')
|
||||||
return v
|
return v
|
||||||
|
|
||||||
|
@property
|
||||||
|
def external_document_erasure(self):
|
||||||
|
"""Returns the external ``DataStorage`` proof of erasure.
|
||||||
|
"""
|
||||||
|
from ereuse_devicehub.resources.action.models import DataWipe
|
||||||
|
try:
|
||||||
|
ev = self.last_action_of(DataWipe)
|
||||||
|
return ev.document.url.to_text()
|
||||||
|
except LookupError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
class HardDrive(DataStorage):
|
class HardDrive(DataStorage):
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -52,7 +52,8 @@ class DeviceRow(OrderedDict):
|
||||||
self['Tag 2 Type'] = self['Tag 2 ID'] = self['Tag 2 Organization'] = ''
|
self['Tag 2 Type'] = self['Tag 2 ID'] = self['Tag 2 Organization'] = ''
|
||||||
self['Tag 3 Type'] = self['Tag 3 ID'] = self['Tag 3 Organization'] = ''
|
self['Tag 3 Type'] = self['Tag 3 ID'] = self['Tag 3 Organization'] = ''
|
||||||
for i, tag in zip(range(1, 3), device.tags):
|
for i, tag in zip(range(1, 3), device.tags):
|
||||||
self['Tag {} Type'.format(i)] = 'unamed' if tag.provider else 'named'
|
self['Tag {} Type'.format(
|
||||||
|
i)] = 'unamed' if tag.provider else 'named'
|
||||||
self['Tag {} ID'.format(i)] = tag.id
|
self['Tag {} ID'.format(i)] = tag.id
|
||||||
self['Tag {} Organization'.format(i)] = tag.org.name
|
self['Tag {} Organization'.format(i)] = tag.org.name
|
||||||
|
|
||||||
|
@ -70,11 +71,13 @@ class DeviceRow(OrderedDict):
|
||||||
self['Updated in (web)'] = ''
|
self['Updated in (web)'] = ''
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self['Physical state'] = device.last_action_of(*states.Physical.actions()).t
|
self['Physical state'] = device.last_action_of(
|
||||||
|
*states.Physical.actions()).t
|
||||||
except LookupError:
|
except LookupError:
|
||||||
self['Physical state'] = ''
|
self['Physical state'] = ''
|
||||||
try:
|
try:
|
||||||
self['Trading state'] = device.last_action_of(*states.Trading.actions()).t
|
self['Trading state'] = device.last_action_of(
|
||||||
|
*states.Trading.actions()).t
|
||||||
except LookupError:
|
except LookupError:
|
||||||
self['Trading state'] = ''
|
self['Trading state'] = ''
|
||||||
if isinstance(device, d.Computer):
|
if isinstance(device, d.Computer):
|
||||||
|
@ -108,7 +111,7 @@ class DeviceRow(OrderedDict):
|
||||||
self['RAM Range'] = ''
|
self['RAM Range'] = ''
|
||||||
self['Data Storage Rate'] = ''
|
self['Data Storage Rate'] = ''
|
||||||
self['Data Storage Range'] = ''
|
self['Data Storage Range'] = ''
|
||||||
|
|
||||||
self['Price'] = none2str(device.price)
|
self['Price'] = none2str(device.price)
|
||||||
|
|
||||||
benchram = get_action(device, 'BenchmarkRamSysbench')
|
benchram = get_action(device, 'BenchmarkRamSysbench')
|
||||||
|
@ -120,7 +123,7 @@ class DeviceRow(OrderedDict):
|
||||||
def components(self):
|
def components(self):
|
||||||
"""Function to get all components information of a device."""
|
"""Function to get all components information of a device."""
|
||||||
assert isinstance(self.device, d.Computer)
|
assert isinstance(self.device, d.Computer)
|
||||||
for ctype in self.ORDER_COMPONENTS: # ctype: str
|
for ctype in self.ORDER_COMPONENTS: # ctype: str
|
||||||
cmax = self.NUMS.get(ctype, 4)
|
cmax = self.NUMS.get(ctype, 4)
|
||||||
i = 1
|
i = 1
|
||||||
l_ctype = [ctype]
|
l_ctype = [ctype]
|
||||||
|
@ -148,9 +151,11 @@ class DeviceRow(OrderedDict):
|
||||||
self['{} {} Model'.format(ctype, i)] = ''
|
self['{} {} Model'.format(ctype, i)] = ''
|
||||||
self['{} {} Serial Number'.format(ctype, i)] = ''
|
self['{} {} Serial Number'.format(ctype, i)] = ''
|
||||||
else:
|
else:
|
||||||
self['{} {} Manufacturer'.format(ctype, i)] = none2str(component.manufacturer)
|
self['{} {} Manufacturer'.format(ctype, i)] = none2str(
|
||||||
|
component.manufacturer)
|
||||||
self['{} {} Model'.format(ctype, i)] = none2str(component.model)
|
self['{} {} Model'.format(ctype, i)] = none2str(component.model)
|
||||||
self['{} {} Serial Number'.format(ctype, i)] = none2str(component.serial_number)
|
self['{} {} Serial Number'.format(ctype, i)] = none2str(
|
||||||
|
component.serial_number)
|
||||||
|
|
||||||
if ctype == d.Processor.t:
|
if ctype == d.Processor.t:
|
||||||
self.get_processor(ctype, i, component)
|
self.get_processor(ctype, i, component)
|
||||||
|
@ -170,10 +175,12 @@ class DeviceRow(OrderedDict):
|
||||||
self['{} {} Number of cores'.format(ctype, i)] = ''
|
self['{} {} Number of cores'.format(ctype, i)] = ''
|
||||||
self['{} {} Speed (GHz)'.format(ctype, i)] = ''
|
self['{} {} Speed (GHz)'.format(ctype, i)] = ''
|
||||||
self['Benchmark {} {} (points)'.format(ctype, i)] = ''
|
self['Benchmark {} {} (points)'.format(ctype, i)] = ''
|
||||||
self['Benchmark ProcessorSysbench {} {} (points)'.format(ctype, i)] = ''
|
self['Benchmark ProcessorSysbench {} {} (points)'.format(
|
||||||
|
ctype, i)] = ''
|
||||||
return
|
return
|
||||||
|
|
||||||
self['{} {} Number of cores'.format(ctype, i)] = none2str(component.cores)
|
self['{} {} Number of cores'.format(
|
||||||
|
ctype, i)] = none2str(component.cores)
|
||||||
self['{} {} Speed (GHz)'.format(ctype, i)] = none2str(component.speed)
|
self['{} {} Speed (GHz)'.format(ctype, i)] = none2str(component.speed)
|
||||||
|
|
||||||
benchmark = get_action(component, 'BenchmarkProcessor')
|
benchmark = get_action(component, 'BenchmarkProcessor')
|
||||||
|
@ -184,9 +191,11 @@ class DeviceRow(OrderedDict):
|
||||||
|
|
||||||
sysbench = get_action(component, 'BenchmarkProcessorSysbench')
|
sysbench = get_action(component, 'BenchmarkProcessorSysbench')
|
||||||
if not sysbench:
|
if not sysbench:
|
||||||
self['Benchmark ProcessorSysbench {} {} (points)'.format(ctype, i)] = ''
|
self['Benchmark ProcessorSysbench {} {} (points)'.format(
|
||||||
|
ctype, i)] = ''
|
||||||
return
|
return
|
||||||
self['Benchmark ProcessorSysbench {} {} (points)'.format(ctype, i)] = sysbench.rate
|
self['Benchmark ProcessorSysbench {} {} (points)'.format(
|
||||||
|
ctype, i)] = sysbench.rate
|
||||||
|
|
||||||
def get_ram(self, ctype, i, component):
|
def get_ram(self, ctype, i, component):
|
||||||
"""Particular fields for component Ram Module."""
|
"""Particular fields for component Ram Module."""
|
||||||
|
@ -202,6 +211,7 @@ class DeviceRow(OrderedDict):
|
||||||
"""Particular fields for component DataStorage.
|
"""Particular fields for component DataStorage.
|
||||||
A DataStorage can be HardDrive or SolidStateDrive.
|
A DataStorage can be HardDrive or SolidStateDrive.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if component is None:
|
if component is None:
|
||||||
self['{} {} Size (MB)'.format(ctype, i)] = ''
|
self['{} {} Size (MB)'.format(ctype, i)] = ''
|
||||||
self['Erasure {} {}'.format(ctype, i)] = ''
|
self['Erasure {} {}'.format(ctype, i)] = ''
|
||||||
|
@ -209,6 +219,7 @@ class DeviceRow(OrderedDict):
|
||||||
self['Erasure {} {} Size (MB)'.format(ctype, i)] = ''
|
self['Erasure {} {} Size (MB)'.format(ctype, i)] = ''
|
||||||
self['Erasure {} {} Software'.format(ctype, i)] = ''
|
self['Erasure {} {} Software'.format(ctype, i)] = ''
|
||||||
self['Erasure {} {} Result'.format(ctype, i)] = ''
|
self['Erasure {} {} Result'.format(ctype, i)] = ''
|
||||||
|
self['Erasure {} {} Certificate URL'.format(ctype, i)] = ''
|
||||||
self['Erasure {} {} Type'.format(ctype, i)] = ''
|
self['Erasure {} {} Type'.format(ctype, i)] = ''
|
||||||
self['Erasure {} {} Method'.format(ctype, i)] = ''
|
self['Erasure {} {} Method'.format(ctype, i)] = ''
|
||||||
self['Erasure {} {} Elapsed (hours)'.format(ctype, i)] = ''
|
self['Erasure {} {} Elapsed (hours)'.format(ctype, i)] = ''
|
||||||
|
@ -222,7 +233,8 @@ class DeviceRow(OrderedDict):
|
||||||
self['Test {} {} Type'.format(ctype, i)] = ''
|
self['Test {} {} Type'.format(ctype, i)] = ''
|
||||||
self['Test {} {} Result'.format(ctype, i)] = ''
|
self['Test {} {} Result'.format(ctype, i)] = ''
|
||||||
self['Test {} {} Power on (hours used)'.format(ctype, i)] = ''
|
self['Test {} {} Power on (hours used)'.format(ctype, i)] = ''
|
||||||
self['Test {} {} Lifetime remaining (percentage)'.format(ctype, i)] = ''
|
self['Test {} {} Lifetime remaining (percentage)'.format(
|
||||||
|
ctype, i)] = ''
|
||||||
return
|
return
|
||||||
|
|
||||||
snapshot = get_action(component, 'Snapshot')
|
snapshot = get_action(component, 'Snapshot')
|
||||||
|
@ -233,15 +245,38 @@ class DeviceRow(OrderedDict):
|
||||||
|
|
||||||
self['{} {} Size (MB)'.format(ctype, i)] = none2str(component.size)
|
self['{} {} Size (MB)'.format(ctype, i)] = none2str(component.size)
|
||||||
|
|
||||||
erasures = [a for a in component.actions if a.type in ['EraseBasic', 'EraseSectors']]
|
erasures = [a for a in component.actions if a.type in [
|
||||||
|
'EraseBasic', 'EraseSectors', 'DataWipe']]
|
||||||
erasure = erasures[-1] if erasures else None
|
erasure = erasures[-1] if erasures else None
|
||||||
if not erasure:
|
if not erasure:
|
||||||
self['Erasure {} {}'.format(ctype, i)] = none2str(component.hid)
|
self['Erasure {} {}'.format(ctype, i)] = none2str(component.hid)
|
||||||
serial_number = none2str(component.serial_number)
|
serial_number = none2str(component.serial_number)
|
||||||
self['Erasure {} {} Serial Number'.format(ctype, i)] = serial_number
|
self['Erasure {} {} Serial Number'.format(
|
||||||
self['Erasure {} {} Size (MB)'.format(ctype, i)] = none2str(component.size)
|
ctype, i)] = serial_number
|
||||||
|
self['Erasure {} {} Size (MB)'.format(
|
||||||
|
ctype, i)] = none2str(component.size)
|
||||||
self['Erasure {} {} Software'.format(ctype, i)] = ''
|
self['Erasure {} {} Software'.format(ctype, i)] = ''
|
||||||
self['Erasure {} {} Result'.format(ctype, i)] = ''
|
self['Erasure {} {} Result'.format(ctype, i)] = ''
|
||||||
|
self['Erasure {} {} Certificate URL'.format(ctype, i)] = ''
|
||||||
|
self['Erasure {} {} Type'.format(ctype, i)] = ''
|
||||||
|
self['Erasure {} {} Method'.format(ctype, i)] = ''
|
||||||
|
self['Erasure {} {} Elapsed (hours)'.format(ctype, i)] = ''
|
||||||
|
self['Erasure {} {} Date'.format(ctype, i)] = ''
|
||||||
|
self['Erasure {} {} Steps'.format(ctype, i)] = ''
|
||||||
|
self['Erasure {} {} Steps Start Time'.format(ctype, i)] = ''
|
||||||
|
self['Erasure {} {} Steps End Time'.format(ctype, i)] = ''
|
||||||
|
elif hasattr(erasure, 'type') and erasure.type == 'DataWipe':
|
||||||
|
self['Erasure {} {}'.format(ctype, i)] = none2str(component.hid)
|
||||||
|
serial_number = none2str(component.serial_number)
|
||||||
|
self['Erasure {} {} Serial Number'.format(
|
||||||
|
ctype, i)] = serial_number
|
||||||
|
self['Erasure {} {} Size (MB)'.format(
|
||||||
|
ctype, i)] = none2str(component.size)
|
||||||
|
self['Erasure {} {} Software'.format(
|
||||||
|
ctype, i)] = erasure.document.software
|
||||||
|
self['Erasure {} {} Result'.format(ctype, i)] = get_result(erasure)
|
||||||
|
self['Erasure {} {} Certificate URL'.format(
|
||||||
|
ctype, i)] = erasure.document.url.to_text()
|
||||||
self['Erasure {} {} Type'.format(ctype, i)] = ''
|
self['Erasure {} {} Type'.format(ctype, i)] = ''
|
||||||
self['Erasure {} {} Method'.format(ctype, i)] = ''
|
self['Erasure {} {} Method'.format(ctype, i)] = ''
|
||||||
self['Erasure {} {} Elapsed (hours)'.format(ctype, i)] = ''
|
self['Erasure {} {} Elapsed (hours)'.format(ctype, i)] = ''
|
||||||
|
@ -252,22 +287,31 @@ class DeviceRow(OrderedDict):
|
||||||
else:
|
else:
|
||||||
self['Erasure {} {}'.format(ctype, i)] = none2str(component.hid)
|
self['Erasure {} {}'.format(ctype, i)] = none2str(component.hid)
|
||||||
serial_number = none2str(component.serial_number)
|
serial_number = none2str(component.serial_number)
|
||||||
self['Erasure {} {} Serial Number'.format(ctype, i)] = serial_number
|
self['Erasure {} {} Serial Number'.format(
|
||||||
self['Erasure {} {} Size (MB)'.format(ctype, i)] = none2str(component.size)
|
ctype, i)] = serial_number
|
||||||
|
self['Erasure {} {} Size (MB)'.format(
|
||||||
|
ctype, i)] = none2str(component.size)
|
||||||
self['Erasure {} {} Software'.format(ctype, i)] = software
|
self['Erasure {} {} Software'.format(ctype, i)] = software
|
||||||
|
|
||||||
result = get_result(erasure.severity)
|
result = get_result(erasure)
|
||||||
self['Erasure {} {} Result'.format(ctype, i)] = result
|
self['Erasure {} {} Result'.format(ctype, i)] = result
|
||||||
|
self['Erasure {} {} Certificate URL'.format(ctype, i)] = ''
|
||||||
self['Erasure {} {} Type'.format(ctype, i)] = erasure.type
|
self['Erasure {} {} Type'.format(ctype, i)] = erasure.type
|
||||||
self['Erasure {} {} Method'.format(ctype, i)] = erasure.method
|
self['Erasure {} {} Method'.format(ctype, i)] = erasure.method
|
||||||
self['Erasure {} {} Elapsed (hours)'.format(ctype, i)] = format(erasure.elapsed)
|
self['Erasure {} {} Elapsed (hours)'.format(
|
||||||
self['Erasure {} {} Date'.format(ctype, i)] = format(erasure.created)
|
ctype, i)] = format(erasure.elapsed)
|
||||||
|
self['Erasure {} {} Date'.format(
|
||||||
|
ctype, i)] = format(erasure.created)
|
||||||
steps = ','.join((format(x) for x in erasure.steps))
|
steps = ','.join((format(x) for x in erasure.steps))
|
||||||
self['Erasure {} {} Steps'.format(ctype, i)] = steps
|
self['Erasure {} {} Steps'.format(ctype, i)] = steps
|
||||||
steps_start_time = ','.join((format(x.start_time) for x in erasure.steps))
|
steps_start_time = ','.join(
|
||||||
self['Erasure {} {} Steps Start Time'.format(ctype, i)] = steps_start_time
|
(format(x.start_time) for x in erasure.steps))
|
||||||
steps_end_time = ','.join((format(x.end_time) for x in erasure.steps))
|
self['Erasure {} {} Steps Start Time'.format(
|
||||||
self['Erasure {} {} Steps End Time'.format(ctype, i)] = steps_end_time
|
ctype, i)] = steps_start_time
|
||||||
|
steps_end_time = ','.join((format(x.end_time)
|
||||||
|
for x in erasure.steps))
|
||||||
|
self['Erasure {} {} Steps End Time'.format(
|
||||||
|
ctype, i)] = steps_end_time
|
||||||
|
|
||||||
benchmark = get_action(component, 'BenchmarkDataStorage')
|
benchmark = get_action(component, 'BenchmarkDataStorage')
|
||||||
if not benchmark:
|
if not benchmark:
|
||||||
|
@ -285,12 +329,14 @@ class DeviceRow(OrderedDict):
|
||||||
self['Test {} {} Type'.format(ctype, i)] = ''
|
self['Test {} {} Type'.format(ctype, i)] = ''
|
||||||
self['Test {} {} Result'.format(ctype, i)] = ''
|
self['Test {} {} Result'.format(ctype, i)] = ''
|
||||||
self['Test {} {} Power on (hours used)'.format(ctype, i)] = ''
|
self['Test {} {} Power on (hours used)'.format(ctype, i)] = ''
|
||||||
self['Test {} {} Lifetime remaining (percentage)'.format(ctype, i)] = ''
|
self['Test {} {} Lifetime remaining (percentage)'.format(
|
||||||
|
ctype, i)] = ''
|
||||||
return
|
return
|
||||||
|
|
||||||
self['Test {} {} Software'.format(ctype, i)] = software
|
self['Test {} {} Software'.format(ctype, i)] = software
|
||||||
self['Test {} {} Type'.format(ctype, i)] = test_storage.length.value
|
self['Test {} {} Type'.format(ctype, i)] = test_storage.length.value
|
||||||
self['Test {} {} Result'.format(ctype, i)] = get_result(test_storage.severity)
|
self['Test {} {} Result'.format(ctype, i)] = get_result(
|
||||||
|
test_storage)
|
||||||
self['Test {} {} Power on (hours used)'.format(ctype, i)] = none2str(
|
self['Test {} {} Power on (hours used)'.format(ctype, i)] = none2str(
|
||||||
test_storage.power_cycle_count)
|
test_storage.power_cycle_count)
|
||||||
self['Test {} {} Lifetime remaining (percentage)'.format(ctype, i)] = none2str(
|
self['Test {} {} Lifetime remaining (percentage)'.format(ctype, i)] = none2str(
|
||||||
|
@ -319,7 +365,8 @@ class StockRow(OrderedDict):
|
||||||
self['Manufacturer'] = none2str(device.manufacturer)
|
self['Manufacturer'] = none2str(device.manufacturer)
|
||||||
self['Registered in'] = format(device.created, '%c')
|
self['Registered in'] = format(device.created, '%c')
|
||||||
try:
|
try:
|
||||||
self['Physical state'] = device.last_action_of(*states.Physical.actions()).t
|
self['Physical state'] = device.last_action_of(
|
||||||
|
*states.Physical.actions()).t
|
||||||
except LookupError:
|
except LookupError:
|
||||||
self['Physical state'] = ''
|
self['Physical state'] = ''
|
||||||
try:
|
try:
|
||||||
|
@ -343,15 +390,21 @@ class StockRow(OrderedDict):
|
||||||
self['Data Storage Range'] = rate.data_storage_range
|
self['Data Storage Range'] = rate.data_storage_range
|
||||||
|
|
||||||
|
|
||||||
def get_result(severity):
|
def get_result(erasure):
|
||||||
""" For the csv is necessary simplify the message of results """
|
""" For the csv is necessary simplify the message of results """
|
||||||
|
if hasattr(erasure, 'type') and erasure.type == 'DataWipe':
|
||||||
|
if erasure.document.success:
|
||||||
|
return 'Success'
|
||||||
|
return 'Failure'
|
||||||
|
|
||||||
|
|
||||||
type_of_results = {
|
type_of_results = {
|
||||||
Severity.Error: 'Failure',
|
Severity.Error: 'Failure',
|
||||||
Severity.Warning: 'Success with Warnings',
|
Severity.Warning: 'Success with Warnings',
|
||||||
Severity.Notice: 'Success',
|
Severity.Notice: 'Success',
|
||||||
Severity.Info: 'Success'
|
Severity.Info: 'Success'
|
||||||
}
|
}
|
||||||
return type_of_results[severity]
|
return type_of_results[erasure.severity]
|
||||||
|
|
||||||
|
|
||||||
def none2str(string):
|
def none2str(string):
|
||||||
|
|
|
@ -90,6 +90,7 @@ class DocumentView(DeviceView):
|
||||||
res = flask.make_response(template)
|
res = flask.make_response(template)
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def erasure(query: db.Query):
|
def erasure(query: db.Query):
|
||||||
def erasures():
|
def erasures():
|
||||||
|
@ -291,7 +292,6 @@ class InternalStatsView(DeviceView):
|
||||||
evs.Action.type.in_(('Snapshot', 'Live', 'Allocate', 'Deallocate')))
|
evs.Action.type.in_(('Snapshot', 'Live', 'Allocate', 'Deallocate')))
|
||||||
return self.generate_post_csv(query)
|
return self.generate_post_csv(query)
|
||||||
|
|
||||||
|
|
||||||
def generate_post_csv(self, query):
|
def generate_post_csv(self, query):
|
||||||
d = {}
|
d = {}
|
||||||
for ac in query:
|
for ac in query:
|
||||||
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
from citext import CIText
|
||||||
|
from flask import g
|
||||||
|
from sqlalchemy import BigInteger, Column, Sequence, Unicode, Boolean, ForeignKey
|
||||||
|
from sqlalchemy.ext.declarative import declared_attr
|
||||||
|
from sqlalchemy.dialects.postgresql import UUID
|
||||||
|
from teal.db import URL
|
||||||
|
from ereuse_devicehub.db import db
|
||||||
|
from ereuse_devicehub.resources.user.models import User
|
||||||
|
from ereuse_devicehub.resources.models import Thing, STR_SM_SIZE
|
||||||
|
|
||||||
|
|
||||||
|
class Document(Thing):
|
||||||
|
"""This represent a generic document."""
|
||||||
|
|
||||||
|
id = Column(BigInteger, Sequence('device_seq'), primary_key=True)
|
||||||
|
id.comment = """The identifier of the device for this database. Used only
|
||||||
|
internally for software; users should not use this.
|
||||||
|
"""
|
||||||
|
type = Column(Unicode(STR_SM_SIZE), nullable=False)
|
||||||
|
date = Column(db.DateTime, nullable=True)
|
||||||
|
date.comment = """The date of document, some documents need to have one date
|
||||||
|
"""
|
||||||
|
id_document = Column(CIText(), nullable=True)
|
||||||
|
id_document.comment = """The id of one document like invoice so they can be linked."""
|
||||||
|
owner_id = db.Column(UUID(as_uuid=True),
|
||||||
|
db.ForeignKey(User.id),
|
||||||
|
nullable=False,
|
||||||
|
default=lambda: g.user.id)
|
||||||
|
owner = db.relationship(User, primaryjoin=owner_id == User.id)
|
||||||
|
file_name = Column(db.CIText(), nullable=False)
|
||||||
|
file_name.comment = """This is the name of the file when user up the document."""
|
||||||
|
file_hash = Column(db.CIText(), nullable=False)
|
||||||
|
file_hash.comment = """This is the hash of the file produced from frontend."""
|
||||||
|
url = db.Column(URL(), nullable=True)
|
||||||
|
url.comment = """This is the url where resides the document."""
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
return '{0.file_name}'.format(self)
|
||||||
|
|
||||||
|
|
||||||
|
class JoinedTableMixin:
|
||||||
|
# noinspection PyMethodParameters
|
||||||
|
@declared_attr
|
||||||
|
def id(cls):
|
||||||
|
return Column(BigInteger, ForeignKey(Document.id), primary_key=True)
|
||||||
|
|
||||||
|
|
||||||
|
class DataWipeDocument(JoinedTableMixin, Document):
|
||||||
|
"""This represent a generic document."""
|
||||||
|
|
||||||
|
software = Column(CIText(), nullable=True)
|
||||||
|
software.comment = """Which software is used"""
|
||||||
|
success = Column(Boolean, default=False)
|
||||||
|
success.comment = """If the erase was success"""
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
return '{0.file_name}'.format(self)
|
|
@ -0,0 +1,28 @@
|
||||||
|
from marshmallow.fields import DateTime, Integer, validate, Boolean
|
||||||
|
from teal.marshmallow import SanitizedStr, URL
|
||||||
|
from ereuse_devicehub.resources.schemas import Thing
|
||||||
|
from ereuse_devicehub.resources.documents import models as m
|
||||||
|
|
||||||
|
|
||||||
|
class DataWipeDocument(Thing):
|
||||||
|
__doc__ = m.DataWipeDocument.__doc__
|
||||||
|
id = Integer(description=m.DataWipeDocument.id.comment, dump_only=True)
|
||||||
|
type = SanitizedStr(default='DataWipeDocument')
|
||||||
|
url = URL(required= False, description=m.DataWipeDocument.url.comment)
|
||||||
|
success = Boolean(required=False, default=False, description=m.DataWipeDocument.success.comment)
|
||||||
|
software = SanitizedStr(description=m.DataWipeDocument.software.comment)
|
||||||
|
date = DateTime(data_key='endTime',
|
||||||
|
required=False,
|
||||||
|
description=m.DataWipeDocument.date.comment)
|
||||||
|
id_document = SanitizedStr(data_key='documentId',
|
||||||
|
required=False,
|
||||||
|
default='',
|
||||||
|
description=m.DataWipeDocument.id_document.comment)
|
||||||
|
file_name = SanitizedStr(data_key='filename',
|
||||||
|
default='',
|
||||||
|
description=m.DataWipeDocument.file_name.comment,
|
||||||
|
validate=validate.Length(max=100))
|
||||||
|
file_hash = SanitizedStr(data_key='hash',
|
||||||
|
default='',
|
||||||
|
description=m.DataWipeDocument.file_hash.comment,
|
||||||
|
validate=validate.Length(max=64))
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
@ -6,6 +6,7 @@ import copy
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
from io import BytesIO
|
||||||
from dateutil.tz import tzutc
|
from dateutil.tz import tzutc
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
from typing import Tuple, Type
|
from typing import Tuple, Type
|
||||||
|
@ -2408,3 +2409,32 @@ def test_trade_case14(user: UserClient, user2: UserClient):
|
||||||
assert device.actions[-4].user == trade.user_from
|
assert device.actions[-4].user == trade.user_from
|
||||||
assert device.actions[-5].t == 'Trade'
|
assert device.actions[-5].t == 'Trade'
|
||||||
assert device.actions[-5].author == trade.user_to
|
assert device.actions[-5].author == trade.user_to
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.mvp
|
||||||
|
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||||
|
def test_action_web_erase(user: UserClient, client: Client):
|
||||||
|
import hashlib
|
||||||
|
from ereuse_devicehub.resources.documents import documents
|
||||||
|
bfile = BytesIO(b'abc')
|
||||||
|
hash3 = hashlib.sha3_256(bfile.read()).hexdigest()
|
||||||
|
snap, _ = user.post(file('acer.happy.battery.snapshot'), res=models.Snapshot)
|
||||||
|
request = {'type': 'DataWipe', 'devices': [snap['device']['id']], 'name': 'borrado universal', 'severity': 'Info', 'description': 'nada que describir', 'url': 'http://www.google.com/', 'documentId': '33', 'endTime': '2021-07-07T22:00:00.000Z', 'filename': 'Certificado de borrado1.pdf', 'hash': hash3, 'success': 1, 'software': "Blanco"}
|
||||||
|
|
||||||
|
user.post(res=models.Action, data=request)
|
||||||
|
action = models.DataWipe.query.one()
|
||||||
|
for dev in action.devices:
|
||||||
|
assert action in dev.actions
|
||||||
|
|
||||||
|
assert action.document.file_hash == request['hash']
|
||||||
|
|
||||||
|
bfile = BytesIO(b'abc')
|
||||||
|
response, _ = client.post(res=documents.DocumentDef.t,
|
||||||
|
item='stamps/',
|
||||||
|
content_type='multipart/form-data',
|
||||||
|
accept='text/html',
|
||||||
|
data={'docUpload': [(bfile, 'example.csv')]},
|
||||||
|
status=200)
|
||||||
|
assert "alert alert-info" in response
|
||||||
|
assert "100% coincidence." in response
|
||||||
|
assert not "alert alert-danger" in response
|
||||||
|
|
|
@ -122,4 +122,4 @@ def test_api_docs(client: Client):
|
||||||
'scheme': 'basic',
|
'scheme': 'basic',
|
||||||
'name': 'Authorization'
|
'name': 'Authorization'
|
||||||
}
|
}
|
||||||
assert len(docs['definitions']) == 125
|
assert len(docs['definitions']) == 126
|
||||||
|
|
|
@ -48,7 +48,8 @@ def test_erasure_certificate_public_one(user: UserClient, client: Client):
|
||||||
accept='application/pdf')
|
accept='application/pdf')
|
||||||
assert 'application/pdf' == response.content_type
|
assert 'application/pdf' == response.content_type
|
||||||
|
|
||||||
erasure = next(e for e in snapshot['actions'] if e['type'] == 'EraseSectors')
|
erasure = next(e for e in snapshot['actions']
|
||||||
|
if e['type'] == 'EraseSectors')
|
||||||
|
|
||||||
doc, response = client.get(res=documents.DocumentDef.t,
|
doc, response = client.get(res=documents.DocumentDef.t,
|
||||||
item='erasures/{}'.format(erasure['id']),
|
item='erasures/{}'.format(erasure['id']),
|
||||||
|
@ -68,7 +69,8 @@ def test_erasure_certificate_private_query(user: UserClient):
|
||||||
|
|
||||||
doc, response = user.get(res=documents.DocumentDef.t,
|
doc, response = user.get(res=documents.DocumentDef.t,
|
||||||
item='erasures/',
|
item='erasures/',
|
||||||
query=[('filter', {'id': [snapshot['device']['id']]})],
|
query=[
|
||||||
|
('filter', {'id': [snapshot['device']['id']]})],
|
||||||
accept=ANY)
|
accept=ANY)
|
||||||
assert 'html' in response.content_type
|
assert 'html' in response.content_type
|
||||||
assert '<html' in doc
|
assert '<html' in doc
|
||||||
|
@ -77,7 +79,8 @@ def test_erasure_certificate_private_query(user: UserClient):
|
||||||
doc, response = user.get(res=documents.DocumentDef.t,
|
doc, response = user.get(res=documents.DocumentDef.t,
|
||||||
item='erasures/',
|
item='erasures/',
|
||||||
query=[
|
query=[
|
||||||
('filter', {'id': [snapshot['device']['id']]}),
|
('filter', {
|
||||||
|
'id': [snapshot['device']['id']]}),
|
||||||
('format', 'PDF')
|
('format', 'PDF')
|
||||||
],
|
],
|
||||||
accept='application/pdf')
|
accept='application/pdf')
|
||||||
|
@ -95,19 +98,19 @@ def test_export_csv_permitions(user: UserClient, user2: UserClient, client: Clie
|
||||||
"""test export device information in a csv file with others users."""
|
"""test export device information in a csv file with others users."""
|
||||||
snapshot, _ = user.post(file('basic.snapshot'), res=Snapshot)
|
snapshot, _ = user.post(file('basic.snapshot'), res=Snapshot)
|
||||||
csv_user, _ = user.get(res=documents.DocumentDef.t,
|
csv_user, _ = user.get(res=documents.DocumentDef.t,
|
||||||
item='devices/',
|
item='devices/',
|
||||||
accept='text/csv',
|
accept='text/csv',
|
||||||
query=[('filter', {'type': ['Computer']})])
|
query=[('filter', {'type': ['Computer']})])
|
||||||
|
|
||||||
csv_user2, _ = user2.get(res=documents.DocumentDef.t,
|
csv_user2, _ = user2.get(res=documents.DocumentDef.t,
|
||||||
item='devices/',
|
item='devices/',
|
||||||
accept='text/csv',
|
accept='text/csv',
|
||||||
query=[('filter', {'type': ['Computer']})])
|
query=[('filter', {'type': ['Computer']})])
|
||||||
|
|
||||||
_, res = client.get(res=documents.DocumentDef.t,
|
_, res = client.get(res=documents.DocumentDef.t,
|
||||||
item='devices/',
|
item='devices/',
|
||||||
accept='text/csv',
|
accept='text/csv',
|
||||||
query=[('filter', {'type': ['Computer']})], status=401)
|
query=[('filter', {'type': ['Computer']})], status=401)
|
||||||
assert res.status_code == 401
|
assert res.status_code == 401
|
||||||
|
|
||||||
assert len(csv_user) > 0
|
assert len(csv_user) > 0
|
||||||
|
@ -125,30 +128,31 @@ def test_export_csv_actions(user: UserClient, user2: UserClient, client: Client)
|
||||||
"finalUserCode": "abcdefjhi",
|
"finalUserCode": "abcdefjhi",
|
||||||
"startTime": "2020-11-01T02:00:00+00:00",
|
"startTime": "2020-11-01T02:00:00+00:00",
|
||||||
"endTime": "2020-12-01T02:00:00+00:00"
|
"endTime": "2020-12-01T02:00:00+00:00"
|
||||||
}
|
}
|
||||||
|
|
||||||
user.post(res=Allocate, data=post_request)
|
user.post(res=Allocate, data=post_request)
|
||||||
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
|
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
|
||||||
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
|
hdd_action = [a for a in hdd['actions']
|
||||||
|
if a['type'] == 'TestDataStorage'][0]
|
||||||
hdd_action['lifetime'] += 1000
|
hdd_action['lifetime'] += 1000
|
||||||
acer.pop('elapsed')
|
acer.pop('elapsed')
|
||||||
acer['licence_version'] = '1.0.0'
|
acer['licence_version'] = '1.0.0'
|
||||||
snapshot, _ = client.post(acer, res=Live)
|
snapshot, _ = client.post(acer, res=Live)
|
||||||
|
|
||||||
csv_user, _ = user.get(res=documents.DocumentDef.t,
|
csv_user, _ = user.get(res=documents.DocumentDef.t,
|
||||||
item='actions/',
|
item='actions/',
|
||||||
accept='text/csv',
|
accept='text/csv',
|
||||||
query=[('filter', {'type': ['Computer']})])
|
query=[('filter', {'type': ['Computer']})])
|
||||||
|
|
||||||
csv_user2, _ = user2.get(res=documents.DocumentDef.t,
|
csv_user2, _ = user2.get(res=documents.DocumentDef.t,
|
||||||
item='actions/',
|
item='actions/',
|
||||||
accept='text/csv',
|
accept='text/csv',
|
||||||
query=[('filter', {'type': ['Computer']})])
|
query=[('filter', {'type': ['Computer']})])
|
||||||
|
|
||||||
_, res = client.get(res=documents.DocumentDef.t,
|
_, res = client.get(res=documents.DocumentDef.t,
|
||||||
item='actions/',
|
item='actions/',
|
||||||
accept='text/csv',
|
accept='text/csv',
|
||||||
query=[('filter', {'type': ['Computer']})], status=401)
|
query=[('filter', {'type': ['Computer']})], status=401)
|
||||||
assert res.status_code == 401
|
assert res.status_code == 401
|
||||||
|
|
||||||
assert len(csv_user) > 0
|
assert len(csv_user) > 0
|
||||||
|
@ -167,21 +171,22 @@ def test_live_export_csv2(user: UserClient, client: Client, app: Devicehub):
|
||||||
"finalUserCode": "abcdefjhi",
|
"finalUserCode": "abcdefjhi",
|
||||||
"startTime": "2020-11-01T02:00:00+00:00",
|
"startTime": "2020-11-01T02:00:00+00:00",
|
||||||
"endTime": "2020-12-01T02:00:00+00:00"
|
"endTime": "2020-12-01T02:00:00+00:00"
|
||||||
}
|
}
|
||||||
|
|
||||||
user.post(res=Allocate, data=post_request)
|
user.post(res=Allocate, data=post_request)
|
||||||
|
|
||||||
acer = yaml2json('acer-happy.live-test1')
|
acer = yaml2json('acer-happy.live-test1')
|
||||||
live, _ = client.post(acer, res=Live)
|
live, _ = client.post(acer, res=Live)
|
||||||
csv_user, _ = user.get(res=documents.DocumentDef.t,
|
csv_user, _ = user.get(res=documents.DocumentDef.t,
|
||||||
item='actions/',
|
item='actions/',
|
||||||
accept='text/csv',
|
accept='text/csv',
|
||||||
query=[('filter', {'type': ['Computer']})])
|
query=[('filter', {'type': ['Computer']})])
|
||||||
|
|
||||||
assert "4692" in csv_user
|
assert "4692" in csv_user
|
||||||
assert "8692" in csv_user
|
assert "8692" in csv_user
|
||||||
assert "DevicehubID" in csv_user
|
assert "DevicehubID" in csv_user
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.mvp
|
@pytest.mark.mvp
|
||||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||||
def test_live_example2(user: UserClient, client: Client, app: Devicehub):
|
def test_live_example2(user: UserClient, client: Client, app: Devicehub):
|
||||||
|
@ -214,6 +219,7 @@ def test_export_basic_snapshot(user: UserClient):
|
||||||
item='devices/',
|
item='devices/',
|
||||||
accept='text/csv',
|
accept='text/csv',
|
||||||
query=[('filter', {'type': ['Computer']})])
|
query=[('filter', {'type': ['Computer']})])
|
||||||
|
|
||||||
f = StringIO(csv_str)
|
f = StringIO(csv_str)
|
||||||
obj_csv = csv.reader(f, f, delimiter=';', quotechar='"')
|
obj_csv = csv.reader(f, f, delimiter=';', quotechar='"')
|
||||||
export_csv = list(obj_csv)
|
export_csv = list(obj_csv)
|
||||||
|
@ -278,9 +284,6 @@ def test_export_extended(app: Devicehub, user: UserClient):
|
||||||
obj_csv = csv.reader(f, f, delimiter=';', quotechar='"')
|
obj_csv = csv.reader(f, f, delimiter=';', quotechar='"')
|
||||||
export_csv = list(obj_csv)
|
export_csv = list(obj_csv)
|
||||||
|
|
||||||
ff= open('ba.csv', 'w')
|
|
||||||
ff.write(csv_str)
|
|
||||||
ff.close()
|
|
||||||
# Open fixture csv and transform to list
|
# Open fixture csv and transform to list
|
||||||
with Path(__file__).parent.joinpath('files').joinpath(
|
with Path(__file__).parent.joinpath('files').joinpath(
|
||||||
'proposal_extended_csv_report.csv').open() as csv_file:
|
'proposal_extended_csv_report.csv').open() as csv_file:
|
||||||
|
@ -293,18 +296,18 @@ def test_export_extended(app: Devicehub, user: UserClient):
|
||||||
assert fixture_csv[0] == export_csv[0], 'Headers are not equal'
|
assert fixture_csv[0] == export_csv[0], 'Headers are not equal'
|
||||||
assert fixture_csv[1][:19] == export_csv[1][:19], 'Computer information are not equal'
|
assert fixture_csv[1][:19] == export_csv[1][:19], 'Computer information are not equal'
|
||||||
assert fixture_csv[1][20] == export_csv[1][20], 'Computer information are not equal'
|
assert fixture_csv[1][20] == export_csv[1][20], 'Computer information are not equal'
|
||||||
assert fixture_csv[1][22:81] == export_csv[1][22:81], 'Computer information are not equal'
|
assert fixture_csv[1][22:82] == export_csv[1][22:82], 'Computer information are not equal'
|
||||||
assert fixture_csv[1][82] == export_csv[1][82], 'Computer information are not equal'
|
assert fixture_csv[1][83] == export_csv[1][83], 'Computer information are not equal'
|
||||||
assert fixture_csv[1][85:] == export_csv[1][85:], 'Computer information are not equal'
|
assert fixture_csv[1][86:] == export_csv[1][86:], 'Computer information are not equal'
|
||||||
assert fixture_csv[2][:19] == export_csv[2][:19], 'Computer information are not equal'
|
assert fixture_csv[2][:19] == export_csv[2][:19], 'Computer information are not equal'
|
||||||
assert fixture_csv[2][20] == export_csv[2][20], 'Computer information are not equal'
|
assert fixture_csv[2][20] == export_csv[2][20], 'Computer information are not equal'
|
||||||
assert fixture_csv[2][22:81] == export_csv[2][22:81], 'Computer information are not equal'
|
assert fixture_csv[2][22:82] == export_csv[2][22:82], 'Computer information are not equal'
|
||||||
assert fixture_csv[2][82] == export_csv[2][82], 'Computer information are not equal'
|
assert fixture_csv[2][83] == export_csv[2][83], 'Computer information are not equal'
|
||||||
assert fixture_csv[2][85:105] == export_csv[2][85:105], 'Computer information are not equal'
|
assert fixture_csv[2][86:106] == export_csv[2][86:106], 'Computer information are not equal'
|
||||||
assert fixture_csv[2][106] == export_csv[2][106], 'Computer information are not equal'
|
assert fixture_csv[2][108] == export_csv[2][108], 'Computer information are not equal'
|
||||||
assert fixture_csv[2][109:129] == export_csv[2][109:129], 'Computer information are not equal'
|
assert fixture_csv[2][111:131] == export_csv[2][111:131], 'Computer information are not equal'
|
||||||
assert fixture_csv[2][130] == export_csv[2][130], 'Computer information are not equal'
|
assert fixture_csv[2][131] == export_csv[2][131], 'Computer information are not equal'
|
||||||
assert fixture_csv[2][133:] == export_csv[2][133:], 'Computer information are not equal'
|
assert fixture_csv[2][136:] == export_csv[2][136:], 'Computer information are not equal'
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.mvp
|
@pytest.mark.mvp
|
||||||
|
|
Reference in New Issue