Replace SystemUserBackend with UNIXUserController

This commit is contained in:
Santiago L 2021-03-30 13:11:41 +02:00
parent f4c0a7413c
commit c505f9a3c6
2 changed files with 63 additions and 63 deletions

View file

@ -31,35 +31,35 @@ class SystemUserMixin(object):
'orchestra.contrib.orchestration', 'orchestra.contrib.orchestration',
'orcgestra.apps.systemusers', 'orcgestra.apps.systemusers',
) )
def setUp(self): def setUp(self):
super(SystemUserMixin, self).setUp() super(SystemUserMixin, self).setUp()
self.add_route() self.add_route()
djsettings.DEBUG = True djsettings.DEBUG = True
def add_route(self): def add_route(self):
master = Server.objects.create(name=self.MASTER_SERVER) master = Server.objects.create(name=self.MASTER_SERVER)
backend = backends.SystemUserBackend.get_name() backend = backends.UNIXUserController.get_name()
Route.objects.create(backend=backend, match=True, host=master) Route.objects.create(backend=backend, match=True, host=master)
def save(self): def save(self):
raise NotImplementedError raise NotImplementedError
def add(self): def add(self):
raise NotImplementedError raise NotImplementedError
def delete(self): def delete(self):
raise NotImplementedError raise NotImplementedError
def update(self): def update(self):
raise NotImplementedError raise NotImplementedError
def disable(self): def disable(self):
raise NotImplementedError raise NotImplementedError
def add_group(self, username, groupname): def add_group(self, username, groupname):
raise NotImplementedError raise NotImplementedError
def validate_user(self, username): def validate_user(self, username):
idcmd = sshr(self.MASTER_SERVER, "id %s" % username) idcmd = sshr(self.MASTER_SERVER, "id %s" % username)
self.assertEqual(0, idcmd.exit_code) self.assertEqual(0, idcmd.exit_code)
@ -69,7 +69,7 @@ class SystemUserMixin(object):
idgroups = idcmd.stdout.strip().split(' ')[2] idgroups = idcmd.stdout.strip().split(' ')[2]
idgroups = re.findall(r'\d+\((\w+)\)', idgroups) idgroups = re.findall(r'\d+\((\w+)\)', idgroups)
self.assertEqual(set(groups), set(idgroups)) self.assertEqual(set(groups), set(idgroups))
def validate_delete(self, username): def validate_delete(self, username):
self.assertRaises(SystemUser.DoesNotExist, SystemUser.objects.get, username=username) self.assertRaises(SystemUser.DoesNotExist, SystemUser.objects.get, username=username)
self.assertRaises(CommandError, self.assertRaises(CommandError,
@ -81,19 +81,19 @@ class SystemUserMixin(object):
self.assertRaises(CommandError, self.assertRaises(CommandError,
sshrun, self.MASTER_SERVER, 'grep "^%s:" /etc/shadow' % username, display=False) sshrun, self.MASTER_SERVER, 'grep "^%s:" /etc/shadow' % username, display=False)
# Home will be deleted on account delete, see test_delete_account # Home will be deleted on account delete, see test_delete_account
def validate_ftp(self, username, password): def validate_ftp(self, username, password):
ftp = ftplib.FTP(self.MASTER_SERVER) ftp = ftplib.FTP(self.MASTER_SERVER)
ftp.login(user=username, passwd=password) ftp.login(user=username, passwd=password)
ftp.close() ftp.close()
def validate_sftp(self, username, password): def validate_sftp(self, username, password):
transport = paramiko.Transport((self.MASTER_SERVER, 22)) transport = paramiko.Transport((self.MASTER_SERVER, 22))
transport.connect(username=username, password=password) transport.connect(username=username, password=password)
sftp = paramiko.SFTPClient.from_transport(transport) sftp = paramiko.SFTPClient.from_transport(transport)
sftp.listdir() sftp.listdir()
sftp.close() sftp.close()
def validate_ssh(self, username, password): def validate_ssh(self, username, password):
ssh = paramiko.SSHClient() ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
@ -103,14 +103,14 @@ class SystemUserMixin(object):
channel.exec_command('ls') channel.exec_command('ls')
self.assertEqual(0, channel.recv_exit_status()) self.assertEqual(0, channel.recv_exit_status())
channel.close() channel.close()
def test_add(self): def test_add(self):
username = '%s_systemuser' % random_ascii(10) username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5) password = '@!?%spppP001' % random_ascii(5)
self.add(username, password) self.add(username, password)
self.addCleanup(self.delete, username) self.addCleanup(self.delete, username)
self.validate_user(username) self.validate_user(username)
def test_ftp(self): def test_ftp(self):
username = '%s_systemuser' % random_ascii(10) username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5) password = '@!?%spppP001' % random_ascii(5)
@ -120,7 +120,7 @@ class SystemUserMixin(object):
self.validate_sftp, username, password) self.validate_sftp, username, password)
self.assertRaises(paramiko.AuthenticationException, self.assertRaises(paramiko.AuthenticationException,
self.validate_ssh, username, password) self.validate_ssh, username, password)
def test_sftp(self): def test_sftp(self):
username = '%s_systemuser' % random_ascii(10) username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5) password = '@!?%spppP001' % random_ascii(5)
@ -128,14 +128,14 @@ class SystemUserMixin(object):
self.addCleanup(self.delete, username) self.addCleanup(self.delete, username)
self.validate_sftp(username, password) self.validate_sftp(username, password)
self.assertRaises(AssertionError, self.validate_ssh, username, password) self.assertRaises(AssertionError, self.validate_ssh, username, password)
def test_ssh(self): def test_ssh(self):
username = '%s_systemuser' % random_ascii(10) username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5) password = '@!?%spppP001' % random_ascii(5)
self.add(username, password, shell='/bin/bash') self.add(username, password, shell='/bin/bash')
self.addCleanup(self.delete, username) self.addCleanup(self.delete, username)
self.validate_ssh(username, password) self.validate_ssh(username, password)
def test_delete(self): def test_delete(self):
username = '%s_systemuser' % random_ascii(10) username = '%s_systemuser' % random_ascii(10)
password = '@!?%sppppP001' % random_ascii(5) password = '@!?%sppppP001' % random_ascii(5)
@ -144,7 +144,7 @@ class SystemUserMixin(object):
self.delete(username) self.delete(username)
self.validate_delete(username) self.validate_delete(username)
self.assertRaises(Exception, self.delete, self.account.username) self.assertRaises(Exception, self.delete, self.account.username)
def test_add_group(self): def test_add_group(self):
username = '%s_systemuser' % random_ascii(10) username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5) password = '@!?%spppP001' % random_ascii(5)
@ -161,7 +161,7 @@ class SystemUserMixin(object):
groups = list(user.groups.values_list('username', flat=True)) groups = list(user.groups.values_list('username', flat=True))
self.assertIn(username2, groups) self.assertIn(username2, groups)
self.validate_user(username) self.validate_user(username)
def test_disable(self): def test_disable(self):
username = '%s_systemuser' % random_ascii(10) username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5) password = '@!?%spppP001' % random_ascii(5)
@ -171,7 +171,7 @@ class SystemUserMixin(object):
self.disable(username) self.disable(username)
self.validate_user(username) self.validate_user(username)
self.assertRaises(ftplib.error_perm, self.validate_ftp, username, password) self.assertRaises(ftplib.error_perm, self.validate_ftp, username, password)
def test_change_password(self): def test_change_password(self):
username = '%s_systemuser' % random_ascii(10) username = '%s_systemuser' % random_ascii(10)
password = '@!?%spppP001' % random_ascii(5) password = '@!?%spppP001' % random_ascii(5)
@ -192,38 +192,38 @@ class RESTSystemUserMixin(SystemUserMixin):
# create main user # create main user
self.save(self.account.username) self.save(self.account.username)
self.addCleanup(self.delete_account, self.account.username) self.addCleanup(self.delete_account, self.account.username)
@save_response_on_error @save_response_on_error
def add(self, username, password, shell='/dev/null'): def add(self, username, password, shell='/dev/null'):
self.rest.systemusers.create(username=username, password=password, shell=shell) self.rest.systemusers.create(username=username, password=password, shell=shell)
@save_response_on_error @save_response_on_error
def delete(self, username): def delete(self, username):
user = self.rest.systemusers.retrieve(username=username).get() user = self.rest.systemusers.retrieve(username=username).get()
user.delete() user.delete()
@save_response_on_error @save_response_on_error
def add_group(self, username, groupname): def add_group(self, username, groupname):
user = self.rest.systemusers.retrieve(username=username).get() user = self.rest.systemusers.retrieve(username=username).get()
user.groups.append({'username': groupname}) user.groups.append({'username': groupname})
user.save() user.save()
@save_response_on_error @save_response_on_error
def disable(self, username): def disable(self, username):
user = self.rest.systemusers.retrieve(username=username).get() user = self.rest.systemusers.retrieve(username=username).get()
user.is_active = False user.is_active = False
user.save() user.save()
@save_response_on_error @save_response_on_error
def save(self, username): def save(self, username):
user = self.rest.systemusers.retrieve(username=username).get() user = self.rest.systemusers.retrieve(username=username).get()
user.save() user.save()
@save_response_on_error @save_response_on_error
def change_password(self, username, password): def change_password(self, username, password):
user = self.rest.systemusers.retrieve(username=username).get() user = self.rest.systemusers.retrieve(username=username).get()
user.set_password(password) user.set_password(password)
def delete_account(self, username): def delete_account(self, username):
self.rest.account.delete() self.rest.account.delete()
@ -235,42 +235,42 @@ class AdminSystemUserMixin(SystemUserMixin):
# create main user # create main user
self.save(self.account.username) self.save(self.account.username)
self.addCleanup(self.delete_account, self.account.username) self.addCleanup(self.delete_account, self.account.username)
@snapshot_on_error @snapshot_on_error
def add(self, username, password, shell='/dev/null'): def add(self, username, password, shell='/dev/null'):
url = self.live_server_url + reverse('admin:systemusers_systemuser_add') url = self.live_server_url + reverse('admin:systemusers_systemuser_add')
self.selenium.get(url) self.selenium.get(url)
username_field = self.selenium.find_element_by_id('id_username') username_field = self.selenium.find_element_by_id('id_username')
username_field.send_keys(username) username_field.send_keys(username)
password_field = self.selenium.find_element_by_id('id_password1') password_field = self.selenium.find_element_by_id('id_password1')
password_field.send_keys(password) password_field.send_keys(password)
password_field = self.selenium.find_element_by_id('id_password2') password_field = self.selenium.find_element_by_id('id_password2')
password_field.send_keys(password) password_field.send_keys(password)
shell_input = self.selenium.find_element_by_id('id_shell') shell_input = self.selenium.find_element_by_id('id_shell')
shell_select = Select(shell_input) shell_select = Select(shell_input)
shell_select.select_by_value(shell) shell_select.select_by_value(shell)
username_field.submit() username_field.submit()
self.assertNotEqual(url, self.selenium.current_url) self.assertNotEqual(url, self.selenium.current_url)
@snapshot_on_error @snapshot_on_error
def delete(self, username): def delete(self, username):
user = SystemUser.objects.get(username=username) user = SystemUser.objects.get(username=username)
self.admin_delete(user) self.admin_delete(user)
@snapshot_on_error @snapshot_on_error
def delete_account(self, username): def delete_account(self, username):
account = Account.objects.get(username=username) account = Account.objects.get(username=username)
self.admin_delete(account) self.admin_delete(account)
@snapshot_on_error @snapshot_on_error
def disable(self, username): def disable(self, username):
user = SystemUser.objects.get(username=username) user = SystemUser.objects.get(username=username)
self.admin_disable(user) self.admin_disable(user)
@snapshot_on_error @snapshot_on_error
def add_group(self, username, groupname): def add_group(self, username, groupname):
user = SystemUser.objects.get(username=username) user = SystemUser.objects.get(username=username)
@ -282,7 +282,7 @@ class AdminSystemUserMixin(SystemUserMixin):
save = self.selenium.find_element_by_name('_save') save = self.selenium.find_element_by_name('_save')
save.submit() save.submit()
self.assertNotEqual(url, self.selenium.current_url) self.assertNotEqual(url, self.selenium.current_url)
@snapshot_on_error @snapshot_on_error
def save(self, username): def save(self, username):
user = SystemUser.objects.get(username=username) user = SystemUser.objects.get(username=username)
@ -291,7 +291,7 @@ class AdminSystemUserMixin(SystemUserMixin):
save = self.selenium.find_element_by_name('_save') save = self.selenium.find_element_by_name('_save')
save.submit() save.submit()
self.assertNotEqual(url, self.selenium.current_url) self.assertNotEqual(url, self.selenium.current_url)
@snapshot_on_error @snapshot_on_error
def change_password(self, username, password): def change_password(self, username, password):
user = SystemUser.objects.get(username=username) user = SystemUser.objects.get(username=username)
@ -307,37 +307,37 @@ class AdminSystemUserTest(AdminSystemUserMixin, BaseLiveServerTestCase):
def test_create_account(self): def test_create_account(self):
url = self.live_server_url + reverse('admin:accounts_account_add') url = self.live_server_url + reverse('admin:accounts_account_add')
self.selenium.get(url) self.selenium.get(url)
account_username = '%s_account' % random_ascii(10) account_username = '%s_account' % random_ascii(10)
username = self.selenium.find_element_by_id('id_username') username = self.selenium.find_element_by_id('id_username')
username.send_keys(account_username) username.send_keys(account_username)
account_password = '@!?%spppP001' % random_ascii(5) account_password = '@!?%spppP001' % random_ascii(5)
password = self.selenium.find_element_by_id('id_password1') password = self.selenium.find_element_by_id('id_password1')
password.send_keys(account_password) password.send_keys(account_password)
password = self.selenium.find_element_by_id('id_password2') password = self.selenium.find_element_by_id('id_password2')
password.send_keys(account_password) password.send_keys(account_password)
full_name = random_ascii(10) full_name = random_ascii(10)
full_name_field = self.selenium.find_element_by_id('id_full_name') full_name_field = self.selenium.find_element_by_id('id_full_name')
full_name_field.send_keys(full_name) full_name_field.send_keys(full_name)
account_email = 'orchestra@orchestra.lan' account_email = 'orchestra@orchestra.lan'
email = self.selenium.find_element_by_id('id_email') email = self.selenium.find_element_by_id('id_email')
email.send_keys(account_email) email.send_keys(account_email)
contact_short_name = random_ascii(10) contact_short_name = random_ascii(10)
short_name = self.selenium.find_element_by_id('id_contacts-0-short_name') short_name = self.selenium.find_element_by_id('id_contacts-0-short_name')
short_name.send_keys(contact_short_name) short_name.send_keys(contact_short_name)
email = self.selenium.find_element_by_id('id_contacts-0-email') email = self.selenium.find_element_by_id('id_contacts-0-email')
email.send_keys(account_email) email.send_keys(account_email)
email.submit() email.submit()
self.assertNotEqual(url, self.selenium.current_url) self.assertNotEqual(url, self.selenium.current_url)
self.addCleanup(self.delete_account, account_username) self.addCleanup(self.delete_account, account_username)
self.assertEqual(0, sshr(self.MASTER_SERVER, "id %s" % account_username).exit_code) self.assertEqual(0, sshr(self.MASTER_SERVER, "id %s" % account_username).exit_code)
@snapshot_on_error @snapshot_on_error
def test_delete_account(self): def test_delete_account(self):
home = self.account.main_systemuser.get_home() home = self.account.main_systemuser.get_home()
@ -347,7 +347,7 @@ class AdminSystemUserTest(AdminSystemUserMixin, BaseLiveServerTestCase):
self.account = self.create_account(username=self.account.username, superuser=True) self.account = self.create_account(username=self.account.username, superuser=True)
self.selenium.delete_all_cookies() self.selenium.delete_all_cookies()
self.admin_login() self.admin_login()
@snapshot_on_error @snapshot_on_error
def test_disable_account(self): def test_disable_account(self):
username = '%s_systemuser' % random_ascii(10) username = '%s_systemuser' % random_ascii(10)
@ -357,18 +357,18 @@ class AdminSystemUserTest(AdminSystemUserMixin, BaseLiveServerTestCase):
self.validate_ftp(username, password) self.validate_ftp(username, password)
self.disable(username) self.disable(username)
self.validate_user(username) self.validate_user(username)
disable = reverse('admin:accounts_account_disable', args=(self.account.pk,)) disable = reverse('admin:accounts_account_disable', args=(self.account.pk,))
url = self.live_server_url + disable url = self.live_server_url + disable
self.selenium.get(url) self.selenium.get(url)
confirmation = self.selenium.find_element_by_name('post') confirmation = self.selenium.find_element_by_name('post')
confirmation.submit() confirmation.submit()
self.assertNotEqual(url, self.selenium.current_url) self.assertNotEqual(url, self.selenium.current_url)
self.assertRaises(ftplib.error_perm, self.validate_ftp, username, password) self.assertRaises(ftplib.error_perm, self.validate_ftp, username, password)
self.selenium.get(url) self.selenium.get(url)
self.assertNotEqual(url, self.selenium.current_url) self.assertNotEqual(url, self.selenium.current_url)
# Reenable for test cleanup # Reenable for test cleanup
self.account.is_active = True self.account.is_active = True
self.account.save() self.account.save()

