From ef3cc5347ddc20f99601522df7de1a3b640f8a7d Mon Sep 17 00:00:00 2001 From: AleaJactaEst Date: Tue, 9 Oct 2018 12:21:50 +0200 Subject: [PATCH] update feature #1 --- pymanager/__init__.py | 2 +- pymanager/manager.py | 362 +++++++++++++++++++++----------------- tests/simulate_program.py | 6 + tests/test_manager.py | 263 +++++++++++++++------------ 4 files changed, 357 insertions(+), 276 deletions(-) diff --git a/pymanager/__init__.py b/pymanager/__init__.py index 474073b..24f1f38 100644 --- a/pymanager/__init__.py +++ b/pymanager/__init__.py @@ -17,4 +17,4 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -__version__ = '1.0.0' +__version__ = '1.1.0' diff --git a/pymanager/manager.py b/pymanager/manager.py index ba432e5..e522f01 100755 --- a/pymanager/manager.py +++ b/pymanager/manager.py @@ -70,12 +70,12 @@ This script need configuration file (see below for model):: bufsize = 100 # It's possible to collect some message on output (example player conected) with regex command # keep some data on array/dict state - keep_state = yes + activate_filter = yes # size array/dict state - size_max_state = 1000 + size_max_filter = 1000 # search regex to add state (python regex) - add_state = "^((.*)(setActiveCharForPlayer).*(: set active char )[\d]+( for )(?P.*)|(.*)(disconnectPlayer)(.+:.+<.+>){0,1}[\s]+(?P.*)[\s]+(is disconnected))" - del_state = "^((.*)(setActiveCharForPlayer).*(: set active char )[\d]+( for )(?P.*)|(.*)(disconnectPlayer)(.+:.+<.+>){0,1}[\s]+(?P.*)[\s]+(is disconnected))" + add_filter = "^((.*)(setActiveCharForPlayer).*(: set active char )[\d]+( for )(?P.*)|(.*)(disconnectPlayer)(.+:.+<.+>){0,1}[\s]+(?P.*)[\s]+(is disconnected))" + del_filter = "^((.*)(setActiveCharForPlayer).*(: set active char )[\d]+( for )(?P.*)|(.*)(disconnectPlayer)(.+:.+<.+>){0,1}[\s]+(?P.*)[\s]+(is disconnected))" # autostart (when start OpenNelManager, launch this program) autostart = no # restart after crash @@ -111,37 +111,37 @@ Design http(s) command : ----------------- -+------------------+------------------+---------------------------------------------+-----------------------------------------------+ -| **Html command** | **Path** | **Argument** {json format} | **Comment** | -+------------------+------------------+---------------------------------------------+-----------------------------------------------+ -| **POST** | /SHUTDOWN | | Stop all process and stop pymanager | -+------------------+------------------+---------------------------------------------+-----------------------------------------------+ -| **POST** | /STARTALL | | Start all processes | -+------------------+------------------+---------------------------------------------+-----------------------------------------------+ -| **GET** | /STATUSALL | | Get status all processes | -+------------------+------------------+---------------------------------------------+-----------------------------------------------+ -| **POST** | /STOPALL | | Stop all processes | -+------------------+------------------+---------------------------------------------+-----------------------------------------------+ -| **POST** | /START | {'name': program} | Start for one program | -+------------------+------------------+---------------------------------------------+-----------------------------------------------+ -| **POST** | /STDIN | {'name': program, 'action': action} | Send action for one program (send to input) | -+------------------+------------------+---------------------------------------------+-----------------------------------------------+ -| **GET** | /STATUS | {'name': program} | Get status for one program | -+------------------+------------------+---------------------------------------------+-----------------------------------------------+ -| **POST** | /STOP | {'name': program} | Stop for one program | -+------------------+------------------+---------------------------------------------+-----------------------------------------------+ -| **GET** | /STDOUT | {'name': program, 'first-line': firstline } | Get log for one program | -+------------------+------------------+---------------------------------------------+-----------------------------------------------+ -| **GET** | /GETSTATE | {'name': program } | Get all state (key find in stdout add/remove) | -+------------------+------------------+---------------------------------------------+-----------------------------------------------+ -| **GET** | /CONFIG | {'name': program } | Get configuration | -+------------------+------------------+---------------------------------------------+-----------------------------------------------+ -| **GET** | /INFO | {'name': program } | Get Information (number player, ...) | -+------------------+------------------+---------------------------------------------+-----------------------------------------------+ -| **GET** | /PLAYER | {'name': program } | Get active player | -+------------------+------------------+---------------------------------------------+-----------------------------------------------+ -| **GET** | /ADMINCOMMAND | {'name': program } | Get admin commmand | -+------------------+------------------+---------------------------------------------+-----------------------------------------------+ ++------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ +| **Html command** | **Path** | **Argument** {json format} | **Comment** | **Return*** | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ +| **POST** | /SHUTDOWN | | Stop all process and stop pymanager | | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ +| **POST** | /STARTALL | | Start all processes | | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ +| **GET** | /STATUSALL | | Get status all processes | | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ +| **POST** | /STOPALL | | Stop all processes | | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ +| **POST** | /START | {'name': program} | Start for one program | {'state': ''} | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ +| **POST** | /STDIN | {'name': program, 'action': action} | Send action for one program (send to input) | | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ +| **GET** | /STATUS | {'name': program} | Get status for one program | {'state': ''} | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ +| **POST** | /STOP | {'name': program} | Stop for one program | {'state': ''} | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ +| **GET** | /STDOUT | {'name': program, 'first-line': firstline } | Get log for one program | | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ +| **GET** | /FILTER | {'name': program } | Get all state (key find in stdout add/remove) | | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ +| **GET** | /CONFIG | {'name': program } | Get configuration | | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ +| **GET** | /INFO | {'name': program } | Get Information (number player, ...) | | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ +| **GET** | /PLAYER | {'name': program } | Get active player | | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ +| **GET** | /ADMINCOMMAND | {'name': program } | Get admin commmand | | ++------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+ Example :: @@ -275,7 +275,7 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): return self.server.listSemaphore[name].release() self._set_headers() - self.wfile.write(bytes(item, "utf-8")) + self.wfile.write(bytes(json.dumps(item), "utf-8")) logging.debug("item : %s" % item) def _send_list(self): @@ -343,15 +343,21 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): logging.debug("%s:%s" % (name, action)) self.server.listSemaphore[name].acquire() self.server.listQueueIn[name].put("STDIN %s" % action) - logging.debug("message envoye: %s" % (name)) + logging.debug("message sent: %s" % (name)) try: - result = self.server.listQueueOut[name].get(timeout=4) + outjson = self.server.listQueueOut[name].get(timeout=4) except queue.Empty: - logging.debug("pas de message recu pour %s" % name) + logging.debug("no return %s" % name) + self.server.listSemaphore[name].release() + return + except Exception as e: + # Trap all error (release semaphore) + self.send_error(500, 'Internal Error') + logging.error("unknown error %s (%s)" % (name, e)) + self.server.listSemaphore[name].release() return self.server.listSemaphore[name].release() - outjson = {'state': result} self._set_headers() self.wfile.write(bytes(json.dumps(outjson), "utf-8")) @@ -377,15 +383,20 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): self.server.listSemaphore[name].acquire() self.server.listQueueIn[name].put(command) try: - result = self.server.listQueueOut[name].get(timeout=4) + outjson = self.server.listQueueOut[name].get(timeout=4) except queue.Empty: self.send_error(500, 'Missing return') logging.debug("[%s %s] Missing return" % (command, name)) + self.server.listSemaphore[name].release() + return + except Exception as e: + # Trap all error (release semaphore) + self.send_error(500, 'Internal Error') + logging.error("unknown error %s (%s)" % (name, e)) + self.server.listSemaphore[name].release() return self.server.listSemaphore[name].release() - logging.debug("[%s %s] => %s" % (command, name, result)) - - outjson = {'state': result} + logging.debug("[%s %s] => %s" % (command, name, outjson)) self._set_headers() self.wfile.write(bytes(json.dumps(outjson), "utf-8")) @@ -426,8 +437,8 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): logging.error("Wrong authentication") elif self.path == '/STDOUT': self._command_log() - elif self.path == "/STATE": - self._send_command("STATE") + elif self.path == "/FILTER": + self._send_command("FILTER") elif self.path == "/INFO": self._send_command("INFO") elif self.path == "/PLAYER": @@ -571,8 +582,9 @@ class ServerHttp(multiprocessing.Process): logging.error("Bad value 'method' (%s)" % str(self.method)) raise ValueError httpd.serve_forever() + logging.info("End") - def append(self, name, queueIn, queueOut, semaphore): + def appendchild(self, name, queueIn, queueOut, semaphore): self.listQueueIn.setdefault(name, queueIn) self.listQueueOut.setdefault(name, queueOut) self.listSemaphore.setdefault(name, semaphore) @@ -588,7 +600,7 @@ class ManageCommand(): def __init__(self, name, command, path, logsize, bufsize, queueIn, queueOut, semaphore, - keep_state, size_max_state, add_state, del_state, + activate_filter, size_max_filter, add_filter, del_filter, autostart, restart_after_crash, restart_delay, egs_filter, maxWaitEnd=10, waitDelay=1): @@ -605,36 +617,38 @@ class ManageCommand(): self.bufsize = bufsize self.threadRead = None self.running = False - self.state = multiprocessing.Queue() + # self.state = multiprocessing.Queue() self.pipeIn, self.pipeOut = multiprocessing.Pipe() self.eventRunningReader = threading.Event() self.eventRunningRestart = threading.Event() self.maxWaitEnd = maxWaitEnd self.waitDelay = waitDelay - self.keep_state = keep_state - self.size_max_state = size_max_state - self.add_state_cmd = add_state[1:-1] - self.del_state_cmd = del_state[1:-1] - self.filter_add_state = re.compile(self.add_state_cmd) - self.filter_del_state = re.compile(self.del_state_cmd) - self.state = {} + self.activate_filter = activate_filter + self.size_max_filter = size_max_filter + self.add_filter_cmd = add_filter[1:-1] + self.del_filter_cmd = del_filter[1:-1] + self.filter_add_filter = re.compile(self.add_filter_cmd) + self.filter_del_filter = re.compile(self.del_filter_cmd) + self.filter = {} self.autostart = autostart self.restart_after_crash = restart_after_crash self.restart_delay = restart_delay self.threadRestart = None self.egs_filter = egs_filter - self.egs_filter_load_character = re.compile(".*(LOADED User )'(?P[\d]+)' Character '(?P[^']+)' from BS stream file 'characters/([\d]+)/account_(?P[\d]+)_(?P[\d]+)_pdr.bin") + self.egs_filter_load_character = re.compile(".*(egs_plinfo).*(: LOADED User )'(?P[\d]+)' Character '(?P[^']+)' from BS stream file 'characters/([\d]+)/account_(?P[\d]+)_(?P[\d]+)_pdr.bin") self.egs_filter_active_character = re.compile(".*(setActiveCharForPlayer).*(: set active char )(?P[\d]+)( for player )(?P[\d]+)") self.egs_filter_sid = re.compile(".*(Mapping UID )(?P[\d]+)( => Sid )\((?P.*)\)") self.egs_filter_client_ready = re.compile(".*(Updating IS_NEWBIE flag for character: )\((?P.*)\)") self.egs_filter_disconnected = re.compile(".*(disconnectPlayer).+[\s]+(player )(?P[\d]+)[\s]+(is disconnected)") - self.egs_filter_admin = re.compile("(.*)(cbClientAdmin EGS-136 : )(ADMIN)(: Player )\((?P.*)\)(?P.+)") - self.state_load_character = {} - self.state_active_character = {} - self.state_admin = [] + self.egs_filter_admin = re.compile("(.*)(cbClientAdmin).*(: ADMIN)(: Player )\((?P.*)\)(?P.+)") + # cbClientAdmin EGS-133 : ADMIN: Player (0x0000000021:00:00:86) tried to execute a no valid client admin command 'info' + self.filter_load_character = {} + self.filter_active_character = {} + self.filter_admin = {} self.number_start = 0 self.first_line = 0 self.last_line = 0 + self.pos_admin = 0 def _analyze_line(self, msg): now = time.strftime('%Y/%m/%d %H:%M:%S %Z') @@ -646,40 +660,40 @@ class ManageCommand(): self.last_line = self.last_line + 1 # If option sate is defined, analyze message and keep state (example , all player connected) logging.debug("recu: '%s'" % (msg)) - if self.keep_state: - res = self.filter_add_state.match(msg) + if self.activate_filter: + res = self.filter_add_filter.match(msg) if res: - logging.debug("add_state found") - if len(self.state) < self.size_max_state: - logging.debug("include add_state found") + logging.debug("add_filter found") + if len(self.filter) < self.size_max_filter: + logging.debug("include add_filter found") dico = res.groupdict() for key in dico: - logging.debug("set add_state found [%s]" % (str(key))) + logging.debug("set add_filter found [%s]" % (str(key))) if dico[key]: - logging.debug("set1 add_state found [%s][%s]" % (str(key), str(dico[key]))) - self.state.setdefault(key, {}) - self.state[key].setdefault(dico[key], now) - res = self.filter_del_state.match(msg) + logging.debug("set1 add_filter found [%s][%s]" % (str(key), str(dico[key]))) + self.filter.setdefault(key, {}) + self.filter[key].setdefault(dico[key], now) + res = self.filter_del_filter.match(msg) if res: - logging.debug("del_state found") + logging.debug("del_filter found") dico = res.groupdict() for key in dico: - logging.debug("prepare del_state found %s" % str(key)) + logging.debug("prepare del_filter found %s" % str(key)) if dico[key]: - self.state.setdefault(key, {}) - if dico[key] in self.state[key]: - logging.debug("del1 del_state found [%s][%s][%s]" % (str(key), str(dico[key]), str(self.state[key]))) - del self.state[key][dico[key]] + self.filter.setdefault(key, {}) + if dico[key] in self.filter[key]: + logging.debug("del1 del_filter found [%s][%s][%s]" % (str(key), str(dico[key]), str(self.filter[key]))) + del self.filter[key][dico[key]] if self.egs_filter: res = self.egs_filter_load_character.match(msg) if res: logging.debug("egs_filter_load_character found") - if len(self.state_load_character) < self.size_max_state: - logging.debug("include add_state found") + if len(self.filter_load_character) < self.size_max_filter: + logging.debug("include add_filter found") dico = res.groupdict() try: - self.state_load_character.setdefault(dico['UID'], {}) - self.state_load_character[dico['UID']].setdefault(dico['IDCHAR'], {'NameDomain': dico['NameDomain'], 'UIDBIS': dico['UIDBIS'], 'when': now}) + self.filter_load_character.setdefault(dico['UID'], {}) + self.filter_load_character[dico['UID']].setdefault(dico['IDCHAR'], {'NameDomain': dico['NameDomain'], 'UID': dico['UIDBIS'], 'when': now}) except KeyError as e: logging.error('Missing key when read "load_character" (%s)' % e) else: @@ -688,12 +702,12 @@ class ManageCommand(): res = self.egs_filter_active_character.match(msg) if res: logging.debug("egs_filter_active_character found") - if len(self.state_active_character) < self.size_max_state: + if len(self.filter_active_character) < self.size_max_filter: dico = res.groupdict() try: - self.state_active_character.setdefault(dico['UID'], {}) - self.state_active_character[dico['UID']] = self.state_load_character[dico['UID']][dico['IDCHAR']] - del self.state_load_character[dico['UID']] + self.filter_active_character.setdefault(dico['UID'], {}) + self.filter_active_character[dico['UID']] = self.filter_load_character[dico['UID']][dico['IDCHAR']] + del self.filter_load_character[dico['UID']] except KeyError as e: logging.error('Missing key when read "active_character" (%s)' % e) else: @@ -704,8 +718,8 @@ class ManageCommand(): logging.debug("egs_filter_sid found") dico = res.groupdict() try: - if dico['UID'] in self.state_active_character: - self.state_active_character[dico['UID']].setdefault("SID", dico['SID']) + if dico['UID'] in self.filter_active_character: + self.filter_active_character[dico['UID']].setdefault("SID", dico['SID']) else: logging.error('Impossible to add SID on player %s (Player not found)' % dico['UID']) except KeyError as e: @@ -716,28 +730,30 @@ class ManageCommand(): logging.debug("egs_filter_sid found") dico = res.groupdict() try: - if dico['UID'] in self.state_active_character: - del self.state_active_character[dico['UID']] + if dico['UID'] in self.filter_active_character: + del self.filter_active_character[dico['UID']] else: logging.error('Impossible to remove player %s (Player not found)' % dico['UID']) except KeyError as e: logging.error('Missing key when remove player (%s)' % e) return - res =self.egs_filter_admin.match(msg) + res = self.egs_filter_admin.match(msg) if res: logging.debug("egs_filter_admin found") - while len(self.state_admin) >= self.maxlog: - self.state_admin.pop(0) + while len(self.filter_admin) >= self.maxlog: + print(self.pos_admin, self.pos_admin - self.maxlog ) + del self.filter_admin[self.pos_admin - self.maxlog] try: + dico = res.groupdict() username = '' try: - for key in self.state_active_character: - if self.state_active_character[key]['SID'] == dico['SID']: - username = self.state_active_character[key]['NameDomain'] + for key in self.filter_active_character: + if self.filter_active_character[key]['SID'] == dico['SID']: + username = self.filter_active_character[key]['NameDomain'] break except KeyError: pass - self.state_admin.append( {'pos': self.pos_admin, 'when': now, 'SID': dico['SID'], 'ACTION': dico['ACTION'], 'USER': username}) + self.filter_admin.setdefault( self.pos_admin, {'when': now, 'SID': dico['SID'], 'ACTION': dico['ACTION'], 'USER': username}) except KeyError as e: logging.error('Missing key when admin player (%s)' % e) self.pos_admin = self.pos_admin + 1 @@ -789,9 +805,9 @@ class ManageCommand(): if wait_semaphore == True: self.queueIn.put("STOPPED") self.semaphore.release() - if self.keep_state: - self.state_load_character = {} - self.state_active_character = {} + if self.activate_filter: + self.filter_load_character = {} + self.filter_active_character = {} logging.debug("End reader: '%s'" % self.name) def restart(self): @@ -812,14 +828,11 @@ class ManageCommand(): self.semaphore.release() logging.debug('Prepare restart service %s (step 3)' % (self.name)) - def handler(self, signum, frame): + def receive_signal(self, signum, frame): """ Managed signal (not used) """ - if self.process: - # logging.debug("Send signal %d to '%s'" %(signum, self.name)) - self.process.send_signal(signum) - else: - logging.error("Impossible to send signal %d to '%s'" % (signum, self.name)) - raise IOError("signal received") + logging.info("Received signal %s (%d)" % (self.name, signum)) + self.queueIn.put("SHUTDOWN") + self.queueIn.put("SHUTDOWN") def start(self): """ Start program """ @@ -956,40 +969,40 @@ class ManageCommand(): pos += 1 outjson.setdefault('first-line', firstlinefound) outjson.setdefault('last-line', pos - 1) - return json.dumps(outjson) + return outjson - def getstate(self): - """ Get state """ - return json.dumps(self.state) + def getfilter(self): + """ Get filter """ + return self.filter def getconfig(self): - outjson = { 'keep_state': str(self.keep_state), + outjson = { 'activate_filter': str(self.activate_filter), 'bufsize': str(self.bufsize), - 'size_max_state': str(self.size_max_state), + 'size_max_filter': str(self.size_max_filter), 'path': str(self.path), - 'add_state': str(self.add_state_cmd), - 'del_state': str(self.del_state_cmd), + 'add_filter': str(self.add_filter_cmd), + 'del_filter': str(self.del_filter_cmd), 'command': str(self.command), 'maxWaitEnd': str(self.maxWaitEnd), 'waitDelay': str(self.waitDelay), 'maxlog': str(self.maxlog), - 'state': str(self.keep_state), + 'filter': str(self.activate_filter), 'egs': str(self.egs_filter) } - return json.dumps(outjson) + return outjson def getinfo(self): outjson = { 'number_launch': str(self.number_start), 'first_line': str(self.first_line), 'last_line': str(self.last_line), - 'number_state': len(self.state), - 'player_connected': len(self.state_active_character) } - return json.dumps(outjson) + 'number_filter': len(self.filter), + 'player_connected': len(self.filter_active_character) } + return outjson def getplayer(self): - return json.dumps(self.state_active_character) + return self.filter_active_character def getadmincommand(self): - return json.dumps(self.state_admin) + return self.filter_admin def action(self, action): """ Send action to program (send input to stdin) """ @@ -1005,6 +1018,8 @@ class ManageCommand(): def run(self): """ loop, run child (wait command) """ + signal.signal(signal.SIGABRT, self.receive_signal) + signal.signal(signal.SIGTERM, self.receive_signal) statuscmd = {0:'started', 1:'stopped', 2:'crashed'} loop = True if self.autostart: @@ -1022,18 +1037,22 @@ class ManageCommand(): elif command == "START": #if savedstate != 0: savedstate = self.start() - self.queueOut.put(statuscmd[savedstate]) + self.queueOut.put({'state': statuscmd[savedstate]}) elif command == "STATUS": currentstate = self.status() if currentstate != 1 or savedstate != 2: savedstate = currentstate - self.queueOut.put(statuscmd[savedstate]) + self.queueOut.put({'state': statuscmd[savedstate], + 'last_line': str(self.last_line), + 'number_launch': str(self.number_start), + 'filter': str(self.activate_filter), + 'egs': str(self.egs_filter)}) elif command == "STOP": savedstate = self.stop() - self.queueOut.put(statuscmd[savedstate]) + self.queueOut.put({'state': statuscmd[savedstate]}) elif command == "STDIN": data = msg.split(maxsplit=1)[1] - self.queueOut.put(self.action(data)) + self.queueOut.put({'state': self.action(data)}) elif command == "STDOUT": try: firstline = int(msg.split(maxsplit=1)[1]) @@ -1043,8 +1062,8 @@ class ManageCommand(): except IndexError: firstline = 0 self.queueOut.put(self.getlog(firstline)) - elif command == "STATE": - self.queueOut.put(self.getstate()) + elif command == "FILTER": + self.queueOut.put(self.getfilter()) elif command == "CONFIG": self.queueOut.put(self.getconfig()) elif command == "INFO": @@ -1072,7 +1091,7 @@ class ManageCommand(): self.threadRestart.start() else: logging.warning("Bad command (%s)" % command) - self.queueOut.put("error : command unknown") + self.queueOut.put( {"error" : "command unknown"} ) logging.debug('Stop %s' % self.name) self.stop() logging.debug('prepare end') @@ -1211,42 +1230,42 @@ class Manager(): raise ValueError else: bufsize = 100 - if 'keep_state' in config[name]: + if 'activate_filter' in config[name]: try: - tmp = config[name]['keep_state'] + tmp = config[name]['activate_filter'] if tmp.upper().strip() == 'YES': - keep_state = True + activate_filter = True else: - keep_state = False + activate_filter = False except (TypeError, KeyError, ValueError): - logging.error("Impossible to read param keep_state (command:%s)", name) + logging.error("Impossible to read param activate_filter (command:%s)", name) raise ValueError else: - keep_state = False - if 'size_max_state' in config[name]: + activate_filter = False + if 'size_max_filter' in config[name]: try: - size_max_state = int(config[name]['size_max_state']) + size_max_filter = int(config[name]['size_max_filter']) except (TypeError, KeyError, ValueError): - logging.error("Impossible to read param size_max_state (command:%s)", name) + logging.error("Impossible to read param size_max_filter (command:%s)", name) raise ValueError else: - size_max_state = 100 - if 'add_state' in config[name]: + size_max_filter = 100 + if 'add_filter' in config[name]: try: - add_state = config[name]['add_state'] + add_filter = config[name]['add_filter'] except (TypeError, KeyError, ValueError): - logging.error("Impossible to read param add_state (command:%s)", name) + logging.error("Impossible to read param add_filter (command:%s)", name) raise ValueError else: - add_state = '' - if 'del_state' in config[name]: + add_filter = '' + if 'del_filter' in config[name]: try: - del_state = config[name]['del_state'] + del_filter = config[name]['del_filter'] except (TypeError, KeyError, ValueError): - logging.error("Impossible to read param del_state (command:%s)", name) + logging.error("Impossible to read param del_filter (command:%s)", name) raise ValueError else: - del_state = '' + del_filter = '' if 'autostart' in config[name]: try: tmp = config[name]['autostart'] @@ -1295,10 +1314,10 @@ class Manager(): 'path': path, 'logsize': logsize, 'bufsize': bufsize, - 'keep_state': keep_state, - 'size_max_state': size_max_state, - 'add_state': add_state, - 'del_state': del_state, + 'activate_filter': activate_filter, + 'size_max_filter': size_max_filter, + 'add_filter': add_filter, + 'del_filter': del_filter, 'autostart': autostart, 'restart_after_crash': restart_after_crash, 'restart_delay': restart_delay, @@ -1308,6 +1327,7 @@ class Manager(): """ Initialize object serverHttp """ + logging.debug("Initialize server http(s)") self.serverHttp = ServerHttp(self.keyfile, self.certfile, self.ca_cert, @@ -1318,7 +1338,7 @@ class Manager(): users=self.users) def runCommand(self, name, command, path, logsize, bufsize, queueIn, queueOut, semaphore, - keep_state, size_max_state, add_state, del_state, + activate_filter, size_max_filter, add_filter, del_filter, autostart, restart_after_crash, restart_delay, egs_filter): """ Thread to manage khaganat program @@ -1332,10 +1352,10 @@ class Manager(): queueIn=queueIn, queueOut=queueOut, semaphore=semaphore, - keep_state=keep_state, - size_max_state=size_max_state, - add_state=add_state, - del_state=del_state, + activate_filter=activate_filter, + size_max_filter=size_max_filter, + add_filter=add_filter, + del_filter=del_filter, autostart=autostart, restart_after_crash=restart_after_crash, restart_delay=restart_delay, @@ -1355,7 +1375,7 @@ class Manager(): queueOut = multiprocessing.Queue() # semaphore = multiprocessing.Semaphore() semaphore = multiprocessing.BoundedSemaphore() - self.serverHttp.append(name, queueIn, queueOut, semaphore) + self.serverHttp.appendchild(name, queueIn, queueOut, semaphore) if self.launch_program: autostart = True else: @@ -1369,10 +1389,10 @@ class Manager(): queueIn, queueOut, semaphore, - self.param[name]['keep_state'], - self.param[name]['size_max_state'], - self.param[name]['add_state'], - self.param[name]['del_state'], + self.param[name]['activate_filter'], + self.param[name]['size_max_filter'], + self.param[name]['add_filter'], + self.param[name]['del_filter'], autostart, self.param[name]['restart_after_crash'], self.param[name]['restart_delay'], @@ -1387,10 +1407,10 @@ class Manager(): 'path': self.param[name]['path'], 'logsize': self.param[name]['logsize'], 'bufsize': self.param[name]['bufsize'], - 'keep_state': self.param[name]['keep_state'], - 'size_max_state': self.param[name]['size_max_state'], - 'add_state': self.param[name]['add_state'], - 'del_state': self.param[name]['del_state'], + 'activate_filter': self.param[name]['activate_filter'], + 'size_max_filter': self.param[name]['size_max_filter'], + 'add_filter': self.param[name]['add_filter'], + 'del_filter': self.param[name]['del_filter'], 'autostart': autostart, 'restart_after_crash': self.param[name]['restart_after_crash'], 'restart_delay': self.param[name]['restart_delay'], @@ -1398,10 +1418,19 @@ class Manager(): def receive_signal(self, signum, frame): """ Managed signal """ + logging.info("Received signal (%d)" % (signum)) for child in self.threadCommand: - child.terminate() + logging.info("send signal to child %s" % (child.name)) + try: + child.terminate() + child.join() + except AttributeError: + logging.info("child not started") + pass if self.serverHttp: + logging.info("send signal to server http") self.serverHttp.terminate() + logging.info("Finalize signal (%d)" % (signum)) def wait_children_commands(self): for child in self.threadCommand: @@ -1413,6 +1442,8 @@ class Manager(): def run(self): """ launch all """ + signal.signal(signal.SIGABRT, self.receive_signal) + signal.signal(signal.SIGTERM, self.receive_signal) self.launch_command() self.launch_server_http() logging.info('started') @@ -1478,6 +1509,7 @@ def main(args=sys.argv[1:]): logLevel=param.log, launch_program=param.launch_program, show_log_console=param.show_log_console) + logging.debug("End") if __name__ == '__main__': diff --git a/tests/simulate_program.py b/tests/simulate_program.py index c1a6b3c..04f28e5 100755 --- a/tests/simulate_program.py +++ b/tests/simulate_program.py @@ -73,6 +73,12 @@ class SimulateProgram(): if pos > n: self.print_output("alpha finalizeClientReady EGS-132 : Updating IS_NEWBIE flag for character: (0x0000000021:00:00:81)") n = n + 1 + if pos > n: + self.print_output("cbClientAdmin EGS-136 : ADMIN: Player (0x0000000021:00:00:81) doesn't have privilege to execute the client admin command 'infos'") + n = n + 1 + if pos > n: + self.print_output("cbClientAdmin EGS-136 : ADMIN: Player (0x0000000021:00:00:81) tried to execute a no valid client admin command 'INFO'") + n = n + 1 if pos > n: self.print_output("alpha 1383 disconnectPlayer EGS-132 : (EGS) player 2 (Row 90501) removed") n = n + 1 diff --git a/tests/test_manager.py b/tests/test_manager.py index 7c5590a..a3bec81 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -28,7 +28,6 @@ import queue import signal import http.server import logging -import json from unittest.mock import patch from unittest.mock import MagicMock import traceback @@ -47,6 +46,9 @@ except ImportError: # sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # import pymanager.certificate as cert +class TimeoutError(Exception): + pass + def handlerCrash(signum, frame): print("handlerCrash - TimeOut !") @@ -60,7 +62,7 @@ def handlerRaise(signum, frame): print("handlerRaise - TimeOut !") for line in traceback.format_stack(): print(line.strip()) - raise "Timeout" + raise TimeoutError class TestManager(unittest.TestCase): def setUp(self): @@ -103,10 +105,10 @@ class TestManager(unittest.TestCase): config.set('command:test', 'command', '/bin/sleep 10') config.set('command:test', 'logsize', '10') config.set('command:test', 'bufsize', '10') - config.set('command:test', 'keep_state', 'yes') - config.set('command:test', 'size_max_state', '1000') - config.set('command:test', 'add_state', '"^(.*)(setActiveCharForPlayer).*(: set active char )[\d]+( for )(?P.*)"') - config.set('command:test', 'del_state', '"^(.*)(disconnectPlayer).+[\s]+(?P.*)[\s]+(is disconnected)"') + config.set('command:test', 'keep_filter', 'yes') + config.set('command:test', 'size_max_filter', '1000') + config.set('command:test', 'add_filter', '"^(.*)(setActiveCharForPlayer).*(: set active char )[\d]+( for )(?P.*)"') + config.set('command:test', 'del_filter', '"^(.*)(disconnectPlayer).+[\s]+(?P.*)[\s]+(is disconnected)"') config.add_section('config:user') config.set('config:user', 'usename', 'filter_all, filter_admin') try: @@ -267,37 +269,42 @@ class TestManager(unittest.TestCase): True) manage._analyze_line("message 1") manage._analyze_line("alpha egs_plinfo EGS-132 : LOADED User '2' Character 'Kezxaa(Lirria)' from BS stream file 'characters/002/account_2_0_pdr.bin'") - if '2' not in manage.state_load_character: + if '2' not in manage.filter_load_character: self.assertTrue(False, "LOADED - Missing player 2") - if '0' not in manage.state_load_character['2']: + if '0' not in manage.filter_load_character['2']: self.assertTrue(False, "LOADED - Missing charactere 0 for player 2") manage._analyze_line("alpha egs_plinfo EGS-132 : LOADED User '2' Character 'Puskle(Lirria)' from BS stream file 'characters/002/account_2_1_pdr.bin'") - if '1' not in manage.state_load_character['2']: + if '1' not in manage.filter_load_character['2']: self.assertTrue(False, "LOADED - Missing charactere 1 for player 2") manage._analyze_line("alpha egs_plinfo EGS-132 : LOADED User '3' Character 'Puskle(Lirria)' from BS stream file 'characters/003/account_3_4_pdr.bin'") - if '3' not in manage.state_load_character: + if '3' not in manage.filter_load_character: self.assertTrue(False, "LOADED - Missing player 2") manage._analyze_line("alpha egs_ecinfo EGS-132 : setActiveCharForPlayer EGS-132 : set active char 1 for player 2") - if '2' not in manage.state_active_character: + if '2' not in manage.filter_active_character: self.assertTrue(False, "setActiveCharForPlayer - Missing player 2") - if 'NameDomain' not in manage.state_active_character['2']: + if 'NameDomain' not in manage.filter_active_character['2']: self.assertTrue(False, "setActiveCharForPlayer - Missing info player 2") - manage._analyze_line("alpha egs_ecinfo EGS-132 : Mapping UID 2 => Sid (0x0000000021:00:00:81)") - if 'SID' not in manage.state_active_character['2']: + manage._analyze_line("alpha egs_ecinfo EGS-132 : Mapping UID 2 => Sid (0x0000000021:00:00:83)") + if 'SID' not in manage.filter_active_character['2']: self.assertTrue(False, "setActiveCharForPlayer - Missing SID player 2") - manage._analyze_line("alpha egs_ecinfo EGS-132 : Client ready (entity (0x0000000021:00:00:81) (Row 90501) added to mirror)") - manage._analyze_line("alpha finalizeClientReady EGS-132 : Updating IS_NEWBIE flag for character: (0x0000000021:00:00:81)") + manage._analyze_line("alpha egs_ecinfo EGS-132 : Client ready (entity (0x0000000021:00:00:83) (Row 90501) added to mirror)") + manage._analyze_line("alpha finalizeClientReady EGS-132 : Updating IS_NEWBIE flag for character: (0x0000000021:00:00:83)") manage._analyze_line("alpha 1383 disconnectPlayer EGS-132 : (EGS) player 2 (Row 90501) removed") manage._analyze_line("alpha egs_plinfo EGS-132 : Player with userId = 2 removed") + manage._analyze_line("cbClientAdmin EGS-133 : ADMIN: Player (0x0000000021:00:00:83) doesn't have privilege to execute the client admin command 'infos'") + manage._analyze_line("cbClientAdmin EGS-133 : ADMIN: Player (0x0000000021:00:00:83) tried to execute a no valid client admin command 'INFO'") + for i in range(0, 2000): + manage._analyze_line("cbClientAdmin EGS-133 : ADMIN: Player (0x0000000021:00:00:83) tried to execute a no valid client admin command 'INFO' %d" % i) + self.assertTrue(len(manage.filter_admin), 1000) manage._analyze_line("alpha disconnectPlayer EGS-132 : player 2 is disconnected") - if '2' in manage.state_active_character: + if '2' in manage.filter_active_character: self.assertTrue(False, "disconnectPlayer - player 2 always live") def test_manager_command_player(self): signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(30) class MockServerHttp: - def append(self, name, queueIn, queueOut, semaphore): + def appendchild(self, name, queueIn, queueOut, semaphore): pass def terminate(self): pass @@ -317,6 +324,7 @@ class TestManager(unittest.TestCase): manage._load_config(config) manage.launch_command() + signal.alarm(10) key = list(manage.info.keys())[0] queueIn = manage.info[key]['queueIn'] queueOut = manage.info[key]['queueOut'] @@ -325,29 +333,26 @@ class TestManager(unittest.TestCase): signal.alarm(30) item = "started" while item == "started": - print("sleep") + logging.debug("sleep") time.sleep(1) - print("semaphore") + logging.debug("semaphore") semaphore.acquire() - print("status") + logging.debug("status") queueIn.put("STATUS") - print("queue") - item = queueOut.get(timeout=4) - print(item) + logging.debug("queue") + item = queueOut.get() semaphore.release() - print("Lecture STDOUT") - print("sleep") + logging.debug("Lecture STDOUT") time.sleep(1) signal.alarm(30) - print("semaphore") + logging.debug("semaphore") semaphore.acquire() - print("stdout") + logging.debug("stdout") queueIn.put("STDOUT") - print("Attend le retour STDOUT") + logging.debug("Attend le retour STDOUT") item = queueOut.get() semaphore.release() - print("Resultat STDOUT") - print(item) + logging.debug("Resultat STDOUT") time.sleep(1) signal.alarm(10) semaphore.acquire() @@ -362,17 +367,26 @@ class TestManager(unittest.TestCase): signal.alarm(0) self.assertTrue(True) signal.alarm(0) - except: + except TimeoutError: print("Prepare Crash - TimeOut !") #self.fail('Error initialize object ManageCommand') manage.receive_signal(15, 1) manage.wait_children_commands() print("Force Crash - TimeOut !") os._exit(2) + except Exception as e: + print("Prepare Crash - %s" % e) + #self.fail('Error initialize object ManageCommand') + manage.receive_signal(15, 1) + manage.wait_children_commands() + handlerCrash(15, 1) + print("Force Crash - TimeOut !") + os._exit(2) + signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(0) - signal.signal(signal.SIGALRM, handlerCrash) def test_execute_manager_command(self): + signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(10) try: logsize = 10 @@ -406,10 +420,10 @@ class TestManager(unittest.TestCase): while loop > 0: time.sleep(1) out = manageCommand.getlog(0) - if foundEnd.match(out): + if foundEnd.match(str(out)): break loop -= 1 - if not foundEnd.match(out): + if not foundEnd.match(str(out)): manageCommand.stop() self.assertTrue(False, 'Missing message in log') manageCommand.list_thread() @@ -424,10 +438,10 @@ class TestManager(unittest.TestCase): while loop > 0: time.sleep(1) out = manageCommand.getlog(0) - if foundY.match(out): + if foundY.match(str(out)): break loop -= 1 - if not foundY.match(out): + if not foundY.match(str(out)): manageCommand.stop() self.assertTrue(False, 'Missing message in log') @@ -439,9 +453,11 @@ class TestManager(unittest.TestCase): self.assertTrue(True) except: self.fail('Error initialize object ManageCommand') + signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(0) def test_execute_crash_manager_command(self): + signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(10) try: logsize = 10 @@ -480,10 +496,12 @@ class TestManager(unittest.TestCase): self.assertTrue(True) except: self.fail('Error initialize object ManageCommand') + signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(0) def test_execute_not_kill_manager_command(self): signal.alarm(30) + signal.signal(signal.SIGALRM, handlerCrash) try: logsize = 10 bufsize = 10 @@ -519,9 +537,8 @@ class TestManager(unittest.TestCase): self.assertEqual(res, 0) res = manageCommand.getlog(0) try: - resjson = json.loads(res) if 'last-line' in res: - if resjson['last-line'] == 5: + if res['last-line'] == 5: wait = False except: pass @@ -532,6 +549,7 @@ class TestManager(unittest.TestCase): self.assertTrue(True) except Exception as e: self.fail('Error when run test (%s)' % str(e)) + signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(0) def test_execute_command_crashed(self): @@ -658,10 +676,10 @@ class TestManager(unittest.TestCase): queueIn=queueIn, queueOut=queueOut, semaphore=semaphore, - keep_state=False, - size_max_state=1, - add_state="", - del_state="", + keep_filter=False, + size_max_filter=1, + add_filter="", + del_filter="", autostart=False, restart_after_crash=False, restart_delay=1, @@ -691,11 +709,15 @@ class TestManager(unittest.TestCase): signal.alarm(0) def test_main(self): + signal.signal(signal.SIGALRM, handlerCrash) signal.alarm(10) config = tempfile.NamedTemporaryFile(suffix="password.cfg", mode='w+t') config.write('[config:server]\nauthentification=no\n') config.flush() + logging.debug("load") Manager.main(['--conf=' + config.name]) + logging.debug("end") + signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(0) def test_run_manager_command(self): @@ -703,7 +725,7 @@ class TestManager(unittest.TestCase): signal.signal(signal.SIGALRM, handlerCrash) signal.alarm(10) class MockServerHttp: - def append(self, name, queueIn, queueOut, semaphore): + def appendchild(self, name, queueIn, queueOut, semaphore): pass def terminate(self): pass @@ -730,9 +752,9 @@ class TestManager(unittest.TestCase): queueIn.put("START") # Enable timeout signal.alarm(10) - item = queueOut.get(timeout=4) + item = queueOut.get() semaphore.release() - self.assertEqual(item, "started", 'Error impossible to start program') + self.assertEqual(item['state'], 'started', 'Error impossible to start program') signal.alarm(0) time.sleep(1) logging.debug("[2] --------- send STATUS") @@ -741,7 +763,7 @@ class TestManager(unittest.TestCase): queueIn.put("STATUS") item = queueOut.get() semaphore.release() - self.assertEqual(item, "started", 'Error impossible to read status') + self.assertEqual(item['state'], 'started', 'Error impossible to read status') signal.alarm(0) time.sleep(1) logging.debug("[3] --------- send STDIN arg") @@ -750,7 +772,7 @@ class TestManager(unittest.TestCase): queueIn.put("STDIN arg") item = queueOut.get() semaphore.release() - self.assertEqual(item, "ok", 'Error when send STDIN') + self.assertEqual(item['state'], 'ok', 'Error when send STDIN') signal.alarm(0) time.sleep(1) logging.debug("[4] --------- send STDOUT 4") @@ -765,14 +787,12 @@ class TestManager(unittest.TestCase): logging.debug("[4.4] --------- send STDOUT 4") signal.alarm(0) logging.debug("[4.5] --------- send STDOUT 4") - self.assertRegex(item, - '^[{](.*)("first-line": 4)(.*)[}]$', - 'Error when read STDOUT (Missing first-line)') - self.assertRegex(item, - '^[{](.*)("last-line": 6)(.*)[}]$', - 'Error when read STDOUT (Missing last-line)') - self.assertRegex(item, - '^[{](.*)(6 arg")(.*)[}]$', + self.assertEqual(item['first-line'], 4, 'Error when read STDOUT (Missing first-line)') + self.assertEqual(item['last-line'], 6, 'Error when read STDOUT (Missing first-line)') + # self.assertRegex(item, '^[{](.*)("first-line": 4)(.*)[}]$', 'Error when read STDOUT (Missing first-line)') + # self.assertRegex(item, '^[{](.*)("last-line": 6)(.*)[}]$', 'Error when read STDOUT (Missing last-line)') + self.assertRegex(str(item), + '^(.*)(6 arg)(.*)$', 'Error when read STDOUT (bad record)') time.sleep(1) logging.debug("[5] --------- send BADCOMMAND") @@ -781,7 +801,7 @@ class TestManager(unittest.TestCase): queueIn.put("BADCOMMAND") item = queueOut.get() semaphore.release() - self.assertEqual(item, "error : command unknown", 'Error impossible to read status') + self.assertEqual(item['error'], 'command unknown', 'Error impossible to read status') signal.alarm(0) time.sleep(1) logging.debug("[6] --------- send STOP") @@ -790,7 +810,7 @@ class TestManager(unittest.TestCase): queueIn.put("STOP") item = queueOut.get() semaphore.release() - self.assertEqual(item, "stopped", 'Error impossible to read status') + self.assertEqual(item["state"], "stopped", 'Error impossible to read status') signal.alarm(0) time.sleep(1) logging.debug("[7] --------- send SHUTDOWN") @@ -816,13 +836,13 @@ class TestManager(unittest.TestCase): print("test_run_manager_command - Force Crash - TimeOut !") os._exit(2) signal.alarm(0) - signal.signal(signal.SIGALRM, handlerCrash) + signal.signal(signal.SIGALRM, handlerRaise) def test_run_manager_command_autostart(self): # Enable timeout signal.alarm(10) class MockServerHttp: - def append(self, name, queueIn, queueOut, semaphore): + def appendchild(self, name, queueIn, queueOut, semaphore): pass def terminate(self): pass @@ -848,7 +868,7 @@ class TestManager(unittest.TestCase): queueIn.put("STATUS") item = queueOut.get(timeout=4) semaphore.release() - self.assertEqual(item, "started", 'Error impossible to read status') + self.assertEqual(item['state'], "started", 'Error impossible to read status') time.sleep(1) signal.alarm(10) semaphore.acquire() @@ -866,9 +886,10 @@ class TestManager(unittest.TestCase): def test_run_manager_command_autostart_option(self): # Enable timeout + signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(10) class MockServerHttp: - def append(self, name, queueIn, queueOut, semaphore): + def appendchild(self, name, queueIn, queueOut, semaphore): pass def terminate(self): pass @@ -881,44 +902,61 @@ class TestManager(unittest.TestCase): config.set('command:test', 'autostart', 'yes') manage = Manager.Manager(False) - manage.serverHttp = MockServerHttp() - manage._load_config(config) - manage.launch_command() - time.sleep(5) + try: + manage.serverHttp = MockServerHttp() + manage._load_config(config) + manage.launch_command() + time.sleep(5) - key = list(manage.info.keys())[0] - queueIn = manage.info[key]['queueIn'] - queueOut = manage.info[key]['queueOut'] - semaphore = manage.info[key]['semaphore'] + key = list(manage.info.keys())[0] + queueIn = manage.info[key]['queueIn'] + queueOut = manage.info[key]['queueOut'] + semaphore = manage.info[key]['semaphore'] - signal.alarm(10) - semaphore.acquire() - queueIn.put("STATUS") - item = queueOut.get(timeout=4) - semaphore.release() - self.assertEqual(item, "started", 'program not started') - time.sleep(1) - signal.alarm(10) - semaphore.acquire() - queueIn.put("SHUTDOWN") - with self.assertRaises(queue.Empty): + signal.alarm(10) + semaphore.acquire() + queueIn.put("STATUS") item = queueOut.get(timeout=4) - semaphore.release() - #threadCommand.join() - manage.receive_signal(15, 1) - manage.wait_children_commands() - #Disable timeout - signal.alarm(0) + semaphore.release() + self.assertEqual(item["state"], "started", 'program not started') + time.sleep(1) + signal.alarm(10) + semaphore.acquire() + queueIn.put("SHUTDOWN") + with self.assertRaises(queue.Empty): + item = queueOut.get(timeout=4) + semaphore.release() + #threadCommand.join() + manage.receive_signal(15, 1) + manage.wait_children_commands() + #Disable timeout + signal.alarm(0) + except TimeoutError: + print("test_run_manager_command_restart_after_crash - Prepare Crash - TimeOut !") + #self.fail('Error initialize object ManageCommand') + manage.receive_signal(15, 1) + manage.wait_children_commands() + print("test_run_manager_command_restart_after_crash - Force Crash - TimeOut !") + os._exit(2) + except Exception as e: + print("test_run_manager_command_restart_after_crash - Prepare Crash - %s" % e) + #self.fail('Error initialize object ManageCommand') + manage.receive_signal(15, 1) + manage.wait_children_commands() + handlerCrash(15, 1) + print("test_run_manager_command_restart_after_crash - Force Crash - TimeOut !") + os._exit(2) self.assertTrue(True) + signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(0) def test_run_manager_command_restart_after_crash(self): # Enable timeout - signal.signal(signal.SIGALRM, handlerCrash) + signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(10) logging.debug("--------- test_run_manager_command_restart_after_crash => Start") class MockServerHttp: - def append(self, name, queueIn, queueOut, semaphore): + def appendchild(self, name, queueIn, queueOut, semaphore): pass def terminate(self): pass @@ -946,22 +984,21 @@ class TestManager(unittest.TestCase): logging.debug("--------- wait not started") signal.alarm(20) - item = "started" - while item == "started": + item = {"state": "started"} + while item['state'] == "started": time.sleep(1) semaphore.acquire() queueIn.put("STATUS") try: - item = queueOut.get(timeout=4) + item = queueOut.get() except queue.Empty: pass semaphore.release() - self.assertEqual(item, "crashed", 'program not started') + self.assertEqual(item['state'], "crashed", 'program not started') signal.alarm(20) logging.debug("--------- wait not crashed") - # item = "crashed" - while item == "crashed": + while item['state'] == "crashed": time.sleep(1) semaphore.acquire() queueIn.put("STATUS") @@ -972,12 +1009,11 @@ class TestManager(unittest.TestCase): semaphore.release() signal.alarm(20) - self.assertEqual(item, "started", 'program not started') + self.assertEqual(item['state'], "started", 'program not started') logging.debug("--------- wait not started") signal.alarm(20) - item = "started" - while item == "started": + while item['state'] == "started": time.sleep(1) semaphore.acquire() queueIn.put("STATUS") @@ -986,20 +1022,17 @@ class TestManager(unittest.TestCase): except queue.Empty: pass semaphore.release() - self.assertEqual(item, "crashed", 'program not started') - signal.alarm(20) - - self.assertEqual(item, "crashed", 'program not started') + self.assertEqual(item['state'], "crashed", 'program not started') signal.alarm(20) logging.debug("--------- wait not crashed") # item = "crashed" - while item == "crashed": + while item['state'] == "crashed": time.sleep(1) semaphore.acquire() queueIn.put("STATUS") try: - item = queueOut.get(timeout=4) + item = queueOut.get() except queue.Empty: pass semaphore.release() @@ -1016,14 +1049,23 @@ class TestManager(unittest.TestCase): manage.wait_children_commands() #Disable timeout signal.alarm(0) - except: - print("Prepare Crash - TimeOut !") + except TimeoutError: + print("test_run_manager_command_restart_after_crash - Prepare Crash - TimeOut !") #self.fail('Error initialize object ManageCommand') manage.receive_signal(15, 1) manage.wait_children_commands() - print("Force Crash - TimeOut !") + print("test_run_manager_command_restart_after_crash - Force Crash - TimeOut !") + os._exit(2) + except Exception as e: + print("test_run_manager_command_restart_after_crash - Prepare Crash - %s" % e) + #self.fail('Error initialize object ManageCommand') + manage.receive_signal(15, 1) + manage.wait_children_commands() + handlerCrash(15, 1) + print("test_run_manager_command_restart_after_crash - Force Crash - TimeOut !") os._exit(2) self.assertTrue(True) + signal.signal(signal.SIGALRM, handlerRaise) signal.alarm(0) class MockStreamRequestHandler(): @@ -1222,7 +1264,8 @@ class TestManager(unittest.TestCase): manage.rfile.define_return( '{"name": "test", "first-line" : "1"}' ) manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._command_log() - self.assertEqual(b'empty',manage.wfile.message) + print(manage.wfile.message) + self.assertEqual(b'"empty"',manage.wfile.message) signal.alarm(0) @patch.object(http.server.SimpleHTTPRequestHandler, '__init__') @@ -1417,7 +1460,7 @@ class TestManager(unittest.TestCase): manage.server.listSemaphore = {'test': multiprocessing.Semaphore() } manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} - manage.server.listQueueOut['test'].put("empty") + manage.server.listQueueOut['test'].put({"state": "empty"}) manage.rfile.define_return( '{"name": "test", "action": "example"}' ) manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} manage._send_action() @@ -1494,7 +1537,7 @@ class TestManager(unittest.TestCase): manage.server.listQueueIn = {'test': multiprocessing.Queue() } manage.server.listQueueOut = {'test': multiprocessing.Queue()} manage.headers = {'content-length' : '1000', 'content-type' : 'application/json'} - manage.server.listQueueOut['test'].put("empty") + manage.server.listQueueOut['test'].put({"state": "empty"}) manage.rfile.define_return( '{"name": "test"}' ) manage._send_command("STATUS") self.assertEqual(b'{"state": "empty"}',manage.wfile.message)