From 68cb7b368bcc30cc0af05c035fd53baf83d1ba49 Mon Sep 17 00:00:00 2001 From: einhornimmond Date: Sat, 21 Sep 2024 15:49:32 +0200 Subject: [PATCH] refactor dlt connector usage, reduce complexity --- .../apis/dltConnector/DltConnectorClient.ts | 16 +-- .../dltConnector/model/TransactionRecipe.ts | 3 +- .../graphql/resolver/ContributionResolver.ts | 9 +- .../resolver/TransactionLinkResolver.ts | 3 +- .../graphql/resolver/TransactionResolver.ts | 9 +- backend/src/graphql/resolver/UserResolver.ts | 9 +- .../util/sendTransactionsToDltConnector.ts | 81 -------------- backend/src/index.ts | 5 + .../sendTransactionsToDltConnector.test.ts | 0 .../tasks/sendTransactionsToDltConnector.ts | 101 ++++++++++++++++++ backend/src/util/InterruptiveSleep.ts | 31 ++++++ backend/src/util/InterruptiveSleepManager.ts | 67 ++++++++++++ 12 files changed, 230 insertions(+), 104 deletions(-) delete mode 100644 backend/src/graphql/resolver/util/sendTransactionsToDltConnector.ts rename backend/src/{graphql/resolver/util => tasks}/sendTransactionsToDltConnector.test.ts (100%) create mode 100644 backend/src/tasks/sendTransactionsToDltConnector.ts create mode 100644 backend/src/util/InterruptiveSleep.ts create mode 100644 backend/src/util/InterruptiveSleepManager.ts diff --git a/backend/src/apis/dltConnector/DltConnectorClient.ts b/backend/src/apis/dltConnector/DltConnectorClient.ts index 85e8bfb0a..4737d95bc 100644 --- a/backend/src/apis/dltConnector/DltConnectorClient.ts +++ b/backend/src/apis/dltConnector/DltConnectorClient.ts @@ -103,10 +103,12 @@ export class DltConnectorClient { * transmit transaction via dlt-connector to iota * and update dltTransactionId of transaction in db with iota message id */ - public async transmitTransaction(transaction: DbTransaction): Promise { + public async transmitTransaction( + transaction: DbTransaction, + ): Promise { // we don't need the receive transactions, there contain basically the same data as the send transactions if ((transaction.typeId as TransactionTypeId) === TransactionTypeId.RECEIVE) { - return true + return } const typeString = getTransactionTypeString(transaction.typeId) // no negative values in dlt connector, gradido concept don't use negative values so the code don't use it too @@ -132,17 +134,15 @@ export class DltConnectorClient { // TODO: add account nr for user after they have also more than one account in backend logger.debug('transmit transaction to dlt connector', params) const { - data: { - sendTransaction: { error, succeed }, - }, + data: { sendTransaction: result }, } = await this.client.rawRequest<{ sendTransaction: TransactionResult }>( sendTransaction, params, ) - if (error) { - throw new Error(error.message) + if (result.error) { + throw new Error(result.error.message) } - return succeed + return result } catch (e) { throw new LogError('Error send sending transaction to dlt-connector: ', e) } diff --git a/backend/src/apis/dltConnector/model/TransactionRecipe.ts b/backend/src/apis/dltConnector/model/TransactionRecipe.ts index edd7deadb..504ff2044 100644 --- a/backend/src/apis/dltConnector/model/TransactionRecipe.ts +++ b/backend/src/apis/dltConnector/model/TransactionRecipe.ts @@ -1,8 +1,7 @@ import { TransactionType } from '@dltConnector/enum/TransactionType' export interface TransactionRecipe { - id: number createdAt: string type: TransactionType - topic: string + messageIdHex: string } diff --git a/backend/src/graphql/resolver/ContributionResolver.ts b/backend/src/graphql/resolver/ContributionResolver.ts index 5684835e4..145d70a3f 100644 --- a/backend/src/graphql/resolver/ContributionResolver.ts +++ b/backend/src/graphql/resolver/ContributionResolver.ts @@ -43,6 +43,10 @@ import { Context, getUser, getClientTimezoneOffset } from '@/server/context' import { LogError } from '@/server/LogError' import { backendLogger as logger } from '@/server/logger' import { calculateDecay } from '@/util/decay' +import { + InterruptiveSleepManager, + TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY, +} from '@/util/InterruptiveSleepManager' import { TRANSACTIONS_LOCK } from '@/util/TRANSACTIONS_LOCK' import { fullName } from '@/util/utilities' @@ -50,7 +54,6 @@ import { findContribution } from './util/contributions' import { getUserCreation, validateContribution, getOpenCreations } from './util/creations' import { findContributions } from './util/findContributions' import { getLastTransaction } from './util/getLastTransaction' -import { sendTransactionsToDltConnector } from './util/sendTransactionsToDltConnector' @Resolver() export class ContributionResolver { @@ -473,8 +476,8 @@ export class ContributionResolver { await queryRunner.commitTransaction() - // trigger to send transaction via dlt-connector - void sendTransactionsToDltConnector() + // notify dlt-connector loop for new work + InterruptiveSleepManager.getInstance().interrupt(TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY) logger.info('creation commited successfuly.') void sendContributionConfirmedEmail({ diff --git a/backend/src/graphql/resolver/TransactionLinkResolver.ts b/backend/src/graphql/resolver/TransactionLinkResolver.ts index 63134a9a8..0ef7f0586 100644 --- a/backend/src/graphql/resolver/TransactionLinkResolver.ts +++ b/backend/src/graphql/resolver/TransactionLinkResolver.ts @@ -38,10 +38,11 @@ import { TRANSACTIONS_LOCK } from '@/util/TRANSACTIONS_LOCK' import { fullName } from '@/util/utilities' import { calculateBalance } from '@/util/validate' +import { sendTransactionsToDltConnector } from '../../tasks/sendTransactionsToDltConnector' + import { executeTransaction } from './TransactionResolver' import { getUserCreation, validateContribution } from './util/creations' import { getLastTransaction } from './util/getLastTransaction' -import { sendTransactionsToDltConnector } from './util/sendTransactionsToDltConnector' import { transactionLinkList } from './util/transactionLinkList' // TODO: do not export, test it inside the resolver diff --git a/backend/src/graphql/resolver/TransactionResolver.ts b/backend/src/graphql/resolver/TransactionResolver.ts index 1889e3be0..1d652adfe 100644 --- a/backend/src/graphql/resolver/TransactionResolver.ts +++ b/backend/src/graphql/resolver/TransactionResolver.ts @@ -32,6 +32,10 @@ import { Context, getUser } from '@/server/context' import { LogError } from '@/server/LogError' import { backendLogger as logger } from '@/server/logger' import { communityUser } from '@/util/communityUser' +import { + InterruptiveSleepManager, + TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY, +} from '@/util/InterruptiveSleepManager' import { TRANSACTIONS_LOCK } from '@/util/TRANSACTIONS_LOCK' import { fullName } from '@/util/utilities' import { calculateBalance } from '@/util/validate' @@ -47,7 +51,6 @@ import { processXComCommittingSendCoins, processXComPendingSendCoins, } from './util/processXComSendCoins' -import { sendTransactionsToDltConnector } from './util/sendTransactionsToDltConnector' import { storeForeignUser } from './util/storeForeignUser' import { transactionLinkSummary } from './util/transactionLinkSummary' @@ -177,8 +180,8 @@ export const executeTransaction = async ( transactionReceive.amount, ) - // trigger to send transaction via dlt-connector - void sendTransactionsToDltConnector() + // notify dlt-connector loop for new work + InterruptiveSleepManager.getInstance().interrupt(TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY) } catch (e) { await queryRunner.rollbackTransaction() throw new LogError('Transaction was not successful', e) diff --git a/backend/src/graphql/resolver/UserResolver.ts b/backend/src/graphql/resolver/UserResolver.ts index 93c63e659..6ddd15705 100644 --- a/backend/src/graphql/resolver/UserResolver.ts +++ b/backend/src/graphql/resolver/UserResolver.ts @@ -30,7 +30,6 @@ import { GmsUserAuthenticationResult } from '@model/GmsUserAuthenticationResult' import { User } from '@model/User' import { UserAdmin, SearchUsersResult } from '@model/UserAdmin' -import { DltConnectorClient } from '@/apis/dltConnector/DltConnectorClient' import { updateGmsUser } from '@/apis/gms/GmsClient' import { GmsUser } from '@/apis/gms/model/GmsUser' import { HumHubClient } from '@/apis/humhub/HumHubClient' @@ -67,6 +66,7 @@ import { LogError } from '@/server/LogError' import { backendLogger as logger } from '@/server/logger' import { communityDbUser } from '@/util/communityUser' import { hasElopageBuys } from '@/util/hasElopageBuys' +import { InterruptiveSleepManager, TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY } from '@/util/InterruptiveSleepManager' import { getTimeDurationObject, printTimeDuration } from '@/util/time' import random from 'random-bigint' @@ -385,11 +385,8 @@ export class UserResolver { } logger.info('createUser() successful...') - const dltConnector = DltConnectorClient.getInstance() - if (dltConnector) { - const r = await dltConnector.registerAddress(dbUser) - console.log('result from dlt', r) - } + // notify dlt-connector loop for new work + InterruptiveSleepManager.getInstance().interrupt(TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY) if (redeemCode) { eventRegisterRedeem.affectedUser = dbUser diff --git a/backend/src/graphql/resolver/util/sendTransactionsToDltConnector.ts b/backend/src/graphql/resolver/util/sendTransactionsToDltConnector.ts deleted file mode 100644 index 733c12594..000000000 --- a/backend/src/graphql/resolver/util/sendTransactionsToDltConnector.ts +++ /dev/null @@ -1,81 +0,0 @@ -import { IsNull } from '@dbTools/typeorm' -import { DltTransaction } from '@entity/DltTransaction' -import { Transaction } from '@entity/Transaction' - -import { DltConnectorClient } from '@dltConnector/DltConnectorClient' - -import { backendLogger as logger } from '@/server/logger' -import { Monitor, MonitorNames } from '@/util/Monitor' - -export async function sendTransactionsToDltConnector(): Promise { - logger.info('sendTransactionsToDltConnector...') - // check if this logic is still occupied, no concurrecy allowed - if (!Monitor.isLocked(MonitorNames.SEND_DLT_TRANSACTIONS)) { - // mark this block for occuption to prevent concurrency - Monitor.lockIt(MonitorNames.SEND_DLT_TRANSACTIONS) - - try { - await createDltTransactions() - const dltConnector = DltConnectorClient.getInstance() - if (dltConnector) { - logger.debug('with sending to DltConnector...') - const dltTransactions = await DltTransaction.find({ - where: { messageId: IsNull() }, - relations: ['transaction'], - order: { createdAt: 'ASC', id: 'ASC' }, - }) - - for (const dltTx of dltTransactions) { - if (!dltTx.transaction) { - continue - } - try { - const result = await dltConnector.transmitTransaction(dltTx.transaction) - // message id isn't known at this point of time, because transaction will not direct sended to iota, - // it will first go to db and then sended, if no transaction is in db before - if (result) { - dltTx.messageId = 'sended' - await DltTransaction.save(dltTx) - logger.info('store messageId=%s in dltTx=%d', dltTx.messageId, dltTx.id) - } - } catch (e) { - logger.error( - `error while sending to dlt-connector or writing messageId of dltTx=${dltTx.id}`, - e, - ) - } - } - } else { - logger.info('sending to DltConnector currently not configured...') - } - } catch (e) { - logger.error('error on sending transactions to dlt-connector.', e) - } finally { - // releae Monitor occupation - Monitor.releaseIt(MonitorNames.SEND_DLT_TRANSACTIONS) - } - } else { - logger.info('sendTransactionsToDltConnector currently locked by monitor...') - } -} - -async function createDltTransactions(): Promise { - const dltqb = DltTransaction.createQueryBuilder().select('transactions_id') - const newTransactions: Transaction[] = await Transaction.createQueryBuilder() - .select('id') - .addSelect('balance_date') - .where('id NOT IN (' + dltqb.getSql() + ')') - // eslint-disable-next-line camelcase - .orderBy({ balance_date: 'ASC', id: 'ASC' }) - .getRawMany() - - const dltTxArray: DltTransaction[] = [] - let idx = 0 - while (newTransactions.length > dltTxArray.length) { - // timing problems with for(let idx = 0; idx < newTransactions.length; idx++) { - const dltTx = DltTransaction.create() - dltTx.transactionId = newTransactions[idx++].id - await DltTransaction.save(dltTx) - dltTxArray.push(dltTx) - } -} diff --git a/backend/src/index.ts b/backend/src/index.ts index 86f78326d..5cb3574e4 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -1,6 +1,7 @@ import { CONFIG } from './config' import { startValidateCommunities } from './federation/validateCommunities' import { createServer } from './server/createServer' +import { sendTransactionsToDltConnector } from './tasks/sendTransactionsToDltConnector' async function main() { const { app } = await createServer() @@ -13,6 +14,10 @@ async function main() { console.log(`GraphIQL available at http://localhost:${CONFIG.PORT}`) } }) + // task is running the whole time for transmitting transaction via dlt-connector to iota + // can be notified with InterruptiveSleepManager.getInstance().interrupt(TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY) + // that a new transaction or user was stored in db + void sendTransactionsToDltConnector() void startValidateCommunities(Number(CONFIG.FEDERATION_VALIDATE_COMMUNITY_TIMER)) } diff --git a/backend/src/graphql/resolver/util/sendTransactionsToDltConnector.test.ts b/backend/src/tasks/sendTransactionsToDltConnector.test.ts similarity index 100% rename from backend/src/graphql/resolver/util/sendTransactionsToDltConnector.test.ts rename to backend/src/tasks/sendTransactionsToDltConnector.test.ts diff --git a/backend/src/tasks/sendTransactionsToDltConnector.ts b/backend/src/tasks/sendTransactionsToDltConnector.ts new file mode 100644 index 000000000..26535e082 --- /dev/null +++ b/backend/src/tasks/sendTransactionsToDltConnector.ts @@ -0,0 +1,101 @@ +import { DltTransaction } from '@entity/DltTransaction' +import { DltUser } from '@entity/DltUser' +import { Transaction } from '@entity/Transaction' +import { User } from '@entity/User' + +import { DltConnectorClient } from '@dltConnector/DltConnectorClient' + +import { backendLogger as logger } from '@/server/logger' +import { + InterruptiveSleepManager, + TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY, +} from '@/util/InterruptiveSleepManager' + +let running = true + +export const stopSendTransactionsToDltConnector = (): void => { + running = false +} + +export async function sendTransactionsToDltConnector(): Promise { + const dltConnector = DltConnectorClient.getInstance() + if (!dltConnector) { + logger.info('sending to DltConnector currently not configured...') + running = false + return + } + logger.info('start sendTransactionsToDltConnector task') + + // eslint-disable-next-line no-unmodified-loop-condition + while (running) { + try { + // loop while work could be found + while (true) { + const pendingTransaction = await findNextPendingTransaction() + if (pendingTransaction instanceof User) { + const result = await dltConnector.registerAddress(pendingTransaction) + if (result?.succeed && result.recipe) { + const dltUser = DltUser.create() + dltUser.userId = pendingTransaction.id + dltUser.messageId = result.recipe.messageIdHex + // wait until saved, necessary before next call to findNextPendingTransaction + await DltUser.save(dltUser) + logger.info('store dltUser: messageId=%s in dltTx=%d', dltUser.messageId, dltUser.id) + } + } else if (pendingTransaction instanceof Transaction) { + const result = await dltConnector.transmitTransaction(pendingTransaction) + if (result?.succeed && result.recipe) { + const dltTransaction = DltTransaction.create() + dltTransaction.transactionId = pendingTransaction.id + dltTransaction.messageId = result.recipe.messageIdHex + // wait until saved, necessary before next call to findNextPendingTransaction + await DltTransaction.save(dltTransaction) + logger.info( + 'store dltTransaction: messageId=%s in dltTx=%d', + dltTransaction.messageId, + dltTransaction.id, + ) + } + } else { + // nothing to do, break inner loop and sleep until new work has arrived + break + } + } + await InterruptiveSleepManager.getInstance().sleep( + TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY, + 1000, + ) + } catch (e) { + logger.error(`error while sending to dlt-connector or writing messageId`, e) + } + } +} + +async function findNextPendingTransaction(): Promise { + const lastTransactionPromise: Promise = Transaction.createQueryBuilder() + .select() + .leftJoin(DltTransaction, 'dltTransaction', 'transaction.id = dltTransaction.transactionId') + .where('dltTransaction.transactionId IS NULL') + // eslint-disable-next-line camelcase + .orderBy({ balance_date: 'ASC', id: 'ASC' }) + .limit(1) + .getRawOne() + + const lastUserPromise: Promise = User.createQueryBuilder() + .leftJoin(DltUser, 'dltUser', 'user.id = dltUser.userId') + .where('dltUser.userId IS NULL') + // eslint-disable-next-line camelcase + .orderBy({ created_at: 'ASC', id: 'ASC' }) + .limit(1) + .getRawOne() + + const results = await Promise.all([lastTransactionPromise, lastUserPromise]) + if (results[0] && results[1]) { + return results[0].balanceDate < results[1].createdAt ? results[0] : results[1] + } else if (results[0]) { + return results[0] + } else if (results[1]) { + return results[1] + } + return null +} diff --git a/backend/src/util/InterruptiveSleep.ts b/backend/src/util/InterruptiveSleep.ts new file mode 100644 index 000000000..c21e57db9 --- /dev/null +++ b/backend/src/util/InterruptiveSleep.ts @@ -0,0 +1,31 @@ +/** + * Sleep, that can be interrupted + * call sleep only for msSteps and than check if interrupt was called + */ +export class InterruptiveSleep { + private interruptSleep = false + private msSteps = 10 + + constructor(msSteps: number) { + this.msSteps = msSteps + } + + public interrupt(): void { + this.interruptSleep = true + } + + private static _sleep(ms: number) { + return new Promise((resolve) => { + setTimeout(resolve, ms) + }) + } + + public async sleep(ms: number): Promise { + let waited = 0 + this.interruptSleep = false + while (waited < ms && !this.interruptSleep) { + await InterruptiveSleep._sleep(this.msSteps) + waited += this.msSteps + } + } +} diff --git a/backend/src/util/InterruptiveSleepManager.ts b/backend/src/util/InterruptiveSleepManager.ts new file mode 100644 index 000000000..246269623 --- /dev/null +++ b/backend/src/util/InterruptiveSleepManager.ts @@ -0,0 +1,67 @@ +import { LogError } from '@/server/LogError' + +import { InterruptiveSleep } from './InterruptiveSleep' + +// Source: https://refactoring.guru/design-patterns/singleton/typescript/example +// and ../federation/client/FederationClientFactory.ts +/** + * Managing Instances of interruptive sleep it is inspired from conditions from c++ multithreading + * It is used for separate worker threads which will go to sleep after they haven't anything todo left, + * but with this Manager and InterruptiveSleep Object it sleeps only stepSize and check if something interrupted his sleep, + * so he can check for new work + */ +export const TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY = 'transmitToIota' + +// eslint-disable-next-line @typescript-eslint/no-extraneous-class +export class InterruptiveSleepManager { + // eslint-disable-next-line no-use-before-define + private static instance: InterruptiveSleepManager + private interruptiveSleep: Map = new Map() + private stepSizeMilliseconds = 10 + + /** + * The Singleton's constructor should always be private to prevent direct + * construction calls with the `new` operator. + */ + // eslint-disable-next-line no-useless-constructor, @typescript-eslint/no-empty-function + private constructor() {} + + /** + * The static method that controls the access to the singleton instance. + * + * This implementation let you subclass the Singleton class while keeping + * just one instance of each subclass around. + */ + public static getInstance(): InterruptiveSleepManager { + if (!InterruptiveSleepManager.instance) { + InterruptiveSleepManager.instance = new InterruptiveSleepManager() + } + return InterruptiveSleepManager.instance + } + + /** + * only for new created InterruptiveSleepManager Entries! + * @param step size in ms in which new! InterruptiveSleepManager check if they where triggered + */ + public setStepSize(ms: number) { + this.stepSizeMilliseconds = ms + } + + public interrupt(key: string): void { + const interruptiveSleep = this.interruptiveSleep.get(key) + if (interruptiveSleep) { + interruptiveSleep.interrupt() + } + } + + public sleep(key: string, ms: number): Promise { + if (!this.interruptiveSleep.has(key)) { + this.interruptiveSleep.set(key, new InterruptiveSleep(this.stepSizeMilliseconds)) + } + const interruptiveSleep = this.interruptiveSleep.get(key) + if (!interruptiveSleep) { + throw new LogError('map entry not exist after setting it') + } + return interruptiveSleep.sleep(ms) + } +}