Rename async--> run_async

On Python3.5 async becames a reserved keyword.
This commit is contained in:
Santiago L 2021-03-30 12:51:12 +02:00
parent 350d93f820
commit 9d2d0befc4
17 changed files with 231 additions and 206 deletions

View file

@ -27,7 +27,7 @@ class crontab_parser(object):
_range = r'(\w+?)-(\w+)' _range = r'(\w+?)-(\w+)'
_steps = r'/(\w+)?' _steps = r'/(\w+)?'
_star = r'\*' _star = r'\*'
def __init__(self, max_=60, min_=0): def __init__(self, max_=60, min_=0):
self.max_ = max_ self.max_ = max_
self.min_ = min_ self.min_ = min_
@ -45,14 +45,14 @@ class crontab_parser(object):
raise self.ParseException('empty part') raise self.ParseException('empty part')
acc |= set(self._parse_part(part)) acc |= set(self._parse_part(part))
return acc return acc
def _parse_part(self, part): def _parse_part(self, part):
for regex, handler in self.pats: for regex, handler in self.pats:
m = regex.match(part) m = regex.match(part)
if m: if m:
return handler(m.groups()) return handler(m.groups())
return self._expand_range((part, )) return self._expand_range((part, ))
def _expand_range(self, toks): def _expand_range(self, toks):
fr = self._expand_number(toks[0]) fr = self._expand_number(toks[0])
if len(toks) > 1: if len(toks) > 1:
@ -62,19 +62,19 @@ class crontab_parser(object):
list(range(self.min_, to + 1))) list(range(self.min_, to + 1)))
return list(range(fr, to + 1)) return list(range(fr, to + 1))
return [fr] return [fr]
def _range_steps(self, toks): def _range_steps(self, toks):
if len(toks) != 3 or not toks[2]: if len(toks) != 3 or not toks[2]:
raise self.ParseException('empty filter') raise self.ParseException('empty filter')
return self._expand_range(toks[:2])[::int(toks[2])] return self._expand_range(toks[:2])[::int(toks[2])]
def _star_steps(self, toks): def _star_steps(self, toks):
if not toks or not toks[0]: if not toks or not toks[0]:
raise self.ParseException('empty filter') raise self.ParseException('empty filter')
return self._expand_star()[::int(toks[0])] return self._expand_star()[::int(toks[0])]
def _expand_star(self, *args): def _expand_star(self, *args):
return list(range(self.min_, self.max_ + self.min_)) return list(range(self.min_, self.max_ + self.min_))
def _expand_number(self, s): def _expand_number(self, s):
if isinstance(s, str) and s[0] == '-': if isinstance(s, str) and s[0] == '-':
raise self.ParseException('negative numbers not supported') raise self.ParseException('negative numbers not supported')
@ -99,7 +99,7 @@ class Setting(object):
def __init__(self, manage): def __init__(self, manage):
self.manage = manage self.manage = manage
self.settings_file = self.get_settings_file(manage) self.settings_file = self.get_settings_file(manage)
def get_settings(self): def get_settings(self):
""" get db settings from settings.py file without importing """ """ get db settings from settings.py file without importing """
settings = {'__file__': self.settings_file} settings = {'__file__': self.settings_file}
@ -111,7 +111,7 @@ class Setting(object):
content += line content += line
exec(content, settings) exec(content, settings)
return settings return settings
def get_settings_file(self, manage): def get_settings_file(self, manage):
with open(manage, 'r') as handler: with open(manage, 'r') as handler:
regex = re.compile(r'"DJANGO_SETTINGS_MODULE"\s*,\s*"([^"]+)"') regex = re.compile(r'"DJANGO_SETTINGS_MODULE"\s*,\s*"([^"]+)"')
@ -128,7 +128,7 @@ class Setting(object):
class DB(object): class DB(object):
def __init__(self, settings): def __init__(self, settings):
self.settings = settings['DATABASES']['default'] self.settings = settings['DATABASES']['default']
def connect(self): def connect(self):
if self.settings['ENGINE'] == 'django.db.backends.sqlite3': if self.settings['ENGINE'] == 'django.db.backends.sqlite3':
import sqlite3 import sqlite3
@ -138,7 +138,7 @@ class DB(object):
self.conn = psycopg2.connect("dbname='{NAME}' user='{USER}' host='{HOST}' password='{PASSWORD}'".format(**self.settings)) self.conn = psycopg2.connect("dbname='{NAME}' user='{USER}' host='{HOST}' password='{PASSWORD}'".format(**self.settings))
else: else:
raise ValueError("%s engine not supported." % self.settings['ENGINE']) raise ValueError("%s engine not supported." % self.settings['ENGINE'])
def query(self, query): def query(self, query):
cur = self.conn.cursor() cur = self.conn.cursor()
try: try:
@ -147,7 +147,7 @@ class DB(object):
finally: finally:
cur.close() cur.close()
return result return result
def close(self): def close(self):
self.conn.close() self.conn.close()
@ -161,7 +161,7 @@ def fire_pending_tasks(manage, db):
"WHERE p.crontab_id = c.id AND p.enabled = {}" "WHERE p.crontab_id = c.id AND p.enabled = {}"
).format(enabled) ).format(enabled)
return db.query(query) return db.query(query)
def is_due(now, minute, hour, day_of_week, day_of_month, month_of_year): def is_due(now, minute, hour, day_of_week, day_of_month, month_of_year):
n_minute, n_hour, n_day_of_week, n_day_of_month, n_month_of_year = now n_minute, n_hour, n_day_of_week, n_day_of_month, n_month_of_year = now
return ( return (
@ -171,14 +171,14 @@ def fire_pending_tasks(manage, db):
n_day_of_month in crontab_parser(31, 1).parse(day_of_month) and n_day_of_month in crontab_parser(31, 1).parse(day_of_month) and
n_month_of_year in crontab_parser(12, 1).parse(month_of_year) n_month_of_year in crontab_parser(12, 1).parse(month_of_year)
) )
now = datetime.utcnow() now = datetime.utcnow()
now = tuple(map(int, now.strftime("%M %H %w %d %m").split())) now = tuple(map(int, now.strftime("%M %H %w %d %m").split()))
for minute, hour, day_of_week, day_of_month, month_of_year, task_id in get_tasks(db): for minute, hour, day_of_week, day_of_month, month_of_year, task_id in get_tasks(db):
if is_due(now, minute, hour, day_of_week, day_of_month, month_of_year): if is_due(now, minute, hour, day_of_week, day_of_month, month_of_year):
command = 'python3 -W ignore::DeprecationWarning {manage} runtask {task_id}'.format( command = 'python3 -W ignore::DeprecationWarning {manage} runtask {task_id}'.format(
manage=manage, task_id=task_id) manage=manage, task_id=task_id)
proc = run(command, async=True) proc = run(command, run_async=True)
yield proc yield proc
@ -187,7 +187,7 @@ def fire_pending_messages(settings, db):
MAILER_DEFERE_SECONDS = settings.get('MAILER_DEFERE_SECONDS', (300, 600, 60*60, 60*60*24)) MAILER_DEFERE_SECONDS = settings.get('MAILER_DEFERE_SECONDS', (300, 600, 60*60, 60*60*24))
now = datetime.utcnow() now = datetime.utcnow()
query_or = [] query_or = []
for num, seconds in enumerate(MAILER_DEFERE_SECONDS): for num, seconds in enumerate(MAILER_DEFERE_SECONDS):
delta = timedelta(seconds=seconds) delta = timedelta(seconds=seconds)
epoch = now-delta epoch = now-delta
@ -198,10 +198,10 @@ def fire_pending_messages(settings, db):
WHERE (mailer_message.state = 'QUEUED' WHERE (mailer_message.state = 'QUEUED'
OR (mailer_message.state = 'DEFERRED' AND (%s))) LIMIT 1""" % ' OR '.join(query_or) OR (mailer_message.state = 'DEFERRED' AND (%s))) LIMIT 1""" % ' OR '.join(query_or)
return bool(db.query(query)) return bool(db.query(query))
if has_pending_messages(settings, db): if has_pending_messages(settings, db):
command = 'python3 -W ignore::DeprecationWarning {manage} sendpendingmessages'.format(manage=manage) command = 'python3 -W ignore::DeprecationWarning {manage} sendpendingmessages'.format(manage=manage)
proc = run(command, async=True) proc = run(command, run_async=True)
yield proc yield proc

View file

@ -15,21 +15,21 @@ class Operation():
MONITOR = 'monitor' MONITOR = 'monitor'
EXCEEDED = 'exceeded' EXCEEDED = 'exceeded'
RECOVERY = 'recovery' RECOVERY = 'recovery'
def __str__(self): def __str__(self):
return '%s.%s(%s)' % (self.backend, self.action, self.instance) return '%s.%s(%s)' % (self.backend, self.action, self.instance)
def __repr__(self): def __repr__(self):
return str(self) return str(self)
def __hash__(self): def __hash__(self):
""" set() """ """ set() """
return hash((self.backend, self.instance, self.action)) return hash((self.backend, self.instance, self.action))
def __eq__(self, operation): def __eq__(self, operation):
""" set() """ """ set() """
return hash(self) == hash(operation) return hash(self) == hash(operation)
def __init__(self, backend, instance, action, routes=None): def __init__(self, backend, instance, action, routes=None):
self.backend = backend self.backend = backend
# instance should maintain any dynamic attribute until backend execution # instance should maintain any dynamic attribute until backend execution
@ -37,13 +37,13 @@ class Operation():
self.instance = copy.deepcopy(instance) self.instance = copy.deepcopy(instance)
self.action = action self.action = action
self.routes = routes self.routes = routes
@classmethod @classmethod
def execute(cls, operations, serialize=False, async=None): def execute(cls, operations, serialize=False, run_async=None):
from . import manager from . import manager
scripts, backend_serialize = manager.generate(operations) scripts, backend_serialize = manager.generate(operations)
return manager.execute(scripts, serialize=(serialize or backend_serialize), async=async) return manager.execute(scripts, serialize=(serialize or backend_serialize), run_async=run_async)
@classmethod @classmethod
def create_for_action(cls, instances, action): def create_for_action(cls, instances, action):
if not isinstance(instances, collections.Iterable): if not isinstance(instances, collections.Iterable):
@ -56,13 +56,13 @@ class Operation():
cls(backend_cls, instance, action) cls(backend_cls, instance, action)
) )
return operations return operations
@classmethod @classmethod
def execute_action(cls, instances, action): def execute_action(cls, instances, action):
""" instances can be an object or an iterable for batch processing """ """ instances can be an object or an iterable for batch processing """
operations = cls.create_for_action(instances, action) operations = cls.create_for_action(instances, action)
return cls.execute(operations) return cls.execute(operations)
def preload_context(self): def preload_context(self):
""" """
Heuristic: Running get_context will prevent most of related objects do not exist errors Heuristic: Running get_context will prevent most of related objects do not exist errors
@ -70,7 +70,7 @@ class Operation():
if self.action == self.DELETE: if self.action == self.DELETE:
if hasattr(self.backend, 'get_context'): if hasattr(self.backend, 'get_context'):
self.backend().get_context(self.instance) self.backend().get_context(self.instance)
def store(self, log): def store(self, log):
from .models import BackendOperation from .models import BackendOperation
return BackendOperation.objects.create( return BackendOperation.objects.create(
@ -79,7 +79,7 @@ class Operation():
instance=self.instance, instance=self.instance,
action=self.action, action=self.action,
) )
@classmethod @classmethod
def load(cls, operation, log=None): def load(cls, operation, log=None):
routes = None routes = None
@ -88,4 +88,4 @@ class Operation():
(operation.backend, operation.action): AttrDict(host=log.server) (operation.backend, operation.action): AttrDict(host=log.server)
} }
return cls(operation.backend_class, operation.instance, operation.action, routes=routes) return cls(operation.backend_class, operation.instance, operation.action, routes=routes)

