mirror of
https://port.numenaute.org/aleajactaest/clientbot.git
synced 2024-11-25 00:26:33 +00:00
409 lines
16 KiB
Python
409 lines
16 KiB
Python
#!/usr/bin/python3
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# module DecodeDatabase
|
|
#
|
|
# Copyright (C) 2019 AleaJactaEst
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import logging
|
|
from tools import getPowerOf2
|
|
from tools import BitStream
|
|
|
|
LOGGER='DecodeDatabase'
|
|
|
|
def show_dico(dico, level=1):
    """Log the content of a (possibly nested) dictionary at DEBUG level.

    Every entry is written to the ``LOGGER`` logger on its own line, prefixed
    by ``level`` dots. Values that are dictionaries are logged recursively
    with one more dot per nesting level.

    Args:
        dico: Mapping to dump. Anything without an ``items()`` method
            (for instance ``None``) is reported with the single line
            ``"empty"``.
        level: Current nesting depth, i.e. the number of leading dots.
    """
    logger = logging.getLogger(LOGGER)
    try:
        entries = list(dico.items())
    except AttributeError:
        # Not a mapping: keep the historical "empty" trace instead of failing.
        logger.debug("empty")
        return
    prefix = "." * level
    for key, value in entries:
        if isinstance(value, dict):
            logger.debug("%s %s %s", prefix, key, ":")
            show_dico(value, level + 1)
        else:
            # Four placeholders for four values. The previous format only had
            # three, raised TypeError and was silently turned into "empty".
            logger.debug("%s %s %s %s", prefix, key, ":", value)
|
|
|
|
def print_dico(dico, level=1):
    """Print the content of a (possibly nested) dictionary on stdout.

    Every entry is printed on its own line, prefixed by ``level`` dots.
    Values that are dictionaries are printed recursively with one more dot
    per nesting level.

    Args:
        dico: Mapping to dump. Anything without an ``items()`` method
            (for instance ``None``) is ignored silently, as before.
        level: Current nesting depth, i.e. the number of leading dots.
    """
    try:
        entries = list(dico.items())
    except AttributeError:
        # Best effort helper: nothing to print for a non-mapping.
        return
    prefix = "." * level
    for key, value in entries:
        if isinstance(value, dict):
            print("%s %s %s" % (prefix, key, ":"))
            # Recurse with print_dico so nested content also goes to stdout
            # (it used to be sent to the logger through show_dico).
            print_dico(value, level + 1)
        else:
            # Four placeholders for four values. The previous format only had
            # three, raised TypeError and was silently swallowed.
            print("%s %s %s %s" % (prefix, key, ":", value))
|
|
|
|
def child(ele):
    """Convert an XML element and its descendants into nested dictionaries.

    The attributes of *ele* become the entries of the returned dictionary.
    When the element has sub-elements, they are converted recursively and
    stored under the ``'child'`` key, indexed 0, 1, 2...: first the converted
    sub-elements whose ``name`` attribute equals ``'branch'``, then all the
    others, each group in document order.

    Every stored child receives a ``'min'``/``'max'`` pair. Ranges are
    allocated consecutively starting at 0; a child spans ``count`` slots when
    it has a ``count`` attribute and a single slot otherwise.

    Args:
        ele: Element exposing ``keys()``, ``get()`` and iteration over its
            sub-elements (the ElementTree element interface).

    Returns:
        dict: The converted element.

    Raises:
        KeyError: If a sub-element has no ``name`` attribute.
    """
    logger = logging.getLogger(LOGGER)
    node = {}
    for attr in ele.keys():
        value = ele.get(attr)
        node[attr] = value
        logger.debug("%s %s" % (str(attr), str(value)))

    branches = []
    others = []
    for sub in list(ele):
        converted = child(sub)
        if converted['name'] == 'branch':
            branches.append(converted)
        else:
            others.append(converted)

    ordered = branches + others
    if ordered:
        children = {}
        node['child'] = children
        upper = -1
        for index, entry in enumerate(ordered):
            lower = upper + 1
            upper = lower
            if 'count' in entry:
                upper = lower + int(entry['count']) - 1
            entry['min'] = lower
            entry['max'] = upper
            children[index] = entry
    return node
