resolve conflict

This commit is contained in:
Cayo Puigdefabregas 2021-03-26 11:04:04 +01:00
commit ee55020f3c
17 changed files with 498 additions and 39 deletions

View file

@ -13,6 +13,7 @@ ml).
## [1.0.5-beta] ## [1.0.5-beta]
- [addend] #124 adding endpoint for extract the internal stats of use - [addend] #124 adding endpoint for extract the internal stats of use
- [addend] #122 system for verify all documents that it's produced from devicehub
## [1.0.4-beta] ## [1.0.4-beta]
- [addend] #95 adding endpoint for check the hash of one report - [addend] #95 adding endpoint for check the hash of one report

View file

@ -1 +1 @@
__version__ = "1.0.4-beta" __version__ = "1.0.5-beta"

View file

@ -0,0 +1,62 @@
"""drop unique org for tag
Revision ID: 6a2a939d5668
Revises: eca457d8b2a4
Create Date: 2021-02-25 18:47:47.441195
"""
from alembic import op
import sqlalchemy as sa
from alembic import context
# revision identifiers, used by Alembic.
revision = '6a2a939d5668'
down_revision = 'eca457d8b2a4'
branch_labels = None
depends_on = None
def get_inv():
INV = context.get_x_argument(as_dictionary=True).get('inventory')
if not INV:
raise ValueError("Inventory value is not specified")
return INV
def upgrade_data():
con = op.get_bind()
tags = con.execute(f"select id from {get_inv()}.tag")
i = 1
for c in tags:
id_tag = c.id
internal_id = i
i += 1
sql = f"update {get_inv()}.tag set internal_id='{internal_id}' where id='{id_tag}';"
con.execute(sql)
sql = f"CREATE SEQUENCE {get_inv()}.tag_internal_id_seq START {i};"
con.execute(sql)
def upgrade():
op.drop_constraint('one tag id per organization', 'tag', schema=f'{get_inv()}')
op.drop_constraint('one secondary tag per organization', 'tag', schema=f'{get_inv()}')
op.create_primary_key('one tag id per owner', 'tag', ['id', 'owner_id'], schema=f'{get_inv()}'),
op.create_unique_constraint('one secondary tag per owner', 'tag', ['secondary', 'owner_id'], schema=f'{get_inv()}'),
op.add_column('tag', sa.Column('internal_id', sa.BigInteger(), nullable=True,
comment='The identifier of the tag for this database. Used only\n internally for software; users should not use this.\n'), schema=f'{get_inv()}')
upgrade_data()
op.alter_column('tag', sa.Column('internal_id', sa.BigInteger(), nullable=False,
comment='The identifier of the tag for this database. Used only\n internally for software; users should not use this.\n'), schema=f'{get_inv()}')
def downgrade():
op.drop_constraint('one tag id per owner', 'tag', schema=f'{get_inv()}')
op.drop_constraint('one secondary tag per owner', 'tag', schema=f'{get_inv()}')
op.create_primary_key('one tag id per organization', 'tag', ['id', 'org_id'], schema=f'{get_inv()}'),
op.create_unique_constraint('one secondary tag per organization', 'tag', ['secondary', 'org_id'], schema=f'{get_inv()}'),
op.drop_column('tag', 'internal_id', schema=f'{get_inv()}')
op.execute(f"DROP SEQUENCE {get_inv()}.tag_internal_id_seq;")

View file

@ -250,6 +250,11 @@ class MakeAvailable(ActionDef):
SCHEMA = schemas.MakeAvailable SCHEMA = schemas.MakeAvailable
class TradeDef(ActionDef):
VIEW = None
SCHEMA = schemas.Trade
class CancelTradeDef(ActionDef): class CancelTradeDef(ActionDef):
VIEW = None VIEW = None
SCHEMA = schemas.CancelTrade SCHEMA = schemas.CancelTrade

View file

@ -50,8 +50,7 @@ class DeviceRow(OrderedDict):
self['Tag 2 Type'] = self['Tag 2 ID'] = self['Tag 2 Organization'] = '' self['Tag 2 Type'] = self['Tag 2 ID'] = self['Tag 2 Organization'] = ''
self['Tag 3 Type'] = self['Tag 3 ID'] = self['Tag 3 Organization'] = '' self['Tag 3 Type'] = self['Tag 3 ID'] = self['Tag 3 Organization'] = ''
for i, tag in zip(range(1, 3), device.tags): for i, tag in zip(range(1, 3), device.tags):
# TODO @cayop we need redefined how save the Tag Type info self['Tag {} Type'.format(i)] = 'unamed' if tag.provider else 'named'
self['Tag {} Type'.format(i)] = 'unamed'
self['Tag {} ID'.format(i)] = tag.id self['Tag {} ID'.format(i)] = tag.id
self['Tag {} Organization'.format(i)] = tag.org.name self['Tag {} Organization'.format(i)] = tag.org.name

