200 lines
7.2 KiB
Python
200 lines
7.2 KiB
Python
import asyncio
|
|
import base64
|
|
import copy
|
|
import zlib
|
|
from typing import Callable, Any
|
|
import didkit
|
|
import json
|
|
from ast import literal_eval
|
|
from pyroaring import BitMap
|
|
|
|
|
|
def deep_merge_dict_inplace(d1: dict, d2: dict):
|
|
"""
|
|
Implements d1 |= d2, but recursively.
|
|
Merges d1 and d2, giving preference to keys in d2.
|
|
Keys in d1 but not in d2 are left as-is.
|
|
"""
|
|
for key, val in d2.items():
|
|
if isinstance(d1.get(key, None), dict) and isinstance(val, dict):
|
|
deep_merge_dict_inplace(d1[key], val)
|
|
continue
|
|
d1[key] = val
|
|
|
|
|
|
def deep_merge_dict(d1: dict, d2: dict) -> dict:
|
|
"""
|
|
Implements d1 | d2, but recursively.
|
|
Merges d1 and d2, giving preference to keys in d2.
|
|
Keys in d1 but not in d2 are left as-is.
|
|
"""
|
|
d1 = copy.deepcopy(d1)
|
|
deep_merge_dict_inplace(d1, d2)
|
|
return d1
|
|
|
|
|
|
def deep_filter_dict(f: Callable[[Any], bool], d: dict):
|
|
"""
|
|
Implements builtin filter(), but recursively.
|
|
Applies f to all k,v pairs in d. If some v is a dict, recurse into v instead of applying f(v) directly.
|
|
"""
|
|
for key, val in d.items():
|
|
if isinstance(val, dict):
|
|
yield key, dict(deep_filter_dict(f, val))
|
|
elif f(val):
|
|
yield key, val
|
|
|
|
|
|
def generate_did_controller_key():
|
|
return didkit.generate_ed25519_key()
|
|
|
|
|
|
def keydid_from_controller_key(key):
|
|
return didkit.key_to_did("key", key)
|
|
|
|
|
|
def resolve_did(keydid):
|
|
async def inner():
|
|
return await didkit.resolve_did(keydid, "{}")
|
|
|
|
return asyncio.run(inner())
|
|
|
|
|
|
def webdid_from_controller_key(key):
|
|
"""
|
|
Se siguen los pasos para generar un webdid a partir de un keydid.
|
|
Documentado en la docu de spruceid.
|
|
"""
|
|
keydid = keydid_from_controller_key(key) # "did:key:<...>"
|
|
pubkeyid = keydid.rsplit(":")[-1] # <...>
|
|
document = json.loads(resolve_did(keydid)) # Documento DID en terminos "key"
|
|
webdid_url = f"did:web:idhub.pangea.org:{pubkeyid}" # nueva URL: "did:web:idhub.pangea.org:<...>"
|
|
webdid_url_owner = webdid_url + "#owner"
|
|
# Reemplazamos los campos del documento DID necesarios:
|
|
document["id"] = webdid_url
|
|
document["verificationMethod"][0]["id"] = webdid_url_owner
|
|
document["verificationMethod"][0]["controller"] = webdid_url
|
|
document["authentication"][0] = webdid_url_owner
|
|
document["assertionMethod"][0] = webdid_url_owner
|
|
document_fixed_serialized = json.dumps(document)
|
|
return webdid_url, document_fixed_serialized
|
|
|
|
|
|
def render_and_sign_credential(unsigned_vc: dict, jwk_issuer):
|
|
"""
|
|
Populates a VC template with data for issuance, and signs the result with the provided key.
|
|
|
|
The `vc_data` parameter must at a minimum include:
|
|
* issuer_did
|
|
* subject_did
|
|
* vc_id
|
|
and must include whatever other fields are relevant for the vc_template to be instantiated.
|
|
|
|
The following field(s) will be auto-generated if not passed in `vc_data`:
|
|
* issuance_date (to `datetime.datetime.now()`)
|
|
"""
|
|
async def inner():
|
|
signed_vc = await didkit.issue_credential(
|
|
json.dumps(unsigned_vc),
|
|
'{"proofFormat": "ldp"}',
|
|
jwk_issuer
|
|
)
|
|
return signed_vc
|
|
|
|
# if vc_data.get("issuance_date") is None:
|
|
# vc_data["issuance_date"] = datetime.datetime.now().replace(microsecond=0).isoformat()
|
|
#print(json.dumps(unsigned_vc))
|
|
|
|
return asyncio.run(inner())
|
|
|
|
|
|
def verify_credential(vc):
|
|
"""
|
|
Returns a (bool, str) tuple indicating whether the credential is valid.
|
|
If the boolean is true, the credential is valid and the second argument can be ignored.
|
|
If it is false, the VC is invalid and the second argument contains a JSON object with further information.
|
|
"""
|
|
async def inner():
|
|
try:
|
|
str_res = await didkit.verify_credential(vc, '{"proofFormat": "ldp"}')
|
|
except:
|
|
return False, "Invalid, corrupt, or tampered-with credential."
|
|
res = literal_eval(str_res)
|
|
ok = res["warnings"] == [] and res["errors"] == []
|
|
return ok, str_res
|
|
|
|
valid, reason = asyncio.run(inner())
|
|
if not valid:
|
|
return valid, reason
|
|
# Credential passes basic signature verification. Now check it against its schema.
|
|
# TODO: check agasint schema
|
|
pass
|
|
# Credential verifies against its schema. Now check revocation status.
|
|
vc = json.loads(vc)
|
|
if "credentialStatus" in vc:
|
|
vc_issuer = vc["credentialStatus"]["id"] # Either a DID:WEB or the special value in the line below
|
|
if vc_issuer == "https://revocation.not.supported/":
|
|
return True, "This credential does not support revocation"
|
|
revocation_index = int(vc["credentialStatus"]["revocationBitmapIndex"]) # NOTE: THIS FIELD SHOULD BE SERIALIZED AS AN INTEGER, BUT IOTA DOCUMENTAITON SERIALIZES IT AS A STRING. DEFENSIVE CAST ADDED JUST IN CASE.
|
|
issuer_did_document = json.loads(resolve_did(vc_issuer)) # TODO: implement a caching layer so we don't have to fetch the DID (and thus the revocation list) every time a VC is validated.
|
|
issuer_revocation_list = issuer_did_document["service"][0]
|
|
assert issuer_revocation_list["type"] == "RevocationBitmap2022"
|
|
revocation_bitmap = BitMap.deserialize(
|
|
zlib.decompress(
|
|
base64.b64decode(
|
|
issuer_revocation_list["serviceEndpoint"].rsplit(",")[1]
|
|
)
|
|
)
|
|
)
|
|
if revocation_index in revocation_bitmap:
|
|
return False, "Credential has been revoked by the issuer"
|
|
# Fallthrough means all is good.
|
|
return True, "Credential passes all checks"
|
|
|
|
|
|
def issue_verifiable_presentation(vc_list: list[str], jwk_holder: str, holder_did: str, presentation_id: str) -> str:
|
|
async def inner(unsigned_vp):
|
|
signed_vp = await didkit.issue_presentation(
|
|
unsigned_vp,
|
|
'{"proofFormat": "ldp"}',
|
|
jwk_holder
|
|
)
|
|
return signed_vp
|
|
|
|
unsigned_vp = json.dumps({
|
|
"@context": [
|
|
"https://www.w3.org/2018/credentials/v1"
|
|
],
|
|
"id": presentation_id,
|
|
"type": [
|
|
"VerifiablePresentation"
|
|
],
|
|
"holder": holder_did,
|
|
"verifiableCredential": vc_list
|
|
})
|
|
|
|
return asyncio.run(inner(unsigned_vp))
|
|
|
|
|
|
def verify_presentation(vp: str):
|
|
"""
|
|
Returns a (bool, str) tuple indicating whether the presentation is valid.
|
|
If the boolean is true, the presentation is valid and the second argument can be ignored.
|
|
If it is false, the VC is invalid and the second argument contains further information.
|
|
"""
|
|
async def inner():
|
|
str_res = await didkit.verify_presentation(vp, '{"proofFormat": "ldp"}')
|
|
res = literal_eval(str_res)
|
|
ok = res["warnings"] == [] and res["errors"] == []
|
|
return ok, str_res
|
|
|
|
valid, reason = asyncio.run(inner())
|
|
if not valid:
|
|
return valid, f"Presentation is invalid: {reason}"
|
|
vp = json.loads(vp)
|
|
for idx, credential in enumerate(vp["verifiableCredential"]):
|
|
valid, reason = verify_credential(json.dumps(credential))
|
|
if not valid:
|
|
return valid, f"Credential at index {idx} is invalid with reason: {reason}"
|
|
# Fallthrough means all is good.
|
|
return True, "Verifiable presentation passes all checks" |