IdHub/utils/credtools.py
Cayo Puigdefabregas b96fcf063e clean
2024-04-02 09:52:11 +02:00

342 lines
12 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import json
# import jsonld
import csv
import sys
import jsonschema
# from jsonschema import validate, ValidationError
import requests
from pyld import jsonld
import jsonref
from jsonpath_ng import parse
from datetime import datetime
# def remove_null_values(dictionary):
# return {k: v for k, v in dictionary.items() if v is not None}
def _remove_null_values(dictionary):
    """Strip, in place, every entry whose value is ``None`` or ``''``.

    The same dict object is kept (callers rely on the mutation); nothing is
    returned.
    """
    # Collect the keys first so the dict is not resized while being iterated.
    empty_keys = [
        key for key, value in dictionary.items()
        if value is None or value == ''
    ]
    for key in empty_keys:
        del dictionary[key]
def validate_context(jsld):
    """Check the ``@context`` of a JSON-LD document by expanding it.

    Args:
        jsld: Mapping holding the JSON-LD document; the value stored under
            ``"@context"`` is what gets handed to ``jsonld.expand``.

    Returns:
        True when expansion raises no ``JsonLdError``; False when it does, or
        when the document has no ``"@context"`` key at all.
    """
    try:
        context = jsld["@context"]
    except KeyError:
        # A document without a context cannot be valid; report it through the
        # boolean result instead of letting KeyError escape to the caller.
        print("Context is not valid")
        return False

    try:
        # NOTE(review): only the context value (not the whole document) is
        # expanded here -- confirm this exercises the context as intended.
        jsonld.expand(context)
    except jsonld.JsonLdError:
        print("Context is not valid")
        return False

    print("Context is valid")
    return True
def compact_js(doc, context):
    """Compact *doc* against *context* with pyld and echo the result.

    Returns the compacted document, or ``None`` when pyld raises a
    ``JsonLdError`` (the error is printed).
    """
    try:
        result = jsonld.compact(doc, context)
        rendered = json.dumps(result, indent=2)
        print(rendered)
    except jsonld.JsonLdError as exc:
        print(f"Error compacting document: {exc}")
        return None
    else:
        return result
def dereference_context_file(json_file):
    """Load a JSON-LD document from *json_file* and dereference its context.

    Args:
        json_file: Path of the JSON-LD file to read.

    Returns:
        Whatever ``dereference_context`` returns for the parsed document.
    """
    # Use a context manager so the handle is closed deterministically, and
    # decode as UTF-8 (the JSON interchange encoding) rather than the locale.
    with open(json_file, encoding="utf-8") as json_fh:
        json_dict = json.load(json_fh)
    return dereference_context(json_dict)
def dereference_context(jsonld_dict, timeout=10):
    """Dereference and return the contexts of a JSON-LD document.

    Args:
        jsonld_dict: Parsed JSON-LD document (a dict with an ``@context``
            entry). The entry may be a single URL string, a single inline
            context object, or a list mixing both.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        A list with one ``jsonref``-resolved context per entry, or ``None``
        when fetching, decoding or reference resolution fails (the error is
        printed).

    Raises:
        ValueError: If the document has no (or an empty) ``@context``.
    """
    context_entries = jsonld_dict.get('@context')
    if not context_entries:
        raise ValueError("No context found in the JSON-LD string.")

    # A lone string/object is a valid context; wrap it so the loop below does
    # not iterate over the characters of a URL or the keys of an object.
    if isinstance(context_entries, (str, dict)):
        context_entries = [context_entries]

    try:
        dereferenced_contexts = []
        for entry in context_entries:
            if isinstance(entry, dict):
                # Inline context: nothing to download.
                context_dict = entry
            else:
                # Without a timeout a stalled server would block forever.
                response = requests.get(entry, timeout=timeout)
                response.raise_for_status()  # Raise if the request failed
                context_dict = response.json()
            dereferenced_contexts.append(jsonref.loads(json.dumps(context_dict)))
        print("dereferenced contexts:\n", json.dumps(dereferenced_contexts, indent=4))
        return dereferenced_contexts
    except (json.JSONDecodeError, requests.RequestException, jsonref.JsonRefError) as e:
        print(f"An error occurred: {e}")
        return None
