opennel-pymanager/tests/test_manager.py

1955 lines
83 KiB
Python

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# create certificate (use for test)
# Copyright (C) 2017 AleaJactaEst
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import unittest
import tempfile
import os
import configparser
import multiprocessing
import time
import re
import queue
import signal
import http.server
import logging
from unittest.mock import patch
from unittest.mock import MagicMock
import traceback
try:
import pymanager.manager as Manager
except ImportError:
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pymanager.manager as Manager
#try:
# import pymanager.certificate as cert
#except ImportError:
# import sys
# import os
# sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# import pymanager.certificate as cert
class TimeoutError(Exception):
    """Signal that a SIGALRM watchdog expired (raised by ``handlerRaise``).

    NOTE: inside this module the name shadows Python's builtin
    ``TimeoutError``; this class derives directly from ``Exception``.
    """
def handlerCrash(signum, frame):
    """Signal handler: print the current call stack, then kill the process.

    Args:
        signum: signal number handed over by the ``signal`` module (unused).
        frame: interrupted stack frame (unused; some callers in this file
            pass a placeholder value).

    The process is terminated with exit status 2 through ``os._exit``, which
    ends every thread without running normal interpreter cleanup.
    """
    # Local import: the module-level ``import sys`` only runs on the
    # ImportError fallback path, so ``sys`` is not always a global here.
    import sys

    print("handlerCrash - TimeOut !")
    for line in traceback.format_stack():
        print(line.strip())
    # os._exit() does not flush stdio buffers; without an explicit flush the
    # diagnostics above can be lost when stdout is a pipe or a file.
    sys.stdout.flush()
    # Force Exit with all thread !
    os._exit(2)
def handlerRaise(signum, frame):
    """Signal handler: print the current call stack, then raise ``TimeoutError``.

    Both parameters are the standard signal-handler arguments and are not
    used. Unlike ``handlerCrash`` this handler does not exit the process; it
    raises so the interrupted code can catch the timeout.
    """
    print("handlerRaise - TimeOut !")
    # One stripped stack entry per printed line, same output as a print loop.
    stack_dump = "\n".join(entry.strip() for entry in traceback.format_stack())
    print(stack_dump)
    raise TimeoutError()
class TestManager(unittest.TestCase):
def setUp(self):
    """Prepare shared fixture values and install the crash-on-timeout handler."""
    here = os.path.dirname(os.path.abspath(__file__))

    # Locations of the helper files living next to this test module.
    self.path = here
    self.program = os.path.join(here, 'simulate_program.py')
    self.badprogram = os.path.join(here, 'test.cfg')

    # Certificate-related fixture values.
    self.openssl = '/usr/bin/openssl'
    self.size_root = 4096
    self.size_appli = 4096
    self.size_child = 2048
    self.passroot = 'BadPasswordRoot'
    self.passappli = 'BadPasswordApplication'
    self.country_name = 'FR'
    self.state_or_province_name = 'France'
    self.locality_name = 'Paris'
    self.organization_name = 'khanat'
    self.common_name = 'khanat'

    # Default watchdog: an expired alarm terminates the whole test process.
    signal.signal(signal.SIGALRM, handlerCrash)
def test_load_config(self):
    """Load a fully populated configuration and expect no exception.

    The configuration holds server, client, one command and user sections.
    A 10 second SIGALRM watchdog guards the test and is always cancelled,
    including when the test fails.
    """
    signal.alarm(10)
    try:
        config = configparser.ConfigParser()
        config.read_dict({
            'config:server': {
                'port': '8000',
                'keyfile': '/home/gameserver/ca/appli/private/serverkey.pem',
                'certfile': '/home/gameserver/ca/appli/certs/servercert.pem',
                'ca_cert': '/home/gameserver/ca/appli/certs/cachaincert.pem',
                'address': '',
                'authentification': 'yes',
                'method': 'https',
            },
            'config:client': {
                'port': '8000',
                'keyfile': '/home/gameserver/ca/appli/private/clientkey.pem',
                'certfile': '/home/gameserver/ca/appli/certs/clientcert.pem',
                'ca_cert': '/home/gameserver/ca/appli/certs/cachaincert.pem',
                'address': '127.0.0.1',
                'method': 'https',
            },
            'command:test': {
                'path': '/home/gameserver',
                'command': '/bin/sleep 10',
                'logsize': '10',
                'bufsize': '10',
                'keep_filter': 'yes',
                'size_max_filter': '1000',
                # Raw strings: \d and \s are regex escapes, not string escapes.
                'add_filter': r'"^(.*)(setActiveCharForPlayer).*(: set active char )[\d]+( for )(?P<ActivePlayer>.*)"',
                'del_filter': r'"^(.*)(disconnectPlayer).+[\s]+(?P<ActivePlayer>.*)[\s]+(is disconnected)"',
            },
            'config:user': {
                'usename': 'filter_all, filter_admin',
            },
        })
        try:
            manager = Manager.Manager(False)
            manager._load_config(config)
        except Exception:
            # The original exception stays attached as context of the failure.
            self.fail('Error detected on load config')
    finally:
        signal.alarm(0)
def test_load_config2(self):
    """Load a minimal configuration (authentication off, one command).

    Expects ``_load_config`` to complete without raising. The SIGALRM
    watchdog is cancelled even when the test fails.
    """
    signal.alarm(10)
    try:
        config = configparser.ConfigParser()
        config.add_section('config:server')
        config.set('config:server', 'authentification', 'no')
        config.add_section('command:test')
        config.set('command:test', 'path', '/home/gameserver')
        config.set('command:test', 'command', '/bin/sleep 10')
        try:
            manager = Manager.Manager(False)
            manager._load_config(config)
        except Exception:
            self.fail('Error detected on load config')
    finally:
        signal.alarm(0)
def test_load_config_bad_param_logsize(self):
    """A non-numeric ``logsize`` must make ``_load_config`` raise ValueError."""
    signal.alarm(10)
    try:
        config = configparser.ConfigParser()
        config.add_section('command:test')
        config.set('command:test', 'command', '/bin/sleep 10')
        config.set('command:test', 'logsize', 'bidon')
        # Build the manager outside assertRaises so that only the call under
        # test can satisfy the expectation.
        manager = Manager.Manager(False)
        with self.assertRaises(ValueError):
            manager._load_config(config)
    finally:
        signal.alarm(0)
def test_load_config_bad_param_bufsize(self):
    """A non-numeric ``bufsize`` must make ``_load_config`` raise ValueError."""
    signal.alarm(10)
    try:
        config = configparser.ConfigParser()
        config.add_section('command:test')
        config.set('command:test', 'command', '/bin/sleep 10')
        config.set('command:test', 'bufsize', 'bidon')
        # Build the manager outside assertRaises so that only the call under
        # test can satisfy the expectation.
        manager = Manager.Manager(False)
        with self.assertRaises(ValueError):
            manager._load_config(config)
    finally:
        signal.alarm(0)
def test_load_config_empty(self):
    """Load a configuration whose server and user sections are empty.

    Expects ``_load_config`` to complete without raising. The SIGALRM
    watchdog is cancelled even when the test fails.
    """
    signal.alarm(10)
    try:
        config = configparser.ConfigParser()
        config.read_dict({
            'config:server': {},
            'config:client': {
                'port': '8000',
                'keyfile': '/home/gameserver/ca/appli/private/clientkey.pem',
                'certfile': '/home/gameserver/ca/appli/certs/clientcert.pem',
                'ca_cert': '/home/gameserver/ca/appli/certs/cachaincert.pem',
                'address': '127.0.0.1',
                'method': 'https',
            },
            'config:user': {},
            'command:test': {
                'command': '/bin/sleep 10',
            },
        })
        try:
            manager = Manager.Manager(False)
            manager._load_config(config)
        except Exception:
            self.fail('Error detected on load config')
    finally:
        signal.alarm(0)
def test_load_config_file(self):
    """Load the configuration from an open temporary file object.

    The temporary file is closed (and thereby removed) when the test ends,
    and the SIGALRM watchdog is always cancelled.
    """
    signal.alarm(10)
    try:
        with tempfile.NamedTemporaryFile(suffix="config.cfg", mode='w+t') as cfgfile:
            cfgfile.write('#\n[config:server]\nauthentification = No\n')
            cfgfile.flush()
            try:
                manager = Manager.Manager(False)
                manager.load_config(cfgfile)
            except Exception:
                self.fail('Error detected on load configuration')
    finally:
        signal.alarm(0)
def test_load_config_file_none(self):
    """``load_config(None)`` must raise ValueError."""
    signal.alarm(10)
    try:
        # Build the manager outside assertRaises so that only the call under
        # test can satisfy the expectation.
        manager = Manager.Manager(False)
        with self.assertRaises(ValueError):
            manager.load_config(None)
    finally:
        signal.alarm(0)
