# IdHub/idhub/admin/forms.py
# Cayo Puigdefabregas d09f0f94bd clean nans
# 2024-03-18 17:30:32 +01:00
#
# 496 lines
# 14 KiB
# Python

import json
import base64
import jsonschema
import pandas as pd
from nacl.exceptions import CryptoError
from openpyxl import load_workbook
from django import forms
from django.core.cache import cache
from django.utils.translation import gettext_lazy as _
from django.core.exceptions import ValidationError
from utils import certs
from idhub.models import (
DID,
File_datas,
Membership,
Schemas,
UserRol,
VerificableCredential,
)
from idhub_auth.models import User
class TermsConditionsForm2(forms.Form):
    """Single-checkbox form recording acceptance of the service terms.

    The caller passes the affected user through the ``user`` keyword
    argument; the checkbox state is copied onto ``user.accept_gdpr``.
    """

    accept = forms.BooleanField(
        label=_("Accept terms and conditions of the service"),
        required=False
    )

    def __init__(self, *args, **kwargs):
        # ``user`` is not a Form argument, so remove it before delegating.
        self.user = kwargs.pop('user', None)
        super().__init__(*args, **kwargs)

    def clean(self):
        """Copy the checkbox state onto ``self.user.accept_gdpr``."""
        cleaned = self.cleaned_data
        self.user.accept_gdpr = True if cleaned.get("accept") else False
        return cleaned

    def save(self, commit=True):
        """Save and return the user when ``commit`` is true, else None."""
        if not commit:
            return None

        self.user.save()
        return self.user
class EncryptionKeyForm(forms.Form):
    """Form asking for the key that encrypts the secrets of the system.

    ``clean`` checks the submitted key against the first stored DID (when
    one exists) and ``save`` publishes the key in the cache under
    ``"KEY_DIDS"``.
    """

    key = forms.CharField(
        label=_("Key for encrypt the secrets of all system"),
        required=True
    )

    def clean(self):
        """Validate the key by reading the key material of an existing DID.

        Raises:
            ValidationError: if reading the key material raises
                ``CryptoError``.
        """
        data = self.cleaned_data
        # clean() also runs when the ``key`` field failed its own validation;
        # in that case the value is absent from cleaned_data and the field
        # error has already been recorded.
        self._key = data.get("key")
        if not self._key:
            return data

        # Without any DID there is nothing to check the key against.
        if not DID.objects.exists():
            return data

        did = DID.objects.first()
        # Expose the candidate key only for the duration of the check.
        # NOTE(review): presumably get_key_material() reads "KEY_DIDS" from
        # the cache -- confirm against the DID model.
        cache.set("KEY_DIDS", self._key, None)
        try:
            did.get_key_material()
        except CryptoError:
            txt = _("Key no valid!")
            raise ValidationError(txt)
        finally:
            # Always drop the candidate key, whatever the outcome.
            cache.set("KEY_DIDS", None)

        return data

    def save(self, commit=True):
        """Store the key in the cache and create a default DID if none exists.

        Nothing happens when ``commit`` is false. Always returns None.
        """
        if commit:
            # A timeout of None keeps the key cached without expiry.
            cache.set("KEY_DIDS", self._key, None)
            if not DID.objects.exists():
                did = DID.objects.create(label='Default', type=DID.Types.WEB)
                did.set_did()
                did.save()

        return
