This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
devicehub-teal/ereuse_devicehub/inventory/forms.py

826 lines
30 KiB
Python
Raw Normal View History

import copy
import json
2022-02-03 09:50:36 +00:00
from json.decoder import JSONDecodeError
from boltons.urlutils import URL
2021-12-30 20:35:54 +00:00
from ereuse_devicehub.db import db
from ereuse_devicehub.resources.action.models import RateComputer, Snapshot
2022-02-03 09:50:36 +00:00
from ereuse_devicehub.resources.action.rate.v1_0 import CannotRate
from ereuse_devicehub.resources.action.schemas import \
Snapshot as SnapshotSchema
from ereuse_devicehub.resources.action.views.snapshot import move_json, save_json
2022-02-03 09:50:36 +00:00
from ereuse_devicehub.resources.device.models import (SAI, Cellphone, Computer,
Device, Keyboard, MemoryCardReader,
Monitor, Mouse, Smartphone, Tablet)
from flask import g, request
from flask_wtf import FlaskForm
from sqlalchemy.util import OrderedSet
from wtforms import (BooleanField, DateField, FileField, FloatField, Form,
HiddenField, IntegerField, MultipleFileField, SelectField,
StringField, TextAreaField, URLField, validators)
from wtforms.fields import FormField
2022-02-03 09:50:36 +00:00
from ereuse_devicehub.resources.device.sync import Sync
from ereuse_devicehub.resources.documents.models import DataWipeDocument
2022-02-03 09:50:36 +00:00
from ereuse_devicehub.resources.enums import Severity, SnapshotSoftware
from ereuse_devicehub.resources.hash_reports import insert_hash
2021-12-30 12:52:06 +00:00
from ereuse_devicehub.resources.lot.models import Lot
2022-01-11 12:42:01 +00:00
from ereuse_devicehub.resources.tag.model import Tag
2022-02-16 13:03:01 +00:00
from ereuse_devicehub.resources.user.models import User
2022-02-21 12:02:28 +00:00
from ereuse_devicehub.resources.tradedocument.models import TradeDocument
from ereuse_devicehub.resources.user.exceptions import InsufficientPermission
2021-12-30 12:52:06 +00:00
2022-01-03 10:32:12 +00:00
class LotDeviceForm(FlaskForm):
    """Add devices to, or remove devices from, a lot of the current user.

    ``lot`` carries the lot UUID and ``devices`` a comma-separated list of
    device ids. ``validate`` has to succeed before ``save`` or ``remove``
    is called, because it is what loads ``self._lot`` and ``self._devices``.
    """

    lot = StringField('Lot', [validators.UUID()])
    devices = StringField('Devices', [validators.length(min=1)])

    def validate(self, extra_validators=None):
        """Run the field validators, then load the lot and the devices.

        Only rows owned by ``g.user`` are considered. The lot is fetched
        with ``Query.one()``. Returns ``True`` only when at least one of
        the requested devices was found.
        """
        if not super().validate(extra_validators):
            return False

        lot_query = Lot.query.filter(Lot.id == self.lot.data)
        self._lot = lot_query.filter(Lot.owner_id == g.user.id).one()

        requested_ids = set(self.devices.data.split(","))
        device_query = Device.query.filter(Device.id.in_(requested_ids))
        device_query = device_query.filter(Device.owner_id == g.user.id)
        self._devices = device_query.distinct().all()

        return bool(self._devices)

    def save(self):
        """Put the validated devices into the lot and commit."""
        trade = self._lot.trade
        if trade:
            # Devices joining a traded lot are also attached to that trade,
            # unless the trade is already among the device's actions.
            for device in self._devices:
                if trade not in device.actions:
                    trade.devices.add(device)

        self._lot.devices.update(self._devices)
        db.session.add(self._lot)
        db.session.commit()

    def remove(self):
        """Take the validated devices out of the lot and commit."""
        self._lot.devices.difference_update(self._devices)
        db.session.add(self._lot)
        db.session.commit()
