django-orchestra/orchestra/bin/orchestra-beat

147 lines
5.6 KiB
Python
Executable File

#!/usr/bin/env python3
# High performance alternative to beat management command
# Looks for pending work before firing up all the Django machinery on separate processes
#
# Handles orchestra.contrib.tasks periodic_tasks and orchestra.contrib.mailer queued mails
#
# USAGE: beat /path/to/project/manage.py
import json
import os
import re
import sys
from datetime import datetime, timedelta
from celery.schedules import crontab_parser as CrontabParser
from orchestra.utils.sys import run, join, LockFile
class Setting(object):
def __init__(self, manage):
self.manage = manage
self.settings_file = self.get_settings_file(manage)
def get_settings(self):
""" get db settings from settings.py file without importing """
settings = {'__file__': self.settings_file}
with open(self.settings_file) as f:
__file__ = 'rata'
exec(f.read(), settings)
return settings
def get_settings_file(self, manage):
with open(manage, 'r') as handler:
regex = re.compile(r'"DJANGO_SETTINGS_MODULE"\s*,\s*"([^"]+)"')
for line in handler.readlines():
match = regex.search(line)
if match:
settings_module = match.groups()[0]
settings_file = os.path.join(*settings_module.split('.')) + '.py'
settings_file = os.path.join(os.path.dirname(manage), settings_file)
return settings_file
raise ValueError("settings module not found in %s" % manage)
class DB(object):
def __init__(self, settings):
self.settings = settings['DATABASES']['default']
def connect(self):
if self.settings['ENGINE'] == 'django.db.backends.sqlite3':
import sqlite3
self.conn = sqlite3.connect(self.settings['NAME'])
elif self.settings['ENGINE'] == 'django.db.backends.postgresql_psycopg2':
import psycopg2
self.conn = psycopg2.connect("dbname='{NAME}' user='{USER}' host='{HOST}' password='{PASSWORD}'".format(**self.settings))
else:
raise ValueError("%s engine not supported." % self.settings['ENGINE'])
def query(self, query):
cur = self.conn.cursor()
try:
cur.execute(query)
result = cur.fetchall()
finally:
cur.close()
return result
def close(self):
self.conn.close()
def fire_pending_tasks(manage, db):
def get_tasks(db):
enabled = 1 if 'sqlite' in db.settings['ENGINE'] else True
query = (
"SELECT c.minute, c.hour, c.day_of_week, c.day_of_month, c.month_of_year, p.id "
"FROM djcelery_periodictask as p, djcelery_crontabschedule as c "
"WHERE p.crontab_id = c.id AND p.enabled = {}"
).format(enabled)
return db.query(query)
def is_due(now, minute, hour, day_of_week, day_of_month, month_of_year):
n_minute, n_hour, n_day_of_week, n_day_of_month, n_month_of_year = now
return (
n_minute in CrontabParser(60).parse(minute) and
n_hour in CrontabParser(24).parse(hour) and
n_day_of_week in CrontabParser(7).parse(day_of_week) and
n_day_of_month in CrontabParser(31, 1).parse(day_of_month) and
n_month_of_year in CrontabParser(12, 1).parse(month_of_year)
)
now = datetime.utcnow()
now = tuple(map(int, now.strftime("%M %H %w %d %m").split()))
for minute, hour, day_of_week, day_of_month, month_of_year, task_id in get_tasks(db):
if is_due(now, minute, hour, day_of_week, day_of_month, month_of_year):
command = 'python3 -W ignore::DeprecationWarning {manage} runtask {task_id}'.format(
manage=manage, task_id=task_id)
proc = run(command, async=True)
yield proc
def fire_pending_messages(settings, db):
def has_pending_messages(settings, db):
MAILER_DEFERE_SECONDS = settings.get('MAILER_DEFERE_SECONDS', (300, 600, 60*60, 60*60*24))
now = datetime.utcnow()
query_or = []
for num, seconds in enumerate(MAILER_DEFERE_SECONDS):
delta = timedelta(seconds=seconds)
epoch = now-delta
query_or.append("""(mailer_message.retries = 0 AND mailer_message.last_retry <= '%s')"""
% epoch.isoformat().replace('T', ' '))
query = """\
SELECT 1 FROM mailer_message
WHERE (mailer_message.state = 'QUEUED'
OR (mailer_message.state = 'DEFERRED' AND (%s))) LIMIT 1""" % ' OR '.join(query_or)
return bool(db.query(query))
if has_pending_messages(settings, db):
command = 'python3 -W ignore::DeprecationWarning {manage} sendpendingmessages'.format(manage=manage)
proc = run(command, async=True)
yield proc
if __name__ == "__main__":
with LockFile('/dev/shm/beat.lock', expire=20):
manage = sys.argv[1]
procs = []
settings = Setting(manage).get_settings()
db = DB(settings)
db.connect()
try:
# Non-blocking loop, we need to finish this in time for the next minute.
if 'orchestra.contrib.tasks' in settings['INSTALLED_APPS']:
if settings.get('TASKS_BACKEND', 'thread') in ('thread', 'process'):
for proc in fire_pending_tasks(manage, db):
procs.append(proc)
if 'orchestra.contrib.mailer' in settings['INSTALLED_APPS']:
for proc in fire_pending_messages(settings, db):
procs.append(proc)
finally:
db.close()
sys.exit(0)