|
|
|
|
def count_elements(head):
    """Return the number of slots covered by the numbered children of *head*.

    The children are looked up in ``head['child']`` when that key exists and
    directly in *head* otherwise. Only integer keys are considered; the
    result is the ``'max'`` value of the child with the highest key, plus one.

    Args:
        head: Dictionary holding the numbered children, either directly or
            under its ``'child'`` key.

    Returns:
        int: ``max + 1`` of the last child, or 0 when there is no numbered
        child or when the value cannot be computed (``TypeError``).
    """
    logger = logging.getLogger(LOGGER)
    children = head['child'] if 'child' in head else head
    indexes = [key for key in children if isinstance(key, int)]
    logger.debug(str(indexes))
    if not indexes:
        # max() on an empty list raises ValueError, which the TypeError
        # handler below never caught: report "no element" explicitly.
        return 0
    last_key = max(indexes)
    logger.debug(str(last_key))
    try:
        return children[last_key]['max'] + 1
    except TypeError:
        return 0
|
|
|
|
def get_element(head, id):
    """Return the child of *head* whose ``[min, max]`` range contains *id*.

    Children are read from ``head['child']`` and examined in the iteration
    order of that dictionary; the first match wins.

    Args:
        head: Dictionary with a ``'child'`` mapping whose values carry
            ``'min'`` and ``'max'`` entries.
        id: Position to look for.

    Returns:
        The matching child dictionary, or ``None`` when no range contains
        *id*.
    """
    log = logging.getLogger(LOGGER)
    log.debug("id: %s" % str(id))
    children = head['child']
    for key in children:
        log.debug("ele:%s" % str(key))
        log.debug("head:%s" % str(head))
        candidate = children[key]
        upper = candidate['max']
        log.debug("max:%s", str(upper))
        lower = candidate['min']
        log.debug("min:%s", str(lower))
        if id <= upper and id >= lower:
            return candidate
    return None
|
|
|
|
|
|
class LeafDatabase():
    """Leaf of the database description tree: one named, typed value.

    ``type`` holds the XML ``type`` attribute. The decoders understand
    ``I<n>`` (unsigned value on n bits), ``S<n>`` (signed value on n bits)
    and ``TEXT`` (read as an unsigned 32-bit value).
    """

    def __init__(self):
        self.name = ""
        self.type = ""

    def loadXmlWithoutCount(self, xmldata, extra=""):
        """Initialise the leaf from an XML element.

        Args:
            xmldata: Element providing the ``name`` and ``type`` attributes
                through ``get()``.
            extra: Suffix appended to the name (the index of the copy when
                the element is repeated with ``count``).
        """
        self.name = xmldata.get('name') + extra
        self.type = xmldata.get('type')

    def countLeaves(self):
        """Return the number of leaves represented by this node (always 1)."""
        logging.getLogger(LOGGER).debug("countLeaves leaf %s (nb:1)" % (self.name))
        return 1

    def show(self, level=1):
        """Log a one-line description of the leaf at DEBUG level."""
        logging.getLogger(LOGGER).debug("%s %s Leaf %s [%s]" % ("*" * level, str(level), self.name, str(self.type)))

    def _type_not_managed(self):
        """Log the unsupported type and build the exception to raise.

        ``TypeError`` is used on purpose: the former ``raise "..."`` statement
        already surfaced as a ``TypeError`` under Python 3, so handlers written
        against the old behaviour keep matching.
        """
        logging.getLogger(LOGGER).error("Type inconnu:" + str(self.type))
        return TypeError("type not managed: " + str(self.type))

    def execute(self, msgin, name=""):
        """Read the value of this leaf from *msgin*.

        Args:
            msgin: Reader exposing ``readSerialUint64`` and
                ``readSerialSint64`` (presumably a ``tools.BitStream`` --
                to be confirmed against the callers).
            name: Label recorded for the value; defaults to the leaf name.

        Returns:
            tuple: ``(True, {label: value})``.

        Raises:
            TypeError: If the leaf type is not ``I<n>``, ``S<n>`` or ``TEXT``.
        """
        logger = logging.getLogger(LOGGER)
        label = name if name else self.name
        # Slicing (rather than indexing) lets an empty type fall through to
        # the "type not managed" error below.
        prefix = self.type[:1]
        if prefix == 'I':  # Unsigned
            logger.debug("Read:" + str(self.type))
            value = int(self.type[1:])
            data = msgin.readSerialUint64(value, name=label, typeName="Uint" + str(value))
        elif prefix == 'S':  # Signed
            logger.debug("Read:" + str(self.type))
            value = int(self.type[1:])
            # Signed reader, in line with the "Sint" type name and with
            # execute_atom() (the unsigned reader was used here before).
            data = msgin.readSerialSint64(value, name=label, typeName="Sint" + str(value))
        elif self.type == 'TEXT':
            logger.debug("type:" + str(self.type))
            value = 32
            # Unsigned reader and the local label, in line with the
            # "Uint32/TEXT" type name and with execute_atom() (this branch
            # used an undefined variable and the signed reader before).
            data = msgin.readSerialUint64(value, name=label, typeName="Uint32/TEXT")
        else:
            raise self._type_not_managed()
        return True, {label: data}

    def execute_atom(self, level, pos, msgin, name=""):
        """Read this leaf if it is the one at position *pos* of an atom branch.

        Args:
            level: Position of this leaf in the walk over the atom's leaves.
            pos: Position of the leaf that has to be read.
            msgin: Reader exposing ``readSerialUint64`` and
                ``readSerialSint64``.
            name: Path of the parent; the value is keyed by
                ``name + "/" + self.name`` when it is not empty.

        Returns:
            tuple: ``(level + 1, None)`` when this leaf is located before
            *pos* (nothing is read), otherwise ``(level + 1, {key: value})``.

        Raises:
            TypeError: If the leaf type is not managed (unknown type, or an
                unsigned value wider than 64 bits).
        """
        logger = logging.getLogger(LOGGER)
        if name:
            extraName = name + "/" + self.name
        else:
            extraName = self.name
        logger.debug('level:' + str(level) + ' pos:' + str(pos) + ' idname:' + extraName)
        if level < pos:
            return level + 1, None
        prefix = self.type[:1]
        if prefix == 'I':
            value = int(self.type[1:])
            if value > 64:
                raise self._type_not_managed()
            logger.debug('value:' + str(value))
            data = msgin.readSerialUint64(value, name=extraName, typeName="Uint" + str(value))
        elif prefix == 'S':
            logger.debug("type:" + str(self.type))
            value = int(self.type[1:])
            data = msgin.readSerialSint64(value, name=extraName, typeName="Sint" + str(value))
        elif self.type == 'TEXT':
            logger.debug("type:" + str(self.type))
            value = 32
            data = msgin.readSerialUint64(value, name=extraName, typeName="Uint32/TEXT")
        else:
            raise self._type_not_managed()
        return level + 1, {extraName: data}
