refactor verification and signed presentations

This commit is contained in:
Cayo Puigdefabregas 2024-05-24 18:18:16 +02:00
parent a9cca75323
commit 8ebd824bed
4 changed files with 113 additions and 293 deletions

View file

@ -62,3 +62,5 @@ def sign_proof(document, proof, key):
jws = detached_sign_unencoded_payload(message, key)
proof["jws"] = jws.decode('utf-8')[:-2]
return proof

View file

@ -1,160 +1,22 @@
import sys
import json
import hashlib
import multicodec
import multiformats
import nacl.signing
import nacl.encoding
import argparse
from pyld import jsonld
from jwcrypto import jwk
from nacl.public import PublicKey
from nacl.signing import SigningKey
from collections import OrderedDict
from nacl.encoding import RawEncoder
from datetime import datetime, timezone
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
# For signature
from pyld.jsonld import JsonLdProcessor
_debug = False
def now():
timestamp = datetime.now(timezone.utc).replace(microsecond=0)
formatted_timestamp = timestamp.strftime("%Y-%m-%dT%H:%M:%SZ")
return formatted_timestamp
def key_to_did(key):
"""did-key-format :=
did:key:MULTIBASE(base58-btc, MULTICODEC(public-key-type, raw-public-key-bytes))"""
verify_key = key.verify_key
public_key_bytes = verify_key.encode()
mc = multicodec.add_prefix('ed25519-pub', public_key_bytes)
# Multibase encode the hashed bytes
did = multiformats.multibase.encode(mc, 'base58btc')
return f"did:key:{did}"
def key_save(key):
# Save the private JWK to a file
private_jwk = key.export()
with open('keypairs.jwk', 'w') as f:
f.write(private_jwk)
def key_read():
# Save the private JWK to a file
with open('keypairs.jwk', 'r') as f:
private_jwk = f.read()
return jwk.JWK.from_json(private_jwk)
def sign_bytes(data, secret):
return secret.sign(data)[:-len(data)]
def sign_bytes_b64(data, key):
signature = sign_bytes(data, key)
sig_b64 = nacl.encoding.URLSafeBase64Encoder.encode(signature)
return sig_b64
def detached_sign_unencoded_payload(payload, key):
header = b'{"alg":"EdDSA","crit":["b64"],"b64":false}'
header_b64 = nacl.encoding.URLSafeBase64Encoder.encode(header)
signing_input = header_b64 + b"." + payload
sig_b64 = sign_bytes_b64(signing_input, key)
jws = header_b64 + b".." + sig_b64
return jws
def urdna2015_normalize(document, proof):
doc_dataset = jsonld.compact(document, "https://www.w3.org/2018/credentials/v1")
sigopts_dataset = jsonld.compact(proof, "https://w3id.org/security/v2")
doc_normalized = jsonld.normalize(
doc_dataset,
{'algorithm': 'URDNA2015', 'format': 'application/n-quads'}
)
sigopts_normalized = jsonld.normalize(
sigopts_dataset,
{'algorithm': 'URDNA2015', 'format': 'application/n-quads'}
)
return doc_normalized, sigopts_normalized
def sha256_normalized(doc_normalized, sigopts_normalized):
doc_digest = hashlib.sha256(doc_normalized.encode('utf-8')).digest()
sigopts_digest = hashlib.sha256(sigopts_normalized.encode('utf-8')).digest()
message = sigopts_digest + doc_digest
return message
def to_jws_payload(document, proof):
doc_normalized, sigopts_normalized = urdna2015_normalize(document, proof)
return sha256_normalized(doc_normalized, sigopts_normalized)
def sign_proof(document, proof, key):
message = to_jws_payload(document, proof)
jws = detached_sign_unencoded_payload(message, key)
proof["jws"] = jws.decode('utf-8')[:-2]
return proof
def get_presentation(vc):
template = {
"@context": ["https://www.w3.org/2018/credentials/v1"],
"id": "http://example.org/presentations/3731",
"type": ["VerifiablePresentation"],
"holder": "",
"verifiableCredential": []
}
template["verifiableCredential"].append(json.loads(vc))
return template
def get_keys(path_file=None):
if path_file:
key = key_read(path_file)
else:
key = jwk.JWK.generate(kty='OKP', crv='Ed25519')
key['kid'] = 'Generated'
jwk_pr = key.export_private(True)
private_key_material_str = jwk_pr['d']
missing_padding = len(private_key_material_str) % 4
if missing_padding:
private_key_material_str += '=' * (4 - missing_padding)
private_key_material = nacl.encoding.URLSafeBase64Encoder.decode(private_key_material_str)
signing_key = SigningKey(private_key_material, encoder=RawEncoder)
return signing_key
from utils import now
from templates import presentation_tmpl, proof_tmpl
from did import key_read, generate_did, get_signing_key
from sign import sign_proof
def sign_vp(signing_key, holder_did, vc):
presentation = get_presentation(vc)
_did = holder_did + "#" + holder_did.split("did:key:")[1]
presentation = presentation_tmpl.copy()
presentation["verifiableCredential"].append(json.loads(vc))
presentation["holder"] = holder_did
proof = {
'@context':'https://w3id.org/security/v2',
'type': 'Ed25519Signature2018',
'proofPurpose': 'assertionMethod',
'verificationMethod': _did,
'created': now()
}
_did = holder_did + "#" + holder_did.split("did:key:")[1]
proof = proof_tmpl.copy()
proof['verificationMethod'] = _did
proof['created'] = now()
sign_proof(presentation, proof, signing_key)
del proof['@context']
presentation['proof'] = proof
@ -162,31 +24,27 @@ def sign_vp(signing_key, holder_did, vc):
def main():
path_credential = None
path_keys = None
parser=argparse.ArgumentParser(description='Generates a new credential')
parser.add_argument("-k", "--key-path", required=True)
parser.add_argument("-c", "--credential-path", required=True)
args=parser.parse_args()
if len(sys.argv) > 1:
path_credential = sys.argv[1]
if args.key_path and args.credential_path:
with open(args.credential_path, "r") as f:
vc = f.read()
if not vc:
print("You need pass a credential.")
return
key = key_read(args.key_path)
did = generate_did(key)
signing_key = get_signing_key(key)
vp = sign_vp(signing_key, did, vc)
print(json.dumps(vp, separators=(',', ':')))
if not path_credential:
print("You need pass a credential.")
return
if len(sys.argv) > 2:
path_keys = sys.argv[2]
with open(path_credential, "r") as f:
vc = f.read()
if not vc:
print("You need pass a credential.")
return
signing_key = get_keys(path_keys)
holder_did = key_to_did(signing_key)
vp = sign_vp(signing_key, holder_did, vc)
print(json.dumps(vp, separators=(',', ':')))
if __name__ == "__main__":
main()