class LotForm(FlaskForm):
    """Create, rename or delete a lot.

    Pass ``id=<lot id>`` to work on an existing lot of the current user;
    without it ``save`` creates a new lot.
    """

    name = StringField('Name', [validators.length(min=1)])

    def __init__(self, *args, **kwargs):
        """Load the lot named by the ``id`` keyword, if one was given."""
        self.id = kwargs.pop('id', None)
        self.instance = None
        if self.id:
            owned_lots = Lot.query.filter(Lot.id == self.id)
            self.instance = owned_lots.filter(Lot.owner_id == g.user.id).one()

        super().__init__(*args, **kwargs)

        # Pre-fill the name field from the stored lot when nothing was sent.
        if self.instance and not self.name.data:
            self.name.data = self.instance.name

    def save(self):
        """Persist the lot and return its id."""
        creating = not self.id
        if creating:
            self.instance = Lot(name=self.name.data)

        self.populate_obj(self.instance)

        if creating:
            self.id = self.instance.id
            db.session.add(self.instance)

        db.session.commit()
        return self.id

    def remove(self):
        """Delete the lot when it holds no devices; return the instance."""
        lot = self.instance
        if lot and not lot.devices:
            lot.delete()
            db.session.commit()
        return self.instance
class UploadSnapshotForm(FlaskForm):
    """Upload one or more snapshot JSON files and register them.

    ``validate`` parses the uploaded files into ``self.snapshots`` (a list
    of ``(filename, dict)`` pairs) and keeps a per-file status message in
    ``self.result``. ``save`` then builds a Snapshot for every parsed file.
    """

    snapshot = MultipleFileField('Select a Snapshot File', [validators.DataRequired()])

    def validate(self, extra_validators=None):
        """Parse the uploaded files.

        Returns ``True`` when at least one file could be decoded as a JSON
        object whose ``uuid`` is not yet stored as a Snapshot.
        """
        if not super().validate(extra_validators):
            return False

        uploads = request.files.getlist(self.snapshot.name)
        if not uploads:
            return False

        self.snapshots = []
        self.result = {}
        for upload in uploads:
            filename = upload.filename
            self.result[filename] = 'Not processed'

            raw = upload.stream.read()
            if not raw:
                self.result[filename] = 'Error this snapshot is empty'
                continue

            try:
                d_json = json.loads(raw)
            except (JSONDecodeError, UnicodeDecodeError):
                # json.loads() decodes bytes itself, so undecodable bytes
                # surface as UnicodeDecodeError rather than JSONDecodeError.
                self.result[filename] = 'Error this snapshot is not a json'
                continue

            if not isinstance(d_json, dict):
                # ``.get`` below needs a JSON object; a bare list or scalar
                # would otherwise raise AttributeError.
                self.result[filename] = 'Error this snapshot is not a json'
                continue

            uuid_snapshot = d_json.get('uuid')
            if Snapshot.query.filter(Snapshot.uuid == uuid_snapshot).all():
                self.result[filename] = 'Error this snapshot exist'
                continue

            self.snapshots.append((filename, d_json))

        return bool(self.snapshots)

    def save(self):
        """Build and commit a Snapshot for every parsed file.

        Returns the Snapshot built from the last file, or ``None`` when
        nothing was processed.
        """
        if 'Error' in self.result.values():
            return

        self.sync = Sync()
        schema = SnapshotSchema()
        # TODO @cayop get correct var config
        self.tmp_snapshots = '/tmp/'

        # Stays None when there is nothing to process, instead of leaving
        # the name unbound for the final return.
        response = None
        for filename, snapshot_json in self.snapshots:
            path_snapshot = save_json(snapshot_json, self.tmp_snapshots, g.user.email)
            snapshot_json.pop('debug', None)
            snapshot_json = schema.load(snapshot_json)
            response = self.build(snapshot_json)

            if hasattr(response, 'type'):
                self.result[filename] = 'Ok'
            else:
                self.result[filename] = 'Error'

            move_json(self.tmp_snapshots, path_snapshot, g.user.email)

        db.session.commit()
        return response

    def build(self, snapshot_json):
        """Turn a schema-loaded snapshot dict into a Snapshot in the session.

        Adapted from ereuse_devicehub.resources.action.views.snapshot.
        The Snapshot is added to ``db.session`` but not committed here.
        Raises ``InsufficientPermission`` when a Workbench snapshot resolves
        to a device that the current user does not own.
        """
        device = snapshot_json.pop('device')  # type: Computer
        components = None
        # ``x == (A or B)`` only ever compared against A, so the
        # WorkbenchAndroid case was never matched; test membership instead.
        if snapshot_json['software'] in (
            SnapshotSoftware.Workbench,
            SnapshotSoftware.WorkbenchAndroid,
        ):
            components = snapshot_json.pop('components', None)  # type: List[Component]
            if isinstance(device, Computer) and device.hid:
                device.add_mac_to_hid(components_snap=components)
        snapshot = Snapshot(**snapshot_json)

        # Remove new actions from devices so they don't interfere with sync
        actions_device = set(e for e in device.actions_one)
        device.actions_one.clear()
        if components:
            actions_components = tuple(set(e for e in c.actions_one) for c in components)
            for component in components:
                component.actions_one.clear()

        assert not device.actions_one
        assert all(not c.actions_one for c in components) if components else True
        db_device, remove_actions = self.sync.run(device, components)

        del device  # Do not use device anymore
        snapshot.device = db_device
        snapshot.actions |= remove_actions | actions_device  # Set actions to snapshot
        # commit will change the order of the components by what
        # the DB wants. Let's get a copy of the list so we preserve order
        ordered_components = OrderedSet(x for x in snapshot.components)

        # Add the new actions to the db-existing devices and components
        db_device.actions_one |= actions_device
        if components:
            for component, actions in zip(ordered_components, actions_components):
                component.actions_one |= actions
                snapshot.actions |= actions

        if snapshot.software == SnapshotSoftware.Workbench:
            # Check ownership of the (non-component) device against the current user
            if db_device.owner_id != g.user.id:
                raise InsufficientPermission()
            # Compute ratings
            try:
                rate_computer, price = RateComputer.compute(db_device)
            except CannotRate:
                pass
            else:
                snapshot.actions.add(rate_computer)
                if price:
                    snapshot.actions.add(price)
        elif snapshot.software == SnapshotSoftware.WorkbenchAndroid:
            pass  # TODO try except to compute RateMobile

        # Check if HID is null and add Severity:Warning to Snapshot
        if snapshot.device.hid is None:
            snapshot.severity = Severity.Warning

        db.session.add(snapshot)
        return snapshot
