opennel-pymanager/pymanager/manager.py

1075 lines
42 KiB
Python
Executable file

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# script to start/stop/status/send command/read log for khaganat process
#
# Copyright (C) 2017 AleaJactaEst
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Configuration File
------------------
This script need configuration file (see below for model)::
[config:server]
# Define port listen (default 8000)
port = 8000
# Example to generate all key : see pycertificate
# key
keyfile = /home/gameserver/ca/appli/private/serverkey.pem
# certificate
certfile = /home/gameserver/ca/appli/certs/servercert.pem
# certification to check signature
ca_cert = /home/gameserver/ca/appli/certs/cachaincert.pem
# address listen (default all port)
address =
# method : http or https
method = https
# Admin Executor Service
[command:aes]
# command to launch the program
command = ryzom_admin_service -A/home/gameserver/khanat/server -C/home/gameserver/khanat/server -L/home/gameserver/log/khanat --nobreak --fulladminname=admin_executor_service --shortadminname=AES
# Path : where this program is launched
path = /home/gameserver/khanat/server/
# size buffer log for each program launched (number line stdout)
logsize = 1000
# buffer size (define value bufsize on subprocess.Popen, this buffer is use before read by manager)
bufsize = 100
# bms_master : backup_service
[command:bms_master]
# command to launch the program
command = ryzom_backup_service -A/home/gameserver/khanat/server -C/home/gameserver/khanat/server -L/home/gameserver/khanat/server/log --nobreak --writepid -P49990
# Path : where this program is launched
path = /home/gameserver/khanat/server/
# we keep [logsize] last number line stdout
logsize = 1000
# buffer size (define value bufsize on subprocess.Popen)
bufsize = 100
Manager
-------
Manage all process khaganat
Launch this prorgam in background and use clientManager to manipulate process
Design
.. graphviz::
digraph Manager {
"Manager" -> "ManageCommand (command 1)";
"ManageCommand (command 1)" -> "read_output (thread1)";
"Manager" -> "ManageCommand (command 2)";
"ManageCommand (command 2)" -> "read_output (thread2)";
"Manager" -> "ServerHttp";
"ServerHttp" -> "khaganatHTTPServer";
"khaganatHTTPServer" -> "ManageHttpRequest";
"ManageHttpRequest" -> "ManageCommand (command 1)" [style=dashed];
"ManageHttpRequest" -> "ManageCommand (command 2)" [style=dashed];
}
http(s) command :
-----------------
+------------------+-------------+---------------------------------------------+---------------------------------------------+
| **Html command** | **Path** | **Argument** {json format} | **Comment** |
+------------------+-------------+---------------------------------------------+---------------------------------------------+
| **POST** | /SHUTDOWN | | Stop all process and stop pymanager |
+------------------+-------------+---------------------------------------------+---------------------------------------------+
| **POST** | /STARTALL | | Start all processes |
+------------------+-------------+---------------------------------------------+---------------------------------------------+
| **GET** | /STATUSALL | | Get status all processes |
+------------------+-------------+---------------------------------------------+---------------------------------------------+
| **POST** | /STOPALL | | Stop all processes |
+------------------+-------------+---------------------------------------------+---------------------------------------------+
| **POST** | /START | {'name': program} | Start for one program |
+------------------+-------------+---------------------------------------------+---------------------------------------------+
| **POST** | /STDIN | {'name': program, 'action': action} | Send action for one program (send to input) |
+------------------+-------------+---------------------------------------------+---------------------------------------------+
| **GET** | /STATUS | {'name': program} | Get status for one program |
+------------------+-------------+---------------------------------------------+---------------------------------------------+
| **POST** | /STOP | {'name': program} | Stop for one program |
+------------------+-------------+---------------------------------------------+---------------------------------------------+
| **GET** | /STDOUT | {'name': program, 'first-line': firstline } | Get log for one program |
+------------------+-------------+---------------------------------------------+---------------------------------------------+
Example ::
nohup pymanager --log info --filelog /home/gameserver/log/manager.log -c /home/gameserver/cfg/khaganat.cfg 2>/dev/null 1>/dev/null 0</dev/zero &
"""
import sys
import subprocess
import queue
import threading
import signal
import argparse
import configparser
import logging
import logging.config
import multiprocessing
import time
import ssl
import http.server
import json
import fcntl
import os
import base64
from socketserver import ThreadingMixIn
try:
import bcrypt
__DISABLE_BCRYPT__ = False
except ImportError:
__DISABLE_BCRYPT__ = True
__VERSION__ = '1.0'
class ManageHttpRequest(http.server.SimpleHTTPRequestHandler):
"""
Class ManageHttpRequest receive all request https
* analyze and send to ManageCommand (with queueIn & queueOut)
Methods inherited from SimpleHTTPRequestHandler:
"""
def __init__(self, request, client_address, server):
""" Initialize object """
http.server.SimpleHTTPRequestHandler.__init__(self, request, client_address, server)
def log_message(self, format, *args):
""" function use to send message in log
:param str format: format message
:param list args: argument
"""
logging.info("%s (%s) %s" %
(self.address_string(),
self.log_date_time_string(),
format % args))
def _set_headers(self):
""" Prepare header """
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
self.send_header("Access-Control-Allow-Headers", "Content-Type, *")
self.end_headers()
def _extract_input_data(self):
""" sub request log (send log on specific process) """
if 'content-type' in self.headers:
ctype = self.headers['content-type']
else:
ctype = 'text'
if ctype != 'application/json':
logging.error("Received request with bad content-type")
self.send_error(400, "bad content-type")
self.end_headers()
return None
try:
sizemsg = int(self.headers['content-length'])
except (TypeError, KeyError, ValueError):
logging.error("Received request with bad content-length")
self.send_error(400, "bad content-length")
self.end_headers()
return None
msg = self.rfile.read(sizemsg)
try:
msgjson = json.loads(msg.decode())
except json.decoder.JSONDecodeError as e:
logging.error("Received request with json (%s)" % str(e))
return None
return msgjson
def _command_log(self):
""" sub request log (send log on specific process) """
msgjson = self._extract_input_data()
if msgjson is None:
return
logging.debug(msgjson)
if 'name' not in msgjson:
self.send_error(400, 'Missing param name')
logging.error("Missing param name")
return
name = msgjson['name']
if name not in self.server.listQueueIn:
self.send_error(400, 'Name unknown')
logging.error("Name unknwon '%s'" % name)
return
if 'first-line' not in msgjson:
self.send_error(400, 'Missing param first-line')
logging.error("Missing param first-line '%s'" % name)
return
firstLine = 0
try:
firstLine = int(msgjson['first-line'])
except ValueError:
self.send_error(400, 'Impossible to read first-line')
logging.error("Impossible to read first-line '%s'" % msgjson['first-line'])
return
logging.debug("%s:%s" % (name, firstLine))
self.server.listEvent[name].set()
self.server.listQueueIn[name].put("STDOUT %d" % firstLine)
logging.debug("Send request to '%s'" % (name))
try:
item = self.server.listQueueOut[name].get(timeout=4)
except queue.Empty:
logging.debug("Received nothing from '%s'" % name)
return
self._set_headers()
self.wfile.write(bytes(item, "utf-8"))
logging.debug("item : %s" % item)
def _send_list(self):
""" sub-request to list all program available """
outjson = {}
pos = 0
for program in self.server.listQueueIn:
outjson.setdefault(pos, program)
pos += 1
self._set_headers()
self.wfile.write(bytes(json.dumps(outjson), "utf-8"))
def _send_shutdown(self):
""" Stop all program and stop manager """
for name in self.server.listQueueIn:
self.server.listEvent[name].set()
self.server.listQueueIn[name].put("SHUTDOWN")
self._set_headers()
outjson = {'shutdown': 'ok'}
self.wfile.write(bytes(json.dumps(outjson), "utf-8"))
def _send_command_all(self, command):
""" Send specific command on all program
:param str action: command (START, STOP, STATUS, ... )
"""
outjson = {}
for name in self.server.listQueueIn:
self.server.listEvent[name].set()
self.server.listQueueIn[name].put(command)
try:
item = self.server.listQueueOut[name].get(timeout=4)
except queue.Empty:
logging.debug("pas de message recu pour %s" % name)
return
outjson.setdefault(name, item)
self._set_headers()
self.wfile.write(bytes(json.dumps(outjson), "utf-8"))
def _send_action(self):
""" send specific action on one program """
msgjson = self._extract_input_data()
if msgjson is None:
return
logging.debug(msgjson)
if 'name' not in msgjson:
self.send_error(400, 'Missing param name')
logging.error("Missing param name")
return
name = msgjson['name']
if name not in self.server.listQueueIn:
self.send_error(400, 'Name unknown')
logging.error("Name unknwon '%s'" % name)
return
if 'action' not in msgjson:
self.send_error(400, 'Missing param action')
logging.error("Missing param action '%s'" % name)
return
action = msgjson['action']
logging.debug("%s:%s" % (name, action))
self.server.listEvent[name].set()
self.server.listQueueIn[name].put("STDIN %s" % action)
logging.debug("message envoye: %s" % (name))
try:
result = self.server.listQueueOut[name].get(timeout=4)
except queue.Empty:
logging.debug("pas de message recu pour %s" % name)
return
outjson = {'state': result}
self._set_headers()
self.wfile.write(bytes(json.dumps(outjson), "utf-8"))
def _send_command(self, command):
""" Send specific command on one program
:param str command: command (START, STOP, STATUS, ... )
"""
msgjson = self._extract_input_data()
if msgjson is None:
return
if 'name' not in msgjson:
self.send_error(400, 'Missing param name')
logging.error("Missing param name")
return
name = msgjson['name']
if name not in self.server.listQueueIn:
self.send_error(400, 'Name unknown')
logging.error("Name unknwon '%s'" % name)
return
logging.debug("[%s %s] Send command" % (command, name))
self.server.listEvent[name].set()
self.server.listQueueIn[name].put(command)
try:
result = self.server.listQueueOut[name].get(timeout=4)
except queue.Empty:
self.send_error(500, 'Missing return')
logging.debug("[%s %s] Missing return" % (command, name))
return
logging.debug("[%s %s] => %s" % (command, name, result))
outjson = {'state': result}
self._set_headers()
self.wfile.write(bytes(json.dumps(outjson), "utf-8"))
def check_authentication(self):
if not self.server.authentification:
return True
if __DISABLE_BCRYPT__:
logging.error("Error module python bcrypt not installed")
return False
try:
auth_header = self.headers['Authorization'].split()
if auth_header[0] != 'Basic':
logging.error("Authentification with Bad method (%s)" % auth_header[0])
return False
decode = base64.b64decode(auth_header[1]).decode('UTF-8')
account, password = decode.split(':', maxsplit=1)
if account not in self.server.users:
logging.error("Authentification with unknown user (%s)" % account)
return False
hashed_password = self.server.users[account]
if bcrypt.checkpw(password.encode('utf-8'), hashed_password):
return True
else:
logging.error("Authentification with wrong password for user (%s)" % account)
return False
except (ValueError, IndexError, AttributeError) as e:
logging.error("Error detected %s" % e)
return False
def do_GET(self):
"""
Manage request READ
we can execute LOG, STATUS, LIST & STATUSALL
"""
logging.debug('get recieved : %s' % self.path)
if not self.check_authentication():
self.send_error(403, 'Wrong authentication')
logging.error("Wrong authentication")
elif self.path == '/STDOUT':
self._command_log()
elif self.path == '/STATUS':
self._send_command("STATUS")
elif self.path == '/LIST':
self._send_list()
elif self.path == '/STATUSALL':
self._send_command_all("STATUS")
else:
self.send_error(400, 'Path unknown')
logging.error("Path unknwon '%s'" % self.path)
def do_POST(self):
""" Manage request POST (CREATE)
currently, we execute START, STOP, ACTION, SHUTDOWN, STARTALL & STOPALL
"""
logging.debug('post recieved : %s' % self.path)
if not self.check_authentication():
self.send_error(403, 'Wrong authentication')
logging.error("Wrong authentication")
elif self.path == '/START':
self._send_command("START")
elif self.path == '/STOP':
self._send_command("STOP")
elif self.path == '/STDIN':
self._send_action()
elif self.path == '/SHUTDOWN':
self._send_shutdown()
elif self.path == '/STARTALL':
self._send_command_all("START")
elif self.path == '/STOPALL':
self._send_command_all("STOP")
else:
self.send_error(400, 'Path unknown')
logging.error("Path unknwon '%s'" % self.path)
def do_HEAD(self):
""" request HEAD received """
logging.debug('head recieved : %s' % self.path)
self.send_error(404, 'File Not Found: %s' % self.path)
def do_PUT(self):
""" request PUT (UPDATE/REPLACE) received """
logging.debug('put recieved!')
self.send_error(404, 'File Not Found: %s' % self.path)
def do_PATCH(self):
""" request PATCH (UPDATE/MODIFY) received """
logging.debug('patch recieved!')
self.send_error(404, 'File Not Found: %s' % self.path)
def do_DELETE(self):
""" request DELETE received """
logging.debug('delete recieved!')
self.send_error(404, 'File Not Found: %s' % self.path)
def do_OPTIONS(self):
""" request OPTIONS received """
self.send_response(200, "ok")
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
self.send_header("Access-Control-Allow-Headers", "Content-Type, *")
self.end_headers()
class khaganatHTTPServer(ThreadingMixIn, http.server.HTTPServer):
"""
Class khaganatHTTPServer
Redefine HTTPServer (adding queue input & queue output, use by ManageHttpRequest)
"""
def __init__(self,
listQueueIn,
listQueueOut,
listEvent,
server_address,
RequestHandlerClass,
authentification,
users,
bind_and_activate=True):
http.server.HTTPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate)
self.listQueueIn = listQueueIn
self.listQueueOut = listQueueOut
self.listEvent = listEvent
self.authentification = authentification
self.users = users
class ServerHttp(multiprocessing.Process):
"""
Initialize server HTTPS
* define Dictionnary queueIn & queueOut (with key as section's name in configuration)
"""
def __init__(self, keyfile, certfile, ca_cert, address='',
port=8000, authentification=True, method='http', users={}):
multiprocessing.Process.__init__(self)
self.listQueueIn = {}
self.listQueueOut = {}
self.listEvent = {}
self.port = port
self.key_file = keyfile
self.cert_file = certfile
self.ca_cert = ca_cert
self.address = address
self.authentification = authentification
self.users = users
self.method = method
def run(self):
server_address = (self.address, self.port)
httpd = khaganatHTTPServer(self.listQueueIn,
self.listQueueOut,
self.listEvent,
server_address,
ManageHttpRequest,
self.authentification,
self.users)
if self.method == 'http':
logging.info('http listen')
elif self.method == 'https':
if self.ca_cert:
httpd.socket = ssl.wrap_socket(httpd.socket,
keyfile=self.key_file,
certfile=self.cert_file,
ca_certs=self.ca_cert,
cert_reqs=ssl.CERT_REQUIRED,
ssl_version=ssl.PROTOCOL_TLSv1_2,
server_side=True)
else:
httpd.socket = ssl.wrap_socket(httpd.socket,
keyfile=self.key_file,
certfile=self.cert_file,
server_side=True)
logging.info('https listen')
else:
logging.error("Bad value 'method' (%s)" % str(self.method))
raise ValueError
httpd.serve_forever()
def append(self, name, queueIn, queueOut, event):
self.listQueueIn.setdefault(name, queueIn)
self.listQueueOut.setdefault(name, queueOut)
self.listEvent.setdefault(name, event)
class ManageCommand():
"""
Manage Command (only one)
* start/stop/status/get log/send an action [stdin] for command (receive order with queueIn)
* read output [in other thread]
* communicate with ManageHttpRequest (with queueOut)
"""
def __init__(self, name, command, path, logsize, bufsize, queueIn, queueOut, event, maxWaitEnd=10, waitDelay=1):
self.process = None
self.queueIn = queueIn
self.queueOut = queueOut
self.name = name
self.command = command
self.path = path
self.log = []
self.poslastlog = 0
self.maxlog = logsize
self.event = event
self.bufsize = bufsize
self.threadRead = None
self.running = False
self.state = multiprocessing.Queue()
self.pipeIn, self.pipeOut = multiprocessing.Pipe()
self.eventRunning = threading.Event()
self.maxWaitEnd = maxWaitEnd
self.waitDelay = waitDelay
def read_output(self):
""" Thread to read output (stdout) """
fl = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL)
fcntl.fcntl(self.process.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
logging.debug("Start reader %s" % self.name)
while self.eventRunning.is_set():
code = self.process.poll()
if code is not None:
logging.error("process %s down" % self.name)
self.eventRunning.clear()
continue
try:
line = self.process.stdout.readline()
except AttributeError:
logging.error("process %s down (not detected)" % self.name)
self.eventRunning.clear()
continue
if not line:
time.sleep(self.waitDelay)
continue
now = time.strftime('%Y/%m/%d %H:%M:%S %Z')
logging.debug("line %s " % line)
self.poslastlog += 1
while len(self.log) >= self.maxlog:
self.log.pop(0)
msg = line.decode().strip()
self.log.append(now + ' ' + msg)
logging.debug("recu: '%s'" % (msg))
logging.debug("End reader: '%s'" % self.name)
def handler(self, signum, frame):
""" Managed signal (not used) """
if self.process:
# logging.debug("Send signal %d to '%s'" %(signum, self.name))
self.process.send_signal(signum)
else:
logging.error("Impossible to send signal %d to '%s'" % (signum, self.name))
raise IOError("signal received")
def start(self):
""" Start program """
logging.debug("start %s" % (self.name))
if self.process:
logging.debug("%s already exist" % self.name)
code = self.process.poll()
if code is None:
logging.debug("%s already exist" % self.name)
return "already-started"
else:
logging.debug("%s crashed" % self.name)
code = self.process.wait()
logging.error("%s crashed (return code:%d) - restart program" % (self.name, code))
try:
self.process = subprocess.Popen(self.command.split(),
cwd=self.path,
shell=False,
bufsize=self.bufsize,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
close_fds=True)
except FileNotFoundError as e:
logging.error("Impossible to start %s (%s)" % (self.name, e))
return "crashed"
except PermissionError as e:
logging.error("Impossible to start %s (%s)" % (self.name, e))
return "crashed"
self.eventRunning.set()
if self.threadRead:
self.eventRunning.clear()
self.threadRead.join()
self.threadRead = None
self.running = True
self.threadRead = threading.Thread(target=self.read_output)
self.threadRead.start()
return "started"
def status(self):
""" Get status of program """
logging.debug("status %s" % (self.name))
if self.process:
logging.debug("status %s - check" % (self.name))
code = self.process.poll()
if code is None:
logging.debug("%s status" % (self.name))
return "started"
else:
logging.error("%s crashed (return code:%d)" % (self.name, code))
self.process = None
return "stopped"
else:
logging.debug("%s status" % (self.name))
return "stopped"
def list_thread(self):
""" List number thrad (not used) """
logging.debug('list thread')
# main_thread = threading.currentThread()
for t in threading.enumerate():
logging.debug('thread %s', t.getName())
logging.debug("id %d" % t.ident)
def stop(self):
""" Stop program """
logging.debug("stop %s" % (self.name))
if not self.process:
return "stopped"
else:
try:
code = self.process.poll()
loop = self.maxWaitEnd
while (code is None) and (loop > 0):
logging.debug("stop process %s", self.name)
self.process.send_signal(15)
time.sleep(1)
code = self.process.poll()
loop -= 1
except ProcessLookupError as e:
logging.warning("Stop process (%s)" % str(e))
try:
loop = self.maxWaitEnd
while (code is None) and (loop > 0):
logging.debug("terminate process %s", self.name)
self.process.terminate()
time.sleep(1)
code = self.process.poll()
loop -= 1
except ProcessLookupError as e:
logging.warning("Stop process (%s)" % str(e))
try:
loop = self.maxWaitEnd
while (code is None) and (loop > 0):
logging.debug("kill process %s", self.name)
self.process.send_signal(9)
time.sleep(1)
code = self.process.poll()
loop -= 1
except ProcessLookupError as e:
logging.warning("Stop process (%s)" % str(e))
try:
code = self.process.wait()
self.process = None
if self.threadRead:
self.eventRunning.clear()
self.threadRead.join()
self.threadRead = None
logging.info("%s stopped (return code:%d)" % (self.name, code))
except ProcessLookupError as e:
logging.warning("Stop process (%s)" % str(e))
return "stopped"
def getlog(self, firstline):
""" Get log """
logging.debug("read log %d " % firstline)
outjson = {}
pos = self.poslastlog - len(self.log) + 1
firstlinefound = None
for line in self.log:
if pos >= firstline:
outjson.setdefault(pos, line)
if not firstlinefound:
firstlinefound = pos
pos += 1
outjson.setdefault('first-line', firstlinefound)
outjson.setdefault('last-line', pos - 1)
return json.dumps(outjson)
def action(self, action):
""" Send action to program (send input to stdin) """
logging.debug("STDIN '%s'" % action)
if self.process:
code = self.process.poll()
if code is None:
if action:
self.process.stdin.write(bytes(action+'\n', 'UTF-8'))
self.process.stdin.flush()
return "ok"
return "ko"
def run(self):
""" loop, run child (wait command) """
loop = True
while loop:
logging.debug('wait %s' % self.name)
self.event.wait()
logging.debug('received event %s' % self.name)
try:
msg = self.queueIn.get(timeout=4)
except queue.Empty:
self.event.clear()
logging.debug("[%s] Queue empty (no message)" % self.name)
return
logging.debug("command : '%s'" % msg)
command = msg.split()[0]
if command == "SHUTDOWN":
loop = False
continue
elif command == "START":
self.queueOut.put(self.start())
elif command == "STATUS":
self.queueOut.put(self.status())
elif command == "STOP":
self.queueOut.put(self.stop())
elif command == "STDIN":
data = msg.split(maxsplit=1)[1]
self.queueOut.put(self.action(data))
elif command == "STDOUT":
try:
firstline = int(msg.split(maxsplit=1)[1])
except ValueError:
logging.warning("Bad value for param first-line (need integer)")
firstline = 0
self.queueOut.put(self.getlog(firstline))
else:
logging.warning("Bad command (%s)" % command)
self.queueOut.put("error : command unknown")
self.event.clear()
self.stop()
self.event.clear()
logging.debug('end')
class Manager():
"""
Manage all services
(read configuration, launch ManageCommand & launch ServerHttp & wait the end)
* https service
* all child to manage (it start ManageCommand by command define in configuration)
"""
def __init__(self, launch_program):
self.threadCommand = []
self.info = {}
self.command = []
self.launch_program = launch_program
self.param = {}
self.users = {}
self.passwordfile = None
self.serverHttp = None
self.port = 8000
self.address = ''
self.keyfile = 'crt/key.pem'
self.certfile = 'crt/cert.pem'
self.ca_cert = 'crt/ca_cert.crt'
self.authentification = False
self.method = 'http'
def load_config(self, filecfg):
if filecfg is None:
raise ValueError
config = configparser.ConfigParser()
config.read_file(filecfg)
self._load_config(config)
def load_password(self):
if self.passwordfile:
with open(self.passwordfile, 'rt') as fp:
for line in fp:
line = line.strip()
if not line:
continue
username, password = line.split(':', maxsplit=1)
self.users.setdefault(username, password)
def _load_config(self, config):
"""
Read configuration object
param: config: configuration object
"""
logging.debug("Sections :%s" % config.sections())
for name in config.sections():
if name == 'config:client':
continue
if name == 'config:user':
continue
elif name == 'config:server':
logging.debug("read config '%s'" % name)
try:
self.port = int(config[name]['port'])
except (TypeError, KeyError, ValueError):
pass
try:
self.address = config[name]['address']
except (TypeError, KeyError):
pass
try:
self.keyfile = config[name]['keyfile']
except (TypeError, KeyError):
pass
try:
self.certfile = config[name]['certfile']
except (TypeError, KeyError):
pass
try:
self.ca_cert = config[name]['ca_cert']
except (TypeError, KeyError):
pass
try:
tmp = config[name]['authentification']
if tmp.upper().strip() == 'YES':
self.authentification = True
else:
self.authentification = False
except (TypeError, KeyError):
pass
try:
self.passwordfile = config[name]['passwordfile']
except (TypeError, KeyError):
pass
try:
self.method = config[name]['method']
except (TypeError, KeyError):
pass
else:
try:
head, value = name.split(':', maxsplit=1)
except ValueError:
logging.warning("ignore bad parameter '%s'" % (name))
continue
if head == 'command' and 'command' in config[name]:
logging.debug("read command '%s'" % name)
if 'path' in config[name]:
path = config[name]['path']
else:
path = None
if 'logsize' in config[name]:
try:
logsize = int(config[name]['logsize'])
except (TypeError, KeyError, ValueError):
logging.error("Impossible to read param logsize (command:%s)", name)
raise ValueError
else:
logsize = 100
if 'bufsize' in config[name]:
try:
bufsize = int(config[name]['bufsize'])
except (TypeError, KeyError, ValueError):
logging.error("Impossible to read param bufsize (command:%s)", name)
raise ValueError
else:
bufsize = 100
self.param.setdefault(name, {'command': config[name]['command'],
'path': path,
'logsize': logsize,
'bufsize': bufsize})
def initialize_http(self):
"""
Initialize object serverHttp
"""
self.serverHttp = ServerHttp(self.keyfile,
self.certfile,
self.ca_cert,
self.address,
self.port,
authentification=self.authentification,
method=self.method,
users=self.users)
def runCommand(self, name, command, path, logsize, bufsize, queueIn, queueOut, event):
"""
Thread to manage khaganat program
"""
logging.debug("Initialize '%s'" % name)
manageCommand = ManageCommand(name=name,
command=command,
path=path,
logsize=logsize,
bufsize=bufsize,
queueIn=queueIn,
queueOut=queueOut,
event=event)
manageCommand.run()
def launch_server_http(self):
""" Launch server https """
self.serverHttp.daemon = True
self.serverHttp.start()
def launch_command(self):
""" Launch child to manage each program """
for name in self.param:
logging.debug("Initialize '%s'" % name)
queueIn = multiprocessing.Queue()
queueOut = multiprocessing.Queue()
event = multiprocessing.Event()
self.serverHttp.append(name, queueIn, queueOut, event)
threadCommand = multiprocessing.Process(target=self.runCommand,
args=(name,
self.param[name]['command'],
self.param[name]['path'],
self.param[name]['logsize'],
self.param[name]['bufsize'],
queueIn,
queueOut,
event))
threadCommand.start()
self.threadCommand.append(threadCommand)
self.info.setdefault(name, {'queueIn': queueIn,
'queueOut': queueOut,
'event': event,
'threadCommand': threadCommand,
'command': self.param[name]['command'],
'path': self.param[name]['path'],
'logsize': self.param[name]['logsize'],
'bufsize': self.param[name]['bufsize']})
if self.launch_program:
event.set()
queueIn.put("START")
try:
item = queueOut.get(timeout=4)
except queue.Empty:
item = ""
logging.debug("[%s] Queue empty (no message)" % name)
return
logging.info("%s => %s" % (name, item))
def receive_signal(self, signum, frame):
""" Managed signal """
for child in self.threadCommand:
child.terminate()
if self.serverHttp:
self.serverHttp.terminate()
def wait_children_commands(self):
for child in self.threadCommand:
child.join()
def wait_child_server_http(self):
self.serverHttp.terminate()
self.serverHttp.join()
def run(self):
""" launch all """
self.launch_command()
self.launch_server_http()
logging.info('started')
self.wait_children_commands()
logging.info('execute shutdown')
signal.alarm(0)
logging.info('wait thread http')
time.sleep(1)
self.wait_child_server_http()
logging.info('shutdown completed')
def root(filecfg, fileLog, logLevel, launch_program, show_log_console):
"""
Main function
:param str filecfg: configuration file
:param str fileLog: log file
:param bool launch_program: do you launch program when you start manager (auto start)
:param bool show_log_console: do you need show log on console
"""
# Manage log
logging.getLogger('logging')
numeric_level = getattr(logging, logLevel.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError('Invalid log level: %s' % logLevel)
handlers = []
if show_log_console:
handlers.append(logging.StreamHandler())
if fileLog:
handlers.append(logging.FileHandler(fileLog.name))
logging.basicConfig(handlers=handlers, level=numeric_level,
format='%(asctime)s %(levelname)s [pid:%(process)d] [%(funcName)s:%(lineno)d] %(message)s')
if filecfg is None:
logging.error("Missing configuration file")
raise ValueError
manager = Manager(launch_program)
manager.load_config(filecfg)
manager.load_password()
manager.initialize_http()
manager.run()
def main(args=sys.argv[1:]):
""" Main function
:param list args: all arguments ('--help, '--version', ...)
"""
parser = argparse.ArgumentParser(description='Manage khaganat process')
parser.add_argument('--version', action='version', version='%(prog)s ' + __VERSION__)
parser.add_argument('-c', '--conf', type=argparse.FileType('r'),
default='khaganat.cfg', help='configuration file')
parser.add_argument('--show-log-console', action='store_true',
help='show message in console', default=False)
parser.add_argument('--filelog', type=argparse.FileType('wt'),
default=None, help='log file')
parser.add_argument('--log',
default='INFO', help='log level [DEBUG, INFO, WARNING, ERROR')
parser.add_argument('--launch-program', action='store_true',
help='launch program when start manager', default=False)
param = parser.parse_args(args)
root(filecfg=param.conf,
fileLog=param.filelog,
logLevel=param.log,
launch_program=param.launch_program,
show_log_console=param.show_log_console)
if __name__ == '__main__':
main()