View file

@ -27,7 +27,7 @@ from ereuse_devicehub.resources.documents.device_row import (DeviceRow, StockRow
InternalStatsRow) InternalStatsRow)
from ereuse_devicehub.resources.lot import LotView from ereuse_devicehub.resources.lot import LotView
from ereuse_devicehub.resources.lot.models import Lot from ereuse_devicehub.resources.lot.models import Lot
from ereuse_devicehub.resources.hash_reports import insert_hash, ReportHash from ereuse_devicehub.resources.hash_reports import insert_hash, ReportHash, verify_hash
class Format(enum.Enum): class Format(enum.Enum):
@ -80,6 +80,7 @@ class DocumentView(DeviceView):
res = flask_weasyprint.render_pdf( res = flask_weasyprint.render_pdf(
flask_weasyprint.HTML(string=template), download_filename='{}.pdf'.format(type) flask_weasyprint.HTML(string=template), download_filename='{}.pdf'.format(type)
) )
insert_hash(res.data)
else: else:
res = flask.make_response(template) res = flask.make_response(template)
return res return res
@ -186,7 +187,9 @@ class LotsDocumentView(LotView):
cw.writerow(l.keys()) cw.writerow(l.keys())
first = False first = False
cw.writerow(l.values()) cw.writerow(l.values())
output = make_response(data.getvalue()) bfile = data.getvalue().encode('utf-8')
output = make_response(bfile)
insert_hash(bfile)
output.headers['Content-Disposition'] = 'attachment; filename=lots-info.csv' output.headers['Content-Disposition'] = 'attachment; filename=lots-info.csv'
output.headers['Content-type'] = 'text/csv' output.headers['Content-type'] = 'text/csv'
return output return output
@ -223,7 +226,9 @@ class StockDocumentView(DeviceView):
cw.writerow(d.keys()) cw.writerow(d.keys())
first = False first = False
cw.writerow(d.values()) cw.writerow(d.values())
output = make_response(data.getvalue()) bfile = data.getvalue().encode('utf-8')
output = make_response(bfile)
insert_hash(bfile)
output.headers['Content-Disposition'] = 'attachment; filename=devices-stock.csv' output.headers['Content-Disposition'] = 'attachment; filename=devices-stock.csv'
output.headers['Content-type'] = 'text/csv' output.headers['Content-type'] = 'text/csv'
return output return output
@ -247,12 +252,32 @@ class StampsView(View):
This view render one public ans static page for see the links for to do the check This view render one public ans static page for see the links for to do the check
of one csv file of one csv file
""" """
def get(self): def get_url_path(self):
url = urlutils.URL(request.url) url = urlutils.URL(request.url)
url.normalize() url.normalize()
url.path_parts = url.path_parts[:-2] + ['check', ''] url.path_parts = url.path_parts[:-2] + ['check', '']
url_path = url.to_text() return url.to_text()
return flask.render_template('documents/stamp.html', rq_url=url_path)
def get(self):
result = ('', '')
return flask.render_template('documents/stamp.html', rq_url=self.get_url_path(),
result=result)
def post(self):
result = ('', '')
if 'docUpload' in request.files:
file_check = request.files['docUpload']
bad = 'There are no coincidences. The attached file data does not come \
from our backend or it has been subsequently modified.'
ok = '100% coincidence. The attached file contains data 100% existing in \
to our backend'
result = ('Bad', bad)
if file_check.mimetype in ['text/csv', 'application/pdf']:
if verify_hash(file_check):
result = ('Ok', ok)
return flask.render_template('documents/stamp.html', rq_url=self.get_url_path(),
result=result)
class InternalStatsView(DeviceView): class InternalStatsView(DeviceView):
@ -345,7 +370,7 @@ class DocumentDef(Resource):
self.add_url_rule('/check/', defaults={}, view_func=check_view, methods=get) self.add_url_rule('/check/', defaults={}, view_func=check_view, methods=get)
stamps_view = StampsView.as_view('StampsView', definition=self, auth=app.auth) stamps_view = StampsView.as_view('StampsView', definition=self, auth=app.auth)
self.add_url_rule('/stamps/', defaults={}, view_func=stamps_view, methods=get) self.add_url_rule('/stamps/', defaults={}, view_func=stamps_view, methods={'GET', 'POST'})
internalstats_view = InternalStatsView.as_view( internalstats_view = InternalStatsView.as_view(
'InternalStatsView', definition=self, auth=app.auth) 'InternalStatsView', definition=self, auth=app.auth)

View file