View file

@ -30,25 +30,25 @@ STATE_COLORS = {
class RouteAdmin(ExtendedModelAdmin): class RouteAdmin(ExtendedModelAdmin):
list_display = ( list_display = (
'display_backend', 'host', 'match', 'display_model', 'display_actions', 'async', 'display_backend', 'host', 'match', 'display_model', 'display_actions', 'run_async',
'is_active' 'is_active'
) )
list_editable = ('host', 'match', 'async', 'is_active') list_editable = ('host', 'match', 'run_async', 'is_active')
list_filter = ('host', 'is_active', 'async', 'backend') list_filter = ('host', 'is_active', 'run_async', 'backend')
list_prefetch_related = ('host',) list_prefetch_related = ('host',)
ordering = ('backend',) ordering = ('backend',)
add_fields = ('backend', 'host', 'match', 'async', 'is_active') add_fields = ('backend', 'host', 'match', 'run_async', 'is_active')
change_form = RouteForm change_form = RouteForm
actions = (orchestrate,) actions = (orchestrate,)
change_view_actions = actions change_view_actions = actions
BACKEND_HELP_TEXT = helpers.get_backends_help_text(ServiceBackend.get_backends()) BACKEND_HELP_TEXT = helpers.get_backends_help_text(ServiceBackend.get_backends())
DEFAULT_MATCH = { DEFAULT_MATCH = {
backend.get_name(): backend.default_route_match for backend in ServiceBackend.get_backends() backend.get_name(): backend.default_route_match for backend in ServiceBackend.get_backends()
} }
display_backend = display_plugin_field('backend') display_backend = display_plugin_field('backend')
def display_model(self, route): def display_model(self, route):
try: try:
return escape(route.backend_class.model) return escape(route.backend_class.model)
@ -56,7 +56,7 @@ class RouteAdmin(ExtendedModelAdmin):
return "<span style='color: red;'>NOT AVAILABLE</span>" return "<span style='color: red;'>NOT AVAILABLE</span>"
display_model.short_description = _("model") display_model.short_description = _("model")
display_model.allow_tags = True display_model.allow_tags = True
def display_actions(self, route): def display_actions(self, route):
try: try:
return '<br>'.join(route.backend_class.get_actions()) return '<br>'.join(route.backend_class.get_actions())
@ -64,7 +64,7 @@ class RouteAdmin(ExtendedModelAdmin):
return "<span style='color: red;'>NOT AVAILABLE</span>" return "<span style='color: red;'>NOT AVAILABLE</span>"
display_actions.short_description = _("actions") display_actions.short_description = _("actions")
display_actions.allow_tags = True display_actions.allow_tags = True
def formfield_for_dbfield(self, db_field, **kwargs): def formfield_for_dbfield(self, db_field, **kwargs):
""" Provides dynamic help text on backend form field """ """ Provides dynamic help text on backend form field """
if db_field.name == 'backend': if db_field.name == 'backend':
@ -79,23 +79,23 @@ class RouteAdmin(ExtendedModelAdmin):
request._host_choices_cache = choices = list(field.choices) request._host_choices_cache = choices = list(field.choices)
field.choices = choices field.choices = choices
return field return field
def get_form(self, request, obj=None, **kwargs): def get_form(self, request, obj=None, **kwargs):
""" Include dynamic help text for existing objects """ """ Include dynamic help text for existing objects """
form = super(RouteAdmin, self).get_form(request, obj, **kwargs) form = super(RouteAdmin, self).get_form(request, obj, **kwargs)
if obj: if obj:
form.base_fields['backend'].help_text = self.BACKEND_HELP_TEXT.get(obj.backend, '') form.base_fields['backend'].help_text = self.BACKEND_HELP_TEXT.get(obj.backend, '')
return form return form
def show_orchestration_disabled(self, request): def show_orchestration_disabled(self, request):
if settings.ORCHESTRATION_DISABLE_EXECUTION: if settings.ORCHESTRATION_DISABLE_EXECUTION:
msg = _("Orchestration execution is disabled by <tt>ORCHESTRATION_DISABLE_EXECUTION</tt> setting.") msg = _("Orchestration execution is disabled by <tt>ORCHESTRATION_DISABLE_EXECUTION</tt> setting.")
self.message_user(request, mark_safe(msg), messages.WARNING) self.message_user(request, mark_safe(msg), messages.WARNING)
def changelist_view(self, request, extra_context=None): def changelist_view(self, request, extra_context=None):
self.show_orchestration_disabled(request) self.show_orchestration_disabled(request)
return super(RouteAdmin, self).changelist_view(request, extra_context) return super(RouteAdmin, self).changelist_view(request, extra_context)
def changeform_view(self, request, object_id=None, form_url='', extra_context=None): def changeform_view(self, request, object_id=None, form_url='', extra_context=None):
self.show_orchestration_disabled(request) self.show_orchestration_disabled(request)
return super(RouteAdmin, self).changeform_view( return super(RouteAdmin, self).changeform_view(
@ -108,12 +108,12 @@ class BackendOperationInline(admin.TabularInline):
readonly_fields = ('action', 'instance_link') readonly_fields = ('action', 'instance_link')
extra = 0 extra = 0
can_delete = False can_delete = False
class Media: class Media:
css = { css = {
'all': ('orchestra/css/hide-inline-id.css',) 'all': ('orchestra/css/hide-inline-id.css',)
} }
def instance_link(self, operation): def instance_link(self, operation):
link = admin_link('instance')(self, operation) link = admin_link('instance')(self, operation)
if link == '---': if link == '---':
@ -122,10 +122,10 @@ class BackendOperationInline(admin.TabularInline):
return link return link
instance_link.allow_tags = True instance_link.allow_tags = True
instance_link.short_description = _("Instance") instance_link.short_description = _("Instance")
def has_add_permission(self, *args, **kwargs): def has_add_permission(self, *args, **kwargs):
return False return False
def get_queryset(self, request): def get_queryset(self, request):
queryset = super(BackendOperationInline, self).get_queryset(request) queryset = super(BackendOperationInline, self).get_queryset(request)
return queryset.prefetch_related('instance') return queryset.prefetch_related('instance')
@ -149,7 +149,7 @@ class BackendLogAdmin(ChangeViewActionsMixin, admin.ModelAdmin):
readonly_fields = fields readonly_fields = fields
actions = (retry_backend,) actions = (retry_backend,)
change_view_actions = actions change_view_actions = actions
server_link = admin_link('server') server_link = admin_link('server')
display_created = admin_date('created_at', short_description=_("Created")) display_created = admin_date('created_at', short_description=_("Created"))
display_state = admin_colored('state', colors=STATE_COLORS) display_state = admin_colored('state', colors=STATE_COLORS)
@ -157,17 +157,17 @@ class BackendLogAdmin(ChangeViewActionsMixin, admin.ModelAdmin):
mono_stdout = display_mono('stdout') mono_stdout = display_mono('stdout')
mono_stderr = display_mono('stderr') mono_stderr = display_mono('stderr')
mono_traceback = display_mono('traceback') mono_traceback = display_mono('traceback')
class Media: class Media:
css = { css = {
'all': ('orchestra/css/pygments/github.css',) 'all': ('orchestra/css/pygments/github.css',)
} }
def get_queryset(self, request): def get_queryset(self, request):
""" Order by structured name and imporve performance """ """ Order by structured name and imporve performance """
qs = super(BackendLogAdmin, self).get_queryset(request) qs = super(BackendLogAdmin, self).get_queryset(request)
return qs.select_related('server').defer('script', 'stdout') return qs.select_related('server').defer('script', 'stdout')
def has_add_permission(self, *args, **kwargs): def has_add_permission(self, *args, **kwargs):
return False return False
@ -177,17 +177,17 @@ class ServerAdmin(ExtendedModelAdmin):
list_filter = ('os',) list_filter = ('os',)
actions = (orchestrate,) actions = (orchestrate,)
change_view_actions = actions change_view_actions = actions
def display_ping(self, instance): def display_ping(self, instance):
return self._remote_state[instance.pk][0] return self._remote_state[instance.pk][0]
display_ping.short_description = _("Ping") display_ping.short_description = _("Ping")
display_ping.allow_tags = True display_ping.allow_tags = True
def display_uptime(self, instance): def display_uptime(self, instance):
return self._remote_state[instance.pk][1] return self._remote_state[instance.pk][1]
display_uptime.short_description = _("Uptime") display_uptime.short_description = _("Uptime")
display_uptime.allow_tags = True display_uptime.allow_tags = True
def get_queryset(self, request): def get_queryset(self, request):
""" Order by structured name and imporve performance """ """ Order by structured name and imporve performance """
qs = super(ServerAdmin, self).get_queryset(request) qs = super(ServerAdmin, self).get_queryset(request)

View file

@ -31,7 +31,7 @@ class ServiceMount(plugins.PluginMount):
class ServiceBackend(plugins.Plugin, metaclass=ServiceMount): class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
""" """
Service management backend base class Service management backend base class
It uses the _unit of work_ design principle, which allows bulk operations to It uses the _unit of work_ design principle, which allows bulk operations to
be conviniently supported. Each backend generates the configuration for all be conviniently supported. Each backend generates the configuration for all
the changes of all modified objects, reloading the daemon just once. the changes of all modified objects, reloading the daemon just once.
@ -52,15 +52,15 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
# By default backend will not run if actions do not generate insctructions, # By default backend will not run if actions do not generate insctructions,
# If your backend uses prepare() or commit() only then you should set force_empty_action_execution = True # If your backend uses prepare() or commit() only then you should set force_empty_action_execution = True
force_empty_action_execution = False force_empty_action_execution = False
def __str__(self): def __str__(self):
return type(self).__name__ return type(self).__name__
def __init__(self): def __init__(self):
self.head = [] self.head = []
self.content = [] self.content = []
self.tail = [] self.tail = []
def __getattribute__(self, attr): def __getattribute__(self, attr):
""" Select head, content or tail section depending on the method name """ """ Select head, content or tail section depending on the method name """
IGNORE_ATTRS = ( IGNORE_ATTRS = (
@ -83,29 +83,29 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
elif attr not in IGNORE_ATTRS and attr in self.actions: elif attr not in IGNORE_ATTRS and attr in self.actions:
self.set_content() self.set_content()
return super(ServiceBackend, self).__getattribute__(attr) return super(ServiceBackend, self).__getattribute__(attr)
def set_head(self): def set_head(self):
self.cmd_section = self.head self.cmd_section = self.head
def set_tail(self): def set_tail(self):
self.cmd_section = self.tail self.cmd_section = self.tail
def set_content(self): def set_content(self):
self.cmd_section = self.content self.cmd_section = self.content
@classmethod @classmethod
def get_actions(cls): def get_actions(cls):
return [ action for action in cls.actions if action in dir(cls) ] return [ action for action in cls.actions if action in dir(cls) ]
@classmethod @classmethod
def get_name(cls): def get_name(cls):
return cls.__name__ return cls.__name__
@classmethod @classmethod
def is_main(cls, obj): def is_main(cls, obj):
opts = obj._meta opts = obj._meta
return cls.model == '%s.%s' % (opts.app_label, opts.object_name) return cls.model == '%s.%s' % (opts.app_label, opts.object_name)
@classmethod @classmethod
def get_related(cls, obj): def get_related(cls, obj):
opts = obj._meta opts = obj._meta
@ -122,7 +122,7 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
return related.all() return related.all()
return [related] return [related]
return [] return []
@classmethod @classmethod
def get_backends(cls, instance=None, action=None): def get_backends(cls, instance=None, action=None):
backends = cls.get_plugins() backends = cls.get_plugins()
@ -140,15 +140,15 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
if include: if include:
included.append(backend) included.append(backend)
return included return included
@classmethod @classmethod
def get_backend(cls, name): def get_backend(cls, name):
return cls.get(name) return cls.get(name)
@classmethod @classmethod
def model_class(cls): def model_class(cls):
return apps.get_model(cls.model) return apps.get_model(cls.model)
@property @property
def scripts(self): def scripts(self):
""" group commands based on their method """ """ group commands based on their method """
@ -163,12 +163,12 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
except KeyError: except KeyError:
pass pass
return list(scripts.items()) return list(scripts.items())
def get_banner(self): def get_banner(self):
now = timezone.localtime(timezone.now()) now = timezone.localtime(timezone.now())
time = now.strftime("%h %d, %Y %I:%M:%S %Z") time = now.strftime("%h %d, %Y %I:%M:%S %Z")
return "Generated by Orchestra at %s" % time return "Generated by Orchestra at %s" % time
def create_log(self, server, **kwargs): def create_log(self, server, **kwargs):
from .models import BackendLog from .models import BackendLog
state = BackendLog.RECEIVED state = BackendLog.RECEIVED
@ -181,8 +181,8 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
manager = manager.using(using) manager = manager.using(using)
log = manager.create(backend=self.get_name(), state=state, server=server) log = manager.create(backend=self.get_name(), state=state, server=server)
return log return log
def execute(self, server, async=False, log=None): def execute(self, server, run_async=False, log=None):
from .models import BackendLog from .models import BackendLog
if log is None: if log is None:
log = self.create_log(server) log = self.create_log(server)
@ -190,11 +190,11 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
if run: if run:
scripts = self.scripts scripts = self.scripts
for method, commands in scripts: for method, commands in scripts:
method(log, server, commands, async) method(log, server, commands, run_async)
if log.state != BackendLog.SUCCESS: if log.state != BackendLog.SUCCESS:
break break
return log return log
def append(self, *cmd): def append(self, *cmd):
# aggregate commands acording to its execution method # aggregate commands acording to its execution method
if isinstance(cmd[0], str): if isinstance(cmd[0], str):
@ -207,10 +207,10 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
self.cmd_section.append((method, [cmd])) self.cmd_section.append((method, [cmd]))
else: else:
self.cmd_section[-1][1].append(cmd) self.cmd_section[-1][1].append(cmd)
def get_context(self, obj): def get_context(self, obj):
return {} return {}
def prepare(self): def prepare(self):
""" """
hook for executing something at the beging hook for executing something at the beging
@ -221,7 +221,7 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
set -o pipefail set -o pipefail
exit_code=0""") exit_code=0""")
) )
def commit(self): def commit(self):
""" """
hook for executing something at the end hook for executing something at the end
@ -235,11 +235,11 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
class ServiceController(ServiceBackend): class ServiceController(ServiceBackend):
actions = ('save', 'delete') actions = ('save', 'delete')
abstract = True abstract = True
@classmethod @classmethod
def get_verbose_name(cls): def get_verbose_name(cls):
return _("[S] %s") % super(ServiceController, cls).get_verbose_name() return _("[S] %s") % super(ServiceController, cls).get_verbose_name()
@classmethod @classmethod
def get_backends(cls): def get_backends(cls):
""" filter controller classes """ """ filter controller classes """

View file

@ -105,7 +105,7 @@ def get_backend_url(ids):
def get_messages(logs): def get_messages(logs):
messages = [] messages = []
total, successes, async = 0, 0, 0 total, successes, run_async = 0, 0, 0
ids = [] ids = []
async_ids = [] async_ids = []
for log in logs: for log in logs:
@ -118,17 +118,17 @@ def get_messages(logs):
if log.is_success: if log.is_success:
successes += 1 successes += 1
elif not log.has_finished: elif not log.has_finished:
async += 1 run_async += 1
async_ids.append(log.id) async_ids.append(log.id)
errors = total-successes-async errors = total-successes-run_async
url = get_backend_url(ids) url = get_backend_url(ids)
async_url = get_backend_url(async_ids) async_url = get_backend_url(async_ids)
async_msg = '' async_msg = ''
if async: if run_async:
async_msg = ungettext( async_msg = ungettext(
_('<a href="{async_url}">{name}</a> is running on the background'), _('<a href="{async_url}">{name}</a> is running on the background'),
_('<a href="{async_url}">{async} backends</a> are running on the background'), _('<a href="{async_url}">{run_async} backends</a> are running on the background'),
async) run_async)
if errors: if errors:
if total == 1: if total == 1:
msg = _('<a href="{url}">{name}</a> has fail to execute') msg = _('<a href="{url}">{name}</a> has fail to execute')
@ -139,7 +139,7 @@ def get_messages(logs):
errors) errors)
if async_msg: if async_msg:
msg += ', ' + str(async_msg) msg += ', ' + str(async_msg)
msg = msg.format(errors=errors, async=async, async_url=async_url, total=total, url=url, msg = msg.format(errors=errors, run_async=run_async, async_url=async_url, total=total, url=url,
name=log.backend) name=log.backend)
messages.append(('error', msg + '.')) messages.append(('error', msg + '.'))
elif successes: elif successes:
@ -158,12 +158,12 @@ def get_messages(logs):
_('<a href="{url}">{total} backends</a> have been executed'), _('<a href="{url}">{total} backends</a> have been executed'),
total) total)
msg = msg.format( msg = msg.format(
total=total, url=url, async_url=async_url, async=async, successes=successes, total=total, url=url, async_url=async_url, run_async=run_async, successes=successes,
name=log.backend name=log.backend
) )
messages.append(('success', msg + '.')) messages.append(('success', msg + '.'))
else: else:
msg = async_msg.format(url=url, async_url=async_url, async=async, name=log.backend) msg = async_msg.format(url=url, async_url=async_url, run_async=run_async, name=log.backend)
messages.append(('success', msg + '.')) messages.append(('success', msg + '.'))
return messages return messages

