IdHub/idhub/management/commands/initial_datas.py
2024-01-21 13:46:01 +01:00

116 lines
4 KiB
Python

import os
import csv
import json
from pathlib import Path
from utils import credtools
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.contrib.auth import get_user_model
from django.core.cache import cache
from decouple import config
from idhub.models import DID, Schemas
from oidc4vp.models import Organization
from promotion.models import Promotion
# Resolve the user model through Django's auth helper rather than importing
# a concrete model class directly.
User = get_user_model()
class Command(BaseCommand):
    """Seed an installation with the minimum data the project needs.

    Running the command creates one superuser and one regular user (each
    with a default DID), loads the organizations listed in
    ``examples/organizations.csv``, cross-links the client credentials of
    two organization pairs and registers the schema files found in
    ``settings.SCHEMAS_DIR``.
    """

    help = "Insert minimum datas for the project"

    def handle(self, *args, **kwargs) -> None:
        """Run the whole seeding sequence.

        Raises:
            CommandError: if a non-empty row of the organizations CSV has
                fewer than two ``;``-separated columns.
        """
        # NOTE(review): the fallback passwords are trivially guessable.
        # Presumably they exist for local/demo setups only -- confirm that
        # ADMIN_PASSWORD / USER_PASSWORD are always configured elsewhere.
        admin_email = config('ADMIN_EMAIL', 'admin@example.org')
        admin_password = config('ADMIN_PASSWORD', '1234')
        user_email = config('USER_EMAIL', 'user1@example.org')
        user_password = config('USER_PASSWORD', '1234')

        self.create_admin_users(admin_email, admin_password)
        self.create_users(user_email, user_password)

        # Four levels above this file's directory chain:
        # commands/ -> management/ -> <app>/ -> project root.
        base_dir = Path(__file__).resolve().parents[3]
        organizations_csv = base_dir / 'examples' / 'organizations.csv'

        # ``newline=''`` is what the csv module requires so that it can do
        # its own line-ending handling (including newlines inside quotes).
        with open(organizations_csv, newline='', encoding='utf-8') as csvfile:
            reader = csv.reader(csvfile, delimiter=';', quotechar='"')
            for row in reader:
                if not row:
                    # csv.reader yields an empty list for a blank line.
                    continue
                if len(row) < 2:
                    raise CommandError(
                        f"{organizations_csv}, line {reader.line_num}: "
                        "expected at least two ';'-separated columns "
                        "(name;url)"
                    )
                self.create_organizations(row[0].strip(), row[1].strip())

        # Both lookups use ``Organization.objects.get`` and therefore need
        # these names to have been loaded from the CSV above.
        self.sync_credentials_organizations("pangea.org", "somconnexio.coop")
        self.sync_credentials_organizations("local 8000", "local 9000")
        self.create_schemas()

    def create_admin_users(self, email: str, password: str) -> None:
        """Create the superuser, cache its key and give it a default DID.

        The value returned by ``decrypt_sensitive_data`` is stored in the
        cache under ``"KEY_DIDS"`` (keyed by the user id) and is also what
        the default DID is initialised with.
        """
        su = User.objects.create_superuser(email=email, password=password)
        su.set_encrypted_sensitive_data(password)
        su.save()

        key = su.decrypt_sensitive_data(password)
        # Timeout ``None``: the cache entry never expires. This replaces any
        # mapping previously stored under the same cache key.
        cache.set("KEY_DIDS", {su.id: key}, None)
        self.create_defaults_dids(su, key)

    def create_users(self, email: str, password: str) -> None:
        """Create a regular user with a hashed password and a default DID."""
        # Insert the row without the password: handing the raw value to
        # ``objects.create`` would write it to the password column in clear
        # text until the ``save()`` below replaces it with the hash.
        u = User.objects.create(email=email)
        u.set_password(password)
        u.set_encrypted_sensitive_data(password)
        u.save()

        key = u.decrypt_sensitive_data(password)
        self.create_defaults_dids(u, key)

    def create_organizations(self, name: str, url: str) -> None:
        """Create an ``Organization`` named *name* with *url* as response URI."""
        Organization.objects.create(name=name, response_uri=url)

    def sync_credentials_organizations(self, test1: str, test2: str) -> None:
        """Make two organizations know each other's client credentials.

        Each organization's ``client_id`` / ``client_secret`` is copied into
        the other's ``my_client_id`` / ``my_client_secret``; both are saved.
        *test1* and *test2* are organization names looked up with ``get``.
        """
        org1 = Organization.objects.get(name=test1)
        org2 = Organization.objects.get(name=test2)

        org2.my_client_id = org1.client_id
        org2.my_client_secret = org1.client_secret
        org1.my_client_id = org2.client_id
        org1.my_client_secret = org2.client_secret

        org1.save()
        org2.save()

    def create_defaults_dids(self, u, password) -> None:
        """Create and save the DID labelled "Default" for user *u*.

        Despite the parameter name, the callers in this command pass the
        value returned by ``decrypt_sensitive_data`` rather than the login
        password; it is forwarded unchanged to ``DID.set_did``.
        """
        did = DID(label="Default", user=u, type=DID.Types.KEY)
        did.set_did(password)
        did.save()

    def create_schemas(self) -> None:
        """Register every file in ``settings.SCHEMAS_DIR`` not yet stored."""
        for file_name in os.listdir(settings.SCHEMAS_DIR):
            # One existence query per file; already-registered files are
            # left untouched.
            if Schemas.objects.filter(file_schema=file_name).exists():
                continue
            self._create_schemas(file_name)

    def _create_schemas(self, file_name: str) -> None:
        """Validate one schema file and store it as a ``Schemas`` row.

        The file is skipped (with a message on stderr) when it is not valid
        JSON, fails ``credtools.validate_schema``, lacks a truthy ``name``
        or ``title``, or has a malformed ``name`` list. On success the row
        is created with the raw file content and ``title`` as its type.
        """
        data = self.open_file(file_name)
        try:
            ldata = json.loads(data)
            # Explicit checks instead of ``assert``: assertions are removed
            # under ``python -O``, which would let invalid schemas through.
            if not credtools.validate_schema(ldata):
                raise ValueError("schema validation failed")
            dname = ldata.get('name')
            title = ldata.get('title')
            if not dname or not title:
                raise ValueError("'name' and 'title' are required")
        except Exception as err:
            # Kept broad on purpose: the validator is opaque from here and a
            # single bad file must not abort the whole seeding run.
            self.stderr.write(f"Skipping schema {file_name}: {err}")
            return

        try:
            # The localized name is not stored by this command; it is
            # resolved only so that a malformed ``name`` list rejects the
            # schema.
            self._localized_name(dname)
        except (AttributeError, KeyError, TypeError) as err:
            self.stderr.write(
                f"Skipping schema {file_name}: malformed 'name' ({err!r})"
            )
            return

        Schemas.objects.create(file_schema=file_name, data=data, type=title)

    @staticmethod
    def _localized_name(names) -> str:
        """Return the ``value`` of the entry matching the site language.

        *names* is iterated and each entry's ``lang`` is tested for
        containing ``settings.LANGUAGE_CODE``; the last matching entry wins
        and ``''`` is returned when none matches. Entries without ``lang``
        (or of the wrong shape) raise ``KeyError`` / ``TypeError``.
        """
        localized = ''
        for entry in names:
            if settings.LANGUAGE_CODE in entry['lang']:
                localized = entry.get('value', '')
        return localized

    def open_file(self, file_name: str) -> str:
        """Return the text content of *file_name* inside ``SCHEMAS_DIR``."""
        schema_path = Path(settings.SCHEMAS_DIR).joinpath(file_name)
        # Schema files are parsed as JSON, so read them as UTF-8 instead of
        # whatever the platform's locale encoding happens to be.
        return schema_path.read_text(encoding='utf-8')