django-orchestra/orchestra/apps/orchestration/manager.py

91 lines
3 KiB
Python
Raw Normal View History

import logging
2014-05-08 16:59:35 +00:00
import threading
from django import db
2014-05-13 13:46:40 +00:00
from orchestra.utils.python import import_class
2014-05-08 16:59:35 +00:00
from . import settings
from .helpers import send_report
logger = logging.getLogger(__name__)
2015-03-04 21:06:16 +00:00
router = import_class(settings.ORCHESTRATION_ROUTER)
2014-05-08 16:59:35 +00:00
def as_task(execute):
def wrapper(*args, **kwargs):
""" send report """
# Tasks run on a separate transaction pool (thread), no need to temper with the transaction
log = execute(*args, **kwargs)
2014-05-08 16:59:35 +00:00
if log.state != log.SUCCESS:
send_report(execute, args, log)
return log
return wrapper
def close_connection(execute):
""" Threads have their own connection pool, closing it when finishing """
def wrapper(*args, **kwargs):
2014-10-27 14:31:04 +00:00
try:
log = execute(*args, **kwargs)
except:
2014-10-28 09:51:27 +00:00
logger.error('EXCEPTION executing backend %s %s' % (str(args), str(kwargs)))
2014-10-27 14:31:04 +00:00
raise
else:
# Using the wrapper function as threader messenger for the execute output
wrapper.log = log
finally:
db.connection.close()
2014-05-08 16:59:35 +00:00
return wrapper
def execute(operations, async=False):
2014-05-08 16:59:35 +00:00
""" generates and executes the operations on the servers """
scripts = {}
2014-07-17 16:09:24 +00:00
cache = {}
2014-10-04 09:29:18 +00:00
# Generate scripts per server+backend
2014-05-08 16:59:35 +00:00
for operation in operations:
2014-10-02 15:58:27 +00:00
logger.debug("Queued %s" % str(operation))
2015-03-04 21:06:16 +00:00
if operation.servers is None:
operation.servers = router.get_servers(operation, cache=cache)
for server in operation.servers:
2014-05-08 16:59:35 +00:00
key = (server, operation.backend)
if key not in scripts:
scripts[key] = (operation.backend(), [operation])
2014-07-25 15:17:50 +00:00
scripts[key][0].prepare()
2014-05-08 16:59:35 +00:00
else:
scripts[key][1].append(operation)
2014-10-04 09:29:18 +00:00
# Get and call backend action method
2014-05-08 16:59:35 +00:00
method = getattr(scripts[key][0], operation.action)
method(operation.instance)
# Execute scripts on each server
threads = []
executions = []
for key, value in scripts.iteritems():
server, __ = key
backend, operations = value
backend.commit()
execute = as_task(backend.execute)
execute = close_connection(execute)
2015-02-25 13:24:11 +00:00
# DEBUG: substitute all thread related stuff for this function
#execute(server, async=async)
thread = threading.Thread(target=execute, args=(server,), kwargs={'async': async})
2014-05-08 16:59:35 +00:00
thread.start()
threads.append(thread)
executions.append((execute, operations))
[ thread.join() for thread in threads ]
logs = []
2014-10-04 09:29:18 +00:00
# collect results
2014-05-08 16:59:35 +00:00
for execution, operations in executions:
for operation in operations:
logger.info("Executed %s" % str(operation))
2014-05-08 16:59:35 +00:00
operation.log = execution.log
operation.save()
2014-10-04 09:29:18 +00:00
stdout = execution.log.stdout.strip()
stdout and logger.debug('STDOUT %s', stdout)
stderr = execution.log.stderr.strip()
stderr and logger.debug('STDERR %s', stderr)
2014-05-08 16:59:35 +00:00
logs.append(execution.log)
return logs