the script base
This commit is contained in:
parent
78de86c43a
commit
121efcbd09
108
migration-script.py
Normal file
108
migration-script.py
Normal file
|
@ -0,0 +1,108 @@
|
|||
import os
|
||||
import json
|
||||
import django
|
||||
import logging
|
||||
|
||||
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dhub.settings')
|
||||
|
||||
django.setup()
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
|
||||
from utils.save_snapshots import move_json, save_in_disk
|
||||
from evidence.parse import Build
|
||||
from evidence.models import Annotation
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
SEPARATOR = ";"
|
||||
PATH_SNAPTHOPS = "examples/snapshots"
|
||||
email_user = "user@example.org"
|
||||
|
||||
|
||||
### read csv ###
|
||||
def get_dict(row, header):
|
||||
if not row or not header:
|
||||
return
|
||||
if len(row) != len(header):
|
||||
return
|
||||
|
||||
return {row[i]: header[i] for i in range(len(header))}
|
||||
|
||||
|
||||
def open_csv(csv):
|
||||
# return a list of dictionaries whith the header of csv as keys of the dicts
|
||||
with open(csv) as f:
|
||||
_file = f.read()
|
||||
|
||||
rows = _file.split("\n")
|
||||
if len(rows) < 2:
|
||||
return []
|
||||
|
||||
header = rows[0].split(SEPARATOR)
|
||||
return [get_dict(row, header) for row in rows[1:]]
|
||||
### end read csv ###
|
||||
|
||||
|
||||
### read snapshot ###
|
||||
def search_snapshot_from_uuid(uuid):
|
||||
# search snapshot from uuid
|
||||
for root, _, files in os.walk(PATH_SNAPTHOPS):
|
||||
for f in files:
|
||||
if uuid in f:
|
||||
return os.path.join(root, f)
|
||||
|
||||
|
||||
def open_snapshot(uuid):
|
||||
snapshot_path = search_snapshot_from_uuid(uuid)
|
||||
if not snapshot_path:
|
||||
return None, None
|
||||
|
||||
with open(snapshot_path) as f:
|
||||
try:
|
||||
snap = json.loads(f.read())
|
||||
except Exception as err:
|
||||
logger.error("uuid: {}, error: {}".format(uuid, err))
|
||||
return None, None
|
||||
return snap, snapshot_path
|
||||
### end read snapshot ###
|
||||
|
||||
|
||||
def create_custom_id(dhid, uuid, user):
|
||||
tag = Annotation.objects.filter(
|
||||
uuid=uuid,
|
||||
type=Annotation.Type.SYSTEM,
|
||||
key='CUSTOM_ID',
|
||||
owner=user.institution
|
||||
).first()
|
||||
|
||||
if tag:
|
||||
return
|
||||
|
||||
Annotation.objects.create(
|
||||
uuid=uuid,
|
||||
type=Annotation.Type.SYSTEM,
|
||||
key='CUSTOM_ID',
|
||||
value=dhid,
|
||||
owner=user.institution,
|
||||
user=user
|
||||
)
|
||||
|
||||
|
||||
def migrate_snapshots(row, user):
|
||||
dhid = row.get("dhid")
|
||||
uuid = row.get("uuid")
|
||||
snapshot, snapshot_path = open_snapshot(uuid)
|
||||
if not snapshot or not snapshot_path:
|
||||
return
|
||||
|
||||
# insert snapshot
|
||||
path_name = save_in_disk(snapshot, user.institution.name)
|
||||
Build(path_name, user)
|
||||
move_json(path_name, user.institution.name)
|
||||
|
||||
# insert dhid
|
||||
create_custom_id(dhid, uuid)
|
Loading…
Reference in a new issue