From c63d02f6951559f4e978dc6f66a2b76a91266d9d Mon Sep 17 00:00:00 2001 From: Cayo Puigdefabregas Date: Fri, 24 May 2024 12:38:48 +0200 Subject: [PATCH] init status --- .gitignore | 3 + did_generate.py | 80 ++++++++++++++++ requirements.txt | 28 ++++++ sign_vc.py | 174 ++++++++++++++++++++++++++++++++++ sign_vp.py | 192 ++++++++++++++++++++++++++++++++++++++ tests/test_certificate.py | 137 +++++++++++++++++++++++++++ verify_vc.py | 129 +++++++++++++++++++++++++ verify_vp.py | 141 ++++++++++++++++++++++++++++ 8 files changed, 884 insertions(+) create mode 100644 .gitignore create mode 100644 did_generate.py create mode 100644 requirements.txt create mode 100644 sign_vc.py create mode 100644 sign_vp.py create mode 100644 tests/test_certificate.py create mode 100644 verify_vc.py create mode 100644 verify_vp.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e54919b --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +env/ + diff --git a/did_generate.py b/did_generate.py new file mode 100644 index 0000000..61987aa --- /dev/null +++ b/did_generate.py @@ -0,0 +1,80 @@ +import multicodec +import multiformats +import nacl.signing +import nacl.encoding + +from jwcrypto import jwk +from nacl.signing import SigningKey +from nacl.encoding import RawEncoder +from datetime import datetime, timezone + + +def now(): + timestamp = datetime.now(timezone.utc).replace(microsecond=0) + formatted_timestamp = timestamp.strftime("%Y-%m-%dT%H:%M:%SZ") + return formatted_timestamp + + +def key_to_did(public_key_bytes, type_did): + """did-key-format := + did:key:MULTIBASE(base58-btc, MULTICODEC(public-key-type, raw-public-key-bytes))""" + + #public_key_bytes = public_key.encode() + mc = multicodec.add_prefix('ed25519-pub', public_key_bytes) + + # Multibase encode the hashed bytes + did = multiformats.multibase.encode(mc, 'base58btc') + + if type_did == "web": + return f"did:web:{did}" + + return f"did:key:{did}" + + +def key_save(key): + # Save the private JWK to a file + private_jwk = key.export() + with open('keypairs.jwk', 'w') as f: + f.write(private_jwk) + + +def key_read(): + # Save the private JWK to a file + with open('keypairs.jwk', 'r') as f: + private_jwk = f.read() + + return jwk.JWK.from_json(private_jwk) + + +def get_signing_key(jwk_pr): + private_key_material_str = jwk_pr['d'] + missing_padding = len(private_key_material_str) % 4 + if missing_padding: + private_key_material_str += '=' * (4 - missing_padding) + + private_key_material = nacl.encoding.URLSafeBase64Encoder.decode(private_key_material_str) + signing_key = SigningKey(private_key_material, encoder=RawEncoder) + return signing_key + + +def generate_did(jwk_pr, type_did=None): + signing_key = get_signing_key(jwk_pr) + verify_key = signing_key.verify_key + public_key_bytes = verify_key.encode() + + # Generate the DID + did = key_to_did(public_key_bytes, type_did) + return did + +def generate_keys(): + # Generate an Ed25519 key pair + key = jwk.JWK.generate(kty='OKP', crv='Ed25519') + key['kid'] = 'Generated' + jwk_pr = key.export_private(True) + return jwk_pr + + +if __name__ == "__main__": + + key = generate_keys() + print(generate_did(key)) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..646a616 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,28 @@ +jsonref +PyLD +Requests +jsonschema[format] +jsonref +asn1crypto +certifi +cffi +cryptography +fonttools +idna +jsonwebtoken +jwcrypto +oscrypto +pycparser +pyedid +pyHanko[opentype] +pyhanko-certvalidator +pyOpenSSL +pypng +PyYAML +qrcode +reportlab +Pillow +multiformats +PyNaCl +py-multicodec +pytest diff --git a/sign_vc.py b/sign_vc.py new file mode 100644 index 0000000..719fe37 --- /dev/null +++ b/sign_vc.py @@ -0,0 +1,174 @@ +import json +import hashlib +import multicodec +import multiformats +import nacl.signing +import nacl.encoding + +from pyld import jsonld +from jwcrypto import jwk +from nacl.public import PublicKey +from nacl.signing import SigningKey +from collections import OrderedDict +from nacl.encoding import RawEncoder +from datetime import datetime, timezone + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + +# For signature +from pyld.jsonld import JsonLdProcessor + + +_debug = False + + +def now(): + timestamp = datetime.now(timezone.utc).replace(microsecond=0) + formatted_timestamp = timestamp.strftime("%Y-%m-%dT%H:%M:%SZ") + return formatted_timestamp + + +def key_to_did(public_key_bytes): + """did-key-format := + did:key:MULTIBASE(base58-btc, MULTICODEC(public-key-type, raw-public-key-bytes))""" + + #public_key_bytes = public_key.encode() + mc = multicodec.add_prefix('ed25519-pub', public_key_bytes) + + # Multibase encode the hashed bytes + did = multiformats.multibase.encode(mc, 'base58btc') + + return f"did:key:{did}" + + +def key_save(key): + # Save the private JWK to a file + private_jwk = key.export() + with open('keypairs.jwk', 'w') as f: + f.write(private_jwk) + + +def key_read(): + # Save the private JWK to a file + with open('keypairs.jwk', 'r') as f: + private_jwk = f.read() + + return jwk.JWK.from_json(private_jwk) + + +# https://github.com/spruceid/ssi/blob/main/ssi-jws/src/lib.rs#L75 +def sign_bytes(data, secret): + # https://github.com/spruceid/ssi/blob/main/ssi-jws/src/lib.rs#L125 + return secret.sign(data)[:-len(data)] + + +# https://github.com/spruceid/ssi/blob/main/ssi-jws/src/lib.rs#L248 +def sign_bytes_b64(data, key): + signature = sign_bytes(data, key) + sig_b64 = nacl.encoding.URLSafeBase64Encoder.encode(signature) + return sig_b64 + + + # https://github.com/spruceid/ssi/blob/main/ssi-jws/src/lib.rs#L581 +def detached_sign_unencoded_payload(payload, key): + header = b'{"alg":"EdDSA","crit":["b64"],"b64":false}' + header_b64 = nacl.encoding.URLSafeBase64Encoder.encode(header) + signing_input = header_b64 + b"." + payload + sig_b64 = sign_bytes_b64(signing_input, key) + jws = header_b64 + b".." + sig_b64 + return jws + + +# https://github.com/spruceid/ssi/blob/main/ssi-ldp/src/lib.rs#L423 +def urdna2015_normalize(document, proof): + doc_dataset = jsonld.compact(document, "https://www.w3.org/2018/credentials/v1") + sigopts_dataset = jsonld.compact(proof, "https://w3id.org/security/v2") + doc_normalized = jsonld.normalize( + doc_dataset, + {'algorithm': 'URDNA2015', 'format': 'application/n-quads'} + ) + sigopts_normalized = jsonld.normalize( + sigopts_dataset, + {'algorithm': 'URDNA2015', 'format': 'application/n-quads'} + ) + return doc_normalized, sigopts_normalized + + +# https://github.com/spruceid/ssi/blob/main/ssi-ldp/src/lib.rs#L456 +def sha256_normalized(doc_normalized, sigopts_normalized): + doc_digest = hashlib.sha256(doc_normalized.encode('utf-8')).digest() + sigopts_digest = hashlib.sha256(sigopts_normalized.encode('utf-8')).digest() + message = sigopts_digest + doc_digest + return message + + +# https://github.com/spruceid/ssi/blob/main/ssi-ldp/src/lib.rs#L413 +def to_jws_payload(document, proof): + doc_normalized, sigopts_normalized = urdna2015_normalize(document, proof) + return sha256_normalized(doc_normalized, sigopts_normalized) + + +# https://github.com/spruceid/ssi/blob/main/ssi-ldp/src/lib.rs#L498 +def sign_proof(document, proof, key): + message = to_jws_payload(document, proof) + jws = detached_sign_unencoded_payload(message, key) + proof["jws"] = jws.decode('utf-8')[:-2] + return proof + +# source: https://github.com/mmlab-aueb/PyEd25519Signature2018/blob/master/signer.py + +def sign(document, key, issuer_did): + _did = issuer_did + "#" + issuer_did.split("did:key:")[1] + proof = { + '@context':'https://w3id.org/security/v2', + 'type': 'Ed25519Signature2018', + 'proofPurpose': 'assertionMethod', + 'verificationMethod': _did, + 'created': now() + } + sign_proof(document, proof, key) + del proof['@context'] + document['proof'] = proof + return document + + +if __name__ == "__main__": + # Generate an Ed25519 key pair + key = jwk.JWK.generate(kty='OKP', crv='Ed25519') + key['kid'] = 'Generated' + # key = key_read() + + jwk_pr = key.export_private(True) + private_key_material_str = jwk_pr['d'] + missing_padding = len(private_key_material_str) % 4 + if missing_padding: + private_key_material_str += '=' * (4 - missing_padding) + + private_key_material = nacl.encoding.URLSafeBase64Encoder.decode(private_key_material_str) + signing_key = SigningKey(private_key_material, encoder=RawEncoder) + verify_key = signing_key.verify_key + public_key_bytes = verify_key.encode() + + # Generate the DID + did = key_to_did(public_key_bytes) + # print(did) + + credential = { + "@context": "https://www.w3.org/2018/credentials/v1", + "id": "http://example.org/credentials/3731", + "type": ["VerifiableCredential"], + "credentialSubject": { + "id": "did:key:z6MkgGXSJoacuuNdwU1rGfPpFH72GACnzykKTxzCCTZs6Z2M", + }, + "issuer": did, + "issuanceDate": now() + } + + # vc = generate_vc(credential, signing_key, did) + vc = sign(credential, signing_key, did) + + print(json.dumps(vc, separators=(',', ':'))) + diff --git a/sign_vp.py b/sign_vp.py new file mode 100644 index 0000000..aa8dc56 --- /dev/null +++ b/sign_vp.py @@ -0,0 +1,192 @@ +import sys +import json +import hashlib +import multicodec +import multiformats +import nacl.signing +import nacl.encoding + +from pyld import jsonld +from jwcrypto import jwk +from nacl.public import PublicKey +from nacl.signing import SigningKey +from collections import OrderedDict +from nacl.encoding import RawEncoder +from datetime import datetime, timezone + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + +# For signature +from pyld.jsonld import JsonLdProcessor + + +_debug = False + + +def now(): + timestamp = datetime.now(timezone.utc).replace(microsecond=0) + formatted_timestamp = timestamp.strftime("%Y-%m-%dT%H:%M:%SZ") + return formatted_timestamp + + +def key_to_did(key): + """did-key-format := + did:key:MULTIBASE(base58-btc, MULTICODEC(public-key-type, raw-public-key-bytes))""" + + verify_key = key.verify_key + public_key_bytes = verify_key.encode() + mc = multicodec.add_prefix('ed25519-pub', public_key_bytes) + + # Multibase encode the hashed bytes + did = multiformats.multibase.encode(mc, 'base58btc') + + return f"did:key:{did}" + + +def key_save(key): + # Save the private JWK to a file + private_jwk = key.export() + with open('keypairs.jwk', 'w') as f: + f.write(private_jwk) + + +def key_read(): + # Save the private JWK to a file + with open('keypairs.jwk', 'r') as f: + private_jwk = f.read() + + return jwk.JWK.from_json(private_jwk) + + +def sign_bytes(data, secret): + return secret.sign(data)[:-len(data)] + + +def sign_bytes_b64(data, key): + signature = sign_bytes(data, key) + sig_b64 = nacl.encoding.URLSafeBase64Encoder.encode(signature) + return sig_b64 + + +def detached_sign_unencoded_payload(payload, key): + header = b'{"alg":"EdDSA","crit":["b64"],"b64":false}' + header_b64 = nacl.encoding.URLSafeBase64Encoder.encode(header) + signing_input = header_b64 + b"." + payload + sig_b64 = sign_bytes_b64(signing_input, key) + jws = header_b64 + b".." + sig_b64 + return jws + + +def urdna2015_normalize(document, proof): + doc_dataset = jsonld.compact(document, "https://www.w3.org/2018/credentials/v1") + sigopts_dataset = jsonld.compact(proof, "https://w3id.org/security/v2") + doc_normalized = jsonld.normalize( + doc_dataset, + {'algorithm': 'URDNA2015', 'format': 'application/n-quads'} + ) + sigopts_normalized = jsonld.normalize( + sigopts_dataset, + {'algorithm': 'URDNA2015', 'format': 'application/n-quads'} + ) + return doc_normalized, sigopts_normalized + + +def sha256_normalized(doc_normalized, sigopts_normalized): + doc_digest = hashlib.sha256(doc_normalized.encode('utf-8')).digest() + sigopts_digest = hashlib.sha256(sigopts_normalized.encode('utf-8')).digest() + message = sigopts_digest + doc_digest + return message + + +def to_jws_payload(document, proof): + doc_normalized, sigopts_normalized = urdna2015_normalize(document, proof) + return sha256_normalized(doc_normalized, sigopts_normalized) + + +def sign_proof(document, proof, key): + message = to_jws_payload(document, proof) + jws = detached_sign_unencoded_payload(message, key) + proof["jws"] = jws.decode('utf-8')[:-2] + return proof + + +def get_presentation(vc): + template = { + "@context": ["https://www.w3.org/2018/credentials/v1"], + "id": "http://example.org/presentations/3731", + "type": ["VerifiablePresentation"], + "holder": "", + "verifiableCredential": [] + } + + template["verifiableCredential"].append(json.loads(vc)) + return template + + +def get_keys(path_file=None): + if path_file: + key = key_read(path_file) + else: + key = jwk.JWK.generate(kty='OKP', crv='Ed25519') + key['kid'] = 'Generated' + + jwk_pr = key.export_private(True) + private_key_material_str = jwk_pr['d'] + missing_padding = len(private_key_material_str) % 4 + if missing_padding: + private_key_material_str += '=' * (4 - missing_padding) + + private_key_material = nacl.encoding.URLSafeBase64Encoder.decode(private_key_material_str) + signing_key = SigningKey(private_key_material, encoder=RawEncoder) + return signing_key + + +def sign_vp(signing_key, holder_did, vc): + presentation = get_presentation(vc) + _did = holder_did + "#" + holder_did.split("did:key:")[1] + presentation["holder"] = holder_did + proof = { + '@context':'https://w3id.org/security/v2', + 'type': 'Ed25519Signature2018', + 'proofPurpose': 'assertionMethod', + 'verificationMethod': _did, + 'created': now() + } + sign_proof(presentation, proof, signing_key) + del proof['@context'] + presentation['proof'] = proof + return presentation + + +def main(): + path_credential = None + path_keys = None + + if len(sys.argv) > 1: + path_credential = sys.argv[1] + + if not path_credential: + print("You need pass a credential.") + return + + if len(sys.argv) > 2: + path_keys = sys.argv[2] + + with open(path_credential, "r") as f: + vc = f.read() + + if not vc: + print("You need pass a credential.") + return + + signing_key = get_keys(path_keys) + holder_did = key_to_did(signing_key) + vp = sign_vp(signing_key, holder_did, vc) + print(json.dumps(vp, separators=(',', ':'))) + + +if __name__ == "__main__": + main() diff --git a/tests/test_certificate.py b/tests/test_certificate.py new file mode 100644 index 0000000..2a66f6a --- /dev/null +++ b/tests/test_certificate.py @@ -0,0 +1,137 @@ +import json +import multicodec +import multiformats +import nacl.encoding + +from datetime import datetime, timezone +from did_generate import generate_keys, generate_did, get_signing_key +from sign_vc import sign +from sign_vp import sign_vp +from verify_vc import verify_vc +from verify_vp import verify_vp + +def now(): + timestamp = datetime.now(timezone.utc).replace(microsecond=0) + formatted_timestamp = timestamp.strftime("%Y-%m-%dT%H:%M:%SZ") + return formatted_timestamp + + +def test_generated_did_key(): + key = generate_keys() + did = generate_did(key) + _did = did.split("#")[0] + pub = _did.split(":")[-1] + mc = multiformats.multibase.decode(pub) + public_key_bytes = multicodec.remove_prefix(mc) + x = nacl.encoding.URLSafeBase64Encoder.encode(public_key_bytes).decode('utf-8') + k_x = key.get('x', '') + missing_padding = len(k_x) % 4 + if missing_padding: + k_x += '=' * (4 - missing_padding) + + assert key.get('kty') == 'OKP' + assert key.get('crv') == 'Ed25519' + assert key.get('kid') == 'Generated' + assert k_x == x + assert key.get('d') is not None + + +def test_credential(): + key = generate_keys() + did = generate_did(key) + signing_key = get_signing_key(key) + + credential = { + "@context": "https://www.w3.org/2018/credentials/v1", + "id": "http://example.org/credentials/3731", + "type": ["VerifiableCredential"], + "credentialSubject": { + "id": "did:key:z6MkgGXSJoacuuNdwU1rGfPpFH72GACnzykKTxzCCTZs6Z2M", + }, + "issuer": did, + "issuanceDate": now() + } + + vc = sign(credential, signing_key, did) + header = 'eyJhbGciOiJFZERTQSIsImNyaXQiOlsiYjY0Il0sImI2NCI6ZmFsc2V9' + assert vc.get('proof', {}).get('jws') is not None + assert header in vc.get('proof', {}).get('jws') + assert did in vc.get('proof', {}).get('verificationMethod') + + +def test_presentation(): + key = generate_keys() + did = generate_did(key) + signing_key = get_signing_key(key) + + credential = { + "@context": "https://www.w3.org/2018/credentials/v1", + "id": "http://example.org/credentials/3731", + "type": ["VerifiableCredential"], + "credentialSubject": { + "id": "did:key:z6MkgGXSJoacuuNdwU1rGfPpFH72GACnzykKTxzCCTZs6Z2M", + }, + "issuer": did, + "issuanceDate": now() + } + + vc = sign(credential, signing_key, did) + vc_json = json.dumps(vc) + + holder_key = generate_keys() + holder_did = generate_did(holder_key) + holder_signing_key = get_signing_key(holder_key) + vp = sign_vp(holder_signing_key, holder_did, vc_json) + header = 'eyJhbGciOiJFZERTQSIsImNyaXQiOlsiYjY0Il0sImI2NCI6ZmFsc2V9' + assert vp.get('proof', {}).get('jws') is not None + assert header in vp.get('proof', {}).get('jws') + assert holder_did in vp.get('proof', {}).get('verificationMethod') + + +def test_verifiable_credential(): + key = generate_keys() + did = generate_did(key) + signing_key = get_signing_key(key) + + credential = { + "@context": "https://www.w3.org/2018/credentials/v1", + "id": "http://example.org/credentials/3731", + "type": ["VerifiableCredential"], + "credentialSubject": { + "id": "did:key:z6MkgGXSJoacuuNdwU1rGfPpFH72GACnzykKTxzCCTZs6Z2M", + }, + "issuer": did, + "issuanceDate": now() + } + + vc = sign(credential, signing_key, did) + verified = verify_vc(vc) + assert verified + + +def test_verifiable_presentation(): + key = generate_keys() + did = generate_did(key) + signing_key = get_signing_key(key) + + credential = { + "@context": "https://www.w3.org/2018/credentials/v1", + "id": "http://example.org/credentials/3731", + "type": ["VerifiableCredential"], + "credentialSubject": { + "id": "did:key:z6MkgGXSJoacuuNdwU1rGfPpFH72GACnzykKTxzCCTZs6Z2M", + }, + "issuer": did, + "issuanceDate": now() + } + + vc = sign(credential, signing_key, did) + vc_json = json.dumps(vc) + + holder_key = generate_keys() + holder_did = generate_did(holder_key) + holder_signing_key = get_signing_key(holder_key) + vp = sign_vp(holder_signing_key, holder_did, vc_json) + verified = verify_vp(vp) + assert verified + diff --git a/verify_vc.py b/verify_vc.py new file mode 100644 index 0000000..fed2df6 --- /dev/null +++ b/verify_vc.py @@ -0,0 +1,129 @@ +import sys +import json +import hashlib +import multicodec +import multiformats +import nacl.signing +import nacl.encoding + +from pyld import jsonld +from jwcrypto import jwk +from nacl.signing import SigningKey, VerifyKey +from collections import OrderedDict +from nacl.encoding import RawEncoder +from datetime import datetime, timezone + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + + + +def key_to_did(public_key_bytes): + """did-key-format := + did:key:MULTIBASE(base58-btc, MULTICODEC(public-key-type, raw-public-key-bytes))""" + + mc = multicodec.add_prefix('ed25519-pub', public_key_bytes) + + # Multibase encode the hashed bytes + did = multiformats.multibase.encode(mc, 'base58btc') + + return f"did:key:{did}" + + +def get_signing_input(payload): + header = b'{"alg":"EdDSA","crit":["b64"],"b64":false}' + header_b64 = nacl.encoding.URLSafeBase64Encoder.encode(header) + signing_input = header_b64 + b"." + payload + return header_b64, signing_input + + +def urdna2015_normalize(document, proof): + doc_dataset = jsonld.compact(document, "https://www.w3.org/2018/credentials/v1") + sigopts_dataset = jsonld.compact(proof, "https://w3id.org/security/v2") + doc_normalized = jsonld.normalize( + doc_dataset, + {'algorithm': 'URDNA2015', 'format': 'application/n-quads'} + ) + sigopts_normalized = jsonld.normalize( + sigopts_dataset, + {'algorithm': 'URDNA2015', 'format': 'application/n-quads'} + ) + return doc_normalized, sigopts_normalized + + +def sha256_normalized(doc_normalized, sigopts_normalized): + doc_digest = hashlib.sha256(doc_normalized.encode('utf-8')).digest() + sigopts_digest = hashlib.sha256(sigopts_normalized.encode('utf-8')).digest() + message = sigopts_digest + doc_digest + return message + + +def to_jws_payload(document, proof): + doc_normalized, sigopts_normalized = urdna2015_normalize(document, proof) + return sha256_normalized(doc_normalized, sigopts_normalized) + + +def get_message(vc): + document = vc.copy() + proof = document.pop("proof", {}) + jws = proof.pop("jws", None) + proof['@context'] = 'https://w3id.org/security/v2' + if not jws: + return None, False + + return jws+"==", to_jws_payload(document, proof) + + +def get_verify_key(vc): + did = vc["proof"]["verificationMethod"].split("#")[0] + pub = did.split(":")[-1] + mc = multiformats.multibase.decode(pub) + public_key_bytes = multicodec.remove_prefix(mc) + return VerifyKey(public_key_bytes) + + +def jws_split(jws): + header, sig_b64 = jws.split("..") + signature = nacl.encoding.URLSafeBase64Encoder.decode(sig_b64.encode()) + return header.encode(), signature + + +def verify_vc(vc): + header = {"alg": "EdDSA", "crit": ["b64"], "b64": False} + jws, message = get_message(vc) + if not message: + return False + + header_b64, signature = get_signing_input(message) + header_jws, signature_jws = jws_split(jws) + + if header_jws != header_b64: + return False + + header_jws_json = json.loads( + nacl.encoding.URLSafeBase64Encoder.decode(header_jws) + ) + for k, v in header.items(): + if header_jws_json.get(k) != v: + return False + + verify_key = get_verify_key(vc) + data_verified = verify_key.verify(signature_jws+signature) + return data_verified == signature + + +def get_credential(path_credential): + with open(path_credential, "r") as f: + vc = f.read() + return json.loads(vc) + + +if __name__ == "__main__": + if len(sys.argv) > 1: + path_credential = sys.argv[1] + credential = get_credential(path_credential) + print(verify_vc(credential)) + else: + print("You need pass a credential.") diff --git a/verify_vp.py b/verify_vp.py new file mode 100644 index 0000000..47c8870 --- /dev/null +++ b/verify_vp.py @@ -0,0 +1,141 @@ +import sys +import json +import hashlib +import multicodec +import multiformats +import nacl.signing +import nacl.encoding + +from pyld import jsonld +from jwcrypto import jwk +from nacl.signing import SigningKey, VerifyKey +from collections import OrderedDict +from nacl.encoding import RawEncoder +from datetime import datetime, timezone + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + + + +def key_to_did(public_key_bytes): + """did-key-format := + did:key:MULTIBASE(base58-btc, MULTICODEC(public-key-type, raw-public-key-bytes))""" + + mc = multicodec.add_prefix('ed25519-pub', public_key_bytes) + + # Multibase encode the hashed bytes + did = multiformats.multibase.encode(mc, 'base58btc') + + return f"did:key:{did}" + + +def get_signing_input(payload): + header = b'{"alg":"EdDSA","crit":["b64"],"b64":false}' + header_b64 = nacl.encoding.URLSafeBase64Encoder.encode(header) + signing_input = header_b64 + b"." + payload + return header_b64, signing_input + + +def urdna2015_normalize(document, proof): + doc_dataset = jsonld.compact(document, "https://www.w3.org/2018/credentials/v1") + sigopts_dataset = jsonld.compact(proof, "https://w3id.org/security/v2") + doc_normalized = jsonld.normalize( + doc_dataset, + {'algorithm': 'URDNA2015', 'format': 'application/n-quads'} + ) + sigopts_normalized = jsonld.normalize( + sigopts_dataset, + {'algorithm': 'URDNA2015', 'format': 'application/n-quads'} + ) + return doc_normalized, sigopts_normalized + + +def sha256_normalized(doc_normalized, sigopts_normalized): + doc_digest = hashlib.sha256(doc_normalized.encode('utf-8')).digest() + sigopts_digest = hashlib.sha256(sigopts_normalized.encode('utf-8')).digest() + message = sigopts_digest + doc_digest + return message + + +def to_jws_payload(document, proof): + doc_normalized, sigopts_normalized = urdna2015_normalize(document, proof) + return sha256_normalized(doc_normalized, sigopts_normalized) + + +def get_message(vc): + document = vc.copy() + proof = document.pop("proof", {}) + jws = proof.pop("jws", None) + proof['@context'] = 'https://w3id.org/security/v2' + if not jws: + return None, False + + return jws+"==", to_jws_payload(document, proof) + + +def get_verify_key(vc): + did = vc["proof"]["verificationMethod"].split("#")[0] + pub = did.split(":")[-1] + mc = multiformats.multibase.decode(pub) + public_key_bytes = multicodec.remove_prefix(mc) + return VerifyKey(public_key_bytes) + + +def jws_split(jws): + header, sig_b64 = jws.split("..") + signature = nacl.encoding.URLSafeBase64Encoder.decode(sig_b64.encode()) + return header.encode(), signature + + +def verify_vc(vc): + header = {"alg": "EdDSA", "crit": ["b64"], "b64": False} + jws, message = get_message(vc) + if not message: + return False + + header_b64, signature = get_signing_input(message) + header_jws, signature_jws = jws_split(jws) + + if header_jws != header_b64: + return False + + header_jws_json = json.loads( + nacl.encoding.URLSafeBase64Encoder.decode(header_jws) + ) + for k, v in header.items(): + if header_jws_json.get(k) != v: + return False + + verify_key = get_verify_key(vc) + data_verified = verify_key.verify(signature_jws+signature) + return data_verified == signature + + +def verify_vp(presentation): + result_presentation = verify_vc(presentation) + if not result_presentation: + return False + + for vc in presentation.get("verifiableCredential", []): + if not verify_vc(vc): + return False + + return True + + +def get_presentation(path_presentation): + with open(path_presentation, "r") as f: + vc = f.read() + return json.loads(vc) + + +if __name__ == "__main__": + if len(sys.argv) > 1: + path_presentation = sys.argv[1] + presentation = get_presentation(path_presentation) + print(verify_vp(presentation)) + else: + print("You need pass a presentation.")