IdHub/oidc4vp/views.py

288 lines
9.2 KiB
Python
Raw Permalink Normal View History

2023-11-27 09:59:30 +00:00
import json
2023-11-28 16:33:24 +00:00
import base64
2024-03-07 14:31:07 +00:00
import logging
2024-03-25 15:19:40 +00:00
import requests
2023-11-24 15:36:05 +00:00
2024-03-07 14:31:07 +00:00
from django.template import loader
from django.core.mail import EmailMultiAlternatives
2023-11-29 11:06:53 +00:00
from django.conf import settings
2023-11-29 12:21:49 +00:00
from django.views.generic.edit import View, FormView
2024-03-26 17:50:42 +00:00
from django.http import HttpResponse, Http404, JsonResponse, QueryDict
2023-11-29 12:21:49 +00:00
from django.shortcuts import get_object_or_404, redirect
2023-12-04 14:47:48 +00:00
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
2023-11-29 12:21:49 +00:00
from django.utils.translation import gettext_lazy as _
from django.urls import reverse_lazy
2023-12-01 16:10:21 +00:00
from django.contrib import messages
2024-03-07 15:56:15 +00:00
from django.contrib.auth import get_user_model
2023-11-24 15:36:05 +00:00
2023-12-11 14:38:16 +00:00
from oidc4vp.models import Authorization, Organization, OAuth2VPToken
2023-11-29 12:21:49 +00:00
from idhub.mixins import UserView
2024-01-21 12:45:08 +00:00
from idhub.models import Event
2023-11-28 09:48:57 +00:00
2023-11-29 16:29:31 +00:00
from oidc4vp.forms import AuthorizeForm
2023-11-28 08:39:02 +00:00
2024-03-07 15:56:15 +00:00
User = get_user_model()
2024-03-07 14:31:07 +00:00
logger = logging.getLogger(__name__)
2023-11-29 12:21:49 +00:00
class AuthorizeView(UserView, FormView):
title = _("My wallet")
section = "MyWallet"
template_name = "credentials_presentation.html"
subtitle = _('Credential presentation')
icon = 'bi bi-patch-check-fill'
2023-11-29 16:29:31 +00:00
form_class = AuthorizeForm
2023-11-29 12:21:49 +00:00
success_url = reverse_lazy('idhub:user_demand_authorization')
2023-12-13 16:52:18 +00:00
def get(self, request, *args, **kwargs):
response = super().get(request, *args, **kwargs)
if self.request.session.get('next_url'):
return redirect(reverse_lazy('idhub:user_credentials_request'))
return response
2023-11-29 12:21:49 +00:00
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
2023-12-07 17:10:04 +00:00
try:
vps = json.loads(self.request.GET.get('presentation_definition'))
2024-02-21 11:13:08 +00:00
except Exception:
2023-12-07 17:10:04 +00:00
vps = []
kwargs['presentation_definition'] = vps
2023-12-04 08:51:08 +00:00
kwargs["org"] = self.get_org()
2023-12-07 17:10:04 +00:00
kwargs["code"] = self.request.GET.get('code')
2023-11-29 12:21:49 +00:00
return kwargs
2023-12-13 16:52:18 +00:00
def get_form(self, form_class=None):
form = super().get_form(form_class=form_class)
if form.all_credentials.exists() and not form.credentials.exists():
self.request.session['next_url'] = self.request.get_full_path()
return form
2023-11-29 12:21:49 +00:00
def form_valid(self, form):
authorization = form.save()
2023-12-04 14:47:48 +00:00
if not authorization or authorization.status_code != 200:
2023-11-29 12:21:49 +00:00
messages.error(self.request, _("Error sending credential!"))
2024-01-21 12:45:08 +00:00
return redirect(self.success_url)
2023-12-04 14:47:48 +00:00
try:
2023-12-04 16:12:12 +00:00
authorization = authorization.json()
2024-02-21 11:13:08 +00:00
except Exception:
2023-12-04 14:47:48 +00:00
messages.error(self.request, _("Error sending credential!"))
2024-01-21 12:45:08 +00:00
return redirect(self.success_url)
2023-12-04 14:47:48 +00:00
verify = authorization.get('verify')
result, msg = verify.split(",")
if 'error' in result.lower():
messages.error(self.request, msg)
if 'ok' in result.lower():
messages.success(self.request, msg)
2024-01-21 12:58:13 +00:00
cred = form.credentials.first()
verifier = form.org.name
if cred and verifier:
Event.set_EV_CREDENTIAL_PRESENTED(cred, verifier)
2023-12-04 14:47:48 +00:00
if authorization.get('redirect_uri'):
return redirect(authorization.get('redirect_uri'))
elif authorization.get('response'):
txt = authorization.get('response')
messages.success(self.request, txt)
2024-02-27 18:33:40 +00:00
Event.set_EV_USR_SEND_CREDENTIAL(txt)
2024-01-21 12:45:08 +00:00
txt2 = f"Verifier {verifier} send: " + txt
Event.set_EV_USR_SEND_VP(txt2, self.request.user)
url = reverse_lazy('idhub:user_dashboard')
return redirect(url)
2023-12-04 14:47:48 +00:00
2024-01-21 12:45:08 +00:00
return redirect(self.success_url)
2023-11-29 11:27:20 +00:00
2023-12-04 08:51:08 +00:00
def get_org(self):
client_id = self.request.GET.get("client_id")
if not client_id:
raise Http404("Organization not found!")
org = get_object_or_404(
Organization,
client_id=client_id,
)
return org
2023-11-29 11:27:20 +00:00
2023-12-04 14:47:48 +00:00
@method_decorator(csrf_exempt, name='dispatch')
2023-11-28 09:48:57 +00:00
class VerifyView(View):
2024-03-07 16:52:39 +00:00
subject_template_name = 'email/verify_subject.txt'
email_template_name = 'email/verify_email.txt'
html_email_template_name = 'email/verify_email.html'
2024-03-07 14:31:07 +00:00
2023-11-28 09:48:57 +00:00
def get(self, request, *args, **kwargs):
2023-11-28 16:33:24 +00:00
org = self.validate(request)
2023-11-29 11:06:53 +00:00
presentation_definition = json.dumps(settings.SUPPORTED_CREDENTIALS)
2023-11-28 11:49:28 +00:00
authorization = Authorization(
organization=org,
2023-11-29 10:18:12 +00:00
presentation_definition=presentation_definition
2023-11-28 11:49:28 +00:00
)
2023-12-07 17:10:04 +00:00
authorization.save()
2023-11-29 10:18:12 +00:00
res = json.dumps({"redirect_uri": authorization.authorize()})
2023-11-28 09:48:57 +00:00
return HttpResponse(res)
2023-11-28 08:39:02 +00:00
2023-12-07 17:10:04 +00:00
def post(self, request, *args, **kwargs):
code = self.request.POST.get("code")
vp_tk = self.request.POST.get("vp_token")
if not vp_tk or not code:
raise Http404("Page not Found!")
org = self.validate(request)
2024-03-07 15:56:15 +00:00
self.vp_token = OAuth2VPToken(
2023-12-07 17:10:04 +00:00
vp_token = vp_tk,
organization=org,
code=code
)
2024-03-07 15:56:15 +00:00
2024-03-07 16:03:35 +00:00
if not self.vp_token.authorization:
2023-12-11 17:40:37 +00:00
raise Http404("Page not Found!")
2023-12-07 17:10:04 +00:00
2024-03-07 15:56:15 +00:00
self.vp_token.verifing()
response = self.vp_token.get_response_verify()
self.vp_token.save()
2024-03-07 14:31:07 +00:00
for user in User.objects.filter(is_admin=True):
2024-03-07 15:56:15 +00:00
self.send_email(user)
2024-02-02 08:18:15 +00:00
response["response"] = "Validation Code {}".format(code)
2023-12-07 17:10:04 +00:00
return JsonResponse(response)
def validate(self, request):
2023-11-28 16:33:24 +00:00
auth_header = request.headers.get('Authorization', b'')
auth_data = auth_header.split()
2023-11-29 10:42:20 +00:00
if len(auth_data) == 2 and auth_data[0].lower() == 'basic':
2023-11-28 16:33:24 +00:00
decoded_auth = base64.b64decode(auth_data[1]).decode('utf-8')
client_id, client_secret = decoded_auth.split(':', 1)
org = get_object_or_404(
2023-12-04 14:47:48 +00:00
Organization,
client_id=client_id,
client_secret=client_secret
)
2023-11-28 16:33:24 +00:00
return org
2023-12-04 14:47:48 +00:00
raise Http404("Page not Found!")
2023-11-29 12:21:49 +00:00
2024-03-07 14:31:07 +00:00
def send_email(self, user):
"""
Send a email when a user is activated.
"""
2024-06-06 11:46:59 +00:00
if not self.vp_token.result_verify:
2024-03-07 14:31:07 +00:00
return
email = self.get_email(user)
try:
if settings.ENABLE_EMAIL:
email.send()
return
logger.warning(user.email)
logger.warning(email.body)
except Exception as err:
logger.error(err)
return
def get_context(self):
url_domain = "https://{}/".format(settings.DOMAIN)
context = {
"domain": settings.DOMAIN,
"url_domain": url_domain,
"verification": self.get_verification(),
2024-03-07 16:52:39 +00:00
"code": self.vp_token.code,
2024-03-07 14:31:07 +00:00
}
return context
def get_email(self, user):
context = self.get_context()
subject = loader.render_to_string(self.subject_template_name, context)
# Email subject *must not* contain newlines
subject = ''.join(subject.splitlines())
body = loader.render_to_string(self.email_template_name, context)
from_email = settings.DEFAULT_FROM_EMAIL
to_email = user.email
email_message = EmailMultiAlternatives(
subject, body, from_email, [to_email])
html_email = loader.render_to_string(self.html_email_template_name, context)
email_message.attach_alternative(html_email, 'text/html')
return email_message
2024-03-07 16:52:39 +00:00
def get_verification(self):
2024-03-15 13:13:35 +00:00
return self.vp_token.get_user_info_all()
2023-12-07 17:10:04 +00:00
2024-03-26 17:50:42 +00:00
2023-12-07 17:10:04 +00:00
class AllowCodeView(View):
def get(self, request, *args, **kwargs):
code = self.request.GET.get("code")
if not code:
raise Http404("Page not Found!")
self.authorization = get_object_or_404(
Authorization,
code=code,
code_used=False
)
2024-03-26 17:50:42 +00:00
if self.request.session.get("response_uri"):
url = self.send_api()
2024-03-27 16:33:39 +00:00
if url:
return redirect(url)
2024-01-21 13:27:21 +00:00
promotion = self.authorization.promotions.first()
if not promotion:
return redirect(reverse_lazy('oidc4vp:received_code'))
2023-12-04 14:47:48 +00:00
2023-12-07 17:10:04 +00:00
return redirect(promotion.get_url(code))
2023-12-04 14:47:48 +00:00
2024-03-26 17:50:42 +00:00
def send_api(self):
vp = self.get_vp_token()
if not vp:
return
data = {
"vp_token": vp,
"code": self.authorization.code
}
url = self.request.session.get("response_uri")
result = requests.post(url, data=data)
return result.json().get('redirect_uri')
def get_vp_token(self):
vp = self.authorization.vp_tokens.first()
if not vp:
return
return base64.b64encode(vp.vp_token.encode()).decode()
def get_response_uri(self):
data = {
"code": self.authorization.code,
}
query_dict = QueryDict('', mutable=True)
query_dict.update(data)
response_uri = self.request.session.get("response_uri")
url = '{response_uri}?{params}'.format(
response_uri=response_uri,
params=query_dict.urlencode()
)
return url
class ReceivedCodeView(View):
template_name = "received_code.html"
def get(self, request, *args, **kwargs):
self.context = {}
template = loader.get_template(
self.template_name,
).render()
return HttpResponse(template)