diff --git a/idhub_ssikit/__init__.py b/idhub_ssikit/__init__.py index 5aab1cc..5fcabb0 100644 --- a/idhub_ssikit/__init__.py +++ b/idhub_ssikit/__init__.py @@ -1,18 +1,50 @@ import asyncio import base64 -import datetime +import copy import zlib -from typing import Any - +from typing import Callable, Any import didkit import json -import jinja2 -from jinja2 import Environment, FileSystemLoader, select_autoescape from ast import literal_eval - from pyroaring import BitMap +def deep_merge_dict_inplace(d1: dict, d2: dict): + """ + Implements d1 |= d2, but recursively. + Merges d1 and d2, giving preference to keys in d2. + Keys in d1 but not in d2 are left as-is. + """ + for key, val in d2.items(): + if isinstance(d1.get(key, None), dict) and isinstance(val, dict): + deep_merge_dict_inplace(d1[key], val) + continue + d1[key] = val + + +def deep_merge_dict(d1: dict, d2: dict) -> dict: + """ + Implements d1 | d2, but recursively. + Merges d1 and d2, giving preference to keys in d2. + Keys in d1 but not in d2 are left as-is. + """ + d1 = copy.deepcopy(d1) + deep_merge_dict_inplace(d1, d2) + return d1 + + +def deep_filter_dict(f: Callable[[Any], bool], d: dict): + """ + Implements builtin filter(), but recursively. + Applies f to all k,v pairs in d. If some v is a dict, recurse into v instead of applying f(v) directly. + """ + for key, val in d.items(): + if isinstance(val, dict): + yield key, dict(deep_filter_dict(f, val)) + elif f(val): + yield key, val + + def generate_did_controller_key(): return didkit.generate_ed25519_key() @@ -21,11 +53,6 @@ def keydid_from_controller_key(key): return didkit.key_to_did("key", key) -def generate_generic_vc_id(): - # TODO agree on a system for Verifiable Credential IDs - return "https://pangea.org/credentials/42" - - def resolve_did(keydid): async def inner(): return await didkit.resolve_did(keydid, "{}") @@ -81,21 +108,6 @@ def render_and_sign_credential(unsigned_vc: dict, jwk_issuer): return asyncio.run(inner()) -def sign_credential(unsigned_vc: str, jwk_issuer): - """ - Signs the unsigned credential with the provided key. - """ - async def inner(): - signed_vc = await didkit.issue_credential( - unsigned_vc, - '{"proofFormat": "ldp"}', - jwk_issuer - ) - return signed_vc - - return asyncio.run(inner()) - - def verify_credential(vc): """ Returns a (bool, str) tuple indicating whether the credential is valid. @@ -103,7 +115,10 @@ def verify_credential(vc): If it is false, the VC is invalid and the second argument contains a JSON object with further information. """ async def inner(): - str_res = await didkit.verify_credential(vc, '{"proofFormat": "ldp"}') + try: + str_res = await didkit.verify_credential(vc, '{"proofFormat": "ldp"}') + except: + return False, "Invalid, corrupt, or tampered-with credential." res = literal_eval(str_res) ok = res["warnings"] == [] and res["errors"] == [] return ok, str_res @@ -117,8 +132,10 @@ def verify_credential(vc): # Credential verifies against its schema. Now check revocation status. vc = json.loads(vc) if "credentialStatus" in vc: + vc_issuer = vc["credentialStatus"]["id"] # Either a DID:WEB or the special value in the line below + if vc_issuer == "https://revocation.not.supported/": + return True, "This credential does not support revocation" revocation_index = int(vc["credentialStatus"]["revocationBitmapIndex"]) # NOTE: THIS FIELD SHOULD BE SERIALIZED AS AN INTEGER, BUT IOTA DOCUMENTAITON SERIALIZES IT AS A STRING. DEFENSIVE CAST ADDED JUST IN CASE. - vc_issuer = vc["issuer"]["id"] # This is a DID issuer_did_document = json.loads(resolve_did(vc_issuer)) # TODO: implement a caching layer so we don't have to fetch the DID (and thus the revocation list) every time a VC is validated. issuer_revocation_list = issuer_did_document["service"][0] assert issuer_revocation_list["type"] == "RevocationBitmap2022" @@ -135,9 +152,8 @@ def verify_credential(vc): return True, "Credential passes all checks" -def issue_verifiable_presentation(vc_list: list[str], jwk_holder: str, holder_did: str) -> str: - async def inner(): - unsigned_vp = unsigned_vp_template.render(data) +def issue_verifiable_presentation(vc_list: list[str], jwk_holder: str, holder_did: str, presentation_id: str) -> str: + async def inner(unsigned_vp): signed_vp = await didkit.issue_presentation( unsigned_vp, '{"proofFormat": "ldp"}', @@ -145,27 +161,40 @@ def issue_verifiable_presentation(vc_list: list[str], jwk_holder: str, holder_di ) return signed_vp - env = Environment( - loader=FileSystemLoader("vc_templates"), - autoescape=select_autoescape() - ) - unsigned_vp_template = env.get_template("verifiable_presentation.json") - data = { - "holder_did": holder_did, - "verifiable_credential_list": "[" + ",".join(vc_list) + "]" - } + unsigned_vp = json.dumps({ + "@context": [ + "https://www.w3.org/2018/credentials/v1" + ], + "id": presentation_id, + "type": [ + "VerifiablePresentation" + ], + "holder": holder_did, + "verifiableCredential": vc_list + }) - return asyncio.run(inner()) + return asyncio.run(inner(unsigned_vp)) -def verify_presentation(vp): +def verify_presentation(vp: str): """ Returns a (bool, str) tuple indicating whether the presentation is valid. If the boolean is true, the presentation is valid and the second argument can be ignored. - If it is false, the VC is invalid and the second argument contains a JSON object with further information. + If it is false, the VC is invalid and the second argument contains further information. """ async def inner(): - proof_options = '{"proofFormat": "ldp"}' - return await didkit.verify_presentation(vp, proof_options) + str_res = await didkit.verify_presentation(vp, '{"proofFormat": "ldp"}') + res = literal_eval(str_res) + ok = res["warnings"] == [] and res["errors"] == [] + return ok, str_res - return asyncio.run(inner()) + valid, reason = asyncio.run(inner()) + if not valid: + return valid, f"Presentation is invalid: {reason}" + vp = json.loads(vp) + for idx, credential in enumerate(vp["verifiableCredential"]): + valid, reason = verify_credential(json.dumps(credential)) + if not valid: + return valid, f"Credential at index {idx} is invalid with reason: {reason}" + # Fallthrough means all is good. + return True, "Verifiable presentation passes all checks" \ No newline at end of file diff --git a/main.py b/main.py index 5b53db8..5fc0ada 100644 --- a/main.py +++ b/main.py @@ -1,50 +1,11 @@ -import asyncio -from typing import Callable, Any import didkit import json -from jinja2 import Environment, FileSystemLoader, select_autoescape import idhub_ssikit -from ast import literal_eval -import copy +import logging +from sys import stderr -def deep_merge_dict_inplace(d1: dict, d2: dict): - """ - Implements d1 |= d2, but recursively. - Merges d1 and d2, giving preference to keys in d2. - Keys in d1 but not in d2 are left as-is. - """ - for key, val in d2.items(): - if isinstance(d1.get(key, None), dict) and isinstance(val, dict): - deep_merge_dict_inplace(d1[key], val) - continue - d1[key] = val - - -def deep_merge_dict(d1: dict, d2: dict) -> dict: - """ - Implements d1 | d2, but recursively. - Merges d1 and d2, giving preference to keys in d2. - Keys in d1 but not in d2 are left as-is. - """ - d1 = copy.deepcopy(d1) - deep_merge_dict_inplace(d1, d2) - return d1 - - -def deep_filter_dict(f: Callable[[Any], bool], d: dict): - """ - Implements builtin filter(), but recursively. - Applies f to all k,v pairs in d. If some v is a dict, recurse into v instead of applying f(v) directly. - """ - for key, val in d.items(): - if isinstance(val, dict): - yield key, dict(deep_filter_dict(f, val)) - elif f(val): - yield key, val - - -def test_all_vcs(use_webdid=False): +def test_all_use_cases(): vcs = [ 'membership-card', 'financial-vulnerability', @@ -52,24 +13,71 @@ def test_all_vcs(use_webdid=False): 'federation-membership', 'e-operator-claim' ] + # Test basic VC issuance: did:key, no revocation checks (they are not supported with did:key) for vc in vcs: - print(f"trying {vc}... ", end="") + print(f"trying {vc} in did:key mode... ", end="", file=stderr) try: - signed_cred = issue_vc_test_newstyle(vc, use_webdid) + signed_cred = issue_vc_test_newstyle(vc, use_web=False) ok, err = idhub_ssikit.verify_credential(signed_cred) if ok: - print("OK") + print("OK", file=stderr) else: - print("FAILED!", err) - open(f'/tmp/{vc}', mode='w').write(signed_cred) + print("FAILED!", err, file=stderr) except Exception as e: - print("FAILED! With exception:") - print(e) + logging.exception("FAILED! With exception:") + # Test VC issuance using did:web DIDs for the issuer, unrevoked, and check revocation + print("", file=stderr) + for vc in vcs: + print(f"trying {vc} in did:web mode, unrevoked... ", end="", file=stderr) + try: + signed_cred = issue_vc_test_newstyle(vc, use_web=True, did_revokes_vc=False) + ok, err = idhub_ssikit.verify_credential(signed_cred) + if ok: + print("OK", file=stderr) + else: + print("FAILED!", err, file=stderr) + except Exception as e: + logging.exception("FAILED! With exception:") + # Test VC issuance using did:web DIDs for the issuer, *revoked*, and check revocation + print("", file=stderr) + for vc in vcs: + print(f"trying {vc} in did:web mode, *REVOKED*... ", end="", file=stderr) + try: + signed_cred = issue_vc_test_newstyle(vc, use_web=True, did_revokes_vc=True) + ok, err = idhub_ssikit.verify_credential(signed_cred) + if not ok: + print("OK", file=stderr) + else: + print("FAILED! Credential ", err, file=stderr) + except Exception as e: + logging.exception("FAILED! With exception:") + # Test VC resistance to tampering by modifying a credential after signature + print("", file=stderr) + for vc in vcs: + print(f"tampering {vc} after issuance, check that it fails to verify... ", end="", file=stderr) + try: + issue_vc_test_and_fail_verification(vc) # All went well if this doesn't raise an exception + print("OK", file=stderr) + except Exception as e: + logging.exception("FAILED!") + # Test VP issuance and signature + print("", file=stderr) + print(f"doing end-to-end VP test, expected success... ", end="", file=stderr) + ok, reason = issue_and_sign_vp_test(revoked_credential=False) + assert ok is True + print("OK", file=stderr) + print(f"doing end-to-end VP test, expected failure... ", end="", file=stderr) + ok, reason = issue_and_sign_vp_test(revoked_credential=True) + assert ok is False + print("OK", file=stderr) -def issue_vc_test_newstyle(vc_name, use_web=True, did_revokes_vc=False, check_revocation=False): +def issue_vc_test_newstyle(vc_name, + use_web=True, + did_revokes_vc=False, + holder_jwk=None): jwk_issuer = '{"kty":"OKP","crv":"Ed25519","x":"piojLFIHQ4Z6heRuPI87nrfMJKdet1dJIPG15iGjmDE","d":"zpOBTDrp_iNQTY5nZlIxLA34Sl7FXWXNGehFktznxTM"}' - jwk_subject = '{"kty":"OKP","crv":"Ed25519","x":"BuKyt44QKYSX6kmAt771ai37lIFNwYlhugWXPiqcyYU","d":"qbvMhSCPKvQ-vSkqNr3q8gWY5zPUj7ry0t2YnmT7agc"}' + jwk_subject = holder_jwk or '{"kty":"OKP","crv":"Ed25519","x":"BuKyt44QKYSX6kmAt771ai37lIFNwYlhugWXPiqcyYU","d":"qbvMhSCPKvQ-vSkqNr3q8gWY5zPUj7ry0t2YnmT7agc"}' did_subject = didkit.key_to_did("key", jwk_subject) if use_web: if did_revokes_vc: @@ -81,39 +89,34 @@ def issue_vc_test_newstyle(vc_name, use_web=True, did_revokes_vc=False, check_re vc_template = json.load(open(f'schemas/vc_templates/{vc_name}.json')) data_base = json.load(open(f'schemas/vc_examples/base--data.json')) data_specific = json.load(open(f'schemas/vc_examples/{vc_name}--data.json')) - data = deep_merge_dict(data_base, data_specific) + data = idhub_ssikit.deep_merge_dict(data_base, data_specific) data["issuer"]["id"] = did_issuer data["credentialSubject"]["id"] = did_subject - data["credentialStatus"]["id"] = did_issuer - data["credentialStatus"]["revocationBitmapIndex"] = "420" - vc_rendered_unsigned = deep_merge_dict(vc_template, data) + if use_web: + data["credentialStatus"]["id"] = did_issuer + data["credentialStatus"]["revocationBitmapIndex"] = 42 + vc_rendered_unsigned = idhub_ssikit.deep_merge_dict(vc_template, data) signed_credential = idhub_ssikit.render_and_sign_credential( vc_rendered_unsigned, jwk_issuer, ) - - if check_revocation: - assert use_web is True - ok, reason = idhub_ssikit.verify_credential(signed_credential) - print(ok) - print(reason) return signed_credential def issue_vc_test_and_fail_verification(vc_name): - signed_credential = issue_vc_test_newstyle(vc_name) - verification_result = idhub_ssikit.verify_credential(signed_credential) - print(verification_result) - def replace(s, position, character): return s[:position] + character + s[position+1:] + + signed_credential = issue_vc_test_newstyle(vc_name) + ok, reason = idhub_ssikit.verify_credential(signed_credential) + assert ok is True, (ok, reason) signed_credential = replace(signed_credential, (len(signed_credential)//4)*3, ".") - verification_result = idhub_ssikit.verify_credential(signed_credential) - print(verification_result) + ok, reason = idhub_ssikit.verify_credential(signed_credential) + assert ok is False, (ok, reason) -def issue_and_sign_vp_test(): +def issue_and_sign_vp_test(revoked_credential=False): """ In this example execution two Verifiable Credentials associated with a single Holder are issued and then combined into a single Verifiable Presentation. @@ -123,51 +126,15 @@ def issue_and_sign_vp_test(): - Issuer B being "EXO" foundation, - Verifier (not pictured) being "Som Connexio", which wants verifiable data of the Holder from both Issuers. """ - jwk_issuer = didkit.generate_ed25519_key() - jwk_issuer2 = didkit.generate_ed25519_key() - jwk_subject = didkit.generate_ed25519_key() - - did_issuer = didkit.key_to_did("key", jwk_issuer) - did_issuer2 = didkit.key_to_did("key", jwk_issuer2) - did_subject = didkit.key_to_did("key", jwk_subject) - print(did_issuer) - print(did_issuer2) - print(did_subject) - - # TODO: WE'RE NO LONGER USING JINJA2 - env = Environment( - loader=FileSystemLoader("vc_templates"), - autoescape=select_autoescape() + holder_jwk = didkit.generate_ed25519_key() + holder_did = didkit.key_to_did("key", holder_jwk) + vc1 = issue_vc_test_newstyle('membership-card', use_web=True, holder_jwk=holder_jwk) + vc2 = issue_vc_test_newstyle('course-credential', use_web=True, did_revokes_vc=revoked_credential, holder_jwk=holder_jwk) + signed_presentation = idhub_ssikit.issue_verifiable_presentation( + [json.loads(vc1), json.loads(vc2)], + holder_jwk, + holder_did, + "https://idhub.pangea.org/presentations/42" ) - unsigned_vc_template = env.get_template("member.json") - data = { - "vc_id": "http://example.org/credentials/3731", - "issuer_did": did_issuer, - "subject_did": did_subject, - "issuance_date": "2020-08-19T21:41:50Z", - "subject_is_member_of": "Pangea" - } - signed_credential = idhub_ssikit.render_and_sign_credential( - unsigned_vc_template, - jwk_issuer, - data - ) - data2 = data - data2["issuer_did"] = did_issuer2 - signed_credential2 = idhub_ssikit.render_and_sign_credential( - unsigned_vc_template, - jwk_issuer2, - data2 - ) - signed_presentation = idhub_ssikit.issue_verifiable_presentation([signed_credential, signed_credential2], jwk_subject, did_subject) - print("##############--- SIGNED PRESENTATION ---##############") - print(signed_presentation) - print("##############--- ------------------- ---##############") - res = idhub_ssikit.verify_presentation(signed_presentation) - print(res) - - -def scratch(): - jwk_issuer = didkit.generate_ed25519_key() - did_issuer = didkit.key_to_did("key", jwk_issuer) \ No newline at end of file + return idhub_ssikit.verify_presentation(signed_presentation) diff --git a/schemas b/schemas index f1e6a33..90ce148 160000 --- a/schemas +++ b/schemas @@ -1 +1 @@ -Subproject commit f1e6a33d0801a2104bf297292b953b1d138f675b +Subproject commit 90ce148f88e18dead0d9b2f94803236d6312ee09