Merge pull request 'bugfix/136-upload-bad-credentials' (#138) from bugfix/136-upload-bad-credentials into release

Reviewed-on: https://gitea.pangea.org/trustchain-oc1-orchestral/IdHub/pulls/138
This commit is contained in:
cayop 2024-02-15 23:28:52 +00:00
commit 1cd90d8321
4 changed files with 126 additions and 33 deletions

View File

@ -1,7 +1,8 @@
import csv import csv
import json import json
import base64
import copy import copy
import base64
import jsonschema
import pandas as pd import pandas as pd
from pyhanko.sign import signers from pyhanko.sign import signers
@ -176,38 +177,28 @@ class ImportForm(forms.Form):
self._schema = schema.first() self._schema = schema.first()
try: try:
self.json_schema = json.loads(self._schema.data) self.json_schema = self._schema.get_credential_subject_schema()
props = [x for x in self.json_schema["allOf"] if 'properties' in x.keys()]
prop = props[0]['properties']
self.properties = prop['credentialSubject']['properties']
except Exception: except Exception:
raise ValidationError("Schema is not valid!") raise ValidationError("Schema is not valid!")
if not self.properties:
raise ValidationError("Schema is not valid!")
# TODO we need filter "$ref" of schema for can validate a csv
self.json_schema_filtered = copy.copy(self.json_schema)
allOf = [x for x in self.json_schema["allOf"] if '$ref' not in x.keys()]
self.json_schema_filtered["allOf"] = allOf
return data return data
def clean_file_import(self): def clean_file_import(self):
data = self.cleaned_data["file_import"] data = self.cleaned_data["file_import"]
self.file_name = data.name self.file_name = data.name
if File_datas.objects.filter(file_name=self.file_name, success=True).exists():
raise ValidationError("This file already exists!")
# df = pd.read_csv (data, delimiter="\t", quotechar='"', quoting=csv.QUOTE_ALL)
df = pd.read_excel(data) df = pd.read_excel(data)
# convert dates to iso 8601
for col in df.select_dtypes(include='datetime').columns:
df[col] = df[col].dt.strftime("%Y-%m-%d")
data_pd = df.fillna('').to_dict() data_pd = df.fillna('').to_dict()
if not data_pd: if not data_pd:
self.exception("This file is empty!") self.exception("This file is empty!")
head_row = {x: '' for x in self.properties.keys()}
for n in range(df.last_valid_index()+1): for n in range(df.last_valid_index()+1):
row = head_row.copy() row = {}
for k in data_pd.keys(): for k in data_pd.keys():
row[k] = data_pd[k][n] or '' row[k] = data_pd[k][n] or ''
@ -231,12 +222,15 @@ class ImportForm(forms.Form):
def validate_jsonld(self, line, row): def validate_jsonld(self, line, row):
try: try:
check = credtools.validate_json(row, self.json_schema_filtered) jsonschema.validate(instance=row, schema=self.json_schema)
if check is not True: except jsonschema.exceptions.ValidationError as err:
raise ValidationError("Not valid row") msg = "line {}: {}".format(line+1, err)
except Exception as e: return self.exception(msg)
msg = "line {}: {}".format(line+1, e) # try:
self.exception(msg) # check = credtools.validate_json(row, self.json_schema)
# if check is not True:
# raise ValidationError("Not valid row")
# except Exception as e:
user, new = User.objects.get_or_create(email=row.get('email')) user, new = User.objects.get_or_create(email=row.get('email'))
if new: if new:
@ -245,6 +239,18 @@ class ImportForm(forms.Form):
return user return user
def create_credential(self, user, row): def create_credential(self, user, row):
bcred = VerificableCredential.objects.filter(
user=user,
schema=self._schema,
issuer_did=self._did,
status=VerificableCredential.Status.ENABLED
)
if bcred.exists():
cred = bcred.first()
cred.csv_data = json.dumps(row, default=str)
cred.eidas1_did = self._eidas1
return cred
cred = VerificableCredential( cred = VerificableCredential(
verified=False, verified=False,
user=user, user=user,

View File

@ -16,6 +16,7 @@ from utils.idhub_ssikit import (
keydid_from_controller_key, keydid_from_controller_key,
sign_credential, sign_credential,
webdid_from_controller_key, webdid_from_controller_key,
verify_credential,
) )
from idhub_auth.models import User from idhub_auth.models import User
@ -542,6 +543,28 @@ class Schemas(models.Model):
def description(self, value): def description(self, value):
self._description = value self._description = value
def get_credential_subject_schema(self):
sc = self.get_data()
properties = sc["allOf"][1]["properties"]["credentialSubject"]["properties"]
required = sc["allOf"][1]["properties"]["credentialSubject"]["required"]
if "id" in required:
required.remove("id")
schema = {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"type": "object",
"properties": properties,
"required": required,
"additionalProperties": False
}
return schema
def get_data(self):
return json.loads(self.data)
class VerificableCredential(models.Model): class VerificableCredential(models.Model):
""" """
Definition of Verificable Credentials Definition of Verificable Credentials
@ -633,7 +656,6 @@ class VerificableCredential(models.Model):
if self.status == self.Status.ISSUED: if self.status == self.Status.ISSUED:
return return
self.status = self.Status.ISSUED
self.subject_did = did self.subject_did = did
self.issued_on = datetime.datetime.now().astimezone(pytz.utc) self.issued_on = datetime.datetime.now().astimezone(pytz.utc)
issuer_pass = cache.get("KEY_DIDS") issuer_pass = cache.get("KEY_DIDS")
@ -648,11 +670,17 @@ class VerificableCredential(models.Model):
self.render(domain), self.render(domain),
self.issuer_did.get_key_material(issuer_pass) self.issuer_did.get_key_material(issuer_pass)
) )
valid, reason = verify_credential(data)
if not valid:
return
if self.eidas1_did: if self.eidas1_did:
self.data = data self.data = data
else: else:
self.data = self.user.encrypt_data(data, password) self.data = self.user.encrypt_data(data, password)
self.status = self.Status.ISSUED
def get_context(self, domain): def get_context(self, domain):
d = json.loads(self.csv_data) d = json.loads(self.csv_data)
issuance_date = '' issuance_date = ''