2022-01-19 12:40:40 +00:00
class NewDeviceForm(FlaskForm):
    """Register a device by hand through a synthetic 'Web' snapshot.

    ``validate`` checks numeric minimums and the IMEI/MEID format and
    lower-cases manufacturer, model and serial number. ``save`` builds the
    snapshot via ``UploadSnapshotForm.build`` and commits it.
    """

    type = StringField('Type', [validators.DataRequired()])
    label = StringField('Label')
    serial_number = StringField('Seria Number', [validators.DataRequired()])
    model = StringField('Model', [validators.DataRequired()])
    manufacturer = StringField('Manufacturer', [validators.DataRequired()])
    appearance = StringField('Appearance', [validators.Optional()])
    functionality = StringField('Functionality', [validators.Optional()])
    brand = StringField('Brand')
    generation = IntegerField('Generation')
    version = StringField('Version')
    weight = FloatField('Weight', [validators.DataRequired()])
    width = FloatField('Width', [validators.DataRequired()])
    height = FloatField('Height', [validators.DataRequired()])
    depth = FloatField('Depth', [validators.DataRequired()])
    variant = StringField('Variant', [validators.Optional()])
    sku = StringField('SKU', [validators.Optional()])
    image = StringField('Image', [validators.Optional(), validators.URL()])
    imei = IntegerField('IMEI', [validators.Optional()])
    meid = StringField('MEID', [validators.Optional()])
    resolution = IntegerField('Resolution width', [validators.Optional()])
    screen = FloatField('Screen size', [validators.Optional()])

    def __init__(self, *args, **kwargs):
        """Bind the form and fill in defaults for the numeric fields."""
        super().__init__(*args, **kwargs)
        # Device type name -> model class.
        self.devices = {"Smartphone": Smartphone,
                        "Tablet": Tablet,
                        "Cellphone": Cellphone,
                        "Monitor": Monitor,
                        "Mouse": Mouse,
                        "Keyboard": Keyboard,
                        "SAI": SAI,
                        "MemoryCardReader": MemoryCardReader}

        # Empty numeric fields fall back to the smallest accepted value so
        # the comparisons in validate() always see a number.
        if not self.generation.data:
            self.generation.data = 1

        if not self.weight.data:
            self.weight.data = 0.1

        if not self.height.data:
            self.height.data = 0.1

        if not self.width.data:
            self.width.data = 0.1

        if not self.depth.data:
            self.depth.data = 0.1

    def validate(self, extra_validators=None):
        """Validate the fields and normalise the text values.

        Returns ``False`` (with ``errors`` set on every offending field)
        when a numeric field is below its minimum or the IMEI/MEID is
        malformed. On success the image is set to ``None`` if empty and
        manufacturer, model and serial number are lower-cased.
        """
        error = ["Not a correct value"]
        is_valid = super().validate(extra_validators)

        minimums = (
            (self.generation, 1),
            (self.weight, 0.1),
            (self.height, 0.1),
            (self.width, 0.1),
            (self.depth, 0.1),
        )
        for field, minimum in minimums:
            if field.data < minimum:
                field.errors = error
                is_valid = False

        if self.imei.data:
            if not 13 < len(str(self.imei.data)) < 17:
                self.imei.errors = error
                is_valid = False

        if self.meid.data:
            meid = self.meid.data
            if not 13 < len(meid) < 17:
                # Previously the form was rejected here without any error on
                # the field, so the user could not tell what was wrong.
                self.meid.errors = error
                is_valid = False
            try:
                int(meid, 16)  # a MEID must be hexadecimal
            except ValueError:
                self.meid.errors = error
                is_valid = False

        if not is_valid:
            return False

        if self.image.data == '':
            self.image.data = None
        if self.manufacturer.data:
            self.manufacturer.data = self.manufacturer.data.lower()
        if self.model.data:
            self.model.data = self.model.data.lower()
        if self.serial_number.data:
            self.serial_number.data = self.serial_number.data.lower()

        return True

    def save(self):
        """Build a 'Web' snapshot for the device, commit and return it."""
        json_snapshot = {
            'type': 'Snapshot',
            'software': 'Web',
            'version': '11.0',
            'device': {
                'type': self.type.data,
                'model': self.model.data,
                'manufacturer': self.manufacturer.data,
                'serialNumber': self.serial_number.data,
                'brand': self.brand.data,
                'version': self.version.data,
                'generation': self.generation.data,
                'sku': self.sku.data,
                'weight': self.weight.data,
                'width': self.width.data,
                'height': self.height.data,
                'depth': self.depth.data,
                'variant': self.variant.data,
                'image': self.image.data
            }
        }

        if self.appearance.data or self.functionality.data:
            json_snapshot['device']['actions'] = [{
                'type': 'VisualTest',
                'appearanceRange': self.appearance.data,
                'functionalityRange': self.functionality.data
            }]

        # build() reads self.sync, which UploadSnapshotForm normally sets in
        # its own save(); provide it here since only build() is used.
        upload_form = UploadSnapshotForm()
        upload_form.sync = Sync()

        schema = SnapshotSchema()
        self.tmp_snapshots = '/tmp/'
        path_snapshot = save_json(json_snapshot, self.tmp_snapshots, g.user.email)
        snapshot_json = schema.load(json_snapshot)

        # Type-specific attributes are set on the loaded device object
        # rather than passed through the snapshot dict.
        if self.type.data == 'Monitor':
            snapshot_json['device'].resolution_width = self.resolution.data
            snapshot_json['device'].size = self.screen.data

        if self.type.data in ['Smartphone', 'Tablet', 'Cellphone']:
            snapshot_json['device'].imei = self.imei.data
            snapshot_json['device'].meid = self.meid.data

        snapshot = upload_form.build(snapshot_json)

        move_json(self.tmp_snapshots, path_snapshot, g.user.email)
        if self.type.data == 'Monitor':
            # NOTE(review): these use different attribute names than the
            # pre-build assignment above (resolution_width / size) — confirm
            # which pair the Monitor model actually maps.
            snapshot.device.resolution = self.resolution.data
            snapshot.device.screen = self.screen.data

        db.session.commit()
        return snapshot
