461 lines
19 KiB
Python
461 lines
19 KiB
Python
|
#!/usr/bin/python3
|
||
|
# -*- coding: utf-8 -*-
|
||
|
#
|
||
|
# create certificate (use for test)
|
||
|
# Copyright (C) 2017 AleaJactaEst
|
||
|
#
|
||
|
# This program is free software: you can redistribute it and/or modify
|
||
|
# it under the terms of the GNU Affero General Public License as published by
|
||
|
# the Free Software Foundation, either version 3 of the License, or
|
||
|
# (at your option) any later version.
|
||
|
#
|
||
|
# This program is distributed in the hope that it will be useful,
|
||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
# GNU Affero General Public License for more details.
|
||
|
#
|
||
|
# You should have received a copy of the GNU Affero General Public License
|
||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||
|
|
||
|
import unittest
|
||
|
import tempfile
|
||
|
import os
|
||
|
import configparser
|
||
|
import multiprocessing
|
||
|
import time
|
||
|
import re
|
||
|
import queue
|
||
|
|
||
|
try:
|
||
|
import pymanager.manager as Manager
|
||
|
except ImportError:
|
||
|
import sys
|
||
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
import pymanager.manager as Manager
|
||
|
#try:
|
||
|
# import pymanager.certificate as cert
|
||
|
#except ImportError:
|
||
|
# import sys
|
||
|
# import os
|
||
|
# sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
# import pymanager.certificate as cert
|
||
|
|
||
|
class TestManager(unittest.TestCase):
    """Tests for the ``Manager`` and ``ManageCommand`` classes of
    ``pymanager.manager``.

    The ``ManageCommand`` tests launch ``simulate_program.py`` (expected to
    live next to this file) and poll its captured log, so several of them
    take multiple seconds to run.
    """

    def setUp(self):
        """Define certificate parameters and the paths of the helper programs."""
        self.openssl = '/usr/bin/openssl'
        self.size_root = 4096
        self.size_appli = 4096
        self.size_child = 2048
        self.passroot = 'BadPasswordRoot'
        self.passappli = 'BadPasswordApplication'
        self.country_name = 'FR'
        self.state_or_province_name = 'France'
        self.locality_name = 'Paris'
        self.organization_name = 'khanat'
        self.common_name = 'khanat'
        # Directory holding this test file; helper programs are looked up here.
        self.path = os.path.dirname(os.path.abspath(__file__))
        self.program = os.path.join(self.path, 'simulate_program.py')
        # A configuration file used as a "program" that cannot be executed.
        self.badprogram = os.path.join(self.path, 'test.cfg')

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _new_manage_command(self, name, command, **options):
        """Build a ``ManageCommand`` with the settings shared by all tests.

        :param name: name given to the managed command.
        :param command: command line to run.
        :param options: extra keyword arguments forwarded unchanged to
            ``Manager.ManageCommand`` (e.g. ``maxWaitEnd``, ``waitDelay``).
        :return: the new ``ManageCommand`` instance.
        """
        logsize = 10
        bufsize = 10
        queueIn = multiprocessing.Queue()
        queueOut = multiprocessing.Queue()
        event = multiprocessing.Event()
        return Manager.ManageCommand(name,
                                     command,
                                     self.path,
                                     logsize,
                                     bufsize,
                                     queueIn,
                                     queueOut,
                                     event,
                                     **options)

    def _wait_for_log(self, manage_command, pattern, attempts=10, delay=1):
        """Poll ``getlog(0)`` until *pattern* matches or attempts run out.

        :param manage_command: object exposing ``getlog``.
        :param pattern: compiled regular expression applied with ``match``.
        :param attempts: maximum number of polls.
        :param delay: seconds slept before each poll.
        :return: ``True`` when the pattern matched, ``False`` otherwise.
        """
        for _ in range(attempts):
            time.sleep(delay)
            if pattern.match(manage_command.getlog(0)):
                return True
        return False

    def _assert_config_loads(self, config):
        """Fail the test if ``Manager._load_config`` raises on *config*."""
        try:
            manager = Manager.Manager(False)
            manager._load_config(config)
        except Exception:
            # The original exception stays visible as the chained context.
            self.fail('Error detected on load config')

    # ------------------------------------------------------------------
    # Manager tests
    # ------------------------------------------------------------------
    def testMain(self):
        """Placeholder: only allocates the temporary resources.

        The real scenario (certificate generation followed by a manager
        start) is disabled and kept below for reference.
        """
        import shutil  # local import: only needed for the cleanup below

        logfile = tempfile.NamedTemporaryFile(suffix=".log")
        self.addCleanup(logfile.close)
        workdir_cert_root = tempfile.mkdtemp(prefix='pymanager-certificate-root-')
        self.addCleanup(shutil.rmtree, workdir_cert_root, True)
        workdir_cert_appli = tempfile.mkdtemp(prefix='pymanager-certificate-application-')
        self.addCleanup(shutil.rmtree, workdir_cert_appli, True)

        # cert.root(logfile,
        #           'DEBUG',
        #           False,
        #           workdir_cert_root,
        #           workdir_cert_appli,
        #           self.openssl,
        #           self.size_root,
        #           self.size_appli,
        #           self.size_child,
        #           self.passroot,
        #           self.passappli,
        #           self.country_name,
        #           self.state_or_province_name,
        #           self.locality_name,
        #           self.organization_name,
        #           self.common_name)
        #
        # conf = tempfile.NamedTemporaryFile(suffix="test.cfg")
        # conf.write(bytes(
        #     '[config]\n'
        #     'port = 8000\n'
        #     'keyfile = %s/private/serverkey.pem\n'
        #     'certfile = %s/private/servercert.pem\n'
        #     'ca_cert = %s/certs/cachaincert.pem\n'
        #     'address =\n' %(workdir_cert_appli, workdir_cert_appli, workdir_cert_appli ), 'UTF-8'))
        # args=['--conf', conf]
        # #manager.main(args)
        self.assertTrue(True)

    def test_load_config(self):
        """A configuration with server, client, command and user sections loads."""
        config = configparser.ConfigParser()
        config.add_section('config:server')
        config.set('config:server', 'port', '8000')
        config.set('config:server', 'keyfile', '/home/gameserver/ca/appli/private/serverkey.pem')
        config.set('config:server', 'certfile', '/home/gameserver/ca/appli/certs/servercert.pem')
        config.set('config:server', 'ca_cert', '/home/gameserver/ca/appli/certs/cachaincert.pem')
        config.set('config:server', 'address', '')
        config.set('config:server', 'authentification', 'yes')
        config.add_section('config:client')
        config.set('config:client', 'port', '8000')
        config.set('config:client', 'keyfile', '/home/gameserver/ca/appli/private/clientkey.pem')
        config.set('config:client', 'certfile', '/home/gameserver/ca/appli/certs/clientcert.pem')
        config.set('config:client', 'ca_cert', '/home/gameserver/ca/appli/certs/cachaincert.pem')
        config.set('config:client', 'address', '127.0.0.1')
        config.add_section('command:test')
        config.set('command:test', 'path', '/home/gameserver')
        config.set('command:test', 'command', '/bin/sleep 10')
        config.set('command:test', 'logsize', '10')
        config.set('command:test', 'bufsize', '10')
        config.add_section('config:user')
        config.set('config:user', 'usename', 'filter_all, filter_admin')
        self._assert_config_loads(config)

    def test_load_config2(self):
        """A minimal configuration with authentication disabled loads."""
        config = configparser.ConfigParser()
        config.add_section('config:server')
        config.set('config:server', 'authentification', 'no')
        config.add_section('command:test')
        config.set('command:test', 'path', '/home/gameserver')
        config.set('command:test', 'command', '/bin/sleep 10')
        self._assert_config_loads(config)

    def test_load_config_bad_param_logsize(self):
        """A non-numeric ``logsize`` makes ``_load_config`` raise ValueError."""
        config = configparser.ConfigParser()
        config.add_section('command:test')
        config.set('command:test', 'command', '/bin/sleep 10')
        config.set('command:test', 'logsize', 'bidon')
        # Built outside the assertRaises block so that only _load_config
        # can satisfy the expectation.
        manager = Manager.Manager(False)
        with self.assertRaises(ValueError):
            manager._load_config(config)

    def test_load_config_bad_param_bufsize(self):
        """A non-numeric ``bufsize`` makes ``_load_config`` raise ValueError."""
        config = configparser.ConfigParser()
        config.add_section('command:test')
        config.set('command:test', 'command', '/bin/sleep 10')
        config.set('command:test', 'bufsize', 'bidon')
        manager = Manager.Manager(False)
        with self.assertRaises(ValueError):
            manager._load_config(config)

    def test_load_config_empty(self):
        """Empty ``config:server`` and ``config:user`` sections are accepted."""
        config = configparser.ConfigParser()
        config.add_section('config:server')
        config.add_section('config:client')
        config.set('config:client', 'port', '8000')
        config.set('config:client', 'keyfile', '/home/gameserver/ca/appli/private/clientkey.pem')
        config.set('config:client', 'certfile', '/home/gameserver/ca/appli/certs/clientcert.pem')
        config.set('config:client', 'ca_cert', '/home/gameserver/ca/appli/certs/cachaincert.pem')
        config.set('config:client', 'address', '127.0.0.1')
        config.add_section('config:user')
        config.add_section('command:test')
        config.set('command:test', 'command', '/bin/sleep 10')
        self._assert_config_loads(config)

    def test_load_config_file(self):
        """``load_config`` accepts an open configuration file object."""
        cfgfile = tempfile.NamedTemporaryFile(suffix="config.cfg", mode='w+t')
        self.addCleanup(cfgfile.close)
        cfgfile.write('#\n[config:server]\nauthentification = No\n')
        cfgfile.flush()
        # NOTE(review): the file position is left at end-of-file here; confirm
        # that load_config rewinds or reopens the file before reading.
        try:
            manager = Manager.Manager(False)
            manager.load_config(cfgfile)
        except Exception:
            self.fail('Error detected on load configuration')

    def test_load_config_file_none(self):
        """``load_config(None)`` raises ValueError."""
        manager = Manager.Manager(False)
        with self.assertRaises(ValueError):
            manager.load_config(None)

    def test_load_password_file(self):
        """A password file with a hashed entry, a blank line and an entry
        carrying the literal hash ``badhash`` loads without raising."""
        pwdfile = tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t')
        self.addCleanup(pwdfile.close)

        config = configparser.ConfigParser()
        config.add_section('config:server')
        config.set('config:server', 'authentification', 'yes')
        config.set('config:server', 'passwordfile', pwdfile.name)
        pwdfile.write('username:$2a$12$2C97xW0KC/vFp3YyjlOgU.fWXJ3EiGT2Ihb0SWN9Mw0XI4WngiUqS\n\nusername2:badhash\n')
        pwdfile.flush()
        try:
            manager = Manager.Manager(False)
            manager._load_config(config)
            manager.load_password()
        except Exception:
            self.fail('Error detected on load password')

    # ------------------------------------------------------------------
    # ManageCommand tests
    # ------------------------------------------------------------------
    def test_constructor_manager_command(self):
        """A ``ManageCommand`` can be built and its threads listed."""
        try:
            manageCommand = self._new_manage_command('test_constructor_manager_command',
                                                     self.program)
            manageCommand.list_thread()
        except Exception:
            self.fail('Error initialize object ManageCommand')

    def test_execute_manager_command(self):
        """Start the helper program, feed its stdin, then stop it.

        Checks that ``action`` returns ``"ok"`` while the program runs and
        ``"ko"`` once it has been stopped.
        """
        foundEnd = re.compile('.*(Started).*')
        foundY = re.compile('.*(sendToStdinY).*')
        try:
            manageCommand = self._new_manage_command('test_execute_manager_command',
                                                     self.program)
            manageCommand.status()
            manageCommand.start()
            try:
                manageCommand.status()
                # Second start while the command was already started
                # (presumably checks that a double start is tolerated).
                manageCommand.start()
                if not self._wait_for_log(manageCommand, foundEnd):
                    self.fail('Missing message in log')
                manageCommand.list_thread()
                retA = manageCommand.action("sendToStdinA")
                self.assertEqual(retA, "ok", 'Error impossible to send to stdin')
                for i in range(0, 120):
                    retX = manageCommand.action("sendToStdin%d" % i)
                    self.assertEqual(retX, "ok", 'Error impossible to send to stdin')
                retY = manageCommand.action("sendToStdinY")
                self.assertEqual(retY, "ok", 'Error impossible to send to stdin')
                if not self._wait_for_log(manageCommand, foundY):
                    self.fail('Missing message in log')

                manageCommand.stop()
                manageCommand.status()
                retZ = manageCommand.action("sendToStdinZ")
            finally:
                # Always stop the command, even when an assertion failed.
                manageCommand.stop()
            self.assertEqual(retZ, "ko", 'Error send to stdin when process is down')
        except self.failureException:
            # Keep the precise assertion message instead of masking it.
            raise
        except Exception:
            self.fail('Error initialize object ManageCommand')

    def test_execute_crash_manager_command(self):
        """Run the helper with ``--no-loop --timeout 1`` and query it after
        three seconds; no call may raise."""
        try:
            manageCommand = self._new_manage_command('test_execute_crash_manager_command',
                                                     self.program + ' --no-loop --timeout 1')
            manageCommand.start()
            try:
                time.sleep(3)
                manageCommand.status()
                manageCommand.list_thread()
            finally:
                manageCommand.stop()
            manageCommand.status()
        except Exception:
            self.fail('Error initialize object ManageCommand')

    def test_execute_not_kill_manager_command(self):
        """Stop a helper started with ``--disable-kill`` using
        ``maxWaitEnd=2``; no call may raise."""
        try:
            manageCommand = self._new_manage_command('test_execute_not_kill_manager_command',
                                                     self.program + " --disable-kill",
                                                     maxWaitEnd=2)
            manageCommand.start()
            try:
                time.sleep(1)
                manageCommand.status()
                manageCommand.list_thread()
            finally:
                manageCommand.stop()
            manageCommand.status()
        except Exception:
            self.fail('Error initialize object ManageCommand')

    def test_execute_command_crashed(self):
        """Start the short-lived helper twice (``waitDelay=10``) and stop it;
        no call may raise."""
        try:
            manageCommand = self._new_manage_command('test_execute_command_crashed',
                                                     self.program + " --no-loop --timeout=1",
                                                     waitDelay=10)
            manageCommand.start()
            try:
                time.sleep(5)
                manageCommand.start()
                time.sleep(5)
            finally:
                manageCommand.stop()
        except Exception:
            self.fail('Error initialize object ManageCommand')

    def test_execute_command_file_not_found(self):
        """Starting a non-existent program returns ``"crashed"``."""
        try:
            manageCommand = self._new_manage_command('test_execute_command_file_not_found',
                                                     self.program + "_not_exist")
            ret = manageCommand.start()
            manageCommand.stop()
        except Exception:
            self.fail('Error initialize object ManageCommand')
        # Asserted outside the try block so the real message is reported.
        self.assertEqual(ret, "crashed", 'Error object not generate error when program not exist')

    def test_execute_command_permission(self):
        """Starting ``test.cfg`` (presumably not executable) returns
        ``"crashed"``."""
        try:
            manageCommand = self._new_manage_command('test_execute_command_permission',
                                                     self.badprogram)
            ret = manageCommand.start()
            manageCommand.stop()
        except Exception:
            self.fail('Error initialize object ManageCommand')
        self.assertEqual(ret, "crashed", 'Error object not generate error when bad permission')

    def _runCommand(self, name, command, path, logsize, bufsize, queueIn, queueOut, event):
        """
        Thread to manage khaganat program

        Builds a ``ManageCommand`` from the given arguments and calls its
        ``run()`` method. Only referenced by the disabled
        ``test_run_manager_command`` scenario.
        """
        manageCommand = Manager.ManageCommand(name=name,
                                              command=command,
                                              path=path,
                                              logsize=logsize,
                                              bufsize=bufsize,
                                              queueIn=queueIn,
                                              queueOut=queueOut,
                                              event=event)
        manageCommand.run()
