update check
This commit is contained in:
parent
07f475c65c
commit
db1b6e97ff
3 changed files with 89 additions and 38 deletions
28
.coveragerc
Normal file
28
.coveragerc
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
# .coveragerc to control coverage.py
|
||||||
|
[run]
|
||||||
|
# branch = True
|
||||||
|
concurrency = multiprocessing
|
||||||
|
omit = /usr/lib/python3/*,tests/*
|
||||||
|
|
||||||
|
[report]
|
||||||
|
# Regexes for lines to exclude from consideration
|
||||||
|
exclude_lines =
|
||||||
|
# Have to re-enable the standard pragma
|
||||||
|
pragma: no cover
|
||||||
|
|
||||||
|
# Don't complain about missing debug-only code:
|
||||||
|
def __repr__
|
||||||
|
if self\.debug
|
||||||
|
|
||||||
|
# Don't complain if tests don't hit defensive assertion code:
|
||||||
|
raise AssertionError
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
# Don't complain if non-runnable code isn't run:
|
||||||
|
if 0:
|
||||||
|
if __name__ == .__main__.:
|
||||||
|
|
||||||
|
omit = /usr/lib/python3/*,tests/*
|
||||||
|
|
||||||
|
[html]
|
||||||
|
directory = docs/coverage
|
|
@ -25,6 +25,7 @@ import multiprocessing
|
||||||
import time
|
import time
|
||||||
import re
|
import re
|
||||||
import queue
|
import queue
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import pymanager.manager as Manager
|
import pymanager.manager as Manager
|
||||||
|
@ -57,40 +58,6 @@ class TestManager(unittest.TestCase):
|
||||||
self.program = os.path.join(self.path, 'simulate_program.py')
|
self.program = os.path.join(self.path, 'simulate_program.py')
|
||||||
self.badprogram = os.path.join(self.path, 'test.cfg')
|
self.badprogram = os.path.join(self.path, 'test.cfg')
|
||||||
|
|
||||||
def testMain(self):
|
|
||||||
logfile = tempfile.NamedTemporaryFile(suffix=".log")
|
|
||||||
workdir_cert_root = tempfile.mkdtemp(prefix='pymanager-certificate-root-')
|
|
||||||
workdir_cert_appli = tempfile.mkdtemp(prefix='pymanager-certificate-application-')
|
|
||||||
|
|
||||||
# cert.root(logfile,
|
|
||||||
# 'DEBUG',
|
|
||||||
# False,
|
|
||||||
# workdir_cert_root,
|
|
||||||
# workdir_cert_appli,
|
|
||||||
# self.openssl,
|
|
||||||
# self.size_root,
|
|
||||||
# self.size_appli,
|
|
||||||
# self.size_child,
|
|
||||||
# self.passroot,
|
|
||||||
# self.passappli,
|
|
||||||
# self.country_name,
|
|
||||||
# self.state_or_province_name,
|
|
||||||
# self.locality_name,
|
|
||||||
# self.organization_name,
|
|
||||||
# self.common_name)
|
|
||||||
#
|
|
||||||
# conf = tempfile.NamedTemporaryFile(suffix="test.cfg")
|
|
||||||
# conf.write(bytes(
|
|
||||||
#'[config]\n'
|
|
||||||
#'port = 8000\n'
|
|
||||||
#'keyfile = %s/private/serverkey.pem\n'
|
|
||||||
#'certfile = %s/private/servercert.pem\n'
|
|
||||||
#'ca_cert = %s/certs/cachaincert.pem\n'
|
|
||||||
#'address =\n' %(workdir_cert_appli, workdir_cert_appli, workdir_cert_appli ), 'UTF-8'))
|
|
||||||
# args=['--conf', conf]
|
|
||||||
# #manager.main(args)
|
|
||||||
self.assertTrue(True)
|
|
||||||
|
|
||||||
def test_load_config(self):
|
def test_load_config(self):
|
||||||
config = configparser.ConfigParser()
|
config = configparser.ConfigParser()
|
||||||
config.add_section('config:server')
|
config.add_section('config:server')
|
||||||
|
@ -417,6 +384,29 @@ class TestManager(unittest.TestCase):
|
||||||
event=event)
|
event=event)
|
||||||
manageCommand.run()
|
manageCommand.run()
|
||||||
|
|
||||||
|
def test_root_bad_loglevel(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
Manager.root(None,
|
||||||
|
None,
|
||||||
|
'NOTEXIST',
|
||||||
|
False,
|
||||||
|
False)
|
||||||
|
|
||||||
|
def test_root_bad_configfile(self):
|
||||||
|
logfile = tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t')
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
Manager.root(None,
|
||||||
|
logfile,
|
||||||
|
'DEBUG',
|
||||||
|
True,
|
||||||
|
True)
|
||||||
|
|
||||||
|
def test_main(self):
|
||||||
|
config = tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t')
|
||||||
|
config.write('[config:server]\nauthentification=no\n')
|
||||||
|
config.flush()
|
||||||
|
Manager.main(['--conf=' + config.name])
|
||||||
|
|
||||||
# def test_run_manager_command(self):
|
# def test_run_manager_command(self):
|
||||||
# # Doesn't work (we need enable --concurrency=multiprocessing on coverage command but we need coverage 4.0)
|
# # Doesn't work (we need enable --concurrency=multiprocessing on coverage command but we need coverage 4.0)
|
||||||
# logsize = 10
|
# logsize = 10
|
||||||
|
@ -434,7 +424,6 @@ class TestManager(unittest.TestCase):
|
||||||
# queueOut,
|
# queueOut,
|
||||||
# event))
|
# event))
|
||||||
# threadCommand.start()
|
# threadCommand.start()
|
||||||
#
|
|
||||||
# event.set()
|
# event.set()
|
||||||
# queueIn.put("START")
|
# queueIn.put("START")
|
||||||
# item = queueOut.get(timeout=4)
|
# item = queueOut.get(timeout=4)
|
||||||
|
@ -452,9 +441,43 @@ class TestManager(unittest.TestCase):
|
||||||
# item = queueOut.get(timeout=4)
|
# item = queueOut.get(timeout=4)
|
||||||
# print("-" * 80, "wait thread" )
|
# print("-" * 80, "wait thread" )
|
||||||
# threadCommand.join()
|
# threadCommand.join()
|
||||||
#
|
|
||||||
# self.assertTrue(True)
|
# self.assertTrue(True)
|
||||||
|
#
|
||||||
|
# def test_run_manager_command_2(self):
|
||||||
|
# manage = Manager.Manager(True)
|
||||||
|
# logsize = 10
|
||||||
|
# bufsize = 10
|
||||||
|
# queueIn = multiprocessing.Queue()
|
||||||
|
# queueOut = multiprocessing.Queue()
|
||||||
|
# event = multiprocessing.Event()
|
||||||
|
# manage.runCommand('test_run_manager_command',
|
||||||
|
# self.program,
|
||||||
|
# self.path,
|
||||||
|
# logsize,
|
||||||
|
# bufsize,
|
||||||
|
# queueIn,
|
||||||
|
# queueOut,
|
||||||
|
# event)
|
||||||
|
# event.set()
|
||||||
|
# queueIn.put("START")
|
||||||
|
# item = queueOut.get(timeout=4)
|
||||||
|
# self.assertEqual(item, "started", 'Error impossible to start program')
|
||||||
|
# time.sleep(1)
|
||||||
|
# event.set()
|
||||||
|
# queueIn.put("STATUS")
|
||||||
|
# item = queueOut.get(timeout=4)
|
||||||
|
# self.assertEqual(item, "started", 'Error impossible to start program')
|
||||||
|
# time.sleep(1)
|
||||||
|
# print("-" * 80, "shutdown" )
|
||||||
|
# event.set()
|
||||||
|
# queueIn.put("SHUTDOWN")
|
||||||
|
# with self.assertRaises(queue.Empty):
|
||||||
|
# item = queueOut.get(timeout=4)
|
||||||
|
# print("-" * 80, "wait thread" )
|
||||||
|
# #threadCommand.join()
|
||||||
|
# manage.receive_signal(15, 1)
|
||||||
|
# manage.wait_children_commands()
|
||||||
|
# self.assertTrue(True)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
Loading…
Reference in a new issue