def validate_schema_file(json_schema_file):
    """Validate the standalone JSON schema stored in *json_schema_file*.

    Args:
        json_schema_file: Path of the schema file.

    Returns:
        True only when the file loads and ``validate_schema`` accepts it;
        False when the file cannot be read/parsed or the schema is rejected.
    """
    try:
        with open(json_schema_file, encoding="utf-8") as schema_fh:
            json_schema = json.load(schema_fh)
    except (OSError, ValueError) as e:
        # The schema is not available here, so it must not appear in the
        # message (referencing it used to raise UnboundLocalError).
        print(f"Error loading file {json_schema_file}: {e}")
        return False

    try:
        # Propagate the verdict instead of discarding it.
        return validate_schema(json_schema)
    except Exception as e:
        # Kept broad on purpose: this wrapper never raised to its callers.
        print(f"Error validating schema {json_schema}: {e}")
        return False
def validate_schema(json_schema):
    """Check that *json_schema* is itself a well-formed schema.

    Uses ``Draft202012Validator.check_schema``; a ``SchemaError`` is printed
    and reported as ``False``, otherwise ``True`` is returned.
    """
    validator_cls = jsonschema.validators.Draft202012Validator
    try:
        validator_cls.check_schema(json_schema)
    except jsonschema.exceptions.SchemaError as schema_error:
        print(schema_error)
        return False
    else:
        return True
def validate_json_file(json_data_file, json_schema_file):
    """Validate the JSON document in one file against the schema in another.

    Args:
        json_data_file: Path of the JSON instance to check.
        json_schema_file: Path of the JSON schema to check it against.

    Returns:
        True only when both files load and ``validate_json`` accepts the
        instance; False otherwise.
    """
    try:
        with open(json_data_file, encoding="utf-8") as data_fh:
            json_data = json.load(data_fh)
        with open(json_schema_file, encoding="utf-8") as schema_fh:
            json_schema = json.load(schema_fh)
    except (OSError, ValueError) as e:
        print(f"Error loading file {json_schema_file} or {json_data_file}: {e}")
        return False

    try:
        # Propagate the verdict; it used to be dropped, so a failed
        # validation was still reported as success.
        return validate_json(json_data, json_schema)
    except Exception as e:
        # Kept broad on purpose: this wrapper never raised to its callers.
        print(f"Error validating {json_data_file} against {json_schema_file}: {e}")
        return False
def validate_json(json_data, json_schema):
    """Validate *json_data* against *json_schema* without format checking.

    Prints the outcome and returns ``True`` on success, ``False`` when
    ``jsonschema`` raises a ``ValidationError``.
    """
    accepted = True
    try:
        jsonschema.validate(instance=json_data, schema=json_schema)
    except jsonschema.exceptions.ValidationError:
        accepted = False

    if accepted:
        print("Successful validation")
    else:
        print('Validation error: ', json_data, '\n')
    return accepted
def validate_json_format(json_data, json_schema):
    """Validate *json_data* against *json_schema*, including ``format`` checks.

    Args:
        json_data: The instance to validate.
        json_schema: The schema to validate it against.

    Returns:
        True when validation passes, False when ``jsonschema`` raises a
        ``ValidationError`` (the offending data is printed).
    """
    try:
        # FormatChecker was never imported as a bare name, so the original
        # call always failed with NameError; reach it through the package.
        jsonschema.validate(
            instance=json_data,
            schema=json_schema,
            format_checker=jsonschema.FormatChecker(),
        )
    except jsonschema.exceptions.ValidationError:
        print('Validation error: ', json_data, '\n')
        return False
    return True
def schema_to_csv_file(sch_f, csv_f):
    """Load the schema in *sch_f* and write its CSV header row to *csv_f*.

    Args:
        sch_f: Path of the JSON schema file.
        csv_f: Path of the CSV file to create.

    Returns:
        False when the schema file cannot be read or parsed; otherwise the
        result of ``schema_to_csv``.
    """
    try:
        with open(sch_f, encoding="utf-8") as schema_fh:
            json_schema = json.load(schema_fh)
    except (OSError, ValueError) as e:
        # No schema exists at this point, so it cannot be part of the message
        # (referencing it used to raise UnboundLocalError inside the handler).
        print(f"Error loading file {sch_f}: {e}")
        return False
    return schema_to_csv(json_schema, csv_f)