def test_load_password_file(self):
    """Load a password file referenced by the server configuration.

    The file holds one entry with a ``$2a$`` hash, an empty line and one
    entry whose hash is the literal ``badhash``; loading must not raise.
    The temporary file is closed at the end of the test and the SIGALRM
    watchdog is always cancelled.
    """
    signal.alarm(10)
    try:
        with tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t') as pwdfile:
            config = configparser.ConfigParser()
            config.add_section('config:server')
            config.set('config:server', 'authentification', 'yes')
            config.set('config:server', 'passwordfile', pwdfile.name)
            pwdfile.write('username:$2a$12$2C97xW0KC/vFp3YyjlOgU.fWXJ3EiGT2Ihb0SWN9Mw0XI4WngiUqS\n\nusername2:badhash\n')
            pwdfile.flush()
            try:
                manager = Manager.Manager(False)
                manager._load_config(config)
                manager.load_password()
            except Exception:
                self.fail('Error detected on load password')
    finally:
        signal.alarm(0)
def test_constructor_manager_command(self):
    """Build a ``ManageCommand`` and call ``list_thread`` without error.

    The SIGALRM watchdog is cancelled even when the test fails.
    """
    signal.alarm(10)
    try:
        logsize = 10
        bufsize = 10
        queueIn = multiprocessing.Queue()
        queueOut = multiprocessing.Queue()
        semaphore = multiprocessing.Semaphore()
        manageCommand = Manager.ManageCommand('test_constructor_manager_command',
                                              self.program,
                                              self.path,
                                              logsize,
                                              bufsize,
                                              queueIn,
                                              queueOut,
                                              semaphore,
                                              False,
                                              1,
                                              "",
                                              "",
                                              False,
                                              False,
                                              1,
                                              False)
        manageCommand.list_thread()
    except Exception:
        self.fail('Error initialize object ManageCommand')
    finally:
        signal.alarm(0)
def test_managercommand_analyze_line(self):
    """Feed sample log lines to ``_analyze_line`` and check the filter tables.

    The command is created with the EGS filter flag set to True and a
    maximum filter size of 1000. After each group of lines the test checks
    the matching attribute (``filter_load_character``,
    ``filter_register_entity``, ``filter_active_character``,
    ``filter_admin``).
    """
    manage = Manager.ManageCommand('test_execute_manager_command',
                                   "",
                                   "",
                                   1000,
                                   1000,
                                   None,
                                   None,
                                   None,
                                   False,
                                   1000,
                                   "",
                                   "",
                                   False,
                                   False,
                                   1,
                                   True)
    manage._analyze_line("message 1")
    manage._analyze_line("alpha egs_plinfo EGS-132 : LOADED User '2' Character 'Kezxaa(Lirria)' from BS stream file 'characters/002/account_2_0_pdr.bin'")
    self.assertIn('2', manage.filter_load_character, "LOADED - Missing player 2")
    self.assertIn('0', manage.filter_load_character['2'], "LOADED - Missing charactere 0 for player 2")
    manage._analyze_line("alpha egs_plinfo EGS-132 : registerEntity EGS-134 : EIT: Register EId (0x0000000020:00:00:00) EntityName 'Nin(lirria)' UId 2 UserName 'tester'")
    self.assertIn('0', manage.filter_register_entity['2'], "registerEntity - Missing player 2 (1st char)")
    manage._analyze_line("alpha egs_plinfo EGS-132 : registerEntity EGS-134 : EIT: Register EId (0x0000000020:00:00:00) EntityName 'Nin(lirria)' UId 2 UserName 'tester'")
    self.assertIn('1', manage.filter_register_entity['2'], "registerEntity - Missing player 2 (2nd char)")
    manage._analyze_line("alpha egs_plinfo EGS-132 : registerEntity EGS-134 : EIT: Register EId (0x0000000020:00:00:00) EntityName 'Nin(lirria)' UId 3 UserName 'tester'")
    self.assertIn('0', manage.filter_register_entity['3'], "registerEntity - Missing player 3 (1st char)")
    #manage._analyze_line("alpha egs_plinfo EGS-132 : LOADED User '2' Character 'Puskle(Lirria)' from BS stream file 'characters/002/account_2_1_pdr.bin'")
    #if '1' not in manage.filter_load_character['2']:
    #    self.assertTrue(False, "LOADED - Missing charactere 1 for player 2")
    #manage._analyze_line("alpha egs_plinfo EGS-132 : LOADED User '3' Character 'Puskle(Lirria)' from BS stream file 'characters/003/account_3_4_pdr.bin'")
    #if '3' not in manage.filter_load_character:
    #    self.assertTrue(False, "LOADED - Missing player 2")
    manage._analyze_line("alpha egs_ecinfo EGS-132 : setActiveCharForPlayer EGS-132 : set active char 1 for player 2")
    self.assertIn('2', manage.filter_active_character, "setActiveCharForPlayer - Missing player 2")
    self.assertIn('NameDomain', manage.filter_active_character['2'], "setActiveCharForPlayer - Missing info player 2")
    manage._analyze_line("alpha egs_ecinfo EGS-132 : Mapping UID 2 => Sid (0x0000000021:00:00:83)")
    self.assertIn('SID', manage.filter_active_character['2'], "setActiveCharForPlayer - Missing SID player 2")
    manage._analyze_line("alpha egs_ecinfo EGS-132 : Client ready (entity (0x0000000021:00:00:83) (Row 90501) added to mirror)")
    manage._analyze_line("alpha finalizeClientReady EGS-132 : Updating IS_NEWBIE flag for character: (0x0000000021:00:00:83)")
    manage._analyze_line("alpha 1383 disconnectPlayer EGS-132 : (EGS) player 2 (Row 90501) removed")
    manage._analyze_line("alpha egs_plinfo EGS-132 : Player with userId = 2 removed")
    manage._analyze_line("cbClientAdmin EGS-133 : ADMIN: Player (0x0000000021:00:00:83) doesn't have privilege to execute the client admin command 'infos'")
    manage._analyze_line("cbClientAdmin EGS-133 : ADMIN: Player (0x0000000021:00:00:83) tried to execute a no valid client admin command 'INFO'")
    for i in range(0, 2000):
        manage._analyze_line("cbClientAdmin EGS-133 : ADMIN: Player (0x0000000021:00:00:83) tried to execute a no valid client admin command 'INFO' %d" % i)
    # The previous assertTrue(len(...), 1000) used 1000 as the failure
    # message and therefore only checked that the filter was non-empty.
    # NOTE(review): assumes 1000 (the size_max_filter argument above) is an
    # upper bound for filter_admin; confirm against ManageCommand.
    self.assertTrue(manage.filter_admin, "cbClientAdmin - admin filter is empty")
    self.assertLessEqual(len(manage.filter_admin), 1000)
    manage._analyze_line("alpha disconnectPlayer EGS-132 : player 2 is disconnected")
    self.assertNotIn('2', manage.filter_active_character, "disconnectPlayer - player 2 always live")
