diff --git a/passbook/core/celery.py b/passbook/core/celery.py new file mode 100644 index 000000000..e0c607b6d --- /dev/null +++ b/passbook/core/celery.py @@ -0,0 +1,76 @@ +"""passbook core celery""" + +import logging +import os + +import celery +import pymysql +from django.conf import settings + +# from raven import Client +# from raven.contrib.celery import register_logger_signal, register_signal + +pymysql.install_as_MySQLdb() + +# set the default Django settings module for the 'celery' program. +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "passbook.core.settings") + +LOGGER = logging.getLogger(__name__) + + +class Celery(celery.Celery): + """Custom Celery class with Raven configured""" + + # pylint: disable=method-hidden + # def on_configure(self): + # """Update raven client""" + # try: + # client = Client(settings.RAVEN_CONFIG.get('dsn')) + # # register a custom filter to filter out duplicate logs + # register_logger_signal(client) + # # hook into the Celery error handler + # register_signal(client) + # except RecursionError: # This error happens when pdoc is running + # pass + + +# pylint: disable=unused-argument +@celery.signals.setup_logging.connect +def config_loggers(*args, **kwags): + """Apply logging settings from settings.py to celery""" + logging.config.dictConfig(settings.LOGGING) + + +# pylint: disable=unused-argument +@celery.signals.after_task_publish.connect +def after_task_publish(sender=None, headers=None, body=None, **kwargs): + """Log task_id after it was published""" + info = headers if 'task' in headers else body + LOGGER.debug('%-40s published (name=%s)', info.get('id'), info.get('task')) + + +# pylint: disable=unused-argument +@celery.signals.task_prerun.connect +def task_prerun(task_id, task, *args, **kwargs): + """Log task_id on worker""" + LOGGER.debug('%-40s started (name=%s)', task_id, task.__name__) + + +# pylint: disable=unused-argument +@celery.signals.task_postrun.connect +def task_postrun(task_id, task, *args, retval=None, state=None, **kwargs): + """Log task_id on worker""" + LOGGER.debug('%-40s finished (name=%s, state=%s)', + task_id, task.__name__, state) + + +CELERY_APP = Celery('passbook') + +# Using a string here means the worker doesn't have to serialize +# the configuration object to child processes. +# - namespace='CELERY' means all celery-related configuration keys +# should have a `CELERY_` prefix. +CELERY_APP.config_from_object(settings, namespace='CELERY') + +# Load task modules from all registered Django app configs. +CELERY_APP.autodiscover_tasks() diff --git a/passbook/core/models.py b/passbook/core/models.py index 1e793d552..964fe803d 100644 --- a/passbook/core/models.py +++ b/passbook/core/models.py @@ -8,6 +8,7 @@ from django.db import models from django.utils.translation import gettext as _ from model_utils.managers import InheritanceManager +from passbook.core.celery import CELERY_APP from passbook.lib.models import CreatedUpdatedModel, UUIDModel LOGGER = getLogger(__name__) @@ -31,6 +32,13 @@ class Provider(models.Model): return getattr(self, 'name') return super().__str__() +class TaskModel(CELERY_APP.Task, models.Model): + """Django model which is also a celery task""" + + class Meta: + + abstract = True + class RuleModel(UUIDModel, CreatedUpdatedModel): """Base model which can have rules applied to it""" @@ -91,7 +99,7 @@ class UserSourceConnection(CreatedUpdatedModel): unique_together = (('user', 'source'),) @reversion.register() -class Rule(UUIDModel, CreatedUpdatedModel): +class Rule(TaskModel, UUIDModel, CreatedUpdatedModel): """Rules which specify if a user is authorized to use an Application. Can be overridden by other types to add other fields, more logic, etc.""" @@ -114,6 +122,10 @@ class Rule(UUIDModel, CreatedUpdatedModel): return self.name return "%s action %s" % (self.name, self.action) + def run(self, user_id: int) -> bool: + """Celery wrapper for passes""" + return self.passes(User.objects.get(pk=user_id)) + def passes(self, user: User) -> bool: """Check if user instance passes this rule""" raise NotImplementedError() @@ -140,6 +152,7 @@ class FieldMatcherRule(Rule): match_action = models.CharField(max_length=50, choices=MATCHES) value = models.TextField() + name = 'passbook_core.FieldMatcherRule' form = 'passbook.core.forms.rules.FieldMatcherRuleForm' def __str__(self): @@ -175,6 +188,7 @@ class FieldMatcherRule(Rule): verbose_name = _('Field matcher Rule') verbose_name_plural = _('Field matcher Rules') + @reversion.register() class WebhookRule(Rule): """Rule that asks webhook""" @@ -212,3 +226,6 @@ class WebhookRule(Rule): verbose_name = _('Webhook Rule') verbose_name_plural = _('Webhook Rules') +# Register tasks +for task_model in Rule.__subclasses__(): + CELERY_APP.tasks.register(task_model) diff --git a/passbook/core/settings.py b/passbook/core/settings.py index c93ae9bab..c0afcd0a1 100644 --- a/passbook/core/settings.py +++ b/passbook/core/settings.py @@ -160,7 +160,16 @@ USE_L10N = True USE_TZ = True -OAUTH2_PROVIDER_APPLICATION_MODEL = 'oauth2_provider.Application' + +# Celery settings +# Add a 10 minute timeout to all Celery tasks. +CELERY_TASK_SOFT_TIME_LIMIT = 600 +CELERY_TIMEZONE = TIME_ZONE +CELERY_BEAT_SCHEDULE = {} +CELERY_CREATE_MISSING_QUEUES = True +CELERY_TASK_DEFAULT_QUEUE = 'passbook' +CELERY_BROKER_URL = 'redis://%s' % CONFIG.get('redis') +CELERY_RESULT_BACKEND = 'redis://%s' % CONFIG.get('redis') # Static files (CSS, JavaScript, Images)