2022-01-18 13:09:01 +00:00
2022-01-11 12:42:01 +00:00
class TagForm(FlaskForm):
    """Create a named tag whose id is the submitted code."""

    code = StringField('Code', [validators.length(min=1)])

    def validate(self, extra_validators=None):
        """Reject the code when a tag with that id already exists."""
        if not super().validate(extra_validators):
            return False

        existing = Tag.query.filter(Tag.id == self.code.data).all()
        if existing:
            self.code.errors = ["This value is being used"]
            return False

        return True

    def save(self):
        """Store a new tag for the code, commit and return it."""
        new_tag = Tag(id=self.code.data)
        self.instance = new_tag
        db.session.add(new_tag)
        db.session.commit()
        return self.instance

    def remove(self):
        """Delete the tag unless it has a device or a provider."""
        if not (self.instance.device or self.instance.provider):
            self.instance.delete()
            db.session.commit()
        return self.instance
2022-01-26 12:16:18 +00:00
class TagUnnamedForm(FlaskForm):
    """Request a batch of tag ids from the tag provider and store them."""

    amount = IntegerField('amount')

    def save(self):
        """Ask the provider for ``amount`` ids, store one Tag per id, commit.

        Returns the list of new Tag objects.
        """
        query = [('num', self.amount.data)]
        # The second element of the provider's answer is not used.
        new_ids, _ = g.tag_provider.post('/', {}, query=query)

        tags = [
            Tag(id=new_id, provider=g.inventory.tag_provider)
            for new_id in new_ids
        ]
        db.session.add_all(tags)
        db.session.commit()
        return tags