def test_manager_command_player(self):
    """Drive a short-lived command through STATUS, STDOUT and SHUTDOWN.

    A ``Manager`` created with ``True`` launches ``simulate_program.py`` with
    ``--message=1 --no-loop --timeout=1`` and the ``egs_filter`` option on.
    The test talks to the child through the ``queueIn`` / ``queueOut`` /
    ``semaphore`` entries of ``manage.info``. SIGALRM is routed to
    ``handlerRaise`` so a timeout surfaces as ``TimeoutError``; both error
    paths stop the children and then terminate the process with exit code 2.
    """
    signal.signal(signal.SIGALRM, handlerRaise)
    signal.alarm(30)

    class MockServerHttp:
        """No-op stand-in assigned to ``manage.serverHttp``."""

        def appendchild(self, name, queueIn, queueOut, semaphore):
            pass

        def terminate(self):
            pass

        def join(self):
            pass

    manage = Manager.Manager(True)
    try:
        config = configparser.ConfigParser()
        config.add_section('config:server')
        config.add_section('command:test')
        config.set('command:test', 'command', self.program + ' --message=1 --no-loop --timeout=1')
        config.set('command:test', 'restart_after_crash', 'no')
        config.set('command:test', 'restart_delay', '1000')
        config.set('command:test', 'egs_filter', 'yes')
        manage.serverHttp = MockServerHttp()
        manage._load_config(config)
        manage.launch_command()
        signal.alarm(10)
        # Only one command is configured: use the first entry of manage.info.
        key = list(manage.info.keys())[0]
        queueIn = manage.info[key]['queueIn']
        queueOut = manage.info[key]['queueOut']
        semaphore = manage.info[key]['semaphore']
        signal.alarm(30)
        # Poll STATUS until the answer differs from the string "started".
        # NOTE(review): other tests read the STATUS answer as item['state'];
        # if the answer is a dict this loop ends after one pass - confirm.
        item = "started"
        while item == "started":
            logging.debug("sleep")
            time.sleep(1)
            logging.debug("semaphore")
            semaphore.acquire()
            logging.debug("status")
            queueIn.put("STATUS")
            logging.debug("queue")
            item = queueOut.get()
            semaphore.release()
        logging.debug("Lecture STDOUT")
        time.sleep(1)
        signal.alarm(30)
        logging.debug("semaphore")
        semaphore.acquire()
        logging.debug("stdout")
        queueIn.put("STDOUT")
        logging.debug("Attend le retour STDOUT")
        item = queueOut.get()
        semaphore.release()
        logging.debug("Resultat STDOUT")
        time.sleep(1)
        signal.alarm(10)
        semaphore.acquire()
        queueIn.put("SHUTDOWN")
        semaphore.release()
        # No answer is expected on queueOut after SHUTDOWN.
        with self.assertRaises(queue.Empty):
            item = queueOut.get(timeout=4)
        #threadCommand.join()
        manage.receive_signal(15, 1)
        manage.wait_children_commands()
        #Disable timeout
        signal.alarm(0)
        self.assertTrue(True)
        signal.alarm(0)
    except TimeoutError:
        print("Prepare Crash - TimeOut !")
        #self.fail('Error initialize object ManageCommand')
        manage.receive_signal(15, 1)
        manage.wait_children_commands()
        print("Force Crash - TimeOut !")
        os._exit(2)
    except Exception as e:
        print("Prepare Crash - %s" % e)
        #self.fail('Error initialize object ManageCommand')
        manage.receive_signal(15, 1)
        manage.wait_children_commands()
        # handlerCrash prints the stack and exits the process itself.
        handlerCrash(15, 1)
        print("Force Crash - TimeOut !")
        os._exit(2)
    signal.signal(signal.SIGALRM, handlerRaise)
    signal.alarm(0)
def test_execute_manager_command(self):
    """Start the simulated program, send it stdin lines and stop it.

    Steps: query status, start twice, wait until the log matches
    ``Started``, send ``sendToStdinA``, 120 numbered lines and
    ``sendToStdinY`` (each must return ``"ok"``), wait until the log matches
    ``sendToStdinY``, stop, then check that sending after the stop returns
    ``"ko"``.

    Assertion failures are re-raised unchanged so their own message is
    reported; any other exception (including the watchdog's
    ``TimeoutError``) is reported with the generic failure message.
    """
    signal.signal(signal.SIGALRM, handlerRaise)
    signal.alarm(10)
    try:
        logsize = 10
        bufsize = 10
        queueIn = multiprocessing.Queue()
        queueOut = multiprocessing.Queue()
        semaphore = multiprocessing.Semaphore()
        manageCommand = Manager.ManageCommand('test_execute_manager_command',
                                              self.program,
                                              self.path,
                                              logsize,
                                              bufsize,
                                              queueIn,
                                              queueOut,
                                              semaphore,
                                              False,
                                              1,
                                              "",
                                              "",
                                              False,
                                              False,
                                              1,
                                              False)
        manageCommand.status()
        manageCommand.start()
        manageCommand.status()
        manageCommand.start()
        foundEnd = re.compile('.*(Started).*')
        foundY = re.compile('.*(sendToStdinY).*')
        # Wait up to 10 polls for the start-up message.
        loop = 10
        while loop > 0:
            time.sleep(1)
            out = manageCommand.getlog(0)
            if foundEnd.match(str(out)):
                break
            loop -= 1
        if not foundEnd.match(str(out)):
            manageCommand.stop()
            self.fail('Missing message in log')
        manageCommand.list_thread()
        retA = manageCommand.action("sendToStdinA")
        self.assertEqual(retA, "ok", 'Error impossible to send to stdin')
        for i in range(0, 120):
            retX = manageCommand.action("sendToStdin%d" % i)
            self.assertEqual(retX, "ok", 'Error impossible to send to stdin')
        retY = manageCommand.action("sendToStdinY")
        self.assertEqual(retY, "ok", 'Error impossible to send to stdin')
        # Wait up to 10 polls for the last line sent to show up in the log.
        loop = 10
        while loop > 0:
            time.sleep(1)
            out = manageCommand.getlog(0)
            if foundY.match(str(out)):
                break
            loop -= 1
        if not foundY.match(str(out)):
            manageCommand.stop()
            self.fail('Missing message in log')
        manageCommand.stop()
        manageCommand.status()
        retZ = manageCommand.action("sendToStdinZ")
        manageCommand.stop()
        self.assertEqual(retZ, "ko", 'Error send to stdin when process is down')
    except self.failureException:
        # Keep the specific assertion message instead of masking it.
        raise
    except Exception:
        self.fail('Error initialize object ManageCommand')
    finally:
        signal.signal(signal.SIGALRM, handlerRaise)
        signal.alarm(0)
def test_execute_crash_manager_command(self):
    """Run a program that ends by itself and expect status 2 afterwards.

    ``start()`` must return 0; the test then polls ``status()`` once per
    second until it is non-zero and expects the value 2.

    Assertion failures are re-raised unchanged; any other exception is
    reported with the generic failure message.
    """
    signal.signal(signal.SIGALRM, handlerRaise)
    signal.alarm(10)
    try:
        logsize = 10
        bufsize = 10
        queueIn = multiprocessing.Queue()
        queueOut = multiprocessing.Queue()
        semaphore = multiprocessing.Semaphore()
        manageCommand = Manager.ManageCommand('test_execute_crash_manager_command',
                                              self.program + ' --no-loop --timeout 1',
                                              self.path,
                                              logsize,
                                              bufsize,
                                              queueIn,
                                              queueOut,
                                              semaphore,
                                              False,
                                              1,
                                              "",
                                              "",
                                              False,
                                              False,
                                              1,
                                              False)
        res = manageCommand.start()
        self.assertEqual(res, 0)
        while True:
            time.sleep(1)
            res = manageCommand.status()
            if res != 0:
                break
        self.assertEqual(res, 2)
        manageCommand.list_thread()
        manageCommand.stop()
        manageCommand.status()
    except self.failureException:
        # Keep the specific assertion message instead of masking it.
        raise
    except Exception:
        self.fail('Error initialize object ManageCommand')
    finally:
        signal.signal(signal.SIGALRM, handlerRaise)
        signal.alarm(0)
def test_execute_not_kill_manager_command(self):
    """Stop a program launched with ``--disable-kill`` and expect status 1.

    The command is built with ``maxWaitEnd=2``. After ``start()`` returns 0
    the test polls once per second, expecting ``status()`` to stay 0, until
    ``getlog(0)`` reports ``'last-line'`` equal to 5; it then stops the
    command and expects ``status()`` to return 1.
    """
    signal.alarm(30)
    signal.signal(signal.SIGALRM, handlerCrash)
    try:
        try:
            logsize = 10
            bufsize = 10
            queueIn = multiprocessing.Queue()
            queueOut = multiprocessing.Queue()
            semaphore = multiprocessing.Semaphore()
            manageCommand = Manager.ManageCommand('test_execute_not_kill_manager_command',
                                                  self.program + " --disable-kill",
                                                  self.path,
                                                  logsize,
                                                  bufsize,
                                                  queueIn,
                                                  queueOut,
                                                  semaphore,
                                                  False,
                                                  1,
                                                  "",
                                                  "",
                                                  False,
                                                  False,
                                                  1,
                                                  False,
                                                  maxWaitEnd=2)
        except Exception:
            self.fail('Error initialize object ManageCommand')
        try:
            res = manageCommand.start()
            self.assertEqual(res, 0)
            wait = True
            while wait:
                time.sleep(1)
                res = manageCommand.status()
                self.assertEqual(res, 0)
                res = manageCommand.getlog(0)
                try:
                    if 'last-line' in res:
                        if res['last-line'] == 5:
                            wait = False
                except Exception:
                    # Best effort: an unusable log answer means "not ready
                    # yet"; keep polling.
                    pass
            manageCommand.list_thread()
            manageCommand.stop()
            res = manageCommand.status()
            self.assertEqual(res, 1)
        except Exception as e:
            self.fail('Error when run test (%s)' % str(e))
    finally:
        signal.signal(signal.SIGALRM, handlerRaise)
        signal.alarm(0)
