From a0056885bcfb96c78f9409d600e438c534432750 Mon Sep 17 00:00:00 2001 From: Dario Date: Tue, 3 Nov 2020 12:48:25 +0100 Subject: [PATCH] add CronManager for pinging Community Server to update transactions, adjust existing classes for working together with CronManager --- src/cpp/Gradido_LoginServer.cpp | 6 + src/cpp/SingletonManager/CronManager.cpp | 186 +++++++++++++++++++++++ src/cpp/SingletonManager/CronManager.h | 66 ++++++++ src/cpp/controller/NodeServer.cpp | 29 +++- src/cpp/controller/NodeServer.h | 7 +- src/cpp/lib/JsonRequest.cpp | 23 ++- src/cpp/lib/JsonRequest.h | 1 + src/cpp/model/gradido/Transaction.cpp | 3 + src/cpp/model/table/NodeServer.h | 4 + 9 files changed, 318 insertions(+), 7 deletions(-) create mode 100644 src/cpp/SingletonManager/CronManager.cpp create mode 100644 src/cpp/SingletonManager/CronManager.h diff --git a/src/cpp/Gradido_LoginServer.cpp b/src/cpp/Gradido_LoginServer.cpp index 9bb861ff4..aa5652247 100644 --- a/src/cpp/Gradido_LoginServer.cpp +++ b/src/cpp/Gradido_LoginServer.cpp @@ -9,6 +9,7 @@ #include "SingletonManager/SessionManager.h" #include "SingletonManager/EmailManager.h" #include "SingletonManager/PendingTasksManager.h" +#include "SingletonManager/CronManager.h" #include "controller/User.h" @@ -256,15 +257,20 @@ int Gradido_LoginServer::main(const std::vector& args) // load pending tasks not finished in last session PendingTasksManager::getInstance()->load(); + CronManager::getInstance()->init(); + printf("[Gradido_LoginServer::main] started in %s\n", usedTime.string().data()); // wait for CTRL-C or kill waitForTerminationRequest(); + CronManager::getInstance()->stop(); + // Stop the HTTPServer srv.stop(); // Stop the json server json_srv.stop(); + ServerConfig::unload(); Poco::Net::uninitializeSSL(); // Optional: Delete all global objects allocated by libprotobuf. diff --git a/src/cpp/SingletonManager/CronManager.cpp b/src/cpp/SingletonManager/CronManager.cpp new file mode 100644 index 000000000..41e22044b --- /dev/null +++ b/src/cpp/SingletonManager/CronManager.cpp @@ -0,0 +1,186 @@ +#include "CronManager.h" +#include "../lib/JsonRequest.h" + +#include "../ServerConfig.h" + + + +CronManager::CronManager() + : mInitalized(false), mMainTimer(1000, 600000) +{ + +} + +CronManager::~CronManager() +{ + mMainWorkMutex.lock(); + mMainTimer.stop(); + mInitalized = false; + mMainWorkMutex.unlock(); +} + +CronManager* CronManager::getInstance() +{ + static CronManager one; + return &one; +} + +bool CronManager::init(long defaultPeriodicIntervallMilliseconds/* = 600000*/) +{ + Poco::ScopedLock _lock(mMainWorkMutex); + mInitalized = true; + controller::NodeServer::load(model::table::NODE_SERVER_GRADIDO_COMMUNITY); + + mDefaultIntervalMilliseconds = defaultPeriodicIntervallMilliseconds; + mMainTimer.setPeriodicInterval(defaultPeriodicIntervallMilliseconds); + Poco::TimerCallback callback(*this, &CronManager::runUpdateStep); + mMainTimer.start(callback); + + return true; +} + +void CronManager::stop() +{ + Poco::ScopedLock _lock(mMainWorkMutex); + mInitalized = false; + mMainTimer.stop(); +} + +void CronManager::runUpdateStep(Poco::Timer& timer) +{ + auto current = Poco::DateTime(); + //printf("%s [CronManager::runUpdateStep] \n", Poco::DateTimeFormatter::format(current, "%d.%m.%y %H:%M:%S.%i").data()); + Poco::ScopedLock _lock(mMainWorkMutex); + if (!mInitalized) { + mMainWorkMutex.unlock(); + return; + } + mNodeServersToPingMutex.lock(); + for (auto it = mNodeServersToPing.begin(); it != mNodeServersToPing.end(); it++) + { + // TODO: Make sure that task not already running, for example if community server needs more time for processing that between calls + // or with schedule update run to short time between calls + UniLib::controller::TaskPtr ping_node_server_task(new PingServerTask(*it)); + ping_node_server_task->scheduleTask(ping_node_server_task); + } + mNodeServersToPingMutex.unlock(); + + mTimestampsMutex.lock(); + //printf("update timestamp sizes: %d\n", mUpdateTimestamps.size()); + if (mUpdateTimestamps.size() > 0) { + while (mUpdateTimestamps.size() > 0 && mUpdateTimestamps.front() < Poco::Timestamp()) { + // printf("remove update time in past: %d\n", mUpdateTimestamps.front().epochTime()); + mUpdateTimestamps.pop_front(); + } + Poco::Timespan next_run = mUpdateTimestamps.front() - Poco::Timestamp(); + //printf("timer restart called with: %d\n", next_run.seconds()); + mMainTimer.setPeriodicInterval(next_run.milliseconds()); + //mMainTimer.restart(next_run.milliseconds()); + mUpdateTimestamps.pop_front(); + } + else { + if (mMainTimer.getPeriodicInterval() != mDefaultIntervalMilliseconds) { + //printf("reset to default interval\n"); + mMainTimer.setPeriodicInterval(mDefaultIntervalMilliseconds); + //mMainTimer.restart(mDefaultIntervalMilliseconds); + } + } + mTimestampsMutex.unlock(); + //printf("[CronManager::runUpdateStep] end\n"); +} + +void CronManager::scheduleUpdateRun(Poco::Timespan timespanInFuture) +{ + Poco::Timestamp timestamp; + timestamp += timespanInFuture; + + mTimestampsMutex.lock(); + //printf("[CronManager::scheduleUpdateRun] push:\n%d\n", timestamp.epochTime()); + mUpdateTimestamps.push_back(timestamp); + mUpdateTimestamps.sort(); + auto frontTimestamp = mUpdateTimestamps.front(); + auto backTimestamp = mUpdateTimestamps.back(); + //printf("[CronManager::scheduleUpdateRun] front timestamp and back timestamp:\n%d\n%d\n", frontTimestamp.epochTime(), backTimestamp.epochTime()); + //printf("current: \n%d\n", Poco::Timestamp().epochTime()); + Poco::Timespan next_run = mUpdateTimestamps.front() - Poco::Timestamp(); + //printf("next run:\n%d\n", next_run.seconds()); + if (next_run.seconds() > 0 && mMainTimer.getPeriodicInterval() == mDefaultIntervalMilliseconds) { + if (mMainWorkMutex.tryLock()) { + mMainTimer.restart(next_run.milliseconds()); + mUpdateTimestamps.pop_front(); + mMainWorkMutex.unlock(); + } + } + + mTimestampsMutex.unlock(); + //printf("[CronManager::scheduleUpdateRun] end\n"); +} + + +void CronManager::addNodeServerToPing(Poco::AutoPtr nodeServer) +{ + if (nodeServer.isNull() || !nodeServer->getModel()) { + return; + } + if (isNodeServerInList(nodeServer)) { + return; + } + mNodeServersToPingMutex.lock(); + mNodeServersToPing.push_back(nodeServer); + mNodeServersToPingMutex.unlock(); + +} +void CronManager::removeNodeServerToPing(Poco::AutoPtr nodeServer) +{ + if (nodeServer.isNull() || !nodeServer->getModel()) { + return; + } + mNodeServersToPingMutex.lock(); + int node_server_id = nodeServer->getModel()->getID(); + for (auto it = mNodeServersToPing.begin(); it != mNodeServersToPing.end(); it++) { + if ((*it)->getModel()->getID() == node_server_id) { + mNodeServersToPing.erase(it); + break; + } + } + mNodeServersToPingMutex.unlock(); +} + +bool CronManager::isNodeServerInList(Poco::AutoPtr nodeServer) +{ + bool result = false; + mNodeServersToPingMutex.lock(); + int node_server_id = nodeServer->getModel()->getID(); + for (auto it = mNodeServersToPing.begin(); it != mNodeServersToPing.end(); it++) { + if ((*it)->getModel()->getID() == node_server_id) { + result = true; + break; + } + } + mNodeServersToPingMutex.unlock(); + return false; +} + +// *********************************************************************************************************** +PingServerTask::PingServerTask(Poco::AutoPtr nodeServer) + : CPUTask(ServerConfig::g_CPUScheduler), mNodeServer(nodeServer) +{ + +} + +PingServerTask::~PingServerTask() +{ + +} + +int PingServerTask::run() +{ + auto current = Poco::DateTime(); + if (model::table::NODE_SERVER_GRADIDO_COMMUNITY == mNodeServer->getModel()->getNodeServerType()) { + std::string url_port = mNodeServer->getModel()->getUrlWithPort(); + //printf("%s [PingServerTask::run] call update for %s\n", Poco::DateTimeFormatter::format(current, "%d.%m.%y %H:%M:%S.%i").data(), url_port.data()); + auto json_request = mNodeServer->createJsonRequest(); + json_request.request("updateReadNode"); + } + return 0; +} \ No newline at end of file diff --git a/src/cpp/SingletonManager/CronManager.h b/src/cpp/SingletonManager/CronManager.h new file mode 100644 index 000000000..ba6305d3c --- /dev/null +++ b/src/cpp/SingletonManager/CronManager.h @@ -0,0 +1,66 @@ +#ifndef __GRADIDO_LOGIN_SERVER_SINGLETON_MANAGER_CRON_MANAGER_H +#define __GRADIDO_LOGIN_SERVER_SINGLETON_MANAGER_CRON_MANAGER_H + +#include "Poco/Timer.h" +#include "../controller/NodeServer.h" +#include "../tasks/CPUTask.h" + + +/*! + * \author: Dario Rekowski + * + * \date: 2020-11-03 + * + * \brief: Manager for "Cron"-like Tasks. + * + * Ping for example community server to get new blocks from nodes + * or Hedera Tasks to (re)try receiving Transaction Receipts + * + */ +class CronManager +{ +public: + ~CronManager(); + + static CronManager* getInstance(); + + bool init(long defaultPeriodicIntervallMilliseconds = 60000); + void stop(); + + void runUpdateStep(Poco::Timer& timer); + void scheduleUpdateRun(Poco::Timespan timespanInFuture); + + + void addNodeServerToPing(Poco::AutoPtr nodeServer); + void removeNodeServerToPing(Poco::AutoPtr nodeServer); + +protected: + CronManager(); + + bool isNodeServerInList(Poco::AutoPtr nodeServer); + bool mInitalized; + + Poco::Timer mMainTimer; + std::list> mNodeServersToPing; + std::list mUpdateTimestamps; + Poco::FastMutex mNodeServersToPingMutex; + Poco::FastMutex mMainWorkMutex; + Poco::FastMutex mTimestampsMutex; + long mDefaultIntervalMilliseconds; +}; + +class PingServerTask : public UniLib::controller::CPUTask +{ +public: + PingServerTask(Poco::AutoPtr nodeServer); + virtual ~PingServerTask(); + + const char* getResourceType() const { return "PingServerTask"; } + + int run(); + +protected: + Poco::AutoPtr mNodeServer; +}; + +#endif //__GRADIDO_LOGIN_SERVER_SINGLETON_MANAGER_CRON_MANAGER_H \ No newline at end of file diff --git a/src/cpp/controller/NodeServer.cpp b/src/cpp/controller/NodeServer.cpp index 742f57bea..29494d180 100644 --- a/src/cpp/controller/NodeServer.cpp +++ b/src/cpp/controller/NodeServer.cpp @@ -1,6 +1,7 @@ #include "NodeServer.h" #include "../SingletonManager/ErrorManager.h" #include "../SingletonManager/ConnectionManager.h" +#include "../SingletonManager/CronManager.h" #include "Poco/RegularExpression.h" namespace controller { @@ -14,12 +15,22 @@ namespace controller { return url.substr(protocol.size()) + ":" + std::to_string(port); } + std::string NodeServerConnection::getUri() const + { + std::string protocol; + g_filterHttp.extract(url, protocol); + return url.substr(protocol.size()); + } + NodeServer::NodeServer(model::table::NodeServer* dbModel) { mDBModel = dbModel; + if (model::table::NODE_SERVER_GRADIDO_COMMUNITY == dbModel->getNodeServerType()) { + CronManager::getInstance()->addNodeServerToPing(Poco::AutoPtr(this, true)); + } } NodeServer::~NodeServer() @@ -27,6 +38,15 @@ namespace controller { } + bool NodeServer::deleteFromDB() + { + auto result = mDBModel->deleteFromDB(); + if (result && model::table::NODE_SERVER_GRADIDO_COMMUNITY == getModel()->getNodeServerType()) { + CronManager::getInstance()->removeNodeServerToPing(Poco::AutoPtr(this, true)); + } + return result; + } + Poco::AutoPtr NodeServer::create(const std::string& url, int port, int groupId, model::table::NodeServerType type, int nodeHederaId) { auto db = new model::table::NodeServer(url, port, groupId, type, nodeHederaId); @@ -43,7 +63,7 @@ namespace controller { { node_server_list = db->loadFromDB("server_type", type, 4); } - else if (type == model::table::NODE_SERVER_GRADIDO_NODE) + else if (type == model::table::NODE_SERVER_GRADIDO_NODE || type == model::table::NODE_SERVER_GRADIDO_COMMUNITY) { if (group_id) { @@ -162,4 +182,11 @@ namespace controller { } + JsonRequest NodeServer::createJsonRequest() + { + auto model = getModel(); + NodeServerConnection connection(model->getUrl(), model->getPort()); + return JsonRequest(connection.getUri(), model->getPort()); + } + } \ No newline at end of file diff --git a/src/cpp/controller/NodeServer.h b/src/cpp/controller/NodeServer.h index cdaf01c21..28b06d1fd 100644 --- a/src/cpp/controller/NodeServer.h +++ b/src/cpp/controller/NodeServer.h @@ -3,6 +3,7 @@ #include "../model/table/NodeServer.h" #include "../controller/HederaId.h" +#include "../lib/JsonRequest.h" #include "Poco/SharedPtr.h" @@ -20,6 +21,7 @@ namespace controller { // without http:// or https:// std::string getUriWithPort() const; + std::string getUri() const; bool isValid() { return url != "" && port; } std::string url; @@ -38,17 +40,20 @@ namespace controller { static Poco::AutoPtr create(const std::string& url, int port, int groupId, model::table::NodeServerType type, int nodeHederaId); + //! \param group_id is zero take everyone static std::vector> load(model::table::NodeServerType type, int group_id = 0); static std::vector> listAll(); // pick server randomly static NodeServerConnection pick(model::table::HederaNetworkType type, int group_id = 0); static NodeServerConnection pick(model::table::NodeServerType type, int group_id = 0); - inline bool deleteFromDB() { return mDBModel->deleteFromDB(); } + bool deleteFromDB(); inline Poco::AutoPtr getModel() { return _getModel(); } inline void setHederaId(Poco::AutoPtr hederaId) { mHederaID = hederaId; } inline Poco::AutoPtr getHederaId() { return mHederaID; } + + JsonRequest createJsonRequest(); protected: NodeServer(model::table::NodeServer* dbModel); Poco::AutoPtr mHederaID; diff --git a/src/cpp/lib/JsonRequest.cpp b/src/cpp/lib/JsonRequest.cpp index e4c30caaf..49528abfd 100644 --- a/src/cpp/lib/JsonRequest.cpp +++ b/src/cpp/lib/JsonRequest.cpp @@ -15,6 +15,9 @@ JsonRequest::JsonRequest(const std::string& serverHost, int serverPort) : mServerHost(serverHost), mServerPort(serverPort) { + if (mServerHost.data()[mServerHost.size() - 1] == '/') { + mServerHost = mServerHost.substr(0, mServerHost.size() - 1); + } } @@ -34,24 +37,26 @@ JsonRequestReturn JsonRequest::request(const char* methodName, const Poco::JSON: // TODO: adding port into ServerConfig try { Profiler phpRequestTime; + Poco::Net::HTTPSClientSession httpsClientSession(mServerHost, mServerPort); Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, "/JsonRequestHandler"); request.setChunkedTransferEncoding(true); - std::ostream& requestStream = httpsClientSession.sendRequest(request); - requestJson.stringify(requestStream); + std::ostream& request_stream = httpsClientSession.sendRequest(request); + requestJson.stringify(request_stream); Poco::Net::HTTPResponse response; - std::istream& request_stream = httpsClientSession.receiveResponse(response); + std::istream& response_stream = httpsClientSession.receiveResponse(response); // debugging answer std::stringstream responseStringStream; - for (std::string line; std::getline(request_stream, line); ) { + for (std::string line; std::getline(response_stream, line); ) { responseStringStream << line << std::endl; } Poco::Logger& speedLog = Poco::Logger::get("SpeedLog"); - speedLog.information("[%s] php server time: %s", methodName, phpRequestTime.string()); + std::string method_name(methodName); + speedLog.information("[%s] php server time: %s", method_name, phpRequestTime.string()); // extract parameter from request Poco::JSON::Parser jsonParser; @@ -113,6 +118,14 @@ JsonRequestReturn JsonRequest::request(const char* methodName, const Poco::Net:: } return request(methodName, requestJson); } + +JsonRequestReturn JsonRequest::request(const char* methodName) +{ + Poco::JSON::Object requestJson; + requestJson.set("method", methodName); + return request(methodName, requestJson); +} + #include "Poco/JSON/Stringifier.h" JsonRequestReturn JsonRequest::requestGRPCRelay(const Poco::Net::NameValueCollection& payload) { diff --git a/src/cpp/lib/JsonRequest.h b/src/cpp/lib/JsonRequest.h index ce4b7ce3e..0def2465d 100644 --- a/src/cpp/lib/JsonRequest.h +++ b/src/cpp/lib/JsonRequest.h @@ -31,6 +31,7 @@ public: JsonRequestReturn request(const char* methodName, const Poco::Net::NameValueCollection& payload); JsonRequestReturn request(const char* methodName, const Poco::JSON::Object& payload); + JsonRequestReturn request(const char* methodName); JsonRequestReturn requestGRPCRelay(const Poco::Net::NameValueCollection& payload); protected: diff --git a/src/cpp/model/gradido/Transaction.cpp b/src/cpp/model/gradido/Transaction.cpp index d2c3605e4..23b48f57a 100644 --- a/src/cpp/model/gradido/Transaction.cpp +++ b/src/cpp/model/gradido/Transaction.cpp @@ -2,6 +2,7 @@ #include "../../SingletonManager/ErrorManager.h" #include "../../SingletonManager/PendingTasksManager.h" #include "../../SingletonManager/LanguageManager.h" +#include "../../SingletonManager/CronManager.h" #include "../../ServerConfig.h" #include "../../controller/HederaId.h" @@ -550,6 +551,8 @@ namespace model { // simply assume if transaction was sended to hedera without error, it was also accepted from gradido node // TODO: later check, but now I haven't any way to communicate with the gradido node mTransactionBody->getTransactionBase()->transactionAccepted(getUser()); + // trigger community server update in 5 seconds + CronManager::getInstance()->scheduleUpdateRun(Poco::Timespan(5000, 0)); return 1; } diff --git a/src/cpp/model/table/NodeServer.h b/src/cpp/model/table/NodeServer.h index 21b26e1c7..fc92cdc9c 100644 --- a/src/cpp/model/table/NodeServer.h +++ b/src/cpp/model/table/NodeServer.h @@ -38,6 +38,10 @@ namespace model { inline void setLastLiveSign(Poco::DateTime lastLiveSign) { UNIQUE_LOCK; mLastLiveSign = lastLiveSign; } + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + // !ATTENTION! if using set port or set url review CronManager code + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + inline std::string getUrl() const { return mUrl; } inline int getPort() const { return mPort; } inline std::string getUrlWithPort() const { return mUrl + ":" + std::to_string(mPort); }