@ -38,6 +38,17 @@
</nav> </nav>
<div class="container-fluid"> <div class="container-fluid">
<div class="page-header col-md-6 col-md-offset-3"> <div class="page-header col-md-6 col-md-offset-3">
<div class="row">
{% if result.0 == 'Ok' %}
<div class="alert alert-info" style="background-color: #3fb618;" role="alert">
{{ result.1 }}
</div>
{% elif result.0 == 'Bad' %}
<div class="alert alert-danger" style="background-color: #ff0039" role="alert">
{{ result.1 }}
</div>
{% endif %}
</div>
<div class="row"> <div class="row">
<a href="http://dlt.ereuse.org/stamps/create?url={{ rq_url }}" target="_blank">Add one new check in your csv</a> <a href="http://dlt.ereuse.org/stamps/create?url={{ rq_url }}" target="_blank">Add one new check in your csv</a>
</div> </div>
@ -45,6 +56,19 @@
<a href="http://dlt.ereuse.org/stamps/check?url={{ rq_url }}" target="_blank">Verify a CSV file in here.</a> <a href="http://dlt.ereuse.org/stamps/check?url={{ rq_url }}" target="_blank">Verify a CSV file in here.</a>
</div> </div>
</div> </div>
<div class="row">
<div class="page-header col-md-6 col-md-offset-3">
If you want us to verify a document issued by us, upload it using the following form
</div>
<div class="col-md-6 col-md-offset-3">
<form enctype="multipart/form-data" action="" method="post">
<label for="name">Document: </label>
<input type="file" name="docUpload" accept="*" /><br />
<input type="submit" id="send-signup" name="signup" value="Verify" />
</form>
</div>
</div>
</div> </div>
</body> </body>
</html> </html>

View file

@ -32,3 +32,8 @@ def insert_hash(bfile):
db.session.add(db_hash) db.session.add(db_hash)
db.session.commit() db.session.commit()
db.session.flush() db.session.flush()
def verify_hash(bfile):
hash3 = hashlib.sha3_256(bfile.read()).hexdigest()
return ReportHash.query.filter(ReportHash.hash3 == hash3).count()

View file

@ -47,6 +47,10 @@ class TagDef(Resource):
'device/<{0.ID_CONVERTER.value}:device_id>'.format(DeviceDef), 'device/<{0.ID_CONVERTER.value}:device_id>'.format(DeviceDef),
view_func=device_view, view_func=device_view,
methods={'PUT'}) methods={'PUT'})
self.add_url_rule('/<{0.ID_CONVERTER.value}:tag_id>/'.format(self) +
'device/<{0.ID_CONVERTER.value}:device_id>'.format(DeviceDef),
view_func=device_view,
methods={'DELETE'})
@option('-u', '--owner', help=OWNER_H) @option('-u', '--owner', help=OWNER_H)
@option('-o', '--org', help=ORG_H) @option('-o', '--org', help=ORG_H)

View file

