2023-11-10 05:45:12 +00:00
import asyncio
2024-02-01 20:35:31 +00:00
import base64
2023-11-10 05:45:12 +00:00
import datetime
2024-02-01 20:35:31 +00:00
import zlib
2024-01-03 15:53:06 +00:00
from typing import Any
2023-11-10 05:45:12 +00:00
import didkit
import json
import jinja2
2023-11-22 13:09:08 +00:00
from jinja2 import Environment , FileSystemLoader , select_autoescape
2024-01-03 15:53:06 +00:00
from ast import literal_eval
2023-11-22 13:09:08 +00:00
2024-02-01 20:35:31 +00:00
from pyroaring import BitMap
2023-11-10 05:45:12 +00:00
def generate_did_controller_key():
    """
    Create a new Ed25519 key with DIDKit and return it.

    The return value is whatever ``didkit.generate_ed25519_key()`` produces
    (presumably a serialized JWK -- confirm against the DIDKit docs).
    """
    controller_key = didkit.generate_ed25519_key()
    return controller_key
2023-11-28 08:45:06 +00:00
def keydid_from_controller_key(key):
    """
    Derive the ``did:key`` identifier that corresponds to a controller key.

    Args:
        key: the controller key, as accepted by ``didkit.key_to_did``
            (presumably the JWK returned by ``generate_did_controller_key``
            -- confirm with callers).

    Returns:
        The value returned by ``didkit.key_to_did`` for the "key" method.
    """
    # The method pattern must be exactly "key"; a space-padded pattern does
    # not name the did:key method.
    return didkit.key_to_did("key", key)
def generate_generic_vc_id():
    """
    Return the placeholder identifier used for Verifiable Credentials.

    Every call returns the same hard-coded URL; it carries no surrounding
    whitespace so that it is usable as a URI.
    """
    # TODO agree on a system for Verifiable Credential IDs
    return "https://pangea.org/credentials/42"
2024-02-01 20:35:31 +00:00
def resolve_did(keydid):
    """
    Synchronously resolve a DID through DIDKit.

    ``didkit.resolve_did`` is awaited inside a private coroutine that is
    driven to completion with ``asyncio.run``, so this function must not be
    called from code that is already running inside an event loop.

    Args:
        keydid: the DID to resolve.

    Returns:
        Whatever ``didkit.resolve_did`` yields for that DID (callers in this
        module parse it with ``json.loads``).
    """
    async def _resolve():
        did_document = await didkit.resolve_did(keydid, " {} ")
        return did_document

    return asyncio.run(_resolve())
2024-01-25 10:02:41 +00:00
def webdid_from_controller_key(key):
    """
    Build a ``did:web`` identifier and its DID document from a controller key.

    Follows the steps for deriving a did:web from a did:key, as described in
    the SpruceID documentation: the key is expressed as a did:key, that DID is
    resolved to its DID document, and the identifiers inside the document are
    rewritten to the did:web form.

    Args:
        key: the controller key, passed to ``keydid_from_controller_key``.

    Returns:
        A ``(webdid_url, document)`` tuple, where ``webdid_url`` is
        ``"did:web:idhub.pangea.org:<pubkeyid>"`` and ``document`` is the
        rewritten DID document serialized as a JSON string.
    """
    keydid = keydid_from_controller_key(key)  # "did:key:<...>"
    pubkeyid = keydid.rsplit(":", 1)[-1]  # <...>
    document = json.loads(resolve_did(keydid))  # DID document in "key" terms
    webdid_url = f"did:web:idhub.pangea.org:{pubkeyid}"  # new URL: "did:web:idhub.pangea.org:<...>"
    webdid_url_owner = webdid_url + "#owner"

    # Replace the DID document fields that must point at the did:web identity.
    # Only the first entry of each list is rewritten.
    document["id"] = webdid_url
    document["verificationMethod"][0]["id"] = webdid_url_owner
    document["verificationMethod"][0]["controller"] = webdid_url
    document["authentication"][0] = webdid_url_owner
    document["assertionMethod"][0] = webdid_url_owner

    document_fixed_serialized = json.dumps(document)
    return webdid_url, document_fixed_serialized
2024-01-03 15:53:06 +00:00
def render_and_sign_credential(unsigned_vc: dict, jwk_issuer):
    """
    Serialize an unsigned credential to JSON and sign it with the issuer key.

    Despite the name, no template rendering happens here: ``unsigned_vc`` must
    already be the complete credential. It is passed through ``json.dumps``
    unchanged, so no field (for example the issuance date) is auto-generated.

    Args:
        unsigned_vc: the unsigned credential as a dict.
        jwk_issuer: the issuer key handed to ``didkit.issue_credential``
            (presumably a serialized JWK -- confirm with callers).

    Returns:
        The value returned by ``didkit.issue_credential`` (the signed
        credential), produced with the "ldp" proof format.
    """
    async def inner():
        signed_vc = await didkit.issue_credential(
            json.dumps(unsigned_vc),
            '{"proofFormat": "ldp"}',
            jwk_issuer,
        )
        return signed_vc

    return asyncio.run(inner())
2023-11-28 08:45:06 +00:00
def sign_credential(unsigned_vc: str, jwk_issuer):
    """
    Signs the unsigned credential with the provided key.

    Args:
        unsigned_vc: the unsigned credential, already serialized as a string;
            it is forwarded to DIDKit without modification.
        jwk_issuer: the issuer key handed to ``didkit.issue_credential``
            (presumably a serialized JWK -- confirm with callers).

    Returns:
        The value returned by ``didkit.issue_credential`` (the signed
        credential), produced with the "ldp" proof format.
    """
    async def inner():
        signed_vc = await didkit.issue_credential(
            unsigned_vc,
            '{"proofFormat": "ldp"}',
            jwk_issuer,
        )
        return signed_vc

    return asyncio.run(inner())