View File

@ -36,6 +36,14 @@
"description": "Legal name of the operator", "description": "Legal name of the operator",
"type": "string" "type": "string"
}, },
"role": {
"description": "Role, either operator, witness, auditor",
"type": "string"
},
"email": {
"type": "string",
"format": "email"
},
"accreditedBy": { "accreditedBy": {
"description": "Legal name of the accrediting entity", "description": "Legal name of the accrediting entity",
"type": "string" "type": "string"
@ -51,16 +59,13 @@
"accreditedFor": { "accreditedFor": {
"description": "Operation type: e.g. manufacture, repair, refurbishment, remanufacture, transport, dismantle, recycle, verification, audit", "description": "Operation type: e.g. manufacture, repair, refurbishment, remanufacture, transport, dismantle, recycle, verification, audit",
"type": "string" "type": "string"
},
"role": {
"description": "Role, either operator, witness, auditor",
"type": "string"
} }
}, },
"required": [ "required": [
"id", "id",
"legalName", "legalName",
"role" "role",
"email"
] ]
} }
} }

View File

@ -9,6 +9,7 @@ import requests
from pyld import jsonld from pyld import jsonld
import jsonref import jsonref
from jsonpath_ng import jsonpath, parse from jsonpath_ng import jsonpath, parse
from datetime import datetime
# def remove_null_values(dictionary): # def remove_null_values(dictionary):
@ -205,22 +206,75 @@ def schema_to_xls_comment(schema, xls_file_path):
# Get the xlsxwriter workbook and worksheet objects # Get the xlsxwriter workbook and worksheet objects
workbook = writer.book workbook = writer.book
matches_title = parse('$.title').find(schema)
title = matches_title[0].value if matches_title else 'no title'
matches_desc = parse('$.description').find(schema)
desc = matches_desc[0].value if matches_desc else 'no description'
matches_id = parse("$['$id']").find(schema)
idschema = matches_id[0].value if matches_id else 'no schema'
matches_subject_desc = parse('$..credentialSubject.description').find(schema)
subject_desc = matches_subject_desc[0].value if matches_subject_desc else 'no subject description'
workbook.set_properties({
'title': title,
'subject': desc,
'author': 'IdHub Orchestral',
'category': subject_desc,
'keywords': 'schema, template, plantilla',
'created': datetime.now().date(), #datetime.date(2018, 1, 1),
'comments': 'Created with Python for IdHub'})
workbook.set_custom_property('Schema', idschema)
worksheet = writer.sheets['Full1'] worksheet = writer.sheets['Full1']
# Define a format for the required header cells # Define a format for the required header cells
req_format = workbook.add_format({'border': 1}) req_f = workbook.add_format({'border': 1})
# cell_format = workbook.add_format({'bold': True, 'font_color': 'red'}) req_da = workbook.add_format({'border': 1, 'num_format': 'yyyy-mm-dd'})
req_in = workbook.add_format({'border': 1, 'num_format': '0'})
req_st = workbook.add_format({'border': 1, 'num_format': '@'})
opt_da = workbook.add_format({'num_format': 'yyyy-mm-dd'})
opt_in = workbook.add_format({'num_format': '0'})
opt_st = workbook.add_format({'num_format': '@'})
fmts = {
'string' : {True: req_st, False: opt_st},
'date' : {True: req_da, False: opt_da},
'integer' : {True: req_in, False: opt_in}
}
# Write comments to the cells # Write comments to the cells
for i, header in enumerate(headers): for i, header in enumerate(headers):
if header in req: fmt = {}
worksheet.set_column(i,i, None, req_format) #if header in req:
# fmt = req_format
# worksheet.set_column(i,i, None, req_format)
# Get the description for the current field # Get the description for the current field
if 'description' in matches[0][header]: if 'description' in matches[0][header]:
description = matches[0][header]['description'] description = matches[0][header]['description']
if description is not None: if description is not None:
# Write the description as a comment to the corresponding cell # Write the description as a comment to the corresponding cell
worksheet.write_comment(0, i, description) worksheet.write_comment(0, i, description)
# Get the type for the current field
if 'type' in matches[0][header]:
type_field = matches[0][header]['type']
format_field = None
if 'format' in matches[0][header]:
format_field = matches[0][header]['format']
if type_field is not None:
if format_field is not None and format_field == 'date':
type_field = 'date'
fmt = fmts[type_field][header in req] # Add type format
print(f'header {header} with fmt {fmt}\n')
worksheet.set_column(i,i, None, fmt)
# Close the Pandas Excel writer and output the Excel file # Close the Pandas Excel writer and output the Excel file
worksheet.autofit() worksheet.autofit()