add CronManager for pinging Community Server to update transactions, adjust existing classes for working together with CronManager

This commit is contained in:
Dario 2020-11-03 12:48:25 +01:00 committed by Ulf Gebhardt
parent 9d5ab8f683
commit a0056885bc
No known key found for this signature in database
GPG Key ID: 81308EFE29ABFEBD
9 changed files with 318 additions and 7 deletions

View File

@ -9,6 +9,7 @@
#include "SingletonManager/SessionManager.h"
#include "SingletonManager/EmailManager.h"
#include "SingletonManager/PendingTasksManager.h"
#include "SingletonManager/CronManager.h"
#include "controller/User.h"
@ -256,15 +257,20 @@ int Gradido_LoginServer::main(const std::vector<std::string>& args)
// load pending tasks not finished in last session
PendingTasksManager::getInstance()->load();
CronManager::getInstance()->init();
printf("[Gradido_LoginServer::main] started in %s\n", usedTime.string().data());
// wait for CTRL-C or kill
waitForTerminationRequest();
CronManager::getInstance()->stop();
// Stop the HTTPServer
srv.stop();
// Stop the json server
json_srv.stop();
ServerConfig::unload();
Poco::Net::uninitializeSSL();
// Optional: Delete all global objects allocated by libprotobuf.

View File

