Fixes on mailboxes, tasks, and mailer

This commit is contained in:
Marc Aymerich 2015-05-05 19:42:55 +00:00
parent 82b657aa03
commit e4f1673de8
35 changed files with 602 additions and 286 deletions

19
TODO.md
View file

@ -358,3 +358,22 @@ TODO mount the filesystem with "nosuid" option
# autoiscover modules on app.ready()
# uwse uwsgi cron: decorator or config cron = 59 2 -1 -1 -1 %(virtualenv)/bin/python manage.py runmyfunnytask
# SecondaryMailServerBackend and check_origin signal
try: import uwsgi to know its running uwsgi
# avoid cron email errors when failing hard
# mailboxes.address settings multiple local domains, not only one?
# backend.context = self.get_context() or save(obj, context=None)
# smtplib.SMTPConnectError: (421, b'4.7.0 mail.pangea.org Error: too many connections from 77.246.181.209')
# create registered periodic_task on beat execution: and management command: syncperiodictasks
# MERGE beats and inspect INSTALLED_APPS and get IS_ENABLED
# make exceptions fot check origin shit
# rename virtual_maps to virtual_alias_maps and remove virtual_alias_domains ?
# Message last_retry auto_now doesn't work!

View file

@ -37,7 +37,7 @@ class SendEmail(object):
raise PermissionDenied
initial={
'email_from': self.default_from,
'to': ' '.join(self.get_queryset_emails())
'to': ' '.join(self.get_email_addresses())
}
form = self.form(initial=initial)
if request.POST.get('post'):
@ -62,7 +62,7 @@ class SendEmail(object):
# Display confirmation page
return render(request, self.template, self.context)
def get_queryset_emails(self):
def get_email_addresses(self):
return self.queryset.values_list('email', flat=True)
def confirm_email(self, request, **options):
@ -74,7 +74,7 @@ class SendEmail(object):
if request.POST.get('post') == 'email_confirmation':
emails = []
num = 0
for email in self.get_queryset_emails():
for email in self.get_email_addresses():
emails.append((subject, message, email_from, [email]))
num += 1
if extra_to:
@ -99,7 +99,7 @@ class SendEmail(object):
'content_message': _(
"Are you sure you want to send the following message to the following %s?"
) % self.opts.verbose_name_plural,
'display_objects': ["%s (%s)" % (contact, contact.email) for contact in self.queryset],
'display_objects': ["%s (%s)" % (contact, email) for contact, email in zip(self.queryset, self.get_email_addresses())],
'form': form,
'subject': subject,
'message': message,

View file

@ -3,7 +3,7 @@ from django.core.urlresolvers import reverse
from django.utils.text import capfirst
from django.utils.translation import ugettext_lazy as _
from orchestra.core import services, accounts
from orchestra.core import services, accounts, administration
from orchestra.utils.apps import isinstalled
@ -27,15 +27,20 @@ def api_link(context):
return reverse('api-root')
def get_services():
def process_registered_models(register):
childrens = []
for model, options in services.get().items():
for model, options in register.get().items():
if options.get('menu', True):
opts = model._meta
url = reverse('admin:{}_{}_changelist'.format(
opts.app_label, opts.model_name))
name = capfirst(options.get('verbose_name_plural'))
childrens.append(items.MenuItem(name, url))
return childrens
def get_services():
childrens = process_registered_models(services)
return sorted(childrens, key=lambda i: i.title)
@ -47,13 +52,7 @@ def get_accounts():
if isinstalled('orchestra.contrib.issues'):
url = reverse('admin:issues_ticket_changelist')
childrens.append(items.MenuItem(_("Tickets"), url))
for model, options in accounts.get().items():
if options.get('menu', True):
opts = model._meta
url = reverse('admin:{}_{}_changelist'.format(
opts.app_label, opts.model_name))
name = capfirst(options.get('verbose_name_plural'))
childrens.append(items.MenuItem(name, url))
childrens.extend(process_registered_models(accounts))
return sorted(childrens, key=lambda i: i.title)
@ -100,6 +99,7 @@ def get_administration_items():
items.MenuItem(_("Periodic tasks"), periodic),
items.MenuItem(_("Workers"), worker),
]))
childrens.extend(process_registered_models(administration))
return childrens

152
orchestra/bin/orchestra-beat Executable file
View file