2022-01-25 11:53:36 +00:00
class TagDeviceForm(FlaskForm):
    """Link a tag of the current user to a device, or unlink it.

    Keyword arguments consumed by ``__init__``:
      * ``delete`` -- truthy when the form is used to unlink a tag.
      * ``device`` -- id of the device being (un)tagged.
    """

    tag = SelectField('Tag', choices=[])
    device = StringField('Device', [validators.Optional()])

    def __init__(self, *args, **kwargs):
        """Bind the form and fill the tag choices for the current user."""
        self.delete = kwargs.pop('delete', None)
        self.device_id = kwargs.pop('device', None)

        super().__init__(*args, **kwargs)

        own_tags = Tag.query.filter(Tag.owner_id == g.user.id)
        if self.delete:
            # Unlinking: offer the tags currently on this device.
            tags = own_tags.filter(Tag.device_id == self.device_id)
        else:
            # Linking: offer only the tags not attached to any device.
            tags = own_tags.filter(Tag.device_id.is_(None))

        self.tag.choices = [(tag.id, tag.id) for tag in tags]

    def validate(self, extra_validators=None):
        """Load the tag and, when given, the device.

        Sets ``self._tag`` and ``self._device`` (``None`` when no device id
        is available). Returns ``False`` when linking a tag that is already
        on a device or when linking without any resolvable device.
        """
        if not super().validate(extra_validators):
            return False

        self._tag = Tag.query.filter(Tag.id == self.tag.data).filter(
            Tag.owner_id == g.user.id).one()

        if not self.delete and self._tag.device_id:
            self.tag.errors = [("This tag is actualy in use.")]
            return False

        if self.device.data:
            # The field may hold a comma-separated list; the last id wins.
            try:
                self.device.data = int(self.device.data.split(',')[-1])
            except ValueError:
                self.device.data = None

        self._device = None
        if self.device_id or self.device.data:
            self.device_id = self.device_id or self.device.data
            self._device = Device.query.filter(Device.id == self.device_id).filter(
                Device.owner_id == g.user.id).one()

        if not self.delete and self._device is None:
            # save() needs a device; fail validation here instead of
            # raising AttributeError there.
            self.device.errors = ["A device is required to link the tag."]
            return False

        return True

    def save(self):
        """Attach the validated tag to the validated device and commit."""
        self._tag.device_id = self._device.id
        db.session.add(self._tag)
        db.session.commit()

    def remove(self):
        """Detach the validated tag from its device and commit."""
        self._tag.device = None
        db.session.add(self._tag)
        db.session.commit()
2022-01-11 12:42:01 +00:00
2022-01-04 11:56:28 +00:00
class NewActionForm(FlaskForm):
    """Generic form that creates an action over a set of devices.

    ``type`` names the action model to instantiate and ``devices`` is a
    comma-separated list of device ids owned by the current user.
    """

    name = StringField(
        'Name',
        [validators.length(max=50)],
        description="A name or title of the event. Something to look for.",
    )
    devices = HiddenField()
    date = DateField(
        'Date',
        [validators.Optional()],
        description=(
            "When the action ends. For some actions like booking\n"
            "the time when it expires, for others like renting the\n"
            "time that the end rents. For specific actions, it is the\n"
            "time in which they are carried out; differs from created\n"
            "in that created is where the system receives the action."
        ),
    )
    severity = SelectField(
        'Severity',
        choices=[(v.name, v.name) for v in Severity],
        description=(
            "An indicator that evaluates the execution of the event.\n"
            "For example, failed events are set to Error"
        ),
    )
    description = TextAreaField('Description')
    lot = HiddenField()
    type = HiddenField()

    def validate(self, extra_validators=None):
        """Validate the fields and load the devices into ``self._devices``.

        Returns ``False`` when the field validation fails or when none of
        the requested devices belongs to the current user.
        """
        if not self.generic_validation(extra_validators=extra_validators):
            return False

        self._devices = OrderedSet()
        if self.devices.data:
            wanted_ids = set(self.devices.data.split(","))
            owned = Device.query.filter(Device.id.in_(wanted_ids)).filter(
                Device.owner_id == g.user.id)
            self._devices = OrderedSet(owned.all())

        return bool(self._devices)

    def generic_validation(self, extra_validators=None):
        """Run only the field validators, without touching the device list."""
        return super().validate(extra_validators)

    def save(self):
        """Create the action named by ``type``, commit and return it."""
        # The registry stores weak references; calling one yields the class.
        model_ref = db.Model._decl_class_registry.data[self.type.data]
        Model = model_ref()
        self.instance = Model()

        # populate_obj() copies field data verbatim, so swap in the real
        # objects for the duration of the call and restore the raw form
        # values afterwards.
        raw_devices = self.devices.data
        raw_severity = self.severity.data
        self.devices.data = self._devices
        self.severity.data = Severity[raw_severity]

        self.populate_obj(self.instance)
        db.session.add(self.instance)
        db.session.commit()

        self.devices.data = raw_devices
        self.severity.data = raw_severity
        return self.instance

    def check_valid(self):
        """Return the action type when the form is invalid, else ``None``.

        A form without a type is not checked at all.
        """
        action_type = self.type.data
        if action_type == '' or action_type is None:
            return None

        if self.validate():
            return None

        return self.type.data
