mirror of
https://port.numenaute.org/aleajactaest/khanat-code-old.git
synced 2025-01-22 14:42:02 +00:00
1159 lines
28 KiB
C++
1159 lines
28 KiB
C++
|
// Ryzom - MMORPG Framework <http://dev.ryzom.com/projects/ryzom/>
|
||
|
// Copyright (C) 2010 Winch Gate Property Limited
|
||
|
//
|
||
|
// This program is free software: you can redistribute it and/or modify
|
||
|
// it under the terms of the GNU Affero General Public License as
|
||
|
// published by the Free Software Foundation, either version 3 of the
|
||
|
// License, or (at your option) any later version.
|
||
|
//
|
||
|
// This program is distributed in the hope that it will be useful,
|
||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
// GNU Affero General Public License for more details.
|
||
|
//
|
||
|
// You should have received a copy of the GNU Affero General Public License
|
||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||
|
|
||
|
#include "pd_lib.h"
|
||
|
|
||
|
#include <nel/misc/file.h>
|
||
|
#include <nel/misc/o_xml.h>
|
||
|
|
||
|
#include <nel/net/service.h>
|
||
|
#include <nel/net/unified_network.h>
|
||
|
#include <nel/net/service.h>
|
||
|
|
||
|
#include "pd_string_mapper.h"
|
||
|
#include "timestamp.h"
|
||
|
#include "db_description_parser.h"
|
||
|
#include "game_share/backup_service_interface.h"
|
||
|
|
||
|
using namespace std;
|
||
|
using namespace NLMISC;
|
||
|
using namespace NLNET;
|
||
|
|
||
|
namespace RY_PDS
|
||
|
{
|
||
|
|
||
|
|
||
|
std::vector<CPDSLib*> CPDSLib::_Libs;
|
||
|
|
||
|
|
||
|
|
||
|
/*
|
||
|
* PD System Verbosity control
|
||
|
*/
|
||
|
CVariable<bool> PDVerbose("pd", "PDVerbose", "Control of the PD system verbosity", false, 0, true);
|
||
|
|
||
|
/**
|
||
|
* PD System Verbosity level
|
||
|
*/
|
||
|
CVariable<sint> PDVerboseLevel("pd", "PDVerboseLevel", "PD system verbosity level (the greater the more the service logs info, -1=no log, default=0)", 0, 0, true);
|
||
|
|
||
|
/*
|
||
|
* PDS Resolve Unmapped rows
|
||
|
* If true, PDS continue loading when it finds a row not mapped in a mapped table,
|
||
|
* and keeps it unmapped.
|
||
|
* Otherwise, loading fails.
|
||
|
*/
|
||
|
CVariable<bool> ResolveUnmappedRows("pd", "ResolveUnmappedRows", "If true, PDS continue loading when it finds a row not mapped in a mapped table, and keeps it unmapped. Otherwise, loading fails.", true, 0, true);
|
||
|
|
||
|
/*
|
||
|
* PDS Resolve Double Mapped Rows
|
||
|
* If true, PDS continue loading when it finds a row already mapped, and keeps it unmapped.
|
||
|
* Otherwise, loading fails.
|
||
|
* See also ResolveDoubleMappedKeepOlder
|
||
|
*/
|
||
|
CVariable<bool> ResolveDoubleMappedRows("pd", "ResolveDoubleMappedRows", "If true, PDS continue loading when it finds a row already mapped, and keeps it unmapped. Otherwise, loading fails. See also ResolveDoubleMappedKeepOlder", true, 0, true);
|
||
|
|
||
|
/*
|
||
|
* PDS Resolve Double Mapped Keep Older
|
||
|
* If true, when finds a doubly mapped row at loading, keep only older row as mapped (lesser row index)
|
||
|
* See also ResolveDoubleMappedRows.
|
||
|
*/
|
||
|
CVariable<bool> ResolveDoubleMappedKeepOlder("pd", "ResolveDoubleMappedKeepOlder", "If true, when finds a doubly mapped row at loading, keep only older row as mapped (lesser row index). See also ResolveDoubleMappedRows.", true, 0, true);
|
||
|
|
||
|
/*
|
||
|
* PDS Resolve Unallocated rows
|
||
|
* If true, when finds a reference to an unallocated row or an invalid row, force reference to null.
|
||
|
*/
|
||
|
CVariable<bool> ResolveInvalidRow("pd", "ResolveInvalidRow", "If true, when finds a reference to an unallocated row or an invalid row, force reference to null.", true, 0, true);
|
||
|
|
||
|
|
||
|
|
||
|
/*
|
||
|
* Maximal Message Queue Size in bytes
|
||
|
* Set to 1mb by default
|
||
|
*/
|
||
|
CVariable<uint> PDMaxMsgQueueSize("pd", "PDMaxMsgQueueSize", "Maximum message queue size towards PDS. If queue size is greater, PDS is displayed as not available, and processing should be halted", 1048576, 0, true);
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
/*
|
||
|
* Enables client to log updates by itself (when PDS is not used)
|
||
|
*/
|
||
|
CVariable<bool> PDEnableLog("pd", "PDEnableLog", "Enables client to log updates by itself (when PDS is not used)", true, 0, true);
|
||
|
|
||
|
/*
|
||
|
* Enables client to log string manager updates by itself (when PDS is not used)
|
||
|
*/
|
||
|
//CVariable<bool> PDEnableStringLog("pd", "PDEnableStringLog", "Enables client to log string manager updates by itself (when PDS is not used)", true, 0, true);
|
||
|
|
||
|
/*
|
||
|
* Number of seconds between 2 updates (when PDS is not used)
|
||
|
* Set to 1mb by default
|
||
|
*/
|
||
|
CVariable<uint> PDLogUpdate("pd", "PDLogUpdate", "Number of seconds between 2 updates (when PDS is not used)", 5, 0, true);
|
||
|
|
||
|
/*
|
||
|
* Time between creating 2 different pd_log file (in seconds)
|
||
|
*/
|
||
|
CVariable<uint> PDLogFileTime("pd", "PDLogFileTime", "Time between creating 2 different pd_log file (in seconds)", 3600, 0, true);
|
||
|
|
||
|
/*
|
||
|
* Directory to save logs (when PDS is not used)
|
||
|
*/
|
||
|
CVariable<std::string> PDRootDirectory("pd", "PDRootDirectory", "Directory to save PD data (if empty, 'SaveFilesDirectory/pds' will be used instead)", "", 0, true);
|
||
|
|
||
|
/*
|
||
|
* Use Backup Servce when logging
|
||
|
*/
|
||
|
CVariable<bool> PDUseBS("pd", "PDUseBS", "Use Backup Service when logging (when PDS is not used)", true, 0, true);
|
||
|
|
||
|
|
||
|
|
||
|
/*
|
||
|
* PDS is ready to work
|
||
|
*/
|
||
|
void cbPDSReady(NLNET::CMessage& msgin, const std::string &serviceName, TServiceId serviceId)
|
||
|
{
|
||
|
uint32 databaseId;
|
||
|
msgin.serial(databaseId);
|
||
|
|
||
|
uint32 lastUpdateId;
|
||
|
msgin.serial(lastUpdateId);
|
||
|
|
||
|
CPDSLib* lib = CPDSLib::getLib(databaseId);
|
||
|
|
||
|
if (lib != NULL)
|
||
|
lib->ready(lastUpdateId);
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
* PDS init failed, stop everything!
|
||
|
*/
|
||
|
void cbPDSInitFailed(NLNET::CMessage& msgin, const std::string &serviceName, TServiceId serviceId)
|
||
|
{
|
||
|
uint32 databaseId;
|
||
|
msgin.serial(databaseId);
|
||
|
string error;
|
||
|
msgin.serial(error);
|
||
|
|
||
|
nlerror("Database '%d' initialisation failed: '%s'", databaseId, error.c_str());
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
* fetch data
|
||
|
*/
|
||
|
void cbFetch(NLNET::CMessage& msgin, const std::string &serviceName, TServiceId serviceId)
|
||
|
{
|
||
|
uint32 databaseId;
|
||
|
msgin.serial(databaseId);
|
||
|
|
||
|
CPDSLib* lib = CPDSLib::getLib(databaseId);
|
||
|
|
||
|
if (lib != NULL)
|
||
|
lib->fetchPDSData(msgin);
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
* fetch data failed
|
||
|
*/
|
||
|
void cbFetchFailure(NLNET::CMessage& msgin, const std::string &serviceName, TServiceId serviceId)
|
||
|
{
|
||
|
uint32 databaseId;
|
||
|
msgin.serial(databaseId);
|
||
|
|
||
|
CPDSLib* lib = CPDSLib::getLib(databaseId);
|
||
|
|
||
|
if (lib != NULL)
|
||
|
lib->notifyFetchFailure(msgin);
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
* receive index allocators
|
||
|
*/
|
||
|
void cbAllocs(NLNET::CMessage& msgin, const std::string &serviceName, TServiceId serviceId)
|
||
|
{
|
||
|
uint32 databaseId;
|
||
|
msgin.serial(databaseId);
|
||
|
|
||
|
CPDSLib* lib = CPDSLib::getLib(databaseId);
|
||
|
|
||
|
if (lib == NULL)
|
||
|
return;
|
||
|
|
||
|
// do not reinit allocator as it may contain newer data
|
||
|
if (lib->allocsInitialised())
|
||
|
return;
|
||
|
|
||
|
lib->setupIndexAllocators(msgin);
|
||
|
lib->initAllocs();
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
* init client string manager
|
||
|
*/
|
||
|
//void cbInitStringManager(NLNET::CMessage& msgin, const std::string &serviceName, TServiceId serviceId)
|
||
|
//{
|
||
|
// uint32 databaseId;
|
||
|
// msgin.serial(databaseId);
|
||
|
//
|
||
|
// CPDSLib* lib = CPDSLib::getLib(databaseId);
|
||
|
//
|
||
|
// if (lib == NULL)
|
||
|
// return;
|
||
|
//
|
||
|
// // do not reinit string manager (as it may contain newer data)
|
||
|
// if (lib->stringManagerInitialised())
|
||
|
// return;
|
||
|
//
|
||
|
// CPDStringManager& sm = lib->getStringManager();
|
||
|
// sm.serial(msgin);
|
||
|
//
|
||
|
// lib->initStringManager();
|
||
|
//}
|
||
|
|
||
|
/*
|
||
|
* Flush client messages, when they are acknowledged by the PDS
|
||
|
*/
|
||
|
void cbFlushAckUpdates(NLNET::CMessage& msgin, const std::string &serviceName, TServiceId serviceId)
|
||
|
{
|
||
|
uint32 databaseId;
|
||
|
msgin.serial(databaseId);
|
||
|
|
||
|
vector<uint32> ack;
|
||
|
msgin.serialCont(ack);
|
||
|
|
||
|
CPDSLib* lib = CPDSLib::getLib(databaseId);
|
||
|
|
||
|
if (lib != NULL)
|
||
|
lib->flushAcknowledged(ack);
|
||
|
}
|
||
|
|
||
|
TUnifiedCallbackItem PDSCbArray[] =
|
||
|
{
|
||
|
{ "PD_READY", cbPDSReady },
|
||
|
{ "PD_INIT_FAILED", cbPDSInitFailed },
|
||
|
{ "PD_FETCH", cbFetch },
|
||
|
{ "PD_FETCH_FAILURE", cbFetchFailure },
|
||
|
{ "PD_ALLOCS", cbAllocs },
|
||
|
// { "PD_SM_INIT", cbInitStringManager },
|
||
|
{ "PD_ACK_UPD", cbFlushAckUpdates },
|
||
|
};
|
||
|
|
||
|
|
||
|
/*
|
||
|
* Constructor
|
||
|
*/
|
||
|
CPDSLib::CPDSLib()
|
||
|
{
|
||
|
_PDSConnected = false;
|
||
|
_PDSReady = false;
|
||
|
_DatabaseId = 0-1;
|
||
|
_UpdateId = 0-1;
|
||
|
_UsePDS = false;
|
||
|
_AllocsInitialised = false;
|
||
|
// _StringManagerInitialised = false;
|
||
|
|
||
|
_PreviousLogSave = 0;
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
/*
|
||
|
* Internal type checking
|
||
|
*/
|
||
|
void CPDSLib::checkInternalTypes()
|
||
|
{
|
||
|
|
||
|
if (sizeof(CColumnIndex) != 8)
|
||
|
nlerror("Internal type checking failed: class CColumnIndex is not 8 bytes wide");
|
||
|
|
||
|
if (sizeof(CObjectIndex) != 8)
|
||
|
nlerror("Internal type checking failed: class CObjectIndex is not 8 bytes wide");
|
||
|
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
void CPDSLib::usePDS()
|
||
|
{
|
||
|
_UsePDS = true;
|
||
|
}
|
||
|
|
||
|
// Init Db description
|
||
|
void CPDSLib::init(const std::string &xml, uint32 overrideDbId)
|
||
|
{
|
||
|
nlinfo("CPDSLib: %12s", "init");
|
||
|
|
||
|
// initial checkup
|
||
|
checkInternalTypes();
|
||
|
|
||
|
_XmlDescription = xml;
|
||
|
|
||
|
CUnifiedNetwork::getInstance()->addCallbackArray(PDSCbArray, sizeof(PDSCbArray)/sizeof(TUnifiedCallbackItem));
|
||
|
CUnifiedNetwork::getInstance()->setServiceUpCallback("PDS", onPDSUp);
|
||
|
CUnifiedNetwork::getInstance()->setServiceDownCallback("PDS", onPDSDown);
|
||
|
|
||
|
CConfigFile::CVar* dbidptr = IService::getInstance()->ConfigFile.getVarPtr("DatabaseId");
|
||
|
if (overrideDbId != 0)
|
||
|
{
|
||
|
nlinfo("CPDSLib: variable 'DatabaseId' orverriden with Id='%d'", overrideDbId);
|
||
|
_DatabaseId = overrideDbId;
|
||
|
}
|
||
|
else if (dbidptr != NULL && overrideDbId == 0)
|
||
|
{
|
||
|
_DatabaseId = dbidptr->asInt();
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
nlwarning("CPDSLib: variable 'DatabaseId' not found in config file, will use '0' as default value");
|
||
|
_DatabaseId = 0;
|
||
|
}
|
||
|
|
||
|
if (_Libs.size() <= _DatabaseId)
|
||
|
_Libs.resize(_DatabaseId+1, NULL);
|
||
|
_Libs[_DatabaseId] = this;
|
||
|
|
||
|
// _StringManager.init(this);
|
||
|
|
||
|
if (!_UsePDS)
|
||
|
{
|
||
|
// _StringManager.load();
|
||
|
|
||
|
// save description in log repertory
|
||
|
std::string logDir = getRemoteLogDirectory();
|
||
|
bool repSuccess = true;
|
||
|
|
||
|
if (!CFile::isDirectory(logDir))
|
||
|
{
|
||
|
if (!CFile::createDirectoryTree(logDir))
|
||
|
{
|
||
|
nlwarning("Failed to create log root directory '%s'", logDir.c_str());
|
||
|
repSuccess = false;
|
||
|
}
|
||
|
|
||
|
if (!CFile::setRWAccess(logDir))
|
||
|
{
|
||
|
nlwarning("Failed, can't set RW access to directory '%s'", logDir.c_str());
|
||
|
repSuccess = false;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
_LogStartDate.setToCurrent();
|
||
|
_PreviousTickDate = _LogStartDate;
|
||
|
|
||
|
if (repSuccess)
|
||
|
{
|
||
|
CDBDescriptionParser dbDesc;
|
||
|
dbDesc.loadDescription((const uint8*)(xml.c_str()));
|
||
|
// dbDesc.saveDescription(logDir + _LogStartDate.toString() + ".description");
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
/*
|
||
|
* Init PDS log
|
||
|
*/
|
||
|
void CPDSLib::initLog(uint logmsg)
|
||
|
{
|
||
|
if (logmsg >= _LogDescs.size())
|
||
|
_LogDescs.resize(logmsg + 1);
|
||
|
|
||
|
nlassertex(_LogDescs[logmsg].Log == 0xffffffff, ("Internal error! Log message %d already declared!", logmsg));
|
||
|
|
||
|
_LogDescs[logmsg].Log = logmsg;
|
||
|
_LogDescs[logmsg].ByteSize = 0;
|
||
|
_LogDescs[logmsg].Params.clear();
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
* Init PDS log parameter
|
||
|
*/
|
||
|
void CPDSLib::initLogParam(uint logmsg, uint logparam, uint byteSize)
|
||
|
{
|
||
|
nlassertex(_LogDescs.size() > logmsg, ("Internal error! Log message %d is not yet declared!", logmsg));
|
||
|
|
||
|
CLogDesc& desc = _LogDescs[logmsg];
|
||
|
nlassertex(desc.Log == logmsg, ("Internal error! Log message %d badly or not initialised!", logmsg));
|
||
|
nlassertex(desc.Params.size() == logparam, ("Internal error! Log parameter %d of message %d already initialised!", logmsg, logparam));
|
||
|
|
||
|
uint byteOffset = desc.ByteSize;
|
||
|
|
||
|
CLogParam param;
|
||
|
param.ByteSize = byteSize;
|
||
|
param.ByteOffset = byteOffset;
|
||
|
desc.Params.push_back(param);
|
||
|
|
||
|
desc.ByteSize += byteSize;
|
||
|
}
|
||
|
|
||
|
|
||
|
/*
|
||
|
* Release Lib
|
||
|
*/
|
||
|
void CPDSLib::release()
|
||
|
{
|
||
|
}
|
||
|
|
||
|
|
||
|
/*
|
||
|
* PDS up callback
|
||
|
*/
|
||
|
void CPDSLib::onPDSUp(const std::string &serviceName, TServiceId sid, void *arg)
|
||
|
{
|
||
|
uint i;
|
||
|
for (i=0; i<_Libs.size(); ++i)
|
||
|
{
|
||
|
if (_Libs[i] != NULL)
|
||
|
{
|
||
|
_Libs[i]->_PDSConnected = true;
|
||
|
_Libs[i]->_PDSReady = false;
|
||
|
_Libs[i]->connect();
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
* Connect service to PDS when up
|
||
|
*/
|
||
|
void CPDSLib::connect()
|
||
|
{
|
||
|
nlassert(_PDSConnected);
|
||
|
|
||
|
nlinfo("PDS is up, connecting...");
|
||
|
CMessage msgout("PD_INIT");
|
||
|
msgout.serial(_DatabaseId);
|
||
|
msgout.serial(_XmlDescription);
|
||
|
CUnifiedNetwork::getInstance()->send("PDS", msgout);
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
* PDS is ready
|
||
|
*/
|
||
|
void CPDSLib::ready(uint32 lastUpdateId)
|
||
|
{
|
||
|
nlassert(_PDSConnected);
|
||
|
|
||
|
nlinfo("PDS is ready to work");
|
||
|
_PDSReady = true;
|
||
|
|
||
|
// set up next update Id
|
||
|
_UpdateId = lastUpdateId+1;
|
||
|
|
||
|
// flush previous messages that have already been written on hd
|
||
|
if ((sint)lastUpdateId >= 0)
|
||
|
{
|
||
|
flushAcknowledged(lastUpdateId);
|
||
|
}
|
||
|
|
||
|
// prepare PD string mapper for sheet ids
|
||
|
CPDStringMapper sheetIdStringMapper;
|
||
|
|
||
|
std::vector<NLMISC::CSheetId> sheetIds;
|
||
|
NLMISC::CSheetId::buildIdVector(sheetIds);
|
||
|
|
||
|
uint i;
|
||
|
for (i=0; i<sheetIds.size(); ++i)
|
||
|
{
|
||
|
NLMISC::CSheetId sheetId = sheetIds[i];
|
||
|
sheetIdStringMapper.setMapping(sheetId.toString(), sheetId.asInt());
|
||
|
}
|
||
|
|
||
|
CMessage msgout("PD_SHEETID_MAPPING");
|
||
|
msgout.serial(_DatabaseId);
|
||
|
msgout.serial(sheetIdStringMapper);
|
||
|
CUnifiedNetwork::getInstance()->send("PDS", msgout);
|
||
|
|
||
|
// if there are still some messages in queue, resend them
|
||
|
if (!_QueuedMessages.empty())
|
||
|
{
|
||
|
// sort them before, so they are in right order
|
||
|
// this is in theory useless, since new messages are inserted at end
|
||
|
std::sort(_QueuedMessages.begin(), _QueuedMessages.end(), CQueuedMessagePred());
|
||
|
|
||
|
TQueuedMessages::iterator it;
|
||
|
for (it=_QueuedMessages.begin(); it!=_QueuedMessages.end(); ++it)
|
||
|
{
|
||
|
uint32 updateId = (*it).first;
|
||
|
|
||
|
if (updateId != _UpdateId)
|
||
|
nlwarning("CPDSLib::ready(): update id '%d' is not consistent with message id '%d' to be sent", _UpdateId, updateId);
|
||
|
|
||
|
_UpdateId = updateId+1;
|
||
|
|
||
|
NLNET::CMessage* msg = (*it).second;
|
||
|
CUnifiedNetwork::getInstance()->send("PDS", *msg);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
/*
|
||
|
* PDS down callback
|
||
|
*/
|
||
|
void CPDSLib::onPDSDown(const std::string &serviceName, TServiceId sid, void *arg)
|
||
|
{
|
||
|
uint i;
|
||
|
for (i=0; i<_Libs.size(); ++i)
|
||
|
{
|
||
|
if (_Libs[i] != NULL)
|
||
|
{
|
||
|
_Libs[i]->_PDSConnected = false;
|
||
|
_Libs[i]->_PDSReady = false;
|
||
|
_Libs[i]->disconnect();
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
/*
|
||
|
* Disconnect service to PDS
|
||
|
*/
|
||
|
void CPDSLib::disconnect()
|
||
|
{
|
||
|
nlinfo("PDS disconnected, system should be halted");
|
||
|
}
|
||
|
|
||
|
|
||
|
/*
|
||
|
* Tells if PDS is ready
|
||
|
*/
|
||
|
bool CPDSLib::PDSReady()
|
||
|
{
|
||
|
// check PDS connected, ready and is not too busy
|
||
|
return (!_UsePDS) || (_PDSReady && _PDSConnected && getMessageQueueSize() < PDMaxMsgQueueSize);
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
/*
|
||
|
* Update
|
||
|
* Call regularly this method once a tick to apply changes to database
|
||
|
*/
|
||
|
void CPDSLib::update()
|
||
|
{
|
||
|
if (_UsePDS)
|
||
|
{
|
||
|
// check pds is ready to receive update
|
||
|
if (!PDSReady())
|
||
|
return;
|
||
|
|
||
|
if (_DbMessageQueue.empty())
|
||
|
return;
|
||
|
|
||
|
std::list<CDbMessageQueue>::iterator it;
|
||
|
for (it=_DbMessageQueue.begin(); it!=_DbMessageQueue.end(); ++it)
|
||
|
{
|
||
|
CDbMessageQueue& queue = (*it);
|
||
|
|
||
|
// create a new message
|
||
|
CMessage* msgupd = new CMessage("PD_UPDATE");
|
||
|
msgupd->serial(_DatabaseId);
|
||
|
msgupd->serial(_UpdateId);
|
||
|
_QueuedMessages.push_back(make_pair<uint32,CMessage*>(_UpdateId, msgupd));
|
||
|
++_UpdateId;
|
||
|
|
||
|
// serial queue
|
||
|
msgupd->serial(queue);
|
||
|
|
||
|
// send update to the pds
|
||
|
CUnifiedNetwork::getInstance()->send("PDS", *msgupd);
|
||
|
}
|
||
|
|
||
|
// clean up for next update
|
||
|
_DbMessageQueue.clear();
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
uint i;
|
||
|
|
||
|
// save log (updates + string manager)
|
||
|
TTime ctime = CTime::getLocalTime();
|
||
|
|
||
|
//
|
||
|
CTimestamp timestamp;
|
||
|
timestamp.setToCurrent();
|
||
|
|
||
|
// setup update logs with full timestamp
|
||
|
if (!_DbMessageQueue.empty())
|
||
|
{
|
||
|
i = _UpdateLogs.size();
|
||
|
_UpdateLogs.resize(_DbMessageQueue.size());
|
||
|
|
||
|
for (; i<_UpdateLogs.size(); ++i)
|
||
|
{
|
||
|
_UpdateLogs[i].StartStamp = _PreviousTickDate;
|
||
|
_UpdateLogs[i].EndStamp = _PreviousTickDate;
|
||
|
}
|
||
|
|
||
|
_UpdateLogs.back().EndStamp = timestamp;
|
||
|
}
|
||
|
|
||
|
_PreviousTickDate = timestamp;
|
||
|
|
||
|
if (ctime - _PreviousLogSave > PDLogUpdate*1000 && !_DbMessageQueue.empty())
|
||
|
{
|
||
|
// std::string logDir = getRemoteLogDirectory();
|
||
|
// std::string logFile = _LogStartDate.toString()+"_0000.pd_log";
|
||
|
//
|
||
|
// // if don't use BS, create logdir directory tree
|
||
|
// if (!PDUseBS && !CFile::isDirectory(logDir))
|
||
|
// {
|
||
|
// if (!CFile::createDirectoryTree(logDir))
|
||
|
// {
|
||
|
// nlwarning("Failed to create log root directory '%s'", logDir.c_str());
|
||
|
// }
|
||
|
//
|
||
|
// if (!CFile::setRWAccess(logDir))
|
||
|
// {
|
||
|
// nlwarning("Failed, can't set RW access to directory '%s'", logDir.c_str());
|
||
|
// }
|
||
|
// }
|
||
|
//
|
||
|
// // map update logs to msg queues
|
||
|
// nlassert(_UpdateLogs.size() == _DbMessageQueue.size());
|
||
|
// for (i=0; i<_UpdateLogs.size(); ++i)
|
||
|
// _UpdateLogs[i].setUpdates(&(_DbMessageQueue.get(i)));
|
||
|
//
|
||
|
// if (PDUseBS)
|
||
|
// {
|
||
|
// CBackupMsgSaveFile msgBS( logDir+logFile, CBackupMsgSaveFile::AppendFileCheck, PDBsi );
|
||
|
// msgBS.DataMsg.serialCont(_UpdateLogs);
|
||
|
// PDBsi.append(msgBS);
|
||
|
// }
|
||
|
// else
|
||
|
// {
|
||
|
// COFile file;
|
||
|
// if (file.open(logDir + logFile))
|
||
|
// {
|
||
|
// file.serialCont(_UpdateLogs);
|
||
|
// }
|
||
|
// else
|
||
|
// {
|
||
|
// nlwarning("Failed to save log file '%s' for database %d", (logDir + logFile).c_str(), _DatabaseId);
|
||
|
// }
|
||
|
// }
|
||
|
|
||
|
// if (PDEnableStringLog && !_StringManager.logEmpty())
|
||
|
// {
|
||
|
// std::string logFile = timestamp.toString()+".string_log";
|
||
|
//
|
||
|
// if (PDUseBS)
|
||
|
// {
|
||
|
// bool success = false;
|
||
|
// CBackupMsgSaveFile msgBS( logDir+logFile, CBackupMsgSaveFile::SaveFileCheck, PDBsi );
|
||
|
// {
|
||
|
// COXml xml;
|
||
|
//
|
||
|
// if (xml.init(&msgBS.DataMsg))
|
||
|
// {
|
||
|
// _StringManager.storeLog(xml);
|
||
|
// success = true;
|
||
|
// }
|
||
|
// else
|
||
|
// {
|
||
|
// nlwarning("Failed to save string log file '%s' for database %d", msgBS.FileName.c_str(), _DatabaseId);
|
||
|
// }
|
||
|
// }
|
||
|
//
|
||
|
// if (success)
|
||
|
// {
|
||
|
// PDBsi.sendFile(msgBS);
|
||
|
// }
|
||
|
// }
|
||
|
// else
|
||
|
// {
|
||
|
// COFile file;
|
||
|
// {
|
||
|
// COXml xml;
|
||
|
// if (file.open(logDir + logFile) && xml.init(&file))
|
||
|
// {
|
||
|
// _StringManager.storeLog(xml);
|
||
|
// }
|
||
|
// else
|
||
|
// {
|
||
|
// nlwarning("Failed to save string log file '%s' for database %d", (logDir+logFile).c_str(), _DatabaseId);
|
||
|
// }
|
||
|
// }
|
||
|
// }
|
||
|
// }
|
||
|
|
||
|
_PreviousLogSave = ctime;
|
||
|
|
||
|
// clean up for next update
|
||
|
_DbMessageQueue.clear();
|
||
|
_UpdateLogs.clear();
|
||
|
|
||
|
}
|
||
|
|
||
|
if (timestamp.toTime()-_LogStartDate.toTime() > sint(PDLogFileTime.get()))
|
||
|
{
|
||
|
_LogStartDate = timestamp;
|
||
|
}
|
||
|
|
||
|
_DbMessageQueue.forceNextQueue();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
/*
|
||
|
* Register Class Mapping
|
||
|
*/
|
||
|
void CPDSLib::registerClassMapping(TTableIndex table, const std::string& name)
|
||
|
{
|
||
|
nlassertex(_ClassMapName.find(name) == _ClassMapName.end(), ("Class Name '%s' already registered!", name.c_str()));
|
||
|
nlassertex(table >= _ClassMapId.size() || _ClassMapId[table].empty(), ("Class Id '%d' already registered!", table));
|
||
|
|
||
|
if (_ClassMapId.size() <= table)
|
||
|
_ClassMapId.resize(table+1);
|
||
|
|
||
|
_ClassMapName[name] = table;
|
||
|
_ClassMapId[table] = name;
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
* Map Class Name
|
||
|
*/
|
||
|
TTableIndex CPDSLib::mapClassName(const std::string& name)
|
||
|
{
|
||
|
std::map<std::string, TTableIndex>::iterator it = _ClassMapName.find(name);
|
||
|
if (it == _ClassMapName.end())
|
||
|
return INVALID_TABLE_INDEX;
|
||
|
|
||
|
return (*it).second;
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
* Map Class Name
|
||
|
*/
|
||
|
std::string CPDSLib::mapClassId(TTableIndex table)
|
||
|
{
|
||
|
if (_ClassMapId.size() <= table)
|
||
|
return "";
|
||
|
|
||
|
return _ClassMapId[table];
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
// Allocate a row in the PDS (key is provided for table that are keymapped
|
||
|
void CPDSLib::allocateRow(TTableIndex table, TRowIndex row, uint64 key)
|
||
|
{
|
||
|
// send to PDS alloc(table, row, key);
|
||
|
if (PDVerbose)
|
||
|
nlinfo("CPDSLib: %12s index=%u:%u, key=%016"NL_I64"X", "allocrow", table, row, key);
|
||
|
|
||
|
CDbMessage& msg = nextMessage((uint8)table, row);
|
||
|
msg.allocRow(key);
|
||
|
}
|
||
|
|
||
|
// Deallocate a row in the PDS
|
||
|
void CPDSLib::deallocateRow(TTableIndex table, TRowIndex row)
|
||
|
{
|
||
|
// send to PDS dealloc(table, row)
|
||
|
if (PDVerbose)
|
||
|
nlinfo("CPDSLib: %12s index=%u:%u", "deallocrow", table, row);
|
||
|
|
||
|
CDbMessage& msg = nextMessage((uint8)table, row);
|
||
|
msg.deallocRow();
|
||
|
}
|
||
|
|
||
|
// Allocate a row in the PDS (key is provided for table that are keymapped
|
||
|
void CPDSLib::allocateRow(TTableIndex table, TRowIndex row, uint64 key, const NLMISC::CEntityId& id)
|
||
|
{
|
||
|
// send to PDS alloc(table, row, key);
|
||
|
if (PDVerbose)
|
||
|
nlinfo("CPDSLib: %12s index=%u:%u, key=%016"NL_I64"X", "allocrow", table, row, key);
|
||
|
|
||
|
CDbMessage& msg = nextMessage((uint8)table, row);
|
||
|
msg.allocRow(key, id);
|
||
|
}
|
||
|
|
||
|
// Deallocate a row in the PDS
|
||
|
void CPDSLib::deallocateRow(TTableIndex table, TRowIndex row, const NLMISC::CEntityId& id)
|
||
|
{
|
||
|
// send to PDS dealloc(table, row)
|
||
|
if (PDVerbose)
|
||
|
nlinfo("CPDSLib: %12s index=%u:%u", "deallocrow", table, row);
|
||
|
|
||
|
CDbMessage& msg = nextMessage((uint8)table, row);
|
||
|
msg.deallocRow(id);
|
||
|
}
|
||
|
|
||
|
// Create an object of a given type
|
||
|
IPDBaseData *CPDSLib::create(TTableIndex table)
|
||
|
{
|
||
|
if (PDVerbose)
|
||
|
nlinfo("CPDSLib: %12s table=%u", "create", table);
|
||
|
|
||
|
if (table >= _Factories.size() || _Factories[table] == NULL)
|
||
|
{
|
||
|
nlwarning("CPDSLib: Unable to create object %d, factory not registered.", table);
|
||
|
return NULL;
|
||
|
}
|
||
|
|
||
|
IPDBaseData *obj = _Factories[table] ();
|
||
|
|
||
|
return obj;
|
||
|
}
|
||
|
|
||
|
// Fetch data
|
||
|
void CPDSLib::fetchPDSData(NLMISC::IStream &f)
|
||
|
{
|
||
|
TTableIndex table;
|
||
|
TRowIndex row;
|
||
|
|
||
|
f.serial(table, row);
|
||
|
|
||
|
IPDBaseData* obj = create(table);
|
||
|
if (obj == NULL)
|
||
|
{
|
||
|
nlerror("PDS sent an invalid object reference!");
|
||
|
}
|
||
|
|
||
|
setRowIndex(row, obj);
|
||
|
|
||
|
if (table >= _Fetchs.size() || _Fetchs[table] == NULL)
|
||
|
{
|
||
|
nlerror("PDS sent an invalid object reference! -- cannot fetch object data");
|
||
|
}
|
||
|
|
||
|
TPDFetch fetchFunc = _Fetchs[table];
|
||
|
|
||
|
fetchFunc(obj, f);
|
||
|
}
|
||
|
|
||
|
// Notify client of fetch failure
|
||
|
void CPDSLib::notifyFetchFailure(NLMISC::IStream &f)
|
||
|
{
|
||
|
TTableIndex table;
|
||
|
uint64 key;
|
||
|
|
||
|
f.serial(table, key);
|
||
|
|
||
|
if (table >= _FetchFailures.size() || _FetchFailures[table] == NULL)
|
||
|
{
|
||
|
nlwarning("CPDSLib: Unable to notify fetch failure of %d:%016"NL_I64"X, callback not set.", table, key);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
// get notify function
|
||
|
TPDFetchFailure failureFunc = _FetchFailures[table];
|
||
|
|
||
|
// notify user
|
||
|
failureFunc(key);
|
||
|
}
|
||
|
|
||
|
|
||
|
// Fetch data
|
||
|
void CPDSLib::setupIndexAllocators(NLMISC::IStream &f)
|
||
|
{
|
||
|
std::vector<CIndexAllocator> allocators;
|
||
|
|
||
|
f.serialCont(allocators);
|
||
|
|
||
|
uint i;
|
||
|
for (i=0; i<allocators.size() && i<_Allocators.size(); ++i)
|
||
|
if (_Allocators[i] != NULL)
|
||
|
*(_Allocators[i]) = allocators[i];
|
||
|
}
|
||
|
|
||
|
|
||
|
// Flush acknowledged messages
|
||
|
void CPDSLib::flushAcknowledged(const std::vector<uint32>& ack)
|
||
|
{
|
||
|
uint i;
|
||
|
for (i=0; i<ack.size(); ++i)
|
||
|
{
|
||
|
uint32 msg = ack[i];
|
||
|
|
||
|
TQueuedMessages::iterator it;
|
||
|
for (it=_QueuedMessages.begin(); it!=_QueuedMessages.end() && (*it).first!=msg; )
|
||
|
;
|
||
|
|
||
|
if (it != _QueuedMessages.end())
|
||
|
{
|
||
|
delete (*it).second;
|
||
|
it = _QueuedMessages.erase(it);
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
++it;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Flush acknowledged messages until message
|
||
|
void CPDSLib::flushAcknowledged(uint32 ack)
|
||
|
{
|
||
|
// remove all updates before or equal to the acknowledged message
|
||
|
TQueuedMessages::iterator it;
|
||
|
for (it=_QueuedMessages.begin(); it!=_QueuedMessages.end(); )
|
||
|
{
|
||
|
uint32 updateId = (*it).first;
|
||
|
if (updateId <= ack)
|
||
|
{
|
||
|
delete (*it).second;
|
||
|
it = _QueuedMessages.erase(it);
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
++it;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
/*
|
||
|
* Get Message Queue Size
|
||
|
*/
|
||
|
uint CPDSLib::getMessageQueueSize()
|
||
|
{
|
||
|
uint size = 0;
|
||
|
uint i;
|
||
|
for (i=0; i<_QueuedMessages.size(); ++i)
|
||
|
size += _QueuedMessages[i].second->length();
|
||
|
|
||
|
return size;
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
// Erase an object from its table and key
|
||
|
void CPDSLib::erase(TTableIndex table, uint64 key)
|
||
|
{
|
||
|
if (PDVerbose)
|
||
|
nlinfo("CPDSLib: %12s table=%u, key=%016"NL_I64"X", "erase", table, key);
|
||
|
}
|
||
|
|
||
|
// Load a row and its dependent rows from a mapped table
|
||
|
void CPDSLib::load(TTableIndex table, uint64 key)
|
||
|
{
|
||
|
if (PDVerbose)
|
||
|
nlinfo("CPDSLib: %12s table=%u, key=%016"NL_I64"X", "load", table, key);
|
||
|
|
||
|
if (!_UsePDS)
|
||
|
{
|
||
|
if (table >= _FetchFailures.size() || _FetchFailures[table] == NULL)
|
||
|
return;
|
||
|
|
||
|
// get notify function
|
||
|
TPDFetchFailure failureFunc = _FetchFailures[table];
|
||
|
|
||
|
// notify user
|
||
|
failureFunc(key);
|
||
|
}
|
||
|
|
||
|
CDbMessage& msg = nextMessage();
|
||
|
msg.loadRow(table, key);
|
||
|
}
|
||
|
|
||
|
// Release a row
|
||
|
void CPDSLib::release(TTableIndex table, TRowIndex row)
|
||
|
{
|
||
|
if (PDVerbose)
|
||
|
nlinfo("CPDSLib: %12s table=%u, row=%u", "release", table, row);
|
||
|
|
||
|
CDbMessage& msg = nextMessage((uint8)table, row);
|
||
|
msg.releaseRow();
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
// Add a string in pds
|
||
|
void CPDSLib::addString(const NLMISC::CEntityId& eid, const ucstring& str)
|
||
|
{
|
||
|
CDbMessage& msg = nextMessage();
|
||
|
msg.addString(eid.asUint64(), str);
|
||
|
}
|
||
|
|
||
|
// Unmap a string in pds
|
||
|
void CPDSLib::unmapString(const NLMISC::CEntityId& eid)
|
||
|
{
|
||
|
CDbMessage& msg = nextMessage();
|
||
|
msg.unmapString(eid.asUint64());
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
// Get Database Root directory
|
||
|
std::string CPDSLib::getPDSRootDirectory(const std::string& shard, bool wantRemoteDir)
|
||
|
{
|
||
|
std::string dir;
|
||
|
|
||
|
if (PDRootDirectory.get() == "")
|
||
|
{
|
||
|
if (wantRemoteDir)
|
||
|
dir = "pds/";
|
||
|
else
|
||
|
{
|
||
|
// Because we have removed the PD Bsi, but to keep the log analizer wroking,
|
||
|
// we fallback to the path set in the per shard BS because they are
|
||
|
// init with the same value
|
||
|
dir = Bsi.getLocalPath() + "pds/";
|
||
|
//dir = PDBsi.getLocalPath() + "pds/";
|
||
|
}
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
dir = CPath::standardizePath(PDRootDirectory);
|
||
|
}
|
||
|
|
||
|
std::string findstr("$shard");
|
||
|
uint p = dir.find(findstr);
|
||
|
|
||
|
if (p != std::string::npos)
|
||
|
dir.replace(p, findstr.size(), shard);
|
||
|
|
||
|
return dir;
|
||
|
}
|
||
|
|
||
|
// Get Database Root directory
|
||
|
std::string CPDSLib::getRootDirectory(uint databaseId, const std::string& shard, bool wantRemoteDir)
|
||
|
{
|
||
|
std::string dir = getPDSRootDirectory(shard, wantRemoteDir);
|
||
|
return NLMISC::toString("%s%08X/", dir.c_str(), databaseId);
|
||
|
}
|
||
|
|
||
|
|
||
|
// Get Logging directory
|
||
|
std::string CPDSLib::getLogDirectory(uint databaseId, const std::string& shard)
|
||
|
{
|
||
|
return getRootDirectory(databaseId, shard) + "logs/";
|
||
|
}
|
||
|
|
||
|
// Get Lib Logging directory
|
||
|
//std::string CPDSLib::getLogDirectory(const std::string& shard) const
|
||
|
//{
|
||
|
// return getLogDirectory(_DatabaseId, shard);
|
||
|
//}
|
||
|
|
||
|
// Get Logging directory
|
||
|
std::string CPDSLib::getRemoteLogDirectory(uint databaseId, const std::string& shard)
|
||
|
{
|
||
|
return getRootDirectory(databaseId, shard, true) + "logs/";
|
||
|
}
|
||
|
|
||
|
// Get Lib Logging directory
|
||
|
std::string CPDSLib::getRemoteLogDirectory(const std::string& shard) const
|
||
|
{
|
||
|
return getRemoteLogDirectory(_DatabaseId, shard);
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
NLMISC_DYNVARIABLE(uint, CurrentPDQueueSize, "Gives current size of PDS messages to be send")
|
||
|
{
|
||
|
// read or write the variable
|
||
|
if (get)
|
||
|
{
|
||
|
uint i;
|
||
|
uint size = 0;
|
||
|
for (i=0; i<CPDSLib::_Libs.size(); ++i)
|
||
|
if (CPDSLib::_Libs[i] != NULL)
|
||
|
size += CPDSLib::_Libs[i]->getMessageQueueSize();
|
||
|
*pointer = size;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
NLMISC_DYNVARIABLE(uint, EnqueuedPDMessages, "Tells number of messages enqueued and not yet sent")
|
||
|
{
|
||
|
// read or write the variable
|
||
|
if (get)
|
||
|
{
|
||
|
uint i;
|
||
|
uint total = 0;
|
||
|
for (i=0; i<CPDSLib::_Libs.size(); ++i)
|
||
|
if (CPDSLib::_Libs[i] != NULL)
|
||
|
total += CPDSLib::_Libs[i]->enqueuedMessages();
|
||
|
*pointer = total;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
//
|
||
|
//NLMISC_COMMAND(displayPDLogContent, "display pd_log file human readable content", "<databaseId> <filename>")
|
||
|
//{
|
||
|
// if (args.size() != 2)
|
||
|
// return false;
|
||
|
//
|
||
|
// uint databaseId = atoi(args[0].c_str());
|
||
|
// string filename = args[1];
|
||
|
//
|
||
|
// CPDSLib* lib = (databaseId < CPDSLib::_Libs.size() ? CPDSLib::_Libs[databaseId] : NULL);
|
||
|
//
|
||
|
// if (lib == NULL)
|
||
|
// return false;
|
||
|
//
|
||
|
// const std::string& xml = lib->getXmlDescription();
|
||
|
// CDBDescriptionParser desc;
|
||
|
//
|
||
|
// if (!desc.loadDescription((const uint8*)(xml.c_str())))
|
||
|
// {
|
||
|
// nlwarning("Failed to load database %d description", databaseId);
|
||
|
// return false;
|
||
|
// }
|
||
|
//
|
||
|
// if (!desc.buildColumns())
|
||
|
// {
|
||
|
// nlwarning("Failed to build database %d columns for description", databaseId);
|
||
|
// return false;
|
||
|
// }
|
||
|
//
|
||
|
// CIFile ifile;
|
||
|
// CUpdateLog updateLog;
|
||
|
//
|
||
|
// if (!ifile.open(filename))
|
||
|
// {
|
||
|
// nlwarning("Failed to open file '%s'", filename.c_str());
|
||
|
// return false;
|
||
|
// }
|
||
|
//
|
||
|
// try
|
||
|
// {
|
||
|
// updateLog.serial(ifile);
|
||
|
// }
|
||
|
// catch (Exception& e)
|
||
|
// {
|
||
|
// nlwarning("Failed to load file '%s': %s", filename.c_str(), e.what());
|
||
|
// return false;
|
||
|
// }
|
||
|
//
|
||
|
// updateLog.display(desc, log);
|
||
|
//
|
||
|
// return true;
|
||
|
//}
|
||
|
|
||
|
} // RY_PDS
|