diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..38fb229 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,28 @@ +# .coveragerc to control coverage.py +[run] +# branch = True +concurrency = multiprocessing +omit = /usr/lib/python3/*,tests/* + +[report] +# Regexes for lines to exclude from consideration +exclude_lines = + # Have to re-enable the standard pragma + pragma: no cover + + # Don't complain about missing debug-only code: + def __repr__ + if self\.debug + + # Don't complain if tests don't hit defensive assertion code: + raise AssertionError + raise NotImplementedError + + # Don't complain if non-runnable code isn't run: + if 0: + if __name__ == .__main__.: + +omit = /usr/lib/python3/*,tests/* + +[html] +directory = docs/coverage diff --git a/pymanager/password.py b/pymanager/password.py index fad084d..0c80b4b 100755 --- a/pymanager/password.py +++ b/pymanager/password.py @@ -1,7 +1,7 @@ #!/usr/bin/python3 # -*- coding: utf-8 -*- # -# scriptto manipulate password file +# script to manipulate password file # # Copyright (C) 2017 AleaJactaEst # diff --git a/tests/test_manager.py b/tests/test_manager.py index 21b54b8..4a8ec91 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -25,6 +25,7 @@ import multiprocessing import time import re import queue +from unittest.mock import patch try: import pymanager.manager as Manager @@ -57,40 +58,6 @@ class TestManager(unittest.TestCase): self.program = os.path.join(self.path, 'simulate_program.py') self.badprogram = os.path.join(self.path, 'test.cfg') - def testMain(self): - logfile = tempfile.NamedTemporaryFile(suffix=".log") - workdir_cert_root = tempfile.mkdtemp(prefix='pymanager-certificate-root-') - workdir_cert_appli = tempfile.mkdtemp(prefix='pymanager-certificate-application-') - -# cert.root(logfile, -# 'DEBUG', -# False, -# workdir_cert_root, -# workdir_cert_appli, -# self.openssl, -# self.size_root, -# self.size_appli, -# self.size_child, -# self.passroot, -# self.passappli, -# self.country_name, -# self.state_or_province_name, -# self.locality_name, -# self.organization_name, -# self.common_name) -# -# conf = tempfile.NamedTemporaryFile(suffix="test.cfg") -# conf.write(bytes( -#'[config]\n' -#'port = 8000\n' -#'keyfile = %s/private/serverkey.pem\n' -#'certfile = %s/private/servercert.pem\n' -#'ca_cert = %s/certs/cachaincert.pem\n' -#'address =\n' %(workdir_cert_appli, workdir_cert_appli, workdir_cert_appli ), 'UTF-8')) -# args=['--conf', conf] -# #manager.main(args) - self.assertTrue(True) - def test_load_config(self): config = configparser.ConfigParser() config.add_section('config:server') @@ -417,6 +384,29 @@ class TestManager(unittest.TestCase): event=event) manageCommand.run() + def test_root_bad_loglevel(self): + with self.assertRaises(ValueError): + Manager.root(None, + None, + 'NOTEXIST', + False, + False) + + def test_root_bad_configfile(self): + logfile = tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t') + with self.assertRaises(ValueError): + Manager.root(None, + logfile, + 'DEBUG', + True, + True) + + def test_main(self): + config = tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t') + config.write('[config:server]\nauthentification=no\n') + config.flush() + Manager.main(['--conf=' + config.name]) + # def test_run_manager_command(self): # # Doesn't work (we need enable --concurrency=multiprocessing on coverage command but we need coverage 4.0) # logsize = 10 @@ -434,7 +424,6 @@ class TestManager(unittest.TestCase): # queueOut, # event)) # threadCommand.start() -# # event.set() # queueIn.put("START") # item = queueOut.get(timeout=4) @@ -452,9 +441,43 @@ class TestManager(unittest.TestCase): # item = queueOut.get(timeout=4) # print("-" * 80, "wait thread" ) # threadCommand.join() -# # self.assertTrue(True) - +# +# def test_run_manager_command_2(self): +# manage = Manager.Manager(True) +# logsize = 10 +# bufsize = 10 +# queueIn = multiprocessing.Queue() +# queueOut = multiprocessing.Queue() +# event = multiprocessing.Event() +# manage.runCommand('test_run_manager_command', +# self.program, +# self.path, +# logsize, +# bufsize, +# queueIn, +# queueOut, +# event) +# event.set() +# queueIn.put("START") +# item = queueOut.get(timeout=4) +# self.assertEqual(item, "started", 'Error impossible to start program') +# time.sleep(1) +# event.set() +# queueIn.put("STATUS") +# item = queueOut.get(timeout=4) +# self.assertEqual(item, "started", 'Error impossible to start program') +# time.sleep(1) +# print("-" * 80, "shutdown" ) +# event.set() +# queueIn.put("SHUTDOWN") +# with self.assertRaises(queue.Empty): +# item = queueOut.get(timeout=4) +# print("-" * 80, "wait thread" ) +# #threadCommand.join() +# manage.receive_signal(15, 1) +# manage.wait_children_commands() +# self.assertTrue(True) if __name__ == '__main__': unittest.main()