adding comment, and update exception on script manage.py

This commit is contained in:
Jerome Sagnole 2017-11-05 23:37:02 +01:00
parent dc84b3d9b9
commit 5aab6b97f1

View file

@ -21,16 +21,25 @@
Manage all process khaganat Manage all process khaganat
Launch this prorgam in background and use clientManager to manipulate process Launch this prorgam in background and use clientManager to manipulate process
you can launch command : Design
[POST] SHUTDOWN : Stop all process and stop manager + Manager
[POST] STARTALL : Start all process + -> ManageCommand [NumCommand]
[GET] STATUSALL : Get status all process | + -> read_output (thread)
[POST] STOPALL : Stop all process + -> ServerHttp -> khaganatHTTPServer
[POST] START {'name': program} : Start one program | + -> ManageHttpRequest (each request are send on this class) [NumRequestHttp]
[POST] ACTION {'name': program, 'action' : action} : Send action one program (send to input program)
[GET] STATUS {'name': program} : Get status one program http(s) command :
[POST] STOP {'name': program} : Stop one program [Method] /Path {json data}
[GET] LOG {'name': program, 'first-line': firstline } : Get log for one program --------------------------
[POST] /SHUTDOWN : Stop all process and stop manager
[POST] /STARTALL : Start all process
[GET] /STATUSALL : Get status all process
[POST] /STOPALL : Stop all process
[POST] /START {'name': program} : Start one program
[POST] /ACTION {'name': program, 'action' : action} : Send action one program (send to input program)
[GET] /STATUS {'name': program} : Get status one program
[POST] /STOP {'name': program} : Stop one program
[GET] /LOG {'name': program, 'first-line': firstline } : Get log for one program
Configuration File : This script need configuration file (see below for model) Configuration File : This script need configuration file (see below for model)
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
@ -95,7 +104,8 @@ import os
class ManageHttpRequest(http.server.SimpleHTTPRequestHandler): class ManageHttpRequest(http.server.SimpleHTTPRequestHandler):
""" """
Class received all request and send to manager process Class ManageHttpRequest receive all request https
* analyze and send to ManageCommand (with queueIn & queueOut)
""" """
def __init__(self, request, client_address, server): def __init__(self, request, client_address, server):
""" Initialize object """ """ Initialize object """
@ -121,18 +131,15 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler):
else: else:
ctype = 'text' ctype = 'text'
if ctype != 'application/json': if ctype != 'application/json':
self.send_response(400) logging.error("Received request with bad content-type")
self.send_response(400, "bad content-type")
self.end_headers() self.end_headers()
return return
if 'content-length' in self.headers: try:
try: sizemsg = int(self.headers['content-length'])
sizemsg = int(self.headers['content-length']) except (TypeError, KeyError, ValueError):
except: logging.error("Received request with bad content-length")
self.send_response(400) self.send_response(400, "bad content-length")
self.end_headers()
return
else:
self.send_response(400)
self.end_headers() self.end_headers()
return return
@ -141,25 +148,25 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler):
logging.debug(msgjson) logging.debug(msgjson)
if 'name' not in msgjson: if 'name' not in msgjson:
self.send_error(400,'Missing param name') self.send_error(400, 'Missing param name')
logging.error("Missing param name") logging.error("Missing param name")
return return
name = msgjson['name'] name = msgjson['name']
if name not in self.server.listQueueIn: if name not in self.server.listQueueIn:
self.send_error(400,'Name unknown') self.send_error(400, 'Name unknown')
logging.error("Name unknwon '%s'" % name) logging.error("Name unknwon '%s'" % name)
return return
if 'first-line' not in msgjson: if 'first-line' not in msgjson:
self.send_error(400,'Missing param first-line') self.send_error(400, 'Missing param first-line')
logging.error("Missing param first-line '%s'" % name) logging.error("Missing param first-line '%s'" % name)
return return
firstLine = 0 firstLine = 0
try: try:
firstLine = int(msgjson['first-line']) firstLine = int(msgjson['first-line'])
except: except ValueError:
self.send_error(400,'Impossible to read first-line') self.send_error(400, 'Impossible to read first-line')
logging.error("Impossible to read first-line '%s'" % msgjson['first-line']) logging.error("Impossible to read first-line '%s'" % msgjson['first-line'])
return return
logging.debug("%s:%s" % (name, firstLine)) logging.debug("%s:%s" % (name, firstLine))
@ -190,7 +197,6 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler):
for name in self.server.listQueueIn: for name in self.server.listQueueIn:
self.server.listEvent[name].set() self.server.listEvent[name].set()
self.server.listQueueIn[name].put("SHUTDOWN") self.server.listQueueIn[name].put("SHUTDOWN")
self._set_headers() self._set_headers()
outjson = {'shutdown':'ok'} outjson = {'shutdown':'ok'}
self.wfile.write(bytes(json.dumps(outjson), "utf-8")) self.wfile.write(bytes(json.dumps(outjson), "utf-8"))
@ -201,14 +207,12 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler):
for name in self.server.listQueueIn: for name in self.server.listQueueIn:
self.server.listEvent[name].set() self.server.listEvent[name].set()
self.server.listQueueIn[name].put(action) self.server.listQueueIn[name].put(action)
try: try:
item = self.server.listQueueOut[name].get(timeout = 4) item = self.server.listQueueOut[name].get(timeout = 4)
except queue.Empty: except queue.Empty:
logging.debug("pas de message recu pour %s" % name) logging.debug("pas de message recu pour %s" % name)
return return
outjson.setdefault(name, item) outjson.setdefault(name, item)
self._set_headers() self._set_headers()
self.wfile.write(bytes(json.dumps(outjson), "utf-8")) self.wfile.write(bytes(json.dumps(outjson), "utf-8"))
@ -219,18 +223,15 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler):
else: else:
ctype = 'text' ctype = 'text'
if ctype != 'application/json': if ctype != 'application/json':
self.send_response(400) logging.error("Bad content-type")
self.send_response(400, "bad content-type")
self.end_headers() self.end_headers()
return return
if 'content-length' in self.headers: try:
try: sizemsg = int(self.headers['content-length'])
sizemsg = int(self.headers['content-length']) except (TypeError, KeyError, ValueError):
except: logging.error("Bad content-length")
self.send_response(400) self.send_response(400, "bad content-length")
self.end_headers()
return
else:
self.send_response(400)
self.end_headers() self.end_headers()
return return
@ -239,26 +240,26 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler):
logging.debug(msgjson) logging.debug(msgjson)
if 'name' not in msgjson: if 'name' not in msgjson:
self.send_error(400,'Missing param name') self.send_error(400, 'Missing param name')
logging.error("Missing param name") logging.error("Missing param name")
return return
name = msgjson['name'] name = msgjson['name']
if name not in self.server.listQueueIn: if name not in self.server.listQueueIn:
self.send_error(400,'Name unknown') self.send_error(400, 'Name unknown')
logging.error("Name unknwon '%s'" % name) logging.error("Name unknwon '%s'" % name)
return return
if 'action' not in msgjson: if 'action' not in msgjson:
self.send_error(400,'Missing param action') self.send_error(400, 'Missing param action')
logging.error("Missing param action '%s'" % name) logging.error("Missing param action '%s'" % name)
return return
action = '' action = ''
try: try:
action = msgjson['action'] action = msgjson['action']
except: except KeyError:
self.send_error(400,'Impossible to read action')
logging.error("Impossible to read first-line '%s'" % msgjson['action']) logging.error("Impossible to read first-line '%s'" % msgjson['action'])
self.send_error(400, 'Impossible to read action')
return return
logging.debug("%s:%s" % (name, action)) logging.debug("%s:%s" % (name, action))
self.server.listEvent[name].set() self.server.listEvent[name].set()
@ -281,39 +282,35 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler):
else: else:
ctype = 'text' ctype = 'text'
if ctype != 'application/json': if ctype != 'application/json':
self.send_response(400) logging.error("Bad content-type")
self.send_response(400, "Bad content-type")
self.end_headers() self.end_headers()
return return
if 'content-length' in self.headers: try:
try: sizemsg = int(self.headers['content-length'])
sizemsg = int(self.headers['content-length']) except (TypeError, KeyError, ValueError):
except: logging.error("Bad content-length")
self.send_response(400) self.send_response(400, "Bad content-length")
self.end_headers()
return
else:
self.send_response(400)
self.end_headers() self.end_headers()
return return
msg = self.rfile.read(sizemsg) msg = self.rfile.read(sizemsg)
msgjson = json.loads(msg.decode()) msgjson = json.loads(msg.decode())
if 'name' not in msgjson: if 'name' not in msgjson:
self.send_error(400,'Missing param name') self.send_error(400, 'Missing param name')
logging.error("Missing param name") logging.error("Missing param name")
return return
name = msgjson['name'] name = msgjson['name']
if name not in self.server.listQueueIn: if name not in self.server.listQueueIn:
self.send_error(400,'Name unknown') self.send_error(400, 'Name unknown')
logging.error("Name unknwon '%s'" % name) logging.error("Name unknwon '%s'" % name)
return return
logging.debug("[%s %s] Send command" % (command, name)) logging.debug("[%s %s] Send command" % (command, name))
self.server.listEvent[name].set() self.server.listEvent[name].set()
logging.debug("[%s %s] Sent command" % (command, name))
self.server.listQueueIn[name].put(command) self.server.listQueueIn[name].put(command)
try: try:
result = self.server.listQueueOut[name].get(timeout = 4) result = self.server.listQueueOut[name].get(timeout = 4)
except queue.Empty: except queue.Empty:
self.send_error(500,'Missing return') self.send_error(500, 'Missing return')
logging.debug("[%s %s] Missing return" % (command, name)) logging.debug("[%s %s] Missing return" % (command, name))
return return
logging.debug("[%s %s] => %s" % (command, name, result)) logging.debug("[%s %s] => %s" % (command, name, result))
@ -323,8 +320,9 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler):
self.wfile.write(bytes(json.dumps(outjson), "utf-8")) self.wfile.write(bytes(json.dumps(outjson), "utf-8"))
def do_GET(self): # READ def do_GET(self): # READ
""" Manage request READ """
currently, we execute LOG, STATUS & LIST Manage request READ
we can execute LOG, STATUS, LIST & STATUSALL
""" """
logging.debug('get recieved : %s' % self.path) logging.debug('get recieved : %s' % self.path)
if self.path == '/LOG': if self.path == '/LOG':
@ -336,13 +334,13 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler):
elif self.path == '/STATUSALL': elif self.path == '/STATUSALL':
self.send_command_all("STATUS") self.send_command_all("STATUS")
else: else:
self.send_error(400,'Path unknown') self.send_error(400, 'Path unknown')
logging.error("Path unknwon '%s'" % self.path) logging.error("Path unknwon '%s'" % self.path)
return return
def do_POST(self): # CREATE def do_POST(self): # CREATE
""" Manage request POST """ Manage request POST
currently, we execute START, STOP, ACTION & SHUTDOWN currently, we execute START, STOP, ACTION, SHUTDOWN, STARTALL & STOPALL
""" """
logging.debug('post recieved : %s' % self.path) logging.debug('post recieved : %s' % self.path)
if self.path == '/START': if self.path == '/START':
@ -384,6 +382,7 @@ class ManageHttpRequest(http.server.SimpleHTTPRequestHandler):
class khaganatHTTPServer(http.server.HTTPServer): class khaganatHTTPServer(http.server.HTTPServer):
""" """
Class khaganatHTTPServer Class khaganatHTTPServer
Redefine HTTPServer (adding queue input & queue output, use by ManageHttpRequest)
""" """
def __init__(self, def __init__(self,
listQueueIn, listQueueIn,
@ -398,7 +397,10 @@ class khaganatHTTPServer(http.server.HTTPServer):
self.listEvent = listEvent self.listEvent = listEvent
class ServerHttp(multiprocessing.Process): class ServerHttp(multiprocessing.Process):
""" Initialize server HTTPS """ """
Initialize server HTTPS
* define Dictionnary queueIn & queueOut (with key as section's name in configuration)
"""
def __init__(self, keyfile, certfile, address = '', port=8000): def __init__(self, keyfile, certfile, address = '', port=8000):
multiprocessing.Process.__init__(self) multiprocessing.Process.__init__(self)
self.listQueueIn = {} self.listQueueIn = {}
@ -431,7 +433,10 @@ class ServerHttp(multiprocessing.Process):
class ManageCommand(): class ManageCommand():
""" """
Thread manage all program Manage Command (only one)
* start/stop/status/get log/send an action [stdin] for command (receive order with queueIn)
* read output [in other thread]
* communicate with ManageHttpRequest (with queueOut)
""" """
def __init__(self, name, command, path, logsize, bufsize, queueIn, queueOut, event): def __init__(self, name, command, path, logsize, bufsize, queueIn, queueOut, event):
self.process = None self.process = None
@ -457,22 +462,18 @@ class ManageCommand():
fcntl.fcntl(self.process.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK) fcntl.fcntl(self.process.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
logging.debug("Start reader %s " % self.name) logging.debug("Start reader %s " % self.name)
while self.eventRunning.is_set(): while self.eventRunning.is_set():
#logging.debug("Start reader %s " % self.name) line = self.process.stdout.readline()
try: if not line:
line = self.process.stdout.readline() time.sleep(1)
if not line:
time.sleep(1)
continue
now = time.strftime('%Y/%m/%d %H:%M:%S %Z')
logging.debug("line %s " % line)
self.poslastlog += 1
while len(self.log) >= self.maxlog:
self.log.pop(0)
msg = line.decode().strip()
self.log.append(now + ' ' + msg)
logging.debug("recu: '%s'" %(msg))
except:
continue continue
now = time.strftime('%Y/%m/%d %H:%M:%S %Z')
logging.debug("line %s " % line)
self.poslastlog += 1
while len(self.log) >= self.maxlog:
self.log.pop(0)
msg = line.decode().strip()
self.log.append(now + ' ' + msg)
logging.debug("recu: '%s'" %(msg))
logging.debug("End reader: '%s'" % self.name) logging.debug("End reader: '%s'" % self.name)
def handler(self, signum, frame): def handler(self, signum, frame):
@ -643,7 +644,7 @@ class ManageCommand():
elif command == "LOG": elif command == "LOG":
try: try:
firstline = int(msg.split(maxsplit=1)[1]) firstline = int(msg.split(maxsplit=1)[1])
except: except ValueError:
firstline = 0 firstline = 0
self.queueOut.put(self.getlog(firstline)) self.queueOut.put(self.getlog(firstline))
else: else:
@ -655,9 +656,12 @@ class ManageCommand():
class Manager(): class Manager():
""" Manage all services """
Manage all services
(read configuration, launch ManageCommand & launch ServerHttp & wait the end)
* https service * https service
* all child to manage each program * all child to manage (it start ManageCommand by command define in configuration)
""" """
def __init__(self, filecfg, launch_program): def __init__(self, filecfg, launch_program):
self.threadCommand = [] self.threadCommand = []
@ -673,19 +677,19 @@ class Manager():
logging.debug("read config '%s'" % name) logging.debug("read config '%s'" % name)
try: try:
port = int(config[name]['port']) port = int(config[name]['port'])
except: except (TypeError, KeyError, ValueError):
port = 8000 port = 8000
try: try:
address = config[name]['address'] address = config[name]['address']
except: except (TypeError, KeyError):
address = '' address = ''
try: try:
keyfile = config[name]['keyfile'] keyfile = config[name]['keyfile']
except: except (TypeError, KeyError):
keyfile = 'crt/key.pem' keyfile = 'crt/key.pem'
try: try:
certfile = config[name]['certfile'] certfile = config[name]['certfile']
except: except (TypeError, KeyError):
certfile = 'crt/cert.pem' certfile = 'crt/cert.pem'
elif 'command' in config[name]: elif 'command' in config[name]:
logging.debug("read command '%s'" % name) logging.debug("read command '%s'" % name)
@ -696,7 +700,7 @@ class Manager():
if 'logsize' in config[name]: if 'logsize' in config[name]:
try: try:
logsize = int(config[name]['logsize']) logsize = int(config[name]['logsize'])
except: except (TypeError, KeyError, ValueError):
logsize = 100 logsize = 100
logging.warning("Impossible to read param logsize (command:%s)", name) logging.warning("Impossible to read param logsize (command:%s)", name)
else: else:
@ -704,7 +708,7 @@ class Manager():
if 'bufsize' in config[name]: if 'bufsize' in config[name]:
try: try:
bufsize = int(config[name]['bufsize']) bufsize = int(config[name]['bufsize'])
except: except (TypeError, KeyError, ValueError):
bufsize = 100 bufsize = 100
logging.warning("Impossible to read param bufsize (command:%s)", name) logging.warning("Impossible to read param bufsize (command:%s)", name)
else: else:
@ -744,14 +748,14 @@ class Manager():
event = multiprocessing.Event() event = multiprocessing.Event()
self.serverHttp.append(name, queueIn, queueOut, event) self.serverHttp.append(name, queueIn, queueOut, event)
threadCommand = multiprocessing.Process(target=self.runCommand, threadCommand = multiprocessing.Process(target=self.runCommand,
args=(name, args=(name,
self.param[name]['command'], self.param[name]['command'],
self.param[name]['path'], self.param[name]['path'],
self.param[name]['logsize'], self.param[name]['logsize'],
self.param[name]['bufsize'], self.param[name]['bufsize'],
queueIn, queueIn,
queueOut, queueOut,
event)) event))
threadCommand.start() threadCommand.start()
if self.launch_program: if self.launch_program:
event.set() event.set()
@ -773,19 +777,25 @@ class Manager():
if self.serverHttp: if self.serverHttp:
self.serverHttp.terminate() self.serverHttp.terminate()
def wait_children_commands(self):
for child in self.threadCommand:
child.join()
def wait_child_server_http(self):
self.serverHttp.terminate()
self.serverHttp.join()
def run(self): def run(self):
""" launch all """ """ launch all """
self.launch_command() self.launch_command()
self.launch_server_http() self.launch_server_http()
logging.info('started') logging.info('started')
for child in self.threadCommand: self.wait_children_commands()
child.join()
logging.info('end') logging.info('end')
signal.alarm(0) signal.alarm(0)
logging.info('wait thread http') logging.info('wait thread http')
time.sleep(1) time.sleep(1)
self.serverHttp.terminate() self.wait_child_server_http()
self.serverHttp.join()
logging.info('end') logging.info('end')