@ -0,0 +1,152 @@
#!/usr/bin/env python3
# High performance alternative to beat management command
# Looks for pending work before firing up all the Django machinery on separate processes
#
# Handles orchestra.contrib.tasks periodic_tasks and orchestra.contrib.mailer queued mails
#
# USAGE: beat /path/to/project/manage.py
import json
import os
import re
import sys
from datetime import datetime, timedelta
from celery.schedules import crontab_parser as CrontabParser
from orchestra.utils.sys import run, join
class Setting(object):
def __init__(self, manage):
self.manage = manage
self.settings_file = self.get_settings_file(manage)
def get_settings(self):
""" get db settings from settings.py file without importing """
settings = {'__file__': self.settings_file}
with open(self.settings_file) as f:
__file__ = 'rata'
exec(f.read(), settings)
return settings
def get_settings_file(self, manage):
with open(manage, 'r') as handler:
regex = re.compile(r'"DJANGO_SETTINGS_MODULE"\s*,\s*"([^"]+)"')
for line in handler.readlines():
match = regex.search(line)
if match:
settings_module = match.groups()[0]
settings_file = os.path.join(*settings_module.split('.')) + '.py'
settings_file = os.path.join(os.path.dirname(manage), settings_file)
return settings_file
raise ValueError("settings module not found in %s" % manage)
class DB(object):
def __init__(self, settings):
self.settings = settings['DATABASES']['default']
def connect(self):
if self.settings['ENGINE'] == 'django.db.backends.sqlite3':
import sqlite3
self.conn = sqlite3.connect(self.settings['NAME'])
elif self.settings['ENGINE'] == 'django.db.backends.postgresql_psycopg2':
import psycopg2
self.conn = psycopg2.connect("dbname='{NAME}' user='{USER}' host='{HOST}' password='{PASSWORD}'".format(**self.settings))
else:
raise ValueError("%s engine not supported." % self.settings['ENGINE'])
def query(self, query):
cur = self.conn.cursor()
try:
cur.execute(query)
result = cur.fetchall()
finally:
cur.close()
return result
def close(self):
self.conn.close()
def fire_pending_tasks(manage, db):
def get_tasks(db):
enabled = 1 if 'sqlite' in db.settings['ENGINE'] else True
query = (
"SELECT c.minute, c.hour, c.day_of_week, c.day_of_month, c.month_of_year, p.id "
"FROM djcelery_periodictask as p, djcelery_crontabschedule as c "
"WHERE p.crontab_id = c.id AND p.enabled = {}"
).format(enabled)
return db.query(query)
def is_due(now, minute, hour, day_of_week, day_of_month, month_of_year):
n_minute, n_hour, n_day_of_week, n_day_of_month, n_month_of_year = now
return (
n_minute in CrontabParser(60).parse(minute) and
n_hour in CrontabParser(24).parse(hour) and
n_day_of_week in CrontabParser(7).parse(day_of_week) and
n_day_of_month in CrontabParser(31, 1).parse(day_of_month) and
n_month_of_year in CrontabParser(12, 1).parse(month_of_year)
)
now = datetime.utcnow()
now = tuple(map(int, now.strftime("%M %H %w %d %m").split()))
for minute, hour, day_of_week, day_of_month, month_of_year, task_id in get_tasks(db):
if is_due(now, minute, hour, day_of_week, day_of_month, month_of_year):
command = 'python3 -W ignore::DeprecationWarning {manage} runtask {task_id}'.format(
manage=manage, task_id=task_id)
proc = run(command, async=True)
yield proc
def fire_pending_messages(settings, db):
def has_pending_messages(settings, db):
MAILER_DEFERE_SECONDS = settings.get('MAILER_DEFERE_SECONDS', (300, 600, 60*60, 60*60*24))
now = datetime.utcnow()
query_or = []
for num, seconds in enumerate(MAILER_DEFERE_SECONDS):
delta = timedelta(seconds=seconds)
epoch = now-delta
query_or.append("""(mailer_message.retries = 0 AND mailer_message.last_retry <= '%s')"""
% epoch.isoformat().replace('T', ' '))
query = """\
SELECT 1 FROM mailer_message
WHERE (mailer_message.state = 'QUEUED'
OR (mailer_message.state = 'DEFERRED' AND (%s))) LIMIT 1""" % ' OR '.join(query_or)
return bool(db.query(query))
if has_pending_messages(settings, db):
command = 'python3 -W ignore::DeprecationWarning {manage} send'.format(manage=manage)
proc = run(command, async=True)
yield proc
if __name__ == "__main__":
# TODO aquire lock
manage = sys.argv[1]
procs = []
settings = Setting(manage).get_settings()
db = DB(settings)
db.connect()
try:
if 'orchestra.contrib.tasks' in settings['INSTALLED_APPS']:
if settings.get('TASKS_BACKEND', 'thread') in ('thread', 'process'):
for proc in fire_pending_tasks(manage, db):
procs.append(proc)
if 'orchestra.contrib.mailer' in settings['INSTALLED_APPS']:
for proc in fire_pending_messages(settings, db):
procs.append(proc)
exit_code = 0
for proc in procs:
result = join(proc)
sys.stdout.write(result.stdout.decode('utf8'))
sys.stderr.write(result.stderr.decode('utf8'))
if result.return_code != 0:
exit_code = result.return_code
finally:
db.close()
sys.exit(exit_code)

View file

@ -102,7 +102,7 @@ class Bind9MasterDomainBackend(ServiceController):
servers.append(server.get_ip())
return servers
def get_masters(self, domain):
def get_masters_ips(self, domain):
ips = list(settings.DOMAINS_MASTERS)
if not ips:
ips += self.get_servers(domain, Bind9MasterDomainBackend)
@ -110,24 +110,23 @@ class Bind9MasterDomainBackend(ServiceController):
def get_slaves(self, domain):
ips = []
masters = self.get_masters(domain)
ns_queryset = domain.records.filter(type=Record.NS).values_list('value', flat=True)
ns_records = ns_queryset or settings.DOMAINS_DEFAULT_NS
for ns in ns_records:
hostname = ns.rstrip('.')
masters_ips = self.get_masters_ips(domain)
records = domain.get_records()
for record in records.by_type(Record.NS):
hostname = record.value.rstrip('.')
# First try with a DNS query, a more reliable source
try:
addr = socket.gethostbyname(hostname)
except socket.gaierror:
# check if domain is declared
# check if hostname is declared
try:
domain = Domain.objects.get(name=ns)
domain = Domain.objects.get(name=hostname)
except Domain.DoesNotExist:
continue
else:
a_record = domain.records.filter(name=Record.A) or [settings.DOMAINS_DEFAULT_A]
addr = a_record[0]
if addr not in masters:
# default to domain A record address
addr = records.by_type(Record.A)[0].value
if addr not in masters_ips:
ips.append(addr)
return OrderedSet(sorted(ips))
@ -185,7 +184,7 @@ class Bind9SlaveDomainBackend(Bind9MasterDomainBackend):
'name': domain.name,
'banner': self.get_banner(),
'subdomains': domain.subdomains.all(),
'masters': '; '.join(self.get_masters(domain)) or 'none',
'masters': '; '.join(self.get_masters_ips(domain)) or 'none',
'conf_path': self.CONF_PATH,
}
context['conf'] = textwrap.dedent("""

View file

@ -9,10 +9,10 @@ def domain_for_validation(instance, records):
so when validation calls render_zone() it will use the new provided data
"""
domain = copy.copy(instance)
def get_records(records=records):
def get_declared_records(records=records):
for data in records:
yield Record(type=data['type'], value=data['value'])
domain.get_records = get_records
domain.get_declared_records = get_declared_records
if not domain.pk:
# top domain lookup for new domains

View file

@ -85,7 +85,7 @@ class Domain(models.Model):
def get_absolute_url(self):
return 'http://%s' % self.name
def get_records(self):
def get_declared_records(self):
""" proxy method, needed for input validation, see helpers.domain_for_validation """
return self.records.all()
@ -122,10 +122,10 @@ class Domain(models.Model):
self.serial = serial
self.save(update_fields=['serial'])
def render_records(self):
def get_records(self):
types = {}
records = []
for record in self.get_records():
records = utils.RecordStorage()
for record in self.get_declared_records():
types[record.type] = True
if record.type == record.SOA:
# Update serial and insert at 0
@ -183,8 +183,11 @@ class Domain(models.Model):
type=Record.AAAA,
value=default_aaaa
))
return records
def render_records(self):
result = ''
for record in records:
for record in self.get_records():
name = '{name}.{spaces}'.format(
name=self.name,
spaces=' ' * (37-len(self.name))
@ -206,6 +209,14 @@ class Domain(models.Model):
)
return result
def has_default_mx(self):
records = self.get_records()
for record in records.by_type('MX'):
for default in settings.DOMAINS_DEFAULT_MX:
if record.value.endswith(' %s' % default.split()[-1]):
return True
return False
class Record(models.Model):
""" Represents a domain resource record """