67
verify.py Normal file
View file

@ -0,0 +1,67 @@
import json
import nacl.encoding
import nacl.signing
import multicodec
import multiformats
from nacl.signing import VerifyKey
from sign import to_jws_payload
def get_signing_input(payload):
header = b'{"alg":"EdDSA","crit":["b64"],"b64":false}'
header_b64 = nacl.encoding.URLSafeBase64Encoder.encode(header)
signing_input = header_b64 + b"." + payload
return header_b64, signing_input
def get_message(vc):
document = vc.copy()
proof = document.pop("proof", {})
jws = proof.pop("jws", None)
proof['@context'] = 'https://w3id.org/security/v2'
if not jws:
return None, False
return jws+"==", to_jws_payload(document, proof)
def get_verify_key(vc):
did = vc["proof"]["verificationMethod"].split("#")[0]
pub = did.split(":")[-1]
mc = multiformats.multibase.decode(pub)
public_key_bytes = multicodec.remove_prefix(mc)
return VerifyKey(public_key_bytes)
def jws_split(jws):
header, sig_b64 = jws.split("..")
signature = nacl.encoding.URLSafeBase64Encoder.decode(sig_b64.encode())
return header.encode(), signature
def verify_vc(credential):
vc = json.loads(credential)
header = {"alg": "EdDSA", "crit": ["b64"], "b64": False}
jws, message = get_message(vc)
if not message:
return False
header_b64, signature = get_signing_input(message)
header_jws, signature_jws = jws_split(jws)
if header_jws != header_b64:
return False
header_jws_json = json.loads(
nacl.encoding.URLSafeBase64Encoder.decode(header_jws)
)
for k, v in header.items():
if header_jws_json.get(k) != v:
return False
verify_key = get_verify_key(vc)
data_verified = verify_key.verify(signature_jws+signature)
return data_verified == signature