class TermsConditionsForm(forms.Form):
    """Three-checkbox consent form (privacy, legal notice, cookies).

    ``user.accept_gdpr`` becomes true only when all three boxes are ticked.
    The user is supplied through the ``user`` keyword argument.
    """

    accept_privacy = forms.BooleanField(
        widget=forms.CheckboxInput(attrs={'class': 'form-check-input'}),
        required=False
    )
    accept_legal = forms.BooleanField(
        widget=forms.CheckboxInput(attrs={'class': 'form-check-input'}),
        required=False
    )
    accept_cookies = forms.BooleanField(
        widget=forms.CheckboxInput(attrs={'class': 'form-check-input'}),
        required=False
    )

    def __init__(self, *args, **kwargs):
        # ``user`` is not a Form argument, so remove it before delegating.
        self.user = kwargs.pop('user', None)
        super().__init__(*args, **kwargs)

    def get_label(self, url, read):
        """Return the HTML label: intro text followed by a link to ``url``.

        ``read`` is used both as the link text and as its ``title``.
        """
        anchor = (
            f' <a class="btn btn-green-admin" target="_blank" href="{url}" '
            f'title="{read}">{read}</a>'
        )
        return _('I read and accepted the') + anchor

    def privacy_label(self):
        """Label for the privacy-policy checkbox."""
        return self.get_label(
            "https://laweb.pangea.org/politica-de-privacitat/",
            _("Privacy policy"),
        )

    def legal_label(self):
        """Label for the legal-notice checkbox."""
        return self.get_label(
            "https://laweb.pangea.org/avis-legal/",
            _("Legal policy"),
        )

    def cookies_label(self):
        """Label for the cookies-policy checkbox."""
        return self.get_label(
            "https://laweb.pangea.org/politica-de-cookies-2/",
            _("Cookies policy"),
        )

    def clean(self):
        """Set ``user.accept_gdpr`` to whether every box was ticked."""
        cleaned = self.cleaned_data
        consent_fields = ("accept_privacy", "accept_legal", "accept_cookies")
        self.user.accept_gdpr = all(
            cleaned.get(name) for name in consent_fields
        )
        return cleaned

    def save(self, commit=True):
        """Save and return the user when ``commit`` is true, else None."""
        if not commit:
            return None

        self.user.save()
        return self.user