@ -0,0 +1,186 @@
#include "CronManager.h"
#include "../lib/JsonRequest.h"
#include "../ServerConfig.h"
CronManager::CronManager()
: mInitalized(false), mMainTimer(1000, 600000)
{
}
CronManager::~CronManager()
{
mMainWorkMutex.lock();
mMainTimer.stop();
mInitalized = false;
mMainWorkMutex.unlock();
}
CronManager* CronManager::getInstance()
{
static CronManager one;
return &one;
}
bool CronManager::init(long defaultPeriodicIntervallMilliseconds/* = 600000*/)
{
Poco::ScopedLock<Poco::FastMutex> _lock(mMainWorkMutex);
mInitalized = true;
controller::NodeServer::load(model::table::NODE_SERVER_GRADIDO_COMMUNITY);
mDefaultIntervalMilliseconds = defaultPeriodicIntervallMilliseconds;
mMainTimer.setPeriodicInterval(defaultPeriodicIntervallMilliseconds);
Poco::TimerCallback<CronManager> callback(*this, &CronManager::runUpdateStep);
mMainTimer.start(callback);
return true;
}
void CronManager::stop()
{
Poco::ScopedLock<Poco::FastMutex> _lock(mMainWorkMutex);
mInitalized = false;
mMainTimer.stop();
}
void CronManager::runUpdateStep(Poco::Timer& timer)
{
auto current = Poco::DateTime();
//printf("%s [CronManager::runUpdateStep] \n", Poco::DateTimeFormatter::format(current, "%d.%m.%y %H:%M:%S.%i").data());
Poco::ScopedLock<Poco::FastMutex> _lock(mMainWorkMutex);
if (!mInitalized) {
mMainWorkMutex.unlock();
return;
}
mNodeServersToPingMutex.lock();
for (auto it = mNodeServersToPing.begin(); it != mNodeServersToPing.end(); it++)
{
// TODO: Make sure that task not already running, for example if community server needs more time for processing that between calls
// or with schedule update run to short time between calls
UniLib::controller::TaskPtr ping_node_server_task(new PingServerTask(*it));
ping_node_server_task->scheduleTask(ping_node_server_task);
}
mNodeServersToPingMutex.unlock();
mTimestampsMutex.lock();
//printf("update timestamp sizes: %d\n", mUpdateTimestamps.size());
if (mUpdateTimestamps.size() > 0) {
while (mUpdateTimestamps.size() > 0 && mUpdateTimestamps.front() < Poco::Timestamp()) {
// printf("remove update time in past: %d\n", mUpdateTimestamps.front().epochTime());
mUpdateTimestamps.pop_front();
}
Poco::Timespan next_run = mUpdateTimestamps.front() - Poco::Timestamp();
//printf("timer restart called with: %d\n", next_run.seconds());
mMainTimer.setPeriodicInterval(next_run.milliseconds());
//mMainTimer.restart(next_run.milliseconds());
mUpdateTimestamps.pop_front();
}
else {
if (mMainTimer.getPeriodicInterval() != mDefaultIntervalMilliseconds) {
//printf("reset to default interval\n");
mMainTimer.setPeriodicInterval(mDefaultIntervalMilliseconds);
//mMainTimer.restart(mDefaultIntervalMilliseconds);
}
}
mTimestampsMutex.unlock();
//printf("[CronManager::runUpdateStep] end\n");
}
void CronManager::scheduleUpdateRun(Poco::Timespan timespanInFuture)
{
Poco::Timestamp timestamp;
timestamp += timespanInFuture;
mTimestampsMutex.lock();
//printf("[CronManager::scheduleUpdateRun] push:\n%d\n", timestamp.epochTime());
mUpdateTimestamps.push_back(timestamp);
mUpdateTimestamps.sort();
auto frontTimestamp = mUpdateTimestamps.front();
auto backTimestamp = mUpdateTimestamps.back();
//printf("[CronManager::scheduleUpdateRun] front timestamp and back timestamp:\n%d\n%d\n", frontTimestamp.epochTime(), backTimestamp.epochTime());
//printf("current: \n%d\n", Poco::Timestamp().epochTime());
Poco::Timespan next_run = mUpdateTimestamps.front() - Poco::Timestamp();
//printf("next run:\n%d\n", next_run.seconds());
if (next_run.seconds() > 0 && mMainTimer.getPeriodicInterval() == mDefaultIntervalMilliseconds) {
if (mMainWorkMutex.tryLock()) {
mMainTimer.restart(next_run.milliseconds());
mUpdateTimestamps.pop_front();
mMainWorkMutex.unlock();
}
}
mTimestampsMutex.unlock();
//printf("[CronManager::scheduleUpdateRun] end\n");
}
void CronManager::addNodeServerToPing(Poco::AutoPtr<controller::NodeServer> nodeServer)
{
if (nodeServer.isNull() || !nodeServer->getModel()) {
return;
}
if (isNodeServerInList(nodeServer)) {
return;
}
mNodeServersToPingMutex.lock();
mNodeServersToPing.push_back(nodeServer);
mNodeServersToPingMutex.unlock();
}
void CronManager::removeNodeServerToPing(Poco::AutoPtr<controller::NodeServer> nodeServer)
{
if (nodeServer.isNull() || !nodeServer->getModel()) {
return;
}
mNodeServersToPingMutex.lock();
int node_server_id = nodeServer->getModel()->getID();
for (auto it = mNodeServersToPing.begin(); it != mNodeServersToPing.end(); it++) {
if ((*it)->getModel()->getID() == node_server_id) {
mNodeServersToPing.erase(it);
break;
}
}
mNodeServersToPingMutex.unlock();
}
bool CronManager::isNodeServerInList(Poco::AutoPtr<controller::NodeServer> nodeServer)
{
bool result = false;
mNodeServersToPingMutex.lock();
int node_server_id = nodeServer->getModel()->getID();
for (auto it = mNodeServersToPing.begin(); it != mNodeServersToPing.end(); it++) {
if ((*it)->getModel()->getID() == node_server_id) {
result = true;
break;
}
}
mNodeServersToPingMutex.unlock();
return false;
}
// ***********************************************************************************************************
PingServerTask::PingServerTask(Poco::AutoPtr<controller::NodeServer> nodeServer)
: CPUTask(ServerConfig::g_CPUScheduler), mNodeServer(nodeServer)
{
}
PingServerTask::~PingServerTask()
{
}
int PingServerTask::run()
{
auto current = Poco::DateTime();
if (model::table::NODE_SERVER_GRADIDO_COMMUNITY == mNodeServer->getModel()->getNodeServerType()) {
std::string url_port = mNodeServer->getModel()->getUrlWithPort();
//printf("%s [PingServerTask::run] call update for %s\n", Poco::DateTimeFormatter::format(current, "%d.%m.%y %H:%M:%S.%i").data(), url_port.data());
auto json_request = mNodeServer->createJsonRequest();
json_request.request("updateReadNode");
}
return 0;
}

View File

