resolve conflicts
This commit is contained in:
commit
729281383e
|
@ -0,0 +1,17 @@
|
|||
# Changelog
|
||||
All notable changes to this project will be documented in this file.
|
||||
|
||||
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
|
||||
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.ht
|
||||
ml).
|
||||
|
||||
## master
|
||||
[1.0.1-beta]
|
||||
|
||||
## testing
|
||||
[1.0.2-beta]
|
||||
|
||||
## [1.0.2-beta]
|
||||
- [addend] #87 allocate, deallocate and live actions
|
||||
- [fixed] #89 save json on disk only for shapshots
|
||||
- [addend] #83 add owner_id in all kind of device
|
|
@ -0,0 +1,67 @@
|
|||
"""adding owner_id in device
|
||||
|
||||
Revision ID: 68a5c025ab8e
|
||||
Revises: b9b0ee7d9dca
|
||||
Create Date: 2020-10-30 11:48:34.992498
|
||||
|
||||
"""
|
||||
import sqlalchemy as sa
|
||||
from alembic import context
|
||||
from alembic import op
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '68a5c025ab8e'
|
||||
down_revision = 'e93aec8fc41f'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def get_inv():
|
||||
INV = context.get_x_argument(as_dictionary=True).get('inventory')
|
||||
if not INV:
|
||||
raise ValueError("Inventory value is not specified")
|
||||
return INV
|
||||
|
||||
|
||||
def upgrade_data():
|
||||
con = op.get_bind()
|
||||
computers = con.execute(f"select id, owner_id from {get_inv()}.computer")
|
||||
for c in computers:
|
||||
id_dev = c.id
|
||||
id_owner = c.owner_id
|
||||
sql = f"update {get_inv()}.device set owner_id='{id_owner}' where id={id_dev};"
|
||||
con.execute(sql)
|
||||
|
||||
values = f"{get_inv()}.component.id, {get_inv()}.computer.owner_id"
|
||||
table = f"{get_inv()}.component"
|
||||
joins = f"inner join {get_inv()}.computer"
|
||||
on = f"on {table}.parent_id={get_inv()}.computer.id"
|
||||
sql = f"select {values} from {table} {joins} {on}"
|
||||
|
||||
components = con.execute(sql)
|
||||
for c in components:
|
||||
id = c.id
|
||||
id_owner = c.owner_id
|
||||
sql = f"update {get_inv()}.device set owner_id='{id_owner}' where id={id};"
|
||||
con.execute(sql)
|
||||
|
||||
|
||||
def upgrade():
|
||||
# We need get the actual computers with owner_id
|
||||
# because when add a column in device this reset the values of the owner_id
|
||||
# in the computer tables
|
||||
op.add_column('device', sa.Column('owner_id', postgresql.UUID(),
|
||||
nullable=True), schema=f'{get_inv()}')
|
||||
op.create_foreign_key("fk_device_owner_id_user_id",
|
||||
"device", "user",
|
||||
["owner_id"], ["id"],
|
||||
ondelete="SET NULL",
|
||||
source_schema=f'{get_inv()}', referent_schema='common')
|
||||
|
||||
upgrade_data()
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_constraint("fk_device_owner_id_user_id", "device", type_="foreignkey", schema=f'{get_inv()}')
|
||||
op.drop_column('device', 'owner_id', schema=f'{get_inv()}')
|
|
@ -260,13 +260,13 @@ class ActionView(View):
|
|||
|
||||
def live(self, snapshot):
|
||||
"""If the device.allocated == True, then this snapshot create an action live."""
|
||||
# TODO @cayop dependency of pulls 83
|
||||
# if the pr/83 is merged, then you need change this way for get the device
|
||||
hid = self.get_hid(snapshot)
|
||||
if not hid or not Device.query.filter(Device.hid==hid).count():
|
||||
if not hid or not Device.query.filter(
|
||||
Device.hid==hid, Device.owner_id==g.user.id).count():
|
||||
return None
|
||||
|
||||
device = Device.query.filter(Device.hid==hid).one()
|
||||
device = Device.query.filter(
|
||||
Device.hid==hid, Device.owner_id==g.user.id).one()
|
||||
|
||||
if not device.allocated:
|
||||
return None
|
||||
|
|
|
@ -32,6 +32,7 @@ from ereuse_devicehub.resources.models import STR_SM_SIZE, Thing, listener_reset
|
|||
from ereuse_devicehub.resources.user.models import User
|
||||
|
||||
|
||||
|
||||
class Device(Thing):
|
||||
"""Base class for any type of physical object that can be identified.
|
||||
|
||||
|
@ -106,6 +107,11 @@ class Device(Thing):
|
|||
image = db.Column(db.URL)
|
||||
image.comment = "An image of the device."
|
||||
|
||||
owner_id = db.Column(UUID(as_uuid=True),
|
||||
db.ForeignKey(User.id),
|
||||
nullable=False,
|
||||
default=lambda: g.user.id)
|
||||
owner = db.relationship(User, primaryjoin=owner_id == User.id)
|
||||
allocated = db.Column(Boolean, default=False)
|
||||
allocated.comment = "device is allocated or not."
|
||||
|
||||
|
@ -115,6 +121,7 @@ class Device(Thing):
|
|||
'created',
|
||||
'updated',
|
||||
'parent_id',
|
||||
'owner_id',
|
||||
'hid',
|
||||
'production_date',
|
||||
'color', # these are only user-input thus volatile
|
||||
|
@ -613,7 +620,8 @@ class Component(Device):
|
|||
"""
|
||||
assert self.hid is None, 'Don\'t use this method with a component that has HID'
|
||||
component = self.__class__.query \
|
||||
.filter_by(parent=parent, hid=None, **self.physical_properties) \
|
||||
.filter_by(parent=parent, hid=None, owner_id=self.owner_id,
|
||||
**self.physical_properties) \
|
||||
.filter(~Component.id.in_(blacklist)) \
|
||||
.first()
|
||||
if not component:
|
||||
|
|
|
@ -4,6 +4,7 @@ from itertools import groupby
|
|||
from typing import Iterable, Set
|
||||
|
||||
import yaml
|
||||
from flask import g
|
||||
from sqlalchemy import inspect
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy.util import OrderedSet
|
||||
|
@ -101,7 +102,7 @@ class Sync:
|
|||
assert inspect(component).transient, 'Component should not be synced from DB'
|
||||
try:
|
||||
if component.hid:
|
||||
db_component = Device.query.filter_by(hid=component.hid).one()
|
||||
db_component = Device.query.filter_by(hid=component.hid, owner_id=g.user.id).one()
|
||||
assert isinstance(db_component, Device), \
|
||||
'{} must be a component'.format(db_component)
|
||||
else:
|
||||
|
@ -153,7 +154,7 @@ class Sync:
|
|||
db_device = None
|
||||
if device.hid:
|
||||
with suppress(ResourceNotFound):
|
||||
db_device = Device.query.filter_by(hid=device.hid).one()
|
||||
db_device = Device.query.filter_by(hid=device.hid, owner_id=g.user.id).one()
|
||||
if db_device and db_device.allocated:
|
||||
raise ResourceNotFound('device is actually allocated {}'.format(device))
|
||||
try:
|
||||
|
@ -204,6 +205,9 @@ class Sync:
|
|||
|
||||
This method mutates db_device.
|
||||
"""
|
||||
if db_device.owner_id != g.user.id:
|
||||
return
|
||||
|
||||
for field_name, value in device.physical_properties.items():
|
||||
if value is not None:
|
||||
setattr(db_device, field_name, value)
|
||||
|
@ -234,8 +238,11 @@ class Sync:
|
|||
return component.parent or Device(id=0) # Computer with id 0 is our Identity
|
||||
|
||||
for parent, _components in groupby(sorted(adding, key=g_parent), key=g_parent):
|
||||
if parent.id != 0: # Is not Computer Identity
|
||||
actions.add(Remove(device=parent, components=OrderedSet(_components)))
|
||||
set_components = OrderedSet(_components)
|
||||
check_owners = (x.owner_id == g.user.id for x in set_components)
|
||||
# Is not Computer Identity and all components have the correct owner
|
||||
if parent.id != 0 and all(check_owners):
|
||||
actions.add(Remove(device=parent, components=set_components))
|
||||
return actions
|
||||
|
||||
|
||||
|
|
|
@ -101,7 +101,7 @@ class DeviceView(View):
|
|||
return super().get(id)
|
||||
|
||||
def patch(self, id):
|
||||
dev = Device.query.filter_by(id=id).one()
|
||||
dev = Device.query.filter_by(id=id, owner_id=g.user.id).one()
|
||||
if isinstance(dev, Computer):
|
||||
resource_def = app.resources['Computer']
|
||||
# TODO check how to handle the 'actions_one'
|
||||
|
@ -131,9 +131,9 @@ class DeviceView(View):
|
|||
|
||||
@auth.Auth.requires_auth
|
||||
def one_private(self, id: int):
|
||||
device = Device.query.filter_by(id=id).one()
|
||||
if hasattr(device, 'owner_id') and device.owner_id != g.user.id:
|
||||
device = {}
|
||||
device = Device.query.filter_by(id=id, owner_id=g.user.id).first()
|
||||
if not device:
|
||||
return self.one_public(id)
|
||||
return self.schema.jsonify(device)
|
||||
|
||||
@auth.Auth.requires_auth
|
||||
|
@ -149,7 +149,7 @@ class DeviceView(View):
|
|||
)
|
||||
|
||||
def query(self, args):
|
||||
query = Device.query.distinct() # todo we should not force to do this if the query is ok
|
||||
query = Device.query.filter((Device.owner_id == g.user.id)).distinct()
|
||||
search_p = args.get('search', None)
|
||||
if search_p:
|
||||
properties = DeviceSearch.properties
|
||||
|
@ -159,17 +159,8 @@ class DeviceView(View):
|
|||
).order_by(
|
||||
search.Search.rank(properties, search_p) + search.Search.rank(tags, search_p)
|
||||
)
|
||||
query = self.visibility_filter(query)
|
||||
return query.filter(*args['filter']).order_by(*args['sort'])
|
||||
|
||||
def visibility_filter(self, query):
|
||||
filterqs = request.args.get('filter', None)
|
||||
if (filterqs and
|
||||
'lot' not in filterqs):
|
||||
query = query.filter((Computer.id == Device.id), (Computer.owner_id == g.user.id))
|
||||
pass
|
||||
return query
|
||||
|
||||
|
||||
class DeviceMergeView(View):
|
||||
"""View for merging two devices
|
||||
|
@ -194,8 +185,13 @@ class DeviceMergeView(View):
|
|||
many models in session.
|
||||
"""
|
||||
# base_device = Device.query.filter_by(id=dev1_id, owner_id=g.user.id).one()
|
||||
self.base_device = Device.query.filter_by(id=dev1_id).one()
|
||||
self.with_device = Device.query.filter_by(id=dev2_id).one()
|
||||
self.base_device = Device.query.filter_by(id=dev1_id, owner_id=g.user.id).one()
|
||||
self.with_device = Device.query.filter_by(id=dev2_id, owner_id=g.user.id).one()
|
||||
|
||||
if self.base_device.allocated or self.with_device.allocated:
|
||||
# Validation than any device is allocated
|
||||
msg = 'The device is allocated, please deallocated before merge.'
|
||||
raise ValidationError(msg)
|
||||
|
||||
if not self.base_device.type == self.with_device.type:
|
||||
# Validation than we are speaking of the same kind of devices
|
||||
|
|
|
@ -3,7 +3,7 @@ from uuid import UUID
|
|||
import pytest
|
||||
from marshmallow import ValidationError
|
||||
from sqlalchemy_utils import PhoneNumber
|
||||
from teal.db import UniqueViolation
|
||||
from teal.db import UniqueViolation, DBError
|
||||
from teal.enums import Country
|
||||
|
||||
from ereuse_devicehub.config import DevicehubConfig
|
||||
|
@ -80,7 +80,7 @@ def test_membership_repeated():
|
|||
db.session.add(person)
|
||||
|
||||
person.member_of.add(Membership(org, person))
|
||||
with pytest.raises(UniqueViolation):
|
||||
with pytest.raises(DBError):
|
||||
db.session.flush()
|
||||
|
||||
|
||||
|
@ -95,7 +95,7 @@ def test_membership_repeating_id():
|
|||
person2 = Person(name='Tommy')
|
||||
person2.member_of.add(Membership(org, person2, id='acme-1'))
|
||||
db.session.add(person2)
|
||||
with pytest.raises(UniqueViolation) as e:
|
||||
with pytest.raises(DBError) as e:
|
||||
db.session.flush()
|
||||
assert 'One member id per organization' in str(e)
|
||||
|
||||
|
|
|
@ -130,7 +130,6 @@ def test_physical_properties():
|
|||
'ethereum_address': None,
|
||||
'manufacturer': 'bar',
|
||||
'model': 'foo',
|
||||
'owner_id': pc.owner_id,
|
||||
'receiver_id': None,
|
||||
'serial_number': 'foo-bar',
|
||||
'transfer_state': TransferState.Initial
|
||||
|
@ -140,6 +139,7 @@ def test_physical_properties():
|
|||
@pytest.mark.mvp
|
||||
@pytest.mark.usefixtures(conftest.auth_app_context.__name__)
|
||||
def test_component_similar_one():
|
||||
user = User.query.filter().first()
|
||||
snapshot = conftest.file('pc-components.db')
|
||||
pc = snapshot['device']
|
||||
snapshot['components'][0]['serial_number'] = snapshot['components'][1]['serial_number'] = None
|
||||
|
@ -148,7 +148,8 @@ def test_component_similar_one():
|
|||
db.session.add(pc)
|
||||
db.session.flush()
|
||||
# Let's create a new component named 'A' similar to 1
|
||||
componentA = d.Component(model=component1.model, manufacturer=component1.manufacturer)
|
||||
componentA = d.Component(model=component1.model, manufacturer=component1.manufacturer,
|
||||
owner_id=user.id)
|
||||
similar_to_a = componentA.similar_one(pc, set())
|
||||
assert similar_to_a == component1
|
||||
# d.Component B does not have the same model
|
||||
|
@ -167,16 +168,17 @@ def test_add_remove():
|
|||
# pc has c1 and c2
|
||||
# pc2 has c3
|
||||
# c4 is not with any pc
|
||||
user = User.query.filter().first()
|
||||
values = conftest.file('pc-components.db')
|
||||
pc = values['device']
|
||||
c1, c2 = (d.Component(**c) for c in values['components'])
|
||||
pc = d.Desktop(**pc, components=OrderedSet([c1, c2]))
|
||||
db.session.add(pc)
|
||||
c3 = d.Component(serial_number='nc1')
|
||||
c3 = d.Component(serial_number='nc1', owner_id=user.id)
|
||||
pc2 = d.Desktop(serial_number='s2',
|
||||
components=OrderedSet([c3]),
|
||||
chassis=ComputerChassis.Microtower)
|
||||
c4 = d.Component(serial_number='c4s')
|
||||
c4 = d.Component(serial_number='c4s', owner_id=user.id)
|
||||
db.session.add(pc2)
|
||||
db.session.add(c4)
|
||||
db.session.commit()
|
||||
|
@ -315,14 +317,16 @@ def test_sync_execute_register_no_hid_tag_not_linked(tag_id: str):
|
|||
|
||||
|
||||
@pytest.mark.mvp
|
||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||
@pytest.mark.usefixtures(conftest.auth_app_context.__name__)
|
||||
def test_sync_execute_register_tag_does_not_exist():
|
||||
"""Ensures not being able to register if the tag does not exist,
|
||||
even if the device has HID or it existed before.
|
||||
|
||||
Tags have to be created before trying to link them through a Snapshot.
|
||||
"""
|
||||
user = User.query.filter().first()
|
||||
pc = d.Desktop(**conftest.file('pc-components.db')['device'], tags=OrderedSet([Tag('foo')]))
|
||||
pc.owner_id = user.id
|
||||
with raises(ResourceNotFound):
|
||||
Sync().execute_register(pc)
|
||||
|
||||
|
@ -403,8 +407,9 @@ def test_get_device(app: Devicehub, user: UserClient):
|
|||
chassis=ComputerChassis.Tower,
|
||||
owner_id=user.user['id'])
|
||||
pc.components = OrderedSet([
|
||||
d.NetworkAdapter(model='c1mo', manufacturer='c1ma', serial_number='c1s'),
|
||||
d.GraphicCard(model='c2mo', manufacturer='c2ma', memory=1500)
|
||||
d.NetworkAdapter(model='c1mo', manufacturer='c1ma', serial_number='c1s',
|
||||
owner_id=user.user['id']),
|
||||
d.GraphicCard(model='c2mo', manufacturer='c2ma', memory=1500, owner_id=user.user['id'])
|
||||
])
|
||||
db.session.add(pc)
|
||||
# todo test is an abstract class. replace with another one
|
||||
|
@ -440,8 +445,10 @@ def test_get_devices(app: Devicehub, user: UserClient):
|
|||
chassis=ComputerChassis.Tower,
|
||||
owner_id=user.user['id'])
|
||||
pc.components = OrderedSet([
|
||||
d.NetworkAdapter(model='c1mo', manufacturer='c1ma', serial_number='c1s'),
|
||||
d.GraphicCard(model='c2mo', manufacturer='c2ma', memory=1500)
|
||||
d.NetworkAdapter(model='c1mo', manufacturer='c1ma', serial_number='c1s',
|
||||
owner_id=user.user['id']),
|
||||
d.GraphicCard(model='c2mo', manufacturer='c2ma', memory=1500,
|
||||
owner_id=user.user['id'])
|
||||
])
|
||||
pc1 = d.Desktop(model='p2mo',
|
||||
manufacturer='p2ma',
|
||||
|
@ -463,17 +470,21 @@ def test_get_devices(app: Devicehub, user: UserClient):
|
|||
|
||||
|
||||
@pytest.mark.mvp
|
||||
def test_get_device_permissions(app: Devicehub, user: UserClient, user2: UserClient):
|
||||
def test_get_device_permissions(app: Devicehub, user: UserClient, user2: UserClient,
|
||||
client: Client):
|
||||
"""Checks GETting a d.Desktop with its components."""
|
||||
|
||||
user.post(file('asus-eee-1000h.snapshot.11'), res=m.Snapshot)
|
||||
pc, res = user.get("/devices/1", None)
|
||||
s, _ = user.post(file('asus-eee-1000h.snapshot.11'), res=m.Snapshot)
|
||||
pc, res = user.get(res=d.Device, item=s['device']['id'])
|
||||
assert res.status_code == 200
|
||||
assert len(pc['actions']) == 9
|
||||
|
||||
pc2, res2 = user2.get("/devices/1", None)
|
||||
html, _ = client.get(res=d.Device, item=s['device']['id'], accept=ANY)
|
||||
assert 'intel atom cpu n270 @ 1.60ghz' in html
|
||||
assert '00:24:8C:7F:CF:2D – 100 Mbps' in html
|
||||
pc2, res2 = user2.get(res=d.Device, item=s['device']['id'], accept=ANY)
|
||||
assert res2.status_code == 200
|
||||
assert pc2 == {}
|
||||
assert pc2 == html
|
||||
|
||||
|
||||
@pytest.mark.mvp
|
||||
|
@ -553,29 +564,30 @@ def test_device_public(user: UserClient, client: Client):
|
|||
|
||||
@pytest.mark.mvp
|
||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||
def test_computer_accessory_model():
|
||||
sai = d.SAI()
|
||||
def test_computer_accessory_model(user: UserClient):
|
||||
sai = d.SAI(owner_id=user.user['id'])
|
||||
db.session.add(sai)
|
||||
keyboard = d.Keyboard(layout=Layouts.ES)
|
||||
keyboard = d.Keyboard(layout=Layouts.ES, owner_id=user.user['id'])
|
||||
db.session.add(keyboard)
|
||||
mouse = d.Mouse()
|
||||
mouse = d.Mouse(owner_id=user.user['id'])
|
||||
db.session.add(mouse)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
@pytest.mark.mvp
|
||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||
def test_networking_model():
|
||||
router = d.Router(speed=1000, wireless=True)
|
||||
def test_networking_model(user: UserClient):
|
||||
router = d.Router(speed=1000, wireless=True, owner_id=user.user['id'])
|
||||
db.session.add(router)
|
||||
switch = d.Switch(speed=1000, wireless=False)
|
||||
switch = d.Switch(speed=1000, wireless=False, owner_id=user.user['id'])
|
||||
db.session.add(switch)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
@pytest.mark.usefixtures(conftest.app_context.__name__)
|
||||
def test_cooking_mixer():
|
||||
mixer = d.Mixer(serial_number='foo', model='bar', manufacturer='foobar')
|
||||
def test_cooking_mixer(user: UserClient):
|
||||
mixer = d.Mixer(serial_number='foo', model='bar', manufacturer='foobar',
|
||||
owner_id=user.user['id'])
|
||||
db.session.add(mixer)
|
||||
db.session.commit()
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import pytest
|
||||
import uuid
|
||||
from teal.utils import compiled
|
||||
|
||||
from ereuse_devicehub.client import UserClient
|
||||
|
@ -185,6 +186,26 @@ def test_device_query(user: UserClient):
|
|||
assert not pc['tags']
|
||||
|
||||
|
||||
@pytest.mark.mvp
|
||||
def test_device_query_permitions(user: UserClient, user2: UserClient):
|
||||
"""Checks result of inventory for two users"""
|
||||
user.post(file('basic.snapshot'), res=Snapshot)
|
||||
i, _ = user.get(res=Device)
|
||||
pc1 = next(d for d in i['items'] if d['type'] == 'Desktop')
|
||||
|
||||
i2, _ = user2.get(res=Device)
|
||||
assert i2['items'] == []
|
||||
|
||||
basic_snapshot = file('basic.snapshot')
|
||||
basic_snapshot['uuid'] = f"{uuid.uuid4()}"
|
||||
user2.post(basic_snapshot, res=Snapshot)
|
||||
i2, _ = user2.get(res=Device)
|
||||
pc2 = next(d for d in i2['items'] if d['type'] == 'Desktop')
|
||||
|
||||
assert pc1['id'] != pc2['id']
|
||||
assert pc1['hid'] == pc2['hid']
|
||||
|
||||
|
||||
@pytest.mark.mvp
|
||||
def test_device_search_all_devices_token_if_empty(app: Devicehub, user: UserClient):
|
||||
"""Ensures DeviceSearch can regenerate itself when the table is empty."""
|
||||
|
|
|
@ -4,6 +4,7 @@ from io import StringIO
|
|||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from werkzeug.exceptions import Unauthorized
|
||||
import teal.marshmallow
|
||||
from ereuse_utils.test import ANY
|
||||
|
||||
|
@ -79,6 +80,29 @@ def test_erasure_certificate_wrong_id(client: Client):
|
|||
status=teal.marshmallow.ValidationError)
|
||||
|
||||
|
||||
@pytest.mark.mvp
|
||||
def test_export_csv_permitions(user: UserClient, user2: UserClient, client: Client):
|
||||
"""Test export device information in a csv file with others users."""
|
||||
snapshot, _ = user.post(file('basic.snapshot'), res=Snapshot)
|
||||
csv_user, _ = user.get(res=documents.DocumentDef.t,
|
||||
item='devices/',
|
||||
accept='text/csv',
|
||||
query=[('filter', {'type': ['Computer']})])
|
||||
|
||||
csv_user2, _ = user2.get(res=documents.DocumentDef.t,
|
||||
item='devices/',
|
||||
accept='text/csv',
|
||||
query=[('filter', {'type': ['Computer']})])
|
||||
|
||||
_, res = client.get(res=documents.DocumentDef.t,
|
||||
item='devices/',
|
||||
accept='text/csv',
|
||||
query=[('filter', {'type': ['Computer']})], status=401)
|
||||
assert res.status_code == 401
|
||||
|
||||
assert len(csv_user) > 0
|
||||
assert len(csv_user2) == 0
|
||||
|
||||
@pytest.mark.mvp
|
||||
def test_export_basic_snapshot(user: UserClient):
|
||||
"""Test export device information in a csv file."""
|
||||
|
|
|
@ -2,6 +2,7 @@ import os
|
|||
import json
|
||||
import shutil
|
||||
import pytest
|
||||
import uuid
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from requests.exceptions import HTTPError
|
||||
|
@ -104,6 +105,24 @@ def test_snapshot_post(user: UserClient):
|
|||
assert rate['snapshot']['id'] == snapshot['id']
|
||||
|
||||
|
||||
@pytest.mark.mvp
|
||||
def test_same_device_tow_users(user: UserClient, user2: UserClient):
|
||||
"""Two users can up the same snapshot and the system save 2 computers"""
|
||||
user.post(file('basic.snapshot'), res=Snapshot)
|
||||
i, _ = user.get(res=m.Device)
|
||||
pc = next(d for d in i['items'] if d['type'] == 'Desktop')
|
||||
pc_id = pc['id']
|
||||
assert i['items'][0]['url'] == f'/devices/{pc_id}'
|
||||
|
||||
basic_snapshot = file('basic.snapshot')
|
||||
basic_snapshot['uuid'] = f"{uuid.uuid4()}"
|
||||
user2.post(basic_snapshot, res=Snapshot)
|
||||
i2, _ = user2.get(res=m.Device)
|
||||
pc2 = next(d for d in i2['items'] if d['type'] == 'Desktop')
|
||||
assert pc['id'] != pc2['id']
|
||||
assert pc['ownerID'] != pc2['ownerID']
|
||||
assert pc['hid'] == pc2['hid']
|
||||
|
||||
@pytest.mark.mvp
|
||||
def test_snapshot_update_timefield_updated(user: UserClient):
|
||||
"""
|
||||
|
|
Reference in New Issue