@ -3,7 +3,7 @@ from typing import Set
from boltons import urlutils from boltons import urlutils
from flask import g from flask import g
from sqlalchemy import BigInteger, Column, ForeignKey, UniqueConstraint from sqlalchemy import BigInteger, Column, ForeignKey, UniqueConstraint, Sequence
from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import backref, relationship, validates from sqlalchemy.orm import backref, relationship, validates
from teal.db import DB_CASCADE_SET_NULL, Query, URL from teal.db import DB_CASCADE_SET_NULL, Query, URL
@ -15,6 +15,7 @@ from ereuse_devicehub.resources.agent.models import Organization
from ereuse_devicehub.resources.device.models import Device from ereuse_devicehub.resources.device.models import Device
from ereuse_devicehub.resources.models import Thing from ereuse_devicehub.resources.models import Thing
from ereuse_devicehub.resources.user.models import User from ereuse_devicehub.resources.user.models import User
from ereuse_devicehub.resources.utils import hashcode
class Tags(Set['Tag']): class Tags(Set['Tag']):
@ -25,17 +26,23 @@ class Tags(Set['Tag']):
return ', '.join(format(tag, format_spec) for tag in self).strip() return ', '.join(format(tag, format_spec) for tag in self).strip()
class Tag(Thing): class Tag(Thing):
internal_id = Column(BigInteger, Sequence('tag_internal_id_seq'), unique=True, nullable=False)
internal_id.comment = """The identifier of the tag for this database. Used only
internally for software; users should not use this.
"""
id = Column(db.CIText(), primary_key=True) id = Column(db.CIText(), primary_key=True)
id.comment = """The ID of the tag.""" id.comment = """The ID of the tag."""
owner_id = Column(UUID(as_uuid=True), owner_id = Column(UUID(as_uuid=True),
ForeignKey(User.id), ForeignKey(User.id),
primary_key=True,
nullable=False, nullable=False,
default=lambda: g.user.id) default=lambda: g.user.id)
owner = relationship(User, primaryjoin=owner_id == User.id) owner = relationship(User, primaryjoin=owner_id == User.id)
org_id = Column(UUID(as_uuid=True), org_id = Column(UUID(as_uuid=True),
ForeignKey(Organization.id), ForeignKey(Organization.id),
primary_key=True,
# If we link with the Organization object this instance # If we link with the Organization object this instance
# will be set as persistent and added to session # will be set as persistent and added to session
# which is something we don't want to enforce by default # which is something we don't want to enforce by default
@ -97,8 +104,8 @@ class Tag(Thing):
return url return url
__table_args__ = ( __table_args__ = (
UniqueConstraint(id, org_id, name='one tag id per organization'), UniqueConstraint(id, owner_id, name='one tag id per owner'),
UniqueConstraint(secondary, org_id, name='one secondary tag per organization') UniqueConstraint(secondary, owner_id, name='one secondary tag per organization')
) )
@property @property
@ -109,7 +116,7 @@ class Tag(Thing):
def url(self) -> urlutils.URL: def url(self) -> urlutils.URL:
"""The URL where to GET this device.""" """The URL where to GET this device."""
# todo this url only works for printable internal tags # todo this url only works for printable internal tags
return urlutils.URL(url_for_resource(Tag, item_id=self.id)) return urlutils.URL(url_for_resource(Tag, item_id=self.code))
@property @property
def printable(self) -> bool: def printable(self) -> bool:
@ -125,6 +132,23 @@ class Tag(Thing):
"""Return a SQLAlchemy filter expression for printable queries.""" """Return a SQLAlchemy filter expression for printable queries."""
return cls.org_id == Organization.get_default_org_id() return cls.org_id == Organization.get_default_org_id()
@property
def code(self) -> str:
return hashcode.encode(self.internal_id)
def delete(self):
"""Deletes the tag.
This method removes the tag if is named tag and don't have any linked device.
"""
if self.device:
raise TagLinked(self)
if self.provider:
# if is an unnamed tag not delete
raise TagUnnamed(self.id)
db.session.delete(self)
def __repr__(self) -> str: def __repr__(self) -> str:
return '<Tag {0.id} org:{0.org_id} device:{0.device_id}>'.format(self) return '<Tag {0.id} org:{0.org_id} device:{0.device_id}>'.format(self)
@ -133,3 +157,15 @@ class Tag(Thing):
def __format__(self, format_spec: str) -> str: def __format__(self, format_spec: str) -> str:
return '{0.org.name} {0.id}'.format(self) return '{0.org.name} {0.id}'.format(self)
class TagLinked(ValidationError):
def __init__(self, tag):
message = 'The tag {} is linked to device {}.'.format(tag.id, tag.device.id)
super().__init__(message, field_names=['device'])
class TagUnnamed(ValidationError):
def __init__(self, id):
message = 'This tag {} is unnamed tag. It is imposible delete.'.format(id)
super().__init__(message, field_names=['device'])

View file

@ -28,3 +28,4 @@ class Tag(Thing):
secondary = SanitizedStr(lower=True, description=m.Tag.secondary.comment) secondary = SanitizedStr(lower=True, description=m.Tag.secondary.comment)
printable = Boolean(dump_only=True, decsription=m.Tag.printable.__doc__) printable = Boolean(dump_only=True, decsription=m.Tag.printable.__doc__)
url = URL(dump_only=True, description=m.Tag.url.__doc__) url = URL(dump_only=True, description=m.Tag.url.__doc__)
code = SanitizedStr(dump_only=True, description=m.Tag.internal_id.comment)

View file

@ -6,18 +6,29 @@ from teal.resource import View, url_for_resource
from ereuse_devicehub import auth from ereuse_devicehub import auth
from ereuse_devicehub.db import db from ereuse_devicehub.db import db
from ereuse_devicehub.query import things_response from ereuse_devicehub.query import things_response
from ereuse_devicehub.resources.utils import hashcode
from ereuse_devicehub.resources.device.models import Device from ereuse_devicehub.resources.device.models import Device
from ereuse_devicehub.resources.tag import Tag from ereuse_devicehub.resources.tag import Tag
class TagView(View): class TagView(View):
def one(self, code):
"""Gets the device from the named tag, /tags/namedtag."""
internal_id = hashcode.decode(code.upper()) or -1
tag = Tag.query.filter_by(internal_id=internal_id).one() # type: Tag
if not tag.device:
raise TagNotLinked(tag.id)
return redirect(location=url_for_resource(Device, tag.device.id))
@auth.Auth.requires_auth @auth.Auth.requires_auth
def post(self): def post(self):
"""Creates a tag.""" """Creates a tag."""
num = request.args.get('num', type=int) num = request.args.get('num', type=int)
if num: if num:
# create unnamed tag
res = self._create_many_regular_tags(num) res = self._create_many_regular_tags(num)
else: else:
# create named tag
res = self._post_one() res = self._post_one()
return res return res
@ -42,7 +53,6 @@ class TagView(View):
return response return response
def _post_one(self): def _post_one(self):
# todo do we use this?
t = request.get_json() t = request.get_json()
tag = Tag(**t) tag = Tag(**t)
if tag.like_etag(): if tag.like_etag():
@ -52,12 +62,23 @@ class TagView(View):
db.session.commit() db.session.commit()
return Response(status=201) return Response(status=201)
@auth.Auth.requires_auth
def delete(self, id):
tag = Tag.from_an_id(id).filter_by(owner=g.user).one()
tag.delete()
db.session().final_flush()
db.session.commit()
return Response(status=204)
class TagDeviceView(View): class TagDeviceView(View):
"""Endpoints to work with the device of the tag; /tags/23/device.""" """Endpoints to work with the device of the tag; /tags/23/device."""
def one(self, id): def one(self, id):
"""Gets the device from the tag.""" """Gets the device from the tag."""
if request.authorization:
return self.one_authorization(id)
tag = Tag.from_an_id(id).one() # type: Tag tag = Tag.from_an_id(id).one() # type: Tag
if not tag.device: if not tag.device:
raise TagNotLinked(tag.id) raise TagNotLinked(tag.id)
@ -65,22 +86,46 @@ class TagDeviceView(View):
return redirect(location=url_for_resource(Device, tag.device.devicehub_id)) return redirect(location=url_for_resource(Device, tag.device.devicehub_id))
return app.resources[Device.t].schema.jsonify(tag.device) return app.resources[Device.t].schema.jsonify(tag.device)
# noinspection PyMethodOverriding @auth.Auth.requires_auth
def one_authorization(self, id):
tag = Tag.from_an_id(id).filter_by(owner=g.user).one() # type: Tag
if not tag.device:
raise TagNotLinked(tag.id)
return app.resources[Device.t].schema.jsonify(tag.device)
@auth.Auth.requires_auth
def put(self, tag_id: str, device_id: int): def put(self, tag_id: str, device_id: int):
"""Links an existing tag with a device.""" """Links an existing tag with a device."""
device_id = int(device_id) device_id = int(device_id)
tag = Tag.from_an_id(tag_id).one() # type: Tag tag = Tag.from_an_id(tag_id).filter_by(owner=g.user).one() # type: Tag
if tag.device_id: if tag.device_id:
if tag.device_id == device_id: if tag.device_id == device_id:
return Response(status=204) return Response(status=204)
else: else:
raise LinkedToAnotherDevice(tag.device_id) raise LinkedToAnotherDevice(tag.device_id)
else: else:
# Check if this device exist for this owner
Device.query.filter_by(owner=g.user).filter_by(id=device_id).one()
tag.device_id = device_id tag.device_id = device_id
db.session().final_flush() db.session().final_flush()
db.session.commit() db.session.commit()
return Response(status=204) return Response(status=204)
@auth.Auth.requires_auth
def delete(self, tag_id: str, device_id: str):
tag = Tag.from_an_id(tag_id).filter_by(owner=g.user).one() # type: Tag
device = Device.query.filter_by(owner=g.user).filter_by(id=device_id).one()
if tag.provider:
# if is an unamed tag not do nothing
return Response(status=204)
if tag.device == device:
tag.device_id = None
db.session().final_flush()
db.session.commit()
return Response(status=204)
def get_device_from_tag(id: str): def get_device_from_tag(id: str):
"""Gets the device by passing a tag id. """Gets the device by passing a tag id.

File diff suppressed because one or more lines are too long

View file

@ -752,6 +752,7 @@ def test_deallocate_bad_dates(user: UserClient):
def test_trade(action_model_state: Tuple[Type[models.Action], states.Trading], user: UserClient): def test_trade(action_model_state: Tuple[Type[models.Action], states.Trading], user: UserClient):
"""Tests POSTing all Trade actions.""" """Tests POSTing all Trade actions."""
# todo missing None states.Trading for after cancelling renting, for example # todo missing None states.Trading for after cancelling renting, for example
# Remove this test
action_model, state = action_model_state action_model, state = action_model_state
snapshot, _ = user.post(file('basic.snapshot'), res=models.Snapshot) snapshot, _ = user.post(file('basic.snapshot'), res=models.Snapshot)
action = { action = {

View file

@ -119,4 +119,4 @@ def test_api_docs(client: Client):
'scheme': 'basic', 'scheme': 'basic',
'name': 'Authorization' 'name': 'Authorization'
} }
assert len(docs['definitions']) == 117 assert len(docs['definitions']) == 118

View file

@ -1,8 +1,9 @@
import csv import csv
import hashlib import hashlib
from datetime import datetime from datetime import datetime
from io import StringIO from io import StringIO, BytesIO
from pathlib import Path from pathlib import Path
from flask import url_for
import pytest import pytest
from werkzeug.exceptions import Unauthorized from werkzeug.exceptions import Unauthorized
@ -463,6 +464,159 @@ def test_get_document_lots(user: UserClient, user2: UserClient):
assert export2_csv[1][3] == 'comments,lot3,testcomment-lot3,' assert export2_csv[1][3] == 'comments,lot3,testcomment-lot3,'
@pytest.mark.mvp
def test_verify_stamp(user: UserClient, client: Client):
"""Test verify stamp of one export device information in a csv file."""
snapshot, _ = user.post(file('basic.snapshot'), res=Snapshot)
csv_str, _ = user.get(res=documents.DocumentDef.t,
item='devices/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})])
response, _ = client.post(res=documents.DocumentDef.t,
item='stamps/',
content_type='multipart/form-data',
accept='text/html',
data={'docUpload': [(BytesIO(bytes(csv_str, 'utf-8')), 'example.csv')]},
status=200)
assert "alert alert-info" in response
assert not "alert alert-danger" in response
response, _ = client.post(res=documents.DocumentDef.t,
item='stamps/',
content_type='multipart/form-data',
accept='text/html',
data={'docUpload': [(BytesIO(b'abc'), 'example.csv')]},
status=200)
assert not "alert alert-info" in response
assert "alert alert-danger" in response
response, _ = client.get(res=documents.DocumentDef.t,
item='stamps/',
accept='text/html',
status=200)
assert not "alert alert-info" in response
assert not "alert alert-danger" in response
@pytest.mark.mvp
def test_verify_stamp_log_info(user: UserClient, client: Client):
"""Test verify stamp of one export lots-info in a csv file."""
l, _ = user.post({'name': 'Lot1', 'description': 'comments,lot1,testcomment-lot1,'}, res=Lot)
l, _ = user.post({'name': 'Lot2', 'description': 'comments,lot2,testcomment-lot2,'}, res=Lot)
csv_str, _ = user.get(res=documents.DocumentDef.t,
item='lots/',
accept='text/csv')
response, _ = client.post(res=documents.DocumentDef.t,
item='stamps/',
content_type='multipart/form-data',
accept='text/html',
data={'docUpload': [(BytesIO(bytes(csv_str, 'utf-8')),
'example.csv')]},
status=200)
assert "alert alert-info" in response
@pytest.mark.mvp
def test_verify_stamp_devices_stock(user: UserClient, client: Client):
"""Test verify stamp of one export device information in a csv file."""
snapshot, _ = user.post(file('basic.snapshot'), res=Snapshot)
csv_str, _ = user.get(res=documents.DocumentDef.t,
item='stock/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})])
response, _ = client.post(res=documents.DocumentDef.t,
item='stamps/',
content_type='multipart/form-data',
accept='text/html',
data={'docUpload': [(BytesIO(bytes(csv_str, 'utf-8')),
'example.csv')]},
status=200)
assert "alert alert-info" in response
@pytest.mark.mvp
def test_verify_stamp_csv_actions(user: UserClient, client: Client):
"""Test verify stamp of one export device information in a csv file with others users."""
acer = file('acer.happy.battery.snapshot')
snapshot, _ = user.post(acer, res=Snapshot)
device_id = snapshot['device']['id']
post_request = {"transaction": "ccc", "name": "John", "endUsers": 1,
"devices": [device_id], "description": "aaa",
"finalUserCode": "abcdefjhi",
"startTime": "2020-11-01T02:00:00+00:00",
"endTime": "2020-12-01T02:00:00+00:00"
}
user.post(res=Allocate, data=post_request)
hdd = [c for c in acer['components'] if c['type'] == 'HardDrive'][0]
hdd_action = [a for a in hdd['actions'] if a['type'] == 'TestDataStorage'][0]
hdd_action['lifetime'] += 1000
acer.pop('elapsed')
acer['licence_version'] = '1.0.0'
snapshot, _ = client.post(acer, res=Live)
csv_str, _ = user.get(res=documents.DocumentDef.t,
item='actions/',
accept='text/csv',
query=[('filter', {'type': ['Computer']})])
response, _ = client.post(res=documents.DocumentDef.t,
item='stamps/',
content_type='multipart/form-data',
accept='text/html',
data={'docUpload': [(BytesIO(bytes(csv_str, 'utf-8')),
'example.csv')]},
status=200)
assert "alert alert-info" in response
@pytest.mark.mvp
def test_verify_stamp_erasure_certificate(user: UserClient, client: Client):
"""Test verify stamp of one export certificate in PDF."""
s = file('erase-sectors.snapshot')
snapshot, response = user.post(s, res=Snapshot)
doc, _ = user.get(res=documents.DocumentDef.t,
item='erasures/',
query=[('filter', {'id': [snapshot['device']['id']]})],
accept=ANY)
response, _ = client.post(res=documents.DocumentDef.t,
item='stamps/',
content_type='multipart/form-data',
accept='text/html',
data={'docUpload': [(BytesIO(bytes(doc, 'utf-8')),
'example.csv')]},
status=200)
assert "alert alert-danger" in response
doc, _ = user.get(res=documents.DocumentDef.t,
item='erasures/',
query=[
('filter', {'id': [snapshot['device']['id']]}),
('format', 'PDF')
],
accept='application/pdf')
response, _ = client.post(res=documents.DocumentDef.t,
item='stamps/',
content_type='multipart/form-data',
accept='text/html',
data={'docUpload': [(BytesIO(doc),
'example.csv')]},
status=200)
assert "alert alert-info" in response
@pytest.mark.mvp @pytest.mark.mvp
def test_get_document_internal_stats(user: UserClient, user2: UserClient): def test_get_document_internal_stats(user: UserClient, user2: UserClient):
"""Tests for get teh internal stats.""" """Tests for get teh internal stats."""

View file

@ -8,7 +8,7 @@ from pytest import raises
from teal.db import MultipleResourcesFound, ResourceNotFound, UniqueViolation, DBError from teal.db import MultipleResourcesFound, ResourceNotFound, UniqueViolation, DBError
from teal.marshmallow import ValidationError from teal.marshmallow import ValidationError
from ereuse_devicehub.client import UserClient from ereuse_devicehub.client import UserClient, Client
from ereuse_devicehub.db import db from ereuse_devicehub.db import db
from ereuse_devicehub.devicehub import Devicehub from ereuse_devicehub.devicehub import Devicehub
from ereuse_devicehub.resources.action.models import Snapshot from ereuse_devicehub.resources.action.models import Snapshot
@ -33,6 +33,68 @@ def test_create_tag(user: UserClient):
tag = Tag.query.one() tag = Tag.query.one()
assert tag.id == 'bar-1' assert tag.id == 'bar-1'
assert tag.provider == URL('http://foo.bar') assert tag.provider == URL('http://foo.bar')
res, _ = user.get(res=Tag, item=tag.code, status=422)
assert res['type'] == 'TagNotLinked'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_create_tag_with_device(user: UserClient):
"""Creates a tag specifying linked with one device."""
pc = Desktop(serial_number='sn1', chassis=ComputerChassis.Tower, owner_id=user.user['id'])
db.session.add(pc)
db.session.commit()
tag = Tag(id='bar', owner_id=user.user['id'])
db.session.add(tag)
db.session.commit()
data = '{tag_id}/device/{device_id}'.format(tag_id=tag.id, device_id=pc.id)
user.put({}, res=Tag, item=data, status=204)
user.get(res=Tag, item='{}/device'.format(tag.id))
user.delete({}, res=Tag, item=data, status=204)
res, _ = user.get(res=Tag, item='{}/device'.format(tag.id), status=422)
assert res['type'] == 'TagNotLinked'
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_delete_tags(user: UserClient, client: Client):
"""Delete a named tag."""
# Delete Tag Named
pc = Desktop(serial_number='sn1', chassis=ComputerChassis.Tower, owner_id=user.user['id'])
db.session.add(pc)
db.session.commit()
tag = Tag(id='bar', owner_id=user.user['id'], device_id=pc.id)
db.session.add(tag)
db.session.commit()
tag = Tag.query.one()
assert tag.id == 'bar'
# Is not possible delete one tag linked to one device
res, _ = user.delete(res=Tag, item=tag.id, status=422)
msg = 'The tag bar is linked to device'
assert msg in res['message'][0]
tag.device_id = None
db.session.add(tag)
db.session.commit()
# Is not possible delete one tag from an anonymous user
client.delete(res=Tag, item=tag.id, status=401)
# Is possible delete one normal tag
user.delete(res=Tag, item=tag.id)
user.get(res=Tag, item=tag.id, status=404)
# Delete Tag UnNamed
org = Organization(name='bar', tax_id='bartax')
tag = Tag(id='bar-1', org=org, provider=URL('http://foo.bar'), owner_id=user.user['id'])
db.session.add(tag)
db.session.commit()
tag = Tag.query.one()
assert tag.id == 'bar-1'
res, _ = user.delete(res=Tag, item=tag.id, status=422)
msg = 'This tag {} is unnamed tag. It is imposible delete.'.format(tag.id)
assert msg in res['message']
tag = Tag.query.one()
assert tag.id == 'bar-1'
@pytest.mark.mvp @pytest.mark.mvp
@ -51,13 +113,15 @@ def test_create_tag_default_org(user: UserClient):
@pytest.mark.mvp @pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__) @pytest.mark.usefixtures(conftest.app_context.__name__)
def test_create_tag_no_slash(): def test_create_same_tag_default_org_two_users(user: UserClient, user2: UserClient):
"""Checks that no tags can be created that contain a slash.""" """Creates a tag using the default organization."""
with raises(ValidationError): tag = Tag(id='foo-1', owner_id=user.user['id'])
Tag('/') tag2 = Tag(id='foo-1', owner_id=user2.user['id'])
db.session.add(tag)
with raises(ValidationError): db.session.add(tag2)
Tag('bar', secondary='/') db.session.commit()
assert tag.org.name == 'FooOrg' # as defined in the settings
assert tag2.org.name == 'FooOrg' # as defined in the settings
@pytest.mark.mvp @pytest.mark.mvp
@ -75,7 +139,19 @@ def test_create_two_same_tags(user: UserClient):
db.session.add(Tag(id='foo-bar', owner_id=user.user['id'])) db.session.add(Tag(id='foo-bar', owner_id=user.user['id']))
org2 = Organization(name='org 2', tax_id='tax id org 2') org2 = Organization(name='org 2', tax_id='tax id org 2')
db.session.add(Tag(id='foo-bar', org=org2, owner_id=user.user['id'])) db.session.add(Tag(id='foo-bar', org=org2, owner_id=user.user['id']))
db.session.commit() with raises(DBError):
db.session.commit()
@pytest.mark.mvp
@pytest.mark.usefixtures(conftest.app_context.__name__)
def test_create_tag_no_slash():
"""Checks that no tags can be created that contain a slash."""
with raises(ValidationError):
Tag('/')
with raises(ValidationError):
Tag('bar', secondary='/')
@pytest.mark.mvp @pytest.mark.mvp
@ -131,17 +207,39 @@ def test_tag_get_device_from_tag_endpoint_no_tag(user: UserClient):
@pytest.mark.mvp @pytest.mark.mvp
def test_tag_get_device_from_tag_endpoint_multiple_tags(app: Devicehub, user: UserClient): @pytest.mark.usefixtures(conftest.app_context.__name__)
"""As above, but when there are two tags with the same ID, the def test_tag_get_device_from_tag_endpoint_multiple_tags(app: Devicehub, user: UserClient, user2: UserClient, client: Client):
"""As above, but when there are two tags with the secondary ID, the
system should not return any of both (to be deterministic) so system should not return any of both (to be deterministic) so
it should raise an exception. it should raise an exception.
""" """
with app.app_context(): db.session.add(Tag(id='foo', secondary='bar', owner_id=user.user['id']))
db.session.add(Tag(id='foo-bar', owner_id=user.user['id'])) db.session.commit()
org2 = Organization(name='org 2', tax_id='tax id org 2')
db.session.add(Tag(id='foo-bar', org=org2, owner_id=user.user['id'])) db.session.add(Tag(id='foo', secondary='bar', owner_id=user2.user['id']))
db.session.commit()
db.session.add(Tag(id='foo2', secondary='bar', owner_id=user.user['id']))
with raises(DBError):
db.session.commit() db.session.commit()
user.get(res=Tag, item='foo-bar/device', status=MultipleResourcesFound) db.session.rollback()
tag1 = Tag.from_an_id('foo').filter_by(owner_id=user.user['id']).one()
tag2 = Tag.from_an_id('foo').filter_by(owner_id=user2.user['id']).one()
pc1 = Desktop(serial_number='sn1', chassis=ComputerChassis.Tower, owner_id=user.user['id'])
pc2 = Desktop(serial_number='sn2', chassis=ComputerChassis.Tower, owner_id=user2.user['id'])
pc1.tags.add(tag1)
pc2.tags.add(tag2)
db.session.add(pc1)
db.session.add(pc2)
db.session.commit()
computer, _ = user.get(res=Tag, item='foo/device')
assert computer['serialNumber'] == 'sn1'
computer, _ = user2.get(res=Tag, item='foo/device')
assert computer['serialNumber'] == 'sn2'
_, status = client.get(res=Tag, item='foo/device', status=MultipleResourcesFound)
assert status.status_code == 422
@pytest.mark.mvp @pytest.mark.mvp
@ -217,8 +315,7 @@ def test_tag_secondary_workbench_link_find(user: UserClient):
t = Tag('foo', secondary='bar', owner_id=user.user['id']) t = Tag('foo', secondary='bar', owner_id=user.user['id'])
db.session.add(t) db.session.add(t)
db.session.flush() db.session.flush()
assert Tag.from_an_id('bar').one() == t assert Tag.from_an_id('bar').one() == Tag.from_an_id('foo').one()
assert Tag.from_an_id('foo').one() == t
with pytest.raises(ResourceNotFound): with pytest.raises(ResourceNotFound):
Tag.from_an_id('nope').one() Tag.from_an_id('nope').one()