#!/usr/bin/env python3

# High performance alternative to beat management command
# Looks for pending work before firing up all the Django machinery on separate processes
#
# Handles orchestra.contrib.tasks periodic_tasks and orchestra.contrib.mailer queued mails
#
# USAGE: beat /path/to/project/manage.py


import json
import os
import re
import sys
from datetime import datetime, timedelta

from orchestra.utils.sys import run, join, LockFile


class crontab_parser(object):
    """
    Expand one crontab field into the set of integers it selects.

    Trimmed copy of ``celery.schedules.crontab_parser``; celery itself is too
    expensive to import from a script that is started every minute.

    A field is a comma separated list of parts, each being one of::

        *        every value of the field
        */n      every n-th value of the field
        a-b      inclusive range (wraps around the field when b < a)
        a-b/n    every n-th value of the range
        a        a single value

    Values are integers or weekday names (``sun`` .. ``sat``, matched on their
    first three letters, case-insensitive, Sunday being 0).

    ``max_`` is the number of values in the field and ``min_`` the lowest one,
    so valid values are ``min_ .. min_ + max_ - 1``.
    """
    ParseException = ValueError

    _range = r'(\w+?)-(\w+)'
    _steps = r'/(\w+)?'
    _star = r'\*'
    # Weekday abbreviations mapped to their crontab number (Sunday is 0).
    _weekdays = dict(zip(('sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat'), range(7)))

    def __init__(self, max_=60, min_=0):
        self.max_ = max_
        self.min_ = min_
        # Order matters: the most specific pattern has to be tried first.
        self.pats = (
            (re.compile(self._range + self._steps), self._range_steps),
            (re.compile(self._range), self._expand_range),
            (re.compile(self._star + self._steps), self._star_steps),
            (re.compile('^' + self._star + '$'), self._expand_star),
        )

    def parse(self, spec):
        """Return the set of integers selected by ``spec``.

        Raises ``ValueError`` (``ParseException``) on malformed input.
        """
        acc = set()
        for part in spec.split(','):
            if not part:
                raise self.ParseException('empty part')
            acc |= set(self._parse_part(part))
        return acc

    def _parse_part(self, part):
        """Expand a single comma-free part into a list of integers."""
        for regex, handler in self.pats:
            m = regex.match(part)
            if m:
                return handler(m.groups())
        # Nothing matched: treat the part as a single literal value.
        return self._expand_range((part, ))

    def _expand_range(self, toks):
        """Expand ``(start,)`` or ``(start, end)`` into a list of integers."""
        fr = self._expand_number(toks[0])
        if len(toks) > 1:
            to = self._expand_number(toks[1])
            if to < fr:  # Wrap around max_ if necessary
                return (list(range(fr, self.min_ + self.max_)) +
                        list(range(self.min_, to + 1)))
            return list(range(fr, to + 1))
        return [fr]

    def _range_steps(self, toks):
        """Expand ``(start, end, step)`` taking every ``step``-th value."""
        if len(toks) != 3 or not toks[2]:
            raise self.ParseException('empty filter')
        return self._expand_range(toks[:2])[::int(toks[2])]

    def _star_steps(self, toks):
        """Expand ``*/step`` taking every ``step``-th value of the field."""
        if not toks or not toks[0]:
            raise self.ParseException('empty filter')
        return self._expand_star()[::int(toks[0])]

    def _expand_star(self, *args):
        """Return every valid value of the field."""
        return list(range(self.min_, self.max_ + self.min_))

    def _expand_number(self, s):
        """Convert one token (integer or weekday name) to a validated integer."""
        if isinstance(s, str) and s[0] == '-':
            raise self.ParseException('negative numbers not supported')
        try:
            i = int(s)
        except ValueError:
            # Not a number: only weekday names are accepted as literals.
            try:
                i = self._weekdays[s[:3].lower()]
            except KeyError:
                raise ValueError('Invalid weekday literal {0!r}.'.format(s))
        max_val = self.min_ + self.max_ - 1
        if i > max_val:
            raise ValueError(
                'Invalid end range: {0} > {1}.'.format(i, max_val))
        if i < self.min_:
            raise ValueError(
                'Invalid beginning range: {0} < {1}.'.format(i, self.min_))
        return i