class ImportForm(forms.Form):
    """Import a spreadsheet of credential data for a chosen schema and DID.

    Validation reads the uploaded Excel file, validates every non-empty row
    against the credential-subject JSON schema of the selected ``Schemas``
    entry and collects the rows per user in ``self.rows``. ``save`` turns the
    collected rows into ``VerificableCredential`` objects.
    """

    did = forms.ChoiceField(label=_("Did"), choices=[])
    eidas1 = forms.ChoiceField(
        label=_("Signature with Eidas1"),
        choices=[],
        required=False
    )
    schema = forms.ChoiceField(label=_("Schema"), choices=[])
    file_import = forms.FileField(label=_("File to import"))

    def __init__(self, *args, **kwargs):
        self._schema = None
        self._did = None
        self._eidas1 = None
        # Filled by clean_schema(); initialised here so later steps can rely
        # on the attribute existing.
        self.json_schema = None
        self.rows = {}
        self.properties = {}
        self.users = []
        super().__init__(*args, **kwargs)

        # Only DIDs that do not belong to a user are offered.
        dids = DID.objects.filter(user__isnull=True)
        self.fields['did'].choices = [
            (x.did, x.label) for x in dids.filter(eidas1=False)
        ]

        txt_select_one = _("Please choose a data schema ...")
        self.fields['schema'].choices = [(0, txt_select_one)] + [
            (x.id, x.name) for x in Schemas.objects.filter()
        ]

        # The eidas1 selector is shown only when an eidas1 DID exists.
        if dids.filter(eidas1=True).exists():
            choices = [("", "")]
            choices.extend([
                (x.did, x.label) for x in dids.filter(eidas1=True)
            ])
            self.fields['eidas1'].choices = choices
        else:
            self.fields.pop('eidas1')

    def clean(self):
        """Resolve the selected DID (and optional eidas1 DID) to model objects.

        Returns the cleaned-data dict, as Django expects from ``clean()``:
        any non-None return value replaces ``self.cleaned_data``.

        Raises:
            ValidationError: if the selected DID does not exist.
        """
        cleaned_data = super().clean()

        did_value = cleaned_data.get("did")
        if not did_value:
            # The ``did`` field already failed its own validation and its
            # error is recorded; there is nothing more to resolve.
            return cleaned_data

        did = DID.objects.filter(
            user__isnull=True,
            did=did_value
        ).first()
        if did is None:
            raise ValidationError(_("Did not valid!"))
        self._did = did

        eidas1 = cleaned_data.get('eidas1')
        if eidas1:
            self._eidas1 = DID.objects.filter(
                user__isnull=True,
                eidas1=True,
                did=eidas1
            ).first()

        return cleaned_data

    def clean_schema(self):
        """Load the selected schema and its credential-subject JSON schema.

        ``self._schema`` and ``self.json_schema`` are set together and only
        when both could be obtained, so ``clean_file_import`` never sees a
        schema without its JSON schema.

        Raises:
            ValidationError: if the schema does not exist or its JSON schema
                cannot be obtained.
        """
        data = self.cleaned_data["schema"]
        schema = Schemas.objects.filter(id=data).first()
        if schema is None:
            raise ValidationError(_("Schema is not valid!"))

        try:
            json_schema = schema.get_credential_subject_schema()
        except Exception:
            raise ValidationError(_("Schema not valid!"))

        self._schema = schema
        self.json_schema = json_schema
        return data

    def clean_file_import(self):
        """Read the uploaded Excel file and validate each non-empty row.

        Valid rows are stored in ``self.rows`` keyed by the user returned
        from ``validate_jsonld``.

        Raises:
            ValidationError: if the workbook's ``Schema`` property does not
                match the selected schema, the workbook cannot be read, the
                file is empty, or a row fails schema validation.
        """
        data = self.cleaned_data["file_import"]
        # Without a valid schema the rows cannot be checked; the schema
        # field carries its own error in that case.
        if not self._schema:
            return data

        self.file_name = data.name
        props = self.json_schema.get("properties", {})

        # Force pandas to read these columns as strings.
        dtype_dict = {
            "phoneNumber": str,
            "phone": str,
            'postCode': str
        }
        df = pd.read_excel(data, dtype=dtype_dict)
        # NOTE(review): this runs before the date conversion below; confirm
        # that filling datetime columns with '' here is intended.
        df.fillna('', inplace=True)

        # A workbook without a 'Schema' custom property is a custom Excel
        # file that cannot be checked, so it is accepted as is.
        try:
            workbook = load_workbook(data)
            custom_props = workbook.custom_doc_props
            if 'Schema' in custom_props.names:
                excel_schema = custom_props['Schema'].value
                file_schema = self._schema.file_schema.split('.json')[0]
                schema_matches = file_schema in excel_schema
            else:
                schema_matches = True
        except Exception:
            # Any failure while reading the workbook counts as a mismatch.
            schema_matches = False

        if not schema_matches:
            txt = _("This File does not correspond to this scheme!")
            raise ValidationError(txt)

        # Convert dates to ISO 8601.
        for col in df.select_dtypes(include='datetime').columns:
            df[col] = df[col].dt.strftime("%Y-%m-%d")

        df.fillna('', inplace=True)

        # Convert values to strings where the schema declares a string type.
        for col, definition in props.items():
            if col not in df.columns:
                continue

            # Properties without a "type" key are left untouched.
            if "string" in definition.get("type", ""):
                df[col] = df[col].astype(str)

            # TODO @cayop: empty cells are '' at this point, so casting
            # "integer"/"number" columns with astype(int)/astype(float)
            # would crash; those casts are not applied.

        data_pd = df.to_dict(orient='index')

        if not data_pd or df.last_valid_index() is None:
            self.exception(_("The file you try to import is empty!"))

        for n, cells in data_pd.items():
            # Keep only cells with content; 0 is a legitimate value.
            row = {k: v for k, v in cells.items() if v or v == 0}

            if row:
                # n + 2 is reported as the line number (presumably the
                # spreadsheet line, allowing for a header row -- confirm).
                user = self.validate_jsonld(n+2, row)
                if user:
                    self.rows[user] = row

        return data

    def save(self, commit=True):
        """Build one credential per collected row.

        With ``commit`` true the credentials are saved, a ``File_datas``
        record is created for the file, and the credentials are returned.
        Otherwise returns None.
        """
        table = [
            self.create_credential(user, row)
            for user, row in self.rows.items()
        ]

        if commit:
            for cred in table:
                cred.save()
            File_datas.objects.create(file_name=self.file_name)
            return table

        return

    def validate_jsonld(self, line, row):
        """Validate ``row`` against the JSON schema and return its user.

        The user is looked up by the row's ``email`` value and created when
        missing; new users also get encrypted sensitive data and a default
        DID.

        NOTE(review): users are created during form validation, before the
        remaining rows have been checked -- confirm this is intended.

        Raises:
            ValidationError: (through ``self.exception``) when the row does
                not satisfy the schema.
        """
        try:
            jsonschema.validate(
                instance=row,
                schema=self.json_schema,
                format_checker=jsonschema.Draft202012Validator.FORMAT_CHECKER
            )
        except jsonschema.exceptions.ValidationError as err:
            msg = "line {}: {}".format(line, err.message)
            return self.exception(msg)

        user, new = User.objects.get_or_create(email=row.get('email'))
        if new:
            self.users.append(user)
            user.set_encrypted_sensitive_data()
            user.save()
            self.create_defaults_dids(user)
        return user

    def create_defaults_dids(self, user):
        """Create and save the "Default" web DID for ``user``."""
        did = DID(label="Default", user=user, type=DID.Types.WEB)
        did.set_did()
        did.save()

    def create_credential(self, user, row):
        """Return an unsaved credential for ``user`` holding ``row``.

        An existing enabled credential for the same user, schema and issuer
        DID is reused and its data replaced; otherwise a new one is built.
        """
        csv_data = json.dumps(row, default=str)

        cred = VerificableCredential.objects.filter(
            user=user,
            schema=self._schema,
            issuer_did=self._did,
            status=VerificableCredential.Status.ENABLED
        ).first()
        if cred is not None:
            cred.csv_data = csv_data
            cred.eidas1_did = self._eidas1
            return cred

        cred = VerificableCredential(
            verified=False,
            user=user,
            csv_data=csv_data,
            issuer_did=self._did,
            schema=self._schema,
            eidas1_did=self._eidas1
        )
        cred.set_type()
        return cred

    def exception(self, msg):
        """Record the failed import in ``File_datas`` and abort validation.

        Raises:
            ValidationError: always, with ``msg``.
        """
        File_datas.objects.create(file_name=self.file_name, success=False)
        raise ValidationError(msg)
