diff --git a/orchestra/bin/orchestra-beat b/orchestra/bin/orchestra-beat
index 09d13fa2..b11eda09 100755
--- a/orchestra/bin/orchestra-beat
+++ b/orchestra/bin/orchestra-beat
@@ -27,7 +27,7 @@ class crontab_parser(object):
_range = r'(\w+?)-(\w+)'
_steps = r'/(\w+)?'
_star = r'\*'
-
+
def __init__(self, max_=60, min_=0):
self.max_ = max_
self.min_ = min_
@@ -45,14 +45,14 @@ class crontab_parser(object):
raise self.ParseException('empty part')
acc |= set(self._parse_part(part))
return acc
-
+
def _parse_part(self, part):
for regex, handler in self.pats:
m = regex.match(part)
if m:
return handler(m.groups())
return self._expand_range((part, ))
-
+
def _expand_range(self, toks):
fr = self._expand_number(toks[0])
if len(toks) > 1:
@@ -62,19 +62,19 @@ class crontab_parser(object):
list(range(self.min_, to + 1)))
return list(range(fr, to + 1))
return [fr]
-
+
def _range_steps(self, toks):
if len(toks) != 3 or not toks[2]:
raise self.ParseException('empty filter')
return self._expand_range(toks[:2])[::int(toks[2])]
-
+
def _star_steps(self, toks):
if not toks or not toks[0]:
raise self.ParseException('empty filter')
return self._expand_star()[::int(toks[0])]
def _expand_star(self, *args):
return list(range(self.min_, self.max_ + self.min_))
-
+
def _expand_number(self, s):
if isinstance(s, str) and s[0] == '-':
raise self.ParseException('negative numbers not supported')
@@ -99,7 +99,7 @@ class Setting(object):
def __init__(self, manage):
self.manage = manage
self.settings_file = self.get_settings_file(manage)
-
+
def get_settings(self):
""" get db settings from settings.py file without importing """
settings = {'__file__': self.settings_file}
@@ -111,7 +111,7 @@ class Setting(object):
content += line
exec(content, settings)
return settings
-
+
def get_settings_file(self, manage):
with open(manage, 'r') as handler:
regex = re.compile(r'"DJANGO_SETTINGS_MODULE"\s*,\s*"([^"]+)"')
@@ -128,7 +128,7 @@ class Setting(object):
class DB(object):
def __init__(self, settings):
self.settings = settings['DATABASES']['default']
-
+
def connect(self):
if self.settings['ENGINE'] == 'django.db.backends.sqlite3':
import sqlite3
@@ -138,7 +138,7 @@ class DB(object):
self.conn = psycopg2.connect("dbname='{NAME}' user='{USER}' host='{HOST}' password='{PASSWORD}'".format(**self.settings))
else:
raise ValueError("%s engine not supported." % self.settings['ENGINE'])
-
+
def query(self, query):
cur = self.conn.cursor()
try:
@@ -147,7 +147,7 @@ class DB(object):
finally:
cur.close()
return result
-
+
def close(self):
self.conn.close()
@@ -161,7 +161,7 @@ def fire_pending_tasks(manage, db):
"WHERE p.crontab_id = c.id AND p.enabled = {}"
).format(enabled)
return db.query(query)
-
+
def is_due(now, minute, hour, day_of_week, day_of_month, month_of_year):
n_minute, n_hour, n_day_of_week, n_day_of_month, n_month_of_year = now
return (
@@ -171,14 +171,14 @@ def fire_pending_tasks(manage, db):
n_day_of_month in crontab_parser(31, 1).parse(day_of_month) and
n_month_of_year in crontab_parser(12, 1).parse(month_of_year)
)
-
+
now = datetime.utcnow()
now = tuple(map(int, now.strftime("%M %H %w %d %m").split()))
for minute, hour, day_of_week, day_of_month, month_of_year, task_id in get_tasks(db):
if is_due(now, minute, hour, day_of_week, day_of_month, month_of_year):
command = 'python3 -W ignore::DeprecationWarning {manage} runtask {task_id}'.format(
manage=manage, task_id=task_id)
- proc = run(command, async=True)
+ proc = run(command, run_async=True)
yield proc
@@ -187,7 +187,7 @@ def fire_pending_messages(settings, db):
MAILER_DEFERE_SECONDS = settings.get('MAILER_DEFERE_SECONDS', (300, 600, 60*60, 60*60*24))
now = datetime.utcnow()
query_or = []
-
+
for num, seconds in enumerate(MAILER_DEFERE_SECONDS):
delta = timedelta(seconds=seconds)
epoch = now-delta
@@ -198,10 +198,10 @@ def fire_pending_messages(settings, db):
WHERE (mailer_message.state = 'QUEUED'
OR (mailer_message.state = 'DEFERRED' AND (%s))) LIMIT 1""" % ' OR '.join(query_or)
return bool(db.query(query))
-
+
if has_pending_messages(settings, db):
command = 'python3 -W ignore::DeprecationWarning {manage} sendpendingmessages'.format(manage=manage)
- proc = run(command, async=True)
+ proc = run(command, run_async=True)
yield proc
diff --git a/orchestra/contrib/orchestration/__init__.py b/orchestra/contrib/orchestration/__init__.py
index b5aded20..c4774481 100644
--- a/orchestra/contrib/orchestration/__init__.py
+++ b/orchestra/contrib/orchestration/__init__.py
@@ -15,21 +15,21 @@ class Operation():
MONITOR = 'monitor'
EXCEEDED = 'exceeded'
RECOVERY = 'recovery'
-
+
def __str__(self):
return '%s.%s(%s)' % (self.backend, self.action, self.instance)
-
+
def __repr__(self):
return str(self)
-
+
def __hash__(self):
""" set() """
return hash((self.backend, self.instance, self.action))
-
+
def __eq__(self, operation):
""" set() """
return hash(self) == hash(operation)
-
+
def __init__(self, backend, instance, action, routes=None):
self.backend = backend
# instance should maintain any dynamic attribute until backend execution
@@ -37,13 +37,13 @@ class Operation():
self.instance = copy.deepcopy(instance)
self.action = action
self.routes = routes
-
+
@classmethod
- def execute(cls, operations, serialize=False, async=None):
+ def execute(cls, operations, serialize=False, run_async=None):
from . import manager
scripts, backend_serialize = manager.generate(operations)
- return manager.execute(scripts, serialize=(serialize or backend_serialize), async=async)
-
+ return manager.execute(scripts, serialize=(serialize or backend_serialize), run_async=run_async)
+
@classmethod
def create_for_action(cls, instances, action):
if not isinstance(instances, collections.Iterable):
@@ -56,13 +56,13 @@ class Operation():
cls(backend_cls, instance, action)
)
return operations
-
+
@classmethod
def execute_action(cls, instances, action):
""" instances can be an object or an iterable for batch processing """
operations = cls.create_for_action(instances, action)
return cls.execute(operations)
-
+
def preload_context(self):
"""
Heuristic: Running get_context will prevent most of related objects do not exist errors
@@ -70,7 +70,7 @@ class Operation():
if self.action == self.DELETE:
if hasattr(self.backend, 'get_context'):
self.backend().get_context(self.instance)
-
+
def store(self, log):
from .models import BackendOperation
return BackendOperation.objects.create(
@@ -79,7 +79,7 @@ class Operation():
instance=self.instance,
action=self.action,
)
-
+
@classmethod
def load(cls, operation, log=None):
routes = None
@@ -88,4 +88,4 @@ class Operation():
(operation.backend, operation.action): AttrDict(host=log.server)
}
return cls(operation.backend_class, operation.instance, operation.action, routes=routes)
-
+
diff --git a/orchestra/contrib/orchestration/admin.py b/orchestra/contrib/orchestration/admin.py
index da9cfe01..60737d09 100644
--- a/orchestra/contrib/orchestration/admin.py
+++ b/orchestra/contrib/orchestration/admin.py
@@ -30,25 +30,25 @@ STATE_COLORS = {
class RouteAdmin(ExtendedModelAdmin):
list_display = (
- 'display_backend', 'host', 'match', 'display_model', 'display_actions', 'async',
+ 'display_backend', 'host', 'match', 'display_model', 'display_actions', 'run_async',
'is_active'
)
- list_editable = ('host', 'match', 'async', 'is_active')
- list_filter = ('host', 'is_active', 'async', 'backend')
+ list_editable = ('host', 'match', 'run_async', 'is_active')
+ list_filter = ('host', 'is_active', 'run_async', 'backend')
list_prefetch_related = ('host',)
ordering = ('backend',)
- add_fields = ('backend', 'host', 'match', 'async', 'is_active')
+ add_fields = ('backend', 'host', 'match', 'run_async', 'is_active')
change_form = RouteForm
actions = (orchestrate,)
change_view_actions = actions
-
+
BACKEND_HELP_TEXT = helpers.get_backends_help_text(ServiceBackend.get_backends())
DEFAULT_MATCH = {
backend.get_name(): backend.default_route_match for backend in ServiceBackend.get_backends()
}
-
+
display_backend = display_plugin_field('backend')
-
+
def display_model(self, route):
try:
return escape(route.backend_class.model)
@@ -56,7 +56,7 @@ class RouteAdmin(ExtendedModelAdmin):
return "NOT AVAILABLE"
display_model.short_description = _("model")
display_model.allow_tags = True
-
+
def display_actions(self, route):
try:
return '
'.join(route.backend_class.get_actions())
@@ -64,7 +64,7 @@ class RouteAdmin(ExtendedModelAdmin):
return "NOT AVAILABLE"
display_actions.short_description = _("actions")
display_actions.allow_tags = True
-
+
def formfield_for_dbfield(self, db_field, **kwargs):
""" Provides dynamic help text on backend form field """
if db_field.name == 'backend':
@@ -79,23 +79,23 @@ class RouteAdmin(ExtendedModelAdmin):
request._host_choices_cache = choices = list(field.choices)
field.choices = choices
return field
-
+
def get_form(self, request, obj=None, **kwargs):
""" Include dynamic help text for existing objects """
form = super(RouteAdmin, self).get_form(request, obj, **kwargs)
if obj:
form.base_fields['backend'].help_text = self.BACKEND_HELP_TEXT.get(obj.backend, '')
return form
-
+
def show_orchestration_disabled(self, request):
if settings.ORCHESTRATION_DISABLE_EXECUTION:
msg = _("Orchestration execution is disabled by ORCHESTRATION_DISABLE_EXECUTION setting.")
self.message_user(request, mark_safe(msg), messages.WARNING)
-
+
def changelist_view(self, request, extra_context=None):
self.show_orchestration_disabled(request)
return super(RouteAdmin, self).changelist_view(request, extra_context)
-
+
def changeform_view(self, request, object_id=None, form_url='', extra_context=None):
self.show_orchestration_disabled(request)
return super(RouteAdmin, self).changeform_view(
@@ -108,12 +108,12 @@ class BackendOperationInline(admin.TabularInline):
readonly_fields = ('action', 'instance_link')
extra = 0
can_delete = False
-
+
class Media:
css = {
'all': ('orchestra/css/hide-inline-id.css',)
}
-
+
def instance_link(self, operation):
link = admin_link('instance')(self, operation)
if link == '---':
@@ -122,10 +122,10 @@ class BackendOperationInline(admin.TabularInline):
return link
instance_link.allow_tags = True
instance_link.short_description = _("Instance")
-
+
def has_add_permission(self, *args, **kwargs):
return False
-
+
def get_queryset(self, request):
queryset = super(BackendOperationInline, self).get_queryset(request)
return queryset.prefetch_related('instance')
@@ -149,7 +149,7 @@ class BackendLogAdmin(ChangeViewActionsMixin, admin.ModelAdmin):
readonly_fields = fields
actions = (retry_backend,)
change_view_actions = actions
-
+
server_link = admin_link('server')
display_created = admin_date('created_at', short_description=_("Created"))
display_state = admin_colored('state', colors=STATE_COLORS)
@@ -157,17 +157,17 @@ class BackendLogAdmin(ChangeViewActionsMixin, admin.ModelAdmin):
mono_stdout = display_mono('stdout')
mono_stderr = display_mono('stderr')
mono_traceback = display_mono('traceback')
-
+
class Media:
css = {
'all': ('orchestra/css/pygments/github.css',)
}
-
+
def get_queryset(self, request):
""" Order by structured name and imporve performance """
qs = super(BackendLogAdmin, self).get_queryset(request)
return qs.select_related('server').defer('script', 'stdout')
-
+
def has_add_permission(self, *args, **kwargs):
return False
@@ -177,17 +177,17 @@ class ServerAdmin(ExtendedModelAdmin):
list_filter = ('os',)
actions = (orchestrate,)
change_view_actions = actions
-
+
def display_ping(self, instance):
return self._remote_state[instance.pk][0]
display_ping.short_description = _("Ping")
display_ping.allow_tags = True
-
+
def display_uptime(self, instance):
return self._remote_state[instance.pk][1]
display_uptime.short_description = _("Uptime")
display_uptime.allow_tags = True
-
+
def get_queryset(self, request):
""" Order by structured name and imporve performance """
qs = super(ServerAdmin, self).get_queryset(request)
diff --git a/orchestra/contrib/orchestration/backends.py b/orchestra/contrib/orchestration/backends.py
index cf861102..b2f22267 100644
--- a/orchestra/contrib/orchestration/backends.py
+++ b/orchestra/contrib/orchestration/backends.py
@@ -31,7 +31,7 @@ class ServiceMount(plugins.PluginMount):
class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
"""
Service management backend base class
-
+
It uses the _unit of work_ design principle, which allows bulk operations to
be conviniently supported. Each backend generates the configuration for all
the changes of all modified objects, reloading the daemon just once.
@@ -52,15 +52,15 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
# By default backend will not run if actions do not generate insctructions,
# If your backend uses prepare() or commit() only then you should set force_empty_action_execution = True
force_empty_action_execution = False
-
+
def __str__(self):
return type(self).__name__
-
+
def __init__(self):
self.head = []
self.content = []
self.tail = []
-
+
def __getattribute__(self, attr):
""" Select head, content or tail section depending on the method name """
IGNORE_ATTRS = (
@@ -83,29 +83,29 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
elif attr not in IGNORE_ATTRS and attr in self.actions:
self.set_content()
return super(ServiceBackend, self).__getattribute__(attr)
-
+
def set_head(self):
self.cmd_section = self.head
-
+
def set_tail(self):
self.cmd_section = self.tail
-
+
def set_content(self):
self.cmd_section = self.content
-
+
@classmethod
def get_actions(cls):
return [ action for action in cls.actions if action in dir(cls) ]
-
+
@classmethod
def get_name(cls):
return cls.__name__
-
+
@classmethod
def is_main(cls, obj):
opts = obj._meta
return cls.model == '%s.%s' % (opts.app_label, opts.object_name)
-
+
@classmethod
def get_related(cls, obj):
opts = obj._meta
@@ -122,7 +122,7 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
return related.all()
return [related]
return []
-
+
@classmethod
def get_backends(cls, instance=None, action=None):
backends = cls.get_plugins()
@@ -140,15 +140,15 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
if include:
included.append(backend)
return included
-
+
@classmethod
def get_backend(cls, name):
return cls.get(name)
-
+
@classmethod
def model_class(cls):
return apps.get_model(cls.model)
-
+
@property
def scripts(self):
""" group commands based on their method """
@@ -163,12 +163,12 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
except KeyError:
pass
return list(scripts.items())
-
+
def get_banner(self):
now = timezone.localtime(timezone.now())
time = now.strftime("%h %d, %Y %I:%M:%S %Z")
return "Generated by Orchestra at %s" % time
-
+
def create_log(self, server, **kwargs):
from .models import BackendLog
state = BackendLog.RECEIVED
@@ -181,8 +181,8 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
manager = manager.using(using)
log = manager.create(backend=self.get_name(), state=state, server=server)
return log
-
- def execute(self, server, async=False, log=None):
+
+ def execute(self, server, run_async=False, log=None):
from .models import BackendLog
if log is None:
log = self.create_log(server)
@@ -190,11 +190,11 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
if run:
scripts = self.scripts
for method, commands in scripts:
- method(log, server, commands, async)
+ method(log, server, commands, run_async)
if log.state != BackendLog.SUCCESS:
break
return log
-
+
def append(self, *cmd):
# aggregate commands acording to its execution method
if isinstance(cmd[0], str):
@@ -207,10 +207,10 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
self.cmd_section.append((method, [cmd]))
else:
self.cmd_section[-1][1].append(cmd)
-
+
def get_context(self, obj):
return {}
-
+
def prepare(self):
"""
hook for executing something at the beging
@@ -221,7 +221,7 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
set -o pipefail
exit_code=0""")
)
-
+
def commit(self):
"""
hook for executing something at the end
@@ -235,11 +235,11 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
class ServiceController(ServiceBackend):
actions = ('save', 'delete')
abstract = True
-
+
@classmethod
def get_verbose_name(cls):
return _("[S] %s") % super(ServiceController, cls).get_verbose_name()
-
+
@classmethod
def get_backends(cls):
""" filter controller classes """
diff --git a/orchestra/contrib/orchestration/helpers.py b/orchestra/contrib/orchestration/helpers.py
index 7f0deae8..798164d0 100644
--- a/orchestra/contrib/orchestration/helpers.py
+++ b/orchestra/contrib/orchestration/helpers.py
@@ -105,7 +105,7 @@ def get_backend_url(ids):
def get_messages(logs):
messages = []
- total, successes, async = 0, 0, 0
+ total, successes, run_async = 0, 0, 0
ids = []
async_ids = []
for log in logs:
@@ -118,17 +118,17 @@ def get_messages(logs):
if log.is_success:
successes += 1
elif not log.has_finished:
- async += 1
+ run_async += 1
async_ids.append(log.id)
- errors = total-successes-async
+ errors = total-successes-run_async
url = get_backend_url(ids)
async_url = get_backend_url(async_ids)
async_msg = ''
- if async:
+ if run_async:
async_msg = ungettext(
_('{name} is running on the background'),
- _('{async} backends are running on the background'),
- async)
+ _('{run_async} backends are running on the background'),
+ run_async)
if errors:
if total == 1:
msg = _('{name} has fail to execute')
@@ -139,7 +139,7 @@ def get_messages(logs):
errors)
if async_msg:
msg += ', ' + str(async_msg)
- msg = msg.format(errors=errors, async=async, async_url=async_url, total=total, url=url,
+ msg = msg.format(errors=errors, run_async=run_async, async_url=async_url, total=total, url=url,
name=log.backend)
messages.append(('error', msg + '.'))
elif successes:
@@ -158,12 +158,12 @@ def get_messages(logs):
_('{total} backends have been executed'),
total)
msg = msg.format(
- total=total, url=url, async_url=async_url, async=async, successes=successes,
+ total=total, url=url, async_url=async_url, run_async=run_async, successes=successes,
name=log.backend
)
messages.append(('success', msg + '.'))
else:
- msg = async_msg.format(url=url, async_url=async_url, async=async, name=log.backend)
+ msg = async_msg.format(url=url, async_url=async_url, run_async=run_async, name=log.backend)
messages.append(('success', msg + '.'))
return messages
diff --git a/orchestra/contrib/orchestration/management/commands/orchestrate.py b/orchestra/contrib/orchestration/management/commands/orchestrate.py
index 4b076f73..211f1b59 100644
--- a/orchestra/contrib/orchestration/management/commands/orchestrate.py
+++ b/orchestra/contrib/orchestration/management/commands/orchestrate.py
@@ -12,7 +12,7 @@ from orchestra.utils.sys import confirm
class Command(BaseCommand):
help = 'Runs orchestration backends.'
-
+
def add_arguments(self, parser):
parser.add_argument('model', nargs='?',
help='Label of a model to execute the orchestration.')
@@ -30,8 +30,8 @@ class Command(BaseCommand):
help='List available baclends.')
parser.add_argument('--dry-run', action='store_true', dest='dry', default=False,
help='Only prints scrtipt.')
-
-
+
+
def collect_operations(self, **options):
model = options.get('model')
backends = options.get('backends') or set()
@@ -66,7 +66,7 @@ class Command(BaseCommand):
model = apps.get_model(*model.split('.'))
queryset = model.objects.filter(**kwargs).order_by('id')
querysets = [queryset]
-
+
operations = OrderedSet()
route_cache = {}
for queryset in querysets:
@@ -88,7 +88,7 @@ class Command(BaseCommand):
result.append(operation)
operations = result
return operations
-
+
def handle(self, *args, **options):
list_backends = options.get('list_backends')
if list_backends:
@@ -116,7 +116,7 @@ class Command(BaseCommand):
if not confirm("\n\nAre your sure to execute the previous scripts on %(servers)s (yes/no)? " % context):
return
if not dry:
- logs = manager.execute(scripts, serialize=serialize, async=True)
+ logs = manager.execute(scripts, serialize=serialize, run_async=True)
running = list(logs)
stdout = 0
stderr = 0
diff --git a/orchestra/contrib/orchestration/manager.py b/orchestra/contrib/orchestration/manager.py
index eafe8422..62572d04 100644
--- a/orchestra/contrib/orchestration/manager.py
+++ b/orchestra/contrib/orchestration/manager.py
@@ -97,12 +97,12 @@ def generate(operations):
return scripts, serialize
-def execute(scripts, serialize=False, async=None):
+def execute(scripts, serialize=False, run_async=None):
"""
executes the operations on the servers
-
+
serialize: execute one backend at a time
- async: do not join threads (overrides route.async)
+ run_async: do not join threads (overrides route.run_async)
"""
if settings.ORCHESTRATION_DISABLE_EXECUTION:
logger.info('Orchestration execution is dissabled by ORCHESTRATION_DISABLE_EXECUTION.')
@@ -115,12 +115,12 @@ def execute(scripts, serialize=False, async=None):
route, __, async_action = key
backend, operations = value
args = (route.host,)
- if async is None:
- is_async = not serialize and (route.async or async_action)
+ if run_async is None:
+ is_async = not serialize and (route.run_async or async_action)
else:
- is_async = not serialize and (async or async_action)
+ is_async = not serialize and (run_async or async_action)
kwargs = {
- 'async': is_async,
+ 'run_async': is_async,
}
# we clone the connection just in case we are isolated inside a transaction
with db.clone(model=BackendLog) as handle:
diff --git a/orchestra/contrib/orchestration/methods.py b/orchestra/contrib/orchestration/methods.py
index db665a0d..cd3d7a22 100644
--- a/orchestra/contrib/orchestration/methods.py
+++ b/orchestra/contrib/orchestration/methods.py
@@ -17,7 +17,7 @@ from . import settings
logger = logging.getLogger(__name__)
-def Paramiko(backend, log, server, cmds, async=False, paramiko_connections={}):
+def Paramiko(backend, log, server, cmds, run_async=False, paramiko_connections={}):
"""
Executes cmds to remote server using Pramaiko
"""
@@ -55,7 +55,7 @@ def Paramiko(backend, log, server, cmds, async=False, paramiko_connections={}):
channel.shutdown_write()
# Log results
logger.debug('%s running on %s' % (backend, server))
- if async:
+ if run_async:
second = False
while True:
# Non-blocking is the secret ingridient in the async sauce
@@ -78,7 +78,7 @@ def Paramiko(backend, log, server, cmds, async=False, paramiko_connections={}):
else:
log.stdout += channel.makefile('rb', -1).read().decode('utf-8')
log.stderr += channel.makefile_stderr('rb', -1).read().decode('utf-8')
-
+
log.exit_code = channel.recv_exit_status()
log.state = log.SUCCESS if log.exit_code == 0 else log.FAILURE
logger.debug('%s execution state on %s is %s' % (backend, server, log.state))
@@ -97,7 +97,7 @@ def Paramiko(backend, log, server, cmds, async=False, paramiko_connections={}):
channel.close()
-def OpenSSH(backend, log, server, cmds, async=False):
+def OpenSSH(backend, log, server, cmds, run_async=False):
"""
Executes cmds to remote server using SSH with connection resuse for maximum performance
"""
@@ -110,9 +110,9 @@ def OpenSSH(backend, log, server, cmds, async=False):
return
try:
ssh = sshrun(server.get_address(), script, executable=backend.script_executable,
- persist=True, async=async, silent=True)
+ persist=True, run_async=run_async, silent=True)
logger.debug('%s running on %s' % (backend, server))
- if async:
+ if run_async:
for state in ssh:
log.stdout += state.stdout.decode('utf8')
log.stderr += state.stderr.decode('utf8')
@@ -148,7 +148,7 @@ def SSH(*args, **kwargs):
return method(*args, **kwargs)
-def Python(backend, log, server, cmds, async=False):
+def Python(backend, log, server, cmds, run_async=False):
script = ''
functions = set()
for cmd in cmds:
@@ -170,7 +170,7 @@ def Python(backend, log, server, cmds, async=False):
log.stdout += line + '\n'
if result:
log.stdout += '# Result: %s\n' % result
- if async:
+ if run_async:
log.save(update_fields=('stdout', 'updated_at'))
except:
log.exit_code = 1
diff --git a/orchestra/contrib/orchestration/migrations/0009_rename_route_async_run_async.py b/orchestra/contrib/orchestration/migrations/0009_rename_route_async_run_async.py
new file mode 100644
index 00000000..f70deeeb
--- /dev/null
+++ b/orchestra/contrib/orchestration/migrations/0009_rename_route_async_run_async.py
@@ -0,0 +1,25 @@
+# -*- coding: utf-8 -*-
+# Generated by Django 1.10.5 on 2021-03-30 10:49
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('orchestration', '0008_auto_20190805_1134'),
+ ]
+
+ operations = [
+ migrations.RenameField(
+ model_name='route',
+ old_name='async',
+ new_name='run_async',
+ ),
+ migrations.AlterField(
+ model_name='route',
+ name='backend',
+ field=models.CharField(choices=[('Apache2Traffic', '[M] Apache 2 Traffic'), ('ApacheTrafficByName', '[M] ApacheTrafficByName'), ('DokuWikiMuTraffic', '[M] DokuWiki MU Traffic'), ('DovecotMaildirDisk', '[M] Dovecot Maildir size'), ('Exim4Traffic', '[M] Exim4 traffic'), ('MailmanSubscribers', '[M] Mailman subscribers'), ('MailmanTraffic', '[M] Mailman traffic'), ('MysqlDisk', '[M] MySQL disk'), ('PostfixMailscannerTraffic', '[M] Postfix-Mailscanner traffic'), ('ProxmoxOpenVZTraffic', '[M] ProxmoxOpenVZTraffic'), ('UNIXUserDisk', '[M] UNIX user disk'), ('VsFTPdTraffic', '[M] VsFTPd traffic'), ('WordpressMuTraffic', '[M] Wordpress MU Traffic'), ('NextCloudDiskQuota', '[M] nextCloud SaaS Disk Quota'), ('NextcloudTraffic', '[M] nextCloud SaaS Traffic'), ('OwnCloudDiskQuota', '[M] ownCloud SaaS Disk Quota'), ('OwncloudTraffic', '[M] ownCloud SaaS Traffic'), ('PhpListTraffic', '[M] phpList SaaS Traffic'), ('Apache2Controller', '[S] Apache 2'), ('BSCWController', '[S] BSCW SaaS'), ('Bind9MasterDomainController', '[S] Bind9 master domain'), ('Bind9SlaveDomainController', '[S] Bind9 slave domain'), ('DokuWikiMuController', '[S] DokuWiki multisite'), ('DrupalMuController', '[S] Drupal multisite'), ('GitLabSaaSController', '[S] GitLab SaaS'), ('LetsEncryptController', "[S] Let's encrypt!"), ('LxcController', '[S] LxcController'), ('AutoresponseController', '[S] Mail autoresponse'), ('MailmanController', '[S] Mailman'), ('MailmanVirtualDomainController', '[S] Mailman virtdomain-only'), ('MoodleController', '[S] Moodle'), ('MoodleWWWRootController', '[S] Moodle WWWRoot (required)'), ('MoodleMuController', '[S] Moodle multisite'), ('MySQLController', '[S] MySQL database'), ('MySQLUserController', '[S] MySQL user'), ('PHPController', '[S] PHP FPM/FCGID'), ('PostfixAddressController', '[S] Postfix address'), ('PostfixAddressVirtualDomainController', '[S] Postfix address virtdomain-only'), ('ProxmoxOVZ', '[S] ProxmoxOVZ'), ('uWSGIPythonController', '[S] Python uWSGI'), ('RoundcubeIdentityController', '[S] Roundcube Identity Controller'), ('StaticController', '[S] Static'), ('SymbolicLinkController', '[S] Symbolic link webapp'), ('UNIXUserMaildirController', '[S] UNIX maildir user'), ('UNIXUserController', '[S] UNIX user'), ('WebalizerAppController', '[S] Webalizer App'), ('WebalizerController', '[S] Webalizer Content'), ('WordPressForceSSLController', '[S] WordPress Force SSL'), ('WordPressURLController', '[S] WordPress URL'), ('WordPressController', '[S] Wordpress'), ('WordpressMuController', '[S] Wordpress multisite'), ('NextCloudController', '[S] nextCloud SaaS'), ('OwnCloudController', '[S] ownCloud SaaS'), ('PhpListSaaSController', '[S] phpList SaaS')], max_length=256, verbose_name='backend'),
+ ),
+ ]
diff --git a/orchestra/contrib/orchestration/models.py b/orchestra/contrib/orchestration/models.py
index 2952b72f..37d74702 100644
--- a/orchestra/contrib/orchestration/models.py
+++ b/orchestra/contrib/orchestration/models.py
@@ -33,22 +33,22 @@ class Server(models.Model):
os = models.CharField(_("operative system"), max_length=32,
choices=settings.ORCHESTRATION_OS_CHOICES,
default=settings.ORCHESTRATION_DEFAULT_OS)
-
+
def __str__(self):
return self.name or str(self.address)
-
+
def get_address(self):
if self.address:
return self.address
return self.name
-
+
def get_ip(self):
address = self.get_address()
try:
return validate_ip_address(address)
except ValidationError:
return socket.gethostbyname(self.name)
-
+
def clean(self):
self.name = self.name.strip()
self.address = self.address.strip()
@@ -75,7 +75,7 @@ class BackendLog(models.Model):
NOTHING = 'NOTHING'
# Special state for mocked backendlogs
EXCEPTION = 'EXCEPTION'
-
+
STATES = (
(RECEIVED, RECEIVED),
(TIMEOUT, TIMEOUT),
@@ -87,7 +87,7 @@ class BackendLog(models.Model):
(REVOKED, REVOKED),
(NOTHING, NOTHING),
)
-
+
backend = models.CharField(_("backend"), max_length=256)
state = models.CharField(_("state"), max_length=16, choices=STATES, default=RECEIVED)
server = models.ForeignKey(Server, verbose_name=_("server"), related_name='execution_logs')
@@ -100,25 +100,25 @@ class BackendLog(models.Model):
help_text="Celery task ID when used as execution backend")
created_at = models.DateTimeField(_("created"), auto_now_add=True, db_index=True)
updated_at = models.DateTimeField(_("updated"), auto_now=True)
-
+
class Meta:
get_latest_by = 'id'
-
+
def __str__(self):
return "%s@%s" % (self.backend, self.server)
-
+
@property
def execution_time(self):
return (self.updated_at-self.created_at).total_seconds()
-
+
@property
def has_finished(self):
return self.state not in (self.STARTED, self.RECEIVED)
-
+
@property
def is_success(self):
return self.state in (self.SUCCESS, self.NOTHING)
-
+
def backend_class(self):
return ServiceBackend.get_backend(self.backend)
@@ -141,20 +141,20 @@ class BackendOperation(models.Model):
content_type = models.ForeignKey(ContentType)
object_id = models.PositiveIntegerField(null=True)
instance_repr = models.CharField(_("instance representation"), max_length=256)
-
+
instance = GenericForeignKey('content_type', 'object_id')
objects = BackendOperationQuerySet.as_manager()
-
+
class Meta:
verbose_name = _("Operation")
verbose_name_plural = _("Operations")
index_together = (
('content_type', 'object_id'),
)
-
+
def __str__(self):
return '%s.%s(%s)' % (self.backend, self.action, self.instance or self.instance_repr)
-
+
@cached_property
def backend_class(self):
return ServiceBackend.get_backend(self.backend)
@@ -203,7 +203,7 @@ class Route(models.Model):
match = models.CharField(_("match"), max_length=256, blank=True, default='True',
help_text=_("Python expression used for selecting the targe host, "
"instance referes to the current object."))
- async = models.BooleanField(default=False,
+ run_async = models.BooleanField(default=False,
help_text=_("Whether or not block the request/response cycle waitting this backend to "
"finish its execution. Usually you want slave servers to run asynchronously."))
async_actions = MultiSelectField(max_length=256, blank=True,
@@ -211,19 +211,19 @@ class Route(models.Model):
# method = models.CharField(_("method"), max_lenght=32, choices=method_choices,
# default=MethodBackend.get_default())
is_active = models.BooleanField(_("active"), default=True)
-
+
objects = RouteQuerySet.as_manager()
-
+
class Meta:
unique_together = ('backend', 'host')
-
+
def __str__(self):
return "%s@%s" % (self.backend, self.host)
-
+
@cached_property
def backend_class(self):
return ServiceBackend.get_backend(self.backend)
-
+
def clean(self):
if not self.match:
self.match = 'True'
@@ -244,10 +244,10 @@ class Route(models.Model):
except Exception as exception:
name = type(exception).__name__
raise ValidationError(': '.join((name, str(exception))))
-
+
def action_is_async(self, action):
return action in self.async_actions
-
+
def matches(self, instance):
safe_locals = {
'instance': instance,
@@ -255,11 +255,11 @@ class Route(models.Model):
instance._meta.model_name: instance,
}
return eval(self.match, safe_locals)
-
+
def enable(self):
self.is_active = True
self.save()
-
+
def disable(self):
self.is_active = False
self.save()
diff --git a/orchestra/contrib/orchestration/utils.py b/orchestra/contrib/orchestration/utils.py
index 5d9d2886..9e4dd51d 100644
--- a/orchestra/contrib/orchestration/utils.py
+++ b/orchestra/contrib/orchestration/utils.py
@@ -6,11 +6,11 @@ def retrieve_state(servers):
pings = []
for server in servers:
address = server.get_address()
- ping = run('ping -c 1 -w 1 %s' % address, async=True)
+ ping = run('ping -c 1 -w 1 %s' % address, run_async=True)
pings.append(ping)
- uptime = sshrun(address, 'uptime', persist=True, async=True, options={'ConnectTimeout': 1})
+ uptime = sshrun(address, 'uptime', persist=True, run_async=True, options={'ConnectTimeout': 1})
uptimes.append(uptime)
-
+
state = {}
for server, ping, uptime in zip(servers, pings, uptimes):
ping = join(ping, silent=True)
@@ -19,7 +19,7 @@ def retrieve_state(servers):
ping = '%s ms' % ping.split('/')[4]
else:
ping = 'Offline'
-
+
uptime = join(uptime, silent=True)
uptime_stderr = uptime.stderr.decode()
uptime = uptime.stdout.decode().split()
@@ -28,5 +28,5 @@ def retrieve_state(servers):
else:
uptime = '%s' % uptime_stderr
state[server.pk] = (ping, uptime)
-
+
return state
diff --git a/orchestra/contrib/resources/actions.py b/orchestra/contrib/resources/actions.py
index a40e7aa1..ea1ec971 100644
--- a/orchestra/contrib/resources/actions.py
+++ b/orchestra/contrib/resources/actions.py
@@ -7,14 +7,14 @@ from django.utils.translation import ungettext, ugettext_lazy as _
def run_monitor(modeladmin, request, queryset):
""" Resource and ResourceData run monitors """
referer = request.META.get('HTTP_REFERER')
- async = modeladmin.model.monitor.__defaults__[0]
+ run_async = modeladmin.model.monitor.__defaults__[0]
logs = set()
for resource in queryset:
rlogs = resource.monitor()
- if not async:
+ if not run_async:
logs = logs.union(set([str(log.pk) for log in rlogs]))
modeladmin.log_change(request, resource, _("Run monitors"))
- if async:
+ if run_async:
num = len(queryset)
# TODO listfilter by uuid: task.request.id + ?task_id__in=ids
link = reverse('admin:djcelery_taskstate_changelist')
diff --git a/orchestra/contrib/resources/models.py b/orchestra/contrib/resources/models.py
index 143d0496..3da69d81 100644
--- a/orchestra/contrib/resources/models.py
+++ b/orchestra/contrib/resources/models.py
@@ -26,7 +26,7 @@ class Resource(models.Model):
Defines a resource, a resource is basically an interpretation of data
gathered by a Monitor
"""
-
+
LAST = 'LAST'
MONTHLY_SUM = 'MONTHLY_SUM'
MONTHLY_AVG = 'MONTHLY_AVG'
@@ -36,7 +36,7 @@ class Resource(models.Model):
(MONTHLY_AVG, _("Monthly avg")),
)
_related = set() # keeps track of related models for resource cleanup
-
+
name = models.CharField(_("name"), max_length=32,
help_text=_("Required. 32 characters or fewer. Lowercase letters, "
"digits and hyphen only."),
@@ -70,27 +70,27 @@ class Resource(models.Model):
choices=ServiceMonitor.get_choices(),
help_text=_("Monitor backends used for monitoring this resource."))
is_active = models.BooleanField(_("active"), default=True)
-
+
objects = ResourceQuerySet.as_manager()
-
+
class Meta:
unique_together = (
('name', 'content_type'),
('verbose_name', 'content_type')
)
-
+
def __str__(self):
return "%s-%s" % (self.content_type, self.name)
-
+
@cached_property
def aggregation_class(self):
return Aggregation.get(self.aggregation)
-
+
@cached_property
def aggregation_instance(self):
""" Per request lived type_instance """
return self.aggregation_class(self)
-
+
def clean(self):
self.verbose_name = self.verbose_name.strip()
if self.on_demand and self.default_allocation:
@@ -114,12 +114,12 @@ class Resource(models.Model):
model_name,
) for error in monitor_errors
]})
-
+
def save(self, *args, **kwargs):
super(Resource, self).save(*args, **kwargs)
# This only works on tests (multiprocessing used on real deployments)
apps.get_app_config('resources').reload_relations()
-
+
def sync_periodic_task(self, delete=False):
""" sync periodic task on save/delete resource operations """
name = 'monitor.%s' % self
@@ -140,21 +140,21 @@ class Resource(models.Model):
if task.crontab != self.crontab:
task.crontab = self.crontab
task.save(update_fields=['crontab'])
-
+
def get_model_path(self, monitor):
""" returns a model path between self.content_type and monitor.model """
resource_model = self.content_type.model_class()
monitor_model = ServiceMonitor.get_backend(monitor).model_class()
return get_model_field_path(monitor_model, resource_model)
-
+
def get_scale(self):
return eval(self.scale)
-
+
def get_verbose_name(self):
return self.verbose_name or self.name
-
- def monitor(self, async=True):
- if async:
+
+ def monitor(self, run_async=True):
+ if run_async:
return tasks.monitor.apply_async(self.pk)
return tasks.monitor(self.pk)
@@ -187,28 +187,28 @@ class ResourceData(models.Model):
allocated = models.PositiveIntegerField(_("allocated"), null=True, blank=True)
content_object_repr = models.CharField(_("content object representation"), max_length=256,
editable=False)
-
+
content_object = GenericForeignKey()
objects = ResourceDataQuerySet.as_manager()
-
+
class Meta:
unique_together = ('resource', 'content_type', 'object_id')
verbose_name_plural = _("resource data")
index_together = (
('content_type', 'object_id'),
)
-
+
def __str__(self):
return "%s: %s" % (self.resource, self.content_object)
-
+
@property
def unit(self):
return self.resource.unit
-
+
@property
def verbose_name(self):
return self.resource.verbose_name
-
+
def get_used(self):
resource = self.resource
total = 0
@@ -220,7 +220,7 @@ class ResourceData(models.Model):
has_result = True
total += usage
return float(total)/resource.get_scale() if has_result else None
-
+
def update(self, current=None):
if current is None:
current = self.get_used()
@@ -228,13 +228,13 @@ class ResourceData(models.Model):
self.updated_at = timezone.now()
self.content_object_repr = str(self.content_object)
self.save(update_fields=('used', 'updated_at', 'content_object_repr'))
-
- def monitor(self, async=False):
+
+ def monitor(self, run_async=False):
ids = (self.object_id,)
- if async:
+ if run_async:
return tasks.monitor.delay(self.resource_id, ids=ids)
return tasks.monitor(self.resource_id, ids=ids)
-
+
def get_monitor_datasets(self):
resource = self.resource
for monitor in resource.monitors:
@@ -275,20 +275,20 @@ class MonitorData(models.Model):
help_text=_("Optional field used to store current state needed for diff-based monitoring."))
content_object_repr = models.CharField(_("content object representation"), max_length=256,
editable=False)
-
+
content_object = GenericForeignKey()
objects = MonitorDataQuerySet.as_manager()
-
+
class Meta:
get_latest_by = 'id'
verbose_name_plural = _("monitor data")
index_together = (
('content_type', 'object_id'),
)
-
+
def __str__(self):
return str(self.monitor)
-
+
@cached_property
def unit(self):
return self.resource.unit
@@ -324,15 +324,15 @@ def create_resource_relation():
)
self.obj.__resource_cache[attr] = rdata
return rdata
-
+
def __get__(self, obj, cls):
""" proxy handled object """
self.obj = obj
return self
-
+
def __iter__(self):
return iter(self.obj.resource_set.all())
-
+
# Clean previous state
for related in Resource._related:
try:
@@ -344,7 +344,7 @@ def create_resource_relation():
related._meta.private_fields = [
field for field in related._meta.private_fields if field.rel.to != ResourceData
]
-
+
for ct, resources in Resource.objects.group_by('content_type').items():
model = ct.model_class()
relation = GenericRelation('resources.ResourceData')
diff --git a/orchestra/contrib/resources/tasks.py b/orchestra/contrib/resources/tasks.py
index 4b176fda..10f80729 100644
--- a/orchestra/contrib/resources/tasks.py
+++ b/orchestra/contrib/resources/tasks.py
@@ -36,8 +36,8 @@ def monitor(resource_id, ids=None):
for obj in model.objects.filter(**kwargs):
op = Operation(backend, obj, Operation.MONITOR)
monitorings.append(op)
- logs += Operation.execute(monitorings, async=False)
-
+ logs += Operation.execute(monitorings, run_async=False)
+
kwargs = {'id__in': ids} if ids else {}
# Update used resources and trigger resource exceeded and revovery
triggers = []
diff --git a/orchestra/contrib/tasks/beat.py b/orchestra/contrib/tasks/beat.py
index 81732b6d..7a4772ac 100644
--- a/orchestra/contrib/tasks/beat.py
+++ b/orchestra/contrib/tasks/beat.py
@@ -23,11 +23,11 @@ def is_due(task, time=None):
)
-def run_task(task, thread=True, process=False, async=False):
+def run_task(task, thread=True, process=False, run_async=False):
args = json.loads(task.args)
kwargs = json.loads(task.kwargs)
task_fn = current_app.tasks.get(task.task)
- if async:
+ if run_async:
method = 'process' if process else 'thread'
return apply_async(task_fn, method=method).apply_async(*args, **kwargs)
return task_fn(*args, **kwargs)
@@ -38,6 +38,6 @@ def run():
procs = []
for task in PeriodicTask.objects.enabled().select_related('crontab'):
if is_due(task, now):
- proc = run_task(task, process=True, async=True)
+ proc = run_task(task, process=True, run_async=True)
procs.append(proc)
[proc.join() for proc in procs]
diff --git a/orchestra/contrib/tasks/utils.py b/orchestra/contrib/tasks/utils.py
index 19972696..96b5bf0b 100644
--- a/orchestra/contrib/tasks/utils.py
+++ b/orchestra/contrib/tasks/utils.py
@@ -13,7 +13,7 @@ def get_name(fn):
def run(method, *args, **kwargs):
- async = kwargs.pop('async', True)
+ run_async = kwargs.pop('run_async', True)
thread = threading.Thread(target=close_connection(method), args=args, kwargs=kwargs)
thread = Process(target=close_connection(counter))
thread.start()
diff --git a/orchestra/utils/sys.py b/orchestra/utils/sys.py
index 07b0fcb4..fc732343 100644
--- a/orchestra/utils/sys.py
+++ b/orchestra/utils/sys.py
@@ -71,43 +71,43 @@ def runiterator(command, display=False, stdin=b''):
""" Subprocess wrapper for running commands concurrently """
if display:
sys.stderr.write("\n\033[1m $ %s\033[0m\n" % command)
-
+
p = subprocess.Popen(command, shell=True, executable='/bin/bash',
stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
-
+
p.stdin.write(stdin)
p.stdin.close()
yield
-
+
make_async(p.stdout)
make_async(p.stderr)
-
+
# Async reading of stdout and sterr
while True:
stdout = b''
stderr = b''
# Get complete unicode chunks
select.select([p.stdout, p.stderr], [], [])
-
+
stdoutPiece = read_async(p.stdout)
stderrPiece = read_async(p.stderr)
-
+
stdout += (stdoutPiece or b'')
#.decode('ascii'), errors='replace')
stderr += (stderrPiece or b'')
#.decode('ascii'), errors='replace')
-
+
if display and stdout:
sys.stdout.write(stdout.decode('utf8'))
if display and stderr:
sys.stderr.write(stderr.decode('utf8'))
-
+
state = _Attribute(stdout)
state.stderr = stderr
state.exit_code = p.poll()
state.command = command
yield state
-
+
if state.exit_code != None:
p.stdout.close()
p.stderr.close()
@@ -121,12 +121,12 @@ def join(iterator, display=False, silent=False, valid_codes=(0,)):
for state in iterator:
stdout += state.stdout
stderr += state.stderr
-
+
exit_code = state.exit_code
-
+
out = _Attribute(stdout.strip())
err = stderr.strip()
-
+
out.failed = False
out.exit_code = exit_code
out.stderr = err
@@ -138,7 +138,7 @@ def join(iterator, display=False, silent=False, valid_codes=(0,)):
sys.stderr.write("\n\033[1;31mCommandError: %s %s\033[m\n" % (msg, err))
if not silent:
raise CommandError("%s %s" % (msg, err))
-
+
out.succeeded = not out.failed
return out
@@ -151,10 +151,10 @@ def joinall(iterators, **kwargs):
return results
-def run(command, display=False, valid_codes=(0,), silent=False, stdin=b'', async=False):
+def run(command, display=False, valid_codes=(0,), silent=False, stdin=b'', run_async=False):
iterator = runiterator(command, display, stdin)
next(iterator)
- if async:
+ if run_async:
return iterator
return join(iterator, display=display, silent=silent, valid_codes=valid_codes)
@@ -213,7 +213,7 @@ class LockFile(object):
self.lockfile = lockfile
self.expire = expire
self.unlocked = unlocked
-
+
def acquire(self):
if os.path.exists(self.lockfile):
lock_time = os.path.getmtime(self.lockfile)
@@ -222,17 +222,17 @@ class LockFile(object):
return False
touch(self.lockfile)
return True
-
+
def release(self):
os.remove(self.lockfile)
-
+
def __enter__(self):
if not self.unlocked:
if not self.acquire():
raise OperationLocked("%s lock file exists and its mtime is less than %s seconds" %
(self.lockfile, self.expire))
return True
-
+
def __exit__(self, type, value, traceback):
if not self.unlocked:
self.release()
@@ -240,4 +240,4 @@ class LockFile(object):
def touch_wsgi(delay=0):
from . import paths
- run('{ sleep %i && touch %s/wsgi.py; } &' % (delay, paths.get_project_dir()), async=True)
+ run('{ sleep %i && touch %s/wsgi.py; } &' % (delay, paths.get_project_dir()), run_async=True)