class Setting(object):
    """
    Locate and load the project's settings file without importing Django.

    The settings module name is read out of ``manage.py`` and resolved to a
    ``.py`` file relative to the directory that contains ``manage.py``.
    """
    # Matches the ("DJANGO_SETTINGS_MODULE", "pkg.settings") argument pair,
    # written with either single or double quotes.
    _settings_module_re = re.compile(
        r"""["']DJANGO_SETTINGS_MODULE["']\s*,\s*["']([^"']+)["']""")

    def __init__(self, manage):
        self.manage = manage
        self.settings_file = self.get_settings_file(manage)

    def get_settings(self):
        """ get db settings from settings.py file without importing

        Returns the namespace dict produced by executing the settings file.
        The file is the project's own configuration and is executed as-is,
        so it must be trusted.
        """
        settings = {'__file__': self.settings_file}
        with open(self.settings_file) as f:
            content = ''.join(
                line for line in f
                # This is very costly, skip
                if not line.startswith(('import djcelery', 'djcelery.setup_loader()'))
            )
        exec(content, settings)
        return settings

    def get_settings_file(self, manage):
        """Return the path of the settings file referenced by ``manage``.

        Raises ``ValueError`` when no DJANGO_SETTINGS_MODULE declaration is
        found in the file.
        """
        with open(manage, 'r') as handler:
            for line in handler:
                match = self._settings_module_re.search(line)
                if match:
                    settings_module = match.group(1)
                    # 'pkg.settings' -> '<manage.py dir>/pkg/settings.py'
                    settings_file = os.path.join(*settings_module.split('.')) + '.py'
                    return os.path.join(os.path.dirname(manage), settings_file)
        raise ValueError("settings module not found in %s" % manage)


