2023-11-08 10:24:37 +00:00
|
|
|
import csv
|
|
|
|
import json
|
|
|
|
import pandas as pd
|
|
|
|
from jsonschema import validate
|
|
|
|
|
2023-10-30 12:53:19 +00:00
|
|
|
from django import forms
|
2023-11-13 17:09:37 +00:00
|
|
|
from django.utils.translation import gettext_lazy as _
|
2023-11-08 10:24:37 +00:00
|
|
|
from django.core.exceptions import ValidationError
|
|
|
|
from idhub.models import (
|
|
|
|
DID,
|
2023-11-13 17:09:37 +00:00
|
|
|
File_datas,
|
|
|
|
Membership,
|
2023-11-08 10:24:37 +00:00
|
|
|
Schemas,
|
2023-11-14 14:55:17 +00:00
|
|
|
Service,
|
|
|
|
UserRol,
|
2023-11-08 10:24:37 +00:00
|
|
|
VerificableCredential,
|
|
|
|
)
|
|
|
|
from idhub_auth.models import User
|
2023-10-30 12:53:19 +00:00
|
|
|
|
|
|
|
|
|
|
|
class ImportForm(forms.Form):
    """Form that imports a tab-separated file of credential rows.

    The caller passes the current user as the ``user`` keyword argument.
    The ``did`` choices are restricted to that user's DIDs, while every
    stored schema is offered. Each row of the uploaded file is validated
    against the selected schema's JSON schema and matched to an existing
    ``User`` by its ``email`` column; ``save()`` then builds one
    ``VerificableCredential`` per collected row.
    """

    did = forms.ChoiceField(choices=[])
    schema = forms.ChoiceField(choices=[])
    file_import = forms.FileField()

    def __init__(self, *args, **kwargs):
        """Set up per-instance state and populate the dynamic choices.

        Accepts the usual ``forms.Form`` arguments plus an optional
        ``user`` keyword, which is removed from ``kwargs`` before they
        are handed to the parent constructor.
        """
        self._schema = None  # Schemas instance resolved by clean_schema()
        self._did = None  # DID instance resolved by clean_did()
        self.rows = {}  # maps User -> row dict, filled by clean_file_import()
        self.user = kwargs.pop('user', None)
        super().__init__(*args, **kwargs)
        self.fields['did'].choices = [
            (x.did, x.label) for x in DID.objects.filter(user=self.user)
        ]
        self.fields['schema'].choices = [
            (x.id, x.name()) for x in Schemas.objects.filter()
        ]

    def clean_did(self):
        """Check that the chosen DID belongs to ``self.user`` and cache it.

        Raises:
            ValidationError: if no DID with that value exists for the user.
        """
        data = self.cleaned_data["did"]
        did = DID.objects.filter(
            user=self.user,
            did=data
        )

        if not did.exists():
            raise ValidationError("Did is not valid!")

        self._did = did.first()

        return data

    def clean_schema(self):
        """Check that the chosen schema exists and cache it.

        Raises:
            ValidationError: if no schema with that id exists.
        """
        data = self.cleaned_data["schema"]
        schema = Schemas.objects.filter(
            id=data
        )
        if not schema.exists():
            raise ValidationError("Schema is not valid!")

        self._schema = schema.first()

        return data

    def clean_file_import(self):
        """Parse the uploaded file and validate every row.

        On success ``self.rows`` maps each matched user to that user's
        row (a dict of column name -> value, with missing values
        replaced by ``''``).

        Returns:
            The uploaded file object, unchanged.

        Raises:
            ValidationError: if a file with the same name was already
                imported successfully, if the file holds no data rows,
                or if a row fails validation (see ``validate_jsonld``).
        """
        data = self.cleaned_data["file_import"]
        self.file_name = data.name
        if File_datas.objects.filter(file_name=self.file_name, success=True).exists():
            raise ValidationError("This file already exists!")

        if self._schema is None:
            # The schema field did not validate, so there is nothing to
            # check the rows against. The form is already invalid because
            # of that field; skip parsing instead of crashing on
            # ``None.data``.
            return data

        self.json_schema = json.loads(self._schema.data)
        try:
            df = pd.read_csv(data, delimiter="\t", quotechar='"', quoting=csv.QUOTE_ALL)
        except pd.errors.EmptyDataError:
            # Zero-byte upload: pandas finds no columns to parse.
            # self.exception() always raises.
            self.exception("This file is empty!")

        # last_valid_index() is None when the frame has no row holding a
        # value (e.g. a header line only); report that as an empty file
        # rather than failing on ``None + 1``.
        last_index = df.last_valid_index()
        if last_index is None:
            self.exception("This file is empty!")

        data_pd = df.fillna('').to_dict()

        for n in range(last_index + 1):
            row = {k: column[n] for k, column in data_pd.items()}

            user = self.validate_jsonld(n, row)
            # NOTE(review): rows are keyed by user, so a later row for the
            # same user replaces an earlier one — confirm this is intended.
            self.rows[user] = row

        return data

    def save(self, commit=True):
        """Build one credential per collected row.

        When ``commit`` is true the credentials are saved, a
        ``File_datas`` record is created for the file name, and the list
        of credentials is returned. When ``commit`` is false nothing is
        written and ``None`` is returned.
        """
        table = [
            self.create_credential(user, row)
            for user, row in self.rows.items()
        ]

        if not commit:
            return None

        for cred in table:
            cred.save()
        File_datas.objects.create(file_name=self.file_name)
        return table

    def validate_jsonld(self, line, row):
        """Validate one row and return the user it belongs to.

        Args:
            line: zero-based row number; reported one-based in messages.
            row: dict of column name -> value for this row.

        Returns:
            The first ``User`` whose email equals ``row['email']``.

        Raises:
            ValidationError: if the row does not satisfy
                ``self.json_schema`` or no user has that email.
        """
        try:
            validate(instance=row, schema=self.json_schema)
        except Exception as e:
            # Any failure while validating is reported to the uploader as
            # a form error for this line.
            msg = "line {}: {}".format(line+1, e)
            self.exception(msg)

        user = User.objects.filter(email=row.get('email')).first()
        if user is None:
            txt = _('The user does not exist!')
            msg = "line {}: {}".format(line+1, txt)
            self.exception(msg)

        return user

    def create_credential(self, user, row):
        """Return an unsaved, unverified credential for ``user``.

        The row is stored JSON-encoded in ``csv_data``; the issuer DID and
        schema are the ones resolved during field cleaning.
        """
        return VerificableCredential(
            verified=False,
            user=user,
            csv_data=json.dumps(row),
            issuer_did=self._did,
            schema=self._schema,
        )

    def exception(self, msg):
        """Record a failed import for the current file name, then raise.

        Raises:
            ValidationError: always, carrying ``msg``.
        """
        File_datas.objects.create(file_name=self.file_name, success=False)
        raise ValidationError(msg)