def verify_credential(vc):
    """
    Returns a (bool, str) tuple indicating whether the credential is valid.

    If the boolean is true, the credential is valid and the second argument
    can be ignored. If it is false, the VC is invalid and the second argument
    explains why: either the JSON result reported by DIDKit, or a short
    message when the revocation check fails.

    Args:
        vc: the signed credential serialized as a JSON string.

    The checks run in order:
      1. proof verification through ``didkit.verify_credential``; any warning
         or error makes the credential invalid;
      2. schema validation (not implemented yet);
      3. if the credential has a ``credentialStatus``, lookup of its
         ``revocationBitmapIndex`` in the issuer's ``RevocationBitmap2022``
         service.
    """
    async def inner():
        str_res = await didkit.verify_credential(vc, '{"proofFormat": "ldp"}')
        # The result is JSON text, so parse it as JSON: ast.literal_eval would
        # raise on the JSON literals true / false / null.
        res = json.loads(str_res)
        ok = res["warnings"] == [] and res["errors"] == []
        return ok, str_res

    valid, reason = asyncio.run(inner())
    if not valid:
        return valid, reason

    # Credential passes basic signature verification. Now check it against its schema.
    # TODO: check against schema

    # Credential verifies against its schema. Now check revocation status.
    credential = json.loads(vc)
    if "credentialStatus" not in credential:
        return True, "Credential passes all checks"

    # NOTE: this field should be serialized as an integer, but the IOTA
    # documentation serializes it as a string. Defensive cast just in case.
    revocation_index = int(credential["credentialStatus"]["revocationBitmapIndex"])

    # The issuer is a DID, given either as {"id": <did>, ...} or as a bare string.
    issuer = credential["issuer"]
    vc_issuer = issuer["id"] if isinstance(issuer, dict) else issuer

    # TODO: implement a caching layer so we don't have to fetch the DID (and
    # thus the revocation list) every time a VC is validated.
    issuer_did_document = json.loads(resolve_did(vc_issuer))

    # Look the revocation service up by type instead of assuming it is the
    # first service, and report a failure instead of asserting (assert
    # statements are stripped when Python runs with -O).
    services = issuer_did_document.get("service") or []
    issuer_revocation_list = next(
        (
            service
            for service in services
            if service.get("type") == "RevocationBitmap2022"
        ),
        None,
    )
    if issuer_revocation_list is None:
        return False, "Issuer DID document has no RevocationBitmap2022 service"

    # The endpoint is "<prefix>,<payload>"; the payload is base64 text holding
    # a zlib-compressed serialized bitmap.
    encoded_bitmap = issuer_revocation_list["serviceEndpoint"].split(",", 1)[1]
    revocation_bitmap = BitMap.deserialize(
        zlib.decompress(base64.b64decode(encoded_bitmap))
    )
    if revocation_index in revocation_bitmap:
        return False, "Credential has been revoked by the issuer"

    # Fallthrough means all is good.
    return True, "Credential passes all checks"
2023-11-22 13:09:08 +00:00
def issue_verifiable_presentation(vc_list: list[str], jwk_holder: str, holder_did: str) -> str:
    """
    Wrap credentials in a Verifiable Presentation and sign it as the holder.

    The unsigned presentation is rendered from the
    ``verifiable_presentation.json`` template found in the ``vc_templates``
    directory (resolved relative to the current working directory by the
    Jinja2 ``FileSystemLoader``), then signed with DIDKit using the "ldp"
    proof format.

    Args:
        vc_list: credentials to include, each already serialized as a JSON
            string; they are joined verbatim into a JSON array.
        jwk_holder: the holder key handed to ``didkit.issue_presentation``.
        holder_did: the holder's DID, exposed to the template as
            ``holder_did``.

    Returns:
        The value returned by ``didkit.issue_presentation`` (the signed
        presentation).
    """
    env = Environment(
        loader=FileSystemLoader("vc_templates"),
        autoescape=select_autoescape(),
    )
    unsigned_vp_template = env.get_template("verifiable_presentation.json")
    data = {
        "holder_did": holder_did,
        # The credentials are already JSON text, so the array is assembled by
        # concatenation rather than by re-serializing them.
        "verifiable_credential_list": "[" + ",".join(vc_list) + "]",
    }

    async def inner():
        unsigned_vp = unsigned_vp_template.render(data)
        signed_vp = await didkit.issue_presentation(
            unsigned_vp,
            '{"proofFormat": "ldp"}',
            jwk_holder,
        )
        return signed_vp

    return asyncio.run(inner())
2023-11-27 06:26:02 +00:00
def verify_presentation(vp):
    """
    Verify a Verifiable Presentation with DIDKit, using the "ldp" proof format.

    Unlike ``verify_credential``, this function does not interpret the
    outcome: it returns the result of ``didkit.verify_presentation``
    unchanged (presumably a JSON string listing checks, warnings and errors
    -- confirm against the DIDKit docs). Callers must inspect that result
    themselves to decide whether the presentation is valid.

    Args:
        vp: the signed presentation, as accepted by
            ``didkit.verify_presentation``.
    """
    async def inner():
        proof_options = '{"proofFormat": "ldp"}'
        return await didkit.verify_presentation(vp, proof_options)

    return asyncio.run(inner())