class DB(object):
    """
    Minimal database access built from the ``DATABASES['default']`` settings.

    Supports the sqlite3 and PostgreSQL (psycopg2) Django engines; drivers are
    imported lazily so only the one actually needed is loaded.
    """
    _postgres_engines = (
        'django.db.backends.postgresql_psycopg2',
        'django.db.backends.postgresql',
    )

    def __init__(self, settings):
        self.settings = settings['DATABASES']['default']
        self.conn = None

    def connect(self):
        """Open the connection; raises ``ValueError`` for unsupported engines."""
        engine = self.settings['ENGINE']
        if engine == 'django.db.backends.sqlite3':
            import sqlite3
            self.conn = sqlite3.connect(self.settings['NAME'])
        elif engine in self._postgres_engines:
            import psycopg2
            # Keyword arguments instead of a hand-built DSN string, so values
            # containing quotes, backslashes or spaces need no escaping.
            candidates = {
                'dbname': self.settings.get('NAME'),
                'user': self.settings.get('USER'),
                'host': self.settings.get('HOST'),
                'port': self.settings.get('PORT'),
                'password': self.settings.get('PASSWORD'),
            }
            # Missing or empty values are left out so the driver defaults apply.
            params = {key: value for key, value in candidates.items() if value}
            self.conn = psycopg2.connect(**params)
        else:
            raise ValueError("%s engine not supported." % engine)

    def query(self, query):
        """Execute ``query`` and return all resulting rows."""
        cur = self.conn.cursor()
        try:
            cur.execute(query)
            return cur.fetchall()
        finally:
            cur.close()

    def close(self):
        """Close the connection if one is open; safe to call repeatedly."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None


def fire_pending_tasks(manage, db):
    """
    Launch ``manage runtask <id>`` for every enabled periodic task that is due.

    Reads the crontab columns of the enabled djcelery periodic tasks through
    ``db.query`` and compares them with the current UTC minute. Each due task
    is started with ``run(..., run_async=True)`` and whatever ``run`` returned
    is yielded.
    """
    # (max_, min_) arguments for crontab_parser, in the order of the SELECT columns:
    # minute, hour, day_of_week, day_of_month, month_of_year
    field_bounds = ((60, 0), (24, 0), (7, 0), (31, 1), (12, 1))

    def fetch_schedules(db):
        # The boolean literal is spelled 1 for sqlite and True otherwise.
        if 'sqlite' in db.settings['ENGINE']:
            enabled_literal = 1
        else:
            enabled_literal = True
        sql = (
            "SELECT c.minute, c.hour, c.day_of_week, c.day_of_month, c.month_of_year, p.id "
            "FROM djcelery_periodictask as p, djcelery_crontabschedule as c "
            "WHERE p.crontab_id = c.id AND p.enabled = {}"
        ).format(enabled_literal)
        return db.query(sql)

    def matches(current, specs):
        # all() stops at the first field that does not match, so the
        # remaining fields are never parsed.
        return all(
            value in crontab_parser(*bounds).parse(spec)
            for value, bounds, spec in zip(current, field_bounds, specs)
        )

    moment = datetime.utcnow()
    # isoweekday() % 7 gives Sunday=0 .. Saturday=6, same as strftime('%w')
    current = (moment.minute, moment.hour, moment.isoweekday() % 7, moment.day, moment.month)
    for row in fetch_schedules(db):
        specs, task_id = row[:5], row[5]
        if not matches(current, specs):
            continue
        command = 'python3 -W ignore::DeprecationWarning {manage} runtask {task_id}'.format(
            manage=manage, task_id=task_id)
        yield run(command, run_async=True)


def fire_pending_messages(settings, db, manage=None):
    """
    Launch ``manage sendpendingmessages`` when the mail queue has pending work.

    A message is pending when it is QUEUED, or when it is DEFERRED and the
    delay configured for its retry count in ``MAILER_DEFERE_SECONDS`` has
    already elapsed since its last try (compared against the current UTC time).

    ``manage`` is the path of the project's manage.py; when omitted it
    defaults to the first command line argument. Yields whatever
    ``run(..., run_async=True)`` returned, at most once.
    """
    def has_pending_messages(settings, db):
        defer_seconds = settings.get('MAILER_DEFERE_SECONDS', (300, 600, 60*60, 60*60*24))
        now = datetime.utcnow()
        # One clause per retry count: position N holds the delay after N retries.
        retry_clauses = [
            "(mailer_message.retries = %i AND mailer_message.last_try <= '%s')"
            % (retries, (now - timedelta(seconds=seconds)).isoformat(sep=' '))
            for retries, seconds in enumerate(defer_seconds)
        ]
        condition = "mailer_message.state = 'QUEUED'"
        if retry_clauses:
            # With no configured delays there is nothing to match for DEFERRED
            # messages; an empty "AND ()" would be invalid SQL.
            condition += " OR (mailer_message.state = 'DEFERRED' AND (%s))" % ' OR '.join(retry_clauses)
        query = "SELECT 1 FROM mailer_message WHERE (%s) LIMIT 1" % condition
        return bool(db.query(query))

    if has_pending_messages(settings, db):
        if manage is None:
            # Backward compatibility for callers that pass only (settings, db).
            manage = sys.argv[1]
        command = 'python3 -W ignore::DeprecationWarning {manage} sendpendingmessages'.format(manage=manage)
        yield run(command, run_async=True)


if __name__ == "__main__":
    # Fail with a readable message instead of an IndexError traceback, and do
    # it before the lock is taken.
    if len(sys.argv) < 2:
        sys.exit("USAGE: beat /path/to/project/manage.py")
    # The lock file keeps overlapping invocations from running at once.
    with LockFile('/dev/shm/beat.lock', expire=20):
        # NOTE: keep ``manage`` as a module-level name (do not move this block
        # into a function); other code in this file may look it up as a global.
        manage = sys.argv[1]
        # Collects whatever run() returned for each launched command;
        # nothing waits on them here.
        procs = []
        settings = Setting(manage).get_settings()
        db = DB(settings)
        db.connect()
        try:
            # Non-blocking loop, we need to finish this in time for the next minute.
            if 'orchestra.contrib.tasks' in settings['INSTALLED_APPS']:
                # Tasks are only fired from here for the thread/process backends.
                if settings.get('TASKS_BACKEND', 'thread') in ('thread', 'process'):
                    procs.extend(fire_pending_tasks(manage, db))
            if 'orchestra.contrib.mailer' in settings['INSTALLED_APPS']:
                procs.extend(fire_pending_messages(settings, db))
        finally:
            db.close()
    sys.exit(0)