class SchemaForm(forms.Form):
    """Form with a single upload field for a schema template file."""

    file_template = forms.FileField(
        label=_("File template"),
    )
class MembershipForm(forms.ModelForm):
    """Model form for ``Membership`` that rejects overlapping memberships."""

    class Meta:
        model = Membership
        fields = ['type', 'start_date', 'end_date']

    def clean_end_date(self):
        """Check the date range against the user's other memberships.

        Returns:
            The cleaned ``end_date`` (may be empty for an open membership).

        Raises:
            forms.ValidationError: if the end date precedes the start date
                or the range collides with another membership of the same
                type for the same user.
        """
        data = super().clean()
        membership_type = data.get('type')
        start_date = data.get('start_date')
        end_date = data.get('end_date')

        # ``type`` or ``start_date`` may be missing because they failed their
        # own validation (their errors are already recorded) or were left
        # empty; the overlap checks need both, so skip them.
        if membership_type is None or start_date is None:
            return end_date

        members = Membership.objects.filter(
            type=membership_type,
            user=self.instance.user
        )
        # When editing, the membership must not collide with itself.
        if self.instance.id:
            members = members.exclude(id=self.instance.id)

        # An open-ended membership that started earlier still covers the
        # new start date.
        if members.filter(start_date__lte=start_date, end_date=None).exists():
            msg = _("This membership already exists!")
            raise forms.ValidationError(msg)

        if end_date:
            if start_date > end_date:
                msg = _("The end date is less than the start date")
                raise forms.ValidationError(msg)

            # Closed range: reject any membership whose range intersects it.
            overlapping = members.filter(
                start_date__lte=end_date,
                end_date__gte=start_date,
            )
        else:
            # Open range: reject any membership starting on or after it.
            overlapping = members.filter(
                start_date__gte=start_date,
            )

        if overlapping.exists():
            msg = _("This membership already exists!")
            raise forms.ValidationError(msg)

        return end_date