def test_execute_command_crashed(self):
    """Start a self-terminating program twice and expect status 2 each time.

    The command is built with ``waitDelay=1``. After each ``start()`` the
    test polls ``status()`` once per second until it returns 2.

    Assertion failures are re-raised unchanged; any other exception is
    reported with the generic failure message.
    """
    signal.alarm(20)
    try:
        logsize = 10
        bufsize = 10
        queueIn = multiprocessing.Queue()
        queueOut = multiprocessing.Queue()
        semaphore = multiprocessing.Semaphore()
        manageCommand = Manager.ManageCommand('test_execute_command_crashed',
                                              self.program + " --no-loop --timeout=1",
                                              self.path,
                                              logsize,
                                              bufsize,
                                              queueIn,
                                              queueOut,
                                              semaphore,
                                              False,
                                              1,
                                              "",
                                              "",
                                              False,
                                              False,
                                              1,
                                              False,
                                              waitDelay=1)
        # Two identical rounds: start, wait for status 2, re-check status.
        for _ in range(2):
            manageCommand.start()
            while True:
                time.sleep(1)
                res = manageCommand.status()
                if res == 2:
                    break
            res = manageCommand.status()
            self.assertEqual(res, 2)
        manageCommand.stop()
    except self.failureException:
        # Keep the specific assertion message instead of masking it.
        raise
    except Exception:
        self.fail('Error initialize object ManageCommand')
    finally:
        signal.alarm(0)
def test_execute_command_file_not_found(self):
    """``start()`` must return 2 when the program path does not exist.

    Assertion failures are re-raised unchanged; any other exception is
    reported with the generic failure message.
    """
    signal.alarm(10)
    try:
        logsize = 10
        bufsize = 10
        queueIn = multiprocessing.Queue()
        queueOut = multiprocessing.Queue()
        semaphore = multiprocessing.Semaphore()
        manageCommand = Manager.ManageCommand('test_execute_command_file_not_found',
                                              self.program + "_not_exist",
                                              self.path,
                                              logsize,
                                              bufsize,
                                              queueIn,
                                              queueOut,
                                              semaphore,
                                              False,
                                              1,
                                              "",
                                              "",
                                              False,
                                              False,
                                              1,
                                              False)
        ret = manageCommand.start()
        manageCommand.stop()
        self.assertEqual(ret, 2, 'Error object not generate error when program not exist')
    except self.failureException:
        # Keep the specific assertion message instead of masking it.
        raise
    except Exception:
        self.fail('Error initialize object ManageCommand')
    finally:
        signal.alarm(0)
def test_execute_command_permission(self):
    """``start()`` must return 2 for ``self.badprogram`` (``test.cfg``).

    Assertion failures are re-raised unchanged; any other exception is
    reported with the generic failure message.
    """
    signal.alarm(10)
    try:
        logsize = 10
        bufsize = 10
        queueIn = multiprocessing.Queue()
        queueOut = multiprocessing.Queue()
        semaphore = multiprocessing.Semaphore()
        manageCommand = Manager.ManageCommand('test_execute_command_permission',
                                              self.badprogram,
                                              self.path,
                                              logsize,
                                              bufsize,
                                              queueIn,
                                              queueOut,
                                              semaphore,
                                              False,
                                              1,
                                              "",
                                              "",
                                              False,
                                              False,
                                              1,
                                              False)
        ret = manageCommand.start()
        manageCommand.stop()
        self.assertEqual(ret, 2, 'Error object not generate error when bad permission')
    except self.failureException:
        # Keep the specific assertion message instead of masking it.
        raise
    except Exception:
        self.fail('Error initialize object ManageCommand')
    finally:
        signal.alarm(0)
def _runCommand(self, name, command, path, logsize, bufsize, queueIn, queueOut, semaphore):
    """
    Build a ManageCommand with every optional feature switched off and call
    its run() method, guarded by a 10 second alarm.
    """
    signal.alarm(10)
    # Caller-supplied settings first, then the fixed "everything off" options.
    settings = dict(
        name=name,
        command=command,
        path=path,
        logsize=logsize,
        bufsize=bufsize,
        queueIn=queueIn,
        queueOut=queueOut,
        semaphore=semaphore,
        keep_filter=False,
        size_max_filter=1,
        add_filter="",
        del_filter="",
        autostart=False,
        restart_after_crash=False,
        restart_delay=1,
        egs_filter=False,
    )
    worker = Manager.ManageCommand(**settings)
    worker.run()
    signal.alarm(0)
def test_root_bad_loglevel(self):
    """``Manager.root`` must raise ValueError for the log level 'NOTEXIST'."""
    signal.alarm(10)
    bad_level_args = (None, None, 'NOTEXIST', False, False)
    # Callable form of assertRaises: same check as the context-manager form.
    self.assertRaises(ValueError, Manager.root, *bad_level_args)
    signal.alarm(0)
def test_root_bad_configfile(self):
    """``Manager.root`` must raise ValueError when the first argument is None.

    The temporary file passed as second argument is closed at the end of the
    test and the SIGALRM watchdog is always cancelled.
    """
    signal.alarm(10)
    try:
        with tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t') as logfile:
            with self.assertRaises(ValueError):
                Manager.root(None,
                             logfile,
                             'DEBUG',
                             True,
                             True)
    finally:
        signal.alarm(0)
def test_main(self):
    """Run ``Manager.main`` with ``--conf`` pointing at a temporary file.

    The file only contains a server section with authentication disabled.
    It is closed at the end of the test, and the watchdog handler and alarm
    are restored even when ``main`` raises.
    """
    signal.signal(signal.SIGALRM, handlerCrash)
    signal.alarm(10)
    try:
        with tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t') as config:
            config.write('[config:server]\nauthentification=no\n')
            config.flush()
            logging.debug("load")
            Manager.main(['--conf=' + config.name])
            logging.debug("end")
    finally:
        signal.signal(signal.SIGALRM, handlerRaise)
        signal.alarm(0)
def test_run_manager_command(self):
    """Send the full command sequence to a launched child and check replies.

    With a ``Manager`` created with ``False`` the test sends, in order,
    START, STATUS, ``STDIN arg``, ``STDOUT 4``, BADCOMMAND, STOP and SHUTDOWN
    through ``queueIn`` and checks each answer read from ``queueOut``. Every
    exchange is bracketed by ``semaphore.acquire()`` / ``release()`` and
    guarded by a fresh 10 second alarm. Any exception, failed assertions
    included, stops the children and ends the process with exit code 2.
    """
    # Enable timeout
    signal.signal(signal.SIGALRM, handlerCrash)
    signal.alarm(10)

    class MockServerHttp:
        """No-op stand-in assigned to ``manage.serverHttp``."""

        def appendchild(self, name, queueIn, queueOut, semaphore):
            pass

        def terminate(self):
            pass

        def join(self):
            pass

    config = configparser.ConfigParser()
    config.add_section('config:server')
    config.add_section('command:test')
    config.set('command:test', 'command', self.program)
    manage = Manager.Manager(False)
    try:
        manage.serverHttp = MockServerHttp()
        manage._load_config(config)
        manage.launch_command()
        # NOTE(review): source indentation was lost; the loop is assumed to
        # hold only these three assignments (last entry wins) - confirm.
        for key in manage.info:
            queueIn = manage.info[key]['queueIn']
            queueOut = manage.info[key]['queueOut']
            semaphore = manage.info[key]['semaphore']
        logging.debug("[1] --------- send START")
        signal.alarm(10)
        semaphore.acquire()
        queueIn.put("START")
        # Enable timeout
        signal.alarm(10)
        item = queueOut.get()
        semaphore.release()
        self.assertEqual(item['state'], 'started', 'Error impossible to start program')
        signal.alarm(0)
        time.sleep(1)
        logging.debug("[2] --------- send STATUS")
        signal.alarm(10)
        semaphore.acquire()
        queueIn.put("STATUS")
        item = queueOut.get()
        semaphore.release()
        self.assertEqual(item['state'], 'started', 'Error impossible to read status')
        signal.alarm(0)
        time.sleep(1)
        logging.debug("[3] --------- send STDIN arg")
        signal.alarm(10)
        semaphore.acquire()
        queueIn.put("STDIN arg")
        item = queueOut.get()
        semaphore.release()
        self.assertEqual(item['state'], 'ok', 'Error when send STDIN')
        signal.alarm(0)
        time.sleep(1)
        logging.debug("[4] --------- send STDOUT 4")
        signal.alarm(10)
        semaphore.acquire()
        logging.debug("[4.1] --------- send STDOUT 4")
        queueIn.put("STDOUT 4")
        logging.debug("[4.2] --------- send STDOUT 4")
        item = queueOut.get()
        logging.debug("[4.3] --------- send STDOUT 4")
        semaphore.release()
        logging.debug("[4.4] --------- send STDOUT 4")
        signal.alarm(0)
        logging.debug("[4.5] --------- send STDOUT 4")
        # The answer must cover log lines 4 to 6 and contain the text "6 arg".
        self.assertEqual(item['first-line'], 4, 'Error when read STDOUT (Missing first-line)')
        self.assertEqual(item['last-line'], 6, 'Error when read STDOUT (Missing first-line)')
        # self.assertRegex(item, '^[{](.*)("first-line": 4)(.*)[}]$', 'Error when read STDOUT (Missing first-line)')
        # self.assertRegex(item, '^[{](.*)("last-line": 6)(.*)[}]$', 'Error when read STDOUT (Missing last-line)')
        self.assertRegex(str(item),
                         '^(.*)(6 arg)(.*)$',
                         'Error when read STDOUT (bad record)')
        time.sleep(1)
        logging.debug("[5] --------- send BADCOMMAND")
        signal.alarm(10)
        semaphore.acquire()
        queueIn.put("BADCOMMAND")
        item = queueOut.get()
        semaphore.release()
        self.assertEqual(item['error'], 'command unknown', 'Error impossible to read status')
        signal.alarm(0)
        time.sleep(1)
        logging.debug("[6] --------- send STOP")
        signal.alarm(10)
        semaphore.acquire()
        queueIn.put("STOP")
        item = queueOut.get()
        semaphore.release()
        self.assertEqual(item["state"], "stopped", 'Error impossible to read status')
        signal.alarm(0)
        time.sleep(1)
        logging.debug("[7] --------- send SHUTDOWN")
        signal.alarm(10)
        semaphore.acquire()
        queueIn.put("SHUTDOWN")
        # No answer is expected on queueOut after SHUTDOWN.
        with self.assertRaises(queue.Empty):
            item = queueOut.get(timeout=5)
        semaphore.release()
        #threadCommand.join()
        logging.debug("--------- Stop all")
        manage.receive_signal(15, 1)
        manage.wait_children_commands()
        #Disable timeout
        signal.alarm(0)
        self.assertTrue(True)
    except Exception as e:
        logging.error("test_run_manager_command - Prepare Crash - %s" % e)
        logging.error(traceback.format_exc())
        #self.fail('Error initialize object ManageCommand')
        manage.receive_signal(15, 1)
        manage.wait_children_commands()
        print("test_run_manager_command - Force Crash - TimeOut !")
        os._exit(2)
    signal.alarm(0)
    signal.signal(signal.SIGALRM, handlerRaise)