2022-01-10 12:27:27 +00:00
class AllocateForm(NewActionForm):
    """Action form with a time span, a transaction and a number of end users."""

    start_time = DateField('Start time')
    end_time = DateField('End time')
    final_user_code = StringField('Final user code', [validators.length(max=50)])
    transaction = StringField('Transaction', [validators.length(max=50)])
    end_users = IntegerField('End users')

    def validate(self, extra_validators=None):
        """Add the date-order and end-users checks to the base validation."""
        is_valid = super().validate(extra_validators)

        begins = self.start_time.data
        ends = self.end_time.data
        if begins and ends and ends < begins:
            message = ['The action cannot finish before it starts.']
            self.start_time.errors = message
            self.end_time.errors = message
            is_valid = False

        if not self.end_users.data:
            self.end_users.errors = ["You need to specify a number of users"]
            is_valid = False

        return is_valid
2022-02-03 10:27:16 +00:00
class DataWipeDocumentForm(Form):
    """Sub-form describing the document attached to a data wipe.

    Only the name and a hash of the uploaded file are kept on the
    DataWipeDocument built by ``save``.
    """

    date = DateField(
        'Date',
        [validators.Optional()],
        description="Date when was data wipe",
    )
    url = URLField(
        'Url',
        [validators.Optional()],
        description="Url where the document resides",
    )
    success = BooleanField(
        'Success',
        [validators.Optional()],
        description="The erase was success or not?",
    )
    software = StringField(
        'Software',
        [validators.Optional()],
        description="Which software has you use for erase the disks",
    )
    id_document = StringField(
        'Document Id',
        [validators.Optional()],
        description="Identification number of document",
    )
    file_name = FileField(
        'File',
        [validators.DataRequired()],
        description=(
            "This file is not stored on our servers, it is only used to\n"
            "generate a digital signature and obtain the name of the file."
        ),
    )

    def validate(self, extra_validators=None):
        """Run the standard field validation."""
        return super().validate(extra_validators)

    def save(self, commit=True):
        """Build the DataWipeDocument, add it to the session and return it.

        The session is committed only when ``commit`` is true.
        """
        uploaded = self.file_name.data
        if uploaded:
            name = uploaded.filename
            digest = insert_hash(uploaded.read(), commit=False)
        else:
            name = ''
            digest = ''

        self.url.data = URL(self.url.data)
        self._obj = DataWipeDocument(document_type='DataWipeDocument')
        self.populate_obj(self._obj)
        # populate_obj() wrote the raw upload into file_name; replace it
        # with the plain name and store the hash next to it.
        self._obj.file_name = name
        self._obj.file_hash = digest
        db.session.add(self._obj)

        if commit:
            db.session.commit()

        return self._obj
2022-02-07 13:02:28 +00:00
class DataWipeForm(NewActionForm):
    """Action form for a data wipe, with the wipe document as a sub-form."""

    document = FormField(DataWipeDocumentForm)

    def save(self):
        """Create the action named by ``type`` with its document and commit.

        Returns the new action instance. The form's ``devices``,
        ``severity`` and ``document`` attributes are put back to their
        previous values before returning.
        """
        # Build the document first, without committing; the single commit
        # below covers both the document and the action.
        self.document.form.save(commit=False)

        # The registry stores weak references; calling one yields the class.
        Model = db.Model._decl_class_registry.data[self.type.data]()
        self.instance = Model()
        # Keep the raw form values so they can be restored after populate_obj.
        devices = self.devices.data
        severity = self.severity.data
        self.devices.data = self._devices
        self.severity.data = Severity[self.severity.data]

        # The document field is taken off the form before populate_obj();
        # presumably so that the sub-form data is not copied onto the
        # action — TODO confirm.
        document = copy.copy(self.document)
        del self.document
        self.populate_obj(self.instance)
        # Attach the object built by the sub-form's save() instead.
        self.instance.document = document.form._obj
        db.session.add(self.instance)
        db.session.commit()

        # Restore the form to its pre-save state.
        self.devices.data = devices
        self.severity.data = severity
        self.document = document

        return self.instance