View file

@ -1,129 +1,22 @@
import sys
import json
import hashlib
import multicodec
import multiformats
import nacl.signing
import nacl.encoding
from pyld import jsonld
from jwcrypto import jwk
from nacl.signing import SigningKey, VerifyKey
from collections import OrderedDict
from nacl.encoding import RawEncoder
from datetime import datetime, timezone
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
def key_to_did(public_key_bytes):
"""did-key-format :=
did:key:MULTIBASE(base58-btc, MULTICODEC(public-key-type, raw-public-key-bytes))"""
mc = multicodec.add_prefix('ed25519-pub', public_key_bytes)
# Multibase encode the hashed bytes
did = multiformats.multibase.encode(mc, 'base58btc')
return f"did:key:{did}"
def get_signing_input(payload):
header = b'{"alg":"EdDSA","crit":["b64"],"b64":false}'
header_b64 = nacl.encoding.URLSafeBase64Encoder.encode(header)
signing_input = header_b64 + b"." + payload
return header_b64, signing_input
def urdna2015_normalize(document, proof):
doc_dataset = jsonld.compact(document, "https://www.w3.org/2018/credentials/v1")
sigopts_dataset = jsonld.compact(proof, "https://w3id.org/security/v2")
doc_normalized = jsonld.normalize(
doc_dataset,
{'algorithm': 'URDNA2015', 'format': 'application/n-quads'}
)
sigopts_normalized = jsonld.normalize(
sigopts_dataset,
{'algorithm': 'URDNA2015', 'format': 'application/n-quads'}
)
return doc_normalized, sigopts_normalized
def sha256_normalized(doc_normalized, sigopts_normalized):
doc_digest = hashlib.sha256(doc_normalized.encode('utf-8')).digest()
sigopts_digest = hashlib.sha256(sigopts_normalized.encode('utf-8')).digest()
message = sigopts_digest + doc_digest
return message
def to_jws_payload(document, proof):
doc_normalized, sigopts_normalized = urdna2015_normalize(document, proof)
return sha256_normalized(doc_normalized, sigopts_normalized)
def get_message(vc):
document = vc.copy()
proof = document.pop("proof", {})
jws = proof.pop("jws", None)
proof['@context'] = 'https://w3id.org/security/v2'
if not jws:
return None, False
return jws+"==", to_jws_payload(document, proof)
def get_verify_key(vc):
did = vc["proof"]["verificationMethod"].split("#")[0]
pub = did.split(":")[-1]
mc = multiformats.multibase.decode(pub)
public_key_bytes = multicodec.remove_prefix(mc)
return VerifyKey(public_key_bytes)
def jws_split(jws):
header, sig_b64 = jws.split("..")
signature = nacl.encoding.URLSafeBase64Encoder.decode(sig_b64.encode())
return header.encode(), signature
def verify_vc(vc):
header = {"alg": "EdDSA", "crit": ["b64"], "b64": False}
jws, message = get_message(vc)
if not message:
return False
header_b64, signature = get_signing_input(message)
header_jws, signature_jws = jws_split(jws)
if header_jws != header_b64:
return False
header_jws_json = json.loads(
nacl.encoding.URLSafeBase64Encoder.decode(header_jws)
)
for k, v in header.items():
if header_jws_json.get(k) != v:
return False
verify_key = get_verify_key(vc)
data_verified = verify_key.verify(signature_jws+signature)
return data_verified == signature
import argparse
from verify import verify_vc
def get_credential(path_credential):
with open(path_credential, "r") as f:
vc = f.read()
return json.loads(vc)
return vc
def main():
parser=argparse.ArgumentParser(description='Verify a credential')
parser.add_argument("credential_path")
args=parser.parse_args()
if args.credential_path:
credential = get_credential(args.credential_path)
print(verify_vc(credential))
if __name__ == "__main__":
if len(sys.argv) > 1:
path_credential = sys.argv[1]
credential = get_credential(path_credential)
print(verify_vc(credential))
else:
print("You need pass a credential.")
main()