@ -0,0 +1,66 @@
#ifndef __GRADIDO_LOGIN_SERVER_SINGLETON_MANAGER_CRON_MANAGER_H
#define __GRADIDO_LOGIN_SERVER_SINGLETON_MANAGER_CRON_MANAGER_H
#include "Poco/Timer.h"
#include "../controller/NodeServer.h"
#include "../tasks/CPUTask.h"
/*!
* \author: Dario Rekowski
*
* \date: 2020-11-03
*
* \brief: Manager for "Cron"-like Tasks.
*
* Ping for example community server to get new blocks from nodes
* or Hedera Tasks to (re)try receiving Transaction Receipts
*
*/
class CronManager
{
public:
~CronManager();
static CronManager* getInstance();
bool init(long defaultPeriodicIntervallMilliseconds = 60000);
void stop();
void runUpdateStep(Poco::Timer& timer);
void scheduleUpdateRun(Poco::Timespan timespanInFuture);
void addNodeServerToPing(Poco::AutoPtr<controller::NodeServer> nodeServer);
void removeNodeServerToPing(Poco::AutoPtr<controller::NodeServer> nodeServer);
protected:
CronManager();
bool isNodeServerInList(Poco::AutoPtr<controller::NodeServer> nodeServer);
bool mInitalized;
Poco::Timer mMainTimer;
std::list<Poco::AutoPtr<controller::NodeServer>> mNodeServersToPing;
std::list<Poco::Timestamp> mUpdateTimestamps;
Poco::FastMutex mNodeServersToPingMutex;
Poco::FastMutex mMainWorkMutex;
Poco::FastMutex mTimestampsMutex;
long mDefaultIntervalMilliseconds;
};
class PingServerTask : public UniLib::controller::CPUTask
{
public:
PingServerTask(Poco::AutoPtr<controller::NodeServer> nodeServer);
virtual ~PingServerTask();
const char* getResourceType() const { return "PingServerTask"; }
int run();
protected:
Poco::AutoPtr<controller::NodeServer> mNodeServer;
};
#endif //__GRADIDO_LOGIN_SERVER_SINGLETON_MANAGER_CRON_MANAGER_H

View File

