django-orchestra-test/orchestra/contrib/mailer/admin.py

158 lines
5.6 KiB
Python
Raw Normal View History

import base64
import email
2015-05-26 12:59:16 +00:00
from django import forms
2015-05-04 19:52:53 +00:00
from django.contrib import admin
from django.urls import reverse
2015-05-19 13:27:04 +00:00
from django.db.models import Count
2015-05-07 19:00:02 +00:00
from django.shortcuts import redirect
from django.utils.html import format_html
from django.utils.safestring import mark_safe
2015-05-05 19:42:55 +00:00
from django.utils.translation import ugettext_lazy as _
2015-05-04 19:52:53 +00:00
from orchestra.admin import ExtendedModelAdmin
2015-05-07 19:00:02 +00:00
from orchestra.admin.utils import admin_link, admin_colored, admin_date, wrap_admin_view
from orchestra.contrib.tasks import task
2015-05-04 19:52:53 +00:00
from .actions import last
2015-05-07 19:00:02 +00:00
from .engine import send_pending
2015-05-04 19:52:53 +00:00
from .models import Message, SMTPLog
COLORS = {
2015-05-05 19:42:55 +00:00
Message.QUEUED: 'purple',
Message.SENT: 'green',
Message.DEFERRED: 'darkorange',
Message.FAILED: 'red',
SMTPLog.SUCCESS: 'green',
SMTPLog.FAILURE: 'red',
}
class MessageAdmin(ExtendedModelAdmin):
2015-05-04 19:52:53 +00:00
list_display = (
2015-05-26 12:59:16 +00:00
'display_subject', 'colored_state', 'priority', 'to_address', 'from_address',
2015-08-05 22:58:35 +00:00
'created_at_delta', 'display_retries', 'last_try_delta',
2015-05-04 19:52:53 +00:00
)
2015-05-05 19:42:55 +00:00
list_filter = ('state', 'priority', 'retries')
list_prefetch_related = ('logs',)
2015-08-31 11:58:59 +00:00
search_fields = ('to_address', 'from_address', 'subject',)
fieldsets = (
(None, {
2015-08-05 22:58:35 +00:00
'fields': ('state', 'priority', ('retries', 'last_try_delta', 'created_at_delta'),
'display_full_subject', 'display_from', 'display_to',
'display_content'),
}),
(_("Edit"), {
'classes': ('collapse',),
'fields': ('subject', 'from_address', 'to_address', 'content'),
}),
)
readonly_fields = (
2015-08-05 22:58:35 +00:00
'retries', 'last_try_delta', 'created_at_delta', 'display_full_subject',
'display_to', 'display_from', 'display_content',
)
date_hierarchy = 'created_at'
change_view_actions = (last,)
2015-05-05 19:42:55 +00:00
colored_state = admin_colored('state', colors=COLORS)
created_at_delta = admin_date('created_at')
2015-08-05 22:58:35 +00:00
last_try_delta = admin_date('last_try')
2015-05-07 19:00:02 +00:00
def display_subject(self, instance):
subject = instance.subject
2015-08-05 22:58:35 +00:00
if len(subject) > 64:
return mark_safe(subject[:64] + '…')
return subject
2015-05-07 19:00:02 +00:00
display_subject.short_description = _("Subject")
display_subject.admin_order_field = 'subject'
2015-08-05 22:58:35 +00:00
def display_retries(self, instance):
num_logs = instance.logs__count
if num_logs == 1:
2015-05-07 19:00:02 +00:00
pk = instance.logs.all()[0].id
url = reverse('admin:mailer_smtplog_change', args=(pk,))
else:
url = reverse('admin:mailer_smtplog_changelist')
url += '?&message=%i' % instance.pk
return format_html('<a href="{}" onclick="return showAddAnotherPopup(this);">{}</a>', url, instance.retries)
2015-08-05 22:58:35 +00:00
display_retries.short_description = _("Retries")
display_retries.admin_order_field = 'retries'
def display_content(self, instance):
part = email.message_from_string(instance.content)
payload = part.get_payload()
if isinstance(payload, list):
2015-09-23 12:22:32 +00:00
for cpart in payload:
cpayload = cpart.get_payload()
if cpart.get_content_type().startswith('text/'):
part = cpart
payload = cpayload
if cpart.get_content_type() == 'text/html':
payload = '<div style="padding-left:110px">%s</div>' % payload
# prioritize HTML
break
if part.get('Content-Transfer-Encoding') == 'base64':
payload = base64.b64decode(payload)
2015-09-23 12:22:32 +00:00
charset = part.get_charsets()[0]
if charset:
payload = payload.decode(charset)
if part.get_content_type() == 'text/plain':
payload = payload.replace('\n', '<br>').replace(' ', '&nbsp;')
return mark_safe(payload)
display_content.short_description = _("Content")
def display_full_subject(self, instance):
return instance.subject
display_full_subject.short_description = _("Subject")
def display_from(self, instance):
return instance.from_address
display_from.short_description = _("From")
def display_to(self, instance):
return instance.to_address
display_to.short_description = _("To")
2015-05-07 19:00:02 +00:00
def get_urls(self):
from django.conf.urls import url
urls = super().get_urls()
2015-05-07 19:00:02 +00:00
info = self.model._meta.app_label, self.model._meta.model_name
urls.insert(0,
2015-05-26 12:59:16 +00:00
url(r'^send-pending/$',
wrap_admin_view(self, self.send_pending_view),
name='%s_%s_send_pending' % info)
2015-05-07 19:00:02 +00:00
)
return urls
2015-05-05 19:42:55 +00:00
def get_queryset(self, request):
qs = super().get_queryset(request)
return qs.annotate(Count('logs')).defer('content')
2015-05-07 19:00:02 +00:00
def send_pending_view(self, request):
task(send_pending).apply_async()
self.message_user(request, _("Pending messages are being sent on the background."))
return redirect('..')
2015-05-26 12:59:16 +00:00
def formfield_for_dbfield(self, db_field, **kwargs):
if db_field.name == 'subject':
kwargs['widget'] = forms.TextInput(attrs={'size':'100'})
return super().formfield_for_dbfield(db_field, **kwargs)
2015-05-04 19:52:53 +00:00
2015-05-04 19:52:53 +00:00
class SMTPLogAdmin(admin.ModelAdmin):
list_display = (
2015-05-05 19:42:55 +00:00
'id', 'message_link', 'colored_result', 'date_delta', 'log_message'
2015-05-04 19:52:53 +00:00
)
2015-05-05 19:42:55 +00:00
list_filter = ('result',)
2015-05-07 19:00:02 +00:00
fields = ('message_link', 'colored_result', 'date_delta', 'log_message')
readonly_fields = fields
2015-05-04 19:52:53 +00:00
message_link = admin_link('message')
2015-05-05 19:42:55 +00:00
colored_result = admin_colored('result', colors=COLORS, bold=False)
date_delta = admin_date('date')
2015-05-04 19:52:53 +00:00
admin.site.register(Message, MessageAdmin)
admin.site.register(SMTPLog, SMTPLogAdmin)