django-orchestra-test/orchestra/apps/systemusers/tests/functional_tests/tests.py

336 lines
13 KiB
Python

import ftplib
import os
import re
import socket
from functools import partial
import paramiko
from django.conf import settings as djsettings
from django.core.management.base import CommandError
from django.core.urlresolvers import reverse
from selenium.webdriver.support.select import Select
from orchestra.apps.accounts.models import Account
from orchestra.apps.orchestration.models import Server, Route
from orchestra.utils.system import run, sshrun
from orchestra.utils.tests import BaseLiveServerTestCase, random_ascii, snapshot_on_error
from ... import backends, settings
from ...models import SystemUser
r = partial(run, silent=True, display=False)
sshr = partial(sshrun, silent=True, display=False)
class SystemUserMixin(object):
MASTER_SERVER = os.environ.get('ORCHESTRA_MASTER_SERVER', 'localhost')
DEPENDENCIES = (
'orchestra.apps.orchestration',
'orcgestra.apps.systemusers',
)
def setUp(self):
super(SystemUserMixin, self).setUp()
self.add_route()
djsettings.DEBUG = True
def add_route(self):
master = Server.objects.create(name=self.MASTER_SERVER)
backend = backends.SystemUserBackend.get_name()
Route.objects.create(backend=backend, match=True, host=master)
def save(self):
raise NotImplementedError
def add(self):
raise NotImplementedError
def delete(self):
raise NotImplementedError
def update(self):
raise NotImplementedError
def disable(self):
raise NotImplementedError
def add_group(self, username, groupname):
raise NotImplementedError
def validate_user(self, username):
idcmd = sshr(self.MASTER_SERVER, "id %s" % username)
self.assertEqual(0, idcmd.return_code)
user = SystemUser.objects.get(username=username)
groups = list(user.groups.values_list('username', flat=True))
groups.append(user.username)
idgroups = idcmd.stdout.strip().split(' ')[2]
idgroups = re.findall(r'\d+\((\w+)\)', idgroups)
self.assertEqual(set(groups), set(idgroups))
def validate_delete(self, username):
self.assertRaises(SystemUser.DoesNotExist, SystemUser.objects.get, username=username)
self.assertRaises(CommandError,
sshrun, self.MASTER_SERVER,'id %s' % username, display=False)
self.assertRaises(CommandError,
sshrun, self.MASTER_SERVER, 'grep "^%s:" /etc/groups' % username, display=False)
self.assertRaises(CommandError,
sshrun, self.MASTER_SERVER, 'grep "^%s:" /etc/passwd' % username, display=False)
self.assertRaises(CommandError,
sshrun, self.MASTER_SERVER, 'grep "^%s:" /etc/shadow' % username, display=False)
def validate_ftp(self, username, password):
connection = ftplib.FTP(self.MASTER_SERVER)
connection.login(user=username, passwd=password)
connection.close()
def validate_sftp(self, username, password):
transport = paramiko.Transport((self.MASTER_SERVER, 22))
transport.connect(username=username, password=password)
sftp = paramiko.SFTPClient.from_transport(transport)
sftp.listdir()
sftp.close()
def validate_ssh(self, username, password):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(self.MASTER_SERVER, username=username, password=password)
transport = ssh.get_transport()
channel = transport.open_session()
channel.exec_command('ls')
self.assertEqual(0, channel.recv_exit_status())
channel.close()
def test_create(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password)
self.addCleanup(partial(self.delete, username))
self.validate_user(username)
def test_ftp(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password, shell='/dev/null')
self.addCleanup(partial(self.delete, username))
self.assertRaises(paramiko.AuthenticationException, self.validate_sftp, username, password)
self.assertRaises(paramiko.AuthenticationException, self.validate_ssh, username, password)
def test_sftp(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password, shell='/bin/rssh')
self.addCleanup(partial(self.delete, username))
self.validate_sftp(username, password)
self.assertRaises(AssertionError, self.validate_ssh, username, password)
def test_ssh(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password, shell='/bin/bash')
self.addCleanup(partial(self.delete, username))
self.validate_ssh(username, password)
def test_delete(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%sppppP001' % random_ascii(5)
self.add(username, password)
self.validate_user(username)
self.delete(username)
self.validate_delete(username)
def test_add_group(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password)
self.addCleanup(partial(self.delete, username))
self.validate_user(username)
username2 = '%s_systemuser' % random_ascii(10)
password2 = '@!?%spppP001' % random_ascii(5)
self.add(username2, password2)
self.addCleanup(partial(self.delete, username2))
self.validate_user(username2)
self.add_group(username, username2)
user = SystemUser.objects.get(username=username)
groups = list(user.groups.values_list('username', flat=True))
self.assertIn(username2, groups)
self.validate_user(username)
def test_disable(self):
username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5)
self.add(username, password, shell='/dev/null')
self.addCleanup(partial(self.delete, username))
self.validate_ftp(username, password)
self.disable(username)
self.validate_user(username)
self.assertRaises(ftplib.error_perm, self.validate_ftp, username, password)
def test_change_password(self):
pass
# TODO
class RESTSystemUserMixin(SystemUserMixin):
def setUp(self):
super(RESTSystemUserMixin, self).setUp()
self.rest_login()
# create main user
self.save(self.account.username)
self.addCleanup(partial(self.delete, self.account.username))
def add(self, username, password, shell='/dev/null'):
self.rest.systemusers.create(username=username, password=password, shell=shell)
def delete(self, username):
user = self.rest.systemusers.retrieve(username=username).get()
user.delete()
def add_group(self, username, groupname):
user = self.rest.systemusers.retrieve(username=username).get()
group = self.rest.systemusers.retrieve(username=groupname).get()
user.groups.append(group) # TODO how to do it with the api?
user.save()
def disable(self, username):
user = self.rest.systemusers.retrieve(username=username).get()
user.is_active = False
user.save()
def save(self, username):
user = self.rest.systemusers.retrieve(username=username).get()
user.save()
class AdminSystemUserMixin(SystemUserMixin):
def setUp(self):
super(AdminSystemUserMixin, self).setUp()
self.admin_login()
# create main user
self.save(self.account.username)
self.addCleanup(partial(self.delete, self.account.username))
@snapshot_on_error
def add(self, username, password, shell='/dev/null'):
url = self.live_server_url + reverse('admin:systemusers_systemuser_add')
self.selenium.get(url)
username_field = self.selenium.find_element_by_id('id_username')
username_field.send_keys(username)
password_field = self.selenium.find_element_by_id('id_password1')
password_field.send_keys(password)
password_field = self.selenium.find_element_by_id('id_password2')
password_field.send_keys(password)
account_input = self.selenium.find_element_by_id('id_account')
account_select = Select(account_input)
account_select.select_by_value(str(self.account.pk))
shell_input = self.selenium.find_element_by_id('id_shell')
shell_select = Select(shell_input)
shell_select.select_by_value(shell)
username_field.submit()
self.assertNotEqual(url, self.selenium.current_url)
@snapshot_on_error
def delete(self, username):
user = SystemUser.objects.get(username=username)
delete = reverse('admin:systemusers_systemuser_delete', args=(user.pk,))
url = self.live_server_url + delete
self.selenium.get(url)
confirmation = self.selenium.find_element_by_name('post')
confirmation.submit()
self.assertNotEqual(url, self.selenium.current_url)
@snapshot_on_error
def disable(self, username):
user = SystemUser.objects.get(username=username)
change = reverse('admin:systemusers_systemuser_change', args=(user.pk,))
url = self.live_server_url + change
self.selenium.get(url)
is_active = self.selenium.find_element_by_id('id_is_active')
is_active.click()
save = self.selenium.find_element_by_name('_save')
save.submit()
self.assertNotEqual(url, self.selenium.current_url)
@snapshot_on_error
def add_group(self, username, groupname):
user = SystemUser.objects.get(username=username)
change = reverse('admin:systemusers_systemuser_change', args=(user.pk,))
url = self.live_server_url + change
self.selenium.get(url)
groups = self.selenium.find_element_by_id('id_groups_add_all_link')
groups.click()
save = self.selenium.find_element_by_name('_save')
save.submit()
self.assertNotEqual(url, self.selenium.current_url)
@snapshot_on_error
def save(self, username):
user = SystemUser.objects.get(username=username)
change = reverse('admin:systemusers_systemuser_change', args=(user.pk,))
url = self.live_server_url + change
self.selenium.get(url)
save = self.selenium.find_element_by_name('_save')
save.submit()
self.assertNotEqual(url, self.selenium.current_url)
class RESTSystemUserTest(RESTSystemUserMixin, BaseLiveServerTestCase):
pass
class AdminSystemUserTest(AdminSystemUserMixin, BaseLiveServerTestCase):
@snapshot_on_error
def test_create_account(self):
url = self.live_server_url + reverse('admin:accounts_account_add')
self.selenium.get(url)
account_username = '%s_account' % random_ascii(10)
username = self.selenium.find_element_by_id('id_username')
username.send_keys(account_username)
account_password = '@!?%spppP001' % random_ascii(5)
password = self.selenium.find_element_by_id('id_password1')
password.send_keys(account_password)
password = self.selenium.find_element_by_id('id_password2')
password.send_keys(account_password)
account_email = 'orchestra@orchestra.lan'
email = self.selenium.find_element_by_id('id_email')
email.send_keys(account_email)
contact_short_name = random_ascii(10)
short_name = self.selenium.find_element_by_id('id_contacts-0-short_name')
short_name.send_keys(contact_short_name)
email = self.selenium.find_element_by_id('id_contacts-0-email')
email.send_keys(account_email)
email.submit()
account = Account.objects.get(username=account_username)
self.addCleanup(account.delete)
self.assertNotEqual(url, self.selenium.current_url)
self.assertEqual(0, sshr(self.MASTER_SERVER, "id %s" % account.username).return_code)
@snapshot_on_error
def test_delete_account(self):
home = self.account.systemusers.get(is_main=True).get_home()
delete = reverse('admin:accounts_account_delete', args=(self.account.pk,))
url = self.live_server_url + delete
self.selenium.get(url)
confirmation = self.selenium.find_element_by_name('post')
confirmation.submit()
self.assertNotEqual(url, self.selenium.current_url)
self.assertRaises(CommandError, run, 'ls %s' % home, display=False)
# Recreate a fucking fake account for test cleanup
self.account = self.create_account(username=self.account.username, superuser=True)
self.selenium.delete_all_cookies()
self.admin_login()