stages/email: Implement MonitoredTask, but only for failed emails
This commit is contained in:
parent
4ac87d8739
commit
8fedd9ec07
|
@ -69,10 +69,14 @@ class TaskInfo:
|
||||||
class MonitoredTask(Task):
|
class MonitoredTask(Task):
|
||||||
"""Task which can save its state to the cache"""
|
"""Task which can save its state to the cache"""
|
||||||
|
|
||||||
|
# For tasks that should only be listed if they failed, set this to False
|
||||||
|
save_on_success: bool
|
||||||
|
|
||||||
_result: TaskResult
|
_result: TaskResult
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs) -> None:
|
def __init__(self, *args, **kwargs) -> None:
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
|
self.save_on_success = True
|
||||||
self._result = TaskResult(status=TaskResultStatus.ERROR, messages=[])
|
self._result = TaskResult(status=TaskResultStatus.ERROR, messages=[])
|
||||||
|
|
||||||
def set_status(self, result: TaskResult):
|
def set_status(self, result: TaskResult):
|
||||||
|
@ -83,6 +87,7 @@ class MonitoredTask(Task):
|
||||||
def after_return(
|
def after_return(
|
||||||
self, status, retval, task_id, args: List[Any], kwargs: Dict[str, Any], einfo
|
self, status, retval, task_id, args: List[Any], kwargs: Dict[str, Any], einfo
|
||||||
):
|
):
|
||||||
|
if self.save_on_success:
|
||||||
TaskInfo(
|
TaskInfo(
|
||||||
task_name=self.__name__,
|
task_name=self.__name__,
|
||||||
task_description=self.__doc__,
|
task_description=self.__doc__,
|
||||||
|
|
|
@ -1,11 +1,14 @@
|
||||||
"""email stage tasks"""
|
"""email stage tasks"""
|
||||||
|
from email.utils import make_msgid
|
||||||
from smtplib import SMTPException
|
from smtplib import SMTPException
|
||||||
from typing import Any, Dict, List
|
from typing import Any, Dict, List
|
||||||
|
|
||||||
from celery import group
|
from celery import group
|
||||||
from django.core.mail import EmailMultiAlternatives
|
from django.core.mail import EmailMultiAlternatives
|
||||||
|
from django.core.mail.utils import DNS_NAME
|
||||||
from structlog import get_logger
|
from structlog import get_logger
|
||||||
|
|
||||||
|
from passbook.lib.tasks import MonitoredTask, TaskResult, TaskResultStatus
|
||||||
from passbook.root.celery import CELERY_APP
|
from passbook.root.celery import CELERY_APP
|
||||||
from passbook.stages.email.models import EmailStage
|
from passbook.stages.email.models import EmailStage
|
||||||
|
|
||||||
|
@ -16,7 +19,7 @@ def send_mails(stage: EmailStage, *messages: List[EmailMultiAlternatives]):
|
||||||
"""Wrapper to convert EmailMessage to dict and send it from worker"""
|
"""Wrapper to convert EmailMessage to dict and send it from worker"""
|
||||||
tasks = []
|
tasks = []
|
||||||
for message in messages:
|
for message in messages:
|
||||||
tasks.append(_send_mail_task.s(stage.pk, message.__dict__))
|
tasks.append(send_mail.s(stage.pk, message.__dict__))
|
||||||
lazy_group = group(*tasks)
|
lazy_group = group(*tasks)
|
||||||
promise = lazy_group()
|
promise = lazy_group()
|
||||||
return promise
|
return promise
|
||||||
|
@ -29,11 +32,13 @@ def send_mails(stage: EmailStage, *messages: List[EmailMultiAlternatives]):
|
||||||
ConnectionError,
|
ConnectionError,
|
||||||
),
|
),
|
||||||
retry_backoff=True,
|
retry_backoff=True,
|
||||||
|
base=MonitoredTask,
|
||||||
)
|
)
|
||||||
# pylint: disable=unused-argument
|
def send_mail(self: MonitoredTask, email_stage_pk: int, message: Dict[Any, Any]):
|
||||||
def _send_mail_task(self, email_stage_pk: int, message: Dict[Any, Any]):
|
|
||||||
"""Send Email according to EmailStage parameters from background worker.
|
"""Send Email according to EmailStage parameters from background worker.
|
||||||
Automatically retries if message couldn't be sent."""
|
Automatically retries if message couldn't be sent."""
|
||||||
|
self.save_on_success = False
|
||||||
|
try:
|
||||||
stage: EmailStage = EmailStage.objects.get(pk=email_stage_pk)
|
stage: EmailStage = EmailStage.objects.get(pk=email_stage_pk)
|
||||||
backend = stage.backend
|
backend = stage.backend
|
||||||
backend.open()
|
backend.open()
|
||||||
|
@ -43,5 +48,19 @@ def _send_mail_task(self, email_stage_pk: int, message: Dict[Any, Any]):
|
||||||
for key, value in message.items():
|
for key, value in message.items():
|
||||||
setattr(message_object, key, value)
|
setattr(message_object, key, value)
|
||||||
message_object.from_email = stage.from_address
|
message_object.from_email = stage.from_address
|
||||||
|
# Because we use the Message-ID as UID for the task, manually assign it
|
||||||
|
message_id = make_msgid(DNS_NAME)
|
||||||
|
message_object.extra_headers["Message-ID"] = message_id
|
||||||
|
|
||||||
LOGGER.debug("Sending mail", to=message_object.to)
|
LOGGER.debug("Sending mail", to=message_object.to)
|
||||||
stage.backend.send_messages([message_object])
|
stage.backend.send_messages([message_object])
|
||||||
|
self.set_status(
|
||||||
|
TaskResult(
|
||||||
|
TaskResultStatus.SUCCESSFUL,
|
||||||
|
messages=["Successfully sent Mail."],
|
||||||
|
uid=message_id,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
except (SMTPException, ConnectionError) as exc:
|
||||||
|
self.set_status(TaskResult(TaskResultStatus.ERROR, [str(exc)], exc))
|
||||||
|
raise exc
|
||||||
|
|
Reference in a new issue