IdHub/oidc4vp/views.py

288 lines
9.2 KiB
Python

import json
import base64
import logging
import requests
from django.template import loader
from django.core.mail import EmailMultiAlternatives
from django.conf import settings
from django.views.generic.edit import View, FormView
from django.http import HttpResponse, Http404, JsonResponse, QueryDict
from django.shortcuts import get_object_or_404, redirect
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django.utils.translation import gettext_lazy as _
from django.urls import reverse_lazy
from django.contrib import messages
from django.contrib.auth import get_user_model
from oidc4vp.models import Authorization, Organization, OAuth2VPToken
from idhub.mixins import UserView
from idhub.models import Event
from oidc4vp.forms import AuthorizeForm
User = get_user_model()
logger = logging.getLogger(__name__)
class AuthorizeView(UserView, FormView):
title = _("My wallet")
section = "MyWallet"
template_name = "credentials_presentation.html"
subtitle = _('Credential presentation')
icon = 'bi bi-patch-check-fill'
form_class = AuthorizeForm
success_url = reverse_lazy('idhub:user_demand_authorization')
def get(self, request, *args, **kwargs):
response = super().get(request, *args, **kwargs)
if self.request.session.get('next_url'):
return redirect(reverse_lazy('idhub:user_credentials_request'))
return response
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
try:
vps = json.loads(self.request.GET.get('presentation_definition'))
except Exception:
vps = []
kwargs['presentation_definition'] = vps
kwargs["org"] = self.get_org()
kwargs["code"] = self.request.GET.get('code')
return kwargs
def get_form(self, form_class=None):
form = super().get_form(form_class=form_class)
if form.all_credentials.exists() and not form.credentials.exists():
self.request.session['next_url'] = self.request.get_full_path()
return form
def form_valid(self, form):
authorization = form.save()
if not authorization or authorization.status_code != 200:
messages.error(self.request, _("Error sending credential!"))
return redirect(self.success_url)
try:
authorization = authorization.json()
except Exception:
messages.error(self.request, _("Error sending credential!"))
return redirect(self.success_url)
verify = authorization.get('verify')
result, msg = verify.split(",")
if 'error' in result.lower():
messages.error(self.request, msg)
if 'ok' in result.lower():
messages.success(self.request, msg)
cred = form.credentials.first()
verifier = form.org.name
if cred and verifier:
Event.set_EV_CREDENTIAL_PRESENTED(cred, verifier)
if authorization.get('redirect_uri'):
return redirect(authorization.get('redirect_uri'))
elif authorization.get('response'):
txt = authorization.get('response')
messages.success(self.request, txt)
Event.set_EV_USR_SEND_CREDENTIAL(txt)
txt2 = f"Verifier {verifier} send: " + txt
Event.set_EV_USR_SEND_VP(txt2, self.request.user)
url = reverse_lazy('idhub:user_dashboard')
return redirect(url)
return redirect(self.success_url)
def get_org(self):
client_id = self.request.GET.get("client_id")
if not client_id:
raise Http404("Organization not found!")
org = get_object_or_404(
Organization,
client_id=client_id,
)
return org
@method_decorator(csrf_exempt, name='dispatch')
class VerifyView(View):
subject_template_name = 'email/verify_subject.txt'
email_template_name = 'email/verify_email.txt'
html_email_template_name = 'email/verify_email.html'
def get(self, request, *args, **kwargs):
org = self.validate(request)
presentation_definition = json.dumps(settings.SUPPORTED_CREDENTIALS)
authorization = Authorization(
organization=org,
presentation_definition=presentation_definition
)
authorization.save()
res = json.dumps({"redirect_uri": authorization.authorize()})
return HttpResponse(res)
def post(self, request, *args, **kwargs):
code = self.request.POST.get("code")
vp_tk = self.request.POST.get("vp_token")
if not vp_tk or not code:
raise Http404("Page not Found!")
org = self.validate(request)
self.vp_token = OAuth2VPToken(
vp_token = vp_tk,
organization=org,
code=code
)
if not self.vp_token.authorization:
raise Http404("Page not Found!")
self.vp_token.verifing()
response = self.vp_token.get_response_verify()
self.vp_token.save()
for user in User.objects.filter(is_admin=True):
self.send_email(user)
response["response"] = "Validation Code {}".format(code)
return JsonResponse(response)
def validate(self, request):
auth_header = request.headers.get('Authorization', b'')
auth_data = auth_header.split()
if len(auth_data) == 2 and auth_data[0].lower() == 'basic':
decoded_auth = base64.b64decode(auth_data[1]).decode('utf-8')
client_id, client_secret = decoded_auth.split(':', 1)
org = get_object_or_404(
Organization,
client_id=client_id,
client_secret=client_secret
)
return org
raise Http404("Page not Found!")
def send_email(self, user):
"""
Send a email when a user is activated.
"""
if not self.vp_token.result_verify:
return
email = self.get_email(user)
try:
if settings.ENABLE_EMAIL:
email.send()
return
logger.warning(user.email)
logger.warning(email.body)
except Exception as err:
logger.error(err)
return
def get_context(self):
url_domain = "https://{}/".format(settings.DOMAIN)
context = {
"domain": settings.DOMAIN,
"url_domain": url_domain,
"verification": self.get_verification(),
"code": self.vp_token.code,
}
return context
def get_email(self, user):
context = self.get_context()
subject = loader.render_to_string(self.subject_template_name, context)
# Email subject *must not* contain newlines
subject = ''.join(subject.splitlines())
body = loader.render_to_string(self.email_template_name, context)
from_email = settings.DEFAULT_FROM_EMAIL
to_email = user.email
email_message = EmailMultiAlternatives(
subject, body, from_email, [to_email])
html_email = loader.render_to_string(self.html_email_template_name, context)
email_message.attach_alternative(html_email, 'text/html')
return email_message
def get_verification(self):
return self.vp_token.get_user_info_all()
class AllowCodeView(View):
def get(self, request, *args, **kwargs):
code = self.request.GET.get("code")
if not code:
raise Http404("Page not Found!")
self.authorization = get_object_or_404(
Authorization,
code=code,
code_used=False
)
if self.request.session.get("response_uri"):
url = self.send_api()
if url:
return redirect(url)
promotion = self.authorization.promotions.first()
if not promotion:
return redirect(reverse_lazy('oidc4vp:received_code'))
return redirect(promotion.get_url(code))
def send_api(self):
vp = self.get_vp_token()
if not vp:
return
data = {
"vp_token": vp,
"code": self.authorization.code
}
url = self.request.session.get("response_uri")
result = requests.post(url, data=data)
return result.json().get('redirect_uri')
def get_vp_token(self):
vp = self.authorization.vp_tokens.first()
if not vp:
return
return base64.b64encode(vp.vp_token.encode()).decode()
def get_response_uri(self):
data = {
"code": self.authorization.code,
}
query_dict = QueryDict('', mutable=True)
query_dict.update(data)
response_uri = self.request.session.get("response_uri")
url = '{response_uri}?{params}'.format(
response_uri=response_uri,
params=query_dict.urlencode()
)
return url
class ReceivedCodeView(View):
template_name = "received_code.html"
def get(self, request, *args, **kwargs):
self.context = {}
template = loader.get_template(
self.template_name,
).render()
return HttpResponse(template)