def test_run_manager_command_autostart(self):
    """With ``Manager(True)`` the launched command must report "started".

    After the STATUS check the child is sent SHUTDOWN, which must produce no
    answer on the output queue, and the manager is asked to stop.
    """
    # Enable timeout
    signal.alarm(10)

    class MockServerHttp:
        """Inert replacement for the manager's HTTP server object."""

        def appendchild(self, name, queueIn, queueOut, semaphore):
            return None

        def terminate(self):
            return None

        def join(self):
            return None

    config = configparser.ConfigParser()
    for section in ('config:server', 'command:test'):
        config.add_section(section)
    config.set('command:test', 'command', self.program)

    manage = Manager.Manager(True)
    manage.serverHttp = MockServerHttp()
    manage._load_config(config)
    manage.launch_command()

    # Channels of the first (and only) configured command.
    first_key = list(manage.info.keys())[0]
    queueIn = manage.info[first_key]['queueIn']
    queueOut = manage.info[first_key]['queueOut']
    semaphore = manage.info[first_key]['semaphore']

    signal.alarm(10)
    semaphore.acquire()
    queueIn.put("STATUS")
    reply = queueOut.get(timeout=4)
    semaphore.release()
    self.assertEqual(reply['state'], "started", 'Error impossible to read status')

    time.sleep(1)
    signal.alarm(10)
    semaphore.acquire()
    queueIn.put("SHUTDOWN")
    with self.assertRaises(queue.Empty):
        reply = queueOut.get(timeout=4)
    semaphore.release()

    manage.receive_signal(15, 1)
    manage.wait_children_commands()
    # Disable timeout
    signal.alarm(0)
    self.assertTrue(True)
    signal.alarm(0)
def test_run_manager_command_autostart_option(self):
    """The per-command ``autostart = yes`` option must start the program.

    The ``Manager`` is created with ``False``; five seconds after
    ``launch_command()`` the STATUS answer must carry the state "started".
    SHUTDOWN must then produce no answer on ``queueOut``. SIGALRM is routed
    to ``handlerRaise``; both error paths stop the children and end the
    process with exit code 2.
    """
    # Enable timeout
    signal.signal(signal.SIGALRM, handlerRaise)
    signal.alarm(10)

    class MockServerHttp:
        """No-op stand-in assigned to ``manage.serverHttp``."""

        def appendchild(self, name, queueIn, queueOut, semaphore):
            pass

        def terminate(self):
            pass

        def join(self):
            pass

    config = configparser.ConfigParser()
    config.add_section('config:server')
    config.add_section('command:test')
    config.set('command:test', 'command', self.program)
    config.set('command:test', 'autostart', 'yes')
    manage = Manager.Manager(False)
    try:
        manage.serverHttp = MockServerHttp()
        manage._load_config(config)
        manage.launch_command()
        # Give the child time to start the program before asking for status.
        time.sleep(5)
        key = list(manage.info.keys())[0]
        queueIn = manage.info[key]['queueIn']
        queueOut = manage.info[key]['queueOut']
        semaphore = manage.info[key]['semaphore']
        signal.alarm(10)
        semaphore.acquire()
        queueIn.put("STATUS")
        item = queueOut.get(timeout=4)
        semaphore.release()
        self.assertEqual(item["state"], "started", 'program not started')
        time.sleep(1)
        signal.alarm(10)
        semaphore.acquire()
        queueIn.put("SHUTDOWN")
        # No answer is expected on queueOut after SHUTDOWN.
        with self.assertRaises(queue.Empty):
            item = queueOut.get(timeout=4)
        semaphore.release()
        #threadCommand.join()
        manage.receive_signal(15, 1)
        manage.wait_children_commands()
        #Disable timeout
        signal.alarm(0)
    # NOTE(review): the messages below name
    # test_run_manager_command_restart_after_crash, not this test -
    # looks like a copy/paste leftover.
    except TimeoutError:
        print("test_run_manager_command_restart_after_crash - Prepare Crash - TimeOut !")
        #self.fail('Error initialize object ManageCommand')
        manage.receive_signal(15, 1)
        manage.wait_children_commands()
        print("test_run_manager_command_restart_after_crash - Force Crash - TimeOut !")
        os._exit(2)
    except Exception as e:
        print("test_run_manager_command_restart_after_crash - Prepare Crash - %s" % e)
        #self.fail('Error initialize object ManageCommand')
        manage.receive_signal(15, 1)
        manage.wait_children_commands()
        # handlerCrash prints the stack and exits the process itself.
        handlerCrash(15, 1)
        print("test_run_manager_command_restart_after_crash - Force Crash - TimeOut !")
        os._exit(2)
    self.assertTrue(True)
    signal.signal(signal.SIGALRM, handlerRaise)
    signal.alarm(0)