2022-02-09 13:06:28 +00:00
class TradeForm(NewActionForm):
    """Create a Trade over the lot named by the ``lot`` field.

    ``validate`` (or ``create_phantom_account`` for code-based trades)
    resolves the two parties into ``self.db_user_from`` and
    ``self.db_user_to``; ``save`` then builds the Trade from them.
    """

    user_from = StringField('Supplier', [validators.Optional()],
                            description="Please enter the supplier's email address",
                            render_kw={'data-email': ""})
    user_to = StringField('Receiver', [validators.Optional()],
                          description="Please enter the receiver's email address",
                          render_kw={'data-email': ""})
    confirm = BooleanField('Confirm', [validators.Optional()],
                           default=True,
                           description="I need confirmation from the other user for every device and document.")
    code = StringField('Code', [validators.Optional()],
                       description="If you don't need confirm, you need put a code for trace the user in the statistics.")

    def __init__(self, *args, **kwargs):
        """Bind the form, expose the user's email and load the lot."""
        super().__init__(*args, **kwargs)
        # render_kw is the dict written in the class body; give each bound
        # field its own copy instead of mutating that shared dict with the
        # current user's email.
        self.user_from.render_kw = dict(
            self.user_from.render_kw, **{'data-email': g.user.email})
        self.user_to.render_kw = dict(
            self.user_to.render_kw, **{'data-email': g.user.email})
        self._lot = Lot.query.filter(Lot.id == self.lot.data).filter(
            Lot.owner_id == g.user.id).one()

    def validate(self, extra_validators=None):
        """Check the trade parties and resolve them to users.

        Uses ``generic_validation`` so that no device list is required.
        When both emails are given and everything else is valid, the users
        are stored in ``self.db_user_from`` / ``self.db_user_to``.
        """
        is_valid = self.generic_validation(extra_validators=extra_validators)
        email_from = self.user_from.data
        email_to = self.user_to.data

        if not self.confirm.data and not self.code.data:
            self.code.errors = ["If you don't want confirm, you need a code"]
            is_valid = False

        # Parenthesised exactly as Python already grouped the original
        # ``a and b or c or d`` expression.
        if (
            (self.confirm.data and not (email_from and email_to))
            or email_to == email_from
            or g.user.email not in [email_from, email_to]
        ):
            errors = ["If you want confirm, you need a correct email"]
            self.user_to.errors = errors
            self.user_from.errors = errors
            is_valid = False

        # With confirm set, the check above guarantees both emails, so this
        # covers the former ``confirm and is_valid`` case. It additionally
        # resolves the users for a code-based trade that names both
        # parties, which previously left db_user_* unset for save().
        if is_valid and email_from and email_to:
            user_to = User.query.filter_by(email=email_to).first() or g.user
            user_from = User.query.filter_by(email=email_from).first() or g.user
            if user_to == user_from:
                is_valid = False
            else:
                self.db_user_to = user_to
                self.db_user_from = user_from

        return is_valid

    def save(self, commit=True):
        """Create the Trade (and a phantom user if needed); return it.

        The session is committed only when ``commit`` is true.
        """
        self.create_phantom_account()
        self.prepare_instance()
        self.create_automatic_trade()

        if commit:
            db.session.commit()

        return self.instance

    def prepare_instance(self):
        """Build the Trade from the form data and add it to the session."""
        # The registry stores weak references; calling one yields the class.
        Model = db.Model._decl_class_registry.data['Trade']()
        self.instance = Model()
        self.instance.user_from = self.db_user_from
        self.instance.user_to = self.db_user_to
        self.instance.lot_id = self._lot.id
        self.instance.devices = self._lot.devices
        self.instance.code = self.code.data
        self.instance.confirm = self.confirm.data
        self.instance.date = self.date.data
        self.instance.name = self.name.data
        self.instance.description = self.description.data
        db.session.add(self.instance)

    def create_phantom_account(self):
        """Resolve the parties of a code-based (no confirmation) trade.

        Does nothing when confirmation is required, when there is no code,
        or when both emails were given. Otherwise the current user is the
        party whose email was given and the missing party becomes the
        phantom user derived from the code (reused if it already exists).
        """
        if self.confirm.data or not self.code.data:
            return

        user_from = self.user_from.data
        user_to = self.user_to.data
        code = self.code.data

        if user_from and user_to:
            # both users exist, no further action is necessary
            return

        # The resolved users go into db_user_from / db_user_to, which is
        # what prepare_instance() and create_automatic_trade() read.
        # Assigning them to self.user_from / self.user_to replaced the form
        # fields and left db_user_* undefined.

        # Create receiver (to) phantom account
        if user_from and not user_to:
            assert g.user.email == user_from
            self.db_user_from = g.user
            self.db_user_to = self.create_user(code)
            return

        # Create supplier (from) phantom account
        if not user_from and user_to:
            assert g.user.email == user_to
            self.db_user_from = self.create_user(code)
            self.db_user_to = g.user

    def create_user(self, code):
        """Return the phantom user for ``code``, creating it if missing.

        The email is built from the current user's id and the code. A new
        user is added to the session but not committed here.
        """
        email = "{}_{}@dhub.com".format(str(g.user.id), code)
        user = User.query.filter_by(email=email).first()
        if not user:
            user = User(email=email, password='', active=False, phantom=True)
            db.session.add(user)
        return user

    def create_automatic_trade(self):
        """Hand the lot's devices to the receiver when no confirmation is needed."""
        # do nothing if an explicit confirmation is required
        if self.confirm.data:
            return

        # Change the owner for every device
        for dev in self._lot.devices:
            dev.change_owner(self.db_user_to)

    def check_valid(self):
        """Name the party field that is not the current user.

        Returns ``'user_to'`` or ``'user_from'``, or ``None`` when both
        fields hold the same value or neither holds the user's email.
        """
        if self.user_from.data == self.user_to.data:
            return

        if self.user_from.data == g.user.email:
            return 'user_to'

        if self.user_to.data == g.user.email:
            return 'user_from'