|
||
|
|
||
|
# def test_run_manager_command(self):
|
||
|
# # Doesn't work (we need enable --concurrency=multiprocessing on coverage command but we need coverage 4.0)
|
||
|
# logsize = 10
|
||
|
# bufsize = 10
|
||
|
# queueIn = multiprocessing.Queue()
|
||
|
# queueOut = multiprocessing.Queue()
|
||
|
# event = multiprocessing.Event()
|
||
|
# threadCommand = multiprocessing.Process(target=self._runCommand,
|
||
|
# args=('test_run_manager_command',
|
||
|
# self.program,
|
||
|
# self.path,
|
||
|
# logsize,
|
||
|
# bufsize,
|
||
|
# queueIn,
|
||
|
# queueOut,
|
||
|
# event))
|
||
|
# threadCommand.start()
|
||
|
#
|
||
|
# event.set()
|
||
|
# queueIn.put("START")
|
||
|
# item = queueOut.get(timeout=4)
|
||
|
# self.assertEqual(item, "started", 'Error impossible to start program')
|
||
|
# time.sleep(1)
|
||
|
# event.set()
|
||
|
# queueIn.put("STATUS")
|
||
|
# item = queueOut.get(timeout=4)
|
||
|
# self.assertEqual(item, "started", 'Error impossible to start program')
|
||
|
# time.sleep(1)
|
||
|
# print("-" * 80, "shutdown" )
|
||
|
# event.set()
|
||
|
# queueIn.put("SHUTDOWN")
|
||
|
# with self.assertRaises(queue.Empty):
|
||
|
# item = queueOut.get(timeout=4)
|
||
|
# print("-" * 80, "wait thread" )
|
||
|
# threadCommand.join()
|
||
|
#
|
||
|
# self.assertTrue(True)
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
    # Executed as a script: hand control to the unittest command-line runner.
    unittest.main()
|