Merge pull request #83 from eReuse/feature/77-add-owner-device
Feature/77 add owner device
This commit is contained in:
commit
5b95cb9ae0
|
@ -0,0 +1,67 @@
|
||||||
|
"""adding owner_id in device
|
||||||
|
|
||||||
|
Revision ID: 68a5c025ab8e
|
||||||
|
Revises: b9b0ee7d9dca
|
||||||
|
Create Date: 2020-10-30 11:48:34.992498
|
||||||
|
|
||||||
|
"""
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from alembic import context
|
||||||
|
from alembic import op
|
||||||
|
from sqlalchemy.dialects import postgresql
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '68a5c025ab8e'
|
||||||
|
down_revision = 'e93aec8fc41f'
|
||||||
|
branch_labels = None
|
||||||
|
depends_on = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_inv():
|
||||||
|
INV = context.get_x_argument(as_dictionary=True).get('inventory')
|
||||||
|
if not INV:
|
||||||
|
raise ValueError("Inventory value is not specified")
|
||||||
|
return INV
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade_data():
|
||||||
|
con = op.get_bind()
|
||||||
|
computers = con.execute(f"select id, owner_id from {get_inv()}.computer")
|
||||||
|
for c in computers:
|
||||||
|
id_dev = c.id
|
||||||
|
id_owner = c.owner_id
|
||||||
|
sql = f"update {get_inv()}.device set owner_id='{id_owner}' where id={id_dev};"
|
||||||
|
con.execute(sql)
|
||||||
|
|
||||||
|
values = f"{get_inv()}.component.id, {get_inv()}.computer.owner_id"
|
||||||
|
table = f"{get_inv()}.component"
|
||||||
|
joins = f"inner join {get_inv()}.computer"
|
||||||
|
on = f"on {table}.parent_id={get_inv()}.computer.id"
|
||||||
|
sql = f"select {values} from {table} {joins} {on}"
|
||||||
|
|
||||||
|
components = con.execute(sql)
|
||||||
|
for c in components:
|
||||||
|
id = c.id
|
||||||
|
id_owner = c.owner_id
|
||||||
|
sql = f"update {get_inv()}.device set owner_id='{id_owner}' where id={id};"
|
||||||
|
con.execute(sql)
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
# We need get the actual computers with owner_id
|
||||||
|
# because when add a column in device this reset the values of the owner_id
|
||||||
|
# in the computer tables
|
||||||
|
op.add_column('device', sa.Column('owner_id', postgresql.UUID(),
|
||||||
|
nullable=True), schema=f'{get_inv()}')
|
||||||
|
op.create_foreign_key("fk_device_owner_id_user_id",
|
||||||
|
"device", "user",
|
||||||
|
["owner_id"], ["id"],
|
||||||
|
ondelete="SET NULL",
|
||||||
|
source_schema=f'{get_inv()}', referent_schema='common')
|
||||||
|
|
||||||
|
upgrade_data()
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade():
|
||||||
|
op.drop_constraint("fk_device_owner_id_user_id", "device", type_="foreignkey", schema=f'{get_inv()}')
|
||||||
|
op.drop_column('device', 'owner_id', schema=f'{get_inv()}')
|
|
@ -237,12 +237,14 @@ class ActionView(View):
|
||||||
def live(self, snapshot):
|
def live(self, snapshot):
|
||||||
"""If the device.allocated == True, then this snapshot create an action live."""
|
"""If the device.allocated == True, then this snapshot create an action live."""
|
||||||
device = snapshot.get('device') # type: Computer
|
device = snapshot.get('device') # type: Computer
|
||||||
# TODO @cayop dependency of pulls 85 and 83
|
# TODO @cayop dependency of pulls 85
|
||||||
# if the pr/85 and pr/83 is merged, then you need change this way for get the device
|
# if the pr/85 is merged, then you need change this way for get the device
|
||||||
if not device.hid or not Device.query.filter(Device.hid==device.hid).count():
|
if not device.hid or not Device.query.filter(
|
||||||
|
Device.hid==device.hid, Device.owner_id==g.user.id).count():
|
||||||
return None
|
return None
|
||||||
|
|
||||||
device = Device.query.filter(Device.hid==device.hid).one()
|
device = Device.query.filter(
|
||||||
|
Device.hid==device.hid, Device.owner_id==g.user.id).one()
|
||||||
|
|
||||||
if not device.allocated:
|
if not device.allocated:
|
||||||
return None
|
return None
|
||||||
|
|
|
@ -32,6 +32,7 @@ from ereuse_devicehub.resources.models import STR_SM_SIZE, Thing, listener_reset
|
||||||
from ereuse_devicehub.resources.user.models import User
|
from ereuse_devicehub.resources.user.models import User
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Device(Thing):
|
class Device(Thing):
|
||||||
"""Base class for any type of physical object that can be identified.
|
"""Base class for any type of physical object that can be identified.
|
||||||
|
|
||||||
|
@ -106,6 +107,11 @@ class Device(Thing):
|
||||||
image = db.Column(db.URL)
|
image = db.Column(db.URL)
|
||||||
image.comment = "An image of the device."
|
image.comment = "An image of the device."
|
||||||
|
|
||||||
|
owner_id = db.Column(UUID(as_uuid=True),
|
||||||
|
db.ForeignKey(User.id),
|
||||||
|
nullable=False,
|
||||||
|
default=lambda: g.user.id)
|
||||||
|
owner = db.relationship(User, primaryjoin=owner_id == User.id)
|
||||||
allocated = db.Column(Boolean, default=False)
|
allocated = db.Column(Boolean, default=False)
|
||||||
allocated.comment = "device is allocated or not."
|
allocated.comment = "device is allocated or not."
|
||||||
|
|
||||||
|
@ -115,6 +121,7 @@ class Device(Thing):
|
||||||
'created',
|
'created',
|
||||||
'updated',
|
'updated',
|
||||||
'parent_id',
|
'parent_id',
|
||||||
|
'owner_id',
|
||||||
'hid',
|
'hid',
|
||||||
'production_date',
|
'production_date',
|
||||||
'color', # these are only user-input thus volatile
|
'color', # these are only user-input thus volatile
|
||||||
|
@ -593,7 +600,8 @@ class Component(Device):
|
||||||
"""
|
"""
|
||||||
assert self.hid is None, 'Don\'t use this method with a component that has HID'
|
assert self.hid is None, 'Don\'t use this method with a component that has HID'
|
||||||
component = self.__class__.query \
|
component = self.__class__.query \
|
||||||
.filter_by(parent=parent, hid=None, **self.physical_properties) \
|
.filter_by(parent=parent, hid=None, owner_id=self.owner_id,
|
||||||
|
**self.physical_properties) \
|
||||||
.filter(~Component.id.in_(blacklist)) \
|
.filter(~Component.id.in_(blacklist)) \
|
||||||
.first()
|
.first()
|
||||||
if not component:
|
if not component:
|
||||||
|
|
|
@ -4,6 +4,7 @@ from itertools import groupby
|
||||||
from typing import Iterable, Set
|
from typing import Iterable, Set
|
||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
|
from flask import g
|
||||||
from sqlalchemy import inspect
|
from sqlalchemy import inspect
|
||||||
from sqlalchemy.exc import IntegrityError
|
from sqlalchemy.exc import IntegrityError
|
||||||
from sqlalchemy.util import OrderedSet
|
from sqlalchemy.util import OrderedSet
|
||||||
|
@ -101,7 +102,7 @@ class Sync:
|
||||||
assert inspect(component).transient, 'Component should not be synced from DB'
|
assert inspect(component).transient, 'Component should not be synced from DB'
|
||||||
try:
|
try:
|
||||||
if component.hid:
|
if component.hid:
|
||||||
db_component = Device.query.filter_by(hid=component.hid).one()
|
db_component = Device.query.filter_by(hid=component.hid, owner_id=g.user.id).one()
|
||||||
assert isinstance(db_component, Device), \
|
assert isinstance(db_component, Device), \
|
||||||
'{} must be a component'.format(db_component)
|
'{} must be a component'.format(db_component)
|
||||||
else:
|
else:
|
||||||
|
@ -153,7 +154,7 @@ class Sync:
|
||||||
db_device = None
|
db_device = None
|
||||||
if device.hid:
|
if device.hid:
|
||||||
with suppress(ResourceNotFound):
|
with suppress(ResourceNotFound):
|
||||||
db_device = Device.query.filter_by(hid=device.hid).one()
|
db_device = Device.query.filter_by(hid=device.hid, owner_id=g.user.id).one()
|
||||||
if db_device and db_device.allocated:
|
if db_device and db_device.allocated:
|
||||||
raise ResourceNotFound('device is actually allocated {}'.format(device))
|
raise ResourceNotFound('device is actually allocated {}'.format(device))
|
||||||
try:
|
try:
|
||||||
|
@ -204,6 +205,9 @@ class Sync:
|
||||||
|
|
||||||
This method mutates db_device.
|
This method mutates db_device.
|
||||||
"""
|
"""
|
||||||
|
if db_device.owner_id != g.user.id:
|
||||||
|
return
|
||||||
|
|
||||||
for field_name, value in device.physical_properties.items():
|
for field_name, value in device.physical_properties.items():
|
||||||
if value is not None:
|
if value is not None:
|
||||||
setattr(db_device, field_name, value)
|
setattr(db_device, field_name, value)
|
||||||
|
@ -234,8 +238,11 @@ class Sync:
|
||||||
return component.parent or Device(id=0) # Computer with id 0 is our Identity
|
return component.parent or Device(id=0) # Computer with id 0 is our Identity
|
||||||
|
|
||||||
for parent, _components in groupby(sorted(adding, key=g_parent), key=g_parent):
|
for parent, _components in groupby(sorted(adding, key=g_parent), key=g_parent):
|
||||||
if parent.id != 0: # Is not Computer Identity
|
set_components = OrderedSet(_components)
|
||||||
actions.add(Remove(device=parent, components=OrderedSet(_components)))
|
check_owners = (x.owner_id == g.user.id for x in set_components)
|
||||||
|
# Is not Computer Identity and all components have the correct owner
|
||||||
|
if parent.id != 0 and all(check_owners):
|
||||||
|
actions.add(Remove(device=parent, components=set_components))
|
||||||
return actions
|
return actions
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -101,7 +101,7 @@ class DeviceView(View):
|
||||||
return super().get(id)
|
return super().get(id)
|
||||||
|
|
||||||
def patch(self, id):
|
def patch(self, id):
|
||||||
dev = Device.query.filter_by(id=id).one()
|
dev = Device.query.filter_by(id=id, owner_id=g.user.id).one()
|
||||||
if isinstance(dev, Computer):
|
if isinstance(dev, Computer):
|
||||||
resource_def = app.resources['Computer']
|
resource_def = app.resources['Computer']
|
||||||
# TODO check how to handle the 'actions_one'
|
# TODO check how to handle the 'actions_one'
|
||||||
|
@ -131,9 +131,9 @@ class DeviceView(View):
|
||||||
|
|
||||||
@auth.Auth.requires_auth
|
@auth.Auth.requires_auth
|
||||||
def one_private(self, id: int):
|
def one_private(self, id: int):
|
||||||
device = Device.query.filter_by(id=id).one()
|
device = Device.query.filter_by(id=id, owner_id=g.user.id).first()
|
||||||
if hasattr(device, 'owner_id') and device.owner_id != g.user.id:
|
if not device:
|
||||||
device = {}
|
return self.one_public(id)
|
||||||
return self.schema.jsonify(device)
|
return self.schema.jsonify(device)
|
||||||
|
|
||||||
@auth.Auth.requires_auth
|
@auth.Auth.requires_auth
|
||||||
|
@ -149,7 +149,7 @@ class DeviceView(View):
|
||||||
)
|
)
|
||||||
|
|
||||||
def query(self, args):
|
def query(self, args):
|
||||||
query = Device.query.distinct() # todo we should not force to do this if the query is ok
|
query = Device.query.filter((Device.owner_id == g.user.id)).distinct()
|
||||||
search_p = args.get('search', None)
|
search_p = args.get('search', None)
|
||||||
if search_p:
|
if search_p:
|
||||||
properties = DeviceSearch.properties
|
properties = DeviceSearch.properties
|
||||||
|
@ -159,17 +159,8 @@ class DeviceView(View):
|
||||||
).order_by(
|
).order_by(
|
||||||
search.Search.rank(properties, search_p) + search.Search.rank(tags, search_p)
|
search.Search.rank(properties, search_p) + search.Search.rank(tags, search_p)
|
||||||
)
|
)
|
||||||
query = self.visibility_filter(query)
|
|
||||||
return query.filter(*args['filter']).order_by(*args['sort'])
|
return query.filter(*args['filter']).order_by(*args['sort'])
|
||||||
|
|
||||||
def visibility_filter(self, query):
|
|
||||||
filterqs = request.args.get('filter', None)
|
|
||||||
if (filterqs and
|
|
||||||
'lot' not in filterqs):
|
|
||||||
query = query.filter((Computer.id == Device.id), (Computer.owner_id == g.user.id))
|
|
||||||
pass
|
|
||||||
return query
|
|
||||||
|
|
||||||
|
|
||||||
class DeviceMergeView(View):
|
class DeviceMergeView(View):
|
||||||
"""View for merging two devices
|
"""View for merging two devices
|
||||||
|
@ -194,8 +185,13 @@ class DeviceMergeView(View):
|
||||||
many models in session.
|
many models in session.
|
||||||
"""
|
"""
|
||||||
# base_device = Device.query.filter_by(id=dev1_id, owner_id=g.user.id).one()
|
# base_device = Device.query.filter_by(id=dev1_id, owner_id=g.user.id).one()
|
||||||
self.base_device = Device.query.filter_by(id=dev1_id).one()
|
self.base_device = Device.query.filter_by(id=dev1_id, owner_id=g.user.id).one()
|
||||||
self.with_device = Device.query.filter_by(id=dev2_id).one()
|
self.with_device = Device.query.filter_by(id=dev2_id, owner_id=g.user.id).one()
|
||||||
|
|
||||||
|
if self.base_device.allocated or self.with_device.allocated:
|
||||||
|
# Validation than any device is allocated
|
||||||
|
msg = 'The device is allocated, please deallocated before merge.'
|
||||||
|
raise ValidationError(msg)
|
||||||
|
|
||||||
if not self.base_device.type == self.with_device.type:
|
if not self.base_device.type == self.with_device.type:
|
||||||
# Validation than we are speaking of the same kind of devices
|
# Validation than we are speaking of the same kind of devices
|
||||||
|
|
|
@ -3,7 +3,7 @@ from uuid import UUID
|
||||||
import pytest
|
import pytest
|
||||||
from marshmallow import ValidationError
|
from marshmallow import ValidationError
|
||||||
from sqlalchemy_utils import PhoneNumber
|
from sqlalchemy_utils import PhoneNumber
|
||||||
from teal.db import UniqueViolation
|
from teal.db import UniqueViolation, DBError
|
||||||
from teal.enums import Country
|
from teal.enums import Country
|
||||||
|
|
||||||
from ereuse_devicehub.config import DevicehubConfig
|
from ereuse_devicehub.config import DevicehubConfig
|
||||||
|
@ -80,7 +80,7 @@ def test_membership_repeated():
|
||||||
db.session.add(person)
|
db.session.add(person)
|
||||||
|
|
||||||
person.member_of.add(Membership(org, person))
|
person.member_of.add(Membership(org, person))
|
||||||
with pytest.raises(UniqueViolation):
|
with pytest.raises(DBError):
|
||||||
db.session.flush()
|
db.session.flush()
|
||||||
|
|
||||||
|
|
||||||
|
@ -95,7 +95,7 @@ def test_membership_repeating_id():
|
||||||
person2 = Person(name='Tommy')
|
person2 = Person(name='Tommy')
|
||||||
person2.member_of.add(Membership(org, person2, id='acme-1'))
|
person2.member_of.add(Membership(org, person2, id='acme-1'))
|
||||||
db.session.add(person2)
|
db.session.add(person2)
|
||||||
with pytest.raises(UniqueViolation) as e:
|
with pytest.raises(DBError) as e:
|
||||||
db.session.flush()
|
db.session.flush()
|
||||||
assert 'One member id per organization' in str(e)
|
assert 'One member id per organization' in str(e)
|
||||||
|
|
||||||
|
|
|
@ -128,7 +128,6 @@ def test_physical_properties():
|
||||||
'ethereum_address': None,
|
'ethereum_address': None,
|
||||||
'manufacturer': 'bar',
|
'manufacturer': 'bar',
|
||||||
'model': 'foo',
|
'model': 'foo',
|
||||||
'owner_id': pc.owner_id,
|
|
||||||
'receiver_id': None,
|
'receiver_id': None,
|
||||||
'serial_number': 'foo-bar',
|
'serial_number': 'foo-bar',
|
||||||
'transfer_state': TransferState.Initial
|
'transfer_state': TransferState.Initial
|
||||||
|
@ -138,6 +137,7 @@ def test_physical_properties():
|
||||||
@pytest.mark.mvp
|
@pytest.mark.mvp
|
||||||
@pytest.mark.usefixtures(conftest.auth_app_context.__name__)
|
@pytest.mark.usefixtures(conftest.auth_app_context.__name__)
|
||||||
def test_component_similar_one():
|
def test_component_similar_one():
|
||||||
|
user = User.query.filter().first()
|
||||||
snapshot = conftest.file('pc-components.db')
|
snapshot = conftest.file('pc-components.db')
|
||||||
pc = snapshot['device']
|
pc = snapshot['device']
|
||||||
snapshot['components'][0]['serial_number'] = snapshot['components'][1]['serial_number'] = None
|
snapshot['components'][0]['serial_number'] = snapshot['components'][1]['serial_number'] = None
|
||||||
|
@ -146,7 +146,8 @@ def test_component_similar_one():
|
||||||
db.session.add(pc)
|
db.session.add(pc)
|
||||||
db.session.flush()
|
db.session.flush()
|
||||||
# Let's create a new component named 'A' similar to 1
|
# Let's create a new component named 'A' similar to 1
|
||||||
componentA = d.Component(model=component1.model, manufacturer=component1.manufacturer)
|
componentA = d.Component(model=component1.model, manufacturer=component1.manufacturer,
|
||||||
|
owner_id=user.id)
|
||||||
similar_to_a = componentA.similar_one(pc, set())
|
similar_to_a = componentA.similar_one(pc, set())
|
||||||
assert similar_to_a == component1
|
assert similar_to_a == component1
|
||||||
# d.Component B does not have the same model
|
# d.Component B does not have the same model
|
||||||
|
@ -165,16 +166,17 @@ def test_add_remove():
|
||||||
# pc has c1 and c2
|
# pc has c1 and c2
|
||||||
# pc2 has c3
|
# pc2 has c3
|
||||||
# c4 is not with any pc
|
# c4 is not with any pc
|
||||||
|
user = User.query.filter().first()
|
||||||
values = conftest.file('pc-components.db')
|
values = conftest.file('pc-components.db')
|
||||||
pc = values['device']
|
pc = values['device']
|
||||||
c1, c2 = (d.Component(**c) for c in values['components'])
|
c1, c2 = (d.Component(**c) for c in values['components'])
|
||||||
pc = d.Desktop(**pc, components=OrderedSet([c1, c2]))
|
pc = d.Desktop(**pc, components=OrderedSet([c1, c2]))
|
||||||
db.session.add(pc)
|
db.session.add(pc)
|
||||||
c3 = d.Component(serial_number='nc1')
|
c3 = d.Component(serial_number='nc1', owner_id=user.id)
|
||||||
pc2 = d.Desktop(serial_number='s2',
|
pc2 = d.Desktop(serial_number='s2',
|
||||||
components=OrderedSet([c3]),
|
components=OrderedSet([c3]),
|
||||||
chassis=ComputerChassis.Microtower)
|
chassis=ComputerChassis.Microtower)
|
||||||
c4 = d.Component(serial_number='c4s')
|
c4 = d.Component(serial_number='c4s', owner_id=user.id)
|
||||||
db.session.add(pc2)
|
db.session.add(pc2)
|
||||||
db.session.add(c4)
|
db.session.add(c4)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
@ -313,14 +315,16 @@ def test_sync_execute_register_no_hid_tag_not_linked(tag_id: str):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.mvp
|
@pytest.mark.mvp
|
||||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
@pytest.mark.usefixtures(conftest.auth_app_context.__name__)
|
||||||
def test_sync_execute_register_tag_does_not_exist():
|
def test_sync_execute_register_tag_does_not_exist():
|
||||||
"""Ensures not being able to register if the tag does not exist,
|
"""Ensures not being able to register if the tag does not exist,
|
||||||
even if the device has HID or it existed before.
|
even if the device has HID or it existed before.
|
||||||
|
|
||||||
Tags have to be created before trying to link them through a Snapshot.
|
Tags have to be created before trying to link them through a Snapshot.
|
||||||
"""
|
"""
|
||||||
|
user = User.query.filter().first()
|
||||||
pc = d.Desktop(**conftest.file('pc-components.db')['device'], tags=OrderedSet([Tag('foo')]))
|
pc = d.Desktop(**conftest.file('pc-components.db')['device'], tags=OrderedSet([Tag('foo')]))
|
||||||
|
pc.owner_id = user.id
|
||||||
with raises(ResourceNotFound):
|
with raises(ResourceNotFound):
|
||||||
Sync().execute_register(pc)
|
Sync().execute_register(pc)
|
||||||
|
|
||||||
|
@ -401,8 +405,9 @@ def test_get_device(app: Devicehub, user: UserClient):
|
||||||
chassis=ComputerChassis.Tower,
|
chassis=ComputerChassis.Tower,
|
||||||
owner_id=user.user['id'])
|
owner_id=user.user['id'])
|
||||||
pc.components = OrderedSet([
|
pc.components = OrderedSet([
|
||||||
d.NetworkAdapter(model='c1mo', manufacturer='c1ma', serial_number='c1s'),
|
d.NetworkAdapter(model='c1mo', manufacturer='c1ma', serial_number='c1s',
|
||||||
d.GraphicCard(model='c2mo', manufacturer='c2ma', memory=1500)
|
owner_id=user.user['id']),
|
||||||
|
d.GraphicCard(model='c2mo', manufacturer='c2ma', memory=1500, owner_id=user.user['id'])
|
||||||
])
|
])
|
||||||
db.session.add(pc)
|
db.session.add(pc)
|
||||||
# todo test is an abstract class. replace with another one
|
# todo test is an abstract class. replace with another one
|
||||||
|
@ -438,8 +443,10 @@ def test_get_devices(app: Devicehub, user: UserClient):
|
||||||
chassis=ComputerChassis.Tower,
|
chassis=ComputerChassis.Tower,
|
||||||
owner_id=user.user['id'])
|
owner_id=user.user['id'])
|
||||||
pc.components = OrderedSet([
|
pc.components = OrderedSet([
|
||||||
d.NetworkAdapter(model='c1mo', manufacturer='c1ma', serial_number='c1s'),
|
d.NetworkAdapter(model='c1mo', manufacturer='c1ma', serial_number='c1s',
|
||||||
d.GraphicCard(model='c2mo', manufacturer='c2ma', memory=1500)
|
owner_id=user.user['id']),
|
||||||
|
d.GraphicCard(model='c2mo', manufacturer='c2ma', memory=1500,
|
||||||
|
owner_id=user.user['id'])
|
||||||
])
|
])
|
||||||
pc1 = d.Desktop(model='p2mo',
|
pc1 = d.Desktop(model='p2mo',
|
||||||
manufacturer='p2ma',
|
manufacturer='p2ma',
|
||||||
|
@ -461,17 +468,21 @@ def test_get_devices(app: Devicehub, user: UserClient):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.mvp
|
@pytest.mark.mvp
|
||||||
def test_get_device_permissions(app: Devicehub, user: UserClient, user2: UserClient):
|
def test_get_device_permissions(app: Devicehub, user: UserClient, user2: UserClient,
|
||||||
|
client: Client):
|
||||||
"""Checks GETting a d.Desktop with its components."""
|
"""Checks GETting a d.Desktop with its components."""
|
||||||
|
|
||||||
user.post(file('asus-eee-1000h.snapshot.11'), res=m.Snapshot)
|
s, _ = user.post(file('asus-eee-1000h.snapshot.11'), res=m.Snapshot)
|
||||||
pc, res = user.get("/devices/1", None)
|
pc, res = user.get(res=d.Device, item=s['device']['id'])
|
||||||
assert res.status_code == 200
|
assert res.status_code == 200
|
||||||
assert len(pc['actions']) == 9
|
assert len(pc['actions']) == 9
|
||||||
|
|
||||||
pc2, res2 = user2.get("/devices/1", None)
|
html, _ = client.get(res=d.Device, item=s['device']['id'], accept=ANY)
|
||||||
|
assert 'intel atom cpu n270 @ 1.60ghz' in html
|
||||||
|
assert '00:24:8C:7F:CF:2D – 100 Mbps' in html
|
||||||
|
pc2, res2 = user2.get(res=d.Device, item=s['device']['id'], accept=ANY)
|
||||||
assert res2.status_code == 200
|
assert res2.status_code == 200
|
||||||
assert pc2 == {}
|
assert pc2 == html
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.mvp
|
@pytest.mark.mvp
|
||||||
|
@ -551,29 +562,30 @@ def test_device_public(user: UserClient, client: Client):
|
||||||
|
|
||||||
@pytest.mark.mvp
|
@pytest.mark.mvp
|
||||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||||
def test_computer_accessory_model():
|
def test_computer_accessory_model(user: UserClient):
|
||||||
sai = d.SAI()
|
sai = d.SAI(owner_id=user.user['id'])
|
||||||
db.session.add(sai)
|
db.session.add(sai)
|
||||||
keyboard = d.Keyboard(layout=Layouts.ES)
|
keyboard = d.Keyboard(layout=Layouts.ES, owner_id=user.user['id'])
|
||||||
db.session.add(keyboard)
|
db.session.add(keyboard)
|
||||||
mouse = d.Mouse()
|
mouse = d.Mouse(owner_id=user.user['id'])
|
||||||
db.session.add(mouse)
|
db.session.add(mouse)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.mvp
|
@pytest.mark.mvp
|
||||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||||
def test_networking_model():
|
def test_networking_model(user: UserClient):
|
||||||
router = d.Router(speed=1000, wireless=True)
|
router = d.Router(speed=1000, wireless=True, owner_id=user.user['id'])
|
||||||
db.session.add(router)
|
db.session.add(router)
|
||||||
switch = d.Switch(speed=1000, wireless=False)
|
switch = d.Switch(speed=1000, wireless=False, owner_id=user.user['id'])
|
||||||
db.session.add(switch)
|
db.session.add(switch)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||||
def test_cooking_mixer():
|
def test_cooking_mixer(user: UserClient):
|
||||||
mixer = d.Mixer(serial_number='foo', model='bar', manufacturer='foobar')
|
mixer = d.Mixer(serial_number='foo', model='bar', manufacturer='foobar',
|
||||||
|
owner_id=user.user['id'])
|
||||||
db.session.add(mixer)
|
db.session.add(mixer)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import pytest
|
import pytest
|
||||||
|
import uuid
|
||||||
from teal.utils import compiled
|
from teal.utils import compiled
|
||||||
|
|
||||||
from ereuse_devicehub.client import UserClient
|
from ereuse_devicehub.client import UserClient
|
||||||
|
@ -185,6 +186,26 @@ def test_device_query(user: UserClient):
|
||||||
assert not pc['tags']
|
assert not pc['tags']
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.mvp
|
||||||
|
def test_device_query_permitions(user: UserClient, user2: UserClient):
|
||||||
|
"""Checks result of inventory for two users"""
|
||||||
|
user.post(file('basic.snapshot'), res=Snapshot)
|
||||||
|
i, _ = user.get(res=Device)
|
||||||
|
pc1 = next(d for d in i['items'] if d['type'] == 'Desktop')
|
||||||
|
|
||||||
|
i2, _ = user2.get(res=Device)
|
||||||
|
assert i2['items'] == []
|
||||||
|
|
||||||
|
basic_snapshot = file('basic.snapshot')
|
||||||
|
basic_snapshot['uuid'] = f"{uuid.uuid4()}"
|
||||||
|
user2.post(basic_snapshot, res=Snapshot)
|
||||||
|
i2, _ = user2.get(res=Device)
|
||||||
|
pc2 = next(d for d in i2['items'] if d['type'] == 'Desktop')
|
||||||
|
|
||||||
|
assert pc1['id'] != pc2['id']
|
||||||
|
assert pc1['hid'] == pc2['hid']
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.mvp
|
@pytest.mark.mvp
|
||||||
def test_device_search_all_devices_token_if_empty(app: Devicehub, user: UserClient):
|
def test_device_search_all_devices_token_if_empty(app: Devicehub, user: UserClient):
|
||||||
"""Ensures DeviceSearch can regenerate itself when the table is empty."""
|
"""Ensures DeviceSearch can regenerate itself when the table is empty."""
|
||||||
|
|
|
@ -4,6 +4,7 @@ from io import StringIO
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
from werkzeug.exceptions import Unauthorized
|
||||||
import teal.marshmallow
|
import teal.marshmallow
|
||||||
from ereuse_utils.test import ANY
|
from ereuse_utils.test import ANY
|
||||||
|
|
||||||
|
@ -79,6 +80,29 @@ def test_erasure_certificate_wrong_id(client: Client):
|
||||||
status=teal.marshmallow.ValidationError)
|
status=teal.marshmallow.ValidationError)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.mvp
|
||||||
|
def test_export_csv_permitions(user: UserClient, user2: UserClient, client: Client):
|
||||||
|
"""Test export device information in a csv file with others users."""
|
||||||
|
snapshot, _ = user.post(file('basic.snapshot'), res=Snapshot)
|
||||||
|
csv_user, _ = user.get(res=documents.DocumentDef.t,
|
||||||
|
item='devices/',
|
||||||
|
accept='text/csv',
|
||||||
|
query=[('filter', {'type': ['Computer']})])
|
||||||
|
|
||||||
|
csv_user2, _ = user2.get(res=documents.DocumentDef.t,
|
||||||
|
item='devices/',
|
||||||
|
accept='text/csv',
|
||||||
|
query=[('filter', {'type': ['Computer']})])
|
||||||
|
|
||||||
|
_, res = client.get(res=documents.DocumentDef.t,
|
||||||
|
item='devices/',
|
||||||
|
accept='text/csv',
|
||||||
|
query=[('filter', {'type': ['Computer']})], status=401)
|
||||||
|
assert res.status_code == 401
|
||||||
|
|
||||||
|
assert len(csv_user) > 0
|
||||||
|
assert len(csv_user2) == 0
|
||||||
|
|
||||||
@pytest.mark.mvp
|
@pytest.mark.mvp
|
||||||
def test_export_basic_snapshot(user: UserClient):
|
def test_export_basic_snapshot(user: UserClient):
|
||||||
"""Test export device information in a csv file."""
|
"""Test export device information in a csv file."""
|
||||||
|
|
|
@ -2,6 +2,7 @@ import os
|
||||||
import json
|
import json
|
||||||
import shutil
|
import shutil
|
||||||
import pytest
|
import pytest
|
||||||
|
import uuid
|
||||||
|
|
||||||
from datetime import datetime, timedelta, timezone
|
from datetime import datetime, timedelta, timezone
|
||||||
from requests.exceptions import HTTPError
|
from requests.exceptions import HTTPError
|
||||||
|
@ -104,6 +105,24 @@ def test_snapshot_post(user: UserClient):
|
||||||
assert rate['snapshot']['id'] == snapshot['id']
|
assert rate['snapshot']['id'] == snapshot['id']
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.mvp
|
||||||
|
def test_same_device_tow_users(user: UserClient, user2: UserClient):
|
||||||
|
"""Two users can up the same snapshot and the system save 2 computers"""
|
||||||
|
user.post(file('basic.snapshot'), res=Snapshot)
|
||||||
|
i, _ = user.get(res=m.Device)
|
||||||
|
pc = next(d for d in i['items'] if d['type'] == 'Desktop')
|
||||||
|
pc_id = pc['id']
|
||||||
|
assert i['items'][0]['url'] == f'/devices/{pc_id}'
|
||||||
|
|
||||||
|
basic_snapshot = file('basic.snapshot')
|
||||||
|
basic_snapshot['uuid'] = f"{uuid.uuid4()}"
|
||||||
|
user2.post(basic_snapshot, res=Snapshot)
|
||||||
|
i2, _ = user2.get(res=m.Device)
|
||||||
|
pc2 = next(d for d in i2['items'] if d['type'] == 'Desktop')
|
||||||
|
assert pc['id'] != pc2['id']
|
||||||
|
assert pc['ownerID'] != pc2['ownerID']
|
||||||
|
assert pc['hid'] == pc2['hid']
|
||||||
|
|
||||||
@pytest.mark.mvp
|
@pytest.mark.mvp
|
||||||
def test_snapshot_update_timefield_updated(user: UserClient):
|
def test_snapshot_update_timefield_updated(user: UserClient):
|
||||||
"""
|
"""
|
||||||
|
@ -253,7 +272,9 @@ def test_snapshot_component_add_remove(user: UserClient):
|
||||||
assert {c['serialNumber'] for c in pc1['components']} == {'p1c3s', 'p1c4s'}
|
assert {c['serialNumber'] for c in pc1['components']} == {'p1c3s', 'p1c4s'}
|
||||||
assert all(c['parent'] == pc1_id for c in pc1['components'])
|
assert all(c['parent'] == pc1_id for c in pc1['components'])
|
||||||
# This last Action only
|
# This last Action only
|
||||||
assert get_actions_info(pc1['actions'])[-1] == ('RateComputer', ['p1c3s', 'p1c4s'])
|
act = get_actions_info(pc1['actions'])[-1]
|
||||||
|
assert 'RateComputer' in act
|
||||||
|
assert set(act[1]) == {'p1c3s', 'p1c4s'}
|
||||||
# PC2
|
# PC2
|
||||||
# We haven't changed PC2
|
# We haven't changed PC2
|
||||||
assert tuple(c['serialNumber'] for c in pc2['components']) == ('p2c1s',)
|
assert tuple(c['serialNumber'] for c in pc2['components']) == ('p2c1s',)
|
||||||
|
|
Reference in New Issue