View file

@ -12,7 +12,7 @@ from orchestra.utils.sys import confirm
class Command(BaseCommand): class Command(BaseCommand):
help = 'Runs orchestration backends.' help = 'Runs orchestration backends.'
def add_arguments(self, parser): def add_arguments(self, parser):
parser.add_argument('model', nargs='?', parser.add_argument('model', nargs='?',
help='Label of a model to execute the orchestration.') help='Label of a model to execute the orchestration.')
@ -30,8 +30,8 @@ class Command(BaseCommand):
help='List available baclends.') help='List available baclends.')
parser.add_argument('--dry-run', action='store_true', dest='dry', default=False, parser.add_argument('--dry-run', action='store_true', dest='dry', default=False,
help='Only prints scrtipt.') help='Only prints scrtipt.')
def collect_operations(self, **options): def collect_operations(self, **options):
model = options.get('model') model = options.get('model')
backends = options.get('backends') or set() backends = options.get('backends') or set()
@ -66,7 +66,7 @@ class Command(BaseCommand):
model = apps.get_model(*model.split('.')) model = apps.get_model(*model.split('.'))
queryset = model.objects.filter(**kwargs).order_by('id') queryset = model.objects.filter(**kwargs).order_by('id')
querysets = [queryset] querysets = [queryset]
operations = OrderedSet() operations = OrderedSet()
route_cache = {} route_cache = {}
for queryset in querysets: for queryset in querysets:
@ -88,7 +88,7 @@ class Command(BaseCommand):
result.append(operation) result.append(operation)
operations = result operations = result
return operations return operations
def handle(self, *args, **options): def handle(self, *args, **options):
list_backends = options.get('list_backends') list_backends = options.get('list_backends')
if list_backends: if list_backends:
@ -116,7 +116,7 @@ class Command(BaseCommand):
if not confirm("\n\nAre your sure to execute the previous scripts on %(servers)s (yes/no)? " % context): if not confirm("\n\nAre your sure to execute the previous scripts on %(servers)s (yes/no)? " % context):
return return
if not dry: if not dry:
logs = manager.execute(scripts, serialize=serialize, async=True) logs = manager.execute(scripts, serialize=serialize, run_async=True)
running = list(logs) running = list(logs)
stdout = 0 stdout = 0
stderr = 0 stderr = 0