def test_run_manager_command_restart_after_crash(self):
    """Check that a crashed command is started again.

    The program is launched with ``--no-loop --timeout=5`` and the options
    ``autostart = yes``, ``restart_after_crash = yes`` and
    ``restart_delay = 5``. The test polls STATUS once per second and expects
    the reported state to go started -> crashed -> started -> crashed and
    then to leave "crashed" once more before SHUTDOWN is sent. SIGALRM is
    routed to ``handlerRaise``; both error paths stop the children and end
    the process with exit code 2.
    """
    # Enable timeout
    signal.signal(signal.SIGALRM, handlerRaise)
    signal.alarm(10)
    logging.debug("--------- test_run_manager_command_restart_after_crash => Start")

    class MockServerHttp:
        """No-op stand-in assigned to ``manage.serverHttp``."""

        def appendchild(self, name, queueIn, queueOut, semaphore):
            pass

        def terminate(self):
            pass

        def join(self):
            pass

    config = configparser.ConfigParser()
    config.add_section('config:server')
    config.add_section('command:test')
    config.set('command:test', 'command', self.program + ' --no-loop --timeout=5')
    config.set('command:test', 'autostart', 'yes')
    config.set('command:test', 'restart_after_crash', 'yes')
    config.set('command:test', 'restart_delay', '5')
    manage = Manager.Manager(False)
    try:
        manage.serverHttp = MockServerHttp()
        manage._load_config(config)
        manage.launch_command()
        key = list(manage.info.keys())[0]
        queueIn = manage.info[key]['queueIn']
        queueOut = manage.info[key]['queueOut']
        semaphore = manage.info[key]['semaphore']
        logging.debug("--------- wait not started")
        signal.alarm(20)
        # Phase 1: poll until the state is no longer "started".
        item = {"state": "started"}
        while item['state'] == "started":
            time.sleep(1)
            semaphore.acquire()
            queueIn.put("STATUS")
            try:
                item = queueOut.get()
            except queue.Empty:
                pass
            semaphore.release()
        self.assertEqual(item['state'], "crashed", 'program not started')
        signal.alarm(20)
        logging.debug("--------- wait not crashed")
        # Phase 2: poll until the state is no longer "crashed".
        while item['state'] == "crashed":
            time.sleep(1)
            semaphore.acquire()
            queueIn.put("STATUS")
            try:
                item = queueOut.get(timeout=4)
            except queue.Empty:
                pass
            semaphore.release()
            # NOTE(review): source indentation was lost; this re-arm is
            # assumed to sit inside the loop - confirm.
            signal.alarm(20)
        self.assertEqual(item['state'], "started", 'program not started')
        logging.debug("--------- wait not started")
        signal.alarm(20)
        # Phase 3: poll until the state is no longer "started".
        while item['state'] == "started":
            time.sleep(1)
            semaphore.acquire()
            queueIn.put("STATUS")
            try:
                item = queueOut.get(timeout=4)
            except queue.Empty:
                pass
            semaphore.release()
        self.assertEqual(item['state'], "crashed", 'program not started')
        signal.alarm(20)
        logging.debug("--------- wait not crashed")
        # item = "crashed"
        # Phase 4: poll until the state is no longer "crashed".
        while item['state'] == "crashed":
            time.sleep(1)
            semaphore.acquire()
            queueIn.put("STATUS")
            try:
                item = queueOut.get()
            except queue.Empty:
                pass
            semaphore.release()
            # NOTE(review): source indentation was lost; this re-arm is
            # assumed to sit inside the loop - confirm.
            signal.alarm(20)
        logging.debug("--------- SHUTDOWN")
        time.sleep(1)
        signal.alarm(10)
        semaphore.acquire()
        queueIn.put("SHUTDOWN")
        semaphore.release()
        #threadCommand.join()
        manage.receive_signal(15, 1)
        manage.wait_children_commands()
        #Disable timeout
        signal.alarm(0)
    except TimeoutError:
        print("test_run_manager_command_restart_after_crash - Prepare Crash - TimeOut !")
        #self.fail('Error initialize object ManageCommand')
        manage.receive_signal(15, 1)
        manage.wait_children_commands()
        print("test_run_manager_command_restart_after_crash - Force Crash - TimeOut !")
        os._exit(2)
    except Exception as e:
        print("test_run_manager_command_restart_after_crash - Prepare Crash - %s" % e)
        #self.fail('Error initialize object ManageCommand')
        manage.receive_signal(15, 1)
        manage.wait_children_commands()
        # handlerCrash prints the stack and exits the process itself.
        handlerCrash(15, 1)
        print("test_run_manager_command_restart_after_crash - Force Crash - TimeOut !")
        os._exit(2)
    self.assertTrue(True)
    signal.signal(signal.SIGALRM, handlerRaise)
    signal.alarm(0)
class MockStreamRequestHandler:
    """Minimal stream double: serves a preset string and records writes."""

    def __init__(self):
        # Nothing written yet; reads return an empty JSON object by default.
        self.message = None
        self.value = "{}"

    def define_return(self, value):
        """Set the text that later ``read`` calls return."""
        self.value = value

    def read(self, numbercar):
        """Return the preset text as UTF-8 bytes; ``numbercar`` is ignored."""
        payload = bytes(self.value, "utf-8")
        return payload

    def write(self, message):
        """Remember the last message written."""
        self.message = message
