Resource monitoring implementation
This commit is contained in:
parent
9b9abc3c91
commit
cc445559d0
|
@ -2,6 +2,8 @@ from django.conf import settings as django_settings
|
|||
from django.db import models
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from orchestra.core import services
|
||||
|
||||
from . import settings
|
||||
|
||||
|
||||
|
@ -23,3 +25,6 @@ class Account(models.Model):
|
|||
def name(self):
|
||||
self._cached_name = getattr(self, '_cached_name', self.user.username)
|
||||
return self._cached_name
|
||||
|
||||
|
||||
services.register(Account, menu=False)
|
||||
|
|
|
@ -30,7 +30,7 @@ class RouteAdmin(admin.ModelAdmin):
|
|||
|
||||
def display_model(self, route):
|
||||
try:
|
||||
return route.get_backend().model
|
||||
return route.backend_class().model
|
||||
except KeyError:
|
||||
return "<span style='color: red;'>NOT AVAILABLE</span>"
|
||||
display_model.short_description = _("model")
|
||||
|
@ -38,7 +38,7 @@ class RouteAdmin(admin.ModelAdmin):
|
|||
|
||||
def display_actions(self, route):
|
||||
try:
|
||||
return '<br>'.join(route.get_backend().get_actions())
|
||||
return '<br>'.join(route.backend_class().get_actions())
|
||||
except KeyError:
|
||||
return "<span style='color: red;'>NOT AVAILABLE</span>"
|
||||
display_actions.short_description = _("actions")
|
||||
|
|
|
@ -67,6 +67,13 @@ class ServiceBackend(object):
|
|||
def get_backends(cls):
|
||||
return cls.plugins
|
||||
|
||||
@classmethod
|
||||
def get_backend(cls, name):
|
||||
for backend in ServiceMonitor.get_backends():
|
||||
if backend.get_name() == name:
|
||||
return backend
|
||||
raise KeyError('This backend is not registered')
|
||||
|
||||
@classmethod
|
||||
def get_choices(cls):
|
||||
backends = cls.get_backends()
|
||||
|
|
|
@ -5,6 +5,7 @@ from django.db import models
|
|||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from orchestra.utils.apps import autodiscover
|
||||
from orchestra.utils.functional import cached
|
||||
|
||||
from . import settings, manager
|
||||
from .backends import ServiceBackend
|
||||
|
@ -145,31 +146,40 @@ class Route(models.Model):
|
|||
# msg = _("%s backend is not compatible with %s method")
|
||||
# raise ValidationError(msg % (self.backend, self.method)
|
||||
|
||||
@classmethod
|
||||
@cached
|
||||
def get_routing_table(cls):
|
||||
table = {}
|
||||
for route in cls.objects.filter(is_active=True):
|
||||
for action in route.backend_class().get_actions():
|
||||
key = (route.backend, action)
|
||||
try:
|
||||
table[key].append(route)
|
||||
except KeyError:
|
||||
table[key] = [route]
|
||||
return table
|
||||
|
||||
@classmethod
|
||||
def get_servers(cls, operation):
|
||||
# TODO use cached data sctructure and refactor
|
||||
backend = operation.backend
|
||||
table = cls.get_routing_table()
|
||||
servers = []
|
||||
key = (operation.backend.get_name(), operation.action)
|
||||
try:
|
||||
routes = cls.objects.filter(is_active=True, backend=backend.get_name())
|
||||
except cls.DoesNotExist:
|
||||
routes = table[key]
|
||||
except KeyError:
|
||||
return servers
|
||||
safe_locals = {
|
||||
'instance': operation.instance
|
||||
}
|
||||
actions = backend.get_actions()
|
||||
for route in routes:
|
||||
if operation.action in actions and eval(route.match, safe_locals):
|
||||
if eval(route.match, safe_locals):
|
||||
servers.append(route.host)
|
||||
return servers
|
||||
|
||||
def get_backend(self):
|
||||
for backend in ServiceBackend.get_backends():
|
||||
if backend.get_name() == self.backend:
|
||||
return backend
|
||||
raise KeyError('This backend is not registered')
|
||||
def backend_class(self):
|
||||
return ServiceBackend.get_backend(self.backend)
|
||||
|
||||
# def get_method_class(self):
|
||||
# def method_class(self):
|
||||
# for method in MethodBackend.get_backends():
|
||||
# if method.get_name() == self.method:
|
||||
# return method
|
||||
|
|
|
@ -5,6 +5,7 @@ from django.utils.translation import ugettext_lazy as _
|
|||
|
||||
from orchestra.admin.filters import UsedContentTypeFilter
|
||||
from orchestra.admin.utils import insertattr, get_modeladmin
|
||||
from orchestra.core import services
|
||||
from orchestra.utils import running_syncdb
|
||||
|
||||
from .forms import ResourceForm
|
||||
|
@ -12,12 +13,14 @@ from .models import Resource, ResourceData, MonitorData
|
|||
|
||||
|
||||
class ResourceAdmin(admin.ModelAdmin):
|
||||
# TODO warning message server/celery should be restarted when creating things
|
||||
|
||||
list_display = (
|
||||
'name', 'verbose_name', 'content_type', 'period', 'ondemand',
|
||||
'default_allocation', 'disable_trigger'
|
||||
)
|
||||
list_filter = (UsedContentTypeFilter, 'period', 'ondemand', 'disable_trigger')
|
||||
|
||||
|
||||
def save_model(self, request, obj, form, change):
|
||||
super(ResourceAdmin, self).save_model(request, obj, form, change)
|
||||
model = obj.content_type.model_class()
|
||||
|
@ -29,6 +32,13 @@ class ResourceAdmin(admin.ModelAdmin):
|
|||
inline = resource_inline_factory(resources)
|
||||
inlines.append(inline)
|
||||
modeladmin.inlines = inlines
|
||||
|
||||
def formfield_for_dbfield(self, db_field, **kwargs):
|
||||
""" filter service content_types """
|
||||
if db_field.name == 'content_type':
|
||||
models = [ model._meta.model_name for model in services.get().keys() ]
|
||||
kwargs['queryset'] = db_field.rel.to.objects.filter(model__in=models)
|
||||
return super(ResourceAdmin, self).formfield_for_dbfield(db_field, **kwargs)
|
||||
|
||||
|
||||
class ResourceDataAdmin(admin.ModelAdmin):
|
||||
|
|
|
@ -9,10 +9,6 @@ class ResourcesConfig(AppConfig):
|
|||
verbose_name = 'Resources'
|
||||
|
||||
def ready(self):
|
||||
from .models import Resource
|
||||
# TODO execute on Resource.save()
|
||||
if not running_syncdb():
|
||||
relation = generic.GenericRelation('resources.ResourceData')
|
||||
for resources in Resource.group_by_content_type():
|
||||
model = resources[0].content_type.model_class()
|
||||
model.add_to_class('resources', relation)
|
||||
from .models import create_resource_relation
|
||||
create_resource_relation()
|
||||
|
|
|
@ -9,7 +9,7 @@ class ResourceForm(forms.ModelForm):
|
|||
required=False)
|
||||
used = forms.IntegerField(label=_("Used"), widget=ShowTextWidget(),
|
||||
required=False)
|
||||
last_update = forms.CharField(label=_("Last update"), widget=ShowTextWidget(),
|
||||
last_update = forms.DateTimeField(label=_("Last update"), widget=ShowTextWidget(),
|
||||
required=False)
|
||||
allocated = forms.IntegerField(label=_("Allocated"))
|
||||
|
||||
|
@ -21,7 +21,7 @@ class ResourceForm(forms.ModelForm):
|
|||
super(ResourceForm, self).__init__(*args, **kwargs)
|
||||
if self.resource:
|
||||
self.fields['verbose_name'].initial = self.resource.verbose_name
|
||||
self.fields['used'].initial = self.resource.get_current()
|
||||
self.fields['used'].initial = self.resource.get_used() # TODO
|
||||
if self.resource.ondemand:
|
||||
self.fields['allocated'].required = False
|
||||
self.fields['allocated'].widget = ReadOnlyWidget(None, '')
|
||||
|
|
|
@ -42,12 +42,35 @@ class Resource(models.Model):
|
|||
null=True, blank=True)
|
||||
is_active = models.BooleanField(_("is active"), default=True)
|
||||
disable_trigger = models.BooleanField(_("disable trigger"), default=False)
|
||||
crontab = models.ForeignKey(CrontabSchedule, verbose_name=_("crontab"),
|
||||
help_text=_("Crontab for periodic execution"))
|
||||
# TODO create custom field that returns backend python objects
|
||||
monitors = MultiSelectField(_("monitors"), max_length=256,
|
||||
choices=ServiceMonitor.get_choices())
|
||||
|
||||
def __unicode__(self):
|
||||
return self.name
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
super(Resource, self).save(*args, **kwargs)
|
||||
# Create Celery periodic task
|
||||
name = 'monitor.%s' % str(self)
|
||||
try:
|
||||
task = PeriodicTask.objects.get(name=name)
|
||||
except PeriodicTask.DoesNotExist:
|
||||
PeriodicTask.objects.create(name=name, task='resources.Monitor',
|
||||
args=[self.pk], crontab=self.crontab)
|
||||
else:
|
||||
if task.crontab != self.crontab:
|
||||
task.crontab = self.crontab
|
||||
task.save()
|
||||
|
||||
def delete(self, *args, **kwargs):
|
||||
super(Resource, self).delete(*args, **kwargs)
|
||||
name = 'monitor.%s' % str(self)
|
||||
PeriodicTask.objects.filter(name=name, task='resources.Monitor',
|
||||
args=[self.pk]).delete()
|
||||
|
||||
@classmethod
|
||||
def group_by_content_type(cls):
|
||||
prev = None
|
||||
|
@ -63,14 +86,40 @@ class Resource(models.Model):
|
|||
prev = ct
|
||||
if group:
|
||||
yield group
|
||||
|
||||
|
||||
class ResourceData(models.Model):
|
||||
""" Stores computed resource usage and allocation """
|
||||
resource = models.ForeignKey(Resource, related_name='dataset')
|
||||
content_type = models.ForeignKey(ContentType)
|
||||
object_id = models.PositiveIntegerField()
|
||||
used = models.PositiveIntegerField(null=True)
|
||||
last_update = models.DateTimeField(null=True)
|
||||
allocated = models.PositiveIntegerField(null=True)
|
||||
|
||||
def get_current(self):
|
||||
content_object = generic.GenericForeignKey()
|
||||
|
||||
class Meta:
|
||||
unique_together = ('resource', 'content_type', 'object_id')
|
||||
verbose_name_plural = _("resource data")
|
||||
|
||||
@classmethod
|
||||
def get_or_create(cls, obj, resource):
|
||||
try:
|
||||
return cls.objects.get(content_object=obj, resource=resource)
|
||||
except cls.DoesNotExists:
|
||||
return cls.objects.create(content_object=obj, resource=resource,
|
||||
allocated=resource.defalt_allocation)
|
||||
|
||||
def get_used(self):
|
||||
resource = self.resource
|
||||
today = datetime.date.today()
|
||||
result = 0
|
||||
has_result = False
|
||||
for monitor in self.monitors:
|
||||
dataset = MonitorData.objects.filter(monitor=monitor)
|
||||
if self.period == self.MONTHLY_AVG:
|
||||
for monitor in resource.monitors:
|
||||
dataset = MonitorData.objects.filter(monitor=monitor,
|
||||
content_type=self.content_type, object_id=self.object_id)
|
||||
if resource.period == resource.MONTHLY_AVG:
|
||||
try:
|
||||
last = dataset.latest()
|
||||
except MonitorData.DoesNotExist:
|
||||
|
@ -83,14 +132,14 @@ class Resource(models.Model):
|
|||
for data in dataset:
|
||||
slot = (previous-data.date).total_seconds()
|
||||
result += data.value * slot/total
|
||||
elif self.period == self.MONTHLY_SUM:
|
||||
elif resource.period == resource.MONTHLY_SUM:
|
||||
data = dataset.filter(date__year=today.year,
|
||||
date__month=today.month)
|
||||
value = data.aggregate(models.Sum('value'))['value__sum']
|
||||
if value:
|
||||
has_result = True
|
||||
result += value
|
||||
elif self.period == self.LAST:
|
||||
elif resource.period == resource.LAST:
|
||||
try:
|
||||
result += dataset.latest().value
|
||||
except MonitorData.DoesNotExist:
|
||||
|
@ -101,21 +150,6 @@ class Resource(models.Model):
|
|||
return result if has_result else None
|
||||
|
||||
|
||||
class ResourceData(models.Model):
|
||||
""" Stores computed resource usage and allocation """
|
||||
resource = models.ForeignKey(Resource)
|
||||
content_type = models.ForeignKey(ContentType)
|
||||
object_id = models.PositiveIntegerField()
|
||||
used = models.PositiveIntegerField(null=True)
|
||||
last_update = models.DateTimeField(null=True)
|
||||
allocated = models.PositiveIntegerField(null=True)
|
||||
|
||||
content_object = generic.GenericForeignKey()
|
||||
|
||||
class Meta:
|
||||
unique_together = ('resource', 'content_type', 'object_id')
|
||||
verbose_name_plural = _("resource data")
|
||||
|
||||
class MonitorData(models.Model):
|
||||
""" Stores monitored data """
|
||||
monitor = models.CharField(_("monitor"), max_length=256,
|
||||
|
@ -133,3 +167,10 @@ class MonitorData(models.Model):
|
|||
|
||||
def __unicode__(self):
|
||||
return str(self.monitor)
|
||||
|
||||
|
||||
def create_resource_relation():
|
||||
relation = generic.GenericRelation('resources.ResourceData')
|
||||
for resources in Resource.group_by_content_type():
|
||||
model = resources[0].content_type.model_class()
|
||||
model.add_to_class('resources', relation)
|
||||
|
|
|
@ -1,14 +1,38 @@
|
|||
from celery import shared_task
|
||||
|
||||
from orchestra.apps.orchestration.models import BackendOperation as Operation
|
||||
|
||||
from .backends import ServiceMonitor
|
||||
from .models import MonitorData
|
||||
|
||||
|
||||
@shared_task
|
||||
def monitor(backend_name):
|
||||
routes = Route.objects.filter(is_active=True, backend=backend_name)
|
||||
for route in routes:
|
||||
pass
|
||||
for backend in ServiceMonitor.get_backends():
|
||||
if backend.get_name() == backend_name:
|
||||
# TODO execute monitor BackendOperation
|
||||
pass
|
||||
@shared_task(name='resources.Monitor')
|
||||
def monitor(resource_id):
|
||||
resource = Resource.objects.get(pk=resource_id)
|
||||
|
||||
# Execute monitors
|
||||
for monitor_name in resource.monitors:
|
||||
backend = ServiceMonitor.get_backend(monitor_name)
|
||||
model = backend.model
|
||||
operations = []
|
||||
# Execute monitor
|
||||
for obj in model.objects.all():
|
||||
operations.append(Operation.create(backend, obj, Operation.MONITOR))
|
||||
Operation.execute(operations)
|
||||
|
||||
# Update used resources and trigger resource exceeded and revovery
|
||||
operations = []
|
||||
model = resource.model
|
||||
for obj in model.objects.all():
|
||||
data = MonitorData.get_or_create(obj, resource)
|
||||
current = data.get_used()
|
||||
if data.used < data.allocated and current > data.allocated:
|
||||
op = Operation.create(backend, data.content_object, Operation.EXCEED)
|
||||
operations.append(op)
|
||||
elif res.used > res.allocated and current < res.allocated:
|
||||
op = Operation.create(backend, data.content_object, Operation.RECOVERY)
|
||||
operation.append(op)
|
||||
data.used = current
|
||||
data.las_update = datetime.now()
|
||||
data.save()
|
||||
Operation.execute(operations)
|
||||
|
|
|
@ -15,51 +15,51 @@ class OrderedSet(collections.MutableSet):
|
|||
self.map = {} # key --> [key, prev, next]
|
||||
if iterable is not None:
|
||||
self |= iterable
|
||||
|
||||
|
||||
def __len__(self):
|
||||
return len(self.map)
|
||||
|
||||
|
||||
def __contains__(self, key):
|
||||
return key in self.map
|
||||
|
||||
|
||||
def add(self, key):
|
||||
if key not in self.map:
|
||||
end = self.end
|
||||
curr = end[1]
|
||||
curr[2] = end[1] = self.map[key] = [key, curr, end]
|
||||
|
||||
|
||||
def discard(self, key):
|
||||
if key in self.map:
|
||||
if key in self.map:
|
||||
key, prev, next = self.map.pop(key)
|
||||
prev[2] = next
|
||||
next[1] = prev
|
||||
|
||||
|
||||
def __iter__(self):
|
||||
end = self.end
|
||||
curr = end[2]
|
||||
while curr is not end:
|
||||
yield curr[0]
|
||||
curr = curr[2]
|
||||
|
||||
|
||||
def __reversed__(self):
|
||||
end = self.end
|
||||
curr = end[1]
|
||||
while curr is not end:
|
||||
yield curr[0]
|
||||
curr = curr[1]
|
||||
|
||||
|
||||
def pop(self, last=True):
|
||||
if not self:
|
||||
raise KeyError('set is empty')
|
||||
key = self.end[1][0] if last else self.end[2][0]
|
||||
self.discard(key)
|
||||
return key
|
||||
|
||||
|
||||
def __repr__(self):
|
||||
if not self:
|
||||
return '%s()' % (self.__class__.__name__,)
|
||||
return '%s(%r)' % (self.__class__.__name__, list(self))
|
||||
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, OrderedSet):
|
||||
return len(self) == len(other) and list(self) == list(other)
|
||||
|
|
Loading…
Reference in a new issue