View file

@ -97,12 +97,12 @@ def generate(operations):
return scripts, serialize return scripts, serialize
def execute(scripts, serialize=False, async=None): def execute(scripts, serialize=False, run_async=None):
""" """
executes the operations on the servers executes the operations on the servers
serialize: execute one backend at a time serialize: execute one backend at a time
async: do not join threads (overrides route.async) run_async: do not join threads (overrides route.run_async)
""" """
if settings.ORCHESTRATION_DISABLE_EXECUTION: if settings.ORCHESTRATION_DISABLE_EXECUTION:
logger.info('Orchestration execution is dissabled by ORCHESTRATION_DISABLE_EXECUTION.') logger.info('Orchestration execution is dissabled by ORCHESTRATION_DISABLE_EXECUTION.')
@ -115,12 +115,12 @@ def execute(scripts, serialize=False, async=None):
route, __, async_action = key route, __, async_action = key
backend, operations = value backend, operations = value
args = (route.host,) args = (route.host,)
if async is None: if run_async is None:
is_async = not serialize and (route.async or async_action) is_async = not serialize and (route.run_async or async_action)
else: else:
is_async = not serialize and (async or async_action) is_async = not serialize and (run_async or async_action)
kwargs = { kwargs = {
'async': is_async, 'run_async': is_async,
} }
# we clone the connection just in case we are isolated inside a transaction # we clone the connection just in case we are isolated inside a transaction
with db.clone(model=BackendLog) as handle: with db.clone(model=BackendLog) as handle:

View file