def schema_to_csv(schema, csv_file_path):
    """Write the ``credentialSubject`` property names as a CSV header row.

    Every object found at ``$..credentialSubject.properties`` contributes its
    keys, in order. Always returns ``True``.
    """
    property_nodes = parse('$..credentialSubject.properties').find(schema)

    column_names = []
    for node in property_nodes:
        column_names.extend(node.value.keys())

    # One header row, no data rows.
    with open(csv_file_path, 'w', newline='') as out_fh:
        csv.writer(out_fh).writerow(column_names)
    return True
def schema_to_xls_basic(schema, xls_file_path):
    """Write an empty ``.xlsx`` sheet whose columns are the schema fields.

    Column names are the keys of every object found at
    ``$..credentialSubject.properties``, except ``'id'``. The file is written
    with the ``openpyxl`` engine. Always returns ``True``.
    """
    property_nodes = parse('$..credentialSubject.properties').find(schema)

    column_names = []
    for node in property_nodes:
        for name in node.value.keys():
            if name != 'id':
                column_names.append(name)

    # A frame with columns but no rows yields a header-only sheet.
    template = pd.DataFrame(columns=column_names)
    template.to_excel(xls_file_path, index=False, engine='openpyxl')
    return True
def schema_to_xls_comment(schema, xls_file_path):
    """Write an ``.xlsx`` template with commented, typed header columns.

    The columns are the keys found at ``$..credentialSubject.properties``
    (minus ``'id'``). For each column the property's ``description`` becomes a
    cell comment on the header, and its ``type``/``format`` selects a column
    number format; columns listed in ``credentialSubject.required`` get a
    border. Schema title, description and ``$id`` are stored as workbook
    properties.

    Args:
        schema: Parsed JSON schema (dict).
        xls_file_path: Destination path; written with the ``xlsxwriter``
            engine.

    Returns:
        True once the workbook has been written.
    """
    matches = [m.value for m in parse('$..credentialSubject.properties').find(schema)]
    headers = [key for match in matches for key in match.keys() if key != 'id']

    # Headers come from every match, so definitions must too (first
    # occurrence wins); reading only matches[0] raised KeyError for fields
    # that appear in a later match.
    definitions = {}
    for match in matches:
        for key, definition in match.items():
            definitions.setdefault(key, definition)

    # 'required' is optional in a schema; treat its absence as "none required"
    # instead of failing with IndexError.
    required_matches = parse('$..credentialSubject.required').find(schema)
    req = required_matches[0].value if required_matches else []

    matches_title = parse('$.title').find(schema)
    title = matches_title[0].value if matches_title else 'no title'
    matches_desc = parse('$.description').find(schema)
    desc = matches_desc[0].value if matches_desc else 'no description'
    matches_id = parse("$['$id']").find(schema)
    idschema = matches_id[0].value if matches_id else 'no schema'
    matches_subject_desc = parse('$..credentialSubject.description').find(schema)
    subject_desc = matches_subject_desc[0].value if matches_subject_desc else 'no subject description'

    df = pd.DataFrame(columns=headers)

    # The context manager closes (and saves) the workbook even if a step
    # below raises.
    with pd.ExcelWriter(xls_file_path, engine='xlsxwriter') as writer:
        df.to_excel(writer, sheet_name='Full1', index=False)

        workbook = writer.book
        workbook.set_properties({
            'title': title,
            'subject': desc,
            'author': 'IdHub Orchestral',
            'category': subject_desc,
            'keywords': 'schema, template, plantilla',
            'created': datetime.now().date(),
            'comments': 'Created with Python for IdHub'})
        workbook.set_custom_property('Schema', idschema)

        worksheet = writer.sheets['Full1']

        # Required columns carry a border; optional ones only a number format.
        req_f = workbook.add_format({'border': 1})
        req_da = workbook.add_format({'border': 1, 'num_format': 'yyyy-mm-dd'})
        req_in = workbook.add_format({'border': 1, 'num_format': '0'})
        req_st = workbook.add_format({'border': 1, 'num_format': '@'})
        opt_da = workbook.add_format({'num_format': 'yyyy-mm-dd'})
        opt_in = workbook.add_format({'num_format': '0'})
        opt_st = workbook.add_format({'num_format': '@'})
        fmts = {
            'string': {True: req_st, False: opt_st},
            'date': {True: req_da, False: opt_da},
            'integer': {True: req_in, False: opt_in},
        }

        for i, header in enumerate(headers):
            definition = definitions.get(header)
            if not isinstance(definition, dict):
                # e.g. a boolean sub-schema: nothing to read from it.
                definition = {}

            description = definition.get('description')
            if description is not None:
                # Attach the description as a comment on the header cell.
                worksheet.write_comment(0, i, description)

            # None (not {}) is the "no format" value set_column expects.
            fmt = None
            type_field = definition.get('type')
            if type_field is not None:
                if definition.get('format') == 'date':
                    type_field = 'date'
                if isinstance(type_field, str) and type_field in fmts:
                    fmt = fmts[type_field][header in req]
                elif header in req:
                    # Types without a dedicated number format (number,
                    # boolean, type lists, ...) used to raise KeyError; keep
                    # the required-column border for them.
                    fmt = req_f

            print(f'header {header} with fmt {fmt}\n')
            worksheet.set_column(i, i, None, fmt)

        worksheet.autofit()
    return True
