update coverity/test
This commit is contained in:
parent
f009fe9b90
commit
027c366e7e
5 changed files with 549 additions and 12 deletions
|
@ -1,6 +1,6 @@
|
|||
# .coveragerc to control coverage.py
|
||||
[run]
|
||||
# branch = True
|
||||
branch = True
|
||||
concurrency = multiprocessing
|
||||
omit = /usr/lib/python3/*,tests/*
|
||||
[report]
|
||||
|
|
1
Makefile
1
Makefile
|
@ -31,6 +31,7 @@ test:
|
|||
|
||||
coverage:
|
||||
$(PYTHONCOVERAGE) erase
|
||||
$(PYTHONCOVERAGE) run -a --concurrency=multiprocessing tests/test_client_manager.py
|
||||
$(PYTHONCOVERAGE) run -a tests/test_certificate.py
|
||||
$(PYTHONCOVERAGE) run -a tests/test_manager.py
|
||||
$(PYTHONCOVERAGE) run -a tests/test_client.py
|
||||
|
|
|
@ -152,23 +152,23 @@ class TestManager(unittest.TestCase):
|
|||
client._load_config(config)
|
||||
|
||||
def test_init_client(self):
|
||||
client = Client.Client('username', True, 'password')
|
||||
Client.Client('username', True, 'password')
|
||||
|
||||
def test_init_client_missing_password(self):
|
||||
with self.assertRaises(Exception):
|
||||
client = Client.Client('username', True, None)
|
||||
Client.Client('username', True, None)
|
||||
|
||||
def test_init_client_stdin_password_but_send_param(self):
|
||||
with self.assertRaises(Exception):
|
||||
client = Client.Client('username', False, 'password')
|
||||
Client.Client('username', False, 'password')
|
||||
|
||||
@patch("getpass.getpass")
|
||||
def test_init_client_stdin_password(self, getpass):
|
||||
getpass.return_value = "password"
|
||||
client = Client.Client('username', False, None)
|
||||
Client.Client('username', False, None)
|
||||
|
||||
def test_https_init(self):
|
||||
https = Client.HTTPSConnectionCertificate(None, None, None)
|
||||
Client.HTTPSConnectionCertificate(None, None, None)
|
||||
|
||||
@patch("socket.create_connection")
|
||||
@patch("ssl.wrap_socket")
|
||||
|
@ -187,7 +187,7 @@ class TestManager(unittest.TestCase):
|
|||
https.connect()
|
||||
|
||||
def test_client_send_json(self):
|
||||
workdir = tempfile.mkdtemp(prefix='test_client_send_json')
|
||||
#workdir = tempfile.mkdtemp(prefix='test_client_send_json')
|
||||
workdir_cert_root = tempfile.mkdtemp(prefix='pymanager-certificate-root-')
|
||||
workdir_cert_appli = tempfile.mkdtemp(prefix='pymanager-certificate-application-')
|
||||
args=['--workdir-cert-root', workdir_cert_root, '--workdir-cert-appli', workdir_cert_appli]
|
||||
|
|
|
@ -21,7 +21,10 @@ import unittest
|
|||
import tempfile
|
||||
import os
|
||||
import configparser
|
||||
from unittest.mock import patch
|
||||
import signal
|
||||
import time
|
||||
import coverage
|
||||
coverage.process_startup()
|
||||
|
||||
try:
|
||||
import pymanager.client as Client
|
||||
|
@ -45,11 +48,15 @@ except ImportError:
|
|||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
import pymanager.manager as Manager
|
||||
|
||||
|
||||
class TestManager(unittest.TestCase):
|
||||
def setUp(self):
|
||||
pass
|
||||
self.path = os.path.dirname(os.path.abspath(__file__))
|
||||
self.program = os.path.join(self.path, 'simulate_program.py')
|
||||
|
||||
def test_client_send_json(self):
|
||||
signal.alarm(10)
|
||||
|
||||
workdir = tempfile.mkdtemp(prefix='test_client_send_json')
|
||||
workdir_cert_root = tempfile.mkdtemp(prefix='pymanager-certificate-root-')
|
||||
workdir_cert_appli = tempfile.mkdtemp(prefix='pymanager-certificate-application-')
|
||||
|
@ -57,20 +64,22 @@ class TestManager(unittest.TestCase):
|
|||
cert.main(args)
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
port = 8000
|
||||
config.add_section('config:server')
|
||||
config.set('config:server', 'port', '8000')
|
||||
config.set('config:server', 'port', str(port))
|
||||
config.set('config:server', 'keyfile', os.path.join(workdir_cert_appli, 'private', 'serverkey.pem'))
|
||||
config.set('config:server', 'certfile', os.path.join(workdir_cert_appli, 'certs', 'servercert.pem'))
|
||||
config.set('config:server', 'ca_cert', os.path.join(workdir_cert_appli, 'certs', 'cachaincert.pem'))
|
||||
config.add_section('config:client')
|
||||
config.set('config:client', 'port', '8000')
|
||||
config.set('config:server', 'port', str(port))
|
||||
config.set('config:client', 'keyfile', os.path.join(workdir_cert_appli, 'private', 'clientkey.pem'))
|
||||
config.set('config:client', 'certfile', os.path.join(workdir_cert_appli, 'certs', 'clientcert.pem'))
|
||||
config.set('config:client', 'ca_cert', os.path.join(workdir_cert_appli, 'certs', 'cachaincert.pem'))
|
||||
config.set('config:client', 'address', '127.0.0.1')
|
||||
config.add_section('command:test')
|
||||
config.set('command:test', 'path', workdir)
|
||||
config.set('command:test', 'command', os.path.join(os.path.abspath(__file__), 'simulate_program.py'))
|
||||
config.set('command:test', 'command', self.program)
|
||||
print("-"*10, 'A1')
|
||||
try:
|
||||
client = Client.Client(None, None, None)
|
||||
client._load_config(config)
|
||||
|
@ -78,6 +87,36 @@ class TestManager(unittest.TestCase):
|
|||
except:
|
||||
self.fail('Error detected on load config')
|
||||
|
||||
print("-"*10, 'A2')
|
||||
|
||||
manage = Manager.Manager(False)
|
||||
manage._load_config(config)
|
||||
manage.initialize_http()
|
||||
manage.launch_command()
|
||||
manage.launch_server_http()
|
||||
time.sleep(1)
|
||||
signal.alarm(10)
|
||||
msgjson = client.send_json({'name': 'command:test'}, 'GET', "/STATUS", show_result=False)
|
||||
assert msgjson == {'state' : 'stopped'}
|
||||
msgjson = client.send_json({'name': 'command:test'}, 'POST', "/START", show_result=False)
|
||||
assert msgjson == {'state' : 'started'}
|
||||
msgjson = client.send_json({'name': 'command:test'}, 'GET', "/STATUS", show_result=False)
|
||||
assert msgjson == {'state' : 'started'}
|
||||
msgjson = client.send_json({'name': 'command:test', 'action': 'test'}, 'POST', "/STDIN", show_result=False)
|
||||
print(msgjson)
|
||||
assert msgjson == {'state' : 'ok'}
|
||||
print("-"*80)
|
||||
msgjson = client.send_json({'name': 'command:test'}, 'POST', "/STOP", show_result=False)
|
||||
assert msgjson == {'state' : 'stopped'}
|
||||
msgjson = client.send_json({'name': 'command:test'}, 'GET', "/STATUS", show_result=False)
|
||||
assert msgjson == {'state' : 'stopped'}
|
||||
|
||||
print("-"*10, 'A3')
|
||||
manage.receive_signal(15, 1)
|
||||
manage.wait_children_commands()
|
||||
#Disable timeout
|
||||
signal.alarm(0)
|
||||
self.assertTrue
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
@ -26,7 +26,9 @@ import time
|
|||
import re
|
||||
import queue
|
||||
import signal
|
||||
import http.server
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
try:
|
||||
import pymanager.manager as Manager
|
||||
|
@ -542,6 +544,501 @@ class TestManager(unittest.TestCase):
|
|||
signal.alarm(0)
|
||||
self.assertTrue(True)
|
||||
|
||||
class MockStreamRequestHandler():
|
||||
def __init__(self):
|
||||
self.value = "{}"
|
||||
self.message = None
|
||||
def define_return(self, value):
|
||||
self.value = value
|
||||
def read(self, numbercar):
|
||||
return bytes(self.value, "utf-8")
|
||||
def write(self, message):
|
||||
self.message = message
|
||||
|
||||
class MockServer():
|
||||
def __init__(self):
|
||||
self.listEvent = {}
|
||||
self.listQueueIn = {}
|
||||
self.listQueueOut = {}
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_log_message(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.address_string = MagicMock()
|
||||
manage.log_message('example')
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_set_headers(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage._set_headers()
|
||||
manage.send_response.assert_called_with(200)
|
||||
manage.send_header.assert_called_with('Content-type', 'application/json')
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_command_log_1(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.headers = {}
|
||||
manage._command_log()
|
||||
manage.send_error.assert_called_with(400, "bad content-type")
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_command_log_2(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.headers = {'content-type' : 'application/json'}
|
||||
manage._command_log()
|
||||
manage.send_error.assert_called_with(400, "bad content-length")
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_command_log_3(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage._command_log()
|
||||
manage.send_error.assert_called_with(400, 'Missing param name')
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_command_log_4(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.rfile.define_return( '{"name": "test"}' )
|
||||
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
||||
manage._command_log()
|
||||
manage.send_error.assert_called_with(400, 'Name unknown')
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_command_log_5(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.server.listQueueIn = {'test': ''}
|
||||
manage.rfile.define_return( '{"name": "test"}' )
|
||||
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
||||
manage._command_log()
|
||||
manage.send_error.assert_called_with(400, 'Missing param first-line')
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_command_log_6(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.server.listQueueIn = {'test': ''}
|
||||
manage.rfile.define_return( '{"name": "test", "first-line" : "test"}' )
|
||||
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
||||
manage._command_log()
|
||||
manage.send_error.assert_called_with(400, 'Impossible to read first-line')
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_command_log_7(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.wfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.server.listEvent = {'test': multiprocessing.Event() }
|
||||
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
||||
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
||||
#manage.server.listQueueOut['test'].put("empty")
|
||||
manage.rfile.define_return( '{"name": "test", "first-line" : "1"}' )
|
||||
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
||||
manage._command_log()
|
||||
#self.assertEqual(b'empty',manage.wfile.message)
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_command_log_8(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.wfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.server.listEvent = {'test': multiprocessing.Event() }
|
||||
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
||||
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
||||
manage.server.listQueueOut['test'].put("empty")
|
||||
manage.rfile.define_return( '{"name": "test", "first-line" : "1"}' )
|
||||
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
||||
manage._command_log()
|
||||
self.assertEqual(b'empty',manage.wfile.message)
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_send_list(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.wfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.server.listEvent = {'test': multiprocessing.Event() }
|
||||
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
||||
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
||||
manage._send_list()
|
||||
self.assertEqual(b'{"0": "test"}',manage.wfile.message)
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_send_shutdown(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.wfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.server.listEvent = {'test': multiprocessing.Event() }
|
||||
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
||||
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
||||
manage._send_shutdown()
|
||||
self.assertEqual(b'{"shutdown": "ok"}',manage.wfile.message)
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_send_command_all(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.wfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.server.listEvent = {'test': multiprocessing.Event() }
|
||||
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
||||
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
||||
manage.server.listQueueOut['test'].put("empty")
|
||||
manage._send_command_all("test")
|
||||
self.assertEqual(b'{"test": "empty"}',manage.wfile.message)
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_send_command_all_timeout(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.wfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.server.listEvent = {'test': multiprocessing.Event() }
|
||||
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
||||
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
||||
manage._send_command_all("test")
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_send_action_1(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.wfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.server.listEvent = {'test': multiprocessing.Event() }
|
||||
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
||||
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
||||
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
||||
manage._send_action()
|
||||
manage.send_error.assert_called_with(400, 'Missing param name')
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_send_action_2(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.wfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.server.listEvent = {'test': multiprocessing.Event() }
|
||||
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
||||
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
||||
manage.rfile.define_return( '{"name": "testnew"}' )
|
||||
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
||||
manage._send_action()
|
||||
manage.send_error.assert_called_with(400, 'Name unknown')
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_send_action_3(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.wfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.server.listEvent = {'test': multiprocessing.Event() }
|
||||
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
||||
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
||||
manage.rfile.define_return( '{"name": "test"}' )
|
||||
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
||||
manage._send_action()
|
||||
manage.send_error.assert_called_with(400, 'Missing param action')
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_send_action_4(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.wfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.server.listEvent = {'test': multiprocessing.Event() }
|
||||
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
||||
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
||||
manage.rfile.define_return( '{"name": "test", "action": "example"}' )
|
||||
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
||||
manage._send_action()
|
||||
self.assertEqual(None,manage.wfile.message)
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_send_action_5(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.wfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.server.listEvent = {'test': multiprocessing.Event() }
|
||||
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
||||
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
||||
manage.server.listQueueOut['test'].put("empty")
|
||||
manage.rfile.define_return( '{"name": "test", "action": "example"}' )
|
||||
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
||||
manage._send_action()
|
||||
self.assertEqual(b'{"state": "empty"}',manage.wfile.message)
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_send_action_6(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.headers = {}
|
||||
manage._send_action()
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_send_command_1(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.headers = {}
|
||||
manage._send_command("STATUS")
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_send_command_2(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.wfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.server.listEvent = {'test': multiprocessing.Event() }
|
||||
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
||||
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
||||
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
||||
manage._send_command("STATUS")
|
||||
manage.send_error.assert_called_with(400, 'Missing param name')
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_send_command_3(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.wfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.server.listEvent = {'test': multiprocessing.Event() }
|
||||
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
||||
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
||||
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
||||
manage.server.listQueueOut['test'].put("empty")
|
||||
manage.rfile.define_return( '{"name": "test"}' )
|
||||
manage._send_command("STATUS")
|
||||
self.assertEqual(b'{"state": "empty"}',manage.wfile.message)
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_send_command_4(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.wfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.server.listEvent = {'test': multiprocessing.Event() }
|
||||
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
||||
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
||||
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
||||
manage.rfile.define_return( '{"name": "test"}' )
|
||||
manage._send_command("STATUS")
|
||||
manage.send_error.assert_called_with(500, 'Missing return')
|
||||
|
||||
@patch.object(http.server.SimpleHTTPRequestHandler, '__init__')
|
||||
def test_run_manage_send_command_5(self, init):
|
||||
# Enable timeout
|
||||
signal.alarm(10)
|
||||
manage = Manager.ManageHttpRequest(None, None, None)
|
||||
manage.log_message = MagicMock()
|
||||
manage.send_response = MagicMock()
|
||||
manage.send_header = MagicMock()
|
||||
manage.send_error = MagicMock()
|
||||
manage.end_headers = MagicMock()
|
||||
|
||||
manage.rfile = TestManager.MockStreamRequestHandler()
|
||||
manage.wfile = TestManager.MockStreamRequestHandler()
|
||||
manage.server = TestManager.MockServer()
|
||||
manage.server.listEvent = {'test': multiprocessing.Event() }
|
||||
manage.server.listQueueIn = {'test': multiprocessing.Queue() }
|
||||
manage.server.listQueueOut = {'test': multiprocessing.Queue()}
|
||||
manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'}
|
||||
manage.server.listQueueOut['test'].put("empty")
|
||||
manage.rfile.define_return( '{"name": "testnew"}' )
|
||||
manage._send_command("STATUS")
|
||||
manage.send_error.assert_called_with(400, 'Name unknown')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
Loading…
Reference in a new issue