@ -1,6 +1,7 @@
#include "NodeServer.h"
#include "../SingletonManager/ErrorManager.h"
#include "../SingletonManager/ConnectionManager.h"
#include "../SingletonManager/CronManager.h"
#include "Poco/RegularExpression.h"
namespace controller {
@ -14,12 +15,22 @@ namespace controller {
return url.substr(protocol.size()) + ":" + std::to_string(port);
}
std::string NodeServerConnection::getUri() const
{
std::string protocol;
g_filterHttp.extract(url, protocol);
return url.substr(protocol.size());
}
NodeServer::NodeServer(model::table::NodeServer* dbModel)
{
mDBModel = dbModel;
if (model::table::NODE_SERVER_GRADIDO_COMMUNITY == dbModel->getNodeServerType()) {
CronManager::getInstance()->addNodeServerToPing(Poco::AutoPtr<controller::NodeServer>(this, true));
}
}
NodeServer::~NodeServer()
@ -27,6 +38,15 @@ namespace controller {
}
bool NodeServer::deleteFromDB()
{
auto result = mDBModel->deleteFromDB();
if (result && model::table::NODE_SERVER_GRADIDO_COMMUNITY == getModel()->getNodeServerType()) {
CronManager::getInstance()->removeNodeServerToPing(Poco::AutoPtr<controller::NodeServer>(this, true));
}
return result;
}
Poco::AutoPtr<NodeServer> NodeServer::create(const std::string& url, int port, int groupId, model::table::NodeServerType type, int nodeHederaId)
{
auto db = new model::table::NodeServer(url, port, groupId, type, nodeHederaId);
@ -43,7 +63,7 @@ namespace controller {
{
node_server_list = db->loadFromDB<model::table::NodeServerType, model::table::NodeServerTuple>("server_type", type, 4);
}
else if (type == model::table::NODE_SERVER_GRADIDO_NODE)
else if (type == model::table::NODE_SERVER_GRADIDO_NODE || type == model::table::NODE_SERVER_GRADIDO_COMMUNITY)
{
if (group_id)
{
@ -162,4 +182,11 @@ namespace controller {
}
JsonRequest NodeServer::createJsonRequest()
{
auto model = getModel();
NodeServerConnection connection(model->getUrl(), model->getPort());
return JsonRequest(connection.getUri(), model->getPort());
}
}

View File

@ -3,6 +3,7 @@
#include "../model/table/NodeServer.h"
#include "../controller/HederaId.h"
#include "../lib/JsonRequest.h"
#include "Poco/SharedPtr.h"
@ -20,6 +21,7 @@ namespace controller {
// without http:// or https://
std::string getUriWithPort() const;
std::string getUri() const;
bool isValid() { return url != "" && port; }
std::string url;
@ -38,17 +40,20 @@ namespace controller {
static Poco::AutoPtr<NodeServer> create(const std::string& url, int port, int groupId, model::table::NodeServerType type, int nodeHederaId);
//! \param group_id is zero take everyone
static std::vector<Poco::AutoPtr<NodeServer>> load(model::table::NodeServerType type, int group_id = 0);
static std::vector<Poco::AutoPtr<NodeServer>> listAll();
// pick server randomly
static NodeServerConnection pick(model::table::HederaNetworkType type, int group_id = 0);
static NodeServerConnection pick(model::table::NodeServerType type, int group_id = 0);
inline bool deleteFromDB() { return mDBModel->deleteFromDB(); }
bool deleteFromDB();
inline Poco::AutoPtr<model::table::NodeServer> getModel() { return _getModel<model::table::NodeServer>(); }
inline void setHederaId(Poco::AutoPtr<controller::HederaId> hederaId) { mHederaID = hederaId; }
inline Poco::AutoPtr<controller::HederaId> getHederaId() { return mHederaID; }
JsonRequest createJsonRequest();
protected:
NodeServer(model::table::NodeServer* dbModel);
Poco::AutoPtr<controller::HederaId> mHederaID;

View File

@ -15,6 +15,9 @@
JsonRequest::JsonRequest(const std::string& serverHost, int serverPort)
: mServerHost(serverHost), mServerPort(serverPort)
{
if (mServerHost.data()[mServerHost.size() - 1] == '/') {
mServerHost = mServerHost.substr(0, mServerHost.size() - 1);
}
}
@ -34,24 +37,26 @@ JsonRequestReturn JsonRequest::request(const char* methodName, const Poco::JSON:
// TODO: adding port into ServerConfig
try {
Profiler phpRequestTime;
Poco::Net::HTTPSClientSession httpsClientSession(mServerHost, mServerPort);
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, "/JsonRequestHandler");
request.setChunkedTransferEncoding(true);
std::ostream& requestStream = httpsClientSession.sendRequest(request);
requestJson.stringify(requestStream);
std::ostream& request_stream = httpsClientSession.sendRequest(request);
requestJson.stringify(request_stream);
Poco::Net::HTTPResponse response;
std::istream& request_stream = httpsClientSession.receiveResponse(response);
std::istream& response_stream = httpsClientSession.receiveResponse(response);
// debugging answer
std::stringstream responseStringStream;
for (std::string line; std::getline(request_stream, line); ) {
for (std::string line; std::getline(response_stream, line); ) {
responseStringStream << line << std::endl;
}
Poco::Logger& speedLog = Poco::Logger::get("SpeedLog");
speedLog.information("[%s] php server time: %s", methodName, phpRequestTime.string());
std::string method_name(methodName);
speedLog.information("[%s] php server time: %s", method_name, phpRequestTime.string());
// extract parameter from request
Poco::JSON::Parser jsonParser;
@ -113,6 +118,14 @@ JsonRequestReturn JsonRequest::request(const char* methodName, const Poco::Net::
}
return request(methodName, requestJson);
}
JsonRequestReturn JsonRequest::request(const char* methodName)
{
Poco::JSON::Object requestJson;
requestJson.set("method", methodName);
return request(methodName, requestJson);
}
#include "Poco/JSON/Stringifier.h"
JsonRequestReturn JsonRequest::requestGRPCRelay(const Poco::Net::NameValueCollection& payload)
{

View File

@ -31,6 +31,7 @@ public:
JsonRequestReturn request(const char* methodName, const Poco::Net::NameValueCollection& payload);
JsonRequestReturn request(const char* methodName, const Poco::JSON::Object& payload);
JsonRequestReturn request(const char* methodName);
JsonRequestReturn requestGRPCRelay(const Poco::Net::NameValueCollection& payload);
protected:

View File

@ -2,6 +2,7 @@
#include "../../SingletonManager/ErrorManager.h"
#include "../../SingletonManager/PendingTasksManager.h"
#include "../../SingletonManager/LanguageManager.h"
#include "../../SingletonManager/CronManager.h"
#include "../../ServerConfig.h"
#include "../../controller/HederaId.h"
@ -550,6 +551,8 @@ namespace model {
// simply assume if transaction was sended to hedera without error, it was also accepted from gradido node
// TODO: later check, but now I haven't any way to communicate with the gradido node
mTransactionBody->getTransactionBase()->transactionAccepted(getUser());
// trigger community server update in 5 seconds
CronManager::getInstance()->scheduleUpdateRun(Poco::Timespan(5000, 0));
return 1;
}

View File

@ -38,6 +38,10 @@ namespace model {
inline void setLastLiveSign(Poco::DateTime lastLiveSign) { UNIQUE_LOCK; mLastLiveSign = lastLiveSign; }
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// !ATTENTION! if using set port or set url review CronManager code
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
inline std::string getUrl() const { return mUrl; }
inline int getPort() const { return mPort; }
inline std::string getUrlWithPort() const { return mUrl + ":" + std::to_string(mPort); }