This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
devicehub-teal/ereuse_devicehub/modules/oidc/views.py

333 lines
9.6 KiB
Python
Raw Normal View History

2023-06-16 10:39:03 +00:00
import json
import logging
2023-12-08 19:54:44 +00:00
import base64
2023-06-16 10:39:03 +00:00
import requests
from authlib.integrations.flask_oauth2 import current_token
from authlib.oauth2 import OAuth2Error
from flask import (
Blueprint,
g,
jsonify,
redirect,
render_template,
request,
session,
url_for,
2023-12-12 19:40:11 +00:00
current_app as app
2023-06-16 10:39:03 +00:00
)
from flask_login import login_required
from ereuse_devicehub import __version__, messages
2023-12-12 19:40:11 +00:00
from ereuse_devicehub.db import db
2023-06-16 10:39:03 +00:00
from ereuse_devicehub.modules.oidc.forms import (
AuthorizeForm,
CreateClientForm,
ListInventoryForm,
)
2023-12-12 19:40:11 +00:00
from ereuse_devicehub.modules.oidc.models import (
MemberFederated,
OAuth2Client,
Code2Roles
)
2023-06-16 10:39:03 +00:00
from ereuse_devicehub.modules.oidc.oauth2 import (
authorization,
generate_user_info,
require_oauth,
)
from ereuse_devicehub.views import GenericMixin
oidc = Blueprint('oidc', __name__, url_prefix='/', template_folder='templates')
logger = logging.getLogger(__name__)
##########
# Server #
##########
class CreateClientView(GenericMixin):
methods = ['GET', 'POST']
decorators = [login_required]
template_name = 'create_client.html'
title = "Edit Open Id Connect Client"
def dispatch_request(self):
form = CreateClientForm()
if form.validate_on_submit():
form.save()
next_url = url_for('core.user-profile')
return redirect(next_url)
self.get_context()
self.context.update(
{
'form': form,
'title': self.title,
}
)
return render_template(self.template_name, **self.context)
class AuthorizeView(GenericMixin):
methods = ['GET', 'POST']
decorators = [login_required]
template_name = 'authorize.html'
title = "Authorize"
def dispatch_request(self):
form = AuthorizeForm()
client = OAuth2Client.query.filter_by(
client_id=request.args.get('client_id')
).first()
if not client:
messages.error('Not exist client')
return redirect(url_for('core.user-profile'))
if form.validate_on_submit():
if not form.consent.data:
return redirect(url_for('core.user-profile'))
return authorization.create_authorization_response(grant_user=g.user)
try:
grant = authorization.validate_consent_request(end_user=g.user)
except OAuth2Error as error:
messages.error(error.error)
return redirect(url_for('core.user-profile'))
self.get_context()
self.context.update(
{'form': form, 'title': self.title, 'user': g.user, 'grant': grant}
)
return render_template(self.template_name, **self.context)
class IssueTokenView(GenericMixin):
methods = ['POST']
decorators = []
def dispatch_request(self):
return authorization.create_token_response()
class OauthProfileView(GenericMixin):
methods = ['GET']
decorators = []
template_name = 'authorize.html'
title = "Authorize"
@require_oauth('profile')
def dispatch_request(self):
return jsonify(generate_user_info(current_token.user, current_token.scope))
##########
# Client #
##########
class SelectInventoryView(GenericMixin):
methods = ['GET', 'POST']
decorators = []
template_name = 'select_inventory.html'
title = "Select an Inventory"
def dispatch_request(self):
2023-12-12 19:40:11 +00:00
host = app.config.get('HOST', '').strip("/")
url = "https://ebsi-pcp-wallet-ui.vercel.app/oid4vp?"
url += f"client_id=https://{host}&"
url += "presentation_definition_uri=https%3A%2F%2Fiotaledger.github.io"
url += "%2Febsi-stardust-components%2Fpublic%2Fpresentation-definition-ex1.json&"
url += f"response_uri=https://{host}/allow_code_oidc4vp"
url += "&state=1700822573400&response_type=vp_token&response_mode=direct_post"
url += "&nonce=DybC3A%3D%3D"
2023-06-16 10:39:03 +00:00
next = request.args.get('next', '#')
2023-12-12 19:40:11 +00:00
session['next_url'] = next
return redirect(url, code=302)
2023-06-16 10:39:03 +00:00
class AllowCodeView(GenericMixin):
methods = ['GET', 'POST']
decorators = []
userinfo = None
token = None
discovery = {}
def dispatch_request(self):
self.code = request.args.get('code')
self.oidc = session.get('oidc')
if not self.code or not self.oidc:
return self.redirect()
self.member = MemberFederated.query.filter(
MemberFederated.dlt_id_provider == self.oidc,
MemberFederated.client_id.isnot(None),
MemberFederated.client_secret.isnot(None),
).first()
if not self.member:
return self.redirect()
self.get_token()
if 'error' in self.token:
messages.error(self.token.get('error', ''))
return self.redirect()
self.get_user_info()
return self.redirect()
def get_discovery(self):
if self.discovery:
return self.discovery
try:
url_well_known = self.member.domain + '.well-known/openid-configuration'
self.discovery = requests.get(url_well_known).json()
except Exception:
self.discovery = {'code': 404}
return self.discovery
def get_token(self):
data = {'grant_type': 'authorization_code', 'code': self.code}
url = self.member.domain + '/oauth/token'
url = self.get_discovery().get('token_endpoint', url)
auth = (self.member.client_id, self.member.client_secret)
msg = requests.post(url, data=data, auth=auth)
self.token = json.loads(msg.text)
def redirect(self):
url = session.get('next_url') or '/login'
return redirect(url)
def get_user_info(self):
if self.userinfo:
return self.userinfo
if 'access_token' not in self.token:
return
url = self.member.domain + '/oauth/userinfo'
url = self.get_discovery().get('userinfo_endpoint', url)
access_token = self.token['access_token']
token_type = self.token.get('token_type', 'Bearer')
headers = {"Authorization": f"{token_type} {access_token}"}
msg = requests.get(url, headers=headers)
self.userinfo = json.loads(msg.text)
2023-06-23 09:59:45 +00:00
rols = self.userinfo.get('rols', [])
2023-06-16 10:39:03 +00:00
session['rols'] = [(k, k) for k in rols]
return self.userinfo
2023-12-06 13:03:49 +00:00
class AllowCodeOidc4vpView(GenericMixin):
methods = ['POST']
decorators = []
userinfo = None
token = None
discovery = {}
def dispatch_request(self):
2023-12-12 19:40:11 +00:00
vcredential = self.get_credential()
if not vcredential:
return jsonify({"error": "No there are credentials"})
roles = self.verify(vcredential)
if not roles:
return jsonify({"error": "No there are roles"})
uri = self.get_response_uri(roles)
return jsonify({"redirect_uri": uri})
def get_credential(self):
2023-12-08 19:54:44 +00:00
self.vp_token = request.values.get("vp_token")
2023-12-11 20:30:56 +00:00
pv = self.vp_token.split(".")
token = json.loads(base64.b64decode(pv[1]).decode())
2023-12-12 19:40:11 +00:00
return token.get('vp', {}).get("verifiableCredential")
def verify(self, vcredential):
WALLET_INX_EBSI_PLUGIN_TOKEN = app.config.get(
'WALLET_INX_EBSI_PLUGIN_TOKEN'
)
WALLET_INX_EBSI_PLUGIN_URL = app.config.get(
'WALLET_INX_EBSI_PLUGIN_URL'
)
2023-12-08 19:54:44 +00:00
headers = {
'Content-Type': 'application/json',
2023-12-12 19:40:11 +00:00
'Authorization': f'Bearer {WALLET_INX_EBSI_PLUGIN_TOKEN}'
2023-12-08 19:54:44 +00:00
}
data = json.dumps({
"type": "VerificationRequest",
2023-12-11 20:30:56 +00:00
"jwtCredential": vcredential
2023-12-08 19:54:44 +00:00
})
2023-12-12 19:40:11 +00:00
result = requests.post(
WALLET_INX_EBSI_PLUGIN_URL,
headers=headers,
data=data
)
2023-12-11 20:30:56 +00:00
if result.status_code != 200:
return
vps = json.loads(result.text)
if not vps.get('verified'):
return
2023-12-06 13:03:49 +00:00
2023-12-12 19:40:11 +00:00
return vps['credential']['credentialSubject'].get('role')
2023-12-06 13:03:49 +00:00
2023-12-12 19:40:11 +00:00
def get_response_uri(selfi, roles):
code = Code2Roles(roles=roles)
db.session.add(code)
db.session.commit()
2023-12-06 13:03:49 +00:00
2023-12-12 19:40:11 +00:00
url = "https://{host}/allow_code_oidc4vp2?code={code}".format(
host=app.config.get('HOST'),
code=code.code
)
return url
2023-12-06 13:03:49 +00:00
2023-12-12 19:40:11 +00:00
class AllowCodeOidc4vp2View(GenericMixin):
methods = ['GET', 'POST']
2023-12-06 13:03:49 +00:00
2023-12-12 19:40:11 +00:00
def dispatch_request(self):
self.code = request.args.get('code')
if not self.code:
return self.redirect()
2023-12-06 13:03:49 +00:00
2023-12-12 19:40:11 +00:00
self.get_user_info()
2023-12-06 13:03:49 +00:00
2023-12-12 19:40:11 +00:00
return self.redirect()
2023-12-06 13:03:49 +00:00
def redirect(self):
2023-12-13 09:39:00 +00:00
url = session.pop('next_url', '/login')
2023-12-06 13:03:49 +00:00
return redirect(url)
def get_user_info(self):
2023-12-12 19:40:11 +00:00
code = Code2Roles.query.filter(code=self.code).first()
2023-12-06 13:03:49 +00:00
2023-12-12 19:40:11 +00:00
if not code:
return
2023-12-06 13:03:49 +00:00
2023-12-12 19:40:11 +00:00
session['rols'] = [(k.strip(), k.strip()) for k in code.roles.split(",")]
db.session.delete(code)
db.session.commit()
2023-12-06 13:03:49 +00:00
2023-06-16 10:39:03 +00:00
##########
# Routes #
##########
oidc.add_url_rule('/create_client', view_func=CreateClientView.as_view('create_client'))
oidc.add_url_rule('/oauth/authorize', view_func=AuthorizeView.as_view('autorize_oidc'))
oidc.add_url_rule('/allow_code', view_func=AllowCodeView.as_view('allow_code'))
2023-12-06 13:03:49 +00:00
oidc.add_url_rule('/allow_code_oidc4vp', view_func=AllowCodeOidc4vpView.as_view('allow_code_oidc4vp'))
2023-12-12 19:40:11 +00:00
oidc.add_url_rule('/allow_code_oidc4vp2', view_func=AllowCodeOidc4vp2View.as_view('allow_code_oidc4vp2'))
2023-06-16 10:39:03 +00:00
oidc.add_url_rule('/oauth/token', view_func=IssueTokenView.as_view('oauth_issue_token'))
oidc.add_url_rule(
'/oauth/userinfo', view_func=OauthProfileView.as_view('oauth_user_info')
)
oidc.add_url_rule(
'/oidc/client/select',
view_func=SelectInventoryView.as_view('login_other_inventory'),
)