|
|
|
|
|
2023-11-02 09:59:15 +00:00
|
|
|
|
|
|
|
class SchemaForm(forms.Form):
    """Form with a single file upload field, ``file_template``."""

    file_template = forms.FileField()
|
2023-11-13 17:09:37 +00:00
|
|
|
|
|
|
|
|
|
|
|
class MembershipForm(forms.ModelForm):
    """Model form for ``Membership`` exposing type, start date and end date.

    ``self.instance.user`` is read during validation, so the instance
    must already carry its user.
    """

    class Meta:
        model = Membership
        fields = ['type', 'start_date', 'end_date']

    def clean_end_date(self):
        """Validate the date range and reject conflicting memberships.

        Looks at the same user's other memberships of the same type
        (excluding the instance being edited). When both dates are given,
        only memberships whose period overlaps the new one count as a
        conflict; otherwise any other membership of that type does.

        Returns:
            The cleaned ``end_date`` value.

        Raises:
            forms.ValidationError: if the end date precedes the start
                date, or a conflicting membership exists.
        """
        data = self.cleaned_data
        end_date = data.get('end_date')

        if 'type' not in data or 'start_date' not in data:
            # An earlier field failed its own validation and is therefore
            # missing from cleaned_data. The form is already invalid, so
            # skip the cross-field checks instead of raising KeyError.
            return end_date

        start_date = data['start_date']
        members = Membership.objects.filter(
            type=data['type'],
            user=self.instance.user
        )
        if self.instance.id:
            # When editing, the record itself must not count as a conflict.
            members = members.exclude(id=self.instance.id)

        if start_date and end_date:
            if start_date > end_date:
                msg = _("The end date is less than the start date")
                raise forms.ValidationError(msg)

            # Keep only memberships whose period overlaps the new one.
            members = members.filter(
                start_date__lte=end_date,
                end_date__gte=start_date,
            )

        if members.exists():
            msg = _("This membership already exists!")
            raise forms.ValidationError(msg)

        return end_date
|
2023-11-14 14:55:17 +00:00
|
|
|
|
|
|
|
|
|
|
|
class UserRolForm(forms.ModelForm):
    """Model form for ``UserRol`` exposing only the ``service`` field.

    ``self.instance.user`` is read both when building the choices and
    during validation, so the instance must already carry its user.
    """

    class Meta:
        model = UserRol
        fields = ['service']

    def __init__(self, *args, **kwargs):
        """Build the form; for a new role, narrow the offered services.

        When the instance has no id yet, services matched by
        ``users__user=<instance user>`` are removed from the choices.
        """
        super().__init__(*args, **kwargs)
        if not self.instance.id:
            user = self.instance.user
            # Narrow the queryset behind the field's choices object, then
            # assign it back so the field picks up the reduced set.
            choices = self.fields['service'].choices
            choices.queryset = choices.queryset.exclude(users__user=user)
            self.fields['service'].choices = choices

    def clean_service(self):
        """Reject a service the user already holds through another role.

        Returns:
            The cleaned ``service`` value.

        Raises:
            forms.ValidationError: if another ``UserRol`` of the same
                user already references this service.
        """
        service = self.cleaned_data['service']
        duplicates = UserRol.objects.filter(
            service=service,
            user=self.instance.user
        )
        if self.instance.id:
            # When editing, the role being saved is not its own duplicate;
            # without this, re-saving an unchanged role is always rejected.
            duplicates = duplicates.exclude(id=self.instance.id)

        if duplicates.exists():
            msg = _("Is not possible to have a duplicate role")
            raise forms.ValidationError(msg)

        return service
|