Merge pull request 'cred_snapshots' (#4) from cred_snapshots into release

Reviewed-on: #4
This commit is contained in:
cayop 2025-01-07 16:27:17 +00:00
commit 60890ae8f1
9 changed files with 258 additions and 12 deletions

File diff suppressed because one or more lines are too long

View file

@ -105,6 +105,7 @@ class Command(BaseCommand):
assert dname assert dname
assert title assert title
except Exception: except Exception:
ldata = {}
title = '' title = ''
_name = '' _name = ''

View file

@ -33,10 +33,11 @@ class UserView(LoginRequiredMixin):
] ]
def get(self, request, *args, **kwargs): def get(self, request, *args, **kwargs):
err_txt = "User domain is {} which does not match server domain {}".format( if not settings.DEVELOPMENT:
request.get_host(), settings.DOMAIN err_txt = "User domain is {} which does not match server domain {}".format(
) request.get_host(), settings.DOMAIN
assert request.get_host() == settings.DOMAIN, err_txt )
assert request.get_host() == settings.DOMAIN, err_txt
self.admin_validated = cache.get("KEY_DIDS") self.admin_validated = cache.get("KEY_DIDS")
response = super().get(request, *args, **kwargs) response = super().get(request, *args, **kwargs)
@ -55,10 +56,11 @@ class UserView(LoginRequiredMixin):
return url or response return url or response
def post(self, request, *args, **kwargs): def post(self, request, *args, **kwargs):
err_txt = "User domain is {} which does not match server domain {}".format( if not settings.DEVELOPMENT:
request.get_host(), settings.DOMAIN err_txt = "User domain is {} which does not match server domain {}".format(
) request.get_host(), settings.DOMAIN
assert request.get_host() == settings.DOMAIN, err_txt )
assert request.get_host() == settings.DOMAIN, err_txt
self.admin_validated = cache.get("KEY_DIDS") self.admin_validated = cache.get("KEY_DIDS")
response = super().post(request, *args, **kwargs) response = super().post(request, *args, **kwargs)
url = self.check_gdpr() url = self.check_gdpr()

View file

@ -680,7 +680,7 @@ class VerificableCredential(models.Model):
credential_subject = ujson.loads(data).get("credentialSubject", {}) credential_subject = ujson.loads(data).get("credentialSubject", {})
return credential_subject.items() return credential_subject.items()
def issue(self, did, domain): def issue(self, did, domain, save=True):
if self.status == self.Status.ISSUED: if self.status == self.Status.ISSUED:
return return
@ -700,6 +700,9 @@ class VerificableCredential(models.Model):
if not valid: if not valid:
return return
if not save:
return vc_str
self.data = self.user.encrypt_data(vc_str) self.data = self.user.encrypt_data(vc_str)
self.status = self.Status.ISSUED self.status = self.Status.ISSUED

View file

@ -0,0 +1,61 @@
{
"@context": [
"https://www.w3.org/2018/credentials/v1"
],
"type": ["VerifiableCredential", "DeviceSnapshot"],
"issuer": "{{ issuer_did }}",
"issuanceDate": "{{ issuance_date }}",
"credentialSubject": {
"operatorId": "{{ operator_id }}",
"uuid": "{{ uuid }}",
"type": "hardwareList",
"software": "workbench-script",
"deviceId": [
{
"name": "Manufacturer",
"value": "{{ manufacturer }}"
},
{
"name": "Model",
"value": "{{ model }}"
},
{
"name": "Serial",
"value": "{{ serial_number }}"
},
{
"name": "SKU",
"value": "{{ sku }}"
},
{
"name": "EthernetMacAddress",
"value": "{{ mac }}"
}
],
"timestamp": "{{ issuance_date }}"
},
"evidence": [
{
"type": "HardwareList",
"operation": "dmidecode",
"output": "{{ dmidecode }}",
"timestamp": "{{ issuance_date }}"
},
{
"type": "HardwareList",
"operation": "smartctl",
"output": {{ smartctl|default:'""'|safe }},
"timestamp": "{{ issuance_date }}"
},
{
"type": "HardwareList",
"operation": "inxi",
"output": {{ inxi|default:'""'|safe }},
"timestamp": "{{ issuance_date }}"
}
],
"credentialSchema": {
"id": "https://idhub.pangea.org/vc_schemas/device-snapshot-v1.json",
"type": "FullJsonSchemaValidator2021"
}
}

View file

