2018-02-02 20:52:32 +00:00
|
|
|
#!/usr/bin/python3
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
#
|
|
|
|
# create certificate (use for test)
|
|
|
|
# Copyright (C) 2017 AleaJactaEst
|
|
|
|
#
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
|
|
# (at your option) any later version.
|
|
|
|
#
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
# GNU Affero General Public License for more details.
|
|
|
|
#
|
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
|
|
|
|
import unittest
|
|
|
|
import tempfile
|
|
|
|
import os
|
|
|
|
import configparser
|
|
|
|
from unittest.mock import patch
|
|
|
|
|
|
|
|
# Prefer an importable ``pymanager``.  When the import fails (for example when
# the tests are run straight from a source checkout), append the parent of the
# directory holding this file to ``sys.path`` once and retry both imports.
try:
    import pymanager.client as Client
    import pymanager.certificate as cert
except ImportError:
    import sys

    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    import pymanager.client as Client
    import pymanager.certificate as cert
|
|
|
|
|
|
|
|
class TestManager(unittest.TestCase):
    """Smoke tests for ``pymanager.client``.

    Most tests only check that a call completes (or raises); they do not
    inspect the values produced by the client.
    """

    def setUp(self):
        """No shared fixture: every test builds the objects it needs."""

    def _exercise_comparisons(self, left, right):
        """Evaluate all six rich comparisons between *left* and *right*.

        Nothing is asserted about the outcomes: the caller's test passes as
        long as no operator (or the truth test of its result) raises.
        """
        outcomes = (
            left == right,
            left >= right,
            left <= right,
            left < right,
            left > right,
            left != right,
        )
        for outcome in outcomes:
            bool(outcome)

    def test_cmp_to_key_correct(self):
        """Sorting two equal integers with the client's key must not raise."""
        values = [1, 1]
        sorted(values, key=Client.cmp_to_key())

    def test_cmp_to_key_bad_value(self):
        """Sorting the strings 'A' and '1' with the client's key must not raise."""
        values = ['A', '1']
        sorted(values, key=Client.cmp_to_key())

    def test_cmp_to_key_all_int(self):
        """Every comparison operator works on keys wrapping the ints 1 and 2."""
        key_type = Client.cmp_to_key()
        self._exercise_comparisons(key_type(1), key_type(2))

    def test_cmp_to_key_all_string(self):
        """Every comparison operator works on keys wrapping two 'a' strings."""
        key_type = Client.cmp_to_key()
        self._exercise_comparisons(key_type('a'), key_type('a'))

    def test_load_config_file(self):
        """``load_config`` accepts an open temporary configuration file."""
        # The context manager closes (and thereby deletes) the temporary file
        # even when load_config raises.
        with tempfile.NamedTemporaryFile(suffix="config.cfg", mode='w+t') as cfgfile:
            cfgfile.write('#\n[config:server]\nauthentification = No\n')
            cfgfile.flush()
            # NOTE(review): the handle is left at end-of-file after the write;
            # confirm load_config does not read from the current position.
            client = Client.Client(None, None, None)
            client.load_config(cfgfile)

    def test_load_config_file_none(self):
        """``load_config(None)`` raises ``ValueError``."""
        client = Client.Client(None, None, None)
        with self.assertRaises(ValueError):
            client.load_config(None)

    def test_load_config(self):
        """``_load_config`` accepts a parser with server, client, command and user sections."""
        config = configparser.ConfigParser()

        config.add_section('config:server')
        config.set('config:server', 'port', '8000')
        config.set('config:server', 'keyfile', '/home/gameserver/ca/appli/private/serverkey.pem')
        config.set('config:server', 'certfile', '/home/gameserver/ca/appli/certs/servercert.pem')
        config.set('config:server', 'ca_cert', '/home/gameserver/ca/appli/certs/cachaincert.pem')
        config.set('config:server', 'address', '')
        config.set('config:server', 'authentification', 'yes')

        config.add_section('config:client')
        config.set('config:client', 'port', '8000')
        config.set('config:client', 'keyfile', '/home/gameserver/ca/appli/private/clientkey.pem')
        config.set('config:client', 'certfile', '/home/gameserver/ca/appli/certs/clientcert.pem')
        config.set('config:client', 'ca_cert', '/home/gameserver/ca/appli/certs/cachaincert.pem')
        config.set('config:client', 'address', '127.0.0.1')

        config.add_section('command:test')
        config.set('command:test', 'path', '/home/gameserver')
        config.set('command:test', 'command', '/bin/sleep 10')
        config.set('command:test', 'logsize', '10')
        config.set('command:test', 'bufsize', '10')

        config.add_section('config:user')
        # NOTE(review): 'usename' looks like a typo for 'username' - confirm
        # against the option names the client actually reads.
        config.set('config:user', 'usename', 'filter_all, filter_admin')

        try:
            client = Client.Client(None, None, None)
            client._load_config(config)
        except Exception as err:
            # Report the underlying error instead of discarding it.
            self.fail('Error detected on load config: %s' % err)

    def test_load_config_empty(self):
        """``_load_config`` accepts a parser where most options are missing."""
        config = configparser.ConfigParser()

        config.add_section('config:server')
        config.set('config:server', 'authentification', 'yes')

        # The client section is deliberately left without any option (no port).
        config.add_section('config:client')

        config.add_section('command:test')
        config.set('command:test', 'path', '/home/gameserver')

        try:
            client = Client.Client(None, None, None)
            client._load_config(config)
        except Exception as err:
            # Report the underlying error instead of discarding it.
            self.fail('Error detected on load config: %s' % err)

    def test_load_config_bad_param_port(self):
        """``_load_config`` raises when the client port is the string 'A'."""
        config = configparser.ConfigParser()
        config.add_section('config:server')
        config.add_section('config:client')
        config.set('config:client', 'port', 'A')

        client = Client.Client(None, None, None)
        with self.assertRaises(Exception):
            client._load_config(config)

    def test_init_client(self):
        """``Client('username', True, 'password')`` can be constructed."""
        Client.Client('username', True, 'password')

    def test_init_client_missing_password(self):
        """``Client('username', True, None)`` raises."""
        with self.assertRaises(Exception):
            Client.Client('username', True, None)

    def test_init_client_stdin_password_but_send_param(self):
        """``Client('username', False, 'password')`` raises."""
        with self.assertRaises(Exception):
            Client.Client('username', False, 'password')

    @patch("getpass.getpass")
    def test_init_client_stdin_password(self, mock_getpass):
        """``Client('username', False, None)`` works with ``getpass.getpass`` mocked."""
        mock_getpass.return_value = "password"
        Client.Client('username', False, None)

    def test_https_init(self):
        """``HTTPSConnectionCertificate`` can be built with three ``None`` arguments."""
        Client.HTTPSConnectionCertificate(None, None, None)

    # Stacked ``patch`` decorators inject their mocks bottom-up, so the
    # ``ssl.wrap_socket`` mock is the first extra argument.
    # NOTE(review): ``ssl.wrap_socket`` was removed in Python 3.12; this patch
    # target will need updating to run there.
    @patch("socket.create_connection")
    @patch("ssl.wrap_socket")
    def test_https_connection(self, mock_wrap_socket, mock_create_connection):
        """``connect()`` completes with socket creation and TLS wrapping mocked."""
        https = Client.HTTPSConnectionCertificate(None, None, None)
        https.connect()

    # Same decorator ordering as in test_https_connection: bottom-most first.
    @patch("socket.create_connection")
    @patch("ssl.wrap_socket")
    def test_https_connection_with_ca(self, mock_wrap_socket, mock_create_connection):
        """``connect()`` completes when the third constructor argument is 'ca'."""
        https = Client.HTTPSConnectionCertificate(None, None, 'ca')
        https.connect()

    def test_client_send_json(self):
        """``_load_config`` accepts paths located under freshly prepared work dirs.

        NOTE(review): despite its name this test never sends JSON; it stops
        after loading the configuration.
        """
        # Temporary directories are removed again when the test finishes,
        # whether it passes or fails.
        root_tmp = tempfile.TemporaryDirectory(prefix='pymanager-certificate-root-')
        self.addCleanup(root_tmp.cleanup)
        appli_tmp = tempfile.TemporaryDirectory(prefix='pymanager-certificate-application-')
        self.addCleanup(appli_tmp.cleanup)
        workdir_cert_root = root_tmp.name
        workdir_cert_appli = appli_tmp.name

        # Presumably populates both work directories with test certificates.
        args = ['--workdir-cert-root', workdir_cert_root,
                '--workdir-cert-appli', workdir_cert_appli]
        cert.main(args)

        private_dir = os.path.join(workdir_cert_appli, 'private')
        certs_dir = os.path.join(workdir_cert_appli, 'certs')
        ca_chain = os.path.join(certs_dir, 'cachaincert.pem')

        config = configparser.ConfigParser()

        config.add_section('config:server')
        config.set('config:server', 'port', '8000')
        config.set('config:server', 'keyfile', os.path.join(private_dir, 'serverkey.pem'))
        config.set('config:server', 'certfile', os.path.join(certs_dir, 'servercert.pem'))
        config.set('config:server', 'ca_cert', ca_chain)

        config.add_section('config:client')
        config.set('config:client', 'port', '8000')
        config.set('config:client', 'keyfile', os.path.join(private_dir, 'clientkey.pem'))
        config.set('config:client', 'certfile', os.path.join(certs_dir, 'clientcert.pem'))
        config.set('config:client', 'ca_cert', ca_chain)
        config.set('config:client', 'address', '127.0.0.1')

        config.add_section('config:user')
        # NOTE(review): 'usename' looks like a typo for 'username' - confirm
        # against the option names the client actually reads.
        config.set('config:user', 'usename', 'filter_all, filter_admin')

        try:
            client = Client.Client(None, None, None)
            client._load_config(config)
        except Exception as err:
            # Report the underlying error instead of discarding it.
            self.fail('Error detected on load config: %s' % err)
|
|
|
|
|
|
|
|
|
|
|
|
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|