#!/usr/bin/python3 # -*- coding: utf-8 -*- # # create certificate (use for test) # Copyright (C) 2017 AleaJactaEst # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import unittest import tempfile import os import configparser import multiprocessing import time import re import queue import signal import http.server import logging from unittest.mock import patch from unittest.mock import MagicMock import traceback try: import pymanager.manager as Manager except ImportError: import sys sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import pymanager.manager as Manager #try: # import pymanager.certificate as cert #except ImportError: # import sys # import os # sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # import pymanager.certificate as cert class TimeoutError(Exception): pass def handlerCrash(signum, frame): print("handlerCrash - TimeOut !") for line in traceback.format_stack(): print(line.strip()) # Force Exit with all thread ! os._exit(2) def handlerRaise(signum, frame): print("handlerRaise - TimeOut !") for line in traceback.format_stack(): print(line.strip()) raise TimeoutError class TestManager(unittest.TestCase): def setUp(self): self.openssl = '/usr/bin/openssl' self.size_root = 4096 self.size_appli = 4096 self.size_child = 2048 self.passroot = 'BadPasswordRoot' self.passappli = 'BadPasswordApplication' self.country_name = 'FR' self.state_or_province_name = 'France' self.locality_name = 'Paris' self.organization_name = 'khanat' self.common_name = 'khanat' self.path = os.path.dirname(os.path.abspath(__file__)) self.program = os.path.join(self.path, 'simulate_program.py') self.badprogram = os.path.join(self.path, 'test.cfg') signal.signal(signal.SIGALRM, handlerCrash) def test_load_config(self): signal.alarm(10) config = configparser.ConfigParser() config.add_section('config:server') config.set('config:server', 'port', '8000') config.set('config:server', 'keyfile', '/home/gameserver/ca/appli/private/serverkey.pem') config.set('config:server', 'certfile', '/home/gameserver/ca/appli/certs/servercert.pem') config.set('config:server', 'ca_cert', '/home/gameserver/ca/appli/certs/cachaincert.pem') config.set('config:server', 'address', '') config.set('config:server', 'authentification', 'yes') config.set('config:server', 'method', 'https') config.add_section('config:client') config.set('config:client', 'port', '8000') config.set('config:client', 'keyfile', '/home/gameserver/ca/appli/private/clientkey.pem') config.set('config:client', 'certfile', '/home/gameserver/ca/appli/certs/clientcert.pem') config.set('config:client', 'ca_cert', '/home/gameserver/ca/appli/certs/cachaincert.pem') config.set('config:client', 'address', '127.0.0.1') config.set('config:client', 'method', 'https') config.add_section('command:test') config.set('command:test', 'path', '/home/gameserver') config.set('command:test', 'command', '/bin/sleep 10') config.set('command:test', 'logsize', '10') config.set('command:test', 'bufsize', '10') config.set('command:test', 'keep_filter', 'yes') config.set('command:test', 'size_max_filter', '1000') config.set('command:test', 'add_filter', '"^(.*)(setActiveCharForPlayer).*(: set active char )[\d]+( for )(?P.*)"') config.set('command:test', 'del_filter', '"^(.*)(disconnectPlayer).+[\s]+(?P.*)[\s]+(is disconnected)"') config.add_section('config:user') config.set('config:user', 'usename', 'filter_all, filter_admin') try: manager = Manager.Manager(False) manager._load_config(config) self.assertTrue(True) except: self.fail('Error detected on load config') signal.alarm(0) def test_load_config2(self): signal.alarm(10) config = configparser.ConfigParser() config.add_section('config:server') config.set('config:server', 'authentification', 'no') config.add_section('command:test') config.set('command:test', 'path', '/home/gameserver') config.set('command:test', 'command', '/bin/sleep 10') try: manager = Manager.Manager(False) manager._load_config(config) self.assertTrue(True) except: self.fail('Error detected on load config') signal.alarm(0) def test_load_config_bad_param_logsize(self): signal.alarm(10) config = configparser.ConfigParser() config.add_section('command:test') config.set('command:test', 'command', '/bin/sleep 10') config.set('command:test', 'logsize', 'bidon') with self.assertRaises(ValueError): manager = Manager.Manager(False) manager._load_config(config) self.assertTrue(True) signal.alarm(0) def test_load_config_bad_param_bufsize(self): signal.alarm(10) config = configparser.ConfigParser() config.add_section('command:test') config.set('command:test', 'command', '/bin/sleep 10') config.set('command:test', 'bufsize', 'bidon') with self.assertRaises(ValueError): manager = Manager.Manager(False) manager._load_config(config) self.assertTrue(True) signal.alarm(0) def test_load_config_empty(self): signal.alarm(10) config = configparser.ConfigParser() config.add_section('config:server') config.add_section('config:client') config.set('config:client', 'port', '8000') config.set('config:client', 'keyfile', '/home/gameserver/ca/appli/private/clientkey.pem') config.set('config:client', 'certfile', '/home/gameserver/ca/appli/certs/clientcert.pem') config.set('config:client', 'ca_cert', '/home/gameserver/ca/appli/certs/cachaincert.pem') config.set('config:client', 'address', '127.0.0.1') config.set('config:client', 'method', 'https') config.add_section('config:user') config.add_section('command:test') config.set('command:test', 'command', '/bin/sleep 10') try: manager = Manager.Manager(False) manager._load_config(config) self.assertTrue(True) except: self.fail('Error detected on load config') signal.alarm(0) def test_load_config_file(self): signal.alarm(10) cfgfile = tempfile.NamedTemporaryFile(suffix="config.cfg", mode='w+t') cfgfile.write('#\n[config:server]\nauthentification = No\n') cfgfile.flush() try: manager = Manager.Manager(False) manager.load_config(cfgfile) self.assertTrue(True) except: self.fail('Error detected on load configuration') signal.alarm(0) def test_load_config_file_none(self): signal.alarm(10) with self.assertRaises(ValueError): manager = Manager.Manager(False) manager.load_config(None) signal.alarm(0) def test_load_password_file(self): signal.alarm(10) pwdfile = tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t') config = configparser.ConfigParser() config.add_section('config:server') config.set('config:server', 'authentification', 'yes') config.set('config:server', 'passwordfile', pwdfile.name) pwdfile.write('username:$2a$12$2C97xW0KC/vFp3YyjlOgU.fWXJ3EiGT2Ihb0SWN9Mw0XI4WngiUqS\n\nusername2:badhash\n') pwdfile.flush() try: manager = Manager.Manager(False) manager._load_config(config) manager.load_password() self.assertTrue(True) except: self.fail('Error detected on load password') signal.alarm(0) def test_constructor_manager_command(self): signal.alarm(10) try: logsize = 10 bufsize = 10 queueIn = multiprocessing.Queue() queueOut = multiprocessing.Queue() semaphore = multiprocessing.Semaphore() manageCommand = Manager.ManageCommand('test_constructor_manager_command', self.program, self.path, logsize, bufsize, queueIn, queueOut, semaphore, False, 1, "", "", False, False, 1, False) manageCommand.list_thread() self.assertTrue(True) except: self.fail('Error initialize object ManageCommand') signal.alarm(0) def test_managercommand_analyze_line(self): manage = Manager.ManageCommand('test_execute_manager_command', "", "", 1000, 1000, None, None, None, False, 1000, "", "", False, False, 1, True) manage._analyze_line("message 1") manage._analyze_line("alpha egs_plinfo EGS-132 : LOADED User '2' Character 'Kezxaa(Lirria)' from BS stream file 'characters/002/account_2_0_pdr.bin'") if '2' not in manage.filter_load_character: self.assertTrue(False, "LOADED - Missing player 2") if '0' not in manage.filter_load_character['2']: self.assertTrue(False, "LOADED - Missing charactere 0 for player 2") manage._analyze_line("alpha egs_plinfo EGS-132 : registerEntity EGS-134 : EIT: Register EId (0x0000000020:00:00:00) EntityName 'Nin(lirria)' UId 2 UserName 'tester'") if '0' not in manage.filter_register_entity['2']: self.assertTrue(False, "registerEntity - Missing player 2 (1st char)") manage._analyze_line("alpha egs_plinfo EGS-132 : registerEntity EGS-134 : EIT: Register EId (0x0000000020:00:00:00) EntityName 'Nin(lirria)' UId 2 UserName 'tester'") if '1' not in manage.filter_register_entity['2']: self.assertTrue(False, "registerEntity - Missing player 2 (2nd char)") manage._analyze_line("alpha egs_plinfo EGS-132 : registerEntity EGS-134 : EIT: Register EId (0x0000000020:00:00:00) EntityName 'Nin(lirria)' UId 3 UserName 'tester'") if '0' not in manage.filter_register_entity['3']: self.assertTrue(False, "registerEntity - Missing player 3 (1st char)") #manage._analyze_line("alpha egs_plinfo EGS-132 : LOADED User '2' Character 'Puskle(Lirria)' from BS stream file 'characters/002/account_2_1_pdr.bin'") #if '1' not in manage.filter_load_character['2']: # self.assertTrue(False, "LOADED - Missing charactere 1 for player 2") #manage._analyze_line("alpha egs_plinfo EGS-132 : LOADED User '3' Character 'Puskle(Lirria)' from BS stream file 'characters/003/account_3_4_pdr.bin'") #if '3' not in manage.filter_load_character: # self.assertTrue(False, "LOADED - Missing player 2") manage._analyze_line("alpha egs_ecinfo EGS-132 : setActiveCharForPlayer EGS-132 : set active char 1 for player 2") if '2' not in manage.filter_active_character: self.assertTrue(False, "setActiveCharForPlayer - Missing player 2") if 'NameDomain' not in manage.filter_active_character['2']: self.assertTrue(False, "setActiveCharForPlayer - Missing info player 2") manage._analyze_line("alpha egs_ecinfo EGS-132 : Mapping UID 2 => Sid (0x0000000021:00:00:83)") if 'SID' not in manage.filter_active_character['2']: self.assertTrue(False, "setActiveCharForPlayer - Missing SID player 2") manage._analyze_line("alpha egs_ecinfo EGS-132 : Client ready (entity (0x0000000021:00:00:83) (Row 90501) added to mirror)") manage._analyze_line("alpha finalizeClientReady EGS-132 : Updating IS_NEWBIE flag for character: (0x0000000021:00:00:83)") manage._analyze_line("alpha 1383 disconnectPlayer EGS-132 : (EGS) player 2 (Row 90501) removed") manage._analyze_line("alpha egs_plinfo EGS-132 : Player with userId = 2 removed") manage._analyze_line("cbClientAdmin EGS-133 : ADMIN: Player (0x0000000021:00:00:83) doesn't have privilege to execute the client admin command 'infos'") manage._analyze_line("cbClientAdmin EGS-133 : ADMIN: Player (0x0000000021:00:00:83) tried to execute a no valid client admin command 'INFO'") for i in range(0, 2000): manage._analyze_line("cbClientAdmin EGS-133 : ADMIN: Player (0x0000000021:00:00:83) tried to execute a no valid client admin command 'INFO' %d" % i) self.assertTrue(len(manage.filter_admin), 1000) manage._analyze_line("alpha disconnectPlayer EGS-132 : player 2 is disconnected") if '2' in manage.filter_active_character: self.assertTrue(False, "disconnectPlayer - player 2 always live") def test_manager_command_player(self): signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(30) class MockServerHttp: def appendchild(self, name, queueIn, queueOut, semaphore): pass def terminate(self): pass def join(self): pass manage = Manager.Manager(True) try: config = configparser.ConfigParser() config.add_section('config:server') config.add_section('command:test') config.set('command:test', 'command', self.program + ' --message=1 --no-loop --timeout=1') config.set('command:test', 'restart_after_crash', 'no') config.set('command:test', 'restart_delay', '1000') config.set('command:test', 'egs_filter', 'yes') manage.serverHttp = MockServerHttp() manage._load_config(config) manage.launch_command() signal.alarm(10) key = list(manage.info.keys())[0] queueIn = manage.info[key]['queueIn'] queueOut = manage.info[key]['queueOut'] semaphore = manage.info[key]['semaphore'] signal.alarm(30) item = "started" while item == "started": logging.debug("sleep") time.sleep(1) logging.debug("semaphore") semaphore.acquire() logging.debug("status") queueIn.put("STATUS") logging.debug("queue") item = queueOut.get() semaphore.release() logging.debug("Lecture STDOUT") time.sleep(1) signal.alarm(30) logging.debug("semaphore") semaphore.acquire() logging.debug("stdout") queueIn.put("STDOUT") logging.debug("Attend le retour STDOUT") item = queueOut.get() semaphore.release() logging.debug("Resultat STDOUT") time.sleep(1) signal.alarm(10) semaphore.acquire() queueIn.put("SHUTDOWN") semaphore.release() with self.assertRaises(queue.Empty): item = queueOut.get(timeout=4) #threadCommand.join() manage.receive_signal(15, 1) manage.wait_children_commands() #Disable timeout signal.alarm(0) self.assertTrue(True) signal.alarm(0) except TimeoutError: print("Prepare Crash - TimeOut !") #self.fail('Error initialize object ManageCommand') manage.receive_signal(15, 1) manage.wait_children_commands() print("Force Crash - TimeOut !") os._exit(2) except Exception as e: print("Prepare Crash - %s" % e) #self.fail('Error initialize object ManageCommand') manage.receive_signal(15, 1) manage.wait_children_commands() handlerCrash(15, 1) print("Force Crash - TimeOut !") os._exit(2) signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(0) def test_execute_manager_command(self): signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(10) try: logsize = 10 bufsize = 10 queueIn = multiprocessing.Queue() queueOut = multiprocessing.Queue() semaphore = multiprocessing.Semaphore() manageCommand = Manager.ManageCommand('test_execute_manager_command', self.program, self.path, logsize, bufsize, queueIn, queueOut, semaphore, False, 1, "", "", False, False, 1, False) manageCommand.status() manageCommand.start() manageCommand.status() manageCommand.start() foundEnd = re.compile('.*(Started).*') foundY = re.compile('.*(sendToStdinY).*') loop = 10 while loop > 0: time.sleep(1) out = manageCommand.getlog(0) if foundEnd.match(str(out)): break loop -= 1 if not foundEnd.match(str(out)): manageCommand.stop() self.assertTrue(False, 'Missing message in log') manageCommand.list_thread() retA = manageCommand.action("sendToStdinA") self.assertEqual(retA, "ok", 'Error impossible to send to stdin') for i in range(0, 120): retX = manageCommand.action("sendToStdin%d" % i) self.assertEqual(retX, "ok", 'Error impossible to send to stdin') retY = manageCommand.action("sendToStdinY") self.assertEqual(retY, "ok", 'Error impossible to send to stdin') loop = 10 while loop > 0: time.sleep(1) out = manageCommand.getlog(0) if foundY.match(str(out)): break loop -= 1 if not foundY.match(str(out)): manageCommand.stop() self.assertTrue(False, 'Missing message in log') manageCommand.stop() manageCommand.status() retZ = manageCommand.action("sendToStdinZ") manageCommand.stop() self.assertEqual(retZ, "ko", 'Error send to stdin when process is down') self.assertTrue(True) except: self.fail('Error initialize object ManageCommand') signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(0) def test_execute_crash_manager_command(self): signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(10) try: logsize = 10 bufsize = 10 queueIn = multiprocessing.Queue() queueOut = multiprocessing.Queue() semaphore = multiprocessing.Semaphore() manageCommand = Manager.ManageCommand('test_execute_crash_manager_command', self.program + ' --no-loop --timeout 1', self.path, logsize, bufsize, queueIn, queueOut, semaphore, False, 1, "", "", False, False, 1, False) res = manageCommand.start() self.assertEqual(res, 0) wait = True while wait: time.sleep(1) res = manageCommand.status() if res != 0: wait = False self.assertEqual(res, 2) manageCommand.list_thread() manageCommand.stop() manageCommand.status() self.assertTrue(True) except: self.fail('Error initialize object ManageCommand') signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(0) def test_execute_not_kill_manager_command(self): signal.alarm(30) signal.signal(signal.SIGALRM, handlerCrash) try: logsize = 10 bufsize = 10 queueIn = multiprocessing.Queue() queueOut = multiprocessing.Queue() semaphore = multiprocessing.Semaphore() manageCommand = Manager.ManageCommand('test_execute_not_kill_manager_command', self.program + " --disable-kill", self.path, logsize, bufsize, queueIn, queueOut, semaphore, False, 1, "", "", False, False, 1, False, maxWaitEnd = 2) except: self.fail('Error initialize object ManageCommand') try: res = manageCommand.start() self.assertEqual(res, 0) wait = True while wait: time.sleep(1) res = manageCommand.status() self.assertEqual(res, 0) res = manageCommand.getlog(0) try: if 'last-line' in res: if res['last-line'] == 5: wait = False except: pass manageCommand.list_thread() manageCommand.stop() res = manageCommand.status() self.assertEqual(res, 1) self.assertTrue(True) except Exception as e: self.fail('Error when run test (%s)' % str(e)) signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(0) def test_execute_command_crashed(self): try: signal.alarm(20) logsize = 10 bufsize = 10 queueIn = multiprocessing.Queue() queueOut = multiprocessing.Queue() semaphore = multiprocessing.Semaphore() manageCommand = Manager.ManageCommand('test_execute_command_crashed', self.program + " --no-loop --timeout=1", self.path, logsize, bufsize, queueIn, queueOut, semaphore, False, 1, "", "", False, False, 1, False, waitDelay = 1) manageCommand.start() wait = True while wait: time.sleep(1) res = manageCommand.status() if res == 2: wait = False res = manageCommand.status() self.assertEqual(res, 2) manageCommand.start() wait = True while wait: time.sleep(1) res = manageCommand.status() if res == 2: wait = False res = manageCommand.status() self.assertEqual(res, 2) manageCommand.stop() self.assertTrue(True) except: self.fail('Error initialize object ManageCommand') signal.alarm(0) def test_execute_command_file_not_found(self): signal.alarm(10) try: logsize = 10 bufsize = 10 queueIn = multiprocessing.Queue() queueOut = multiprocessing.Queue() semaphore = multiprocessing.Semaphore() manageCommand = Manager.ManageCommand('test_execute_command_file_not_found', self.program + "_not_exist", self.path, logsize, bufsize, queueIn, queueOut, semaphore, False, 1, "", "", False, False, 1, False) ret = manageCommand.start() manageCommand.stop() self.assertEqual(ret, 2, 'Error object not generate error when program not exist') except: self.fail('Error initialize object ManageCommand') signal.alarm(0) def test_execute_command_permission(self): signal.alarm(10) try: logsize = 10 bufsize = 10 queueIn = multiprocessing.Queue() queueOut = multiprocessing.Queue() semaphore = multiprocessing.Semaphore() manageCommand = Manager.ManageCommand('test_execute_command_permission', self.badprogram, self.path, logsize, bufsize, queueIn, queueOut, semaphore, False, 1, "", "", False, False, 1, False) ret = manageCommand.start() manageCommand.stop() self.assertEqual(ret, 2, 'Error object not generate error when bad permission') except: self.fail('Error initialize object ManageCommand') signal.alarm(0) def _runCommand(self, name, command, path, logsize, bufsize, queueIn, queueOut, semaphore): """ Thread to manage khaganat program """ signal.alarm(10) manageCommand = Manager.ManageCommand(name=name, command=command, path=path, logsize=logsize, bufsize=bufsize, queueIn=queueIn, queueOut=queueOut, semaphore=semaphore, keep_filter=False, size_max_filter=1, add_filter="", del_filter="", autostart=False, restart_after_crash=False, restart_delay=1, egs_filter=False) manageCommand.run() signal.alarm(0) def test_root_bad_loglevel(self): signal.alarm(10) with self.assertRaises(ValueError): Manager.root(None, None, 'NOTEXIST', False, False) signal.alarm(0) def test_root_bad_configfile(self): signal.alarm(10) logfile = tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t') with self.assertRaises(ValueError): Manager.root(None, logfile, 'DEBUG', True, True) signal.alarm(0) def test_main(self): signal.signal(signal.SIGALRM, handlerCrash) signal.alarm(10) config = tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t') config.write('[config:server]\nauthentification=no\n') config.flush() logging.debug("load") Manager.main(['--conf=' + config.name]) logging.debug("end") signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(0) def test_run_manager_command(self): # Enable timeout signal.signal(signal.SIGALRM, handlerCrash) signal.alarm(10) class MockServerHttp: def appendchild(self, name, queueIn, queueOut, semaphore): pass def terminate(self): pass def join(self): pass config = configparser.ConfigParser() config.add_section('config:server') config.add_section('command:test') config.set('command:test', 'command', self.program) manage = Manager.Manager(False) try: manage.serverHttp = MockServerHttp() manage._load_config(config) manage.launch_command() for key in manage.info: queueIn = manage.info[key]['queueIn'] queueOut = manage.info[key]['queueOut'] semaphore = manage.info[key]['semaphore'] logging.debug("[1] --------- send START") signal.alarm(10) semaphore.acquire() queueIn.put("START") # Enable timeout signal.alarm(10) item = queueOut.get() semaphore.release() self.assertEqual(item['state'], 'started', 'Error impossible to start program') signal.alarm(0) time.sleep(1) logging.debug("[2] --------- send STATUS") signal.alarm(10) semaphore.acquire() queueIn.put("STATUS") item = queueOut.get() semaphore.release() self.assertEqual(item['state'], 'started', 'Error impossible to read status') signal.alarm(0) time.sleep(1) logging.debug("[3] --------- send STDIN arg") signal.alarm(10) semaphore.acquire() queueIn.put("STDIN arg") item = queueOut.get() semaphore.release() self.assertEqual(item['state'], 'ok', 'Error when send STDIN') signal.alarm(0) time.sleep(1) logging.debug("[4] --------- send STDOUT 4") signal.alarm(10) semaphore.acquire() logging.debug("[4.1] --------- send STDOUT 4") queueIn.put("STDOUT 4") logging.debug("[4.2] --------- send STDOUT 4") item = queueOut.get() logging.debug("[4.3] --------- send STDOUT 4") semaphore.release() logging.debug("[4.4] --------- send STDOUT 4") signal.alarm(0) logging.debug("[4.5] --------- send STDOUT 4") self.assertEqual(item['first-line'], 4, 'Error when read STDOUT (Missing first-line)') self.assertEqual(item['last-line'], 6, 'Error when read STDOUT (Missing first-line)') # self.assertRegex(item, '^[{](.*)("first-line": 4)(.*)[}]$', 'Error when read STDOUT (Missing first-line)') # self.assertRegex(item, '^[{](.*)("last-line": 6)(.*)[}]$', 'Error when read STDOUT (Missing last-line)') self.assertRegex(str(item), '^(.*)(6 arg)(.*)$', 'Error when read STDOUT (bad record)') time.sleep(1) logging.debug("[5] --------- send BADCOMMAND") signal.alarm(10) semaphore.acquire() queueIn.put("BADCOMMAND") item = queueOut.get() semaphore.release() self.assertEqual(item['error'], 'command unknown', 'Error impossible to read status') signal.alarm(0) time.sleep(1) logging.debug("[6] --------- send STOP") signal.alarm(10) semaphore.acquire() queueIn.put("STOP") item = queueOut.get() semaphore.release() self.assertEqual(item["state"], "stopped", 'Error impossible to read status') signal.alarm(0) time.sleep(1) logging.debug("[7] --------- send SHUTDOWN") signal.alarm(10) semaphore.acquire() queueIn.put("SHUTDOWN") with self.assertRaises(queue.Empty): item = queueOut.get(timeout=5) semaphore.release() #threadCommand.join() logging.debug("--------- Stop all") manage.receive_signal(15, 1) manage.wait_children_commands() #Disable timeout signal.alarm(0) self.assertTrue(True) except Exception as e: logging.error("test_run_manager_command - Prepare Crash - %s" % e) logging.error(traceback.format_exc()) #self.fail('Error initialize object ManageCommand') manage.receive_signal(15, 1) manage.wait_children_commands() print("test_run_manager_command - Force Crash - TimeOut !") os._exit(2) signal.alarm(0) signal.signal(signal.SIGALRM, handlerRaise) def test_run_manager_command_autostart(self): # Enable timeout signal.alarm(10) class MockServerHttp: def appendchild(self, name, queueIn, queueOut, semaphore): pass def terminate(self): pass def join(self): pass config = configparser.ConfigParser() config.add_section('config:server') config.add_section('command:test') config.set('command:test', 'command', self.program) manage = Manager.Manager(True) manage.serverHttp = MockServerHttp() manage._load_config(config) manage.launch_command() key = list(manage.info.keys())[0] queueIn = manage.info[key]['queueIn'] queueOut = manage.info[key]['queueOut'] semaphore = manage.info[key]['semaphore'] signal.alarm(10) semaphore.acquire() queueIn.put("STATUS") item = queueOut.get(timeout=4) semaphore.release() self.assertEqual(item['state'], "started", 'Error impossible to read status') time.sleep(1) signal.alarm(10) semaphore.acquire() queueIn.put("SHUTDOWN") with self.assertRaises(queue.Empty): item = queueOut.get(timeout=4) semaphore.release() #threadCommand.join() manage.receive_signal(15, 1) manage.wait_children_commands() #Disable timeout signal.alarm(0) self.assertTrue(True) signal.alarm(0) def test_run_manager_command_autostart_option(self): # Enable timeout signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(10) class MockServerHttp: def appendchild(self, name, queueIn, queueOut, semaphore): pass def terminate(self): pass def join(self): pass config = configparser.ConfigParser() config.add_section('config:server') config.add_section('command:test') config.set('command:test', 'command', self.program) config.set('command:test', 'autostart', 'yes') manage = Manager.Manager(False) try: manage.serverHttp = MockServerHttp() manage._load_config(config) manage.launch_command() time.sleep(5) key = list(manage.info.keys())[0] queueIn = manage.info[key]['queueIn'] queueOut = manage.info[key]['queueOut'] semaphore = manage.info[key]['semaphore'] signal.alarm(10) semaphore.acquire() queueIn.put("STATUS") item = queueOut.get(timeout=4) semaphore.release() self.assertEqual(item["state"], "started", 'program not started') time.sleep(1) signal.alarm(10) semaphore.acquire() queueIn.put("SHUTDOWN") with self.assertRaises(queue.Empty): item = queueOut.get(timeout=4) semaphore.release() #threadCommand.join() manage.receive_signal(15, 1) manage.wait_children_commands() #Disable timeout signal.alarm(0) except TimeoutError: print("test_run_manager_command_restart_after_crash - Prepare Crash - TimeOut !") #self.fail('Error initialize object ManageCommand') manage.receive_signal(15, 1) manage.wait_children_commands() print("test_run_manager_command_restart_after_crash - Force Crash - TimeOut !") os._exit(2) except Exception as e: print("test_run_manager_command_restart_after_crash - Prepare Crash - %s" % e) #self.fail('Error initialize object ManageCommand') manage.receive_signal(15, 1) manage.wait_children_commands() handlerCrash(15, 1) print("test_run_manager_command_restart_after_crash - Force Crash - TimeOut !") os._exit(2) self.assertTrue(True) signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(0) def test_run_manager_command_restart_after_crash(self): # Enable timeout signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(10) logging.debug("--------- test_run_manager_command_restart_after_crash => Start") class MockServerHttp: def appendchild(self, name, queueIn, queueOut, semaphore): pass def terminate(self): pass def join(self): pass config = configparser.ConfigParser() config.add_section('config:server') config.add_section('command:test') config.set('command:test', 'command', self.program + ' --no-loop --timeout=5') config.set('command:test', 'autostart', 'yes') config.set('command:test', 'restart_after_crash', 'yes') config.set('command:test', 'restart_delay', '5') manage = Manager.Manager(False) try: manage.serverHttp = MockServerHttp() manage._load_config(config) manage.launch_command() key = list(manage.info.keys())[0] queueIn = manage.info[key]['queueIn'] queueOut = manage.info[key]['queueOut'] semaphore = manage.info[key]['semaphore'] logging.debug("--------- wait not started") signal.alarm(20) item = {"state": "started"} while item['state'] == "started": time.sleep(1) semaphore.acquire() queueIn.put("STATUS") try: item = queueOut.get() except queue.Empty: pass semaphore.release() self.assertEqual(item['state'], "crashed", 'program not started') signal.alarm(20) logging.debug("--------- wait not crashed") while item['state'] == "crashed": time.sleep(1) semaphore.acquire() queueIn.put("STATUS") try: item = queueOut.get(timeout=4) except queue.Empty: pass semaphore.release() signal.alarm(20) self.assertEqual(item['state'], "started", 'program not started') logging.debug("--------- wait not started") signal.alarm(20) while item['state'] == "started": time.sleep(1) semaphore.acquire() queueIn.put("STATUS") try: item = queueOut.get(timeout=4) except queue.Empty: pass semaphore.release() self.assertEqual(item['state'], "crashed", 'program not started') signal.alarm(20) logging.debug("--------- wait not crashed") # item = "crashed" while item['state'] == "crashed": time.sleep(1) semaphore.acquire() queueIn.put("STATUS") try: item = queueOut.get() except queue.Empty: pass semaphore.release() signal.alarm(20) logging.debug("--------- SHUTDOWN") time.sleep(1) signal.alarm(10) semaphore.acquire() queueIn.put("SHUTDOWN") semaphore.release() #threadCommand.join() manage.receive_signal(15, 1) manage.wait_children_commands() #Disable timeout signal.alarm(0) except TimeoutError: print("test_run_manager_command_restart_after_crash - Prepare Crash - TimeOut !") #self.fail('Error initialize object ManageCommand') manage.receive_signal(15, 1) manage.wait_children_commands() print("test_run_manager_command_restart_after_crash - Force Crash - TimeOut !") os._exit(2) except Exception as e: print("test_run_manager_command_restart_after_crash - Prepare Crash - %s" % e) #self.fail('Error initialize object ManageCommand') manage.receive_signal(15, 1) manage.wait_children_commands() handlerCrash(15, 1) print("test_run_manager_command_restart_after_crash - Force Crash - TimeOut !") os._exit(2) self.assertTrue(True) signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(0) class MockStreamRequestHandler(): def __init__(self): self.value = "{}" self.message = None def define_return(self, value): self.value = value def read(self, numbercar): return bytes(self.value, "utf-8") def write(self, message): self.message = message class MockServer(): def __init__(self): self.listSemaphore = {} self.listQueueIn = {} self.listQueueOut = {} self.authentification = False @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_log_message(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.address_string = MagicMock() manage.log_message('example') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_set_headers(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage._set_headers() manage.send_response.assert_called_with(200) # manage.send_header.assert_called_with('Content-type', 'application/json') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_command_log_1(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.headers = {} manage._command_log() manage.send_error.assert_called_with(400, "bad content-type") signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_command_log_2(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.headers = {'content-type' : 'application/json'} manage._command_log() manage.send_error.assert_called_with(400, "bad content-length") signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_command_log_3(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage.rfile = TestManager.MockStreamRequestHandler() manage._command_log() manage.send_error.assert_called_with(400, 'Missing param name') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_command_log_4(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.rfile.define_return( '{"name": "test"}' ) manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._command_log() manage.send_error.assert_called_with(400, 'Name unknown') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_command_log_5(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.server.listQueueIn = {'test': ''} manage.rfile.define_return( '{"name": "test"}' ) manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._command_log() manage.send_error.assert_called_with(400, 'Missing param first-line') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_command_log_6(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.server.listQueueIn = {'test': ''} manage.rfile.define_return( '{"name": "test", "first-line" : "test"}' ) manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._command_log() manage.send_error.assert_called_with(400, 'Impossible to read first-line') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_command_log_7(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} #manage.server.listQueueOut['test'].put("empty") manage.rfile.define_return( '{"name": "test", "first-line" : "1"}' ) manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._command_log() #self.assertEqual(b'empty',manage.wfile.message) signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_command_log_8(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.server.listQueueOut['test'].put("empty") manage.rfile.define_return( '{"name": "test", "first-line" : "1"}' ) manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._command_log() self.assertEqual(b'"empty"',manage.wfile.message) signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_list(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage._send_list() self.assertEqual(b'{"0": "test"}',manage.wfile.message) signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_shutdown(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage._send_shutdown() self.assertEqual(b'{"shutdown": "ok"}',manage.wfile.message) signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_command_all(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.server.listQueueOut['test'].put("empty") manage._send_command_all("test") self.assertEqual(b'{"test": "empty"}',manage.wfile.message) signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_command_all_timeout(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage._send_command_all("test") signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_action_1(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._send_action() manage.send_error.assert_called_with(400, 'Missing param name') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_action_2(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.rfile.define_return( '{"name": "testnew"}' ) manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._send_action() manage.send_error.assert_called_with(400, 'Name unknown') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_action_3(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.rfile.define_return( '{"name": "test"}' ) manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._send_action() manage.send_error.assert_called_with(400, 'Missing param action') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_action_4(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.rfile.define_return( '{"name": "test", "action": "example"}' ) manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._send_action() self.assertEqual(None,manage.wfile.message) signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_action_5(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.server.listQueueOut['test'].put({"state": "empty"}) manage.rfile.define_return( '{"name": "test", "action": "example"}' ) manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._send_action() self.assertEqual(b'{"state": "empty"}',manage.wfile.message) signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_action_6(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.headers = {} manage._send_action() signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_command_1(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.headers = {} manage._send_command("STATUS") signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_command_2(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._send_command("STATUS") manage.send_error.assert_called_with(400, 'Missing param name') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_command_3(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage.server.listQueueOut['test'].put({"state": "empty"}) manage.rfile.define_return( '{"name": "test"}' ) manage._send_command("STATUS") self.assertEqual(b'{"state": "empty"}',manage.wfile.message) signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_command_4(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage.rfile.define_return( '{"name": "test"}' ) manage._send_command("STATUS") manage.send_error.assert_called_with(500, 'Missing return') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_command_5(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage.server.listQueueOut['test'].put("empty") manage.rfile.define_return( '{"name": "testnew"}' ) manage._send_command("STATUS") manage.send_error.assert_called_with(400, 'Name unknown') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_GET_1(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() manage._command_log = MagicMock() manage.path = "/STDOUT" manage.do_GET() manage._command_log.assert_called_with() signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_check_authentication_1(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() res = manage.check_authentication() self.assertEqual(True, res) signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_check_authentication_2(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() manage.server.authentification = True res = manage.check_authentication() self.assertEqual(False, res) signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_check_authentication_3(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() logging.error = MagicMock() manage.server.authentification = True manage.headers = {'Authorization' : 'Other'} res = manage.check_authentication() self.assertEqual(False, res) logging.error.assert_called_with("Authentification with Bad method (Other)") @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_check_authentication_4(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() logging.error = MagicMock() manage.server.authentification = True manage.headers = {'Authorization' : 'Basic '} res = manage.check_authentication() self.assertEqual(False, res) logging.error.assert_called_with("Error detected list index out of range") signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_check_authentication_5(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() logging.error = MagicMock() manage.server.authentification = True manage.headers = {'Authorization' : 'Basic test:$2b$12$AoRGhrAYsuSlROUDx.0AZ.3SPBVYd8wD.Vyz7jCRXZdwm7ArEN3Oi'} res = manage.check_authentication() self.assertEqual(False, res) logging.error.assert_called_with("Error detected Incorrect padding") signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_check_authentication_6(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() manage.server.users = {"user1": ""} logging.error = MagicMock() manage.server.authentification = True manage.headers = {'Authorization' : 'Basic dGVzdDokMmIkMTIkQW9SR2hyQVlzdVNsUk9VRHguMEFaLjNTUEJWWWQ4d0QuVnl6N2pDUlhaZHdtN0FyRU4zT2k='} res = manage.check_authentication() self.assertEqual(False, res) logging.error.assert_called_with("Authentification with unknown user (test)") signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_check_authentication_7(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() #manage.server.users = {"username": bytes("$2b$12$f5kbmfXvUq3LDequGHcoqu06AueJ35TpIU2MIf5L8c9Y1PE/6Rz4i", "utf-8")} #manage.server.users = {"username": b"$2b$12$f5kbmfXvUq3LDequGHcoqu06AueJ35TpIU2MIf5L8c9Y1PE/6Rz4i"} manage.server.users = {"username": "$2b$12$f5kbmfXvUq3LDequGHcoqu06AueJ35TpIU2MIf5L8c9Y1PE/6Rz4i".encode('utf-8')} logging.error = MagicMock() manage.server.authentification = True #manage.headers = {'Authorization': bytes('Basic dXNlcm5hbWU6cGFzc3dvcmQ=', 'UTF-8')} manage.headers = {'Authorization': 'Basic dXNlcm5hbWU6cGFzc3dvcmQ='} res = manage.check_authentication() self.assertEqual(True, res) #logging.error.assert_called_with("Authentification with user (username)") signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_check_authentication_8(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() manage.server.users = {"username": "$2b$12$/rfBBlTy3E9CgB8ZGeWFBOo54UN5Ogj3PuEOHVJcXyQ2hL6kYVwWW".encode('utf-8')} logging.error = MagicMock() manage.server.authentification = True manage.headers = {'Authorization': 'Basic dXNlcm5hbWU6cGFzc3dvcmQ='} res = manage.check_authentication() self.assertEqual(False, res) logging.error.assert_called_with("Authentification with wrong password for user (username)") signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_GET_2(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() manage._send_command = MagicMock() manage.path = "/STATUS" manage.do_GET() manage._send_command.assert_called_with("STATUS") signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_GET_3(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() manage._send_list = MagicMock() manage.path = "/LIST" manage.do_GET() manage._send_list.assert_called_with() signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_GET_4(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() manage._send_command_all = MagicMock() manage.path = "/STATUSALL" manage.do_GET() manage._send_command_all.assert_called_with("STATUS") signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_GET_5(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() manage.send_error = MagicMock() manage.path = "/BADPATH" manage.do_GET() manage.send_error.assert_called_with(400, 'Path unknown') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_GET_6(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() manage.server.authentification = True manage.send_error = MagicMock() manage.path = "/BADPATH" manage.do_GET() manage.send_error.assert_called_with(403, 'Wrong authentication') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_POST_1(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() manage._send_command = MagicMock() manage.path = "/START" manage.do_POST() manage._send_command.assert_called_with("START") signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_POST_2(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() manage._send_command = MagicMock() manage.path = "/STOP" manage.do_POST() manage._send_command.assert_called_with("STOP") signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_POST_3(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() manage._send_action = MagicMock() manage.path = "/STDIN" manage.do_POST() manage._send_action.assert_called_with() signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_POST_4(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() manage._send_shutdown = MagicMock() manage.path = "/SHUTDOWN" manage.do_POST() manage._send_shutdown.assert_called_with() signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_POST_5(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() manage._send_command_all = MagicMock() manage.path = "/STARTALL" manage.do_POST() manage._send_command_all.assert_called_with("START") signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_POST_6(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() manage._send_command_all = MagicMock() manage.path = "/STOPALL" manage.do_POST() manage._send_command_all.assert_called_with("STOP") signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_POST_7(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage._send_command = MagicMock() manage._send_action = MagicMock() manage._send_shutdown = MagicMock() manage._send_command_all = MagicMock() manage.path = "/BADPATH" manage.do_POST() manage.send_error.assert_called_with(400, 'Path unknown') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_POST_8(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.server = TestManager.MockServer() manage.server.authentification = True manage.send_error = MagicMock() manage.path = "/BADPATH" manage.do_POST() manage.send_error.assert_called_with(403, 'Wrong authentication') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_HEAD(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.path = "test" manage.do_HEAD() manage.send_error.assert_called_with(404, 'File Not Found: test') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_PUT(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.path = "test" manage.do_PUT() manage.send_error.assert_called_with(404, 'File Not Found: test') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_PATCH(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.path = "test" manage.do_PATCH() manage.send_error.assert_called_with(404, 'File Not Found: test') signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_DELETE(self, init): # Enable timeout signal.alarm(10) manage = Manager.ManageHttpRequest(None, None, None) manage.log_message = MagicMock() manage.send_response = MagicMock() manage.send_header = MagicMock() manage.send_error = MagicMock() manage.end_headers = MagicMock() manage.path = "test" manage.do_DELETE() manage.send_error.assert_called_with(404, 'File Not Found: test') signal.alarm(0) if __name__ == '__main__': logging.getLogger('logging') handlers = [] handlers.append(logging.StreamHandler()) logging.basicConfig(handlers=handlers, level=0, format='%(asctime)s %(levelname)s [pid:%(process)d] [%(funcName)s:%(lineno)d] %(message)s') unittest.main()