def csv_to_json(csvFilePath, schema, jsonFilePath):
    """Convert a CSV file to a JSON array, validating each row on the way.

    Every row is stripped of empty values, printed, validated against
    *schema* with ``validate_json`` and then written out (valid or not).

    Args:
        csvFilePath: Path of the CSV file to read (decoded as UTF-8).
        schema: JSON schema each row is validated against.
        jsonFilePath: Path of the JSON file to write (UTF-8).

    Returns:
        True when every row validated, False when at least one row failed.
        The JSON file is written in both cases.
    """
    all_valid = True
    json_rows = []

    # newline='' is what the csv module requires for correct handling of
    # embedded line breaks; UTF-8 matches the encoding used for the output.
    with open(csvFilePath, 'r', encoding='utf-8', newline='') as csvf:
        for row in csv.DictReader(csvf):
            _remove_null_values(row)
            print('Row: ', row, '\n')
            # Keep the verdict instead of discarding it, so the return value
            # reflects whether the data actually matched the schema.
            if not validate_json(row, schema):
                all_valid = False
            json_rows.append(row)

    with open(jsonFilePath, 'w', encoding='utf-8') as jsonf:
        jsonf.write(json.dumps(json_rows, indent=4))
    return all_valid
def csv_to_json2(csv_file_path, json_file_path):
    """Dump a CSV file as a JSON object keyed by its ``'No'`` column (EXPERIMENT).

    Each CSV row becomes one entry whose key is the row's ``'No'`` value; a
    later row with the same key replaces the earlier one. Always returns
    ``True``.
    """
    with open(csv_file_path, encoding='utf-8') as source:
        records_by_no = {record['No']: record for record in csv.DictReader(source)}

    with open(json_file_path, 'w', encoding='utf-8') as target:
        serialized = json.dumps(records_by_no, indent=4)
        target.write(serialized)
    return True
if __name__ == "__main__":
    # Usage: credtools.py course-credential device-purchase e-operator-claim
    #        federation-membership financial-vulnerability membership-card
    # Each argument names a schema: vc_schemas/<name>.json is read and
    # vc_excel/<name>.xlsx is produced from it.
    for schema in sys.argv[1:]:
        print(schema)
        # Close the schema file deterministically instead of leaking the handle.
        with open('vc_schemas/' + schema + '.json', encoding='utf-8') as schema_fh:
            sch = json.load(schema_fh)
        if schema_to_xls_comment(sch, 'vc_excel/' + schema + '.xlsx'):
            print('Success')
        else:
            print("Validation error: ", schema)