From 41b23d2e663d651b6e0e7347c8d53cb88b2c5903 Mon Sep 17 00:00:00 2001 From: AleaJactaEst Date: Sun, 7 Oct 2018 16:09:19 +0200 Subject: [PATCH] adding feature #1, and correct synchro multi thread --- pymanager/manager.py | 635 ++++++++++++++++++++++++++------ tests/simulate_program.py | 35 +- tests/test_manager.py | 735 +++++++++++++++++++++++++++++++------- 3 files changed, 1168 insertions(+), 237 deletions(-) diff --git a/pymanager/manager.py b/pymanager/manager.py index 32bcb0f..ba432e5 100755 --- a/pymanager/manager.py +++ b/pymanager/manager.py @@ -68,7 +68,22 @@ This script need configuration file (see below for model):: logsize = 1000 # buffer size (define value bufsize on subprocess.Popen) bufsize = 100 - + # It's possible to collect some message on output (example player conected) with regex command + # keep some data on array/dict state + keep_state = yes + # size array/dict state + size_max_state = 1000 + # search regex to add state (python regex) + add_state = "^((.*)(setActiveCharForPlayer).*(: set active char )[\d]+( for )(?P.*)|(.*)(disconnectPlayer)(.+:.+<.+>){0,1}[\s]+(?P.*)[\s]+(is disconnected))" + del_state = "^((.*)(setActiveCharForPlayer).*(: set active char )[\d]+( for )(?P.*)|(.*)(disconnectPlayer)(.+:.+<.+>){0,1}[\s]+(?P.*)[\s]+(is disconnected))" + # autostart (when start OpenNelManager, launch this program) + autostart = no + # restart after crash + restart_after_crash = yes + # Delay after each restart (second) + restart_delay = 10 + # Enable special filter EGS (account connection / command admin) + egs_filter = yes Manager ------- @@ -96,28 +111,37 @@ Design http(s) command : ----------------- -+------------------+-------------+---------------------------------------------+---------------------------------------------+ -| **Html command** | **Path** | **Argument** {json format} | **Comment** | -+------------------+-------------+---------------------------------------------+---------------------------------------------+ -| **POST** | /SHUTDOWN | | Stop all process and stop pymanager | -+------------------+-------------+---------------------------------------------+---------------------------------------------+ -| **POST** | /STARTALL | | Start all processes | -+------------------+-------------+---------------------------------------------+---------------------------------------------+ -| **GET** | /STATUSALL | | Get status all processes | -+------------------+-------------+---------------------------------------------+---------------------------------------------+ -| **POST** | /STOPALL | | Stop all processes | -+------------------+-------------+---------------------------------------------+---------------------------------------------+ -| **POST** | /START | {'name': program} | Start for one program | -+------------------+-------------+---------------------------------------------+---------------------------------------------+ -| **POST** | /STDIN | {'name': program, 'action': action} | Send action for one program (send to input) | -+------------------+-------------+---------------------------------------------+---------------------------------------------+ -| **GET** | /STATUS | {'name': program} | Get status for one program | -+------------------+-------------+---------------------------------------------+---------------------------------------------+ -| **POST** | /STOP | {'name': program} | Stop for one program | -+------------------+-------------+---------------------------------------------+---------------------------------------------+ -| **GET** | /STDOUT | {'name': program, 'first-line': firstline } | Get log for one program | -+------------------+-------------+---------------------------------------------+---------------------------------------------+ - ++------------------+------------------+---------------------------------------------+-----------------------------------------------+ +| **Html command** | **Path** | **Argument** {json format} | **Comment** | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+ +| **POST** | /SHUTDOWN | | Stop all process and stop pymanager | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+ +| **POST** | /STARTALL | | Start all processes | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+ +| **GET** | /STATUSALL | | Get status all processes | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+ +| **POST** | /STOPALL | | Stop all processes | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+ +| **POST** | /START | {'name': program} | Start for one program | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+ +| **POST** | /STDIN | {'name': program, 'action': action} | Send action for one program (send to input) | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+ +| **GET** | /STATUS | {'name': program} | Get status for one program | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+ +| **POST** | /STOP | {'name': program} | Stop for one program | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+ +| **GET** | /STDOUT | {'name': program, 'first-line': firstline } | Get log for one program | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+ +| **GET** | /GETSTATE | {'name': program } | Get all state (key find in stdout add/remove) | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+ +| **GET** | /CONFIG | {'name': program } | Get configuration | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+ +| **GET** | /INFO | {'name': program } | Get Information (number player, ...) | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+ +| **GET** | /PLAYER | {'name': program } | Get active player | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+ +| **GET** | /ADMINCOMMAND | {'name': program } | Get admin commmand | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+ Example :: @@ -142,6 +166,7 @@ import json import fcntl import os import base64 +import re from socketserver import ThreadingMixIn try: @@ -150,7 +175,7 @@ try: except ImportError: __DISABLE_BCRYPT__ = True -__VERSION__ = '1.0' +__VERSION__ = '1.1.0' class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): @@ -240,7 +265,7 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): logging.error("Impossible to read first-line '%s'" % msgjson['first-line']) return logging.debug("%s:%s" % (name, firstLine)) - self.server.listEvent[name].set() + self.server.listSemaphore[name].acquire() self.server.listQueueIn[name].put("STDOUT %d" % firstLine) logging.debug("Send request to '%s'" % (name)) try: @@ -248,6 +273,7 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): except queue.Empty: logging.debug("Received nothing from '%s'" % name) return + self.server.listSemaphore[name].release() self._set_headers() self.wfile.write(bytes(item, "utf-8")) logging.debug("item : %s" % item) @@ -265,8 +291,9 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): def _send_shutdown(self): """ Stop all program and stop manager """ for name in self.server.listQueueIn: - self.server.listEvent[name].set() + self.server.listSemaphore[name].acquire() self.server.listQueueIn[name].put("SHUTDOWN") + self.server.listSemaphore[name].release() self._set_headers() outjson = {'shutdown': 'ok'} self.wfile.write(bytes(json.dumps(outjson), "utf-8")) @@ -278,13 +305,14 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): """ outjson = {} for name in self.server.listQueueIn: - self.server.listEvent[name].set() + self.server.listSemaphore[name].acquire() self.server.listQueueIn[name].put(command) try: item = self.server.listQueueOut[name].get(timeout=4) except queue.Empty: logging.debug("pas de message recu pour %s" % name) return + self.server.listSemaphore[name].release() outjson.setdefault(name, item) self._set_headers() self.wfile.write(bytes(json.dumps(outjson), "utf-8")) @@ -313,7 +341,7 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): action = msgjson['action'] logging.debug("%s:%s" % (name, action)) - self.server.listEvent[name].set() + self.server.listSemaphore[name].acquire() self.server.listQueueIn[name].put("STDIN %s" % action) logging.debug("message envoye: %s" % (name)) @@ -322,6 +350,7 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): except queue.Empty: logging.debug("pas de message recu pour %s" % name) return + self.server.listSemaphore[name].release() outjson = {'state': result} self._set_headers() self.wfile.write(bytes(json.dumps(outjson), "utf-8")) @@ -345,7 +374,7 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): logging.error("Name unknwon '%s'" % name) return logging.debug("[%s %s] Send command" % (command, name)) - self.server.listEvent[name].set() + self.server.listSemaphore[name].acquire() self.server.listQueueIn[name].put(command) try: result = self.server.listQueueOut[name].get(timeout=4) @@ -353,6 +382,7 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): self.send_error(500, 'Missing return') logging.debug("[%s %s] Missing return" % (command, name)) return + self.server.listSemaphore[name].release() logging.debug("[%s %s] => %s" % (command, name, result)) outjson = {'state': result} @@ -396,6 +426,16 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): logging.error("Wrong authentication") elif self.path == '/STDOUT': self._command_log() + elif self.path == "/STATE": + self._send_command("STATE") + elif self.path == "/INFO": + self._send_command("INFO") + elif self.path == "/PLAYER": + self._send_command("PLAYER") + elif self.path == "/ADMINCOMMAND": + self._send_command("ADMINCOMMAND") + elif self.path == "/CONFIG": + self._send_command("CONFIG") elif self.path == '/STATUS': self._send_command("STATUS") elif self.path == '/LIST': @@ -467,7 +507,7 @@ class khaganatHTTPServer(ThreadingMixIn, http.server.HTTPServer): def __init__(self, listQueueIn, listQueueOut, - listEvent, + listSemaphore, server_address, RequestHandlerClass, authentification, @@ -476,7 +516,7 @@ class khaganatHTTPServer(ThreadingMixIn, http.server.HTTPServer): http.server.HTTPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate) self.listQueueIn = listQueueIn self.listQueueOut = listQueueOut - self.listEvent = listEvent + self.listSemaphore = listSemaphore self.authentification = authentification self.users = users @@ -491,7 +531,7 @@ class ServerHttp(multiprocessing.Process): multiprocessing.Process.__init__(self) self.listQueueIn = {} self.listQueueOut = {} - self.listEvent = {} + self.listSemaphore = {} self.port = port self.key_file = keyfile self.cert_file = certfile @@ -505,7 +545,7 @@ class ServerHttp(multiprocessing.Process): server_address = (self.address, self.port) httpd = khaganatHTTPServer(self.listQueueIn, self.listQueueOut, - self.listEvent, + self.listSemaphore, server_address, ManageHttpRequest, self.authentification, @@ -532,10 +572,10 @@ class ServerHttp(multiprocessing.Process): raise ValueError httpd.serve_forever() - def append(self, name, queueIn, queueOut, event): + def append(self, name, queueIn, queueOut, semaphore): self.listQueueIn.setdefault(name, queueIn) self.listQueueOut.setdefault(name, queueOut) - self.listEvent.setdefault(name, event) + self.listSemaphore.setdefault(name, semaphore) class ManageCommand(): @@ -545,7 +585,13 @@ class ManageCommand(): * read output [in other thread] * communicate with ManageHttpRequest (with queueOut) """ - def __init__(self, name, command, path, logsize, bufsize, queueIn, queueOut, event, maxWaitEnd=10, waitDelay=1): + def __init__(self, name, + command, path, + logsize, bufsize, queueIn, queueOut, semaphore, + keep_state, size_max_state, add_state, del_state, + autostart, restart_after_crash, restart_delay, + egs_filter, + maxWaitEnd=10, waitDelay=1): self.process = None self.queueIn = queueIn self.queueOut = queueOut @@ -555,46 +601,217 @@ class ManageCommand(): self.log = [] self.poslastlog = 0 self.maxlog = logsize - self.event = event + self.semaphore = semaphore self.bufsize = bufsize self.threadRead = None self.running = False self.state = multiprocessing.Queue() self.pipeIn, self.pipeOut = multiprocessing.Pipe() - self.eventRunning = threading.Event() + self.eventRunningReader = threading.Event() + self.eventRunningRestart = threading.Event() self.maxWaitEnd = maxWaitEnd self.waitDelay = waitDelay + self.keep_state = keep_state + self.size_max_state = size_max_state + self.add_state_cmd = add_state[1:-1] + self.del_state_cmd = del_state[1:-1] + self.filter_add_state = re.compile(self.add_state_cmd) + self.filter_del_state = re.compile(self.del_state_cmd) + self.state = {} + self.autostart = autostart + self.restart_after_crash = restart_after_crash + self.restart_delay = restart_delay + self.threadRestart = None + self.egs_filter = egs_filter + self.egs_filter_load_character = re.compile(".*(LOADED User )'(?P[\d]+)' Character '(?P[^']+)' from BS stream file 'characters/([\d]+)/account_(?P[\d]+)_(?P[\d]+)_pdr.bin") + self.egs_filter_active_character = re.compile(".*(setActiveCharForPlayer).*(: set active char )(?P[\d]+)( for player )(?P[\d]+)") + self.egs_filter_sid = re.compile(".*(Mapping UID )(?P[\d]+)( => Sid )\((?P.*)\)") + self.egs_filter_client_ready = re.compile(".*(Updating IS_NEWBIE flag for character: )\((?P.*)\)") + self.egs_filter_disconnected = re.compile(".*(disconnectPlayer).+[\s]+(player )(?P[\d]+)[\s]+(is disconnected)") + self.egs_filter_admin = re.compile("(.*)(cbClientAdmin EGS-136 : )(ADMIN)(: Player )\((?P.*)\)(?P.+)") + self.state_load_character = {} + self.state_active_character = {} + self.state_admin = [] + self.number_start = 0 + self.first_line = 0 + self.last_line = 0 + + def _analyze_line(self, msg): + now = time.strftime('%Y/%m/%d %H:%M:%S %Z') + self.poslastlog += 1 + while len(self.log) >= self.maxlog: + self.log.pop(0) + self.first_line = self.first_line + 1 + self.log.append(now + ' ' + msg) + self.last_line = self.last_line + 1 + # If option sate is defined, analyze message and keep state (example , all player connected) + logging.debug("recu: '%s'" % (msg)) + if self.keep_state: + res = self.filter_add_state.match(msg) + if res: + logging.debug("add_state found") + if len(self.state) < self.size_max_state: + logging.debug("include add_state found") + dico = res.groupdict() + for key in dico: + logging.debug("set add_state found [%s]" % (str(key))) + if dico[key]: + logging.debug("set1 add_state found [%s][%s]" % (str(key), str(dico[key]))) + self.state.setdefault(key, {}) + self.state[key].setdefault(dico[key], now) + res = self.filter_del_state.match(msg) + if res: + logging.debug("del_state found") + dico = res.groupdict() + for key in dico: + logging.debug("prepare del_state found %s" % str(key)) + if dico[key]: + self.state.setdefault(key, {}) + if dico[key] in self.state[key]: + logging.debug("del1 del_state found [%s][%s][%s]" % (str(key), str(dico[key]), str(self.state[key]))) + del self.state[key][dico[key]] + if self.egs_filter: + res = self.egs_filter_load_character.match(msg) + if res: + logging.debug("egs_filter_load_character found") + if len(self.state_load_character) < self.size_max_state: + logging.debug("include add_state found") + dico = res.groupdict() + try: + self.state_load_character.setdefault(dico['UID'], {}) + self.state_load_character[dico['UID']].setdefault(dico['IDCHAR'], {'NameDomain': dico['NameDomain'], 'UIDBIS': dico['UIDBIS'], 'when': now}) + except KeyError as e: + logging.error('Missing key when read "load_character" (%s)' % e) + else: + logging.warning("impossible to add param 'load_character' (size too high)") + return + res = self.egs_filter_active_character.match(msg) + if res: + logging.debug("egs_filter_active_character found") + if len(self.state_active_character) < self.size_max_state: + dico = res.groupdict() + try: + self.state_active_character.setdefault(dico['UID'], {}) + self.state_active_character[dico['UID']] = self.state_load_character[dico['UID']][dico['IDCHAR']] + del self.state_load_character[dico['UID']] + except KeyError as e: + logging.error('Missing key when read "active_character" (%s)' % e) + else: + logging.warning("impossible to add param 'active_character' (size too high)") + return + res = self.egs_filter_sid.match(msg) + if res: + logging.debug("egs_filter_sid found") + dico = res.groupdict() + try: + if dico['UID'] in self.state_active_character: + self.state_active_character[dico['UID']].setdefault("SID", dico['SID']) + else: + logging.error('Impossible to add SID on player %s (Player not found)' % dico['UID']) + except KeyError as e: + logging.error('Missing key when read "sid" (%s)' % e) + return + res = self.egs_filter_disconnected.match(msg) + if res: + logging.debug("egs_filter_sid found") + dico = res.groupdict() + try: + if dico['UID'] in self.state_active_character: + del self.state_active_character[dico['UID']] + else: + logging.error('Impossible to remove player %s (Player not found)' % dico['UID']) + except KeyError as e: + logging.error('Missing key when remove player (%s)' % e) + return + res =self.egs_filter_admin.match(msg) + if res: + logging.debug("egs_filter_admin found") + while len(self.state_admin) >= self.maxlog: + self.state_admin.pop(0) + try: + username = '' + try: + for key in self.state_active_character: + if self.state_active_character[key]['SID'] == dico['SID']: + username = self.state_active_character[key]['NameDomain'] + break + except KeyError: + pass + self.state_admin.append( {'pos': self.pos_admin, 'when': now, 'SID': dico['SID'], 'ACTION': dico['ACTION'], 'USER': username}) + except KeyError as e: + logging.error('Missing key when admin player (%s)' % e) + self.pos_admin = self.pos_admin + 1 + return + + def _readline_stdout(self): + try: + line = self.process.stdout.readline() + except AttributeError: + logging.error("process %s down (not detected)" % self.name) + return True, False + except ValueError: + logging.error("process %s down (not detected)" % self.name) + return True, False + if not line: + time.sleep(self.waitDelay) + return False, True + logging.debug("line %s " % line) + self._analyze_line(line.decode().strip()) + return False, False def read_output(self): """ Thread to read output (stdout) """ fl = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL) fcntl.fcntl(self.process.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK) logging.debug("Start reader %s" % self.name) - while self.eventRunning.is_set(): - code = self.process.poll() - if code is not None: - logging.error("process %s down" % self.name) - self.eventRunning.clear() - continue + crashed = False + while self.eventRunningReader.is_set(): try: - line = self.process.stdout.readline() - except AttributeError: - logging.error("process %s down (not detected)" % self.name) - self.eventRunning.clear() - continue - if not line: - time.sleep(self.waitDelay) - continue - now = time.strftime('%Y/%m/%d %H:%M:%S %Z') - logging.debug("line %s " % line) - self.poslastlog += 1 - while len(self.log) >= self.maxlog: - self.log.pop(0) - msg = line.decode().strip() - self.log.append(now + ' ' + msg) - logging.debug("recu: '%s'" % (msg)) + logging.debug("ping") + code = self.process.poll() + if code is not None: + logging.error("process %s down" % self.name) + #self.eventRunning.clear() + crashed = True + except AttributeError as e: + logging.warning("process %s down (%s)" % (self.name, e)) + break + crashedbis, end = self._readline_stdout() + if end and (crashed or crashedbis): + break + # Send to thread manage process + if crashed: + logging.debug("Process stopped : '%s'" % self.name) + wait_semaphore = self.semaphore.acquire(False) + while self.eventRunningReader.is_set() and not wait_semaphore: + time.sleep(1) + wait_semaphore = self.semaphore.acquire(False) + if wait_semaphore == True: + self.queueIn.put("STOPPED") + self.semaphore.release() + if self.keep_state: + self.state_load_character = {} + self.state_active_character = {} logging.debug("End reader: '%s'" % self.name) + def restart(self): + """ Thread to restart after crash """ + logging.debug('initialize process restart %s (wait %ds)' % (self.name, self.restart_delay)) + time.sleep(self.restart_delay) + logging.debug('Prepare restart service %s' % (self.name)) + wait_semaphore = self.semaphore.acquire(False) + while self.eventRunningRestart.is_set() and not wait_semaphore: + logging.debug('Ping - restart service %s' % (self.name)) + time.sleep(1) + wait_semaphore = self.semaphore.acquire(False) + logging.debug('Prepare restart service %s (step 2)' % (self.name)) + if wait_semaphore == True: + logging.debug('Restart service %s' % (self.name)) + self.queueIn.put("START") + self.queueOut.get() + self.semaphore.release() + logging.debug('Prepare restart service %s (step 3)' % (self.name)) + def handler(self, signum, frame): """ Managed signal (not used) """ if self.process: @@ -612,10 +829,12 @@ class ManageCommand(): code = self.process.poll() if code is None: logging.debug("%s already exist" % self.name) - return "already-started" + return 0 else: logging.debug("%s crashed" % self.name) code = self.process.wait() + self.process.stdin.close() + self.process.stdout.close() logging.error("%s crashed (return code:%d) - restart program" % (self.name, code)) try: self.process = subprocess.Popen(self.command.split(), @@ -628,19 +847,23 @@ class ManageCommand(): close_fds=True) except FileNotFoundError as e: logging.error("Impossible to start %s (%s)" % (self.name, e)) - return "crashed" + return 2 except PermissionError as e: logging.error("Impossible to start %s (%s)" % (self.name, e)) - return "crashed" - self.eventRunning.set() + return 2 if self.threadRead: - self.eventRunning.clear() + self.eventRunningReader.clear() self.threadRead.join() self.threadRead = None self.running = True + self.eventRunningReader.set() self.threadRead = threading.Thread(target=self.read_output) self.threadRead.start() - return "started" + tmp = self.number_start + tmp = tmp + 1 + if tmp > self.number_start: + self.number_start = tmp + return 0 def status(self): """ Get status of program """ @@ -649,15 +872,16 @@ class ManageCommand(): logging.debug("status %s - check" % (self.name)) code = self.process.poll() if code is None: - logging.debug("%s status" % (self.name)) - return "started" + logging.debug("%s status [started]" % (self.name)) + return 0 else: logging.error("%s crashed (return code:%d)" % (self.name, code)) - self.process = None - return "stopped" + #self.semaphore + #self.queueIn.put("STOPPED") + return 2 else: - logging.debug("%s status" % (self.name)) - return "stopped" + logging.debug("%s status [stopped]" % (self.name)) + return 1 def list_thread(self): """ List number thrad (not used) """ @@ -671,7 +895,7 @@ class ManageCommand(): """ Stop program """ logging.debug("stop %s" % (self.name)) if not self.process: - return "stopped" + return 1 else: try: code = self.process.poll() @@ -684,7 +908,6 @@ class ManageCommand(): loop -= 1 except ProcessLookupError as e: logging.warning("Stop process (%s)" % str(e)) - try: loop = self.maxWaitEnd while (code is None) and (loop > 0): @@ -695,7 +918,6 @@ class ManageCommand(): loop -= 1 except ProcessLookupError as e: logging.warning("Stop process (%s)" % str(e)) - try: loop = self.maxWaitEnd while (code is None) and (loop > 0): @@ -706,25 +928,26 @@ class ManageCommand(): loop -= 1 except ProcessLookupError as e: logging.warning("Stop process (%s)" % str(e)) - try: code = self.process.wait() + self.process.stdin.close() + self.process.stdout.close() self.process = None if self.threadRead: - self.eventRunning.clear() + self.eventRunningReader.clear() self.threadRead.join() self.threadRead = None logging.info("%s stopped (return code:%d)" % (self.name, code)) except ProcessLookupError as e: logging.warning("Stop process (%s)" % str(e)) - return "stopped" + return 1 def getlog(self, firstline): """ Get log """ logging.debug("read log %d " % firstline) outjson = {} pos = self.poslastlog - len(self.log) + 1 - firstlinefound = None + firstlinefound = 0 for line in self.log: if pos >= firstline: outjson.setdefault(pos, line) @@ -735,6 +958,39 @@ class ManageCommand(): outjson.setdefault('last-line', pos - 1) return json.dumps(outjson) + def getstate(self): + """ Get state """ + return json.dumps(self.state) + + def getconfig(self): + outjson = { 'keep_state': str(self.keep_state), + 'bufsize': str(self.bufsize), + 'size_max_state': str(self.size_max_state), + 'path': str(self.path), + 'add_state': str(self.add_state_cmd), + 'del_state': str(self.del_state_cmd), + 'command': str(self.command), + 'maxWaitEnd': str(self.maxWaitEnd), + 'waitDelay': str(self.waitDelay), + 'maxlog': str(self.maxlog), + 'state': str(self.keep_state), + 'egs': str(self.egs_filter) } + return json.dumps(outjson) + + def getinfo(self): + outjson = { 'number_launch': str(self.number_start), + 'first_line': str(self.first_line), + 'last_line': str(self.last_line), + 'number_state': len(self.state), + 'player_connected': len(self.state_active_character) } + return json.dumps(outjson) + + def getplayer(self): + return json.dumps(self.state_active_character) + + def getadmincommand(self): + return json.dumps(self.state_admin) + def action(self, action): """ Send action to program (send input to stdin) """ logging.debug("STDIN '%s'" % action) @@ -749,28 +1005,32 @@ class ManageCommand(): def run(self): """ loop, run child (wait command) """ + statuscmd = {0:'started', 1:'stopped', 2:'crashed'} loop = True + if self.autostart: + savedstate = self.start() + else: + savedstate = 1 while loop: - logging.debug('wait %s' % self.name) - self.event.wait() - logging.debug('received event %s' % self.name) - try: - msg = self.queueIn.get(timeout=4) - except queue.Empty: - self.event.clear() - logging.debug("[%s] Queue empty (no message)" % self.name) - return + logging.debug('wait event %s' % self.name) + msg = self.queueIn.get() logging.debug("command : '%s'" % msg) command = msg.split()[0] if command == "SHUTDOWN": loop = False continue elif command == "START": - self.queueOut.put(self.start()) + #if savedstate != 0: + savedstate = self.start() + self.queueOut.put(statuscmd[savedstate]) elif command == "STATUS": - self.queueOut.put(self.status()) + currentstate = self.status() + if currentstate != 1 or savedstate != 2: + savedstate = currentstate + self.queueOut.put(statuscmd[savedstate]) elif command == "STOP": - self.queueOut.put(self.stop()) + savedstate = self.stop() + self.queueOut.put(statuscmd[savedstate]) elif command == "STDIN": data = msg.split(maxsplit=1)[1] self.queueOut.put(self.action(data)) @@ -780,13 +1040,55 @@ class ManageCommand(): except ValueError: logging.warning("Bad value for param first-line (need integer)") firstline = 0 + except IndexError: + firstline = 0 self.queueOut.put(self.getlog(firstline)) + elif command == "STATE": + self.queueOut.put(self.getstate()) + elif command == "CONFIG": + self.queueOut.put(self.getconfig()) + elif command == "INFO": + self.queueOut.put(self.getinfo()) + elif command == "PLAYER": + self.queueOut.put(self.getplayer()) + elif command == "ADMINCOMMAND": + self.queueOut.put(self.getadmincommand()) + elif command == "STOPPED": + currentstate = self.status() + logging.debug('Received event process stopped (current state:%d, saved state:%d)' % (currentstate, savedstate)) + if currentstate == 2 and savedstate != 1 and self.restart_after_crash: + logging.debug('Prepare restart') + self.stop() + savedstate = 2 + self.eventRunningRestart.clear() + #logging.warning("program (%s) is crashed" % self.name) + try: + self.threadRestart.terminate() + self.threadRestart.join() + except AttributeError: + pass + self.eventRunningRestart.set() + self.threadRestart = threading.Thread(target=self.restart) + self.threadRestart.start() else: logging.warning("Bad command (%s)" % command) self.queueOut.put("error : command unknown") - self.event.clear() + logging.debug('Stop %s' % self.name) self.stop() - self.event.clear() + logging.debug('prepare end') + self.eventRunningReader.clear() + if self.threadRead: + try: + self.threadRead.join() + except AttributeError: + pass + self.eventRunningRestart.clear() + if self.threadRestart: + try: + self.threadRestart.terminate() + self.threadRestart.join() + except AttributeError: + pass logging.debug('end') @@ -909,10 +1211,98 @@ class Manager(): raise ValueError else: bufsize = 100 + if 'keep_state' in config[name]: + try: + tmp = config[name]['keep_state'] + if tmp.upper().strip() == 'YES': + keep_state = True + else: + keep_state = False + except (TypeError, KeyError, ValueError): + logging.error("Impossible to read param keep_state (command:%s)", name) + raise ValueError + else: + keep_state = False + if 'size_max_state' in config[name]: + try: + size_max_state = int(config[name]['size_max_state']) + except (TypeError, KeyError, ValueError): + logging.error("Impossible to read param size_max_state (command:%s)", name) + raise ValueError + else: + size_max_state = 100 + if 'add_state' in config[name]: + try: + add_state = config[name]['add_state'] + except (TypeError, KeyError, ValueError): + logging.error("Impossible to read param add_state (command:%s)", name) + raise ValueError + else: + add_state = '' + if 'del_state' in config[name]: + try: + del_state = config[name]['del_state'] + except (TypeError, KeyError, ValueError): + logging.error("Impossible to read param del_state (command:%s)", name) + raise ValueError + else: + del_state = '' + if 'autostart' in config[name]: + try: + tmp = config[name]['autostart'] + if tmp.upper().strip() == 'YES': + autostart = True + else: + autostart = False + except (TypeError, KeyError, ValueError): + logging.error("Impossible to read param autostart (command:%s)", name) + raise ValueError + else: + autostart = False + if 'restart_after_crash' in config[name]: + try: + tmp = config[name]['restart_after_crash'] + if tmp.upper().strip() == 'YES': + restart_after_crash = True + else: + restart_after_crash = False + except (TypeError, KeyError, ValueError): + logging.error("Impossible to read param restart_after_crash (command:%s)", name) + raise ValueError + else: + restart_after_crash = False + if 'restart_delay' in config[name]: + try: + restart_delay = int(config[name]['restart_delay']) + except (TypeError, KeyError, ValueError): + logging.error("Impossible to read param restart_delay (command:%s)", name) + raise ValueError + else: + restart_delay = 10 + if 'egs_filter' in config[name]: + try: + tmp = config[name]['egs_filter'] + if tmp.upper().strip() == 'YES': + egs_filter = True + else: + egs_filter = False + except (TypeError, KeyError, ValueError): + logging.error("Impossible to read param autostart (command:%s)", name) + raise ValueError + else: + egs_filter = False self.param.setdefault(name, {'command': config[name]['command'], 'path': path, 'logsize': logsize, - 'bufsize': bufsize}) + 'bufsize': bufsize, + 'keep_state': keep_state, + 'size_max_state': size_max_state, + 'add_state': add_state, + 'del_state': del_state, + 'autostart': autostart, + 'restart_after_crash': restart_after_crash, + 'restart_delay': restart_delay, + 'egs_filter': egs_filter}) def initialize_http(self): """ @@ -927,7 +1317,9 @@ class Manager(): method=self.method, users=self.users) - def runCommand(self, name, command, path, logsize, bufsize, queueIn, queueOut, event): + def runCommand(self, name, command, path, logsize, bufsize, queueIn, queueOut, semaphore, + keep_state, size_max_state, add_state, del_state, + autostart, restart_after_crash, restart_delay, egs_filter): """ Thread to manage khaganat program """ @@ -939,7 +1331,15 @@ class Manager(): bufsize=bufsize, queueIn=queueIn, queueOut=queueOut, - event=event) + semaphore=semaphore, + keep_state=keep_state, + size_max_state=size_max_state, + add_state=add_state, + del_state=del_state, + autostart=autostart, + restart_after_crash=restart_after_crash, + restart_delay=restart_delay, + egs_filter=egs_filter) manageCommand.run() def launch_server_http(self): @@ -953,8 +1353,13 @@ class Manager(): logging.debug("Initialize '%s'" % name) queueIn = multiprocessing.Queue() queueOut = multiprocessing.Queue() - event = multiprocessing.Event() - self.serverHttp.append(name, queueIn, queueOut, event) + # semaphore = multiprocessing.Semaphore() + semaphore = multiprocessing.BoundedSemaphore() + self.serverHttp.append(name, queueIn, queueOut, semaphore) + if self.launch_program: + autostart = True + else: + autostart = self.param[name]['autostart'] threadCommand = multiprocessing.Process(target=self.runCommand, args=(name, self.param[name]['command'], @@ -963,29 +1368,33 @@ class Manager(): self.param[name]['bufsize'], queueIn, queueOut, - event)) + semaphore, + self.param[name]['keep_state'], + self.param[name]['size_max_state'], + self.param[name]['add_state'], + self.param[name]['del_state'], + autostart, + self.param[name]['restart_after_crash'], + self.param[name]['restart_delay'], + self.param[name]['egs_filter'])) threadCommand.start() self.threadCommand.append(threadCommand) - self.info.setdefault(name, {'queueIn': queueIn, 'queueOut': queueOut, - 'event': event, + 'semaphore': semaphore, 'threadCommand': threadCommand, 'command': self.param[name]['command'], 'path': self.param[name]['path'], 'logsize': self.param[name]['logsize'], - 'bufsize': self.param[name]['bufsize']}) - - if self.launch_program: - event.set() - queueIn.put("START") - try: - item = queueOut.get(timeout=4) - except queue.Empty: - item = "" - logging.debug("[%s] Queue empty (no message)" % name) - return - logging.info("%s => %s" % (name, item)) + 'bufsize': self.param[name]['bufsize'], + 'keep_state': self.param[name]['keep_state'], + 'size_max_state': self.param[name]['size_max_state'], + 'add_state': self.param[name]['add_state'], + 'del_state': self.param[name]['del_state'], + 'autostart': autostart, + 'restart_after_crash': self.param[name]['restart_after_crash'], + 'restart_delay': self.param[name]['restart_delay'], + 'egs_filter': self.param[name]['egs_filter']}) def receive_signal(self, signum, frame): """ Managed signal """ diff --git a/tests/simulate_program.py b/tests/simulate_program.py index 6e467d2..c1a6b3c 100755 --- a/tests/simulate_program.py +++ b/tests/simulate_program.py @@ -44,7 +44,8 @@ class SimulateProgram(): self.line += 1 print(self.line, message) - def main(self, noloop, timeout, refuse_kill): + def main(self, noloop, timeout, refuse_kill, pos): + self.print_output("Initializing") manageSignal = ManageSignal() if refuse_kill: manageSignal.activate() @@ -52,6 +53,34 @@ class SimulateProgram(): self.print_output("Initializing") self.print_output("Starting") self.print_output("Started") + self.print_output("pos: %d" % pos) + n = 0 + if pos > n: + self.print_output("alpha egs_plinfo EGS-132 : LOADED User '2' Character 'Kezxaa(Lirria)' from BS stream file 'characters/002/account_2_0_pdr.bin'") + n = n + 1 + if pos > n: + self.print_output("alpha egs_plinfo EGS-132 : LOADED User '2' Character 'Puskle(Lirria)' from BS stream file 'characters/002/account_2_1_pdr.bin'") + n = n + 1 + if pos > n: + self.print_output("alpha egs_ecinfo EGS-132 : setActiveCharForPlayer EGS-132 : set active char 1 for player 2") + n = n + 1 + if pos > n: + self.print_output("alpha egs_ecinfo EGS-132 : Mapping UID 2 => Sid (0x0000000021:00:00:81) ") + n = n + 1 + if pos > n: + self.print_output("alpha egs_ecinfo EGS-132 : Client ready (entity (0x0000000021:00:00:81) (Row 90501) added to mirror)") + n = n + 1 + if pos > n: + self.print_output("alpha finalizeClientReady EGS-132 : Updating IS_NEWBIE flag for character: (0x0000000021:00:00:81)") + n = n + 1 + if pos > n: + self.print_output("alpha 1383 disconnectPlayer EGS-132 : (EGS) player 2 (Row 90501) removed") + n = n + 1 + if pos > n: + self.print_output("alpha egs_plinfo EGS-132 : Player with userId = 2 removed") + n = n + 1 + if pos > n: + self.print_output("alpha disconnectPlayer EGS-132 : player 2 is disconnected") while loop is True: try: msg = input() @@ -74,9 +103,11 @@ def main(args=sys.argv[1:]): default=10, help='timeout') parser.add_argument('--disable-kill', action='store_true', help='disable loop', default=False) + parser.add_argument('--message', type=int, + default=0, help='message') args = parser.parse_args() simulate = SimulateProgram() - simulate.main(args.no_loop, args.timeout, args.disable_kill) + simulate.main(args.no_loop, args.timeout, args.disable_kill, args.message) if __name__ == '__main__': main() diff --git a/tests/test_manager.py b/tests/test_manager.py index e1519e4..7c5590a 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -31,6 +31,7 @@ import logging import json from unittest.mock import patch from unittest.mock import MagicMock +import traceback try: import pymanager.manager as Manager @@ -47,11 +48,20 @@ except ImportError: # import pymanager.certificate as cert -def handler(signum, frame): - print("TimeOut !") - raise Exception("end of time") +def handlerCrash(signum, frame): + print("handlerCrash - TimeOut !") + for line in traceback.format_stack(): + print(line.strip()) + # Force Exit with all thread ! + os._exit(2) +def handlerRaise(signum, frame): + print("handlerRaise - TimeOut !") + for line in traceback.format_stack(): + print(line.strip()) + raise "Timeout" + class TestManager(unittest.TestCase): def setUp(self): self.openssl = '/usr/bin/openssl' @@ -68,9 +78,10 @@ class TestManager(unittest.TestCase): self.path = os.path.dirname(os.path.abspath(__file__)) self.program = os.path.join(self.path, 'simulate_program.py') self.badprogram = os.path.join(self.path, 'test.cfg') - signal.signal(signal.SIGALRM, handler) + signal.signal(signal.SIGALRM, handlerCrash) def test_load_config(self): + signal.alarm(10) config = configparser.ConfigParser() config.add_section('config:server') config.set('config:server', 'port', '8000') @@ -92,6 +103,10 @@ class TestManager(unittest.TestCase): config.set('command:test', 'command', '/bin/sleep 10') config.set('command:test', 'logsize', '10') config.set('command:test', 'bufsize', '10') + config.set('command:test', 'keep_state', 'yes') + config.set('command:test', 'size_max_state', '1000') + config.set('command:test', 'add_state', '"^(.*)(setActiveCharForPlayer).*(: set active char )[\d]+( for )(?P.*)"') + config.set('command:test', 'del_state', '"^(.*)(disconnectPlayer).+[\s]+(?P.*)[\s]+(is disconnected)"') config.add_section('config:user') config.set('config:user', 'usename', 'filter_all, filter_admin') try: @@ -100,8 +115,10 @@ class TestManager(unittest.TestCase): self.assertTrue(True) except: self.fail('Error detected on load config') + signal.alarm(0) def test_load_config2(self): + signal.alarm(10) config = configparser.ConfigParser() config.add_section('config:server') config.set('config:server', 'authentification', 'no') @@ -114,8 +131,10 @@ class TestManager(unittest.TestCase): self.assertTrue(True) except: self.fail('Error detected on load config') + signal.alarm(0) def test_load_config_bad_param_logsize(self): + signal.alarm(10) config = configparser.ConfigParser() config.add_section('command:test') config.set('command:test', 'command', '/bin/sleep 10') @@ -124,8 +143,10 @@ class TestManager(unittest.TestCase): manager = Manager.Manager(False) manager._load_config(config) self.assertTrue(True) + signal.alarm(0) def test_load_config_bad_param_bufsize(self): + signal.alarm(10) config = configparser.ConfigParser() config.add_section('command:test') config.set('command:test', 'command', '/bin/sleep 10') @@ -134,8 +155,10 @@ class TestManager(unittest.TestCase): manager = Manager.Manager(False) manager._load_config(config) self.assertTrue(True) + signal.alarm(0) def test_load_config_empty(self): + signal.alarm(10) config = configparser.ConfigParser() config.add_section('config:server') config.add_section('config:client') @@ -154,8 +177,10 @@ class TestManager(unittest.TestCase): self.assertTrue(True) except: self.fail('Error detected on load config') + signal.alarm(0) def test_load_config_file(self): + signal.alarm(10) cfgfile = tempfile.NamedTemporaryFile(suffix="config.cfg", mode='w+t') cfgfile.write('#\n[config:server]\nauthentification = No\n') cfgfile.flush() @@ -165,13 +190,17 @@ class TestManager(unittest.TestCase): self.assertTrue(True) except: self.fail('Error detected on load configuration') + signal.alarm(0) def test_load_config_file_none(self): + signal.alarm(10) with self.assertRaises(ValueError): manager = Manager.Manager(False) manager.load_config(None) + signal.alarm(0) def test_load_password_file(self): + signal.alarm(10) pwdfile = tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t') config = configparser.ConfigParser() @@ -187,14 +216,16 @@ class TestManager(unittest.TestCase): self.assertTrue(True) except: self.fail('Error detected on load password') + signal.alarm(0) def test_constructor_manager_command(self): + signal.alarm(10) try: logsize = 10 bufsize = 10 queueIn = multiprocessing.Queue() queueOut = multiprocessing.Queue() - event = multiprocessing.Event() + semaphore = multiprocessing.Semaphore() manageCommand = Manager.ManageCommand('test_constructor_manager_command', self.program, self.path, @@ -202,19 +233,153 @@ class TestManager(unittest.TestCase): bufsize, queueIn, queueOut, - event) + semaphore, + False, + 1, + "", + "", + False, + False, + 1, + False) manageCommand.list_thread() self.assertTrue(True) except: self.fail('Error initialize object ManageCommand') + signal.alarm(0) + + def test_managercommand_analyze_line(self): + manage = Manager.ManageCommand('test_execute_manager_command', + "", + "", + 1000, + 1000, + None, + None, + None, + False, + 1000, + "", + "", + False, + False, + 1, + True) + manage._analyze_line("message 1") + manage._analyze_line("alpha egs_plinfo EGS-132 : LOADED User '2' Character 'Kezxaa(Lirria)' from BS stream file 'characters/002/account_2_0_pdr.bin'") + if '2' not in manage.state_load_character: + self.assertTrue(False, "LOADED - Missing player 2") + if '0' not in manage.state_load_character['2']: + self.assertTrue(False, "LOADED - Missing charactere 0 for player 2") + manage._analyze_line("alpha egs_plinfo EGS-132 : LOADED User '2' Character 'Puskle(Lirria)' from BS stream file 'characters/002/account_2_1_pdr.bin'") + if '1' not in manage.state_load_character['2']: + self.assertTrue(False, "LOADED - Missing charactere 1 for player 2") + manage._analyze_line("alpha egs_plinfo EGS-132 : LOADED User '3' Character 'Puskle(Lirria)' from BS stream file 'characters/003/account_3_4_pdr.bin'") + if '3' not in manage.state_load_character: + self.assertTrue(False, "LOADED - Missing player 2") + manage._analyze_line("alpha egs_ecinfo EGS-132 : setActiveCharForPlayer EGS-132 : set active char 1 for player 2") + if '2' not in manage.state_active_character: + self.assertTrue(False, "setActiveCharForPlayer - Missing player 2") + if 'NameDomain' not in manage.state_active_character['2']: + self.assertTrue(False, "setActiveCharForPlayer - Missing info player 2") + manage._analyze_line("alpha egs_ecinfo EGS-132 : Mapping UID 2 => Sid (0x0000000021:00:00:81)") + if 'SID' not in manage.state_active_character['2']: + self.assertTrue(False, "setActiveCharForPlayer - Missing SID player 2") + manage._analyze_line("alpha egs_ecinfo EGS-132 : Client ready (entity (0x0000000021:00:00:81) (Row 90501) added to mirror)") + manage._analyze_line("alpha finalizeClientReady EGS-132 : Updating IS_NEWBIE flag for character: (0x0000000021:00:00:81)") + manage._analyze_line("alpha 1383 disconnectPlayer EGS-132 : (EGS) player 2 (Row 90501) removed") + manage._analyze_line("alpha egs_plinfo EGS-132 : Player with userId = 2 removed") + manage._analyze_line("alpha disconnectPlayer EGS-132 : player 2 is disconnected") + if '2' in manage.state_active_character: + self.assertTrue(False, "disconnectPlayer - player 2 always live") + + def test_manager_command_player(self): + signal.signal(signal.SIGALRM, handlerRaise) + signal.alarm(30) + class MockServerHttp: + def append(self, name, queueIn, queueOut, semaphore): + pass + def terminate(self): + pass + def join(self): + pass + manage = Manager.Manager(True) + try: + config = configparser.ConfigParser() + config.add_section('config:server') + config.add_section('command:test') + config.set('command:test', 'command', self.program + ' --message=1 --no-loop --timeout=1') + config.set('command:test', 'restart_after_crash', 'no') + config.set('command:test', 'restart_delay', '1000') + config.set('command:test', 'egs_filter', 'yes') + + manage.serverHttp = MockServerHttp() + manage._load_config(config) + manage.launch_command() + + key = list(manage.info.keys())[0] + queueIn = manage.info[key]['queueIn'] + queueOut = manage.info[key]['queueOut'] + semaphore = manage.info[key]['semaphore'] + + signal.alarm(30) + item = "started" + while item == "started": + print("sleep") + time.sleep(1) + print("semaphore") + semaphore.acquire() + print("status") + queueIn.put("STATUS") + print("queue") + item = queueOut.get(timeout=4) + print(item) + semaphore.release() + print("Lecture STDOUT") + print("sleep") + time.sleep(1) + signal.alarm(30) + print("semaphore") + semaphore.acquire() + print("stdout") + queueIn.put("STDOUT") + print("Attend le retour STDOUT") + item = queueOut.get() + semaphore.release() + print("Resultat STDOUT") + print(item) + time.sleep(1) + signal.alarm(10) + semaphore.acquire() + queueIn.put("SHUTDOWN") + semaphore.release() + with self.assertRaises(queue.Empty): + item = queueOut.get(timeout=4) + #threadCommand.join() + manage.receive_signal(15, 1) + manage.wait_children_commands() + #Disable timeout + signal.alarm(0) + self.assertTrue(True) + signal.alarm(0) + except: + print("Prepare Crash - TimeOut !") + #self.fail('Error initialize object ManageCommand') + manage.receive_signal(15, 1) + manage.wait_children_commands() + print("Force Crash - TimeOut !") + os._exit(2) + signal.alarm(0) + signal.signal(signal.SIGALRM, handlerCrash) def test_execute_manager_command(self): + signal.alarm(10) try: logsize = 10 bufsize = 10 queueIn = multiprocessing.Queue() queueOut = multiprocessing.Queue() - event = multiprocessing.Event() + semaphore = multiprocessing.Semaphore() manageCommand = Manager.ManageCommand('test_execute_manager_command', self.program, self.path, @@ -222,7 +387,15 @@ class TestManager(unittest.TestCase): bufsize, queueIn, queueOut, - event) + semaphore, + False, + 1, + "", + "", + False, + False, + 1, + False) manageCommand.status() manageCommand.start() manageCommand.status() @@ -266,15 +439,16 @@ class TestManager(unittest.TestCase): self.assertTrue(True) except: self.fail('Error initialize object ManageCommand') + signal.alarm(0) def test_execute_crash_manager_command(self): + signal.alarm(10) try: - signal.alarm(30) logsize = 10 bufsize = 10 queueIn = multiprocessing.Queue() queueOut = multiprocessing.Queue() - event = multiprocessing.Event() + semaphore = multiprocessing.Semaphore() manageCommand = Manager.ManageCommand('test_execute_crash_manager_command', self.program + ' --no-loop --timeout 1', self.path, @@ -282,30 +456,40 @@ class TestManager(unittest.TestCase): bufsize, queueIn, queueOut, - event) + semaphore, + False, + 1, + "", + "", + False, + False, + 1, + False) res = manageCommand.start() - self.assertEqual(res, 'started') + self.assertEqual(res, 0) wait = True while wait: time.sleep(1) res = manageCommand.status() - if res == 'stopped': + if res != 0: wait = False + self.assertEqual(res, 2) manageCommand.list_thread() manageCommand.stop() manageCommand.status() self.assertTrue(True) except: self.fail('Error initialize object ManageCommand') + signal.alarm(0) def test_execute_not_kill_manager_command(self): + signal.alarm(30) try: - signal.alarm(30) logsize = 10 bufsize = 10 queueIn = multiprocessing.Queue() queueOut = multiprocessing.Queue() - event = multiprocessing.Event() + semaphore = multiprocessing.Semaphore() manageCommand = Manager.ManageCommand('test_execute_not_kill_manager_command', self.program + " --disable-kill", self.path, @@ -313,41 +497,51 @@ class TestManager(unittest.TestCase): bufsize, queueIn, queueOut, - event, + semaphore, + False, + 1, + "", + "", + False, + False, + 1, + False, maxWaitEnd = 2) except: self.fail('Error initialize object ManageCommand') try: res = manageCommand.start() - self.assertEqual(res, 'started') + self.assertEqual(res, 0) wait = True while wait: time.sleep(1) res = manageCommand.status() - self.assertEqual(res, 'started') + self.assertEqual(res, 0) res = manageCommand.getlog(0) try: resjson = json.loads(res) if 'last-line' in res: - if resjson['last-line'] == 3: - wait = False + if resjson['last-line'] == 5: + wait = False except: pass manageCommand.list_thread() manageCommand.stop() res = manageCommand.status() - self.assertEqual(res, 'stopped') + self.assertEqual(res, 1) self.assertTrue(True) except Exception as e: self.fail('Error when run test (%s)' % str(e)) + signal.alarm(0) def test_execute_command_crashed(self): try: + signal.alarm(20) logsize = 10 bufsize = 10 queueIn = multiprocessing.Queue() queueOut = multiprocessing.Queue() - event = multiprocessing.Event() + semaphore = multiprocessing.Semaphore() manageCommand = Manager.ManageCommand('test_execute_command_crashed', self.program + " --no-loop --timeout=1", self.path, @@ -355,38 +549,48 @@ class TestManager(unittest.TestCase): bufsize, queueIn, queueOut, - event, - waitDelay = 10) + semaphore, + False, + 1, + "", + "", + False, + False, + 1, + False, + waitDelay = 1) manageCommand.start() wait = True while wait: time.sleep(1) res = manageCommand.status() - if res == 'stopped': + if res == 2: wait = False res = manageCommand.status() - self.assertEqual(res, 'stopped') + self.assertEqual(res, 2) manageCommand.start() wait = True while wait: time.sleep(1) res = manageCommand.status() - if res == 'stopped': + if res == 2: wait = False res = manageCommand.status() - self.assertEqual(res, 'stopped') + self.assertEqual(res, 2) manageCommand.stop() self.assertTrue(True) except: self.fail('Error initialize object ManageCommand') + signal.alarm(0) def test_execute_command_file_not_found(self): + signal.alarm(10) try: logsize = 10 bufsize = 10 queueIn = multiprocessing.Queue() queueOut = multiprocessing.Queue() - event = multiprocessing.Event() + semaphore = multiprocessing.Semaphore() manageCommand = Manager.ManageCommand('test_execute_command_file_not_found', self.program + "_not_exist", self.path, @@ -394,20 +598,30 @@ class TestManager(unittest.TestCase): bufsize, queueIn, queueOut, - event) + semaphore, + False, + 1, + "", + "", + False, + False, + 1, + False) ret = manageCommand.start() manageCommand.stop() - self.assertEqual(ret, "crashed", 'Error object not generate error when program not exist') + self.assertEqual(ret, 2, 'Error object not generate error when program not exist') except: self.fail('Error initialize object ManageCommand') + signal.alarm(0) def test_execute_command_permission(self): + signal.alarm(10) try: logsize = 10 bufsize = 10 queueIn = multiprocessing.Queue() queueOut = multiprocessing.Queue() - event = multiprocessing.Event() + semaphore = multiprocessing.Semaphore() manageCommand = Manager.ManageCommand('test_execute_command_permission', self.badprogram, self.path, @@ -415,17 +629,27 @@ class TestManager(unittest.TestCase): bufsize, queueIn, queueOut, - event) + semaphore, + False, + 1, + "", + "", + False, + False, + 1, + False) ret = manageCommand.start() manageCommand.stop() - self.assertEqual(ret, "crashed", 'Error object not generate error when bad permission') + self.assertEqual(ret, 2, 'Error object not generate error when bad permission') except: self.fail('Error initialize object ManageCommand') + signal.alarm(0) - def _runCommand(self, name, command, path, logsize, bufsize, queueIn, queueOut, event): + def _runCommand(self, name, command, path, logsize, bufsize, queueIn, queueOut, semaphore): """ Thread to manage khaganat program """ + signal.alarm(10) manageCommand = Manager.ManageCommand(name=name, command=command, path=path, @@ -433,18 +657,30 @@ class TestManager(unittest.TestCase): bufsize=bufsize, queueIn=queueIn, queueOut=queueOut, - event=event) + semaphore=semaphore, + keep_state=False, + size_max_state=1, + add_state="", + del_state="", + autostart=False, + restart_after_crash=False, + restart_delay=1, + egs_filter=False) manageCommand.run() + signal.alarm(0) def test_root_bad_loglevel(self): + signal.alarm(10) with self.assertRaises(ValueError): Manager.root(None, None, 'NOTEXIST', False, False) + signal.alarm(0) def test_root_bad_configfile(self): + signal.alarm(10) logfile = tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t') with self.assertRaises(ValueError): Manager.root(None, @@ -452,18 +688,22 @@ class TestManager(unittest.TestCase): 'DEBUG', True, True) + signal.alarm(0) def test_main(self): + signal.alarm(10) config = tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t') config.write('[config:server]\nauthentification=no\n') config.flush() Manager.main(['--conf=' + config.name]) + signal.alarm(0) def test_run_manager_command(self): # Enable timeout + signal.signal(signal.SIGALRM, handlerCrash) signal.alarm(10) class MockServerHttp: - def append(self, name, queueIn, queueOut, event): + def append(self, name, queueIn, queueOut, semaphore): pass def terminate(self): pass @@ -475,80 +715,114 @@ class TestManager(unittest.TestCase): config.set('command:test', 'command', self.program) manage = Manager.Manager(False) - manage.serverHttp = MockServerHttp() - manage._load_config(config) - manage.launch_command() - for key in manage.info: - queueIn = manage.info[key]['queueIn'] - queueOut = manage.info[key]['queueOut'] - event = manage.info[key]['event'] + try: + manage.serverHttp = MockServerHttp() + manage._load_config(config) + manage.launch_command() + for key in manage.info: + queueIn = manage.info[key]['queueIn'] + queueOut = manage.info[key]['queueOut'] + semaphore = manage.info[key]['semaphore'] - queueIn.put("START") - event.set() - # Enable timeout - signal.alarm(10) - item = queueOut.get(timeout=4) - self.assertEqual(item, "started", 'Error impossible to start program') - signal.alarm(0) - time.sleep(1) - signal.alarm(10) - event.set() - queueIn.put("STATUS") - item = queueOut.get(timeout=4) - self.assertEqual(item, "started", 'Error impossible to read status') - time.sleep(1) - event.set() - queueIn.put("STDIN arg") - item = queueOut.get(timeout=4) - self.assertEqual(item, "ok", 'Error when send STDIN') - signal.alarm(0) - time.sleep(1) - signal.alarm(10) - event.set() - queueIn.put("STDOUT 4") - item = queueOut.get(timeout=4) - signal.alarm(0) - self.assertRegex(item, - '^[{](.*)("first-line": 4)(.*)[}]$', - 'Error when read STDOUT (Missing first-line)') - self.assertRegex(item, - '^[{](.*)("last-line": 4)(.*)[}]$', - 'Error when read STDOUT (Missing last-line)') - self.assertRegex(item, - '^[{](.*)(4 arg")(.*)[}]$', - 'Error when read STDOUT (bad record)') - time.sleep(1) - signal.alarm(10) - event.set() - queueIn.put("BADCOMMAND") - item = queueOut.get(timeout=4) - self.assertEqual(item, "error : command unknown", 'Error impossible to read status') - signal.alarm(0) - time.sleep(1) - signal.alarm(10) - event.set() - queueIn.put("STOP") - item = queueOut.get(timeout=4) - self.assertEqual(item, "stopped", 'Error impossible to read status') - signal.alarm(0) - time.sleep(1) - signal.alarm(10) - event.set() - queueIn.put("SHUTDOWN") - with self.assertRaises(queue.Empty): + logging.debug("[1] --------- send START") + signal.alarm(10) + semaphore.acquire() + queueIn.put("START") + # Enable timeout + signal.alarm(10) item = queueOut.get(timeout=4) - #threadCommand.join() - manage.receive_signal(15, 1) - manage.wait_children_commands() - #Disable timeout + semaphore.release() + self.assertEqual(item, "started", 'Error impossible to start program') + signal.alarm(0) + time.sleep(1) + logging.debug("[2] --------- send STATUS") + signal.alarm(10) + semaphore.acquire() + queueIn.put("STATUS") + item = queueOut.get() + semaphore.release() + self.assertEqual(item, "started", 'Error impossible to read status') + signal.alarm(0) + time.sleep(1) + logging.debug("[3] --------- send STDIN arg") + signal.alarm(10) + semaphore.acquire() + queueIn.put("STDIN arg") + item = queueOut.get() + semaphore.release() + self.assertEqual(item, "ok", 'Error when send STDIN') + signal.alarm(0) + time.sleep(1) + logging.debug("[4] --------- send STDOUT 4") + signal.alarm(10) + semaphore.acquire() + logging.debug("[4.1] --------- send STDOUT 4") + queueIn.put("STDOUT 4") + logging.debug("[4.2] --------- send STDOUT 4") + item = queueOut.get() + logging.debug("[4.3] --------- send STDOUT 4") + semaphore.release() + logging.debug("[4.4] --------- send STDOUT 4") + signal.alarm(0) + logging.debug("[4.5] --------- send STDOUT 4") + self.assertRegex(item, + '^[{](.*)("first-line": 4)(.*)[}]$', + 'Error when read STDOUT (Missing first-line)') + self.assertRegex(item, + '^[{](.*)("last-line": 6)(.*)[}]$', + 'Error when read STDOUT (Missing last-line)') + self.assertRegex(item, + '^[{](.*)(6 arg")(.*)[}]$', + 'Error when read STDOUT (bad record)') + time.sleep(1) + logging.debug("[5] --------- send BADCOMMAND") + signal.alarm(10) + semaphore.acquire() + queueIn.put("BADCOMMAND") + item = queueOut.get() + semaphore.release() + self.assertEqual(item, "error : command unknown", 'Error impossible to read status') + signal.alarm(0) + time.sleep(1) + logging.debug("[6] --------- send STOP") + signal.alarm(10) + semaphore.acquire() + queueIn.put("STOP") + item = queueOut.get() + semaphore.release() + self.assertEqual(item, "stopped", 'Error impossible to read status') + signal.alarm(0) + time.sleep(1) + logging.debug("[7] --------- send SHUTDOWN") + signal.alarm(10) + semaphore.acquire() + queueIn.put("SHUTDOWN") + with self.assertRaises(queue.Empty): + item = queueOut.get(timeout=5) + semaphore.release() + #threadCommand.join() + logging.debug("--------- Stop all") + manage.receive_signal(15, 1) + manage.wait_children_commands() + #Disable timeout + signal.alarm(0) + self.assertTrue(True) + except Exception as e: + logging.error("test_run_manager_command - Prepare Crash - %s" % e) + logging.error(traceback.format_exc()) + #self.fail('Error initialize object ManageCommand') + manage.receive_signal(15, 1) + manage.wait_children_commands() + print("test_run_manager_command - Force Crash - TimeOut !") + os._exit(2) signal.alarm(0) - self.assertTrue(True) + signal.signal(signal.SIGALRM, handlerCrash) def test_run_manager_command_autostart(self): # Enable timeout signal.alarm(10) class MockServerHttp: - def append(self, name, queueIn, queueOut, event): + def append(self, name, queueIn, queueOut, semaphore): pass def terminate(self): pass @@ -567,25 +841,190 @@ class TestManager(unittest.TestCase): key = list(manage.info.keys())[0] queueIn = manage.info[key]['queueIn'] queueOut = manage.info[key]['queueOut'] - event = manage.info[key]['event'] + semaphore = manage.info[key]['semaphore'] signal.alarm(10) - event.set() + semaphore.acquire() queueIn.put("STATUS") item = queueOut.get(timeout=4) + semaphore.release() self.assertEqual(item, "started", 'Error impossible to read status') time.sleep(1) signal.alarm(10) - event.set() + semaphore.acquire() queueIn.put("SHUTDOWN") with self.assertRaises(queue.Empty): item = queueOut.get(timeout=4) + semaphore.release() #threadCommand.join() manage.receive_signal(15, 1) manage.wait_children_commands() #Disable timeout signal.alarm(0) self.assertTrue(True) + signal.alarm(0) + + def test_run_manager_command_autostart_option(self): + # Enable timeout + signal.alarm(10) + class MockServerHttp: + def append(self, name, queueIn, queueOut, semaphore): + pass + def terminate(self): + pass + def join(self): + pass + config = configparser.ConfigParser() + config.add_section('config:server') + config.add_section('command:test') + config.set('command:test', 'command', self.program) + config.set('command:test', 'autostart', 'yes') + + manage = Manager.Manager(False) + manage.serverHttp = MockServerHttp() + manage._load_config(config) + manage.launch_command() + time.sleep(5) + + key = list(manage.info.keys())[0] + queueIn = manage.info[key]['queueIn'] + queueOut = manage.info[key]['queueOut'] + semaphore = manage.info[key]['semaphore'] + + signal.alarm(10) + semaphore.acquire() + queueIn.put("STATUS") + item = queueOut.get(timeout=4) + semaphore.release() + self.assertEqual(item, "started", 'program not started') + time.sleep(1) + signal.alarm(10) + semaphore.acquire() + queueIn.put("SHUTDOWN") + with self.assertRaises(queue.Empty): + item = queueOut.get(timeout=4) + semaphore.release() + #threadCommand.join() + manage.receive_signal(15, 1) + manage.wait_children_commands() + #Disable timeout + signal.alarm(0) + self.assertTrue(True) + signal.alarm(0) + + def test_run_manager_command_restart_after_crash(self): + # Enable timeout + signal.signal(signal.SIGALRM, handlerCrash) + signal.alarm(10) + logging.debug("--------- test_run_manager_command_restart_after_crash => Start") + class MockServerHttp: + def append(self, name, queueIn, queueOut, semaphore): + pass + def terminate(self): + pass + def join(self): + pass + config = configparser.ConfigParser() + config.add_section('config:server') + config.add_section('command:test') + config.set('command:test', 'command', self.program + ' --no-loop --timeout=5') + config.set('command:test', 'autostart', 'yes') + config.set('command:test', 'restart_after_crash', 'yes') + config.set('command:test', 'restart_delay', '5') + + manage = Manager.Manager(False) + + try: + manage.serverHttp = MockServerHttp() + manage._load_config(config) + manage.launch_command() + + key = list(manage.info.keys())[0] + queueIn = manage.info[key]['queueIn'] + queueOut = manage.info[key]['queueOut'] + semaphore = manage.info[key]['semaphore'] + + logging.debug("--------- wait not started") + signal.alarm(20) + item = "started" + while item == "started": + time.sleep(1) + semaphore.acquire() + queueIn.put("STATUS") + try: + item = queueOut.get(timeout=4) + except queue.Empty: + pass + semaphore.release() + self.assertEqual(item, "crashed", 'program not started') + signal.alarm(20) + + logging.debug("--------- wait not crashed") + # item = "crashed" + while item == "crashed": + time.sleep(1) + semaphore.acquire() + queueIn.put("STATUS") + try: + item = queueOut.get(timeout=4) + except queue.Empty: + pass + semaphore.release() + signal.alarm(20) + + self.assertEqual(item, "started", 'program not started') + + logging.debug("--------- wait not started") + signal.alarm(20) + item = "started" + while item == "started": + time.sleep(1) + semaphore.acquire() + queueIn.put("STATUS") + try: + item = queueOut.get(timeout=4) + except queue.Empty: + pass + semaphore.release() + self.assertEqual(item, "crashed", 'program not started') + signal.alarm(20) + + self.assertEqual(item, "crashed", 'program not started') + signal.alarm(20) + + logging.debug("--------- wait not crashed") + # item = "crashed" + while item == "crashed": + time.sleep(1) + semaphore.acquire() + queueIn.put("STATUS") + try: + item = queueOut.get(timeout=4) + except queue.Empty: + pass + semaphore.release() + signal.alarm(20) + + logging.debug("--------- SHUTDOWN") + time.sleep(1) + signal.alarm(10) + semaphore.acquire() + queueIn.put("SHUTDOWN") + semaphore.release() + #threadCommand.join() + manage.receive_signal(15, 1) + manage.wait_children_commands() + #Disable timeout + signal.alarm(0) + except: + print("Prepare Crash - TimeOut !") + #self.fail('Error initialize object ManageCommand') + manage.receive_signal(15, 1) + manage.wait_children_commands() + print("Force Crash - TimeOut !") + os._exit(2) + self.assertTrue(True) + signal.alarm(0) class MockStreamRequestHandler(): def __init__(self): @@ -600,7 +1039,7 @@ class TestManager(unittest.TestCase): class MockServer(): def __init__(self): - self.listEvent = {} + self.listSemaphore = {} self.listQueueIn = {} self.listQueueOut = {} self.authentification = False @@ -612,6 +1051,7 @@ class TestManager(unittest.TestCase): manage = Manager.ManageHttpRequest(None, None, None) manage.address_string = MagicMock() manage.log_message('example') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_set_headers(self, init): @@ -627,6 +1067,7 @@ class TestManager(unittest.TestCase): manage._set_headers() manage.send_response.assert_called_with(200) # manage.send_header.assert_called_with('Content-type', 'application/json') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_command_log_1(self, init): @@ -642,6 +1083,7 @@ class TestManager(unittest.TestCase): manage.headers = {} manage._command_log() manage.send_error.assert_called_with(400, "bad content-type") + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_command_log_2(self, init): @@ -657,6 +1099,7 @@ class TestManager(unittest.TestCase): manage.headers = {'content-type' : 'application/json'} manage._command_log() manage.send_error.assert_called_with(400, "bad content-length") + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_command_log_3(self, init): @@ -673,6 +1116,7 @@ class TestManager(unittest.TestCase): manage.rfile = TestManager.MockStreamRequestHandler() manage._command_log() manage.send_error.assert_called_with(400, 'Missing param name') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_command_log_4(self, init): @@ -691,6 +1135,7 @@ class TestManager(unittest.TestCase): manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._command_log() manage.send_error.assert_called_with(400, 'Name unknown') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_command_log_5(self, init): @@ -710,6 +1155,7 @@ class TestManager(unittest.TestCase): manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._command_log() manage.send_error.assert_called_with(400, 'Missing param first-line') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_command_log_6(self, init): @@ -729,6 +1175,7 @@ class TestManager(unittest.TestCase): manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._command_log() manage.send_error.assert_called_with(400, 'Impossible to read first-line') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_command_log_7(self, init): @@ -744,7 +1191,7 @@ class TestManager(unittest.TestCase): manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() - manage.server.listEvent = {'test': multiprocessing.Event() } + manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} #manage.server.listQueueOut['test'].put("empty") @@ -752,6 +1199,7 @@ class TestManager(unittest.TestCase): manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._command_log() #self.assertEqual(b'empty',manage.wfile.message) + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_command_log_8(self, init): @@ -767,7 +1215,7 @@ class TestManager(unittest.TestCase): manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() - manage.server.listEvent = {'test': multiprocessing.Event() } + manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.server.listQueueOut['test'].put("empty") @@ -775,6 +1223,7 @@ class TestManager(unittest.TestCase): manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._command_log() self.assertEqual(b'empty',manage.wfile.message) + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_list(self, init): @@ -790,11 +1239,12 @@ class TestManager(unittest.TestCase): manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() - manage.server.listEvent = {'test': multiprocessing.Event() } + manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage._send_list() self.assertEqual(b'{"0": "test"}',manage.wfile.message) + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_shutdown(self, init): @@ -810,11 +1260,12 @@ class TestManager(unittest.TestCase): manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() - manage.server.listEvent = {'test': multiprocessing.Event() } + manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage._send_shutdown() self.assertEqual(b'{"shutdown": "ok"}',manage.wfile.message) + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_command_all(self, init): @@ -830,12 +1281,13 @@ class TestManager(unittest.TestCase): manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() - manage.server.listEvent = {'test': multiprocessing.Event() } + manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.server.listQueueOut['test'].put("empty") manage._send_command_all("test") self.assertEqual(b'{"test": "empty"}',manage.wfile.message) + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_command_all_timeout(self, init): @@ -851,10 +1303,11 @@ class TestManager(unittest.TestCase): manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() - manage.server.listEvent = {'test': multiprocessing.Event() } + manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage._send_command_all("test") + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_action_1(self, init): @@ -870,12 +1323,13 @@ class TestManager(unittest.TestCase): manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() - manage.server.listEvent = {'test': multiprocessing.Event() } + manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._send_action() manage.send_error.assert_called_with(400, 'Missing param name') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_action_2(self, init): @@ -891,13 +1345,14 @@ class TestManager(unittest.TestCase): manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() - manage.server.listEvent = {'test': multiprocessing.Event() } + manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.rfile.define_return( '{"name": "testnew"}' ) manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._send_action() manage.send_error.assert_called_with(400, 'Name unknown') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_action_3(self, init): @@ -913,13 +1368,14 @@ class TestManager(unittest.TestCase): manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() - manage.server.listEvent = {'test': multiprocessing.Event() } + manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.rfile.define_return( '{"name": "test"}' ) manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._send_action() manage.send_error.assert_called_with(400, 'Missing param action') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_action_4(self, init): @@ -935,13 +1391,14 @@ class TestManager(unittest.TestCase): manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() - manage.server.listEvent = {'test': multiprocessing.Event() } + manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.rfile.define_return( '{"name": "test", "action": "example"}' ) manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._send_action() self.assertEqual(None,manage.wfile.message) + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_action_5(self, init): @@ -957,7 +1414,7 @@ class TestManager(unittest.TestCase): manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() - manage.server.listEvent = {'test': multiprocessing.Event() } + manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.server.listQueueOut['test'].put("empty") @@ -965,6 +1422,7 @@ class TestManager(unittest.TestCase): manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._send_action() self.assertEqual(b'{"state": "empty"}',manage.wfile.message) + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_action_6(self, init): @@ -979,6 +1437,7 @@ class TestManager(unittest.TestCase): manage.headers = {} manage._send_action() + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_command_1(self, init): @@ -993,6 +1452,7 @@ class TestManager(unittest.TestCase): manage.headers = {} manage._send_command("STATUS") + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_command_2(self, init): @@ -1008,12 +1468,13 @@ class TestManager(unittest.TestCase): manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() - manage.server.listEvent = {'test': multiprocessing.Event() } + manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._send_command("STATUS") manage.send_error.assert_called_with(400, 'Missing param name') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_command_3(self, init): @@ -1029,7 +1490,7 @@ class TestManager(unittest.TestCase): manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() - manage.server.listEvent = {'test': multiprocessing.Event() } + manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} @@ -1037,6 +1498,7 @@ class TestManager(unittest.TestCase): manage.rfile.define_return( '{"name": "test"}' ) manage._send_command("STATUS") self.assertEqual(b'{"state": "empty"}',manage.wfile.message) + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_command_4(self, init): @@ -1052,13 +1514,14 @@ class TestManager(unittest.TestCase): manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() - manage.server.listEvent = {'test': multiprocessing.Event() } + manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage.rfile.define_return( '{"name": "test"}' ) manage._send_command("STATUS") manage.send_error.assert_called_with(500, 'Missing return') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_send_command_5(self, init): @@ -1074,7 +1537,7 @@ class TestManager(unittest.TestCase): manage.rfile = TestManager.MockStreamRequestHandler() manage.wfile = TestManager.MockStreamRequestHandler() manage.server = TestManager.MockServer() - manage.server.listEvent = {'test': multiprocessing.Event() } + manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} @@ -1082,6 +1545,7 @@ class TestManager(unittest.TestCase): manage.rfile.define_return( '{"name": "testnew"}' ) manage._send_command("STATUS") manage.send_error.assert_called_with(400, 'Name unknown') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_GET_1(self, init): @@ -1093,6 +1557,7 @@ class TestManager(unittest.TestCase): manage.path = "/STDOUT" manage.do_GET() manage._command_log.assert_called_with() + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_check_authentication_1(self, init): @@ -1102,6 +1567,7 @@ class TestManager(unittest.TestCase): manage.server = TestManager.MockServer() res = manage.check_authentication() self.assertEqual(True, res) + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_check_authentication_2(self, init): @@ -1112,6 +1578,7 @@ class TestManager(unittest.TestCase): manage.server.authentification = True res = manage.check_authentication() self.assertEqual(False, res) + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_check_authentication_3(self, init): @@ -1138,6 +1605,7 @@ class TestManager(unittest.TestCase): res = manage.check_authentication() self.assertEqual(False, res) logging.error.assert_called_with("Error detected list index out of range") + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_check_authentication_5(self, init): @@ -1151,6 +1619,7 @@ class TestManager(unittest.TestCase): res = manage.check_authentication() self.assertEqual(False, res) logging.error.assert_called_with("Error detected Incorrect padding") + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_check_authentication_6(self, init): @@ -1210,6 +1679,7 @@ class TestManager(unittest.TestCase): manage.path = "/STATUS" manage.do_GET() manage._send_command.assert_called_with("STATUS") + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_GET_3(self, init): @@ -1221,6 +1691,7 @@ class TestManager(unittest.TestCase): manage.path = "/LIST" manage.do_GET() manage._send_list.assert_called_with() + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_GET_4(self, init): @@ -1232,6 +1703,7 @@ class TestManager(unittest.TestCase): manage.path = "/STATUSALL" manage.do_GET() manage._send_command_all.assert_called_with("STATUS") + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_GET_5(self, init): @@ -1243,6 +1715,7 @@ class TestManager(unittest.TestCase): manage.path = "/BADPATH" manage.do_GET() manage.send_error.assert_called_with(400, 'Path unknown') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_GET_6(self, init): @@ -1255,6 +1728,7 @@ class TestManager(unittest.TestCase): manage.path = "/BADPATH" manage.do_GET() manage.send_error.assert_called_with(403, 'Wrong authentication') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_POST_1(self, init): @@ -1266,6 +1740,7 @@ class TestManager(unittest.TestCase): manage.path = "/START" manage.do_POST() manage._send_command.assert_called_with("START") + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_POST_2(self, init): @@ -1277,6 +1752,7 @@ class TestManager(unittest.TestCase): manage.path = "/STOP" manage.do_POST() manage._send_command.assert_called_with("STOP") + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_POST_3(self, init): @@ -1288,6 +1764,7 @@ class TestManager(unittest.TestCase): manage.path = "/STDIN" manage.do_POST() manage._send_action.assert_called_with() + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_POST_4(self, init): @@ -1299,6 +1776,7 @@ class TestManager(unittest.TestCase): manage.path = "/SHUTDOWN" manage.do_POST() manage._send_shutdown.assert_called_with() + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_POST_5(self, init): @@ -1310,6 +1788,7 @@ class TestManager(unittest.TestCase): manage.path = "/STARTALL" manage.do_POST() manage._send_command_all.assert_called_with("START") + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_POST_6(self, init): @@ -1321,6 +1800,7 @@ class TestManager(unittest.TestCase): manage.path = "/STOPALL" manage.do_POST() manage._send_command_all.assert_called_with("STOP") + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_POST_7(self, init): @@ -1340,6 +1820,7 @@ class TestManager(unittest.TestCase): manage.path = "/BADPATH" manage.do_POST() manage.send_error.assert_called_with(400, 'Path unknown') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_POST_8(self, init): @@ -1352,6 +1833,7 @@ class TestManager(unittest.TestCase): manage.path = "/BADPATH" manage.do_POST() manage.send_error.assert_called_with(403, 'Wrong authentication') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_HEAD(self, init): @@ -1366,6 +1848,7 @@ class TestManager(unittest.TestCase): manage.path = "test" manage.do_HEAD() manage.send_error.assert_called_with(404, 'File Not Found: test') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_PUT(self, init): @@ -1380,6 +1863,7 @@ class TestManager(unittest.TestCase): manage.path = "test" manage.do_PUT() manage.send_error.assert_called_with(404, 'File Not Found: test') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_PATCH(self, init): @@ -1394,6 +1878,7 @@ class TestManager(unittest.TestCase): manage.path = "test" manage.do_PATCH() manage.send_error.assert_called_with(404, 'File Not Found: test') + signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') def test_run_manage_do_DELETE(self, init): @@ -1408,6 +1893,12 @@ class TestManager(unittest.TestCase): manage.path = "test" manage.do_DELETE() manage.send_error.assert_called_with(404, 'File Not Found: test') + signal.alarm(0) if __name__ == '__main__': + logging.getLogger('logging') + handlers = [] + handlers.append(logging.StreamHandler()) + logging.basicConfig(handlers=handlers, level=0, + format='%(asctime)s %(levelname)s [pid:%(process)d] [%(funcName)s:%(lineno)d] %(message)s') unittest.main()