|
|
|
|
|
|
class BranchDatabase():
    """Branch of the database description tree.

    A branch owns an ordered list of sub-branches (``branch``) and an ordered
    list of leaves (``leaf``). ``atom`` is set when the XML element carries a
    non-empty ``atom`` attribute; such branches are decoded by
    ``execute_atom`` instead of ``execute_normal``.
    """

    def __init__(self):
        self.name = None
        self.branch = []
        self.leaf = []
        self.min = None
        self.max = None
        self.atom = False

    def loadXml(self, xmldata, extra="", filter=None):
        """Load this branch and, recursively, its children from an XML element.

        Children carrying a ``count`` attribute are instantiated ``count``
        times, each copy being suffixed with its index (0, 1, ...).

        Args:
            xmldata: Element describing the branch.
            extra: Suffix appended to the branch name.
            filter: When set, the branch is loaded only if its ``bank``
                attribute equals this value; otherwise it is left untouched.
        """
        # Attributes are read with get(): the former "'bank' in xmldata" /
        # "xmldata['bank']" tests inspected the children of the element, so
        # a filtered load never matched.
        if filter and xmldata.get('bank') != filter:
            return
        self.name = xmldata.get('name') + extra
        if xmldata.get('atom'):
            self.atom = True
        for ele in xmldata:
            count = ele.get('count')
            # One copy per index when a count is given, a single unsuffixed
            # copy otherwise.
            suffixes = [str(i) for i in range(int(count))] if count else [""]
            if ele.tag == 'branch':
                for suffix in suffixes:
                    newbranch = BranchDatabase()
                    newbranch.loadXml(ele, suffix)
                    self.branch.append(newbranch)
            elif ele.tag == 'leaf':
                for suffix in suffixes:
                    newleaf = LeafDatabase()
                    newleaf.loadXmlWithoutCount(ele, suffix)
                    self.leaf.append(newleaf)

    def loadRootXml(self, xmldata, filter=None):
        """Load the top-level branches found under the root XML element.

        Args:
            xmldata: Root element; only its ``branch`` children are used.
            filter: When set, only branches whose ``bank`` attribute equals
                this value are loaded. Without a filter every branch is
                loaded (previously nothing was loaded in that case).
        """
        for ele in xmldata:
            if ele.tag != 'branch':
                continue
            if filter and ele.get('bank') != filter:
                continue
            newbranch = BranchDatabase()
            newbranch.loadXml(ele)
            self.branch.append(newbranch)

    def getIdBits(self):
        """Return the number of direct children (sub-branches plus leaves).

        Despite its name this is a child count; ``execute_normal`` converts
        it to a bit width.
        """
        return len(self.branch) + len(self.leaf)

    def countLeaves(self):
        """Return the total number of leaves below this branch."""
        total = sum(ele.countLeaves() for ele in self.branch)
        total += sum(ele.countLeaves() for ele in self.leaf)
        return total

    def _child_prefix(self, parent):
        """Return the path prefix used to name the children of this branch."""
        if self.name:
            return parent + self.name + '/'
        return parent

    def execute_atom_found(self, level, pos, msgin, name=""):
        """Walk the leaves below this branch and read the one at position *pos*.

        Args:
            level: Number of leaves already walked.
            pos: Position of the leaf to read.
            msgin: Reader handed over to the leaves.
            name: Path of the parent, prepended to this branch's name.

        Returns:
            tuple: ``(level, data)`` as returned by the child that moved
            ``level`` past *pos*, or ``(level, None)`` when no child did.
        """
        logger = logging.getLogger(LOGGER)
        if name:
            idname = name + self.name
        else:
            idname = self.name
        logger.debug('level:' + str(level) + ' pos:' + str(pos) + ' name:' + name)
        for ele in self.branch:
            level, data = ele.execute_atom_found(level, pos, msgin, idname)
            logger.debug('level:' + str(level) + ' pos:' + str(pos) + ' name:' + name + ' idname:' + str(idname))
            if level > pos:
                return level, data
        for ele in self.leaf:
            level, data = ele.execute_atom(level, pos, msgin, idname)
            logger.debug('level:' + str(level) + ' pos:' + str(pos) + ' name:' + name + ' idname:' + str(idname))
            if level > pos:
                return level, data
        return level, None

    def show(self, level=0, pos=0, filterlevel=None):
        """Log this branch and its children at DEBUG level.

        Args:
            level: Depth of this branch in the tree.
            pos: Index of this branch among its siblings.
            filterlevel: When not ``None``, children are not shown once
                ``level`` reaches this depth.
        """
        logging.getLogger(LOGGER).debug("%s %s pos:%s Branch:%s atom:%s getIdBits:%s" % ("*" * level, str(level), str(pos), str(self.name), str(self.atom), str(self.getIdBits())))
        if filterlevel is not None and filterlevel <= level:
            return
        for index, ele in enumerate(self.branch):
            ele.show(level + 1, index, filterlevel)
        for ele in self.leaf:
            ele.show(level + 1)

    def execute_atom(self, msgin, parent):
        """Decode an atom branch: a bit set followed by the flagged leaves.

        A bit set with one bit per leaf below this branch is read first; the
        leaves whose bit is set are then read in increasing position order.

        Args:
            msgin: Reader exposing ``readCBitSet``, ``needRead`` and
                ``showAllData`` (presumably a ``tools.BitStream`` -- to be
                confirmed against the callers).
            parent: Path of the parent branch.

        Returns:
            tuple: ``(True, values)`` with the decoded values keyed by leaf
            path, or ``(False, None)`` when nothing was decoded.
        """
        logger = logging.getLogger(LOGGER)
        ret = {}
        nbchild = self.countLeaves()
        logger.debug("needRead:" + str(msgin.needRead()) + " nbchild:" + str(nbchild))
        logger.debug(msgin.showAllData())
        idname = self._child_prefix(parent)
        cBitSet = msgin.readCBitSet(nbchild, idname + ":Param", typeName='Uint' + str(nbchild))
        logger.debug(msgin.showAllData())
        nb_present = 0
        for i in range(nbchild):
            present = cBitSet.get(i)
            logger.debug(str(i) + " - " + str(present))
            if present:
                nb_present += 1
                _, data = self.execute_atom_found(0, i, msgin, parent)
                # execute_atom_found() returns None when no leaf was reached;
                # iterating over it used to raise TypeError.
                if data:
                    for key in data:
                        ret.setdefault(key, data[key])
        if nb_present > 1:
            logger.debug(msgin.showAllData())
        logger.debug(msgin.showAllData())
        if ret:
            return True, ret
        return False, None

    def execute_normal(self, msgin, parent):
        """Decode a regular branch: a child index followed by that child.

        The index is read on ``getPowerOf2_ter(number of children)`` bits.
        Sub-branches are numbered first, leaves afterwards.

        Args:
            msgin: Reader exposing ``readSerial``, ``needRead`` and
                ``showAllData``.
            parent: Path of the parent branch.

        Returns:
            tuple: ``(True, data)`` when the selected child decoded
            something, ``(False, None)`` otherwise (not enough bits left,
            child failure, or index beyond the last child).
        """
        logger = logging.getLogger(LOGGER)
        nbchild = self.getIdBits()
        nbBit = getPowerOf2.getPowerOf2_ter(nbchild)
        if nbBit > msgin.needRead():
            logger.debug("nbBit:" + str(nbBit) + " nbchild:" + str(nbchild) + " needRead:" + str(msgin.needRead()))
            return False, None
        logger.debug("needRead:" + str(msgin.needRead()) + " nbBit:" + str(nbBit))
        # NOTE(review): emulate=True presumably peeks at the index without
        # consuming it; the field is read again (emulate=False) once the child
        # is known -- confirm against BitStream.readSerial.
        index = msgin.readSerial(nbBit, name=parent, typeName='Uint' + str(nbBit), emulate=True)
        logger.debug("read : needRead:" + str(msgin.needRead()) + " nbBit:" + str(nbBit) + " id:" + str(index))
        position = 0
        for ele in self.branch:
            position += 1
            if position > index:
                logger.debug(str(position) + " id:" + str(index) + " name:" + str(ele.name))
                idnameshort = self._child_prefix(parent)
                idname = idnameshort + ele.name
                msgin.readSerial(nbBit, name=idname, typeName='Uint' + str(nbBit), emulate=False, commentValue=ele.name)
                logger.debug("name:" + ele.name + ", atom:" + str(ele.atom))
                state, data = ele.execute(msgin, idnameshort)
                if state:
                    return True, data
                return False, None
        for ele in self.leaf:
            logger.debug(str(position) + " id:" + str(index) + " name:" + str(ele.name))
            position += 1
            if position > index:
                idname = self._child_prefix(parent) + ele.name
                logger.debug(str(position) + " id:" + str(index) + " name:" + str(ele.name))
                msgin.readSerial(nbBit, name=idname, typeName='Uint' + str(nbBit), emulate=False, commentValue=ele.name)
                logger.debug("name:" + ele.name)
                state, data = ele.execute(msgin, name=idname)
                if state:
                    return True, data
                return False, None

        # No child matches the index: read the field anyway and report a
        # failure.
        index = msgin.readSerial(nbBit, name=parent, typeName='Uint' + str(nbBit))
        logger.debug("OUT : needRead:" + str(msgin.needRead()) + " nbBit:" + str(nbBit) + " id:" + str(index) + " i:" + str(position))
        logger.debug(msgin.showAllData())
        return False, None

    def execute(self, msgin, parent='DatabaseXML/'):
        """Decode this branch from *msgin*.

        Dispatches to ``execute_atom`` for atom branches and to
        ``execute_normal`` otherwise.

        Returns:
            tuple: ``(state, data)`` as returned by the selected decoder.
        """
        logging.getLogger(LOGGER).debug("execute:" + parent)
        if self.atom:
            return self.execute_atom(msgin, parent)
        return self.execute_normal(msgin, parent)
|
|
|
|
|
|
class DecodeDatabase():
    """Front end holding the ``PLR`` bank of the database description.

    ``loadDatabase`` builds the description tree from the XML document and
    ``execute`` decodes one message with it.
    """

    def __init__(self):
        self.databaseXml = None
        self.databasePlr = None

    def loadDatabase(self, databaseXml):
        """Build the ``PLR`` tree from *databaseXml* and log its content.

        Args:
            databaseXml: Root element of the database description.
        """
        tree = BranchDatabase()
        self.databasePlr = tree
        tree.loadRootXml(databaseXml, 'PLR')
        tree.show()

    def execute(self, msgin, world):
        """Decode *msgin* with the loaded ``PLR`` tree.

        Args:
            msgin: Reader handed over to the tree.
            world: Not used by this method.

        Returns:
            Whatever ``databasePlr.execute(msgin)`` returns.
        """
        logger = logging.getLogger(LOGGER)
        logger.debug("Start execute")
        outcome = self.databasePlr.execute(msgin)
        logger.debug("End execute")
        return outcome
|