class TradeDocumentForm(FlaskForm):
    """Attach a document to the trade of a lot.

    The lot id has to be passed as the ``lot`` keyword argument. Only the
    name and a hash of the uploaded file are kept on the TradeDocument.
    """

    url = URLField(
        'Url',
        [validators.Optional()],
        render_kw={'class': "form-control"},
        description="Url where the document resides",
    )
    description = StringField(
        'Description',
        [validators.Optional()],
        render_kw={'class': "form-control"},
        description="",
    )
    id_document = StringField(
        'Document Id',
        [validators.Optional()],
        render_kw={'class': "form-control"},
        description="Identification number of document",
    )
    date = DateField(
        'Date',
        [validators.Optional()],
        render_kw={'class': "form-control"},
        description="",
    )
    file_name = FileField(
        'File',
        [validators.DataRequired()],
        render_kw={'class': "form-control"},
        description=(
            "This file is not stored on our servers, it is only used to\n"
            "generate a digital signature and obtain the name of the file."
        ),
    )

    def __init__(self, *args, **kwargs):
        """Bind the form and load the lot named by the ``lot`` keyword."""
        lot_id = kwargs.pop('lot')
        super().__init__(*args, **kwargs)
        self._lot = Lot.query.filter(Lot.id == lot_id).one()

    def validate(self, extra_validators=None):
        """Require the current user to be one of the two trade parties."""
        fields_ok = super().validate(extra_validators)

        parties = [self._lot.trade.user_from, self._lot.trade.user_to]
        if g.user not in parties:
            return False

        return fields_ok

    def save(self, commit=True):
        """Build the TradeDocument, link it to the trade and return it.

        The session is committed only when ``commit`` is true.
        """
        uploaded = self.file_name.data
        if uploaded:
            name = uploaded.filename
            digest = insert_hash(uploaded.read(), commit=False)
        else:
            name = ''
            digest = ''

        self.url.data = URL(self.url.data)
        self._obj = TradeDocument(lot_id=self._lot.id)
        self.populate_obj(self._obj)
        # populate_obj() wrote the raw upload into file_name; replace it
        # with the plain name and store the hash next to it.
        self._obj.file_name = name
        self._obj.file_hash = digest
        db.session.add(self._obj)
        self._lot.trade.documents.add(self._obj)

        if commit:
            db.session.commit()

        return self._obj