@ -17,7 +17,7 @@ from . import settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def Paramiko(backend, log, server, cmds, async=False, paramiko_connections={}): def Paramiko(backend, log, server, cmds, run_async=False, paramiko_connections={}):
""" """
Executes cmds to remote server using Pramaiko Executes cmds to remote server using Pramaiko
""" """
@ -55,7 +55,7 @@ def Paramiko(backend, log, server, cmds, async=False, paramiko_connections={}):
channel.shutdown_write() channel.shutdown_write()
# Log results # Log results
logger.debug('%s running on %s' % (backend, server)) logger.debug('%s running on %s' % (backend, server))
if async: if run_async:
second = False second = False
while True: while True:
# Non-blocking is the secret ingridient in the async sauce # Non-blocking is the secret ingridient in the async sauce
@ -78,7 +78,7 @@ def Paramiko(backend, log, server, cmds, async=False, paramiko_connections={}):
else: else:
log.stdout += channel.makefile('rb', -1).read().decode('utf-8') log.stdout += channel.makefile('rb', -1).read().decode('utf-8')
log.stderr += channel.makefile_stderr('rb', -1).read().decode('utf-8') log.stderr += channel.makefile_stderr('rb', -1).read().decode('utf-8')
log.exit_code = channel.recv_exit_status() log.exit_code = channel.recv_exit_status()
log.state = log.SUCCESS if log.exit_code == 0 else log.FAILURE log.state = log.SUCCESS if log.exit_code == 0 else log.FAILURE
logger.debug('%s execution state on %s is %s' % (backend, server, log.state)) logger.debug('%s execution state on %s is %s' % (backend, server, log.state))
@ -97,7 +97,7 @@ def Paramiko(backend, log, server, cmds, async=False, paramiko_connections={}):
channel.close() channel.close()
def OpenSSH(backend, log, server, cmds, async=False): def OpenSSH(backend, log, server, cmds, run_async=False):
""" """
Executes cmds to remote server using SSH with connection resuse for maximum performance Executes cmds to remote server using SSH with connection resuse for maximum performance
""" """
@ -110,9 +110,9 @@ def OpenSSH(backend, log, server, cmds, async=False):
return return
try: try:
ssh = sshrun(server.get_address(), script, executable=backend.script_executable, ssh = sshrun(server.get_address(), script, executable=backend.script_executable,
persist=True, async=async, silent=True) persist=True, run_async=run_async, silent=True)
logger.debug('%s running on %s' % (backend, server)) logger.debug('%s running on %s' % (backend, server))
if async: if run_async:
for state in ssh: for state in ssh:
log.stdout += state.stdout.decode('utf8') log.stdout += state.stdout.decode('utf8')
log.stderr += state.stderr.decode('utf8') log.stderr += state.stderr.decode('utf8')
@ -148,7 +148,7 @@ def SSH(*args, **kwargs):
return method(*args, **kwargs) return method(*args, **kwargs)
def Python(backend, log, server, cmds, async=False): def Python(backend, log, server, cmds, run_async=False):
script = '' script = ''
functions = set() functions = set()
for cmd in cmds: for cmd in cmds:
@ -170,7 +170,7 @@ def Python(backend, log, server, cmds, async=False):
log.stdout += line + '\n' log.stdout += line + '\n'
if result: if result:
log.stdout += '# Result: %s\n' % result log.stdout += '# Result: %s\n' % result
if async: if run_async:
log.save(update_fields=('stdout', 'updated_at')) log.save(update_fields=('stdout', 'updated_at'))
except: except:
log.exit_code = 1 log.exit_code = 1

View file

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.10.5 on 2021-03-30 10:49
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('orchestration', '0008_auto_20190805_1134'),
]
operations = [
migrations.RenameField(
model_name='route',
old_name='async',
new_name='run_async',
),
migrations.AlterField(
model_name='route',
name='backend',
field=models.CharField(choices=[('Apache2Traffic', '[M] Apache 2 Traffic'), ('ApacheTrafficByName', '[M] ApacheTrafficByName'), ('DokuWikiMuTraffic', '[M] DokuWiki MU Traffic'), ('DovecotMaildirDisk', '[M] Dovecot Maildir size'), ('Exim4Traffic', '[M] Exim4 traffic'), ('MailmanSubscribers', '[M] Mailman subscribers'), ('MailmanTraffic', '[M] Mailman traffic'), ('MysqlDisk', '[M] MySQL disk'), ('PostfixMailscannerTraffic', '[M] Postfix-Mailscanner traffic'), ('ProxmoxOpenVZTraffic', '[M] ProxmoxOpenVZTraffic'), ('UNIXUserDisk', '[M] UNIX user disk'), ('VsFTPdTraffic', '[M] VsFTPd traffic'), ('WordpressMuTraffic', '[M] Wordpress MU Traffic'), ('NextCloudDiskQuota', '[M] nextCloud SaaS Disk Quota'), ('NextcloudTraffic', '[M] nextCloud SaaS Traffic'), ('OwnCloudDiskQuota', '[M] ownCloud SaaS Disk Quota'), ('OwncloudTraffic', '[M] ownCloud SaaS Traffic'), ('PhpListTraffic', '[M] phpList SaaS Traffic'), ('Apache2Controller', '[S] Apache 2'), ('BSCWController', '[S] BSCW SaaS'), ('Bind9MasterDomainController', '[S] Bind9 master domain'), ('Bind9SlaveDomainController', '[S] Bind9 slave domain'), ('DokuWikiMuController', '[S] DokuWiki multisite'), ('DrupalMuController', '[S] Drupal multisite'), ('GitLabSaaSController', '[S] GitLab SaaS'), ('LetsEncryptController', "[S] Let's encrypt!"), ('LxcController', '[S] LxcController'), ('AutoresponseController', '[S] Mail autoresponse'), ('MailmanController', '[S] Mailman'), ('MailmanVirtualDomainController', '[S] Mailman virtdomain-only'), ('MoodleController', '[S] Moodle'), ('MoodleWWWRootController', '[S] Moodle WWWRoot (required)'), ('MoodleMuController', '[S] Moodle multisite'), ('MySQLController', '[S] MySQL database'), ('MySQLUserController', '[S] MySQL user'), ('PHPController', '[S] PHP FPM/FCGID'), ('PostfixAddressController', '[S] Postfix address'), ('PostfixAddressVirtualDomainController', '[S] Postfix address virtdomain-only'), ('ProxmoxOVZ', '[S] ProxmoxOVZ'), ('uWSGIPythonController', '[S] Python uWSGI'), ('RoundcubeIdentityController', '[S] Roundcube Identity Controller'), ('StaticController', '[S] Static'), ('SymbolicLinkController', '[S] Symbolic link webapp'), ('UNIXUserMaildirController', '[S] UNIX maildir user'), ('UNIXUserController', '[S] UNIX user'), ('WebalizerAppController', '[S] Webalizer App'), ('WebalizerController', '[S] Webalizer Content'), ('WordPressForceSSLController', '[S] WordPress Force SSL'), ('WordPressURLController', '[S] WordPress URL'), ('WordPressController', '[S] Wordpress'), ('WordpressMuController', '[S] Wordpress multisite'), ('NextCloudController', '[S] nextCloud SaaS'), ('OwnCloudController', '[S] ownCloud SaaS'), ('PhpListSaaSController', '[S] phpList SaaS')], max_length=256, verbose_name='backend'),
),
]

View file

