opennel-pymanager/pymanager/manager.py

1560 lines
69 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# script to start/stop/status/send command/read log for OpenNeL process
#
# Copyright (C) 2017 AleaJactaEst
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Configuration File
------------------
This script needs a configuration file (see the model below)::
[config:server]
# Define port listen (default 8000)
port = 8000
# Example to generate all key : see pycertificate
# key
keyfile = /home/gameserver/ca/appli/private/serverkey.pem
# certificate
certfile = /home/gameserver/ca/appli/certs/servercert.pem
# certification to check signature
ca_cert = /home/gameserver/ca/appli/certs/cachaincert.pem
# listen address (default: all interfaces)
address =
# method : http or https
method = https
# Admin Executor Service
[command:aes]
# command to launch the program
command = ryzom_admin_service -A/home/gameserver/khanat/server -C/home/gameserver/khanat/server -L/home/gameserver/log/khanat --nobreak --fulladminname=admin_executor_service --shortadminname=AES
# Path : where this program is launched
path = /home/gameserver/khanat/server/
# size buffer log for each program launched (number line stdout)
logsize = 1000
# buffer size (define value bufsize on subprocess.Popen, this buffer is use before read by manager)
bufsize = 100
# bms_master : backup_service
[command:bms_master]
# command to launch the program
command = ryzom_backup_service -A/home/gameserver/khanat/server -C/home/gameserver/khanat/server -L/home/gameserver/khanat/server/log --nobreak --writepid -P49990
# Path : where this program is launched
path = /home/gameserver/khanat/server/
# we keep [logsize] last number line stdout
logsize = 1000
# buffer size (define value bufsize on subprocess.Popen)
bufsize = 100
# It's possible to collect some messages from the output (example: player connected) with a regex
# keep some data in the array/dict state
activate_filter = yes
# maximum size of the array/dict state
size_max_filter = 1000
# search regex to add state (python regex)
add_filter = "^((.*)(setActiveCharForPlayer).*(: set active char )[0-9]+( for )(?P<ActivePlayer>.*)|(.*)(disconnectPlayer)(.+:.+<.+>){0,1}[ ]+(?P<InactivePlayer>.*)[ ]+(is disconnected))"
del_filter = "^((.*)(setActiveCharForPlayer).*(: set active char )[0-9]+( for )(?P<InactivePlayer>.*)|(.*)(disconnectPlayer)(.+:.+<.+>){0,1}[ ]+(?P<ActivePlayer>.*)[ ]+(is disconnected))"
# autostart (when start OpenNelManager, launch this program)
autostart = no
# restart after crash
restart_after_crash = yes
# Delay after each restart (second)
restart_delay = 10
# Enable special filter EGS (account connection / command admin)
egs_filter = yes
Manager
-------
Manage all process OpenNeL
Launch this program in the background and use clientManager to manipulate the processes
Design
.. graphviz::
digraph Manager {
"Manager" -> "ManageCommand (command 1)";
"ManageCommand (command 1)" -> "read_output (thread1)";
"Manager" -> "ManageCommand (command 2)";
"ManageCommand (command 2)" -> "read_output (thread2)";
"Manager" -> "ServerHttp";
"ServerHttp" -> "OpenNeLHTTPServer";
"OpenNeLHTTPServer" -> "ManageHttpRequest";
"ManageHttpRequest" -> "ManageCommand (command 1)" [style=dashed];
"ManageHttpRequest" -> "ManageCommand (command 2)" [style=dashed];
}
http(s) command :
-----------------
2018-10-09 10:21:50 +00:00
+------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+
| **Html command** | **Path** | **Argument** {json format} | **Comment** | **Return*** |
+------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+
| **POST** | /SHUTDOWN | | Stop all process and stop pymanager | |
+------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+
| **POST** | /STARTALL | | Start all processes | |
+------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+
| **GET** | /STATUSALL | | Get status all processes | |
+------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+
| **POST** | /STOPALL | | Stop all processes | |
+------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+
| **POST** | /START | {'name': program} | Start for one program | {'state': '<STATE>'} |
+------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+
| **POST** | /STDIN | {'name': program, 'action': action} | Send action for one program (send to input) | |
+------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+
| **GET** | /STATUS | {'name': program} | Get status for one program | {'state': '<STATE>'} |
+------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+
| **POST** | /STOP | {'name': program} | Stop for one program | {'state': '<STATE>'} |
+------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+
| **GET** | /STDOUT | {'name': program, 'first-line': firstline } | Get log for one program | |
+------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+
| **GET** | /FILTER | {'name': program } | Get all state (key find in stdout add/remove) | |
+------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+
| **GET** | /CONFIG | {'name': program } | Get configuration | |
+------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+
| **GET** | /INFO | {'name': program } | Get Information (number player, ...) | |
+------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+
| **GET** | /PLAYER | {'name': program } | Get active player | |
+------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+
| **GET** | /ADMINCOMMAND | {'name': program } | Get admin command | |
+------------------+------------------+---------------------------------------------+-----------------------------------------------+------------------------------+
Example ::
nohup pymanager --log info --filelog /home/gameserver/log/manager.log -c /home/gameserver/cfg/opennel_manager.cfg 2>/dev/null 1>/dev/null 0</dev/zero &
"""
import sys
import subprocess
import queue
import threading
import signal
import argparse
import configparser
import logging
import logging.config
import multiprocessing
import time
import ssl
import http.server
import json
import fcntl
import os
import base64
import re
from socketserver import ThreadingMixIn
try:
import bcrypt
__DISABLE_BCRYPT__ = False
except ImportError:
__DISABLE_BCRYPT__ = True
__VERSION__ = '1.1.0'
class ManageHttpRequest(http.server.SimpleHTTPRequestHandler):
    """
    Handle every http(s) request received by the manager.

    A request is validated, turned into a text order that is pushed on the
    input queue of the targeted program (``server.listQueueIn[name]``) and
    the answer read on ``server.listQueueOut[name]`` is sent back as JSON.
    The per-program semaphore (``server.listSemaphore[name]``) is held for
    the whole question/answer exchange and is always released afterwards.
    """

    # Seconds to wait for the answer of a program on its output queue.
    queue_timeout = 4

    # GET paths forwarded (without the leading '/') as a command to one program.
    _GET_COMMANDS = ('/FILTER', '/INFO', '/PLAYER', '/ADMINCOMMAND', '/CONFIG', '/STATUS')

    def __init__(self, request, client_address, server):
        """ Initialize object """
        super().__init__(request, client_address, server)

    def log_message(self, format, *args):
        """ function use to send message in log

        :param str format: format message
        :param list args: argument
        """
        logging.info("%s (%s) %s" %
                     (self.address_string(),
                      self.log_date_time_string(),
                      format % args))

    def _set_headers(self):
        """ Send the status line and the headers of a successful JSON answer """
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header("Access-Control-Allow-Headers", "Content-Type, *")
        self.end_headers()

    def _write_json(self, payload):
        """ Send a 200 answer whose body is ``payload`` serialized in JSON

        :param payload: any object accepted by json.dumps
        """
        self._set_headers()
        try:
            self.wfile.write(bytes(json.dumps(payload), "utf-8"))
        except BrokenPipeError:
            # the client closed the connection before reading the answer
            logging.debug("BrokenPipeError detected")

    def _exchange(self, name, message):
        """ Send one order to a program and wait for its answer

        The semaphore is released even when the wait times out, otherwise
        the program could not be reached by any later request.

        :param str name: program name (key of the server dictionaries)
        :param str message: order pushed on the input queue
        :return: object read on the output queue
        :raises queue.Empty: no answer received within ``queue_timeout``
        """
        with self.server.listSemaphore[name]:
            self.server.listQueueIn[name].put(message)
            return self.server.listQueueOut[name].get(timeout=self.queue_timeout)

    def _extract_input_data(self):
        """ Read and decode the JSON body of the request

        An error 400 is sent to the client when the body can't be used.

        :return: decoded JSON object (dict), or None when an error was sent
        """
        if 'content-type' in self.headers:
            ctype = self.headers['content-type']
        else:
            ctype = 'text'
        # ignore optional parameters such as "; charset=utf-8"
        if ctype.split(';', 1)[0].strip().lower() != 'application/json':
            logging.error("Received request with bad content-type")
            # send_error() already ends the headers and writes the body
            self.send_error(400, "bad content-type")
            return None
        try:
            sizemsg = int(self.headers['content-length'])
            if sizemsg < 0:
                # a negative size would make read() wait for end of stream
                raise ValueError(sizemsg)
        except (TypeError, KeyError, ValueError):
            logging.error("Received request with bad content-length")
            self.send_error(400, "bad content-length")
            return None
        msg = self.rfile.read(sizemsg)
        try:
            msgjson = json.loads(msg.decode())
        except ValueError as e:
            # JSONDecodeError and UnicodeDecodeError are both ValueError
            logging.error("Received request with json (%s)" % str(e))
            self.send_error(400, "bad json")
            return None
        if not isinstance(msgjson, dict):
            logging.error("Received request with json (not an object)")
            self.send_error(400, "bad json")
            return None
        return msgjson

    def _checked_name(self, msgjson):
        """ Extract the program name of a request and check it is known

        :param dict msgjson: decoded body of the request
        :return: program name, or None when an error 400 was sent
        """
        if 'name' not in msgjson:
            self.send_error(400, 'Missing param name')
            logging.error("Missing param name")
            return None
        name = msgjson['name']
        if not isinstance(name, str) or name not in self.server.listQueueIn:
            self.send_error(400, 'Name unknown')
            logging.error("Name unknwon '%s'" % name)
            return None
        return name

    def _command_log(self):
        """ sub request log (send log of one program, from line 'first-line') """
        msgjson = self._extract_input_data()
        if msgjson is None:
            return
        logging.debug(msgjson)
        name = self._checked_name(msgjson)
        if name is None:
            return
        if 'first-line' not in msgjson:
            self.send_error(400, 'Missing param first-line')
            logging.error("Missing param first-line '%s'" % name)
            return
        try:
            firstLine = int(msgjson['first-line'])
        except (TypeError, ValueError):
            self.send_error(400, 'Impossible to read first-line')
            logging.error("Impossible to read first-line '%s'" % msgjson['first-line'])
            return
        logging.debug("%s:%s" % (name, firstLine))
        try:
            item = self._exchange(name, "STDOUT %d" % firstLine)
        except queue.Empty:
            logging.debug("Received nothing from '%s'" % name)
            self.send_error(500, 'Missing return')
            return
        self._write_json(item)
        logging.debug("item : %s" % item)

    def _send_list(self):
        """ sub-request to list all program available """
        self._write_json(dict(enumerate(self.server.listQueueIn)))

    def _send_shutdown(self):
        """ Stop all program and stop manager """
        for name in self.server.listQueueIn:
            with self.server.listSemaphore[name]:
                self.server.listQueueIn[name].put("SHUTDOWN")
        self._write_json({'shutdown': 'ok'})

    def _send_command_all(self, command):
        """ Send specific command on all program

        :param str command: command (START, STOP, STATUS, ... )
        """
        outjson = {}
        for name in self.server.listQueueIn:
            try:
                outjson[name] = self._exchange(name, command)
            except queue.Empty:
                logging.debug("pas de message recu pour %s" % name)
                self.send_error(500, 'Missing return')
                return
        self._write_json(outjson)

    def _send_action(self):
        """ send specific action on one program (written on its stdin) """
        msgjson = self._extract_input_data()
        if msgjson is None:
            return
        logging.debug(msgjson)
        name = self._checked_name(msgjson)
        if name is None:
            return
        if 'action' not in msgjson:
            self.send_error(400, 'Missing param action')
            logging.error("Missing param action '%s'" % name)
            return
        action = msgjson['action']
        logging.debug("%s:%s" % (name, action))
        try:
            outjson = self._exchange(name, "STDIN %s" % action)
        except queue.Empty:
            logging.debug("no return %s" % name)
            self.send_error(500, 'Missing return')
            return
        except Exception as e:
            # boundary of the request: report instead of killing the handler
            self.send_error(500, 'Internal Error')
            logging.error("unknown error %s (%s)" % (name, e))
            return
        logging.debug("message sent: %s" % (name))
        self._write_json(outjson)

    def _send_command(self, command):
        """ Send specific command on one program

        :param str command: command (START, STOP, STATUS, ... )
        """
        msgjson = self._extract_input_data()
        if msgjson is None:
            return
        name = self._checked_name(msgjson)
        if name is None:
            return
        logging.debug("[%s %s] Send command" % (command, name))
        try:
            outjson = self._exchange(name, command)
        except queue.Empty:
            self.send_error(500, 'Missing return')
            logging.debug("[%s %s] Missing return" % (command, name))
            return
        except Exception as e:
            # boundary of the request: report instead of killing the handler
            self.send_error(500, 'Internal Error')
            logging.error("unknown error %s (%s)" % (name, e))
            return
        logging.debug("[%s %s] => %s" % (command, name, outjson))
        self._write_json(outjson)

    def check_authentication(self):
        """ Check the HTTP Basic credentials of the request

        :return: True when authentication is disabled on the server or when
                 the account/password pair matches the bcrypt hash stored in
                 ``server.users``, False otherwise
        """
        if not self.server.authentification:
            return True
        if __DISABLE_BCRYPT__:
            logging.error("Error module python bcrypt not installed")
            return False
        try:
            # a missing header gives None -> AttributeError handled below
            auth_header = self.headers['Authorization'].split()
            if auth_header[0] != 'Basic':
                logging.error("Authentification with Bad method (%s)" % auth_header[0])
                return False
            decode = base64.b64decode(auth_header[1]).decode('UTF-8')
            account, password = decode.split(':', maxsplit=1)
            if account not in self.server.users:
                logging.error("Authentification with unknown user (%s)" % account)
                return False
            hashed_password = self.server.users[account]
            if isinstance(hashed_password, str):
                # bcrypt.checkpw only accepts bytes
                hashed_password = hashed_password.encode('utf-8')
            if bcrypt.checkpw(password.encode('utf-8'), hashed_password):
                return True
            logging.error("Authentification with wrong password for user (%s)" % account)
            return False
        except (ValueError, IndexError, AttributeError) as e:
            logging.error("Error detected %s" % e)
            return False

    def do_GET(self):
        """
        Manage request READ
        we can execute STDOUT, FILTER, INFO, PLAYER, ADMINCOMMAND, CONFIG,
        STATUS, LIST & STATUSALL
        """
        logging.debug('get recieved : %s' % self.path)
        if not self.check_authentication():
            self.send_error(403, 'Wrong authentication')
            logging.error("Wrong authentication")
        elif self.path == '/STDOUT':
            self._command_log()
        elif self.path in self._GET_COMMANDS:
            self._send_command(self.path[1:])
        elif self.path == '/LIST':
            self._send_list()
        elif self.path == '/STATUSALL':
            self._send_command_all("STATUS")
        else:
            self.send_error(400, 'Path unknown')
            logging.error("Path unknwon '%s'" % self.path)

    def do_POST(self):
        """ Manage request POST (CREATE)

        currently, we execute START, STOP, STDIN, SHUTDOWN, STARTALL & STOPALL
        """
        logging.debug('post recieved : %s' % self.path)
        if not self.check_authentication():
            self.send_error(403, 'Wrong authentication')
            logging.error("Wrong authentication")
        elif self.path in ('/START', '/STOP'):
            self._send_command(self.path[1:])
        elif self.path == '/STDIN':
            self._send_action()
        elif self.path == '/SHUTDOWN':
            self._send_shutdown()
        elif self.path == '/STARTALL':
            self._send_command_all("START")
        elif self.path == '/STOPALL':
            self._send_command_all("STOP")
        else:
            self.send_error(400, 'Path unknown')
            logging.error("Path unknwon '%s'" % self.path)

    def do_HEAD(self):
        """ request HEAD received """
        logging.debug('head recieved : %s' % self.path)
        self.send_error(404, 'File Not Found: %s' % self.path)

    def do_PUT(self):
        """ request PUT (UPDATE/REPLACE) received """
        logging.debug('put recieved!')
        self.send_error(404, 'File Not Found: %s' % self.path)

    def do_PATCH(self):
        """ request PATCH (UPDATE/MODIFY) received """
        logging.debug('patch recieved!')
        self.send_error(404, 'File Not Found: %s' % self.path)

    def do_DELETE(self):
        """ request DELETE received """
        logging.debug('delete recieved!')
        self.send_error(404, 'File Not Found: %s' % self.path)

    def do_OPTIONS(self):
        """ request OPTIONS received (CORS pre-flight) """
        self.send_response(200, "ok")
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header("Access-Control-Allow-Headers", "Content-Type, *")
        self.end_headers()
class OpenNeLHTTPServer(ThreadingMixIn, http.server.HTTPServer):
    """
    Threaded HTTP server used by the manager.

    Extends HTTPServer with the dictionaries of queues/semaphores (one entry
    per program) and the authentication settings, so that every
    ManageHttpRequest can reach them through its ``server`` attribute.
    """

    def __init__(self,
                 listQueueIn,
                 listQueueOut,
                 listSemaphore,
                 server_address,
                 RequestHandlerClass,
                 authentification,
                 users,
                 bind_and_activate=True):
        """
        :param dict listQueueIn: input queue of each program
        :param dict listQueueOut: output queue of each program
        :param dict listSemaphore: semaphore of each program
        :param tuple server_address: (address, port) to listen on
        :param RequestHandlerClass: class instantiated for each request
        :param bool authentification: True to check the credentials
        :param dict users: password hash of each account
        :param bool bind_and_activate: forwarded to HTTPServer
        """
        super().__init__(server_address, RequestHandlerClass, bind_and_activate)
        # shared with the request handlers
        self.users = users
        self.authentification = authentification
        self.listSemaphore = listSemaphore
        self.listQueueOut = listQueueOut
        self.listQueueIn = listQueueIn
class ServerHttp(multiprocessing.Process):
    """
    Process running the HTTP(S) server of the manager.

    * holds the dictionaries queueIn, queueOut & semaphore
      (key: section's name in configuration), filled with appendchild()
      before the process is started
    """

    def __init__(self, keyfile, certfile, ca_cert, address='',
                 port=8000, authentification=True, method='http', users=None):
        """
        :param str keyfile: private key of the server (https only)
        :param str certfile: certificate of the server (https only)
        :param str ca_cert: CA chain used to check the client certificate;
                            when empty, no client certificate is requested
        :param str address: address to listen on
        :param int port: port to listen on
        :param bool authentification: True to check the credentials
        :param str method: 'http' or 'https'
        :param dict users: password hash of each account (default: no account)
        """
        super().__init__()
        self.listQueueIn = {}
        self.listQueueOut = {}
        self.listSemaphore = {}
        self.port = port
        self.key_file = keyfile
        self.cert_file = certfile
        self.ca_cert = ca_cert
        self.address = address
        self.authentification = authentification
        # a fresh dict per instance: a default {} would be shared by all
        self.users = {} if users is None else users
        self.method = method

    def _build_ssl_context(self):
        """ Build the TLS context of the server (TLS 1.2 minimum)

        :return: ssl.SSLContext loaded with the server key/certificate
        """
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        if hasattr(ssl, 'TLSVersion'):
            context.minimum_version = ssl.TLSVersion.TLSv1_2
        context.load_cert_chain(certfile=self.cert_file, keyfile=self.key_file)
        if self.ca_cert:
            # the client must present a certificate signed by this CA
            context.load_verify_locations(cafile=self.ca_cert)
            context.verify_mode = ssl.CERT_REQUIRED
        return context

    def run(self):
        """ Start the server and serve requests until the process ends

        :raises ValueError: 'method' is neither 'http' nor 'https'
        """
        if self.method not in ('http', 'https'):
            logging.error("Bad value 'method' (%s)" % str(self.method))
            raise ValueError("Bad value 'method' (%s)" % str(self.method))
        server_address = (self.address, self.port)
        httpd = OpenNeLHTTPServer(self.listQueueIn,
                                  self.listQueueOut,
                                  self.listSemaphore,
                                  server_address,
                                  ManageHttpRequest,
                                  self.authentification,
                                  self.users)
        if self.method == 'https':
            # ssl.wrap_socket() no longer exists since Python 3.12
            httpd.socket = self._build_ssl_context().wrap_socket(httpd.socket,
                                                                 server_side=True)
            logging.info('https listen')
        else:
            logging.info('http listen')
        httpd.serve_forever()
        logging.info("End")

    def appendchild(self, name, queueIn, queueOut, semaphore):
        """ Register the queues and the semaphore of one program

        A name already registered keeps its first values.

        :param str name: program name
        :param queueIn: queue used to send orders to the program
        :param queueOut: queue used to read the answers of the program
        :param semaphore: semaphore protecting the two queues
        """
        self.listQueueIn.setdefault(name, queueIn)
        self.listQueueOut.setdefault(name, queueOut)
        self.listSemaphore.setdefault(name, semaphore)
class ManageCommand():
"""
Manage Command (only one)
* start/stop/status/get log/send an action [stdin] for command (receive order with queueIn)
* read output [in other thread]
* communicate with ManageHttpRequest (with queueOut)
"""
def __init__(self, name,
             command, path,
             logsize, bufsize, queueIn, queueOut, semaphore,
             activate_filter, size_max_filter, add_filter, del_filter,
             autostart, restart_after_crash, restart_delay,
             egs_filter,
             maxWaitEnd=10, waitDelay=1):
    """ Store the configuration of one program and prepare its state

    :param str name: program name (section's name in configuration)
    :param str command: command line used to launch the program
    :param str path: directory where the program is launched
    :param int logsize: number of stdout lines kept in memory
    :param int bufsize: value stored for the 'bufsize' of subprocess.Popen
    :param queueIn: queue where the orders are read
    :param queueOut: queue where the answers are written
    :param semaphore: semaphore protecting the two queues
    :param activate_filter: enable add_filter/del_filter
    :param int size_max_filter: size limit of the filter dictionaries
    :param str add_filter: regex; its first and last characters are dropped
    :param str del_filter: regex; its first and last characters are dropped
    :param autostart: stored in self.autostart
    :param restart_after_crash: stored in self.restart_after_crash
    :param restart_delay: stored in self.restart_delay
    :param egs_filter: enable the EGS filters
    :param maxWaitEnd: stored in self.maxWaitEnd
    :param waitDelay: stored in self.waitDelay
    """
    # identity and launch settings
    self.name = name
    self.command = command
    self.path = path
    self.bufsize = bufsize
    self.autostart = autostart
    self.restart_after_crash = restart_after_crash
    self.restart_delay = restart_delay
    self.maxWaitEnd = maxWaitEnd
    self.waitDelay = waitDelay

    # communication with the http server
    self.queueIn = queueIn
    self.queueOut = queueOut
    self.semaphore = semaphore
    self.pipeIn, self.pipeOut = multiprocessing.Pipe()

    # process and helper threads
    self.process = None
    self.running = False
    self.threadRead = None
    self.threadRestart = None
    self.eventRunningReader = threading.Event()
    self.eventRunningRestart = threading.Event()
    self.number_start = 0

    # stdout history
    self.log = []
    self.maxlog = logsize
    self.poslastlog = 0
    self.first_line = self.last_line = 0

    # filters defined in configuration
    self.activate_filter = activate_filter
    self.size_max_filter = size_max_filter
    self.add_filter_cmd = add_filter[1:-1]
    self.del_filter_cmd = del_filter[1:-1]
    self.filter_add_filter = re.compile(self.add_filter_cmd)
    self.filter_del_filter = re.compile(self.del_filter_cmd)
    self.filter = {}

    # built-in EGS filters
    self.egs_filter = egs_filter
    compile_pattern = re.compile
    self.egs_filter_register_entity = compile_pattern(r".*(registerEntity).*(: EIT: Register EId )[(](?P<EId>.*)[)] EntityName '(?P<NameDomain>[^']+)' UId (?P<UID>[\d]+) UserName '(?P<UserName>[^']+)'")
    self.egs_filter_active_character = compile_pattern(r".*(setActiveCharForPlayer).*(: set active char )(?P<IDCHAR>[\d]+)( for player )(?P<UID>[\d]+)")
    self.egs_filter_sid = compile_pattern(r".*(Mapping UID )(?P<UID>[\d]+)( => Sid )[(](?P<SID>.*)[)]")
    self.egs_filter_client_ready = compile_pattern(r".*(Updating IS_NEWBIE flag for character: )[(](?P<ID>.*)[)]")
    self.egs_filter_disconnected = compile_pattern(r".*(disconnectPlayer).+[\s]+(player )(?P<UID>[\d]+)[\s]+(is disconnected)")
    self.egs_filter_admin = compile_pattern(r"(.*)(cbClientAdmin).*(: ADMIN)(: Player )[(](?P<SID>.*)[)](?P<ACTION>.+)")
    self.filter_register_entity = {}
    self.filter_active_character = {}
    self.filter_admin = {}
    self.pos_admin = 0
def _analyze_line(self, msg):
    """ Store one output line of the program and update the filters

    The line is appended (prefixed with the current time) to self.log,
    which keeps at most self.maxlog lines. When enabled, the configured
    add/del filters and the EGS filters (registered entities, active
    characters, SID, disconnection, admin commands) are then applied.

    :param str msg: line read on the program output (already stripped)
    """
    now = time.strftime('%Y/%m/%d %H:%M:%S %Z')
    self.poslastlog += 1
    while self.log and len(self.log) >= self.maxlog:
        self.log.pop(0)
        self.first_line += 1
    self.log.append(now + ' ' + msg)
    self.last_line += 1
    logging.debug("recu: '%s'" % (msg))

    # Filters defined in configuration: keep/forget a state per named group
    if self.activate_filter:
        res = self.filter_add_filter.match(msg)
        if res:
            logging.debug("add_filter found")
            if len(self.filter) < self.size_max_filter:
                logging.debug("include add_filter found")
                for key, value in res.groupdict().items():
                    logging.debug("set add_filter found [%s]" % (str(key)))
                    if value:
                        logging.debug("set1 add_filter found [%s][%s]" % (str(key), str(value)))
                        self.filter.setdefault(key, {}).setdefault(value, now)
        res = self.filter_del_filter.match(msg)
        if res:
            logging.debug("del_filter found")
            for key, value in res.groupdict().items():
                logging.debug("prepare del_filter found %s" % str(key))
                if value:
                    states = self.filter.setdefault(key, {})
                    if value in states:
                        logging.debug("del1 del_filter found [%s][%s][%s]" % (str(key), str(value), str(states)))
                        del states[value]

    if not self.egs_filter:
        return

    # A character is registered for an account (UID)
    res = self.egs_filter_register_entity.match(msg)
    if res:
        logging.debug("egs_filter_register_entity found")
        if len(self.filter_register_entity) < self.size_max_filter:
            logging.debug("include filter_register_entity found")
            dico = res.groupdict()
            try:
                uid = dico['UID']
                chars = self.filter_register_entity.setdefault(uid, {})
                if chars and dico['UserName'] != chars[0]['UserName']:
                    # same UID used by another account: restart from scratch
                    logging.info("Remove old player %s by %s (UID:%s)" % (chars[0]['UserName'], dico['UserName'], uid))
                    chars = self.filter_register_entity[uid] = {}
                idchar = len(chars)
                logging.debug("add filter_register_entity (uid:%s char:%d)" % (uid, idchar))
                chars.setdefault(idchar, {'NameDomain': dico['NameDomain'], 'UserName': dico['UserName'], 'UID': uid, "EId": dico['EId'], 'When': now, 'CHAR': idchar})
            except KeyError as e:
                logging.error('Missing key when read "register_entity" (%s)' % e)
        else:
            logging.warning("impossible to add param 'register_entity' (size too high)")
        return

    # The account selects one of its registered characters
    res = self.egs_filter_active_character.match(msg)
    if res:
        logging.debug("egs_filter_active_character found")
        dico = res.groupdict()
        if len(self.filter_active_character) < self.size_max_filter:
            try:
                # nothing is stored when the character was never registered
                self.filter_active_character[dico['UID']] = self.filter_register_entity[dico['UID']][int(dico['IDCHAR'])]
            except KeyError as e:
                logging.error('Missing key when read "active_character" (%s)' % e)
        else:
            logging.warning("impossible to add param 'active_character' (size too high)")
        return

    # The SID is attached to the active character of the account
    res = self.egs_filter_sid.match(msg)
    if res:
        logging.debug("egs_filter_sid found")
        dico = res.groupdict()
        try:
            if dico['UID'] in self.filter_active_character:
                player = self.filter_active_character[dico['UID']]
                # always take the last SID announced for this UID
                player["SID"] = dico['SID']
                player.setdefault("When", now)
            else:
                logging.error('Impossible to add SID on player %s (Player not found)' % dico['UID'])
        except KeyError as e:
            logging.error('Missing key when read "sid" (%s)' % e)
        return

    # The account is disconnected
    res = self.egs_filter_disconnected.match(msg)
    if res:
        logging.debug("egs_filter_disconnected found")
        dico = res.groupdict()
        try:
            if dico['UID'] in self.filter_active_character:
                del self.filter_active_character[dico['UID']]
            else:
                logging.error('Impossible to remove player %s (Player not found)' % dico['UID'])
        except KeyError as e:
            logging.error('Missing key when remove player (%s)' % e)
        return

    # An admin command was executed by a player
    res = self.egs_filter_admin.match(msg)
    if res:
        logging.debug("egs_filter_admin found")
        # keep at most maxlog admin commands: drop the oldest ones
        while self.filter_admin and len(self.filter_admin) >= self.maxlog:
            del self.filter_admin[min(self.filter_admin)]
        try:
            dico = res.groupdict()
            # empty identity when no active player owns this SID
            UserName = NameDomain = EId = ''
            for player in self.filter_active_character.values():
                if player.get('SID') == dico['SID']:
                    NameDomain = player.get('NameDomain', '')
                    UserName = player.get('UserName', '')
                    EId = player.get('EId', '')
                    break
            self.filter_admin[self.pos_admin] = {'When': now, 'SID': dico['SID'], "EId": EId, 'ACTION': dico['ACTION'], 'NameDomain': NameDomain, 'UserName': UserName}
        except KeyError as e:
            logging.error('Missing key when admin player (%s)' % e)
        self.pos_admin += 1
        return
def _readline_stdout(self):
    """ Read one line on the program output and analyze it

    :return: tuple (crashed, end)
             * (True, False): the output can't be read (no process, or
               output closed)
             * (False, True): nothing to read for the moment (the method
               slept self.waitDelay seconds before returning)
             * (False, False): one line was read and analyzed
    """
    try:
        line = self.process.stdout.readline()
    except (AttributeError, ValueError):
        # AttributeError: self.process is None / ValueError: output closed
        logging.error("process %s down (not detected)" % self.name)
        return True, False
    if not line:
        time.sleep(self.waitDelay)
        return False, True
    logging.debug("line %s " % line)
    # a byte that is not valid UTF-8 must not stop the reader thread
    self._analyze_line(line.decode(errors='replace').strip())
    return False, False
def read_output(self):
    """ Thread to read output (stdout)

    Switches the program output to non-blocking mode, then reads it line
    by line while eventRunningReader is set. When the program is seen
    terminated and its output is drained, the order "STOPPED" is pushed on
    queueIn (as soon as the semaphore can be taken).
    """
    stdout = self.process.stdout
    flags = fcntl.fcntl(stdout, fcntl.F_GETFL)
    fcntl.fcntl(stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    logging.debug("Start reader %s" % self.name)

    crashed = False
    while self.eventRunningReader.is_set():
        try:
            logging.debug("ping")
            if self.process.poll() is not None:
                logging.error("process %s down" % self.name)
                crashed = True
        except AttributeError as e:
            # self.process was reset to None by another thread
            logging.warning("process %s down (%s)" % (self.name, e))
            break
        read_failed, drained = self._readline_stdout()
        # NOTE(review): _readline_stdout() never reports a failure together
        # with "drained", so a read failure alone does not leave the loop
        if drained and (crashed or read_failed):
            break

    # Send to thread manage process
    if crashed:
        logging.debug("Process stopped : '%s'" % self.name)
        got_semaphore = self.semaphore.acquire(False)
        while not got_semaphore and self.eventRunningReader.is_set():
            time.sleep(1)
            got_semaphore = self.semaphore.acquire(False)
        if got_semaphore:
            self.queueIn.put("STOPPED")
            self.semaphore.release()

    if self.activate_filter:
        # NOTE(review): filter_load_character is only assigned here in the
        # visible code - confirm it is still used
        self.filter_load_character = {}
        self.filter_active_character = {}
    logging.debug("End reader: '%s'" % self.name)
def restart(self):
    """ Thread to restart after crash

    Waits restart_delay seconds, takes the semaphore (retrying every second
    while eventRunningRestart is set), then pushes the order "START" on
    queueIn and consumes the answer on queueOut.
    """
    logging.debug('initialize process restart %s (wait %ds)' % (self.name, self.restart_delay))
    time.sleep(self.restart_delay)
    logging.debug('Prepare restart service %s' % (self.name))

    got_semaphore = self.semaphore.acquire(False)
    while not got_semaphore and self.eventRunningRestart.is_set():
        logging.debug('Ping - restart service %s' % (self.name))
        time.sleep(1)
        got_semaphore = self.semaphore.acquire(False)
    logging.debug('Prepare restart service %s (step 2)' % (self.name))

    if got_semaphore:
        logging.debug('Restart service %s' % (self.name))
        self.queueIn.put("START")
        # the answer of the order is read here and discarded
        self.queueOut.get()
        self.semaphore.release()
    logging.debug('Prepare restart service %s (step 3)' % (self.name))
2018-10-09 10:21:50 +00:00
def receive_signal(self, signum, frame):
    """ Signal handler: push the order "SHUTDOWN" twice on queueIn

    :param int signum: signal number (only logged)
    :param frame: current stack frame (unused)
    """
    logging.info("Received signal %s (%d)" % (self.name, signum))
    for _ in range(2):
        self.queueIn.put("SHUTDOWN")
def start(self):
    """
    Start the program and the thread that reads its output.

    If a process object already exists and is still alive nothing is done.
    If it has exited, it is reaped and its pipes are closed before a new
    process is launched.

    :return: 0 when the program is running (already or newly started),
             2 when it could not be launched
    """
    logging.debug("start %s", self.name)
    if self.process:
        code = self.process.poll()
        if code is None:
            logging.debug("%s already exist", self.name)
            return 0
        logging.debug("%s crashed", self.name)
        code = self.process.wait()
        self.process.stdin.close()
        self.process.stdout.close()
        logging.error("%s crashed (return code:%d) - restart program", self.name, code)
    try:
        self.process = subprocess.Popen(self.command.split(),
                                        cwd=self.path,
                                        shell=False,
                                        bufsize=self.bufsize,
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.STDOUT,
                                        close_fds=True)
    except OSError as e:
        # OSError covers FileNotFoundError and PermissionError as well as the
        # other launch failures (e.g. NotADirectoryError for a bad 'path').
        logging.error("Impossible to start %s (%s)", self.name, e)
        return 2
    if self.threadRead:
        # Stop the reader attached to the previous run before creating a new one.
        self.eventRunningReader.clear()
        self.threadRead.join()
        self.threadRead = None
    self.running = True
    self.eventRunningReader.set()
    self.threadRead = threading.Thread(target=self.read_output)
    self.threadRead.start()
    self.number_start += 1
    return 0
def status(self):
    """
    Get status of program

    :return: 0 when the process is alive, 1 when there is no process,
             2 when the process has exited
    """
    logging.debug("status %s", self.name)
    if not self.process:
        logging.debug("%s status [stopped]", self.name)
        return 1
    logging.debug("status %s - check", self.name)
    exit_code = self.process.poll()
    if exit_code is not None:
        logging.error("%s crashed (return code:%d)", self.name, exit_code)
        return 2
    logging.debug("%s status [started]", self.name)
    return 0
def list_thread(self):
    """ Write the name and the identifier of every live thread in the debug log """
    logging.debug('list thread')
    for t in threading.enumerate():
        # Thread.getName() is deprecated (warns since Python 3.10): read the attribute.
        logging.debug('thread %s', t.name)
        logging.debug("id %d", t.ident)
def stop(self):
    """
    Stop program

    The process is asked to end in three stages, each one repeated at most
    ``self.maxWaitEnd`` times with up to one second of wait per attempt:

      1. ``send_signal(SIGTERM)``
      2. ``terminate()``
      3. ``send_signal(SIGKILL)``

    A stage is skipped as soon as the process has exited.  The process is then
    reaped, its pipes are closed and the reader thread is stopped and joined.

    :return: always 1 (the 'stopped' state)
    """
    logging.debug("stop %s", self.name)
    if not self.process:
        return 1
    stages = (
        ("stop", lambda: self.process.send_signal(signal.SIGTERM)),
        ("terminate", self.process.terminate),
        ("kill", lambda: self.process.send_signal(signal.SIGKILL)),
    )
    code = self.process.poll()
    for label, send in stages:
        try:
            remaining = self.maxWaitEnd
            while code is None and remaining > 0:
                logging.debug("%s process %s", label, self.name)
                send()
                # Wait at most one second, but return as soon as the process ends.
                try:
                    code = self.process.wait(timeout=1)
                except subprocess.TimeoutExpired:
                    code = None
                remaining -= 1
        except ProcessLookupError as e:
            logging.warning("Stop process (%s)" % str(e))
    try:
        code = self.process.wait()
        self.process.stdin.close()
        self.process.stdout.close()
        self.process = None
        if self.threadRead:
            self.eventRunningReader.clear()
            self.threadRead.join()
            self.threadRead = None
        logging.info("%s stopped (return code:%d)", self.name, code)
    except ProcessLookupError as e:
        logging.warning("Stop process (%s)" % str(e))
    return 1
def getlog(self, firstline):
    """
    Get log

    Lines kept in ``self.log`` are numbered so that the last one has the number
    ``self.poslastlog``.  Every line whose number is at least ``firstline`` is
    returned under its number, together with the keys 'first-line' (number of
    the first returned line, 0 when none was recorded) and 'last-line'.

    :param int firstline: number of the first line wanted
    :return: dictionary {line number: line, 'first-line': ..., 'last-line': ...}
    """
    logging.debug("read log %d " % firstline)
    start_pos = self.poslastlog - len(self.log) + 1
    selected = {}
    first_found = 0
    last_pos = start_pos - 1
    for number, text in enumerate(self.log, start=start_pos):
        last_pos = number
        if number < firstline:
            continue
        selected.setdefault(number, text)
        if not first_found:
            first_found = number
    selected.setdefault('first-line', first_found)
    selected.setdefault('last-line', last_pos)
    return selected
2018-10-09 10:21:50 +00:00
def getfilter(self):
    """ Return the object currently stored in ``self.filter`` """
    current = self.filter
    return current
def getconfig(self):
    """
    Get the configuration of this command

    :return: dictionary where every value is converted with str()
             ('filter' repeats the value of 'activate_filter')
    """
    exported = (
        ('activate_filter', self.activate_filter),
        ('bufsize', self.bufsize),
        ('size_max_filter', self.size_max_filter),
        ('path', self.path),
        ('add_filter', self.add_filter_cmd),
        ('del_filter', self.del_filter_cmd),
        ('command', self.command),
        ('maxWaitEnd', self.maxWaitEnd),
        ('waitDelay', self.waitDelay),
        ('maxlog', self.maxlog),
        ('filter', self.activate_filter),
        ('egs', self.egs_filter),
    )
    return {key: str(value) for key, value in exported}
def getinfo(self):
    """
    Get counters about this command

    :return: dictionary with the number of launches and the first/last line
             numbers (as strings), and the sizes of ``self.filter`` and
             ``self.filter_active_character`` (as integers)
    """
    outjson = {}
    outjson['number_launch'] = str(self.number_start)
    outjson['first_line'] = str(self.first_line)
    outjson['last_line'] = str(self.last_line)
    outjson['number_filter'] = len(self.filter)
    outjson['player_connected'] = len(self.filter_active_character)
    return outjson
def getplayer(self):
    """ Return the object currently stored in ``self.filter_active_character`` """
    players = self.filter_active_character
    return players
def getadmincommand(self):
    """ Return the object currently stored in ``self.filter_admin`` """
    admin_commands = self.filter_admin
    return admin_commands
def action(self, action):
    """
    Send action to program (send input to stdin)

    The text, followed by a newline, is written UTF-8 encoded on the standard
    input of the process and flushed.  An empty text writes nothing.

    :param str action: line to send
    :return: "ok" when the process is alive and the write (if any) succeeded,
             "ko" when there is no live process or the write failed
    """
    logging.debug("STDIN '%s'", action)
    if not self.process or self.process.poll() is not None:
        return "ko"
    if action:
        try:
            self.process.stdin.write((action + '\n').encode('UTF-8'))
            self.process.stdin.flush()
        except OSError as e:
            # The process may die between poll() and write() (BrokenPipeError).
            logging.warning("Impossible to write on stdin (%s)", e)
            return "ko"
    return "ok"
def run(self):
    """
    loop, run child (wait command)

    Installs ``self.receive_signal`` for SIGABRT and SIGTERM, starts the
    program when ``self.autostart`` is set, then serves the messages read on
    ``self.queueIn`` until "SHUTDOWN" is received.  The first word of a
    message is the command, the rest (if any) its argument:

      * START / STOP / STATUS : act on the program, answer {'state': ...}
      * STDIN <text>          : send <text> to the program, answer {'state': 'ok'|'ko'}
      * STDOUT [first-line]   : answer the log from the given line (default 0)
      * FILTER / CONFIG / INFO / PLAYER / ADMINCOMMAND : answer the matching getter
      * STOPPED               : posted when the process ended; no answer,
                                schedules a restart when configured
      * SHUTDOWN              : leave the loop; no answer

    Anything else, including an empty message, is answered with
    {"error": "command unknown"} so that the requester is never left waiting.
    On exit the program is stopped and the helper threads are wound down.
    """
    signal.signal(signal.SIGABRT, self.receive_signal)
    signal.signal(signal.SIGTERM, self.receive_signal)
    statuscmd = {0: 'started', 1: 'stopped', 2: 'crashed'}
    if self.autostart:
        savedstate = self.start()
    else:
        savedstate = 1
    while True:
        logging.debug('wait event %s', self.name)
        msg = self.queueIn.get()
        logging.debug("command : '%s'", msg)
        # Split once: parts[0] is the command, parts[1] (when present) its raw argument.
        parts = msg.split(maxsplit=1)
        command = parts[0] if parts else ''
        argument = parts[1] if len(parts) > 1 else ''
        if command == "SHUTDOWN":
            break
        elif command == "START":
            savedstate = self.start()
            self.queueOut.put({'state': statuscmd[savedstate]})
        elif command == "STATUS":
            currentstate = self.status()
            # Keep reporting 'crashed' while the crashed process has only been cleaned up.
            if currentstate != 1 or savedstate != 2:
                savedstate = currentstate
            self.queueOut.put({'state': statuscmd[savedstate],
                               'last_line': str(self.last_line),
                               'number_launch': str(self.number_start),
                               'filter': str(self.activate_filter),
                               'egs': str(self.egs_filter)})
        elif command == "STOP":
            savedstate = self.stop()
            self.queueOut.put({'state': statuscmd[savedstate]})
        elif command == "STDIN":
            # A STDIN message without text is forwarded as an empty action.
            self.queueOut.put({'state': self.action(argument)})
        elif command == "STDOUT":
            firstline = 0
            if argument:
                try:
                    firstline = int(argument)
                except ValueError:
                    logging.warning("Bad value for param first-line (need integer)")
            self.queueOut.put(self.getlog(firstline))
        elif command == "FILTER":
            self.queueOut.put(self.getfilter())
        elif command == "CONFIG":
            self.queueOut.put(self.getconfig())
        elif command == "INFO":
            self.queueOut.put(self.getinfo())
        elif command == "PLAYER":
            self.queueOut.put(self.getplayer())
        elif command == "ADMINCOMMAND":
            self.queueOut.put(self.getadmincommand())
        elif command == "STOPPED":
            currentstate = self.status()
            logging.debug('Received event process stopped (current state:%d, saved state:%d)',
                          currentstate, savedstate)
            if currentstate == 2 and savedstate != 1 and self.restart_after_crash:
                logging.debug('Prepare restart')
                self.stop()
                savedstate = 2
                self.eventRunningRestart.clear()
                # NOTE(review): threading.Thread has no terminate(); the call raises
                # AttributeError, which is swallowed, so join() is never reached.
                # Joining here could block if the restart thread is waiting on
                # queueOut - confirm before changing.
                try:
                    self.threadRestart.terminate()
                    self.threadRestart.join()
                except AttributeError:
                    pass
                self.eventRunningRestart.set()
                self.threadRestart = threading.Thread(target=self.restart)
                self.threadRestart.start()
        else:
            logging.warning("Bad command (%s)", command)
            self.queueOut.put({"error": "command unknown"})
    logging.debug('Stop %s', self.name)
    self.stop()
    logging.debug('prepare end')
    self.eventRunningReader.clear()
    if self.threadRead:
        try:
            self.threadRead.join()
        except AttributeError:
            pass
    self.eventRunningRestart.clear()
    if self.threadRestart:
        # NOTE(review): same remark as above, terminate() does not exist on a Thread.
        try:
            self.threadRestart.terminate()
            self.threadRestart.join()
        except AttributeError:
            pass
    logging.debug('end')
class Manager():
    """
    Manage all services
    (read configuration, launch ManageCommand & launch ServerHttp & wait the end)
     * https service
     * all child to manage (it start ManageCommand by command define in configuration)
    """

    def __init__(self, launch_program):
        """
        :param bool launch_program: when true every command is started at launch,
                                    whatever its 'autostart' parameter
        """
        self.threadCommand = []
        self.info = {}
        self.command = []
        self.launch_program = launch_program
        self.param = {}
        self.users = {}
        self.passwordfile = None
        self.serverHttp = None
        self.port = 8000
        self.address = ''
        self.keyfile = 'crt/key.pem'
        self.certfile = 'crt/cert.pem'
        self.ca_cert = 'crt/ca_cert.crt'
        self.authentification = False
        self.method = 'http'

    def load_config(self, filecfg):
        """
        Read the configuration from an opened file

        :param filecfg: file object handed to ConfigParser.read_file
        :raises ValueError: when filecfg is None or a numeric parameter is invalid
        """
        if filecfg is None:
            raise ValueError
        config = configparser.ConfigParser()
        config.read_file(filecfg)
        self._load_config(config)

    def load_password(self):
        """
        Fill self.users from the lines 'username:password' of self.passwordfile

        Nothing is done when no password file is configured.  Blank lines are
        ignored, lines without ':' are reported and skipped, and the first
        entry found for a username is the one kept.
        """
        if not self.passwordfile:
            return
        with open(self.passwordfile, 'rt') as fp:
            for line in fp:
                line = line.strip()
                if not line:
                    continue
                username, separator, password = line.partition(':')
                if not separator:
                    # The content is not logged: it may be a password.
                    logging.warning("ignore bad line in password file (missing ':')")
                    continue
                self.users.setdefault(username, password)

    @staticmethod
    def _read_int(section, key, default, name):
        """ Return section[key] as an integer, or default when the key is absent """
        if key not in section:
            return default
        try:
            return int(section[key])
        except (TypeError, KeyError, ValueError):
            logging.error("Impossible to read param %s (command:%s)", key, name)
            raise ValueError("bad value for param %s (%s)" % (key, name))

    @staticmethod
    def _read_yes(section, key, default):
        """ Return True when section[key] is 'yes' (any case), default when the key is absent """
        if key not in section:
            return default
        return section[key].upper().strip() == 'YES'

    def _load_server_section(self, section):
        """ Read the section [config:server]; a missing key keeps the current value """
        if 'port' in section:
            try:
                self.port = int(section['port'])
            except (TypeError, ValueError):
                logging.warning("ignore bad value for param port (keep %d)", self.port)
        self.address = section.get('address', fallback=self.address)
        self.keyfile = section.get('keyfile', fallback=self.keyfile)
        self.certfile = section.get('certfile', fallback=self.certfile)
        self.ca_cert = section.get('ca_cert', fallback=self.ca_cert)
        self.authentification = self._read_yes(section, 'authentification', self.authentification)
        self.passwordfile = section.get('passwordfile', fallback=self.passwordfile)
        self.method = section.get('method', fallback=self.method)

    def _load_command_section(self, name, section):
        """ Read one section [command:xxx] and record it in self.param under its full name """
        self.param.setdefault(name, {
            'command': section['command'],
            'path': section.get('path', fallback=None),
            'logsize': self._read_int(section, 'logsize', 100, name),
            'bufsize': self._read_int(section, 'bufsize', 100, name),
            'activate_filter': self._read_yes(section, 'activate_filter', False),
            'size_max_filter': self._read_int(section, 'size_max_filter', 100, name),
            'add_filter': section.get('add_filter', fallback=''),
            'del_filter': section.get('del_filter', fallback=''),
            'autostart': self._read_yes(section, 'autostart', False),
            'restart_after_crash': self._read_yes(section, 'restart_after_crash', False),
            'restart_delay': self._read_int(section, 'restart_delay', 10, name),
            'egs_filter': self._read_yes(section, 'egs_filter', False)})

    def _load_config(self, config):
        """
        Read configuration object

        Sections 'config:client' and 'config:user' are skipped, 'config:server'
        sets the attributes of the http(s) server, and every section
        'command:xxx' holding a 'command' key is recorded in self.param.

        param: config: configuration object
        :raises ValueError: when a numeric parameter of a command is not an integer
        """
        logging.debug("Sections :%s", config.sections())
        for name in config.sections():
            if name in ('config:client', 'config:user'):
                continue
            if name == 'config:server':
                logging.debug("read config '%s'", name)
                self._load_server_section(config[name])
                continue
            head, separator, _ = name.partition(':')
            if not separator:
                logging.warning("ignore bad parameter '%s'", name)
                continue
            if head == 'command' and 'command' in config[name]:
                logging.debug("read command '%s'", name)
                self._load_command_section(name, config[name])

    def initialize_http(self):
        """
        Initialize object serverHttp
        """
        logging.debug("Initialize server http(s)")
        self.serverHttp = ServerHttp(self.keyfile,
                                     self.certfile,
                                     self.ca_cert,
                                     self.address,
                                     self.port,
                                     authentification=self.authentification,
                                     method=self.method,
                                     users=self.users)

    def runCommand(self, name, command, path, logsize, bufsize, queueIn, queueOut, semaphore,
                   activate_filter, size_max_filter, add_filter, del_filter,
                   autostart, restart_after_crash, restart_delay, egs_filter):
        """
        Thread to manage OpenNeL program
        (build the ManageCommand object with the given parameters and run its loop)
        """
        logging.debug("Initialize '%s'", name)
        manageCommand = ManageCommand(name=name,
                                      command=command,
                                      path=path,
                                      logsize=logsize,
                                      bufsize=bufsize,
                                      queueIn=queueIn,
                                      queueOut=queueOut,
                                      semaphore=semaphore,
                                      activate_filter=activate_filter,
                                      size_max_filter=size_max_filter,
                                      add_filter=add_filter,
                                      del_filter=del_filter,
                                      autostart=autostart,
                                      restart_after_crash=restart_after_crash,
                                      restart_delay=restart_delay,
                                      egs_filter=egs_filter)
        manageCommand.run()

    def launch_server_http(self):
        """ Launch server https """
        self.serverHttp.daemon = True
        self.serverHttp.start()

    def launch_command(self):
        """
        Launch child to manage each program

        For every entry of self.param: create the two queues and the semaphore,
        register them on the http server, start one process running
        runCommand, and record everything in self.info.
        """
        for name, settings in self.param.items():
            logging.debug("Initialize '%s'", name)
            queueIn = multiprocessing.Queue()
            queueOut = multiprocessing.Queue()
            semaphore = multiprocessing.BoundedSemaphore()
            self.serverHttp.appendchild(name, queueIn, queueOut, semaphore)
            # The command line switch overrides the per-command setting.
            autostart = True if self.launch_program else settings['autostart']
            threadCommand = multiprocessing.Process(target=self.runCommand,
                                                    args=(name,
                                                          settings['command'],
                                                          settings['path'],
                                                          settings['logsize'],
                                                          settings['bufsize'],
                                                          queueIn,
                                                          queueOut,
                                                          semaphore,
                                                          settings['activate_filter'],
                                                          settings['size_max_filter'],
                                                          settings['add_filter'],
                                                          settings['del_filter'],
                                                          autostart,
                                                          settings['restart_after_crash'],
                                                          settings['restart_delay'],
                                                          settings['egs_filter']))
            threadCommand.start()
            self.threadCommand.append(threadCommand)
            self.info.setdefault(name, {'queueIn': queueIn,
                                        'queueOut': queueOut,
                                        'semaphore': semaphore,
                                        'threadCommand': threadCommand,
                                        'command': settings['command'],
                                        'path': settings['path'],
                                        'logsize': settings['logsize'],
                                        'bufsize': settings['bufsize'],
                                        'activate_filter': settings['activate_filter'],
                                        'size_max_filter': settings['size_max_filter'],
                                        'add_filter': settings['add_filter'],
                                        'del_filter': settings['del_filter'],
                                        'autostart': autostart,
                                        'restart_after_crash': settings['restart_after_crash'],
                                        'restart_delay': settings['restart_delay'],
                                        'egs_filter': settings['egs_filter']})

    def receive_signal(self, signum, frame):
        """
        Managed signal
        (terminate and join every child, terminate the http server, then leave with os._exit(0))
        """
        logging.info("Received signal (%d)", signum)
        for child in self.threadCommand:
            logging.info("send signal to child %s", child.name)
            try:
                child.terminate()
                child.join()
            except AttributeError:
                logging.info("child not started")
        if self.serverHttp:
            logging.info("send signal to server http")
            try:
                self.serverHttp.terminate()
            except AttributeError:
                pass
        logging.info("Finalize signal (%d)", signum)
        os._exit(0)

    def wait_children_commands(self):
        """ Wait the end of every child process """
        for child in self.threadCommand:
            child.join()

    def wait_child_server_http(self):
        """ Terminate the http server and wait its end """
        self.serverHttp.terminate()
        self.serverHttp.join()

    def run(self):
        """ launch all """
        signal.signal(signal.SIGABRT, self.receive_signal)
        signal.signal(signal.SIGTERM, self.receive_signal)
        self.launch_command()
        self.launch_server_http()
        logging.info('started')
        self.wait_children_commands()
        logging.info('execute shutdown')
        signal.alarm(0)
        logging.info('wait thread http')
        time.sleep(1)
        self.wait_child_server_http()
        logging.info('shutdown completed')
def root(filecfg, fileLog, logLevel, launch_program, show_log_console):
    """
    Main function: configure the logging then build and run the Manager

    :param filecfg: opened configuration file (given to Manager.load_config)
    :param fileLog: opened log file, only its attribute ``name`` is used (None: no log file)
    :param str logLevel: name of a level of the module logging (case insensitive)
    :param bool launch_program: do you launch program when you start manager (auto start)
    :param bool show_log_console: do you need show log on console
    :raises ValueError: when the log level is unknown or filecfg is None
    """
    # Manage log
    logging.getLogger('logging')  # result unused, call kept as in the historical code
    level_name = logLevel.upper()
    numeric_level = getattr(logging, level_name, None)
    if not isinstance(numeric_level, int):
        raise ValueError('Invalid log level: %s' % logLevel)
    log_handlers = []
    if show_log_console:
        log_handlers.append(logging.StreamHandler())
    if fileLog:
        log_handlers.append(logging.FileHandler(fileLog.name))
    log_format = '%(asctime)s %(levelname)s [pid:%(process)d] [%(funcName)s:%(lineno)d] %(message)s'
    logging.basicConfig(handlers=log_handlers, level=numeric_level, format=log_format)
    if filecfg is None:
        logging.error("Missing configuration file")
        raise ValueError
    service = Manager(launch_program)
    service.load_config(filecfg)
    service.load_password()
    service.initialize_http()
    service.run()
def main(args=None):
    """ Main function

    :param list args: all arguments ('--help, '--version', ...);
                      None (default) reads the command line at call time
    """
    parser = argparse.ArgumentParser(description='Manage OpenNeL process')
    parser.add_argument('--version', action='version', version='%(prog)s ' + __VERSION__)
    parser.add_argument('-c', '--conf', type=argparse.FileType('r'),
                        default='opennel_manager.cfg', help='configuration file')
    parser.add_argument('--show-log-console', action='store_true',
                        help='show message in console', default=False)
    parser.add_argument('--filelog', type=argparse.FileType('wt'),
                        default=None, help='log file')
    parser.add_argument('--log',
                        default='INFO', help='log level [DEBUG, INFO, WARNING, ERROR]')
    parser.add_argument('--launch-program', action='store_true',
                        help='launch program when start manager', default=False)
    # parse_args(None) uses sys.argv[1:], evaluated now and not when the module was imported.
    param = parser.parse_args(args)
    root(filecfg=param.conf,
         fileLog=param.filelog,
         logLevel=param.log,
         launch_program=param.launch_program,
         show_log_console=param.show_log_console)
    logging.debug("End")
2018-08-16 19:08:37 +00:00
# Entry point when the file is executed as a script.
if __name__ == '__main__':
    main()