1904 lines
79 KiB
Python
1904 lines
79 KiB
Python
#!/usr/bin/python3
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# create certificate (use for test)
|
|
# Copyright (C) 2017 AleaJactaEst
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import unittest
|
|
import tempfile
|
|
import os
|
|
import configparser
|
|
import multiprocessing
|
|
import time
|
|
import re
|
|
import queue
|
|
import signal
|
|
import http.server
|
|
import logging
|
|
import json
|
|
from unittest.mock import patch
|
|
from unittest.mock import MagicMock
|
|
import traceback
|
|
|
|
try:
|
|
import pymanager.manager as Manager
|
|
except ImportError:
|
|
import sys
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
import pymanager.manager as Manager
|
|
#try:
|
|
# import pymanager.certificate as cert
|
|
#except ImportError:
|
|
# import sys
|
|
# import os
|
|
# sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
# import pymanager.certificate as cert
|
|
|
|
|
|
def handlerCrash(signum, frame):
|
|
print("handlerCrash - TimeOut !")
|
|
for line in traceback.format_stack():
|
|
print(line.strip())
|
|
# Force Exit with all thread !
|
|
os._exit(2)
|
|
|
|
|
|
def handlerRaise(signum, frame):
|
|
print("handlerRaise - TimeOut !")
|
|
for line in traceback.format_stack():
|
|
print(line.strip())
|
|
raise "Timeout"
|
|
|
|
class TestManager(unittest.TestCase):
|
|
def setUp(self):
|
|
self.openssl = '/usr/bin/openssl'
|
|
self.size_root = 4096
|
|
self.size_appli = 4096
|
|
self.size_child = 2048
|
|
self.passroot = 'BadPasswordRoot'
|
|
self.passappli = 'BadPasswordApplication'
|
|
self.country_name = 'FR'
|
|
self.state_or_province_name = 'France'
|
|
self.locality_name = 'Paris'
|
|
self.organization_name = 'khanat'
|
|
self.common_name = 'khanat'
|
|
self.path = os.path.dirname(os.path.abspath(__file__))
|
|
self.program = os.path.join(self.path, 'simulate_program.py')
|
|
self.badprogram = os.path.join(self.path, 'test.cfg')
|
|
signal.signal(signal.SIGALRM, handlerCrash)
|
|
|
|
def test_load_config(self):
|
|
signal.alarm(10)
|
|
config = configparser.ConfigParser()
|
|
config.add_section('config:server')
|
|
config.set('config:server', 'port', '8000')
|
|
config.set('config:server', 'keyfile', '/home/gameserver/ca/appli/private/serverkey.pem')
|
|
config.set('config:server', 'certfile', '/home/gameserver/ca/appli/certs/servercert.pem')
|
|
config.set('config:server', 'ca_cert', '/home/gameserver/ca/appli/certs/cachaincert.pem')
|
|
config.set('config:server', 'address', '')
|
|
config.set('config:server', 'authentification', 'yes')
|
|
config.set('config:server', 'method', 'https')
|
|
config.add_section('config:client')
|
|
config.set('config:client', 'port', '8000')
|
|
config.set('config:client', 'keyfile', '/home/gameserver/ca/appli/private/clientkey.pem')
|
|
config.set('config:client', 'certfile', '/home/gameserver/ca/appli/certs/clientcert.pem')
|
|
config.set('config:client', 'ca_cert', '/home/gameserver/ca/appli/certs/cachaincert.pem')
|
|
config.set('config:client', 'address', '127.0.0.1')
|
|
config.set('config:client', 'method', 'https')
|
|
config.add_section('command:test')
|
|
config.set('command:test', 'path', '/home/gameserver')
|
|
config.set('command:test', 'command', '/bin/sleep 10')
|
|
config.set('command:test', 'logsize', '10')
|
|
config.set('command:test', 'bufsize', '10')
|
|
config.set('command:test', 'keep_state', 'yes')
|
|
config.set('command:test', 'size_max_state', '1000')
|
|
config.set('command:test', 'add_state', '"^(.*)(setActiveCharForPlayer).*(: set active char )[\d]+( for )(?P<ActivePlayer>.*)"')
|
|
config.set('command:test', 'del_state', '"^(.*)(disconnectPlayer).+[\s]+(?P<ActivePlayer>.*)[\s]+(is disconnected)"')
|
|
config.add_section('config:user')
|
|
config.set('config:user', 'usename', 'filter_all, filter_admin')
|
|
try:
|
|
manager = Manager.Manager(False)
|
|
manager._load_config(config)
|
|
self.assertTrue(True)
|
|
except:
|
|
self.fail('Error detected on load config')
|
|
signal.alarm(0)
|
|
|
|
def test_load_config2(self):
|
|
signal.alarm(10)
|
|
config = configparser.ConfigParser()
|
|
config.add_section('config:server')
|
|
config.set('config:server', 'authentification', 'no')
|
|
config.add_section('command:test')
|
|
config.set('command:test', 'path', '/home/gameserver')
|
|
config.set('command:test', 'command', '/bin/sleep 10')
|
|
try:
|
|
manager = Manager.Manager(False)
|
|
manager._load_config(config)
|
|
self.assertTrue(True)
|
|
except:
|
|
self.fail('Error detected on load config')
|
|
signal.alarm(0)
|
|
|
|
def test_load_config_bad_param_logsize(self):
|
|
signal.alarm(10)
|
|
config = configparser.ConfigParser()
|
|
config.add_section('command:test')
|
|
config.set('command:test', 'command', '/bin/sleep 10')
|
|
config.set('command:test', 'logsize', 'bidon')
|
|
with self.assertRaises(ValueError):
|
|
manager = Manager.Manager(False)
|
|
manager._load_config(config)
|
|
self.assertTrue(True)
|
|
signal.alarm(0)
|
|
|
|
def test_load_config_bad_param_bufsize(self):
|
|
signal.alarm(10)
|
|
config = configparser.ConfigParser()
|
|
config.add_section('command:test')
|
|
config.set('command:test', 'command', '/bin/sleep 10')
|
|
config.set('command:test', 'bufsize', 'bidon')
|
|
with self.assertRaises(ValueError):
|
|
manager = Manager.Manager(False)
|
|
manager._load_config(config)
|
|
self.assertTrue(True)
|
|
signal.alarm(0)
|
|
|
|
def test_load_config_empty(self):
|
|
signal.alarm(10)
|
|
config = configparser.ConfigParser()
|
|
config.add_section('config:server')
|
|
config.add_section('config:client')
|
|
config.set('config:client', 'port', '8000')
|
|
config.set('config:client', 'keyfile', '/home/gameserver/ca/appli/private/clientkey.pem')
|
|
config.set('config:client', 'certfile', '/home/gameserver/ca/appli/certs/clientcert.pem')
|
|
config.set('config:client', 'ca_cert', '/home/gameserver/ca/appli/certs/cachaincert.pem')
|
|
config.set('config:client', 'address', '127.0.0.1')
|
|
config.set('config:client', 'method', 'https')
|
|
config.add_section('config:user')
|
|
config.add_section('command:test')
|
|
config.set('command:test', 'command', '/bin/sleep 10')
|
|
try:
|
|
manager = Manager.Manager(False)
|
|
manager._load_config(config)
|
|
self.assertTrue(True)
|
|
except:
|
|
self.fail('Error detected on load config')
|
|
signal.alarm(0)
|
|
|
|
def test_load_config_file(self):
|
|
signal.alarm(10)
|
|
cfgfile = tempfile.NamedTemporaryFile(suffix="config.cfg", mode='w+t')
|
|
cfgfile.write('#\n[config:server]\nauthentification = No\n')
|
|
cfgfile.flush()
|
|
try:
|
|
manager = Manager.Manager(False)
|
|
manager.load_config(cfgfile)
|
|
self.assertTrue(True)
|
|
except:
|
|
self.fail('Error detected on load configuration')
|
|
signal.alarm(0)
|
|
|
|
def test_load_config_file_none(self):
|
|
signal.alarm(10)
|
|
with self.assertRaises(ValueError):
|
|
manager = Manager.Manager(False)
|
|
manager.load_config(None)
|
|
signal.alarm(0)
|
|
|
|
def test_load_password_file(self):
|
|
signal.alarm(10)
|
|
pwdfile = tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t')
|
|
|
|
config = configparser.ConfigParser()
|
|
config.add_section('config:server')
|
|
config.set('config:server', 'authentification', 'yes')
|
|
config.set('config:server', 'passwordfile', pwdfile.name)
|
|
pwdfile.write('username:$2a$12$2C97xW0KC/vFp3YyjlOgU.fWXJ3EiGT2Ihb0SWN9Mw0XI4WngiUqS\n\nusername2:badhash\n')
|
|
pwdfile.flush()
|
|
try:
|
|
manager = Manager.Manager(False)
|
|
manager._load_config(config)
|
|
manager.load_password()
|
|
self.assertTrue(True)
|
|
except:
|
|
self.fail('Error detected on load password')
|
|
signal.alarm(0)
|
|
|
|
def test_constructor_manager_command(self):
|
|
signal.alarm(10)
|
|
try:
|
|
logsize = 10
|
|
bufsize = 10
|
|
queueIn = multiprocessing.Queue()
|
|
queueOut = multiprocessing.Queue()
|
|
semaphore = multiprocessing.Semaphore()
|
|
manageCommand = Manager.ManageCommand('test_constructor_manager_command',
|
|
self.program,
|
|
self.path,
|
|
logsize,
|
|
bufsize,
|
|
queueIn,
|
|
queueOut,
|
|
semaphore,
|
|
False,
|
|
1,
|
|
"",
|
|
"",
|
|
False,
|
|
False,
|
|
1,
|
|
False)
|
|
manageCommand.list_thread()
|
|
self.assertTrue(True)
|
|
except:
|
|
self.fail('Error initialize object ManageCommand')
|
|
signal.alarm(0)
|
|
|
|
def test_managercommand_analyze_line(self):
|
|
manage = Manager.ManageCommand('test_execute_manager_command',
|
|
"",
|
|
"",
|
|
1000,
|
|
1000,
|
|
None,
|
|
None,
|
|
None,
|
|
False,
|
|
1000,
|
|
"",
|
|
"",
|
|
False,
|
|
False,
|
|
1,
|
|
True)
|
|
manage._analyze_line("message 1")
|
|
manage._analyze_line("alpha egs_plinfo EGS-132 : LOADED User '2' Character 'Kezxaa(Lirria)' from BS stream file 'characters/002/account_2_0_pdr.bin'")
|
|
if '2' not in manage.state_load_character:
|
|
self.assertTrue(False, "LOADED - Missing player 2")
|
|
if '0' not in manage.state_load_character['2']:
|
|
self.assertTrue(False, "LOADED - Missing charactere 0 for player 2")
|
|
manage._analyze_line("alpha egs_plinfo EGS-132 : LOADED User '2' Character 'Puskle(Lirria)' from BS stream file 'characters/002/account_2_1_pdr.bin'")
|
|
if '1' not in manage.state_load_character['2']:
|
|
self.assertTrue(False, "LOADED - Missing charactere 1 for player 2")
|
|
manage._analyze_line("alpha egs_plinfo EGS-132 : LOADED User '3' Character 'Puskle(Lirria)' from BS stream file 'characters/003/account_3_4_pdr.bin'")
|
|
if '3' not in manage.state_load_character:
|
|
self.assertTrue(False, "LOADED - Missing player 2")
|
|
manage._analyze_line("alpha egs_ecinfo EGS-132 : setActiveCharForPlayer EGS-132 : set active char 1 for player 2")
|
|
if '2' not in manage.state_active_character:
|
|
self.assertTrue(False, "setActiveCharForPlayer - Missing player 2")
|
|
if 'NameDomain' not in manage.state_active_character['2']:
|
|
self.assertTrue(False, "setActiveCharForPlayer - Missing info player 2")
|
|
manage._analyze_line("alpha egs_ecinfo EGS-132 : Mapping UID 2 => Sid (0x0000000021:00:00:81)")
|
|
if 'SID' not in manage.state_active_character['2']:
|
|
self.assertTrue(False, "setActiveCharForPlayer - Missing SID player 2")
|
|
manage._analyze_line("alpha egs_ecinfo EGS-132 : Client ready (entity (0x0000000021:00:00:81) (Row 90501) added to mirror)")
|
|
manage._analyze_line("alpha finalizeClientReady EGS-132 : Updating IS_NEWBIE flag for character: (0x0000000021:00:00:81)")
|
|
manage._analyze_line("alpha 1383 disconnectPlayer EGS-132 : (EGS) player 2 (Row 90501) removed")
|
|
manage._analyze_line("alpha egs_plinfo EGS-132 : Player with userId = 2 removed")
|
|
manage._analyze_line("alpha disconnectPlayer EGS-132 : player 2 is disconnected")
|
|
if '2' in manage.state_active_character:
|
|
self.assertTrue(False, "disconnectPlayer - player 2 always live")
|
|
|
|
def test_manager_command_player(self):
|
|
signal.signal(signal.SIGALRM, handlerRaise)
|
|
signal.alarm(30)
|
|
class MockServerHttp:
|
|
def append(self, name, queueIn, queueOut, semaphore):
|
|
pass
|
|
def terminate(self):
|
|
pass
|
|
def join(self):
|
|
pass
|
|
manage = Manager.Manager(True)
|
|
try:
|
|
config = configparser.ConfigParser()
|
|
config.add_section('config:server')
|
|
config.add_section('command:test')
|
|
config.set('command:test', 'command', self.program + ' --message=1 --no-loop --timeout=1')
|
|
config.set('command:test', 'restart_after_crash', 'no')
|
|
config.set('command:test', 'restart_delay', '1000')
|
|
config.set('command:test', 'egs_filter', 'yes')
|
|
|
|
manage.serverHttp = MockServerHttp()
|
|
manage._load_config(config)
|
|
manage.launch_command()
|
|
|
|
key = list(manage.info.keys())[0]
|
|
queueIn = manage.info[key]['queueIn']
|
|
queueOut = manage.info[key]['queueOut']
|
|
semaphore = manage.info[key]['semaphore']
|
|
|
|
signal.alarm(30)
|
|
item = "started"
|
|
while item == "started":
|
|
print("sleep")
|
|
time.sleep(1)
|
|
print("semaphore")
|
|
semaphore.acquire()
|
|
print("status")
|
|
queueIn.put("STATUS")
|
|
print("queue")
|
|
item = queueOut.get(timeout=4)
|
|
print(item)
|
|
semaphore.release()
|
|
print("Lecture STDOUT")
|
|
print("sleep")
|
|
time.sleep(1)
|
|
signal.alarm(30)
|
|
print("semaphore")
|
|
semaphore.acquire()
|
|
print("stdout")
|
|
queueIn.put("STDOUT")
|
|
print("Attend le retour STDOUT")
|
|
item = queueOut.get()
|
|
semaphore.release()
|
|
print("Resultat STDOUT")
|
|
print(item)
|
|
time.sleep(1)
|
|
signal.alarm(10)
|
|
semaphore.acquire()
|
|
queueIn.put("SHUTDOWN")
|
|
semaphore.release()
|
|
with self.assertRaises(queue.Empty):
|
|
item = queueOut.get(timeout=4)
|
|
#threadCommand.join()
|
|
manage.receive_signal(15, 1)
|
|
manage.wait_children_commands()
|
|
#Disable timeout
|
|
signal.alarm(0)
|
|
self.assertTrue(True)
|
|
signal.alarm(0)
|
|
except:
|
|
print("Prepare Crash - TimeOut !")
|
|
#self.fail('Error initialize object ManageCommand')
|
|
manage.receive_signal(15, 1)
|
|
manage.wait_children_commands()
|
|
print("Force Crash - TimeOut !")
|
|
os._exit(2)
|
|
signal.alarm(0)
|
|
signal.signal(signal.SIGALRM, handlerCrash)
|
|
|
|
def test_execute_manager_command(self):
|
|
signal.alarm(10)
|
|
try:
|
|
logsize = 10
|
|
bufsize = 10
|
|
queueIn = multiprocessing.Queue()
|
|
queueOut = multiprocessing.Queue()
|
|
semaphore = multiprocessing.Semaphore()
|
|
manageCommand = Manager.ManageCommand('test_execute_manager_command',
|
|
self.program,
|
|
self.path,
|
|
logsize,
|
|
bufsize,
|
|
queueIn,
|
|
queueOut,
|
|
semaphore,
|
|
False,
|
|
1,
|
|
"",
|
|
"",
|
|
False,
|
|
False,
|
|
1,
|
|
False)
|
|
manageCommand.status()
|
|
manageCommand.start()
|
|
manageCommand.status()
|
|
manageCommand.start()
|
|
foundEnd = re.compile('.*(Started).*')
|
|
foundY = re.compile('.*(sendToStdinY).*')
|
|
loop = 10
|
|
while loop > 0:
|
|
time.sleep(1)
|
|
out = manageCommand.getlog(0)
|
|
if foundEnd.match(out):
|
|
break
|
|
loop -= 1
|
|
if not foundEnd.match(out):
|
|
manageCommand.stop()
|
|
self.assertTrue(False, 'Missing message in log')
|
|
manageCommand.list_thread()
|
|
retA = manageCommand.action("sendToStdinA")
|
|
self.assertEqual(retA, "ok", 'Error impossible to send to stdin')
|
|
for i in range(0, 120):
|
|
retX = manageCommand.action("sendToStdin%d" % i)
|
|
self.assertEqual(retX, "ok", 'Error impossible to send to stdin')
|
|
retY = manageCommand.action("sendToStdinY")
|
|
self.assertEqual(retY, "ok", 'Error impossible to send to stdin')
|
|
loop = 10
|
|
while loop > 0:
|
|
time.sleep(1)
|
|
out = manageCommand.getlog(0)
|
|
if foundY.match(out):
|
|
break
|
|
loop -= 1
|
|
if not foundY.match(out):
|
|
manageCommand.stop()
|
|
self.assertTrue(False, 'Missing message in log')
|
|
|
|
manageCommand.stop()
|
|
manageCommand.status()
|
|
retZ = manageCommand.action("sendToStdinZ")
|
|
manageCommand.stop()
|
|
self.assertEqual(retZ, "ko", 'Error send to stdin when process is down')
|
|
self.assertTrue(True)
|
|
except:
|
|
self.fail('Error initialize object ManageCommand')
|
|
signal.alarm(0)
|
|
|
|
def test_execute_crash_manager_command(self):
|
|
signal.alarm(10)
|
|
try:
|
|
logsize = 10
|
|
bufsize = 10
|
|
queueIn = multiprocessing.Queue()
|
|
queueOut = multiprocessing.Queue()
|
|
semaphore = multiprocessing.Semaphore()
|
|
manageCommand = Manager.ManageCommand('test_execute_crash_manager_command',
|
|
self.program + ' --no-loop --timeout 1',
|
|
self.path,
|
|
logsize,
|
|
bufsize,
|
|
queueIn,
|
|
queueOut,
|
|
semaphore,
|
|
False,
|
|
1,
|
|
"",
|
|
"",
|
|
False,
|
|
False,
|
|
1,
|
|
False)
|
|
res = manageCommand.start()
|
|
self.assertEqual(res, 0)
|
|
wait = True
|
|
while wait:
|
|
time.sleep(1)
|
|
res = manageCommand.status()
|
|
if res != 0:
|
|
wait = False
|
|
self.assertEqual(res, 2)
|
|
manageCommand.list_thread()
|
|
manageCommand.stop()
|
|
manageCommand.status()
|
|
self.assertTrue(True)
|
|
except:
|
|
self.fail('Error initialize object ManageCommand')
|
|
signal.alarm(0)
|
|
|
|
def test_execute_not_kill_manager_command(self):
|
|
signal.alarm(30)
|
|
try:
|
|
logsize = 10
|
|
bufsize = 10
|
|
queueIn = multiprocessing.Queue()
|
|
queueOut = multiprocessing.Queue()
|
|
semaphore = multiprocessing.Semaphore()
|
|
manageCommand = Manager.ManageCommand('test_execute_not_kill_manager_command',
|
|
self.program + " --disable-kill",
|
|
self.path,
|
|
logsize,
|
|
bufsize,
|
|
queueIn,
|
|
queueOut,
|
|
semaphore,
|
|
False,
|
|
1,
|
|
"",
|
|
"",
|
|
False,
|
|
False,
|
|
1,
|
|
False,
|
|
maxWaitEnd = 2)
|
|
except:
|
|
self.fail('Error initialize object ManageCommand')
|
|
try:
|
|
res = manageCommand.start()
|
|
self.assertEqual(res, 0)
|
|
wait = True
|
|
while wait:
|
|
time.sleep(1)
|
|
res = manageCommand.status()
|
|
self.assertEqual(res, 0)
|
|
res = manageCommand.getlog(0)
|
|
try:
|
|
resjson = json.loads(res)
|
|
if 'last-line' in res:
|
|
if resjson['last-line'] == 5:
|
|
wait = False
|
|
except:
|
|
pass
|
|
manageCommand.list_thread()
|
|
manageCommand.stop()
|
|
res = manageCommand.status()
|
|
self.assertEqual(res, 1)
|
|
self.assertTrue(True)
|
|
except Exception as e:
|
|
self.fail('Error when run test (%s)' % str(e))
|
|
signal.alarm(0)
|
|
|
|
def test_execute_command_crashed(self):
|
|
try:
|
|
signal.alarm(20)
|
|
logsize = 10
|
|
bufsize = 10
|
|
queueIn = multiprocessing.Queue()
|
|
queueOut = multiprocessing.Queue()
|
|
semaphore = multiprocessing.Semaphore()
|
|
manageCommand = Manager.ManageCommand('test_execute_command_crashed',
|
|
self.program + " --no-loop --timeout=1",
|
|
self.path,
|
|
logsize,
|
|
bufsize,
|
|
queueIn,
|
|
queueOut,
|
|
semaphore,
|
|
False,
|
|
1,
|
|
"",
|
|
"",
|
|
False,
|
|
False,
|
|
1,
|
|
False,
|
|
waitDelay = 1)
|
|
manageCommand.start()
|
|
wait = True
|
|
while wait:
|
|
time.sleep(1)
|
|
res = manageCommand.status()
|
|
if res == 2:
|
|
wait = False
|
|
res = manageCommand.status()
|
|
self.assertEqual(res, 2)
|
|
manageCommand.start()
|
|
wait = True
|
|
while wait:
|
|
time.sleep(1)
|
|
res = manageCommand.status()
|
|
if res == 2:
|
|
wait = False
|
|
res = manageCommand.status()
|
|
self.assertEqual(res, 2)
|
|
manageCommand.stop()
|
|
self.assertTrue(True)
|
|
except:
|
|
self.fail('Error initialize object ManageCommand')
|
|
signal.alarm(0)
|
|
|
|
def test_execute_command_file_not_found(self):
|
|
signal.alarm(10)
|
|
try:
|
|
logsize = 10
|
|
bufsize = 10
|
|
queueIn = multiprocessing.Queue()
|
|
queueOut = multiprocessing.Queue()
|
|
semaphore = multiprocessing.Semaphore()
|
|
manageCommand = Manager.ManageCommand('test_execute_command_file_not_found',
|
|
self.program + "_not_exist",
|
|
self.path,
|
|
logsize,
|
|
bufsize,
|
|
queueIn,
|
|
queueOut,
|
|
semaphore,
|
|
False,
|
|
1,
|
|
"",
|
|
"",
|
|
False,
|
|
False,
|
|
1,
|
|
False)
|
|
ret = manageCommand.start()
|
|
manageCommand.stop()
|
|
self.assertEqual(ret, 2, 'Error object not generate error when program not exist')
|
|
except:
|
|
self.fail('Error initialize object ManageCommand')
|
|
signal.alarm(0)
|
|
|
|
def test_execute_command_permission(self):
|
|
signal.alarm(10)
|
|
try:
|
|
logsize = 10
|
|
bufsize = 10
|
|
queueIn = multiprocessing.Queue()
|
|
queueOut = multiprocessing.Queue()
|
|
semaphore = multiprocessing.Semaphore()
|
|
manageCommand = Manager.ManageCommand('test_execute_command_permission',
|
|
self.badprogram,
|
|
self.path,
|
|
logsize,
|
|
bufsize,
|
|
queueIn,
|
|
queueOut,
|
|
semaphore,
|
|
False,
|
|
1,
|
|
"",
|
|
"",
|
|
False,
|
|
False,
|
|
1,
|
|
False)
|
|
ret = manageCommand.start()
|
|
manageCommand.stop()
|
|
self.assertEqual(ret, 2, 'Error object not generate error when bad permission')
|
|
except:
|
|
self.fail('Error initialize object ManageCommand')
|
|
signal.alarm(0)
|
|
|
|
def _runCommand(self, name, command, path, logsize, bufsize, queueIn, queueOut, semaphore):
|
|
"""
|
|
Thread to manage khaganat program
|
|
"""
|
|
signal.alarm(10)
|
|
manageCommand = Manager.ManageCommand(name=name,
|
|
command=command,
|
|
path=path,
|
|
logsize=logsize,
|
|
bufsize=bufsize,
|
|
queueIn=queueIn,
|
|
queueOut=queueOut,
|
|
semaphore=semaphore,
|
|
keep_state=False,
|
|
size_max_state=1,
|
|
add_state="",
|
|
del_state="",
|
|
autostart=False,
|
|
restart_after_crash=False,
|
|
restart_delay=1,
|
|
egs_filter=False)
|
|
manageCommand.run()
|
|
signal.alarm(0)
|
|
|
|
def test_root_bad_loglevel(self):
|
|
signal.alarm(10)
|
|
with self.assertRaises(ValueError):
|
|
Manager.root(None,
|
|
None,
|
|
'NOTEXIST',
|
|
False,
|
|
False)
|
|
signal.alarm(0)
|
|
|
|
def test_root_bad_configfile(self):
|
|
signal.alarm(10)
|
|
logfile = tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t')
|
|
with self.assertRaises(ValueError):
|
|
Manager.root(None,
|
|
logfile,
|
|
'DEBUG',
|
|
True,
|
|
True)
|
|
signal.alarm(0)
|
|
|
|
def test_main(self):
|
|
signal.alarm(10)
|
|
config = tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t')
|
|
config.write('[config:server]\nauthentification=no\n')
|
|
config.flush()
|
|
Manager.main(['--conf=' + config.name])
|
|
signal.alarm(0)
|
|
|
|
def test_run_manager_command(self):
|
|
# Enable timeout
|
|
signal.signal(signal.SIGALRM, handlerCrash)
|
|
signal.alarm(10)
|
|
class MockServerHttp:
|
|
def append(self, name, queueIn, queueOut, semaphore):
|
|
pass
|
|
def terminate(self):
|
|
pass
|
|
def join(self):
|
|
pass
|
|
config = configparser.ConfigParser()
|
|
config.add_section('config:server')
|
|
config.add_section('command:test')
|
|
config.set('command:test', 'command', self.program)
|
|
|
|
manage = Manager.Manager(False)
|
|
try:
|
|
manage.serverHttp = MockServerHttp()
|
|
manage._load_config(config)
|
|
manage.launch_command()
|
|
for key in manage.info:
|
|
queueIn = manage.info[key]['queueIn']
|
|
queueOut = manage.info[key]['queueOut']
|
|
semaphore = manage.info[key]['semaphore']
|
|
|
|
logging.debug("[1] --------- send START")
|
|
signal.alarm(10)
|
|
semaphore.acquire()
|
|
queueIn.put("START")
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
item = queueOut.get(timeout=4)
|
|
semaphore.release()
|
|
self.assertEqual(item, "started", 'Error impossible to start program')
|
|
signal.alarm(0)
|
|
time.sleep(1)
|
|
logging.debug("[2] --------- send STATUS")
|
|
signal.alarm(10)
|
|
semaphore.acquire()
|
|
queueIn.put("STATUS")
|
|
item = queueOut.get()
|
|
semaphore.release()
|
|
self.assertEqual(item, "started", 'Error impossible to read status')
|
|
signal.alarm(0)
|
|
time.sleep(1)
|
|
logging.debug("[3] --------- send STDIN arg")
|
|
signal.alarm(10)
|
|
semaphore.acquire()
|
|
queueIn.put("STDIN arg")
|
|
item = queueOut.get()
|
|
semaphore.release()
|
|
self.assertEqual(item, "ok", 'Error when send STDIN')
|
|
signal.alarm(0)
|
|
time.sleep(1)
|
|
logging.debug("[4] --------- send STDOUT 4")
|
|
signal.alarm(10)
|
|
semaphore.acquire()
|
|
logging.debug("[4.1] --------- send STDOUT 4")
|
|
queueIn.put("STDOUT 4")
|
|
logging.debug("[4.2] --------- send STDOUT 4")
|
|
item = queueOut.get()
|
|
logging.debug("[4.3] --------- send STDOUT 4")
|
|
semaphore.release()
|
|
logging.debug("[4.4] --------- send STDOUT 4")
|
|
signal.alarm(0)
|
|
logging.debug("[4.5] --------- send STDOUT 4")
|
|
self.assertRegex(item,
|
|
'^[{](.*)("first-line": 4)(.*)[}]$',
|
|
'Error when read STDOUT (Missing first-line)')
|
|
self.assertRegex(item,
|
|
'^[{](.*)("last-line": 6)(.*)[}]$',
|
|
'Error when read STDOUT (Missing last-line)')
|
|
self.assertRegex(item,
|
|
'^[{](.*)(6 arg")(.*)[}]$',
|
|
'Error when read STDOUT (bad record)')
|
|
time.sleep(1)
|
|
logging.debug("[5] --------- send BADCOMMAND")
|
|
signal.alarm(10)
|
|
semaphore.acquire()
|
|
queueIn.put("BADCOMMAND")
|
|
item = queueOut.get()
|
|
semaphore.release()
|
|
self.assertEqual(item, "error : command unknown", 'Error impossible to read status')
|
|
signal.alarm(0)
|
|
time.sleep(1)
|
|
logging.debug("[6] --------- send STOP")
|
|
signal.alarm(10)
|
|
semaphore.acquire()
|
|
queueIn.put("STOP")
|
|
item = queueOut.get()
|
|
semaphore.release()
|
|
self.assertEqual(item, "stopped", 'Error impossible to read status')
|
|
signal.alarm(0)
|
|
time.sleep(1)
|
|
logging.debug("[7] --------- send SHUTDOWN")
|
|
signal.alarm(10)
|
|
semaphore.acquire()
|
|
queueIn.put("SHUTDOWN")
|
|
with self.assertRaises(queue.Empty):
|
|
item = queueOut.get(timeout=5)
|
|
semaphore.release()
|
|
#threadCommand.join()
|
|
logging.debug("--------- Stop all")
|
|
manage.receive_signal(15, 1)
|
|
manage.wait_children_commands()
|
|
#Disable timeout
|
|
signal.alarm(0)
|
|
self.assertTrue(True)
|
|
except Exception as e:
|
|
logging.error("test_run_manager_command - Prepare Crash - %s" % e)
|
|
logging.error(traceback.format_exc())
|
|
#self.fail('Error initialize object ManageCommand')
|
|
manage.receive_signal(15, 1)
|
|
manage.wait_children_commands()
|
|
print("test_run_manager_command - Force Crash - TimeOut !")
|
|
os._exit(2)
|
|
signal.alarm(0)
|
|
signal.signal(signal.SIGALRM, handlerCrash)
|
|
|
|
def test_run_manager_command_autostart(self):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
class MockServerHttp:
|
|
def append(self, name, queueIn, queueOut, semaphore):
|
|
pass
|
|
def terminate(self):
|
|
pass
|
|
def join(self):
|
|
pass
|
|
config = configparser.ConfigParser()
|
|
config.add_section('config:server')
|
|
config.add_section('command:test')
|
|
config.set('command:test', 'command', self.program)
|
|
|
|
manage = Manager.Manager(True)
|
|
manage.serverHttp = MockServerHttp()
|
|
manage._load_config(config)
|
|
manage.launch_command()
|
|
|
|
key = list(manage.info.keys())[0]
|
|
queueIn = manage.info[key]['queueIn']
|
|
queueOut = manage.info[key]['queueOut']
|
|
semaphore = manage.info[key]['semaphore']
|
|
|
|
signal.alarm(10)
|
|
semaphore.acquire()
|
|
queueIn.put("STATUS")
|
|
item = queueOut.get(timeout=4)
|
|
semaphore.release()
|
|
self.assertEqual(item, "started", 'Error impossible to read status')
|
|
time.sleep(1)
|
|
signal.alarm(10)
|
|
semaphore.acquire()
|
|
queueIn.put("SHUTDOWN")
|
|
with self.assertRaises(queue.Empty):
|
|
item = queueOut.get(timeout=4)
|
|
semaphore.release()
|
|
#threadCommand.join()
|
|
manage.receive_signal(15, 1)
|
|
manage.wait_children_commands()
|
|
#Disable timeout
|
|
signal.alarm(0)
|
|
self.assertTrue(True)
|
|
signal.alarm(0)
|
|
|
|
def test_run_manager_command_autostart_option(self):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
class MockServerHttp:
|
|
def append(self, name, queueIn, queueOut, semaphore):
|
|
pass
|
|
def terminate(self):
|
|
pass
|
|
def join(self):
|
|
pass
|
|
config = configparser.ConfigParser()
|
|
config.add_section('config:server')
|
|
config.add_section('command:test')
|
|
config.set('command:test', 'command', self.program)
|
|
config.set('command:test', 'autostart', 'yes')
|
|
|
|
manage = Manager.Manager(False)
|
|
manage.serverHttp = MockServerHttp()
|
|
manage._load_config(config)
|
|
manage.launch_command()
|
|
time.sleep(5)
|
|
|
|
key = list(manage.info.keys())[0]
|
|
queueIn = manage.info[key]['queueIn']
|
|
queueOut = manage.info[key]['queueOut']
|
|
semaphore = manage.info[key]['semaphore']
|
|
|
|
signal.alarm(10)
|
|
semaphore.acquire()
|
|
queueIn.put("STATUS")
|
|
item = queueOut.get(timeout=4)
|
|
semaphore.release()
|
|
self.assertEqual(item, "started", 'program not started')
|
|
time.sleep(1)
|
|
signal.alarm(10)
|
|
semaphore.acquire()
|
|
queueIn.put("SHUTDOWN")
|
|
with self.assertRaises(queue.Empty):
|
|
item = queueOut.get(timeout=4)
|
|
semaphore.release()
|
|
#threadCommand.join()
|
|
manage.receive_signal(15, 1)
|
|
manage.wait_children_commands()
|
|
#Disable timeout
|
|
signal.alarm(0)
|
|
self.assertTrue(True)
|
|
signal.alarm(0)
|
|
|
|
def test_run_manager_command_restart_after_crash(self):
|
|
# Enable timeout
|
|
signal.signal(signal.SIGALRM, handlerCrash)
|
|
signal.alarm(10)
|
|
logging.debug("--------- test_run_manager_command_restart_after_crash => Start")
|
|
class MockServerHttp:
|
|
def append(self, name, queueIn, queueOut, semaphore):
|
|
pass
|
|
def terminate(self):
|
|
pass
|
|
def join(self):
|
|
pass
|
|
config = configparser.ConfigParser()
|
|
config.add_section('config:server')
|
|
config.add_section('command:test')
|
|
config.set('command:test', 'command', self.program + ' --no-loop --timeout=5')
|
|
config.set('command:test', 'autostart', 'yes')
|
|
config.set('command:test', 'restart_after_crash', 'yes')
|
|
config.set('command:test', 'restart_delay', '5')
|
|
|
|
manage = Manager.Manager(False)
|
|
|
|
try:
|
|
manage.serverHttp = MockServerHttp()
|
|
manage._load_config(config)
|
|
manage.launch_command()
|
|
|
|
key = list(manage.info.keys())[0]
|
|
queueIn = manage.info[key]['queueIn']
|
|
queueOut = manage.info[key]['queueOut']
|
|
semaphore = manage.info[key]['semaphore']
|
|
|
|
logging.debug("--------- wait not started")
|
|
signal.alarm(20)
|
|
item = "started"
|
|
while item == "started":
|
|
time.sleep(1)
|
|
semaphore.acquire()
|
|
queueIn.put("STATUS")
|
|
try:
|
|
item = queueOut.get(timeout=4)
|
|
except queue.Empty:
|
|
pass
|
|
semaphore.release()
|
|
self.assertEqual(item, "crashed", 'program not started')
|
|
signal.alarm(20)
|
|
|
|
logging.debug("--------- wait not crashed")
|
|
# item = "crashed"
|
|
while item == "crashed":
|
|
time.sleep(1)
|
|
semaphore.acquire()
|
|
queueIn.put("STATUS")
|
|
try:
|
|
item = queueOut.get(timeout=4)
|
|
except queue.Empty:
|
|
pass
|
|
semaphore.release()
|
|
signal.alarm(20)
|
|
|
|
self.assertEqual(item, "started", 'program not started')
|
|
|
|
logging.debug("--------- wait not started")
|
|
signal.alarm(20)
|
|
item = "started"
|
|
while item == "started":
|
|
time.sleep(1)
|
|
semaphore.acquire()
|
|
queueIn.put("STATUS")
|
|
try:
|
|
item = queueOut.get(timeout=4)
|
|
except queue.Empty:
|
|
pass
|
|
semaphore.release()
|
|
self.assertEqual(item, "crashed", 'program not started')
|
|
signal.alarm(20)
|
|
|
|
self.assertEqual(item, "crashed", 'program not started')
|
|
signal.alarm(20)
|
|
|
|
logging.debug("--------- wait not crashed")
|
|
# item = "crashed"
|
|
while item == "crashed":
|
|
time.sleep(1)
|
|
semaphore.acquire()
|
|
queueIn.put("STATUS")
|
|
try:
|
|
item = queueOut.get(timeout=4)
|
|
except queue.Empty:
|
|
pass
|
|
semaphore.release()
|
|
signal.alarm(20)
|
|
|
|
logging.debug("--------- SHUTDOWN")
|
|
time.sleep(1)
|
|
signal.alarm(10)
|
|
semaphore.acquire()
|
|
queueIn.put("SHUTDOWN")
|
|
semaphore.release()
|
|
#threadCommand.join()
|
|
manage.receive_signal(15, 1)
|
|
manage.wait_children_commands()
|
|
#Disable timeout
|
|
signal.alarm(0)
|
|
except:
|
|
print("Prepare Crash - TimeOut !")
|
|
#self.fail('Error initialize object ManageCommand')
|
|
manage.receive_signal(15, 1)
|
|
manage.wait_children_commands()
|
|
print("Force Crash - TimeOut !")
|
|
os._exit(2)
|
|
self.assertTrue(True)
|
|
signal.alarm(0)
|
|
|
|
class MockStreamRequestHandler():
|
|
def __init__(self):
|
|
self.value = "{}"
|
|
self.message = None
|
|
def define_return(self, value):
|
|
self.value = value
|
|
def read(self, numbercar):
|
|
return bytes(self.value, "utf-8")
|
|
def write(self, message):
|
|
self.message = message
|
|
|
|
class MockServer():
|
|
def __init__(self):
|
|
self.listSemaphore = {}
|
|
self.listQueueIn = {}
|
|
self.listQueueOut = {}
|
|
self.authentification = False
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_log_message(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.address_string = MagicMock()
|
|
manage.log_message('example')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_set_headers(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage._set_headers()
|
|
manage.send_response.assert_called_with(200)
|
|
# manage.send_header.assert_called_with('Content-type', 'application/json')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_command_log_1(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.headers = {}
|
|
manage._command_log()
|
|
manage.send_error.assert_called_with(400, "bad content-type")
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_command_log_2(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.headers = {'content-type' : 'application/json'}
|
|
manage._command_log()
|
|
manage.send_error.assert_called_with(400, "bad content-length")
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_command_log_3(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage._command_log()
|
|
manage.send_error.assert_called_with(400, 'Missing param name')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_command_log_4(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.rfile.define_return( '{"name": "test"}' )
|
|
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
|
manage._command_log()
|
|
manage.send_error.assert_called_with(400, 'Name unknown')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_command_log_5(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.listQueueIn = {'test': ''}
|
|
manage.rfile.define_return( '{"name": "test"}' )
|
|
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
|
manage._command_log()
|
|
manage.send_error.assert_called_with(400, 'Missing param first-line')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_command_log_6(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.listQueueIn = {'test': ''}
|
|
manage.rfile.define_return( '{"name": "test", "first-line" : "test"}' )
|
|
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
|
manage._command_log()
|
|
manage.send_error.assert_called_with(400, 'Impossible to read first-line')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_command_log_7(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.wfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.listSemaphore = {'test': multiprocessing.Semaphore() }
|
|
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
|
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
|
#manage.server.listQueueOut['test'].put("empty")
|
|
manage.rfile.define_return( '{"name": "test", "first-line" : "1"}' )
|
|
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
|
manage._command_log()
|
|
#self.assertEqual(b'empty',manage.wfile.message)
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_command_log_8(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.wfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.listSemaphore = {'test': multiprocessing.Semaphore() }
|
|
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
|
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
|
manage.server.listQueueOut['test'].put("empty")
|
|
manage.rfile.define_return( '{"name": "test", "first-line" : "1"}' )
|
|
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
|
manage._command_log()
|
|
self.assertEqual(b'empty',manage.wfile.message)
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_send_list(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.wfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.listSemaphore = {'test': multiprocessing.Semaphore() }
|
|
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
|
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
|
manage._send_list()
|
|
self.assertEqual(b'{"0": "test"}',manage.wfile.message)
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_send_shutdown(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.wfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.listSemaphore = {'test': multiprocessing.Semaphore() }
|
|
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
|
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
|
manage._send_shutdown()
|
|
self.assertEqual(b'{"shutdown": "ok"}',manage.wfile.message)
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_send_command_all(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.wfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.listSemaphore = {'test': multiprocessing.Semaphore() }
|
|
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
|
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
|
manage.server.listQueueOut['test'].put("empty")
|
|
manage._send_command_all("test")
|
|
self.assertEqual(b'{"test": "empty"}',manage.wfile.message)
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_send_command_all_timeout(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.wfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.listSemaphore = {'test': multiprocessing.Semaphore() }
|
|
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
|
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
|
manage._send_command_all("test")
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_send_action_1(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.wfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.listSemaphore = {'test': multiprocessing.Semaphore() }
|
|
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
|
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
|
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
|
manage._send_action()
|
|
manage.send_error.assert_called_with(400, 'Missing param name')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_send_action_2(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.wfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.listSemaphore = {'test': multiprocessing.Semaphore() }
|
|
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
|
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
|
manage.rfile.define_return( '{"name": "testnew"}' )
|
|
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
|
manage._send_action()
|
|
manage.send_error.assert_called_with(400, 'Name unknown')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_send_action_3(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.wfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.listSemaphore = {'test': multiprocessing.Semaphore() }
|
|
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
|
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
|
manage.rfile.define_return( '{"name": "test"}' )
|
|
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
|
manage._send_action()
|
|
manage.send_error.assert_called_with(400, 'Missing param action')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_send_action_4(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.wfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.listSemaphore = {'test': multiprocessing.Semaphore() }
|
|
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
|
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
|
manage.rfile.define_return( '{"name": "test", "action": "example"}' )
|
|
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
|
manage._send_action()
|
|
self.assertEqual(None,manage.wfile.message)
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_send_action_5(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.wfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.listSemaphore = {'test': multiprocessing.Semaphore() }
|
|
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
|
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
|
manage.server.listQueueOut['test'].put("empty")
|
|
manage.rfile.define_return( '{"name": "test", "action": "example"}' )
|
|
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
|
manage._send_action()
|
|
self.assertEqual(b'{"state": "empty"}',manage.wfile.message)
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_send_action_6(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.headers = {}
|
|
manage._send_action()
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_send_command_1(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.headers = {}
|
|
manage._send_command("STATUS")
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_send_command_2(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.wfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.listSemaphore = {'test': multiprocessing.Semaphore() }
|
|
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
|
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
|
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
|
manage._send_command("STATUS")
|
|
manage.send_error.assert_called_with(400, 'Missing param name')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_send_command_3(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.wfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.listSemaphore = {'test': multiprocessing.Semaphore() }
|
|
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
|
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
|
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
|
manage.server.listQueueOut['test'].put("empty")
|
|
manage.rfile.define_return( '{"name": "test"}' )
|
|
manage._send_command("STATUS")
|
|
self.assertEqual(b'{"state": "empty"}',manage.wfile.message)
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_send_command_4(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.wfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.listSemaphore = {'test': multiprocessing.Semaphore() }
|
|
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
|
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
|
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
|
manage.rfile.define_return( '{"name": "test"}' )
|
|
manage._send_command("STATUS")
|
|
manage.send_error.assert_called_with(500, 'Missing return')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_send_command_5(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
|
|
manage.rfile = TestManager.MockStreamRequestHandler()
|
|
manage.wfile = TestManager.MockStreamRequestHandler()
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.listSemaphore = {'test': multiprocessing.Semaphore() }
|
|
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
|
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
|
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
|
manage.server.listQueueOut['test'].put("empty")
|
|
manage.rfile.define_return( '{"name": "testnew"}' )
|
|
manage._send_command("STATUS")
|
|
manage.send_error.assert_called_with(400, 'Name unknown')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_GET_1(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
manage._command_log = MagicMock()
|
|
manage.path = "/STDOUT"
|
|
manage.do_GET()
|
|
manage._command_log.assert_called_with()
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_check_authentication_1(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
res = manage.check_authentication()
|
|
self.assertEqual(True, res)
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_check_authentication_2(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.authentification = True
|
|
res = manage.check_authentication()
|
|
self.assertEqual(False, res)
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_check_authentication_3(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
logging.error = MagicMock()
|
|
manage.server.authentification = True
|
|
manage.headers = {'Authorization' : 'Other'}
|
|
res = manage.check_authentication()
|
|
self.assertEqual(False, res)
|
|
logging.error.assert_called_with("Authentification with Bad method (Other)")
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_check_authentication_4(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
logging.error = MagicMock()
|
|
manage.server.authentification = True
|
|
manage.headers = {'Authorization' : 'Basic '}
|
|
res = manage.check_authentication()
|
|
self.assertEqual(False, res)
|
|
logging.error.assert_called_with("Error detected list index out of range")
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_check_authentication_5(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
logging.error = MagicMock()
|
|
manage.server.authentification = True
|
|
manage.headers = {'Authorization' : 'Basic test:$2b$12$AoRGhrAYsuSlROUDx.0AZ.3SPBVYd8wD.Vyz7jCRXZdwm7ArEN3Oi'}
|
|
res = manage.check_authentication()
|
|
self.assertEqual(False, res)
|
|
logging.error.assert_called_with("Error detected Incorrect padding")
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_check_authentication_6(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.users = {"user1": ""}
|
|
logging.error = MagicMock()
|
|
manage.server.authentification = True
|
|
manage.headers = {'Authorization' : 'Basic dGVzdDokMmIkMTIkQW9SR2hyQVlzdVNsUk9VRHguMEFaLjNTUEJWWWQ4d0QuVnl6N2pDUlhaZHdtN0FyRU4zT2k='}
|
|
res = manage.check_authentication()
|
|
self.assertEqual(False, res)
|
|
logging.error.assert_called_with("Authentification with unknown user (test)")
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_check_authentication_7(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
#manage.server.users = {"username": bytes("$2b$12$f5kbmfXvUq3LDequGHcoqu06AueJ35TpIU2MIf5L8c9Y1PE/6Rz4i", "utf-8")}
|
|
#manage.server.users = {"username": b"$2b$12$f5kbmfXvUq3LDequGHcoqu06AueJ35TpIU2MIf5L8c9Y1PE/6Rz4i"}
|
|
manage.server.users = {"username": "$2b$12$f5kbmfXvUq3LDequGHcoqu06AueJ35TpIU2MIf5L8c9Y1PE/6Rz4i".encode('utf-8')}
|
|
logging.error = MagicMock()
|
|
manage.server.authentification = True
|
|
#manage.headers = {'Authorization': bytes('Basic dXNlcm5hbWU6cGFzc3dvcmQ=', 'UTF-8')}
|
|
manage.headers = {'Authorization': 'Basic dXNlcm5hbWU6cGFzc3dvcmQ='}
|
|
res = manage.check_authentication()
|
|
self.assertEqual(True, res)
|
|
#logging.error.assert_called_with("Authentification with user (username)")
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_check_authentication_8(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.users = {"username": "$2b$12$/rfBBlTy3E9CgB8ZGeWFBOo54UN5Ogj3PuEOHVJcXyQ2hL6kYVwWW".encode('utf-8')}
|
|
logging.error = MagicMock()
|
|
manage.server.authentification = True
|
|
manage.headers = {'Authorization': 'Basic dXNlcm5hbWU6cGFzc3dvcmQ='}
|
|
res = manage.check_authentication()
|
|
self.assertEqual(False, res)
|
|
logging.error.assert_called_with("Authentification with wrong password for user (username)")
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_GET_2(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
manage._send_command = MagicMock()
|
|
manage.path = "/STATUS"
|
|
manage.do_GET()
|
|
manage._send_command.assert_called_with("STATUS")
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_GET_3(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
manage._send_list = MagicMock()
|
|
manage.path = "/LIST"
|
|
manage.do_GET()
|
|
manage._send_list.assert_called_with()
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_GET_4(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
manage._send_command_all = MagicMock()
|
|
manage.path = "/STATUSALL"
|
|
manage.do_GET()
|
|
manage._send_command_all.assert_called_with("STATUS")
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_GET_5(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
manage.send_error = MagicMock()
|
|
manage.path = "/BADPATH"
|
|
manage.do_GET()
|
|
manage.send_error.assert_called_with(400, 'Path unknown')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_GET_6(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.authentification = True
|
|
manage.send_error = MagicMock()
|
|
manage.path = "/BADPATH"
|
|
manage.do_GET()
|
|
manage.send_error.assert_called_with(403, 'Wrong authentication')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_POST_1(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
manage._send_command = MagicMock()
|
|
manage.path = "/START"
|
|
manage.do_POST()
|
|
manage._send_command.assert_called_with("START")
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_POST_2(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
manage._send_command = MagicMock()
|
|
manage.path = "/STOP"
|
|
manage.do_POST()
|
|
manage._send_command.assert_called_with("STOP")
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_POST_3(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
manage._send_action = MagicMock()
|
|
manage.path = "/STDIN"
|
|
manage.do_POST()
|
|
manage._send_action.assert_called_with()
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_POST_4(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
manage._send_shutdown = MagicMock()
|
|
manage.path = "/SHUTDOWN"
|
|
manage.do_POST()
|
|
manage._send_shutdown.assert_called_with()
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_POST_5(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
manage._send_command_all = MagicMock()
|
|
manage.path = "/STARTALL"
|
|
manage.do_POST()
|
|
manage._send_command_all.assert_called_with("START")
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_POST_6(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
manage._send_command_all = MagicMock()
|
|
manage.path = "/STOPALL"
|
|
manage.do_POST()
|
|
manage._send_command_all.assert_called_with("STOP")
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_POST_7(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
manage._send_command = MagicMock()
|
|
manage._send_action = MagicMock()
|
|
manage._send_shutdown = MagicMock()
|
|
manage._send_command_all = MagicMock()
|
|
manage.path = "/BADPATH"
|
|
manage.do_POST()
|
|
manage.send_error.assert_called_with(400, 'Path unknown')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_POST_8(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.server = TestManager.MockServer()
|
|
manage.server.authentification = True
|
|
manage.send_error = MagicMock()
|
|
manage.path = "/BADPATH"
|
|
manage.do_POST()
|
|
manage.send_error.assert_called_with(403, 'Wrong authentication')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_HEAD(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
manage.path = "test"
|
|
manage.do_HEAD()
|
|
manage.send_error.assert_called_with(404, 'File Not Found: test')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_PUT(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
manage.path = "test"
|
|
manage.do_PUT()
|
|
manage.send_error.assert_called_with(404, 'File Not Found: test')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_PATCH(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
manage.path = "test"
|
|
manage.do_PATCH()
|
|
manage.send_error.assert_called_with(404, 'File Not Found: test')
|
|
signal.alarm(0)
|
|
|
|
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
|
def test_run_manage_do_DELETE(self, init):
|
|
# Enable timeout
|
|
signal.alarm(10)
|
|
manage = Manager.ManageHttpRequest(None, None, None)
|
|
manage.log_message = MagicMock()
|
|
manage.send_response = MagicMock()
|
|
manage.send_header = MagicMock()
|
|
manage.send_error = MagicMock()
|
|
manage.end_headers = MagicMock()
|
|
manage.path = "test"
|
|
manage.do_DELETE()
|
|
manage.send_error.assert_called_with(404, 'File Not Found: test')
|
|
signal.alarm(0)
|
|
|
|
if __name__ == '__main__':
|
|
logging.getLogger('logging')
|
|
handlers = []
|
|
handlers.append(logging.StreamHandler())
|
|
logging.basicConfig(handlers=handlers, level=0,
|
|
format='%(asctime)s %(levelname)s [pid:%(process)d] [%(funcName)s:%(lineno)d] %(message)s')
|
|
unittest.main()
|