class UserRolForm(forms.ModelForm):
    """Model form assigning a service (role) to a user via ``UserRol``."""

    class Meta:
        model = UserRol
        fields = ['service']

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # For a new assignment, hide the services the user already has.
        if not self.instance.id:
            user = self.instance.user
            choices = self.fields['service'].choices
            choices.queryset = choices.queryset.exclude(users__user=user)
            self.fields['service'].choices = choices

    def clean_service(self):
        """Reject a service that the user already holds.

        Raises:
            forms.ValidationError: if another ``UserRol`` of the same user
                already references the selected service.
        """
        data = super().clean()
        duplicates = UserRol.objects.filter(
            service=data['service'],
            user=self.instance.user
        )
        # When editing, the row being edited is not a duplicate of itself.
        if self.instance.id:
            duplicates = duplicates.exclude(id=self.instance.id)

        if duplicates.exists():
            msg = _("Is not possible to have a duplicate role")
            raise forms.ValidationError(msg)

        return data['service']
class ImportCertificateForm(forms.Form):
    """Upload a password-protected certificate and wrap it in an eidas1 DID."""

    label = forms.CharField(label=_("Label"))
    password = forms.CharField(
        label=_("Password of certificate"),
        widget=forms.PasswordInput
    )
    file_import = forms.FileField(label=_("File import"))

    def __init__(self, *args, **kwargs):
        self._did = None
        self._s = None
        self._label = None
        super().__init__(*args, **kwargs)

    def clean(self):
        """Load the certificate with the password and prepare the DID.

        Raises:
            forms.ValidationError: if the file or the password is missing
                or empty, or if loading the certificate yields a falsy
                result.
        """
        data = super().clean()
        file_import = data.get('file_import')
        # The upload is absent from cleaned_data when the file field failed
        # its own validation; treat that like an empty certificate instead
        # of calling .read() on None.
        if file_import is not None:
            self.pfx_file = file_import.read()
            self.file_name = file_import.name
        else:
            self.pfx_file = None
            self.file_name = None
        self._pss = data.get('password')
        self._label = data.get('label')

        if not self.pfx_file or not self._pss:
            msg = _("Is not a valid certificate")
            raise forms.ValidationError(msg)

        self.signer_init()
        if not self._s:
            msg = _("Is not a valid certificate")
            raise forms.ValidationError(msg)

        self.new_did()
        return data

    def new_did(self):
        """Build (without saving) the DID that carries the certificate.

        The key material is a JSON document holding the base64-encoded
        certificate bytes and the passphrase; the uploaded file name is used
        as the DID value.
        """
        keys = {
            "cert": base64.b64encode(self.pfx_file).decode('utf-8'),
            "passphrase": self._pss
        }
        key_material = json.dumps(keys)
        self._did = DID(
            key_material=key_material,
            did=self.file_name,
            label=self._label,
            eidas1=True,
            type=DID.Types.KEY
        )
        # NOTE(review): presumably set_key_material() stores the material in
        # protected form -- confirm against the DID model.
        self._did.set_key_material(key_material)

    def save(self, commit=True):
        """Save and return the prepared DID when ``commit`` is true, else None."""
        if commit:
            self._did.save()
            return self._did

        return

    def signer_init(self):
        """Load the certificate via ``certs.load_cert`` into ``self._s``."""
        self._s = certs.load_cert(
            self.pfx_file, self._pss.encode('utf-8')
        )