@ -33,22 +33,22 @@ class Server(models.Model):
os = models.CharField(_("operative system"), max_length=32, os = models.CharField(_("operative system"), max_length=32,
choices=settings.ORCHESTRATION_OS_CHOICES, choices=settings.ORCHESTRATION_OS_CHOICES,
default=settings.ORCHESTRATION_DEFAULT_OS) default=settings.ORCHESTRATION_DEFAULT_OS)
def __str__(self): def __str__(self):
return self.name or str(self.address) return self.name or str(self.address)
def get_address(self): def get_address(self):
if self.address: if self.address:
return self.address return self.address
return self.name return self.name
def get_ip(self): def get_ip(self):
address = self.get_address() address = self.get_address()
try: try:
return validate_ip_address(address) return validate_ip_address(address)
except ValidationError: except ValidationError:
return socket.gethostbyname(self.name) return socket.gethostbyname(self.name)
def clean(self): def clean(self):
self.name = self.name.strip() self.name = self.name.strip()
self.address = self.address.strip() self.address = self.address.strip()
@ -75,7 +75,7 @@ class BackendLog(models.Model):
NOTHING = 'NOTHING' NOTHING = 'NOTHING'
# Special state for mocked backendlogs # Special state for mocked backendlogs
EXCEPTION = 'EXCEPTION' EXCEPTION = 'EXCEPTION'
STATES = ( STATES = (
(RECEIVED, RECEIVED), (RECEIVED, RECEIVED),
(TIMEOUT, TIMEOUT), (TIMEOUT, TIMEOUT),
@ -87,7 +87,7 @@ class BackendLog(models.Model):
(REVOKED, REVOKED), (REVOKED, REVOKED),
(NOTHING, NOTHING), (NOTHING, NOTHING),
) )
backend = models.CharField(_("backend"), max_length=256) backend = models.CharField(_("backend"), max_length=256)
state = models.CharField(_("state"), max_length=16, choices=STATES, default=RECEIVED) state = models.CharField(_("state"), max_length=16, choices=STATES, default=RECEIVED)
server = models.ForeignKey(Server, verbose_name=_("server"), related_name='execution_logs') server = models.ForeignKey(Server, verbose_name=_("server"), related_name='execution_logs')
@ -100,25 +100,25 @@ class BackendLog(models.Model):
help_text="Celery task ID when used as execution backend") help_text="Celery task ID when used as execution backend")
created_at = models.DateTimeField(_("created"), auto_now_add=True, db_index=True) created_at = models.DateTimeField(_("created"), auto_now_add=True, db_index=True)
updated_at = models.DateTimeField(_("updated"), auto_now=True) updated_at = models.DateTimeField(_("updated"), auto_now=True)
class Meta: class Meta:
get_latest_by = 'id' get_latest_by = 'id'
def __str__(self): def __str__(self):
return "%s@%s" % (self.backend, self.server) return "%s@%s" % (self.backend, self.server)
@property @property
def execution_time(self): def execution_time(self):
return (self.updated_at-self.created_at).total_seconds() return (self.updated_at-self.created_at).total_seconds()
@property @property
def has_finished(self): def has_finished(self):
return self.state not in (self.STARTED, self.RECEIVED) return self.state not in (self.STARTED, self.RECEIVED)
@property @property
def is_success(self): def is_success(self):
return self.state in (self.SUCCESS, self.NOTHING) return self.state in (self.SUCCESS, self.NOTHING)
def backend_class(self): def backend_class(self):
return ServiceBackend.get_backend(self.backend) return ServiceBackend.get_backend(self.backend)
@ -141,20 +141,20 @@ class BackendOperation(models.Model):
content_type = models.ForeignKey(ContentType) content_type = models.ForeignKey(ContentType)
object_id = models.PositiveIntegerField(null=True) object_id = models.PositiveIntegerField(null=True)
instance_repr = models.CharField(_("instance representation"), max_length=256) instance_repr = models.CharField(_("instance representation"), max_length=256)
instance = GenericForeignKey('content_type', 'object_id') instance = GenericForeignKey('content_type', 'object_id')
objects = BackendOperationQuerySet.as_manager() objects = BackendOperationQuerySet.as_manager()
class Meta: class Meta:
verbose_name = _("Operation") verbose_name = _("Operation")
verbose_name_plural = _("Operations") verbose_name_plural = _("Operations")
index_together = ( index_together = (
('content_type', 'object_id'), ('content_type', 'object_id'),
) )
def __str__(self): def __str__(self):
return '%s.%s(%s)' % (self.backend, self.action, self.instance or self.instance_repr) return '%s.%s(%s)' % (self.backend, self.action, self.instance or self.instance_repr)
@cached_property @cached_property
def backend_class(self): def backend_class(self):
return ServiceBackend.get_backend(self.backend) return ServiceBackend.get_backend(self.backend)
@ -203,7 +203,7 @@ class Route(models.Model):
match = models.CharField(_("match"), max_length=256, blank=True, default='True', match = models.CharField(_("match"), max_length=256, blank=True, default='True',
help_text=_("Python expression used for selecting the targe host, " help_text=_("Python expression used for selecting the targe host, "
"<em>instance</em> referes to the current object.")) "<em>instance</em> referes to the current object."))
async = models.BooleanField(default=False, run_async = models.BooleanField(default=False,
help_text=_("Whether or not block the request/response cycle waitting this backend to " help_text=_("Whether or not block the request/response cycle waitting this backend to "
"finish its execution. Usually you want slave servers to run asynchronously.")) "finish its execution. Usually you want slave servers to run asynchronously."))
async_actions = MultiSelectField(max_length=256, blank=True, async_actions = MultiSelectField(max_length=256, blank=True,
@ -211,19 +211,19 @@ class Route(models.Model):
# method = models.CharField(_("method"), max_lenght=32, choices=method_choices, # method = models.CharField(_("method"), max_lenght=32, choices=method_choices,
# default=MethodBackend.get_default()) # default=MethodBackend.get_default())
is_active = models.BooleanField(_("active"), default=True) is_active = models.BooleanField(_("active"), default=True)
objects = RouteQuerySet.as_manager() objects = RouteQuerySet.as_manager()
class Meta: class Meta:
unique_together = ('backend', 'host') unique_together = ('backend', 'host')
def __str__(self): def __str__(self):
return "%s@%s" % (self.backend, self.host) return "%s@%s" % (self.backend, self.host)
@cached_property @cached_property
def backend_class(self): def backend_class(self):
return ServiceBackend.get_backend(self.backend) return ServiceBackend.get_backend(self.backend)
def clean(self): def clean(self):
if not self.match: if not self.match:
self.match = 'True' self.match = 'True'
@ -244,10 +244,10 @@ class Route(models.Model):
except Exception as exception: except Exception as exception:
name = type(exception).__name__ name = type(exception).__name__
raise ValidationError(': '.join((name, str(exception)))) raise ValidationError(': '.join((name, str(exception))))
def action_is_async(self, action): def action_is_async(self, action):
return action in self.async_actions return action in self.async_actions
def matches(self, instance): def matches(self, instance):
safe_locals = { safe_locals = {
'instance': instance, 'instance': instance,
@ -255,11 +255,11 @@ class Route(models.Model):
instance._meta.model_name: instance, instance._meta.model_name: instance,
} }
return eval(self.match, safe_locals) return eval(self.match, safe_locals)
def enable(self): def enable(self):
self.is_active = True self.is_active = True
self.save() self.save()
def disable(self): def disable(self):
self.is_active = False self.is_active = False
self.save() self.save()

View file

@ -6,11 +6,11 @@ def retrieve_state(servers):
pings = [] pings = []
for server in servers: for server in servers:
address = server.get_address() address = server.get_address()
ping = run('ping -c 1 -w 1 %s' % address, async=True) ping = run('ping -c 1 -w 1 %s' % address, run_async=True)
pings.append(ping) pings.append(ping)
uptime = sshrun(address, 'uptime', persist=True, async=True, options={'ConnectTimeout': 1}) uptime = sshrun(address, 'uptime', persist=True, run_async=True, options={'ConnectTimeout': 1})
uptimes.append(uptime) uptimes.append(uptime)
state = {} state = {}
for server, ping, uptime in zip(servers, pings, uptimes): for server, ping, uptime in zip(servers, pings, uptimes):
ping = join(ping, silent=True) ping = join(ping, silent=True)
@ -19,7 +19,7 @@ def retrieve_state(servers):
ping = '%s ms' % ping.split('/')[4] ping = '%s ms' % ping.split('/')[4]
else: else:
ping = '<span style="color:red">Offline</span>' ping = '<span style="color:red">Offline</span>'
uptime = join(uptime, silent=True) uptime = join(uptime, silent=True)
uptime_stderr = uptime.stderr.decode() uptime_stderr = uptime.stderr.decode()
uptime = uptime.stdout.decode().split() uptime = uptime.stdout.decode().split()
@ -28,5 +28,5 @@ def retrieve_state(servers):
else: else:
uptime = '<span style="color:red">%s</span>' % uptime_stderr uptime = '<span style="color:red">%s</span>' % uptime_stderr
state[server.pk] = (ping, uptime) state[server.pk] = (ping, uptime)
return state return state

View file

@ -7,14 +7,14 @@ from django.utils.translation import ungettext, ugettext_lazy as _
def run_monitor(modeladmin, request, queryset): def run_monitor(modeladmin, request, queryset):
""" Resource and ResourceData run monitors """ """ Resource and ResourceData run monitors """
referer = request.META.get('HTTP_REFERER') referer = request.META.get('HTTP_REFERER')
async = modeladmin.model.monitor.__defaults__[0] run_async = modeladmin.model.monitor.__defaults__[0]
logs = set() logs = set()
for resource in queryset: for resource in queryset:
rlogs = resource.monitor() rlogs = resource.monitor()
if not async: if not run_async:
logs = logs.union(set([str(log.pk) for log in rlogs])) logs = logs.union(set([str(log.pk) for log in rlogs]))
modeladmin.log_change(request, resource, _("Run monitors")) modeladmin.log_change(request, resource, _("Run monitors"))
if async: if run_async:
num = len(queryset) num = len(queryset)
# TODO listfilter by uuid: task.request.id + ?task_id__in=ids # TODO listfilter by uuid: task.request.id + ?task_id__in=ids
link = reverse('admin:djcelery_taskstate_changelist') link = reverse('admin:djcelery_taskstate_changelist')

View file

@ -26,7 +26,7 @@ class Resource(models.Model):
Defines a resource, a resource is basically an interpretation of data Defines a resource, a resource is basically an interpretation of data
gathered by a Monitor gathered by a Monitor
""" """
LAST = 'LAST' LAST = 'LAST'
MONTHLY_SUM = 'MONTHLY_SUM' MONTHLY_SUM = 'MONTHLY_SUM'
MONTHLY_AVG = 'MONTHLY_AVG' MONTHLY_AVG = 'MONTHLY_AVG'
@ -36,7 +36,7 @@ class Resource(models.Model):
(MONTHLY_AVG, _("Monthly avg")), (MONTHLY_AVG, _("Monthly avg")),
) )
_related = set() # keeps track of related models for resource cleanup _related = set() # keeps track of related models for resource cleanup
name = models.CharField(_("name"), max_length=32, name = models.CharField(_("name"), max_length=32,
help_text=_("Required. 32 characters or fewer. Lowercase letters, " help_text=_("Required. 32 characters or fewer. Lowercase letters, "
"digits and hyphen only."), "digits and hyphen only."),
@ -70,27 +70,27 @@ class Resource(models.Model):
choices=ServiceMonitor.get_choices(), choices=ServiceMonitor.get_choices(),
help_text=_("Monitor backends used for monitoring this resource.")) help_text=_("Monitor backends used for monitoring this resource."))
is_active = models.BooleanField(_("active"), default=True) is_active = models.BooleanField(_("active"), default=True)
objects = ResourceQuerySet.as_manager() objects = ResourceQuerySet.as_manager()
class Meta: class Meta:
unique_together = ( unique_together = (
('name', 'content_type'), ('name', 'content_type'),
('verbose_name', 'content_type') ('verbose_name', 'content_type')
) )
def __str__(self): def __str__(self):
return "%s-%s" % (self.content_type, self.name) return "%s-%s" % (self.content_type, self.name)
@cached_property @cached_property
def aggregation_class(self): def aggregation_class(self):
return Aggregation.get(self.aggregation) return Aggregation.get(self.aggregation)
@cached_property @cached_property
def aggregation_instance(self): def aggregation_instance(self):
""" Per request lived type_instance """ """ Per request lived type_instance """
return self.aggregation_class(self) return self.aggregation_class(self)
def clean(self): def clean(self):
self.verbose_name = self.verbose_name.strip() self.verbose_name = self.verbose_name.strip()
if self.on_demand and self.default_allocation: if self.on_demand and self.default_allocation:
@ -114,12 +114,12 @@ class Resource(models.Model):
model_name, model_name,
) for error in monitor_errors ) for error in monitor_errors
]}) ]})
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
super(Resource, self).save(*args, **kwargs) super(Resource, self).save(*args, **kwargs)
# This only works on tests (multiprocessing used on real deployments) # This only works on tests (multiprocessing used on real deployments)
apps.get_app_config('resources').reload_relations() apps.get_app_config('resources').reload_relations()
def sync_periodic_task(self, delete=False): def sync_periodic_task(self, delete=False):
""" sync periodic task on save/delete resource operations """ """ sync periodic task on save/delete resource operations """
name = 'monitor.%s' % self name = 'monitor.%s' % self
@ -140,21 +140,21 @@ class Resource(models.Model):
if task.crontab != self.crontab: if task.crontab != self.crontab:
task.crontab = self.crontab task.crontab = self.crontab
task.save(update_fields=['crontab']) task.save(update_fields=['crontab'])
def get_model_path(self, monitor): def get_model_path(self, monitor):
""" returns a model path between self.content_type and monitor.model """ """ returns a model path between self.content_type and monitor.model """
resource_model = self.content_type.model_class() resource_model = self.content_type.model_class()
monitor_model = ServiceMonitor.get_backend(monitor).model_class() monitor_model = ServiceMonitor.get_backend(monitor).model_class()
return get_model_field_path(monitor_model, resource_model) return get_model_field_path(monitor_model, resource_model)
def get_scale(self): def get_scale(self):
return eval(self.scale) return eval(self.scale)
def get_verbose_name(self): def get_verbose_name(self):
return self.verbose_name or self.name return self.verbose_name or self.name
def monitor(self, async=True): def monitor(self, run_async=True):
if async: if run_async:
return tasks.monitor.apply_async(self.pk) return tasks.monitor.apply_async(self.pk)
return tasks.monitor(self.pk) return tasks.monitor(self.pk)
@ -187,28 +187,28 @@ class ResourceData(models.Model):
allocated = models.PositiveIntegerField(_("allocated"), null=True, blank=True) allocated = models.PositiveIntegerField(_("allocated"), null=True, blank=True)
content_object_repr = models.CharField(_("content object representation"), max_length=256, content_object_repr = models.CharField(_("content object representation"), max_length=256,
editable=False) editable=False)
content_object = GenericForeignKey() content_object = GenericForeignKey()
objects = ResourceDataQuerySet.as_manager() objects = ResourceDataQuerySet.as_manager()
class Meta: class Meta:
unique_together = ('resource', 'content_type', 'object_id') unique_together = ('resource', 'content_type', 'object_id')
verbose_name_plural = _("resource data") verbose_name_plural = _("resource data")
index_together = ( index_together = (
('content_type', 'object_id'), ('content_type', 'object_id'),
) )
def __str__(self): def __str__(self):
return "%s: %s" % (self.resource, self.content_object) return "%s: %s" % (self.resource, self.content_object)
@property @property
def unit(self): def unit(self):
return self.resource.unit return self.resource.unit
@property @property
def verbose_name(self): def verbose_name(self):
return self.resource.verbose_name return self.resource.verbose_name
def get_used(self): def get_used(self):
resource = self.resource resource = self.resource
total = 0 total = 0
@ -220,7 +220,7 @@ class ResourceData(models.Model):
has_result = True has_result = True
total += usage total += usage
return float(total)/resource.get_scale() if has_result else None return float(total)/resource.get_scale() if has_result else None
def update(self, current=None): def update(self, current=None):
if current is None: if current is None:
current = self.get_used() current = self.get_used()
@ -228,13 +228,13 @@ class ResourceData(models.Model):
self.updated_at = timezone.now() self.updated_at = timezone.now()
self.content_object_repr = str(self.content_object) self.content_object_repr = str(self.content_object)
self.save(update_fields=('used', 'updated_at', 'content_object_repr')) self.save(update_fields=('used', 'updated_at', 'content_object_repr'))
def monitor(self, async=False): def monitor(self, run_async=False):
ids = (self.object_id,) ids = (self.object_id,)
if async: if run_async:
return tasks.monitor.delay(self.resource_id, ids=ids) return tasks.monitor.delay(self.resource_id, ids=ids)
return tasks.monitor(self.resource_id, ids=ids) return tasks.monitor(self.resource_id, ids=ids)
def get_monitor_datasets(self): def get_monitor_datasets(self):
resource = self.resource resource = self.resource
for monitor in resource.monitors: for monitor in resource.monitors:
@ -275,20 +275,20 @@ class MonitorData(models.Model):
help_text=_("Optional field used to store current state needed for diff-based monitoring.")) help_text=_("Optional field used to store current state needed for diff-based monitoring."))
content_object_repr = models.CharField(_("content object representation"), max_length=256, content_object_repr = models.CharField(_("content object representation"), max_length=256,
editable=False) editable=False)
content_object = GenericForeignKey() content_object = GenericForeignKey()
objects = MonitorDataQuerySet.as_manager() objects = MonitorDataQuerySet.as_manager()
class Meta: class Meta:
get_latest_by = 'id' get_latest_by = 'id'
verbose_name_plural = _("monitor data") verbose_name_plural = _("monitor data")
index_together = ( index_together = (
('content_type', 'object_id'), ('content_type', 'object_id'),
) )
def __str__(self): def __str__(self):
return str(self.monitor) return str(self.monitor)
@cached_property @cached_property
def unit(self): def unit(self):
return self.resource.unit return self.resource.unit
@ -324,15 +324,15 @@ def create_resource_relation():
) )
self.obj.__resource_cache[attr] = rdata self.obj.__resource_cache[attr] = rdata
return rdata return rdata
def __get__(self, obj, cls): def __get__(self, obj, cls):
""" proxy handled object """ """ proxy handled object """
self.obj = obj self.obj = obj
return self return self
def __iter__(self): def __iter__(self):
return iter(self.obj.resource_set.all()) return iter(self.obj.resource_set.all())
# Clean previous state # Clean previous state
for related in Resource._related: for related in Resource._related:
try: try:
@ -344,7 +344,7 @@ def create_resource_relation():
related._meta.private_fields = [ related._meta.private_fields = [
field for field in related._meta.private_fields if field.rel.to != ResourceData field for field in related._meta.private_fields if field.rel.to != ResourceData
] ]
for ct, resources in Resource.objects.group_by('content_type').items(): for ct, resources in Resource.objects.group_by('content_type').items():
model = ct.model_class() model = ct.model_class()
relation = GenericRelation('resources.ResourceData') relation = GenericRelation('resources.ResourceData')