View file

@ -1,6 +1,32 @@
from collections import defaultdict
from django.utils import timezone
class RecordStorage(object):
"""
list-dict implementation for fast lookups of record types
"""
def __init__(self, *args):
self.records = list(*args)
self.type = defaultdict(list)
def __iter__(self):
return iter(self.records)
def append(self, record):
self.records.append(record)
self.type[record['type']].append(record)
def insert(self, ix, record):
self.records.insert(ix, record)
self.type[record['type']].insert(ix, record)
def by_type(self, type):
return self.type[type]
def generate_zone_serial():
today = timezone.now()
return int("%.4d%.2d%.2d%.2d" % (today.year, today.month, today.day, 0))

View file

@ -1,3 +1,4 @@
import re
import textwrap
from django.utils.translation import ugettext_lazy as _
@ -10,12 +11,73 @@ from . import settings
from .models import List
class MailmanBackend(ServiceController):
class MailmanVirtualDomainBackend(ServiceController):
"""
Only syncs virtualdomains used on mailman addresses
"""
verbose_name = _("Mailman virtdomain-only")
model = 'lists.List'
doc_settings = (settings,
('LISTS_VIRTUAL_ALIAS_DOMAINS_PATH',)
)
def is_local_domain(self, domain):
""" whether or not domain MX points to this server """
return domain.has_default_mx()
def include_virtual_alias_domain(self, context):
domain = context['address_domain']
if domain and self.is_local_domain(domain):
self.append(textwrap.dedent("""
[[ $(grep '^\s*%(address_domain)s\s*$' %(virtual_alias_domains)s) ]] || {
echo '%(address_domain)s' >> %(virtual_alias_domains)s
UPDATED_VIRTUAL_ALIAS_DOMAINS=1
}""") % self.context
)
def is_last_domain(self, domain):
return not List.objects.filter(address_domain=domain).exists()
def exclude_virtual_alias_domain(self, context):
domain = context['address_domain']
if domain and self.is_last_domain(domain):
self.append("sed -i '/^%(address_domain)s\s*$/d' %(virtual_alias_domains)s" % context)
def save(self, mail_list):
context = self.get_context(mail_list)
self.include_virtual_alias_domain(context)
def delete(self, mail_list):
context = self.get_context(mail_list)
self.include_virtual_alias_domain(context)
def commit(self):
context = self.get_context_files()
self.append(textwrap.dedent("""
if [[ $UPDATED_VIRTUAL_ALIAS_DOMAINS == 1 ]]; then
service postfix reload
fi""") % context
)
def get_context_files(self):
return {
'virtual_alias_domains': settings.LISTS_VIRTUAL_ALIAS_DOMAINS_PATH,
}
def get_context(self, mail_list):
context = self.get_context_files()
context.update({
'address_domain': mail_list.address_domain,
})
return replace(context, "'", '"')
class MailmanBackend(MailmanVirtualDomainBackend):
"""
Mailman 2 backend based on <tt>newlist</tt>, it handles custom domains.
Includes <tt>MailmanVirtualDomainBackend</tt>
"""
verbose_name = "Mailman"
model = 'lists.List'
addresses = [
'',
'-admin',
@ -35,23 +97,6 @@ class MailmanBackend(ServiceController):
'LISTS_MAILMAN_ROOT_DIR'
))
def include_virtual_alias_domain(self, context):
if context['address_domain']:
# Check if the domain is hosted on this mail server
# TODO this is dependent on the domain model
if Domain.objects.filter(records__type=Record.MX, name=context['address_domain']).exists():
self.append(textwrap.dedent("""
[[ $(grep '^\s*%(address_domain)s\s*$' %(virtual_alias_domains)s) ]] || {
echo '%(address_domain)s' >> %(virtual_alias_domains)s
UPDATED_VIRTUAL_ALIAS_DOMAINS=1
}""") % context
)
def exclude_virtual_alias_domain(self, context):
address_domain = context['address_domain']
if not List.objects.filter(address_domain=address_domain).exists():
self.append("sed -i '/^%(address_domain)s\s*$/d' %(virtual_alias_domains)s" % context)
def get_virtual_aliases(self, context):
aliases = ['# %(banner)s' % context]
for address in self.addresses:
@ -163,7 +208,6 @@ class MailmanBackend(ServiceController):
return replace(context, "'", '"')
class MailmanTraffic(ServiceMonitor):
"""
Parses mailman log file looking for email size and multiples it by <tt>list_members</tt> count.

View file

@ -2,6 +2,12 @@ from orchestra.admin.actions import SendEmail
class SendMailboxEmail(SendEmail):
def get_queryset_emails(self):
def get_email_addresses(self):
for mailbox in self.queryset.all():
yield mailbox.get_local_address()
class SendAddressEmail(SendEmail):
def get_email_addresses(self):
for address in self.queryset.all():
yield address.emails

View file

@ -13,7 +13,7 @@ from orchestra.contrib.accounts.admin import SelectAccountAdminMixin
from orchestra.contrib.accounts.filters import IsActiveListFilter
from . import settings
from .actions import SendMailboxEmail
from .actions import SendMailboxEmail, SendAddressEmail
from .filters import HasMailboxListFilter, HasForwardListFilter, HasAddressListFilter
from .forms import MailboxCreationForm, MailboxChangeForm, AddressForm
from .models import Mailbox, Address, Autoresponse
@ -84,9 +84,9 @@ class MailboxAdmin(ChangePasswordAdminMixin, SelectAccountAdminMixin, ExtendedMo
def get_actions(self, request):
if settings.MAILBOXES_LOCAL_ADDRESS_DOMAIN:
self.actions = (SendMailboxEmail(),)
type(self).actions = (SendMailboxEmail(),)
else:
self.actions = ()
type(self).actions = ()
return super(MailboxAdmin, self).get_actions(request)
def formfield_for_dbfield(self, db_field, **kwargs):
@ -127,6 +127,7 @@ class AddressAdmin(SelectAccountAdminMixin, ExtendedModelAdmin):
inlines = [AutoresponseInline]
search_fields = ('forward', 'mailboxes__name', 'account__username', 'computed_email')
readonly_fields = ('account_link', 'domain_link', 'email_link')
actions = (SendAddressEmail(),)
filter_by_account_fields = ('domain', 'mailboxes')
filter_horizontal = ['mailboxes']
form = AddressForm

View file

@ -203,88 +203,64 @@ class DovecotPostfixPasswdVirtualUserBackend(SieveFilteringMixin, ServiceControl
return replace(context, "'", '"')
class PostfixAddressBackend(ServiceController):
class PostfixAddressVirtualDomainBackend(ServiceController):
"""
Addresses based on Postfix virtual alias domains.
Secondary SMTP server without mailboxes in it, only syncs virtual domains.
"""
verbose_name = _("Postfix address")
verbose_name = _("Postfix address virtdomain-only")
model = 'mailboxes.Address'
related_models = (
('mailboxes.Mailbox', 'addresses'),
)
doc_settings = (settings,
('MAILBOXES_LOCAL_DOMAIN', 'MAILBOXES_VIRTUAL_ALIAS_DOMAINS_PATH', 'MAILBOXES_VIRTUAL_ALIAS_MAPS_PATH',)
('MAILBOXES_LOCAL_DOMAIN', 'MAILBOXES_VIRTUAL_ALIAS_DOMAINS_PATH')
)
def is_local_domain(self, domain):
""" whether or not domain MX points to this server """
return domain.has_default_mx()
def include_virtual_alias_domain(self, context):
if context['domain'] != context['local_domain']:
# Check if the domain is hosted on this mail server
# TODO this is dependent on the domain model
if Domain.objects.filter(records__type=Record.MX, name=context['domain']).exists():
self.append(textwrap.dedent("""
[[ $(grep '^\s*%(domain)s\s*$' %(virtual_alias_domains)s) ]] || {
echo '%(domain)s' >> %(virtual_alias_domains)s
UPDATED_VIRTUAL_ALIAS_DOMAINS=1
}""") % context
)
domain = context['domain']
if domain.name != context['local_domain'] and self.is_local_domain(domain):
self.append(textwrap.dedent("""
[[ $(grep '^\s*%(domain)s\s*$' %(virtual_alias_domains)s) ]] || {
echo '%(domain)s' >> %(virtual_alias_domains)s
UPDATED_VIRTUAL_ALIAS_DOMAINS=1
}""") % context
)
def is_last_domain(self, domain):
return not Address.objects.filter(domain=domain).exists()
def exclude_virtual_alias_domain(self, context):
domain = context['domain']
if not Address.objects.filter(domain=domain).exists():
self.append("sed -i '/^%(domain)s\s*/d' %(virtual_alias_domains)s" % context)
def update_virtual_alias_maps(self, address, context):
# Virtual mailbox stuff
# destination = []
# for mailbox in address.get_mailboxes():
# context['mailbox'] = mailbox
# destination.append("%(mailbox)s@%(local_domain)s" % context)
# for forward in address.forward:
# if '@' in forward:
# destination.append(forward)
destination = address.destination
if destination:
context['destination'] = destination
self.append(textwrap.dedent("""
LINE='%(email)s\t%(destination)s'
if [[ ! $(grep '^%(email)s\s' %(virtual_alias_maps)s) ]]; then
echo "${LINE}" >> %(virtual_alias_maps)s
UPDATED_VIRTUAL_ALIAS_MAPS=1
else
if [[ ! $(grep "^${LINE}$" %(virtual_alias_maps)s) ]]; then
sed -i "s/^%(email)s\s.*$/${LINE}/" %(virtual_alias_maps)s
UPDATED_VIRTUAL_ALIAS_MAPS=1
fi
fi""") % context)
else:
logger.warning("Address %i is empty" % address.pk)
self.append("sed -i '/^%(email)s\s/d' %(virtual_alias_maps)s" % context)
self.append('UPDATED_VIRTUAL_ALIAS_MAPS=1')
def exclude_virtual_alias_maps(self, context):
self.append(textwrap.dedent("""
if [[ $(grep '^%(email)s\s' %(virtual_alias_maps)s) ]]; then
sed -i '/^%(email)s\s.*$/d' %(virtual_alias_maps)s
UPDATED_VIRTUAL_ALIAS_MAPS=1
fi""") % context)
if self.is_last_domain(domain):
self.append(textwrap.dedent("""\
sed -i '/^%(domain)s\s*/d;{!q0;q1}' %(virtual_alias_domains)s && \\
UPDATED_VIRTUAL_ALIAS_DOMAINS=1
""") % context
)
def save(self, address):
context = self.get_context(address)
self.include_virtual_alias_domain(context)
self.update_virtual_alias_maps(address, context)
return context
def delete(self, address):
context = self.get_context(address)
self.exclude_virtual_alias_domain(context)
self.exclude_virtual_alias_maps(context)
return context
def commit(self):
context = self.get_context_files()
self.append(textwrap.dedent("""
[[ $UPDATED_VIRTUAL_ALIAS_MAPS == 1 ]] && { postmap %(virtual_alias_maps)s; }
[[ $UPDATED_VIRTUAL_ALIAS_DOMAINS == 1 ]] && { service postfix reload; }
self.append(textwrap.dedent("""\
[[ $UPDATED_VIRTUAL_ALIAS_DOMAINS == 1 ]] && {
service postfix reload
}
exit $exit_code
""") % context
)
self.append('exit 0')
def get_context_files(self):
return {
@ -302,6 +278,77 @@ class PostfixAddressBackend(ServiceController):
return replace(context, "'", '"')
class PostfixAddressBackend(PostfixAddressVirtualDomainBackend):
"""
Addresses based on Postfix virtual alias domains, includes <tt>PostfixAddressVirtualDomainBackend</tt>.
"""
verbose_name = _("Postfix address")
doc_settings = (settings,
('MAILBOXES_LOCAL_DOMAIN', 'MAILBOXES_VIRTUAL_ALIAS_DOMAINS_PATH', 'MAILBOXES_VIRTUAL_ALIAS_MAPS_PATH')
)
def update_virtual_alias_maps(self, address, context):
destination = address.destination
if destination:
context['destination'] = destination
self.append(textwrap.dedent("""
LINE='%(email)s\t%(destination)s'
if [[ ! $(grep '^%(email)s\s' %(virtual_alias_maps)s) ]]; then
# Add new line
echo "${LINE}" >> %(virtual_alias_maps)s
UPDATED_VIRTUAL_ALIAS_MAPS=1
else
# Update existing line, if needed
if [[ ! $(grep "^${LINE}$" %(virtual_alias_maps)s) ]]; then
sed -i "s/^%(email)s\s.*$/${LINE}/" %(virtual_alias_maps)s
UPDATED_VIRTUAL_ALIAS_MAPS=1
fi
fi""") % context)
else:
logger.warning("Address %i is empty" % address.pk)
self.append(textwrap.dedent("""
sed -i '/^%(email)s\s/d;{!q0;q1}' %(virtual_alias_maps)s && \\
UPDATED_VIRTUAL_ALIAS_MAPS=1
""") % context
)
# Virtual mailbox stuff
# destination = []
# for mailbox in address.get_mailboxes():
# context['mailbox'] = mailbox
# destination.append("%(mailbox)s@%(local_domain)s" % context)
# for forward in address.forward:
# if '@' in forward:
# destination.append(forward)
def exclude_virtual_alias_maps(self, context):
self.append(textwrap.dedent("""
sed -i '/^%(email)s\s.*$/d;{!q0;q1}' %(virtual_alias_maps)s && \\
UPDATED_VIRTUAL_ALIAS_MAPS=1
""") % context
)
def save(self, address):
context = super(PostfixAddressBackend, self).save(address)
self.update_virtual_alias_maps(address, context)
def delete(self, address):
context = super(PostfixAddressBackend, self).save(address)
self.exclude_virtual_alias_maps(context)
def commit(self):
context = self.get_context_files()
self.append(textwrap.dedent("""\
[[ $UPDATED_VIRTUAL_ALIAS_DOMAINS == 1 ]] && {
service postfix reload
}
[[ $UPDATED_VIRTUAL_ALIAS_MAPS == 1 ]] && {
postmap %(virtual_alias_maps)s
}
exit $exit_code
""") % context
)
class AutoresponseBackend(ServiceController):
"""
WARNING: not implemented

View file

@ -0,0 +1 @@
default_app_config = 'orchestra.contrib.mailer.apps.MailerConfig'

View file

@ -1,22 +1,57 @@
from django.contrib import admin
from django.core.urlresolvers import reverse
from django.db.models import Count
from django.utils.translation import ugettext_lazy as _
from orchestra.admin.utils import admin_link
from orchestra.admin.utils import admin_link, admin_colored, admin_date
from .models import Message, SMTPLog
COLORS = {
Message.QUEUED: 'purple',
Message.SENT: 'green',
Message.DEFERRED: 'darkorange',
Message.FAILED: 'red',
SMTPLog.SUCCESS: 'green',
SMTPLog.FAILURE: 'red',
}
class MessageAdmin(admin.ModelAdmin):
list_display = (
'id', 'state', 'priority', 'to_address', 'from_address', 'created_at', 'retries', 'last_retry'
'id', 'colored_state', 'priority', 'to_address', 'from_address', 'created_at_delta',
'retries', 'last_retry_delta', 'num_logs',
)
list_filter = ('state', 'priority', 'retries')
colored_state = admin_colored('state', colors=COLORS)
created_at_delta = admin_date('created_at')
last_retry_delta = admin_date('last_retry')
def num_logs(self, instance):
num = instance.logs__count
url = reverse('admin:mailer_smtplog_changelist')
url += '?&message=%i' % instance.pk
return '<a href="%s">%d</a>' % (url, num)
num_logs.short_description = _("Logs")
num_logs.admin_order_field = 'logs__count'
num_logs.allow_tags = True
def get_queryset(self, request):
qs = super(MessageAdmin, self).get_queryset(request)
return qs.annotate(Count('logs'))
class SMTPLogAdmin(admin.ModelAdmin):
list_display = (
'id', 'message_link', 'result', 'date', 'log_message'
'id', 'message_link', 'colored_result', 'date_delta', 'log_message'
)
list_filter = ('result',)
message_link = admin_link('message')
colored_result = admin_colored('result', colors=COLORS, bold=False)
date_delta = admin_date('date')
admin.site.register(Message, MessageAdmin)

View file

@ -0,0 +1,12 @@
from django.apps import AppConfig
from orchestra.core import administration
class MailerConfig(AppConfig):
name = 'orchestra.contrib.mailer'
verbose_name = "Mailer"
def ready(self):
from .models import Message
administration.register(Message)

View file

@ -5,27 +5,27 @@ from .tasks import send_message
class EmailBackend(BaseEmailBackend):
'''
"""
A wrapper that manages a queued SMTP system.
'''
"""
def send_messages(self, email_messages):
if not email_messages:
return
num_sent = 0
# TODO if multiple messages queue, else async?
is_bulk = len(email_messages) > 1
for message in email_messages:
priority = message.extra_headers.get('X-Mail-Priority', Message.NORMAL)
if priority == Message.CRITICAL:
send_message(message).apply_async()
else:
content = message.message().as_string()
for to_email in message.recipients():
message = Message.objects.create(
priority=priority,
to_address=to_email,
from_address=message.from_email,
subject=message.subject,
content=content,
)
content = message.message().as_string()
for to_email in message.recipients():
message = Message.objects.create(
priority=priority,
to_address=to_email,
from_address=message.from_email,
subject=message.subject,
content=content,
)
if not is_bulk or priority == Message.CRITICAL:
# send immidiately
send_message.apply_async(message)
num_sent += 1
return num_sent

View file

@ -7,7 +7,7 @@ from django.utils.encoding import smart_str
from .models import Message
def send_message(message, num, connection, bulk):
def send_message(message, num=0, connection=None, bulk=100):
if num >= bulk:
connection.close()
connection = None
@ -34,6 +34,7 @@ def send_pending(bulk=100):
num = 0
for message in Message.objects.filter(state=Message.QUEUED).order_by('priority'):
send_message(message, num, connection, bulk)
num += 1
from django.utils import timezone
from . import settings
from datetime import timedelta
@ -48,4 +49,3 @@ def send_pending(bulk=100):
send_message(message, num, connection, bulk)
if connection is not None:
connection.close()

View file

@ -16,10 +16,10 @@ class Message(models.Model):
(FAILED, _("Failes")),
)
CRITICAL = '0'
HIGH = '1'
NORMAL = '2'
LOW = '3'
CRITICAL = 0
HIGH = 1
NORMAL = 2
LOW = 3
PRIORITIES = (
(CRITICAL, _("Critical (not queued)")),
(HIGH, _("High")),
@ -31,11 +31,12 @@ class Message(models.Model):
priority = models.PositiveIntegerField(_("Priority"), choices=PRIORITIES, default=NORMAL)
to_address = models.CharField(max_length=256)
from_address = models.CharField(max_length=256)
subject = models.CharField(max_length=256)
content = models.TextField()
created_at = models.DateTimeField(auto_now_add=True)
retries = models.PositiveIntegerField(default=0)
last_retry = models.DateTimeField(auto_now=True)
subject = models.CharField(_("subject"), max_length=256)
content = models.TextField(_("content"))
created_at = models.DateTimeField(_("created at"), auto_now_add=True)
retries = models.PositiveIntegerField(_("retries"), default=0)
# TODO rename to last_try
last_retry = models.DateTimeField(_("last try"), auto_now=True)
def defer(self):
self.state = self.DEFERRED

View file

@ -4,3 +4,8 @@ from orchestra.contrib.settings import Setting
MAILER_DEFERE_SECONDS = Setting('MAILER_DEFERE_SECONDS',
(300, 600, 60*60, 60*60*24),
)
MAILER_MESSAGES_CLEANUP_DAYS = Setting('MAILER_MESSAGES_CLEANUP_DAYS',
10
)

View file

@ -1,6 +1,20 @@
def send_message():
pass
from django.utils import timezone
from celery.task.schedules import crontab
from orchestra.contrib.tasks import task, periodic_task
from . import engine
@task
def send_message(message):
engine.send_message(message)
@periodic_task(run_every=crontab(hour=7, minute=30))
def cleanup_messages():
pass
from .models import Message
delta = timedelta(days=settings.MAILER_MESSAGES_CLEANUP_DAYS)
now = timezone.now()
epoch = (now-delta)
Message.objects.filter(state=Message.SENT, last_retry__lt=epoc).delete()

View file

@ -19,6 +19,7 @@ STATE_COLORS = {
BackendLog.FAILURE: 'red',
BackendLog.ERROR: 'red',
BackendLog.REVOKED: 'magenta',
BackendLog.NOTHING: 'green',
}

View file

@ -46,6 +46,9 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
# Force the backend manager to block in multiple backend executions executing them synchronously
block = False
doc_settings = None
# By default backend will not run if actions do not generate insctructions,
# If your backend uses prepare() or commit() only then you should set force_empty_action_execution = True
force_empty_action_execution = False
def __str__(self):
return type(self).__name__
@ -64,16 +67,29 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
'tail',
'content',
'script_method',
'function_method'
'function_method',
'set_head',
'set_tail',
'set_content',
'actions',
)
if attr == 'prepare':
self.cmd_section = self.head
self.set_head()
elif attr == 'commit':
self.cmd_section = self.tail
elif attr not in IGNORE_ATTRS:
self.cmd_section = self.content
self.set_tail()
elif attr not in IGNORE_ATTRS and attr in self.actions:
self.set_content()
return super(ServiceBackend, self).__getattribute__(attr)
def set_head(self):
self.cmd_section = self.head
def set_tail(self):
self.cmd_section = self.tail
def set_content(self):
self.cmd_section = self.content
@classmethod
def get_actions(cls):
return [ action for action in cls.actions if action in dir(cls) ]
@ -148,13 +164,15 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount):
from .models import BackendLog
scripts = self.scripts
state = BackendLog.STARTED
if not scripts:
state = BackendLog.SUCCESS
run = bool(scripts) or (self.force_empty_action_execution or bool(self.content))
if not run:
state = BackendLog.NOTHING
log = BackendLog.objects.create(backend=self.get_name(), state=state, server=server)
for method, commands in scripts:
method(log, server, commands, async)
if log.state != BackendLog.SUCCESS:
break
if run:
for method, commands in scripts:
method(log, server, commands, async)
if log.state != BackendLog.SUCCESS:
break
return log
def append(self, *cmd):

View file

@ -78,7 +78,7 @@ def message_user(request, logs):
if log.state != log.EXCEPTION:
# EXCEPTION logs are not stored on the database
ids.append(log.pk)
if log.state == log.SUCCESS:
if log.state in (log.SUCCESS, log.NOTHING):
successes += 1
errors = total-successes
if len(ids) == 1:

View file

@ -12,7 +12,7 @@ from . import settings, Operation
from .backends import ServiceBackend
from .helpers import send_report
from .models import BackendLog
from .signals import pre_action, post_action
from .signals import pre_action, post_action, pre_commit, post_commit, pre_prepare, post_prepare
logger = logging.getLogger(__name__)
@ -54,8 +54,12 @@ def generate(operations):
for server in operation.servers:
key = (server, operation.backend)
if key not in scripts:
scripts[key] = (operation.backend(), [operation])
scripts[key][0].prepare()
backend, operations = (operation.backend(), [operation])
scripts[key] = (backend, operations)
backend.set_head()
pre_prepare.send(sender=backend.__class__, backend=backend)
backend.prepare()
post_prepare.send(sender=backend.__class__, backend=backend)
else:
scripts[key][1].append(operation)
# Get and call backend action method
@ -67,6 +71,7 @@ def generate(operations):
'instance': operation.instance,
'action': operation.action,
}
backend.set_content()
pre_action.send(**kwargs)
method(operation.instance)
post_action.send(**kwargs)
@ -74,7 +79,10 @@ def generate(operations):
block = True
for value in scripts.values():
backend, operations = value
backend.set_tail()
pre_commit.send(sender=backend.__class__, backend=backend)
backend.commit()
post_commit.send(sender=backend.__class__, backend=backend)
return scripts, block

View file

@ -26,7 +26,7 @@ class Server(models.Model):
default=settings.ORCHESTRATION_DEFAULT_OS)
def __str__(self):
return self.name
return self.name or str(self.address)
def get_address(self):
if self.address:
@ -53,6 +53,7 @@ class BackendLog(models.Model):
ERROR = 'ERROR'
REVOKED = 'REVOKED'
ABORTED = 'ABORTED'
NOTHING = 'NOTHING'
# Special state for mocked backendlogs
EXCEPTION = 'EXCEPTION'
@ -65,6 +66,7 @@ class BackendLog(models.Model):
(ERROR, ERROR),
(ABORTED, ABORTED),
(REVOKED, REVOKED),
(NOTHING, NOTHING),
)
backend = models.CharField(_("backend"), max_length=256)

View file

@ -4,3 +4,11 @@ import django.dispatch
pre_action = django.dispatch.Signal(providing_args=['backend', 'instance', 'action'])
post_action = django.dispatch.Signal(providing_args=['backend', 'instance', 'action'])
pre_prepare = django.dispatch.Signal(providing_args=['backend'])
post_prepare = django.dispatch.Signal(providing_args=['backend'])
pre_commit = django.dispatch.Signal(providing_args=['backend'])
post_commit = django.dispatch.Signal(providing_args=['backend'])

View file

@ -8,7 +8,7 @@ from orchestra.contrib.tasks import periodic_task
from .models import BackendLog
@periodic_task(run_every=crontab(hour=7, minute=30, day_of_week=1))
@periodic_task(run_every=crontab(hour=7, minute=0))
def backend_logs_cleanup():
days = settings.ORCHESTRATION_BACKEND_CLEANUP_DAYS
epoch = timezone.now()-timedelta(days=days)

View file

@ -1,84 +0,0 @@
#!/usr/bin/env python3
# High performance alternative to beat management command
#
# USAGE: beat /path/to/project/manage.py
import json
import os
import re
import sys
from datetime import datetime
from orchestra.utils import db
from orchestra.utils.python import import_class
from orchestra.utils.sys import run, join
from celery.schedules import crontab_parser as CrontabParser
def get_settings_file(manage):
with open(manage, 'r') as handler:
regex = re.compile(r'"DJANGO_SETTINGS_MODULE"\s*,\s*"([^"]+)"')
for line in handler.readlines():
match = regex.search(line)
if match:
settings_module = match.groups()[0]
settings_file = os.path.join(*settings_module.split('.')) + '.py'
settings_file = os.path.join(os.path.dirname(manage), settings_file)
return settings_file
raise ValueError("settings module not found in %s" % manage)
def get_tasks(manage):
settings_file = get_settings_file(manage)
settings = db.get_settings(settings_file)
try:
conn = db.get_connection(settings)
except:
sys.stdout.write("ERROR")
sys.stderr.write("I am unable to connect to the database\n")
sys.exit(1)
script, settings_file = sys.argv[:2]
enabled = 1 if 'sqlite' in settings['ENGINE'] else True
query = (
"SELECT c.minute, c.hour, c.day_of_week, c.day_of_month, c.month_of_year, p.id "
"FROM djcelery_periodictask as p, djcelery_crontabschedule as c "
"WHERE p.crontab_id = c.id AND p.enabled = {}"
).format(enabled)
tasks = db.run_query(conn, query)
conn.close()
return tasks
def is_due(now, minute, hour, day_of_week, day_of_month, month_of_year):
n_minute, n_hour, n_day_of_week, n_day_of_month, n_month_of_year = now
return (
n_minute in CrontabParser(60).parse(minute) and
n_hour in CrontabParser(24).parse(hour) and
n_day_of_week in CrontabParser(7).parse(day_of_week) and
n_day_of_month in CrontabParser(31, 1).parse(day_of_month) and
n_month_of_year in CrontabParser(12, 1).parse(month_of_year)
)
if __name__ == "__main__":
manage = sys.argv[1]
now = datetime.utcnow()
now = tuple(map(int, now.strftime("%M %H %w %d %m").split()))
procs = []
for minute, hour, day_of_week, day_of_month, month_of_year, task_id in get_tasks(manage):
if is_due(now, minute, hour, day_of_week, day_of_month, month_of_year):
command = 'python3 -W ignore::DeprecationWarning {manage} runtask {task_id}'.format(
manage=manage, task_id=task_id)
proc = run(command, async=True)
procs.append(proc)
code = 0
for proc in procs:
result = join(proc)
sys.stdout.write(result.stdout.decode('utf8'))
sys.stderr.write(result.stderr.decode('utf8'))
if result.return_code != 0:
code = result.return_code
sys.exit(code)

View file

@ -76,9 +76,9 @@ def task(fn=None, **kwargs):
return decorator
else:
return celery_shared_task(**kwargs)
fn = update_wraper(partial(celery_shared_task, fn))
fn = celery_shared_task(fn)
if settings.TASKS_BACKEND in ('thread', 'process'):
fn = update_wrapper(apply_async(fn), fn)
fn = apply_async(fn)
return fn
@ -93,7 +93,7 @@ def periodic_task(fn=None, **kwargs):
return decorator
else:
return celery_periodic_task(**kwargs)
fn = update_wraper(celery_periodic_task(fn), fn)
fn = celery_periodic_task(fn)
if settings.TASKS_BACKEND in ('thread', 'process'):
name = kwargs.pop('name', None)
fn = update_wrapper(apply_async(fn, name), fn)

View file

@ -30,3 +30,4 @@ class Register(object):
services = Register()
# TODO rename to something else
accounts = Register()
administration = Register()

View file

@ -3,12 +3,13 @@ import os
from django.core.management.base import BaseCommand, CommandError
from orchestra.utils.paths import get_site_dir
from orchestra.utils.sys import run
from orchestra.utils.sys import run, check_non_root
class Command(BaseCommand):
help = 'Runs periodic tasks.'
help = 'Confingure crontab to run periodic tasks and mailer with orchestra-beat.'
@check_non_root
def handle(self, *args, **options):
context = {
'site_dir': get_site_dir(),

View file

@ -13,33 +13,3 @@ def close_connection(execute):
finally:
db.connection.close()
return wrapper
def get_settings(settings_file):
""" get db settings from settings.py file without importing """
settings = {'__file__': settings_file}
with open(settings_file) as f:
__file__ = 'rata'
exec(f.read(), settings)
settings = settings['DATABASES']['default']
if settings['ENGINE'] not in ('django.db.backends.sqlite3', 'django.db.backends.postgresql_psycopg2'):
raise ValueError("%s engine not supported." % settings['ENGINE'])
return settings
def get_connection(settings):
if settings['ENGINE'] == 'django.db.backends.sqlite3':
import sqlite3
return sqlite3.connect(settings['NAME'])
elif settings['ENGINE'] == 'django.db.backends.postgresql_psycopg2':
import psycopg2
return psycopg2.connect("dbname='{NAME}' user='{USER}' host='{HOST}' password='{PASSWORD}'".format(**settings))
return conn
def run_query(conn, query):
cur = conn.cursor()
cur.execute(query)
result = cur.fetchall()
cur.close()
return result

View file

@ -21,6 +21,16 @@ def check_root(func):
return wrapped
def check_non_root(func):
""" Function decorator that checks if user not has root permissions """
def wrapped(*args, **kwargs):
if getpass.getuser() == 'root':
cmd_name = func.__module__.split('.')[-1]
raise CommandError("Sorry, you don't want to execute '%s' as superuser (root)." % cmd_name)
return func(*args, **kwargs)
return wrapped
class _Attribute(object):
""" Simple string subclass to allow arbitrary attribute access. """
def __init__(self, stdout):

View file

@ -21,6 +21,8 @@ HOME="/home/$USER"
PROJECT_NAME='panel'
BASE_DIR="$HOME/$PROJECT_NAME"
PYTHON_BIN="python3"
CELERY=false
surun () {
echo " ${bold}\$ su $USER -c \"${@}\"${normal}"
@ -93,17 +95,23 @@ fi
run "$PYTHON_BIN $MANAGE migrate --noinput accounts"
run "$PYTHON_BIN $MANAGE migrate --noinput"
sudo $PYTHON_BIN $MANAGE setupcelery --username $USER --processes 2
if [[ $CELERY == true ]]; then
run apt-get install -y rabbitmq
sudo $PYTHON_BIN $MANAGE setupcelery --username $USER --processes 2
else
run "$PYTHON_BIN $MANAGE setupcronbeat"
fi
# Install and configure Nginx+uwsgi web services
surun "mkdir -p $BASE_DIR/static"
surun "$PYTHON_BIN $MANAGE collectstatic --noinput"
run "apt-get install -y nginx uwsgi uwsgi-plugin-python3"
run "$PYTHON_BIN $MANAGE setupnginx"
run "$PYTHON_BIN $MANAGE setupnginx --user $USER --noinput"
run "service nginx start"
# Apply changes on related services
run "$PYTHON_BIN $MANAGE restartservices"
run "$PYTHON_BIN $MANAGE reloadservices"
# Create orchestra superuser
cat <<- EOF | $PYTHON_BIN $MANAGE shell

View file

@ -29,7 +29,7 @@ setup(
include_package_data = True,
scripts=[
'orchestra/bin/orchestra-admin',
'orchestra/contrib/tasks/bin/orchestra-beat',
'orchestra//bin/orchestra-beat',
],
packages = packages,
classifiers = [