class MockServer:
    """Bare server double: empty per-command tables, authentication off."""

    def __init__(self):
        self.authentification = False
        # One independent empty dict per table.
        self.listSemaphore = dict()
        self.listQueueIn = dict()
        self.listQueueOut = dict()
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_log_message(self, init):
    """``log_message`` must run on a handler whose base ``__init__`` is patched."""
    signal.alarm(10)  # Enable timeout
    handler = Manager.ManageHttpRequest(None, None, None)
    handler.address_string = MagicMock()
    handler.log_message('example')
    signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_set_headers(self, init):
    """``_set_headers`` must send a 200 response."""
    signal.alarm(10)  # Enable timeout
    handler = Manager.ManageHttpRequest(None, None, None)
    # Replace every output-producing method with its own mock.
    for method_name in ('log_message', 'send_response', 'send_header',
                        'send_error', 'end_headers'):
        setattr(handler, method_name, MagicMock())
    handler._set_headers()
    handler.send_response.assert_called_with(200)
    # manage.send_header.assert_called_with('Content-type', 'application/json')
    signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_command_log_1(self, init):
    """Without any header ``_command_log`` must answer 400 "bad content-type"."""
    signal.alarm(10)  # Enable timeout
    handler = Manager.ManageHttpRequest(None, None, None)
    # Replace every output-producing method with its own mock.
    for method_name in ('log_message', 'send_response', 'send_header',
                        'send_error', 'end_headers'):
        setattr(handler, method_name, MagicMock())
    handler.headers = {}
    handler._command_log()
    handler.send_error.assert_called_with(400, "bad content-type")
    signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_command_log_2(self, init):
    # _command_log() with a JSON content-type but no content-length must
    # report 400 "bad content-length".
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.headers = {'content-type': 'application/json'}
        manage._command_log()
        manage.send_error.assert_called_with(400, "bad content-length")
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_command_log_3(self, init):
    # _command_log() with valid headers but a body of "{}" (the mock
    # stream's default) must report 400 'Missing param name'.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.headers = {'content-length': '1000', 'content-type': 'application/json'}
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage._command_log()
        manage.send_error.assert_called_with(400, 'Missing param name')
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_command_log_4(self, init):
    # _command_log() for name "test" while the mock server has no
    # registered entries must report 400 'Name unknown'.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.rfile.define_return('{"name": "test"}')
        manage.headers = {'content-length': '1000', 'content-type': 'application/json'}
        manage._command_log()
        manage.send_error.assert_called_with(400, 'Name unknown')
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_command_log_5(self, init):
    # _command_log() for a known name ("test" is a key of listQueueIn) but
    # without "first-line" in the body must report
    # 400 'Missing param first-line'.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.server.listQueueIn = {'test': ''}
        manage.rfile.define_return('{"name": "test"}')
        manage.headers = {'content-length': '1000', 'content-type': 'application/json'}
        manage._command_log()
        manage.send_error.assert_called_with(400, 'Missing param first-line')
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_command_log_6(self, init):
    # _command_log() with "first-line" set to the non-numeric text "test"
    # must report 400 'Impossible to read first-line'.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.server.listQueueIn = {'test': ''}
        manage.rfile.define_return('{"name": "test", "first-line" : "test"}')
        manage.headers = {'content-length': '1000', 'content-type': 'application/json'}
        manage._command_log()
        manage.send_error.assert_called_with(400, 'Impossible to read first-line')
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_command_log_7(self, init):
    # _command_log() with a valid request while the output queue is left
    # empty. No assertion is made on the outcome: the test only requires
    # the call to return before the alarm fires.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.wfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.server.listSemaphore = {'test': multiprocessing.Semaphore()}
        manage.server.listQueueIn = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut = {'test': multiprocessing.Queue()}
        manage.rfile.define_return('{"name": "test", "first-line" : "1"}')
        manage.headers = {'content-length': '1000', 'content-type': 'application/json'}
        manage._command_log()
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_command_log_8(self, init):
    # _command_log() with a valid request and "empty" pre-queued on the
    # output queue must write the JSON-encoded string b'"empty"' to wfile.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.wfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.server.listSemaphore = {'test': multiprocessing.Semaphore()}
        manage.server.listQueueIn = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut['test'].put("empty")
        manage.rfile.define_return('{"name": "test", "first-line" : "1"}')
        manage.headers = {'content-length': '1000', 'content-type': 'application/json'}
        manage._command_log()
        self.assertEqual(b'"empty"', manage.wfile.message)
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_send_list(self, init):
    # _send_list() with a single registered name "test" must write
    # b'{"0": "test"}' to wfile.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.wfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.server.listSemaphore = {'test': multiprocessing.Semaphore()}
        manage.server.listQueueIn = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut = {'test': multiprocessing.Queue()}
        manage._send_list()
        self.assertEqual(b'{"0": "test"}', manage.wfile.message)
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_send_shutdown(self, init):
    # _send_shutdown() must write b'{"shutdown": "ok"}' to wfile.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.wfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.server.listSemaphore = {'test': multiprocessing.Semaphore()}
        manage.server.listQueueIn = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut = {'test': multiprocessing.Queue()}
        manage._send_shutdown()
        self.assertEqual(b'{"shutdown": "ok"}', manage.wfile.message)
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_send_command_all(self, init):
    # _send_command_all("test") with "empty" pre-queued as the reply for
    # name "test" must write b'{"test": "empty"}' to wfile.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.wfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.server.listSemaphore = {'test': multiprocessing.Semaphore()}
        manage.server.listQueueIn = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut['test'].put("empty")
        manage._send_command_all("test")
        self.assertEqual(b'{"test": "empty"}', manage.wfile.message)
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_send_command_all_timeout(self, init):
    # _send_command_all("test") while the output queue stays empty.
    # No assertion is made on the outcome: the test only requires the call
    # to return before the alarm fires.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.wfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.server.listSemaphore = {'test': multiprocessing.Semaphore()}
        manage.server.listQueueIn = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut = {'test': multiprocessing.Queue()}
        manage._send_command_all("test")
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_send_action_1(self, init):
    # _send_action() with a body of "{}" (the mock stream's default) must
    # report 400 'Missing param name'.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.wfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.server.listSemaphore = {'test': multiprocessing.Semaphore()}
        manage.server.listQueueIn = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut = {'test': multiprocessing.Queue()}
        manage.headers = {'content-length': '1000', 'content-type': 'application/json'}
        manage._send_action()
        manage.send_error.assert_called_with(400, 'Missing param name')
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_send_action_2(self, init):
    # _send_action() for name "testnew" while only "test" is registered
    # must report 400 'Name unknown'.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.wfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.server.listSemaphore = {'test': multiprocessing.Semaphore()}
        manage.server.listQueueIn = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut = {'test': multiprocessing.Queue()}
        manage.rfile.define_return('{"name": "testnew"}')
        manage.headers = {'content-length': '1000', 'content-type': 'application/json'}
        manage._send_action()
        manage.send_error.assert_called_with(400, 'Name unknown')
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_send_action_3(self, init):
    # _send_action() for a registered name but without "action" in the
    # body must report 400 'Missing param action'.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.wfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.server.listSemaphore = {'test': multiprocessing.Semaphore()}
        manage.server.listQueueIn = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut = {'test': multiprocessing.Queue()}
        manage.rfile.define_return('{"name": "test"}')
        manage.headers = {'content-length': '1000', 'content-type': 'application/json'}
        manage._send_action()
        manage.send_error.assert_called_with(400, 'Missing param action')
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_send_action_4(self, init):
    # _send_action() with a complete request but nothing on the output
    # queue must not write anything to wfile (message stays None).
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.wfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.server.listSemaphore = {'test': multiprocessing.Semaphore()}
        manage.server.listQueueIn = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut = {'test': multiprocessing.Queue()}
        manage.rfile.define_return('{"name": "test", "action": "example"}')
        manage.headers = {'content-length': '1000', 'content-type': 'application/json'}
        manage._send_action()
        self.assertEqual(None, manage.wfile.message)
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_send_action_5(self, init):
    # _send_action() with a complete request and {"state": "empty"}
    # pre-queued as the reply must write b'{"state": "empty"}' to wfile.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.wfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.server.listSemaphore = {'test': multiprocessing.Semaphore()}
        manage.server.listQueueIn = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut['test'].put({"state": "empty"})
        manage.rfile.define_return('{"name": "test", "action": "example"}')
        manage.headers = {'content-length': '1000', 'content-type': 'application/json'}
        manage._send_action()
        self.assertEqual(b'{"state": "empty"}', manage.wfile.message)
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_send_action_6(self, init):
    # _send_action() with no headers at all. No assertion is made on the
    # outcome: the test only requires the call to return without raising.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.headers = {}
        manage._send_action()
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_send_command_1(self, init):
    # _send_command("STATUS") with no headers at all. No assertion is made
    # on the outcome: the test only requires the call to return without
    # raising.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.headers = {}
        manage._send_command("STATUS")
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_send_command_2(self, init):
    # _send_command("STATUS") with a body of "{}" (the mock stream's
    # default) must report 400 'Missing param name'.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.wfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.server.listSemaphore = {'test': multiprocessing.Semaphore()}
        manage.server.listQueueIn = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut = {'test': multiprocessing.Queue()}
        manage.headers = {'content-length': '1000', 'content-type': 'application/json'}
        manage._send_command("STATUS")
        manage.send_error.assert_called_with(400, 'Missing param name')
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_send_command_3(self, init):
    # _send_command("STATUS") for a registered name with
    # {"state": "empty"} pre-queued as the reply must write
    # b'{"state": "empty"}' to wfile.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.wfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.server.listSemaphore = {'test': multiprocessing.Semaphore()}
        manage.server.listQueueIn = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut = {'test': multiprocessing.Queue()}
        manage.headers = {'content-length': '1000', 'content-type': 'application/json'}
        manage.server.listQueueOut['test'].put({"state": "empty"})
        manage.rfile.define_return('{"name": "test"}')
        manage._send_command("STATUS")
        self.assertEqual(b'{"state": "empty"}', manage.wfile.message)
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_send_command_4(self, init):
    # _send_command("STATUS") for a registered name while the output queue
    # stays empty must report 500 'Missing return'.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.wfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.server.listSemaphore = {'test': multiprocessing.Semaphore()}
        manage.server.listQueueIn = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut = {'test': multiprocessing.Queue()}
        manage.headers = {'content-length': '1000', 'content-type': 'application/json'}
        manage.rfile.define_return('{"name": "test"}')
        manage._send_command("STATUS")
        manage.send_error.assert_called_with(500, 'Missing return')
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_send_command_5(self, init):
    # _send_command("STATUS") for name "testnew" while only "test" is
    # registered must report 400 'Name unknown'.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.rfile = TestManager.MockStreamRequestHandler()
        manage.wfile = TestManager.MockStreamRequestHandler()
        manage.server = TestManager.MockServer()
        manage.server.listSemaphore = {'test': multiprocessing.Semaphore()}
        manage.server.listQueueIn = {'test': multiprocessing.Queue()}
        manage.server.listQueueOut = {'test': multiprocessing.Queue()}
        manage.headers = {'content-length': '1000', 'content-type': 'application/json'}
        manage.server.listQueueOut['test'].put("empty")
        manage.rfile.define_return('{"name": "testnew"}')
        manage._send_command("STATUS")
        manage.send_error.assert_called_with(400, 'Name unknown')
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_GET_1(self, init):
    # do_GET() on path "/STDOUT" must dispatch to _command_log() with no
    # arguments (authentication is off on the mock server).
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage._command_log = MagicMock()
        manage.path = "/STDOUT"
        manage.do_GET()
        manage._command_log.assert_called_with()
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_check_authentication_1(self, init):
    # check_authentication() must return True when the server has
    # authentication disabled (the MockServer default).
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        res = manage.check_authentication()
        self.assertEqual(True, res)
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_check_authentication_2(self, init):
    # check_authentication() must return False when authentication is
    # enabled and this test assigns no headers to the handler.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage.server.authentification = True
        res = manage.check_authentication()
        self.assertEqual(False, res)
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_check_authentication_3(self, init):
    # check_authentication() must return False and log an error when the
    # Authorization header uses a scheme other than Basic ("Other").
    # Enable timeout; cancelled in finally (the original never cancelled it
    # in this test) so it cannot fire during a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage.server.authentification = True
        manage.headers = {'Authorization': 'Other'}
        # Scoped patch: logging.error is restored on exit instead of being
        # left replaced by a mock for every later test.
        with patch.object(logging, 'error') as log_error:
            res = manage.check_authentication()
        self.assertEqual(False, res)
        log_error.assert_called_with("Authentification with Bad method (Other)")
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_check_authentication_4(self, init):
    # check_authentication() must return False and log
    # "Error detected list index out of range" when the Basic scheme is
    # given with no credentials after it.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage.server.authentification = True
        manage.headers = {'Authorization': 'Basic '}
        # Scoped patch: logging.error is restored on exit instead of being
        # left replaced by a mock for every later test.
        with patch.object(logging, 'error') as log_error:
            res = manage.check_authentication()
        self.assertEqual(False, res)
        log_error.assert_called_with("Error detected list index out of range")
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_check_authentication_5(self, init):
    # check_authentication() must return False and log
    # "Error detected Incorrect padding" when the Basic credentials are
    # sent as raw "user:hash" text (presumably rejected by the base64
    # decoder - confirm against the manager implementation).
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage.server.authentification = True
        manage.headers = {'Authorization': 'Basic test:$2b$12$AoRGhrAYsuSlROUDx.0AZ.3SPBVYd8wD.Vyz7jCRXZdwm7ArEN3Oi'}
        # Scoped patch: logging.error is restored on exit instead of being
        # left replaced by a mock for every later test.
        with patch.object(logging, 'error') as log_error:
            res = manage.check_authentication()
        self.assertEqual(False, res)
        log_error.assert_called_with("Error detected Incorrect padding")
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_check_authentication_6(self, init):
    # check_authentication() must return False and log an unknown-user
    # error when the decoded credentials name user "test" but the server
    # only knows "user1".
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage.server.users = {"user1": ""}
        manage.server.authentification = True
        # The base64 payload starts with "test:" (user name "test").
        manage.headers = {'Authorization': 'Basic dGVzdDokMmIkMTIkQW9SR2hyQVlzdVNsUk9VRHguMEFaLjNTUEJWWWQ4d0QuVnl6N2pDUlhaZHdtN0FyRU4zT2k='}
        # Scoped patch: logging.error is restored on exit instead of being
        # left replaced by a mock for every later test.
        with patch.object(logging, 'error') as log_error:
            res = manage.check_authentication()
        self.assertEqual(False, res)
        log_error.assert_called_with("Authentification with unknown user (test)")
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_check_authentication_7(self, init):
    # check_authentication() must return True for user "username" with
    # password "password" when the stored '$2b$12$...' hash matches
    # (presumably a bcrypt hash of that password - confirm against the
    # manager implementation).
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        # The stored hash is provided as bytes.
        manage.server.users = {"username": "$2b$12$f5kbmfXvUq3LDequGHcoqu06AueJ35TpIU2MIf5L8c9Y1PE/6Rz4i".encode('utf-8')}
        manage.server.authentification = True
        # base64 of "username:password"
        manage.headers = {'Authorization': 'Basic dXNlcm5hbWU6cGFzc3dvcmQ='}
        # Scoped patch: logging.error is silenced for this call only and
        # restored on exit instead of staying mocked for later tests.
        with patch.object(logging, 'error'):
            res = manage.check_authentication()
        self.assertEqual(True, res)
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_check_authentication_8(self, init):
    # check_authentication() must return False and log a wrong-password
    # error when user "username" sends "password" but the stored hash does
    # not match it.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage.server.users = {"username": "$2b$12$/rfBBlTy3E9CgB8ZGeWFBOo54UN5Ogj3PuEOHVJcXyQ2hL6kYVwWW".encode('utf-8')}
        manage.server.authentification = True
        # base64 of "username:password"
        manage.headers = {'Authorization': 'Basic dXNlcm5hbWU6cGFzc3dvcmQ='}
        # Scoped patch: logging.error is restored on exit instead of being
        # left replaced by a mock for every later test.
        with patch.object(logging, 'error') as log_error:
            res = manage.check_authentication()
        self.assertEqual(False, res)
        log_error.assert_called_with("Authentification with wrong password for user (username)")
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_GET_2(self, init):
    # do_GET() on path "/STATUS" must dispatch to _send_command("STATUS").
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage._send_command = MagicMock()
        manage.path = "/STATUS"
        manage.do_GET()
        manage._send_command.assert_called_with("STATUS")
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_GET_3(self, init):
    # do_GET() on path "/LIST" must dispatch to _send_list() with no
    # arguments.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage._send_list = MagicMock()
        manage.path = "/LIST"
        manage.do_GET()
        manage._send_list.assert_called_with()
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_GET_4(self, init):
    # do_GET() on path "/STATUSALL" must dispatch to
    # _send_command_all("STATUS").
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage._send_command_all = MagicMock()
        manage.path = "/STATUSALL"
        manage.do_GET()
        manage._send_command_all.assert_called_with("STATUS")
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_GET_5(self, init):
    # do_GET() on an unrecognised path must report 400 'Path unknown'.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage.send_error = MagicMock()
        manage.path = "/BADPATH"
        manage.do_GET()
        manage.send_error.assert_called_with(400, 'Path unknown')
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_GET_6(self, init):
    # do_GET() with authentication enabled and no credentials supplied by
    # this test must report 403 'Wrong authentication'.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage.server.authentification = True
        manage.send_error = MagicMock()
        manage.path = "/BADPATH"
        manage.do_GET()
        manage.send_error.assert_called_with(403, 'Wrong authentication')
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_POST_1(self, init):
    # do_POST() on path "/START" must dispatch to _send_command("START").
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage._send_command = MagicMock()
        manage.path = "/START"
        manage.do_POST()
        manage._send_command.assert_called_with("START")
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_POST_2(self, init):
    # do_POST() on path "/STOP" must dispatch to _send_command("STOP").
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage._send_command = MagicMock()
        manage.path = "/STOP"
        manage.do_POST()
        manage._send_command.assert_called_with("STOP")
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_POST_3(self, init):
    # do_POST() on path "/STDIN" must dispatch to _send_action() with no
    # arguments.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage._send_action = MagicMock()
        manage.path = "/STDIN"
        manage.do_POST()
        manage._send_action.assert_called_with()
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_POST_4(self, init):
    # do_POST() on path "/SHUTDOWN" must dispatch to _send_shutdown() with
    # no arguments.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage._send_shutdown = MagicMock()
        manage.path = "/SHUTDOWN"
        manage.do_POST()
        manage._send_shutdown.assert_called_with()
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_POST_5(self, init):
    # do_POST() on path "/STARTALL" must dispatch to
    # _send_command_all("START").
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage._send_command_all = MagicMock()
        manage.path = "/STARTALL"
        manage.do_POST()
        manage._send_command_all.assert_called_with("START")
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_POST_6(self, init):
    # do_POST() on path "/STOPALL" must dispatch to
    # _send_command_all("STOP").
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage._send_command_all = MagicMock()
        manage.path = "/STOPALL"
        manage.do_POST()
        manage._send_command_all.assert_called_with("STOP")
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_POST_7(self, init):
    # do_POST() on an unrecognised path must report 400 'Path unknown'.
    # Every dispatch target is mocked so none of them does real work.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage._send_command = MagicMock()
        manage._send_action = MagicMock()
        manage._send_shutdown = MagicMock()
        manage._send_command_all = MagicMock()
        manage.path = "/BADPATH"
        manage.do_POST()
        manage.send_error.assert_called_with(400, 'Path unknown')
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_POST_8(self, init):
    # do_POST() with authentication enabled and no credentials supplied by
    # this test must report 403 'Wrong authentication'.
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.server = TestManager.MockServer()
        manage.server.authentification = True
        manage.send_error = MagicMock()
        manage.path = "/BADPATH"
        manage.do_POST()
        manage.send_error.assert_called_with(403, 'Wrong authentication')
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_HEAD(self, init):
    # do_HEAD() must report 404 'File Not Found: <path>' (path is "test").
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.path = "test"
        manage.do_HEAD()
        manage.send_error.assert_called_with(404, 'File Not Found: test')
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_PUT(self, init):
    # do_PUT() must report 404 'File Not Found: <path>' (path is "test").
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.path = "test"
        manage.do_PUT()
        manage.send_error.assert_called_with(404, 'File Not Found: test')
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_PATCH(self, init):
    # do_PATCH() must report 404 'File Not Found: <path>' (path is "test").
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.path = "test"
        manage.do_PATCH()
        manage.send_error.assert_called_with(404, 'File Not Found: test')
    finally:
        signal.alarm(0)
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
def test_run_manage_do_DELETE(self, init):
    # do_DELETE() must report 404 'File Not Found: <path>' (path is "test").
    # Enable timeout; cancelled in finally so a failure cannot leave the
    # alarm pending for a later test.
    signal.alarm(10)
    try:
        manage = Manager.ManageHttpRequest(None, None, None)
        manage.log_message = MagicMock()
        manage.send_response = MagicMock()
        manage.send_header = MagicMock()
        manage.send_error = MagicMock()
        manage.end_headers = MagicMock()
        manage.path = "test"
        manage.do_DELETE()
        manage.send_error.assert_called_with(404, 'File Not Found: test')
    finally:
        signal.alarm(0)
if __name__ == '__main__':
    # Script entry point: send every log record (level 0 == NOTSET) to a
    # stream handler with a timestamp/pid/function prefix, then run the
    # unittest command-line runner.
    logging.getLogger('logging')
    handlers = [logging.StreamHandler()]
    logging.basicConfig(
        format='%(asctime)s %(levelname)s [pid:%(process)d] [%(funcName)s:%(lineno)d] %(message)s',
        level=logging.NOTSET,
        handlers=handlers,
    )
    unittest.main()