View file

@ -36,8 +36,8 @@ def monitor(resource_id, ids=None):
for obj in model.objects.filter(**kwargs): for obj in model.objects.filter(**kwargs):
op = Operation(backend, obj, Operation.MONITOR) op = Operation(backend, obj, Operation.MONITOR)
monitorings.append(op) monitorings.append(op)
logs += Operation.execute(monitorings, async=False) logs += Operation.execute(monitorings, run_async=False)
kwargs = {'id__in': ids} if ids else {} kwargs = {'id__in': ids} if ids else {}
# Update used resources and trigger resource exceeded and revovery # Update used resources and trigger resource exceeded and revovery
triggers = [] triggers = []

View file

@ -23,11 +23,11 @@ def is_due(task, time=None):
) )
def run_task(task, thread=True, process=False, async=False): def run_task(task, thread=True, process=False, run_async=False):
args = json.loads(task.args) args = json.loads(task.args)
kwargs = json.loads(task.kwargs) kwargs = json.loads(task.kwargs)
task_fn = current_app.tasks.get(task.task) task_fn = current_app.tasks.get(task.task)
if async: if run_async:
method = 'process' if process else 'thread' method = 'process' if process else 'thread'
return apply_async(task_fn, method=method).apply_async(*args, **kwargs) return apply_async(task_fn, method=method).apply_async(*args, **kwargs)
return task_fn(*args, **kwargs) return task_fn(*args, **kwargs)
@ -38,6 +38,6 @@ def run():
procs = [] procs = []
for task in PeriodicTask.objects.enabled().select_related('crontab'): for task in PeriodicTask.objects.enabled().select_related('crontab'):
if is_due(task, now): if is_due(task, now):
proc = run_task(task, process=True, async=True) proc = run_task(task, process=True, run_async=True)
procs.append(proc) procs.append(proc)
[proc.join() for proc in procs] [proc.join() for proc in procs]

