From f67c6882595468fef4d07a07ec9528120c007352 Mon Sep 17 00:00:00 2001 From: Daniel Armengod Date: Thu, 1 Feb 2024 21:35:31 +0100 Subject: [PATCH] =?UTF-8?q?A=C3=B1adido=20tests=20para=20la=20revocaci?= =?UTF-8?q?=C3=B3n=20de=20certificados?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- idhub_ssikit/__init__.py | 53 ++++++++++++++++++++++++++++------------ main.py | 25 +++++++++++++++++++ 2 files changed, 63 insertions(+), 15 deletions(-) diff --git a/idhub_ssikit/__init__.py b/idhub_ssikit/__init__.py index b76e6ba..fca76a1 100644 --- a/idhub_ssikit/__init__.py +++ b/idhub_ssikit/__init__.py @@ -1,5 +1,7 @@ import asyncio +import base64 import datetime +import zlib from typing import Any import didkit @@ -8,6 +10,8 @@ import jinja2 from jinja2 import Environment, FileSystemLoader, select_autoescape from ast import literal_eval +from pyroaring import BitMap + def generate_did_controller_key(): return didkit.generate_ed25519_key() @@ -22,8 +26,11 @@ def generate_generic_vc_id(): return "https://pangea.org/credentials/42" -async def resolve_keydid(keydid): - return await didkit.resolve_did(keydid, "{}") +def resolve_did(keydid): + async def inner(): + return await didkit.resolve_did(keydid, "{}") + + return asyncio.run(inner()) def webdid_from_controller_key(key): @@ -33,7 +40,7 @@ def webdid_from_controller_key(key): """ keydid = keydid_from_controller_key(key) # "did:key:<...>" pubkeyid = keydid.rsplit(":")[-1] # <...> - document = json.loads(asyncio.run(resolve_keydid(keydid))) # Documento DID en terminos "key" + document = json.loads(resolve_did(keydid)) # Documento DID en terminos "key" webdid_url = f"did:web:idhub.pangea.org:{pubkeyid}" # nueva URL: "did:web:idhub.pangea.org:<...>" webdid_url_owner = webdid_url + "#owner" # Reemplazamos los campos del documento DID necesarios: @@ -91,11 +98,8 @@ def sign_credential(unsigned_vc: str, jwk_issuer): def verify_credential(vc): """ Returns a (bool, str) tuple indicating whether the credential is valid. - Checks performed: - * The credential is valid in signature and form, and - * The credential validates itself against its declared schema. If the boolean is true, the credential is valid and the second argument can be ignored. - If it is false, the VC is invalid and the second argument contains a string (which is a valid JSON object) with further information. + If it is false, the VC is invalid and the second argument contains a JSON object with further information. """ async def inner(): str_res = await didkit.verify_credential(vc, '{"proofFormat": "ldp"}') @@ -103,12 +107,31 @@ def verify_credential(vc): ok = res["warnings"] == [] and res["errors"] == [] return ok, str_res - (ok, res) = asyncio.run(inner()) - if not ok: - # The credential doesn't pass signature checks, so early return - return ok, res - return ok, res - + valid, reason = asyncio.run(inner()) + if not valid: + return valid, reason + # Credential passes basic signature verification. Now check it against its schema. + # TODO: check agasint schema + pass + # Credential verifies against its schema. Now check revocation status. + vc = json.loads(vc) + if "credentialStatus" in vc: + revocation_index = int(vc["credentialStatus"]["revocationBitmapIndex"]) # NOTE: THIS FIELD SHOULD BE SERIALIZED AS AN INTEGER, BUT IOTA DOCUMENTAITON SERIALIZES IT AS A STRING. DEFENSIVE CAST ADDED JUST IN CASE. + vc_issuer = vc["issuer"]["id"] # This is a DID + issuer_did_document = json.loads(resolve_did(vc_issuer)) # TODO: implement a caching layer so we don't have to fetch the DID (and thus the revocation list) every time a VC is validated. + issuer_revocation_list = issuer_did_document["service"][0] + assert issuer_revocation_list["type"] == "RevocationBitmap2022" + revocation_bitmap = BitMap.deserialize( + zlib.decompress( + base64.b64decode( + issuer_revocation_list["serviceEndpoint"].rsplit(",")[1] + ) + ) + ) + if revocation_index in revocation_bitmap: + return False, "Credential has been revoked by the issuer" + # Fallthrough means all is good. + return True, "Credential passes all checks" def issue_verifiable_presentation(vc_list: list[str], jwk_holder: str, holder_did: str) -> str: @@ -136,8 +159,8 @@ def issue_verifiable_presentation(vc_list: list[str], jwk_holder: str, holder_di def verify_presentation(vp): """ - Returns a (bool, str) tuple indicating whether the credential is valid. - If the boolean is true, the credential is valid and the second argument can be ignored. + Returns a (bool, str) tuple indicating whether the presentation is valid. + If the boolean is true, the presentation is valid and the second argument can be ignored. If it is false, the VC is invalid and the second argument contains a JSON object with further information. """ async def inner(): diff --git a/main.py b/main.py index 8313eb2..a0a4b9f 100644 --- a/main.py +++ b/main.py @@ -70,6 +70,31 @@ def test_all_vcs(use_webdid=False): print(e) +def did_web_issue_vc_and_check_revocation(vc_name, revoked=True): + jwk_issuer = '{"kty":"OKP","crv":"Ed25519","x":"piojLFIHQ4Z6heRuPI87nrfMJKdet1dJIPG15iGjmDE","d":"zpOBTDrp_iNQTY5nZlIxLA34Sl7FXWXNGehFktznxTM"}' + jwk_subject = '{"kty":"OKP","crv":"Ed25519","x":"BuKyt44QKYSX6kmAt771ai37lIFNwYlhugWXPiqcyYU","d":"qbvMhSCPKvQ-vSkqNr3q8gWY5zPUj7ry0t2YnmT7agc"}' + + did_issuer = "did:web:idhub.pangea.org:did-registry:allRevoked" if revoked else "did:web:idhub.pangea.org:did-registry:noneRevoked" + did_subject = didkit.key_to_did("key", jwk_subject) + + vc_template = json.load(open(f'../../schemas/vc_templates/{vc_name}.json')) + data_base = json.load(open(f'../../schemas/vc_examples/base--data.json')) + data_base["issuer"]["id"] = did_issuer + data_base["credentialSubject"]["id"] = did_subject + data_specific = json.load(open(f'../../schemas/vc_examples/{vc_name}--data.json')) + data = deep_merge_dict(data_base, data_specific) + vc_rendered_unsigned = deep_merge_dict(vc_template, data) + + signed_credential = idhub_ssikit.render_and_sign_credential( + vc_rendered_unsigned, + jwk_issuer, + ) + + ok, reason = idhub_ssikit.verify_credential(signed_credential) + print(ok) + print(reason) + + def did_web_issue_vc_test_newstyle(vc_name): jwk_issuer = '{"kty":"OKP","crv":"Ed25519","x":"piojLFIHQ4Z6heRuPI87nrfMJKdet1dJIPG15iGjmDE","d":"zpOBTDrp_iNQTY5nZlIxLA34Sl7FXWXNGehFktznxTM"}' jwk_subject = '{"kty":"OKP","crv":"Ed25519","x":"BuKyt44QKYSX6kmAt771ai37lIFNwYlhugWXPiqcyYU","d":"qbvMhSCPKvQ-vSkqNr3q8gWY5zPUj7ry0t2YnmT7agc"}'