View file

@ -5,7 +5,7 @@ from io import StringIO
from django.conf import settings as djsettings from django.conf import settings as djsettings
from orchestra.contrib.orchestration.models import Server, Route from orchestra.contrib.orchestration.models import Server, Route
from orchestra.contrib.systemusers.backends import SystemUserBackend from orchestra.contrib.systemusers.backends import UNIXUserController
from orchestra.utils.tests import BaseLiveServerTestCase, random_ascii, snapshot_on_error, save_response_on_error from orchestra.utils.tests import BaseLiveServerTestCase, random_ascii, snapshot_on_error, save_response_on_error
from ... import backends from ... import backends
@ -18,20 +18,20 @@ class WebAppMixin(object):
'orchestra.contrib.systemusers', 'orchestra.contrib.systemusers',
'orchestra.contrib.webapps', 'orchestra.contrib.webapps',
) )
def setUp(self): def setUp(self):
super(WebAppMixin, self).setUp() super(WebAppMixin, self).setUp()
self.add_route() self.add_route()
djsettings.DEBUG = True djsettings.DEBUG = True
def add_route(self): def add_route(self):
server, __ = Server.objects.get_or_create(name=self.MASTER_SERVER) server, __ = Server.objects.get_or_create(name=self.MASTER_SERVER)
backend = SystemUserBackend.get_name() backend = UNIXUserController.get_name()
Route.objects.get_or_create(backend=backend, match=True, host=server) Route.objects.get_or_create(backend=backend, match=True, host=server)
backend = self.backend.get_name() backend = self.backend.get_name()
match = 'webapp.type == "%s"' % self.type_value match = 'webapp.type == "%s"' % self.type_value
Route.objects.create(backend=backend, match=match, host=server) Route.objects.create(backend=backend, match=match, host=server)
def upload_webapp(self, name): def upload_webapp(self, name):
try: try:
ftp = ftplib.FTP(self.MASTER_SERVER) ftp = ftplib.FTP(self.MASTER_SERVER)
@ -44,7 +44,7 @@ class WebAppMixin(object):
index.close() index.close()
finally: finally:
ftp.close() ftp.close()
def test_add(self): def test_add(self):
name = '%s_%s_webapp' % (random_ascii(10), self.type_value) name = '%s_%s_webapp' % (random_ascii(10), self.type_value)
self.add_webapp(name) self.add_webapp(name)
@ -85,16 +85,16 @@ class RESTWebAppMixin(object):
self.rest_login() self.rest_login()
# create main user # create main user
self.save_systemuser() self.save_systemuser()
@save_response_on_error @save_response_on_error
def save_systemuser(self): def save_systemuser(self):
systemuser = self.rest.systemusers.retrieve().get() systemuser = self.rest.systemusers.retrieve().get()
systemuser.update(is_active=True) systemuser.update(is_active=True)
@save_response_on_error @save_response_on_error
def add_webapp(self, name, options=[]): def add_webapp(self, name, options=[]):
self.rest.webapps.create(name=name, type=self.type_value, options=options) self.rest.webapps.create(name=name, type=self.type_value, options=options)
@save_response_on_error @save_response_on_error
def delete_webapp(self, name): def delete_webapp(self, name):
self.rest.webapps.retrieve(name=name).delete() self.rest.webapps.retrieve(name=name).delete()
@ -106,11 +106,11 @@ class AdminWebAppMixin(WebAppMixin):
self.admin_login() self.admin_login()
# create main user # create main user
self.save_systemuser() self.save_systemuser()
@snapshot_on_error @snapshot_on_error
def save_systemuser(self): def save_systemuser(self):
url = '' url = ''
@snapshot_on_error @snapshot_on_error
def add(self, name, password, admin_email): def add(self, name, password, admin_email):
pass pass