View file

@ -13,7 +13,7 @@ def get_name(fn):
def run(method, *args, **kwargs): def run(method, *args, **kwargs):
async = kwargs.pop('async', True) run_async = kwargs.pop('run_async', True)
thread = threading.Thread(target=close_connection(method), args=args, kwargs=kwargs) thread = threading.Thread(target=close_connection(method), args=args, kwargs=kwargs)
thread = Process(target=close_connection(counter)) thread = Process(target=close_connection(counter))
thread.start() thread.start()

View file

@ -71,43 +71,43 @@ def runiterator(command, display=False, stdin=b''):
""" Subprocess wrapper for running commands concurrently """ """ Subprocess wrapper for running commands concurrently """
if display: if display:
sys.stderr.write("\n\033[1m $ %s\033[0m\n" % command) sys.stderr.write("\n\033[1m $ %s\033[0m\n" % command)
p = subprocess.Popen(command, shell=True, executable='/bin/bash', p = subprocess.Popen(command, shell=True, executable='/bin/bash',
stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
p.stdin.write(stdin) p.stdin.write(stdin)
p.stdin.close() p.stdin.close()
yield yield
make_async(p.stdout) make_async(p.stdout)
make_async(p.stderr) make_async(p.stderr)
# Async reading of stdout and sterr # Async reading of stdout and sterr
while True: while True:
stdout = b'' stdout = b''
stderr = b'' stderr = b''
# Get complete unicode chunks # Get complete unicode chunks
select.select([p.stdout, p.stderr], [], []) select.select([p.stdout, p.stderr], [], [])
stdoutPiece = read_async(p.stdout) stdoutPiece = read_async(p.stdout)
stderrPiece = read_async(p.stderr) stderrPiece = read_async(p.stderr)
stdout += (stdoutPiece or b'') stdout += (stdoutPiece or b'')
#.decode('ascii'), errors='replace') #.decode('ascii'), errors='replace')
stderr += (stderrPiece or b'') stderr += (stderrPiece or b'')
#.decode('ascii'), errors='replace') #.decode('ascii'), errors='replace')
if display and stdout: if display and stdout:
sys.stdout.write(stdout.decode('utf8')) sys.stdout.write(stdout.decode('utf8'))
if display and stderr: if display and stderr:
sys.stderr.write(stderr.decode('utf8')) sys.stderr.write(stderr.decode('utf8'))
state = _Attribute(stdout) state = _Attribute(stdout)
state.stderr = stderr state.stderr = stderr
state.exit_code = p.poll() state.exit_code = p.poll()
state.command = command state.command = command
yield state yield state
if state.exit_code != None: if state.exit_code != None:
p.stdout.close() p.stdout.close()
p.stderr.close() p.stderr.close()
@ -121,12 +121,12 @@ def join(iterator, display=False, silent=False, valid_codes=(0,)):
for state in iterator: for state in iterator:
stdout += state.stdout stdout += state.stdout
stderr += state.stderr stderr += state.stderr
exit_code = state.exit_code exit_code = state.exit_code
out = _Attribute(stdout.strip()) out = _Attribute(stdout.strip())
err = stderr.strip() err = stderr.strip()
out.failed = False out.failed = False
out.exit_code = exit_code out.exit_code = exit_code
out.stderr = err out.stderr = err
@ -138,7 +138,7 @@ def join(iterator, display=False, silent=False, valid_codes=(0,)):
sys.stderr.write("\n\033[1;31mCommandError: %s %s\033[m\n" % (msg, err)) sys.stderr.write("\n\033[1;31mCommandError: %s %s\033[m\n" % (msg, err))
if not silent: if not silent:
raise CommandError("%s %s" % (msg, err)) raise CommandError("%s %s" % (msg, err))
out.succeeded = not out.failed out.succeeded = not out.failed
return out return out
@ -151,10 +151,10 @@ def joinall(iterators, **kwargs):
return results return results
def run(command, display=False, valid_codes=(0,), silent=False, stdin=b'', async=False): def run(command, display=False, valid_codes=(0,), silent=False, stdin=b'', run_async=False):
iterator = runiterator(command, display, stdin) iterator = runiterator(command, display, stdin)
next(iterator) next(iterator)
if async: if run_async:
return iterator return iterator
return join(iterator, display=display, silent=silent, valid_codes=valid_codes) return join(iterator, display=display, silent=silent, valid_codes=valid_codes)
@ -213,7 +213,7 @@ class LockFile(object):
self.lockfile = lockfile self.lockfile = lockfile
self.expire = expire self.expire = expire
self.unlocked = unlocked self.unlocked = unlocked
def acquire(self): def acquire(self):
if os.path.exists(self.lockfile): if os.path.exists(self.lockfile):
lock_time = os.path.getmtime(self.lockfile) lock_time = os.path.getmtime(self.lockfile)
@ -222,17 +222,17 @@ class LockFile(object):
return False return False
touch(self.lockfile) touch(self.lockfile)
return True return True
def release(self): def release(self):
os.remove(self.lockfile) os.remove(self.lockfile)
def __enter__(self): def __enter__(self):
if not self.unlocked: if not self.unlocked:
if not self.acquire(): if not self.acquire():
raise OperationLocked("%s lock file exists and its mtime is less than %s seconds" % raise OperationLocked("%s lock file exists and its mtime is less than %s seconds" %
(self.lockfile, self.expire)) (self.lockfile, self.expire))
return True return True
def __exit__(self, type, value, traceback): def __exit__(self, type, value, traceback):
if not self.unlocked: if not self.unlocked:
self.release() self.release()
@ -240,4 +240,4 @@ class LockFile(object):
def touch_wsgi(delay=0): def touch_wsgi(delay=0):
from . import paths from . import paths
run('{ sleep %i && touch %s/wsgi.py; } &' % (delay, paths.get_project_dir()), async=True) run('{ sleep %i && touch %s/wsgi.py; } &' % (delay, paths.get_project_dir()), run_async=True)