django-orchestra/orchestra/bin/orchestra-beat

226 lines
8.3 KiB
Python
Executable file

#!/usr/bin/env python3
# High performance alternative to beat management command
# Looks for pending work before firing up all the Django machinery on separate processes
#
# Handles orchestra.contrib.tasks periodic_tasks and orchestra.contrib.mailer queued mails
#
# USAGE: beat /path/to/project/manage.py
import json
import os
import re
import sys
from datetime import datetime, timedelta
from orchestra.utils.sys import run, join, LockFile
class crontab_parser(object):
"""
from celery.schedules import crontab_parser
Too expensive to import celery
"""
ParseException = ValueError
_range = r'(\w+?)-(\w+)'
_steps = r'/(\w+)?'
_star = r'\*'
def __init__(self, max_=60, min_=0):
self.max_ = max_
self.min_ = min_
self.pats = (
(re.compile(self._range + self._steps), self._range_steps),
(re.compile(self._range), self._expand_range),
(re.compile(self._star + self._steps), self._star_steps),
(re.compile('^' + self._star + '$'), self._expand_star),
)
def parse(self, spec):
acc = set()
for part in spec.split(','):
if not part:
raise self.ParseException('empty part')
acc |= set(self._parse_part(part))
return acc
def _parse_part(self, part):
for regex, handler in self.pats:
m = regex.match(part)
if m:
return handler(m.groups())
return self._expand_range((part, ))
def _expand_range(self, toks):
fr = self._expand_number(toks[0])
if len(toks) > 1:
to = self._expand_number(toks[1])
if to < fr: # Wrap around max_ if necessary
return (list(range(fr, self.min_ + self.max_)) +
list(range(self.min_, to + 1)))
return list(range(fr, to + 1))
return [fr]
def _range_steps(self, toks):
if len(toks) != 3 or not toks[2]:
raise self.ParseException('empty filter')
return self._expand_range(toks[:2])[::int(toks[2])]
def _star_steps(self, toks):
if not toks or not toks[0]:
raise self.ParseException('empty filter')
return self._expand_star()[::int(toks[0])]
def _expand_star(self, *args):
return list(range(self.min_, self.max_ + self.min_))
def _expand_number(self, s):
if isinstance(s, str) and s[0] == '-':
raise self.ParseException('negative numbers not supported')
try:
i = int(s)
except ValueError:
try:
i = weekday(s)
except KeyError:
raise ValueError('Invalid weekday literal {0!r}.'.format(s))
max_val = self.min_ + self.max_ - 1
if i > max_val:
raise ValueError(
'Invalid end range: {0} > {1}.'.format(i, max_val))
if i < self.min_:
raise ValueError(
'Invalid beginning range: {0} < {1}.'.format(i, self.min_))
return i
class Setting(object):
def __init__(self, manage):
self.manage = manage
self.settings_file = self.get_settings_file(manage)
def get_settings(self):
""" get db settings from settings.py file without importing """
settings = {'__file__': self.settings_file}
with open(self.settings_file) as f:
content = ''
for line in f.readlines():
# This is very costly, skip
if not line.startswith(('import djcelery', 'djcelery.setup_loader()')):
content += line
exec(content, settings)
return settings
def get_settings_file(self, manage):
with open(manage, 'r') as handler:
regex = re.compile(r'"DJANGO_SETTINGS_MODULE"\s*,\s*"([^"]+)"')
for line in handler.readlines():
match = regex.search(line)
if match:
settings_module = match.groups()[0]
settings_file = os.path.join(*settings_module.split('.')) + '.py'
settings_file = os.path.join(os.path.dirname(manage), settings_file)
return settings_file
raise ValueError("settings module not found in %s" % manage)
class DB(object):
def __init__(self, settings):
self.settings = settings['DATABASES']['default']
def connect(self):
if self.settings['ENGINE'] == 'django.db.backends.sqlite3':
import sqlite3
self.conn = sqlite3.connect(self.settings['NAME'])
elif self.settings['ENGINE'] == 'django.db.backends.postgresql_psycopg2':
import psycopg2
self.conn = psycopg2.connect("dbname='{NAME}' user='{USER}' host='{HOST}' password='{PASSWORD}'".format(**self.settings))
else:
raise ValueError("%s engine not supported." % self.settings['ENGINE'])
def query(self, query):
cur = self.conn.cursor()
try:
cur.execute(query)
result = cur.fetchall()
finally:
cur.close()
return result
def close(self):
self.conn.close()
def fire_pending_tasks(manage, db):
def get_tasks(db):
enabled = 1 if 'sqlite' in db.settings['ENGINE'] else True
query = (
"SELECT c.minute, c.hour, c.day_of_week, c.day_of_month, c.month_of_year, p.id "
"FROM djcelery_periodictask as p, djcelery_crontabschedule as c "
"WHERE p.crontab_id = c.id AND p.enabled = {}"
).format(enabled)
return db.query(query)
def is_due(now, minute, hour, day_of_week, day_of_month, month_of_year):
n_minute, n_hour, n_day_of_week, n_day_of_month, n_month_of_year = now
return (
n_minute in crontab_parser(60).parse(minute) and
n_hour in crontab_parser(24).parse(hour) and
n_day_of_week in crontab_parser(7).parse(day_of_week) and
n_day_of_month in crontab_parser(31, 1).parse(day_of_month) and
n_month_of_year in crontab_parser(12, 1).parse(month_of_year)
)
now = datetime.utcnow()
now = tuple(map(int, now.strftime("%M %H %w %d %m").split()))
for minute, hour, day_of_week, day_of_month, month_of_year, task_id in get_tasks(db):
if is_due(now, minute, hour, day_of_week, day_of_month, month_of_year):
command = 'python3 -W ignore::DeprecationWarning {manage} runtask {task_id}'.format(
manage=manage, task_id=task_id)
proc = run(command, async=True)
yield proc
def fire_pending_messages(settings, db):
def has_pending_messages(settings, db):
MAILER_DEFERE_SECONDS = settings.get('MAILER_DEFERE_SECONDS', (300, 600, 60*60, 60*60*24))
now = datetime.utcnow()
query_or = []
for num, seconds in enumerate(MAILER_DEFERE_SECONDS):
delta = timedelta(seconds=seconds)
epoch = now-delta
query_or.append("""(mailer_message.retries = 0 AND mailer_message.last_retry <= '%s')"""
% epoch.isoformat().replace('T', ' '))
query = """\
SELECT 1 FROM mailer_message
WHERE (mailer_message.state = 'QUEUED'
OR (mailer_message.state = 'DEFERRED' AND (%s))) LIMIT 1""" % ' OR '.join(query_or)
return bool(db.query(query))
if has_pending_messages(settings, db):
command = 'python3 -W ignore::DeprecationWarning {manage} sendpendingmessages'.format(manage=manage)
proc = run(command, async=True)
yield proc
if __name__ == "__main__":
with LockFile('/dev/shm/beat.lock', expire=20):
manage = sys.argv[1]
procs = []
settings = Setting(manage).get_settings()
db = DB(settings)
db.connect()
try:
# Non-blocking loop, we need to finish this in time for the next minute.
if 'orchestra.contrib.tasks' in settings['INSTALLED_APPS']:
if settings.get('TASKS_BACKEND', 'thread') in ('thread', 'process'):
for proc in fire_pending_tasks(manage, db):
procs.append(proc)
if 'orchestra.contrib.mailer' in settings['INSTALLED_APPS']:
for proc in fire_pending_messages(settings, db):
procs.append(proc)
finally:
db.close()
sys.exit(0)