@ -0,0 +1,122 @@
{
"$id": "https://idhub.pangea.org/vc_schemas/device-snapshot-v1.json",
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "DeviceSnapshotV1",
"description": "Snapshot create by workbench-script, software for discover hardware in one device.",
"name": [
{
"value": "Snapshot",
"lang": "en"
}
],
"type": "object",
"allOf": [
{
"$ref": "https://www.w3.org/2018/credentials/v1"
},
{
"properties": {
"credentialSubject": {
"description": "Define the properties of a digital device snapshot",
"type": "object",
"properties": {
"operatorId": {
"description": "Indentifier related to the product operator, defined a hash of an Id token (10 chars enough)",
"type": "string",
"minLength": 10
},
"uuid": {
"description": "Unique identifier of the snapshot.",
"type": "string",
"minLength": 36
},
"type": {
"description": "Defines a snapshot type, e.g., hardwareList, dataDeletion (need to adjust the enum values).",
"type": "string",
"enum": [
"hardwareList", "dataDeletion"
],
"minLength": 1
},
"software": {
"description": "Name of the snapshot software used.",
"type": "string",
"enum": [
"workbench-script"
],
"minLength": 1
},
"deviceId": {
"description": "List of identification properties for the device, each with a name and value.",
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"description": "The type of device identifier information, e.g., ManufacturerSerial, EthernetMacAddress.",
"type": "string"
},
"value": {
"description": "The value of the device identifier information.",
"type": "string"
}
},
"required": ["name", "value"]
}
},
"timestamp": {
"description": "Date and time of this snapshot.",
"type": "string",
"format": "date-time"
}
},
"required": [
"uuid",
"type",
"timestamp"
]
},
"evidence": {
"description": "Contains supporting evidence about the process which resulted in the issuance of this credential as a result of system operations.",
"type": "array",
"items": {
"type": "object",
"properties": {
"type": {
"description": "Type of evidence, linked to credentialSubject.type.",
"type": "string",
"enum": [
"HardwareList",
"DataDeletion"
]
},
"operation": {
"description": "Specifies the command executed for evidence generation.",
"type": "string",
"enum": [
"inxi",
"dmidecode",
"smartctl"
]
},
"output": {
"description": "Output from the executed command.",
"type": "string"
},
"timestamp": {
"description": "Timestamp of the evidence generation if needed.",
"type": "string",
"format": "date-time"
}
},
"required": [
"type",
"operation",
"output"
]
}
}
}
}
]
}

View file

@ -31,6 +31,7 @@ SECRET_KEY = config('SECRET_KEY')
# SECURITY WARNING: don't run with debug turned on in production! # SECURITY WARNING: don't run with debug turned on in production!
DEBUG = config('DEBUG', default=False, cast=bool) DEBUG = config('DEBUG', default=False, cast=bool)
DEVELOPMENT = config('DEVELOPMENT', default=False, cast=bool)
DOMAIN = config("DOMAIN") DOMAIN = config("DOMAIN")
assert DOMAIN not in [None, ''], "DOMAIN var is MANDATORY" assert DOMAIN not in [None, ''], "DOMAIN var is MANDATORY"

View file

@ -7,6 +7,7 @@ app_name = 'webhook'
urlpatterns = [ urlpatterns = [
path('verify/', views.webhook_verify, name='verify'), path('verify/', views.webhook_verify, name='verify'),
path('sign/', views.webhook_issue, name='sign'),
path('tokens/', views.WebHookTokenView.as_view(), name='tokens'), path('tokens/', views.WebHookTokenView.as_view(), name='tokens'),
path('tokens/new', views.TokenNewView.as_view(), name='new_token'), path('tokens/new', views.TokenNewView.as_view(), name='new_token'),
path('tokens/<int:pk>/del', views.TokenDeleteView.as_view(), name='delete_token'), path('tokens/<int:pk>/del', views.TokenDeleteView.as_view(), name='delete_token'),

View file

@ -11,6 +11,8 @@ from pyvckit.verify import verify_vp, verify_vc
from uuid import uuid4 from uuid import uuid4
from idhub.mixins import AdminView from idhub.mixins import AdminView
from idhub_auth.models import User
from idhub.models import DID, Schemas, VerificableCredential
from webhook.models import Token from webhook.models import Token
from webhook.tables import TokensTable from webhook.tables import TokensTable
@ -22,7 +24,7 @@ def webhook_verify(request):
if not auth_header or not auth_header.startswith('Bearer '): if not auth_header or not auth_header.startswith('Bearer '):
return JsonResponse({'error': 'Invalid or missing token'}, status=401) return JsonResponse({'error': 'Invalid or missing token'}, status=401)
token = auth_header.split(' ')[1] token = auth_header.split(' ')[1].strip("'").strip('"')
tk = Token.objects.filter(token=token).first() tk = Token.objects.filter(token=token).first()
if not tk: if not tk:
return JsonResponse({'error': 'Invalid or missing token'}, status=401) return JsonResponse({'error': 'Invalid or missing token'}, status=401)
@ -51,6 +53,60 @@ def webhook_verify(request):
return JsonResponse({'error': 'Invalid request method'}, status=400) return JsonResponse({'error': 'Invalid request method'}, status=400)
@csrf_exempt
def webhook_issue(request):
if request.method == 'POST':
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
token = auth_header.split(' ')[1].strip("'").strip('"')
tk = Token.objects.filter(token=token).first()
if not tk:
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
try:
data = json.loads(request.body)
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
typ = data.get("type")
vc = data.get("data")
save = data.get("save", True)
try:
vc = json.dumps(vc)
except Exception:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
if not typ or not vc:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
did = DID.objects.filter(user__isnull=True).first()
if not did:
return JsonResponse({'error': 'Invalid DID'}, status=400)
schema = Schemas.objects.filter(type=typ).first()
if not schema:
return JsonResponse({'error': 'Invalid credential'}, status=400)
user = User.objects.filter(is_admin=True).first()
cred = VerificableCredential(
csv_data=vc,
issuer_did=did,
schema=schema,
user=user
)
cred.set_type()
vc_signed = cred.issue(did, domain=request.get_host(), save=save)
return JsonResponse({'status': 'success', "data": vc_signed}, status=200)
return JsonResponse({'status': 'fail'}, status=200)
return JsonResponse({'error': 'Invalid request method'}, status=400)
class WebHookTokenView(AdminView, SingleTableView): class WebHookTokenView(AdminView, SingleTableView):
template_name = "token.html" template_name = "token.html"
title = _("Credential management") title = _("Credential management")
@ -93,4 +149,3 @@ class TokenNewView(AdminView, View):
Token.objects.create(token=uuid4()) Token.objects.create(token=uuid4())
return redirect('webhook:tokens') return redirect('webhook:tokens')