refactor dlt connector usage, reduce complexity

This commit is contained in:
einhornimmond 2024-09-21 15:49:32 +02:00
parent d269fb4799
commit 68cb7b368b
12 changed files with 230 additions and 104 deletions

View File

@ -103,10 +103,12 @@ export class DltConnectorClient {
* transmit transaction via dlt-connector to iota
* and update dltTransactionId of transaction in db with iota message id
*/
public async transmitTransaction(transaction: DbTransaction): Promise<boolean> {
public async transmitTransaction(
transaction: DbTransaction,
): Promise<TransactionResult | undefined> {
// we don't need the receive transactions, there contain basically the same data as the send transactions
if ((transaction.typeId as TransactionTypeId) === TransactionTypeId.RECEIVE) {
return true
return
}
const typeString = getTransactionTypeString(transaction.typeId)
// no negative values in dlt connector, gradido concept don't use negative values so the code don't use it too
@ -132,17 +134,15 @@ export class DltConnectorClient {
// TODO: add account nr for user after they have also more than one account in backend
logger.debug('transmit transaction to dlt connector', params)
const {
data: {
sendTransaction: { error, succeed },
},
data: { sendTransaction: result },
} = await this.client.rawRequest<{ sendTransaction: TransactionResult }>(
sendTransaction,
params,
)
if (error) {
throw new Error(error.message)
if (result.error) {
throw new Error(result.error.message)
}
return succeed
return result
} catch (e) {
throw new LogError('Error send sending transaction to dlt-connector: ', e)
}

View File

@ -1,8 +1,7 @@
import { TransactionType } from '@dltConnector/enum/TransactionType'
export interface TransactionRecipe {
id: number
createdAt: string
type: TransactionType
topic: string
messageIdHex: string
}

View File

@ -43,6 +43,10 @@ import { Context, getUser, getClientTimezoneOffset } from '@/server/context'
import { LogError } from '@/server/LogError'
import { backendLogger as logger } from '@/server/logger'
import { calculateDecay } from '@/util/decay'
import {
InterruptiveSleepManager,
TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY,
} from '@/util/InterruptiveSleepManager'
import { TRANSACTIONS_LOCK } from '@/util/TRANSACTIONS_LOCK'
import { fullName } from '@/util/utilities'
@ -50,7 +54,6 @@ import { findContribution } from './util/contributions'
import { getUserCreation, validateContribution, getOpenCreations } from './util/creations'
import { findContributions } from './util/findContributions'
import { getLastTransaction } from './util/getLastTransaction'
import { sendTransactionsToDltConnector } from './util/sendTransactionsToDltConnector'
@Resolver()
export class ContributionResolver {
@ -473,8 +476,8 @@ export class ContributionResolver {
await queryRunner.commitTransaction()
// trigger to send transaction via dlt-connector
void sendTransactionsToDltConnector()
// notify dlt-connector loop for new work
InterruptiveSleepManager.getInstance().interrupt(TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY)
logger.info('creation commited successfuly.')
void sendContributionConfirmedEmail({

View File

@ -38,10 +38,11 @@ import { TRANSACTIONS_LOCK } from '@/util/TRANSACTIONS_LOCK'
import { fullName } from '@/util/utilities'
import { calculateBalance } from '@/util/validate'
import { sendTransactionsToDltConnector } from '../../tasks/sendTransactionsToDltConnector'
import { executeTransaction } from './TransactionResolver'
import { getUserCreation, validateContribution } from './util/creations'
import { getLastTransaction } from './util/getLastTransaction'
import { sendTransactionsToDltConnector } from './util/sendTransactionsToDltConnector'
import { transactionLinkList } from './util/transactionLinkList'
// TODO: do not export, test it inside the resolver

View File

@ -32,6 +32,10 @@ import { Context, getUser } from '@/server/context'
import { LogError } from '@/server/LogError'
import { backendLogger as logger } from '@/server/logger'
import { communityUser } from '@/util/communityUser'
import {
InterruptiveSleepManager,
TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY,
} from '@/util/InterruptiveSleepManager'
import { TRANSACTIONS_LOCK } from '@/util/TRANSACTIONS_LOCK'
import { fullName } from '@/util/utilities'
import { calculateBalance } from '@/util/validate'
@ -47,7 +51,6 @@ import {
processXComCommittingSendCoins,
processXComPendingSendCoins,
} from './util/processXComSendCoins'
import { sendTransactionsToDltConnector } from './util/sendTransactionsToDltConnector'
import { storeForeignUser } from './util/storeForeignUser'
import { transactionLinkSummary } from './util/transactionLinkSummary'
@ -177,8 +180,8 @@ export const executeTransaction = async (
transactionReceive.amount,
)
// trigger to send transaction via dlt-connector
void sendTransactionsToDltConnector()
// notify dlt-connector loop for new work
InterruptiveSleepManager.getInstance().interrupt(TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY)
} catch (e) {
await queryRunner.rollbackTransaction()
throw new LogError('Transaction was not successful', e)

View File

@ -30,7 +30,6 @@ import { GmsUserAuthenticationResult } from '@model/GmsUserAuthenticationResult'
import { User } from '@model/User'
import { UserAdmin, SearchUsersResult } from '@model/UserAdmin'
import { DltConnectorClient } from '@/apis/dltConnector/DltConnectorClient'
import { updateGmsUser } from '@/apis/gms/GmsClient'
import { GmsUser } from '@/apis/gms/model/GmsUser'
import { HumHubClient } from '@/apis/humhub/HumHubClient'
@ -67,6 +66,7 @@ import { LogError } from '@/server/LogError'
import { backendLogger as logger } from '@/server/logger'
import { communityDbUser } from '@/util/communityUser'
import { hasElopageBuys } from '@/util/hasElopageBuys'
import { InterruptiveSleepManager, TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY } from '@/util/InterruptiveSleepManager'
import { getTimeDurationObject, printTimeDuration } from '@/util/time'
import random from 'random-bigint'
@ -385,11 +385,8 @@ export class UserResolver {
}
logger.info('createUser() successful...')
const dltConnector = DltConnectorClient.getInstance()
if (dltConnector) {
const r = await dltConnector.registerAddress(dbUser)
console.log('result from dlt', r)
}
// notify dlt-connector loop for new work
InterruptiveSleepManager.getInstance().interrupt(TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY)
if (redeemCode) {
eventRegisterRedeem.affectedUser = dbUser

View File

@ -1,81 +0,0 @@
import { IsNull } from '@dbTools/typeorm'
import { DltTransaction } from '@entity/DltTransaction'
import { Transaction } from '@entity/Transaction'
import { DltConnectorClient } from '@dltConnector/DltConnectorClient'
import { backendLogger as logger } from '@/server/logger'
import { Monitor, MonitorNames } from '@/util/Monitor'
export async function sendTransactionsToDltConnector(): Promise<void> {
logger.info('sendTransactionsToDltConnector...')
// check if this logic is still occupied, no concurrecy allowed
if (!Monitor.isLocked(MonitorNames.SEND_DLT_TRANSACTIONS)) {
// mark this block for occuption to prevent concurrency
Monitor.lockIt(MonitorNames.SEND_DLT_TRANSACTIONS)
try {
await createDltTransactions()
const dltConnector = DltConnectorClient.getInstance()
if (dltConnector) {
logger.debug('with sending to DltConnector...')
const dltTransactions = await DltTransaction.find({
where: { messageId: IsNull() },
relations: ['transaction'],
order: { createdAt: 'ASC', id: 'ASC' },
})
for (const dltTx of dltTransactions) {
if (!dltTx.transaction) {
continue
}
try {
const result = await dltConnector.transmitTransaction(dltTx.transaction)
// message id isn't known at this point of time, because transaction will not direct sended to iota,
// it will first go to db and then sended, if no transaction is in db before
if (result) {
dltTx.messageId = 'sended'
await DltTransaction.save(dltTx)
logger.info('store messageId=%s in dltTx=%d', dltTx.messageId, dltTx.id)
}
} catch (e) {
logger.error(
`error while sending to dlt-connector or writing messageId of dltTx=${dltTx.id}`,
e,
)
}
}
} else {
logger.info('sending to DltConnector currently not configured...')
}
} catch (e) {
logger.error('error on sending transactions to dlt-connector.', e)
} finally {
// releae Monitor occupation
Monitor.releaseIt(MonitorNames.SEND_DLT_TRANSACTIONS)
}
} else {
logger.info('sendTransactionsToDltConnector currently locked by monitor...')
}
}
async function createDltTransactions(): Promise<void> {
const dltqb = DltTransaction.createQueryBuilder().select('transactions_id')
const newTransactions: Transaction[] = await Transaction.createQueryBuilder()
.select('id')
.addSelect('balance_date')
.where('id NOT IN (' + dltqb.getSql() + ')')
// eslint-disable-next-line camelcase
.orderBy({ balance_date: 'ASC', id: 'ASC' })
.getRawMany()
const dltTxArray: DltTransaction[] = []
let idx = 0
while (newTransactions.length > dltTxArray.length) {
// timing problems with for(let idx = 0; idx < newTransactions.length; idx++) {
const dltTx = DltTransaction.create()
dltTx.transactionId = newTransactions[idx++].id
await DltTransaction.save(dltTx)
dltTxArray.push(dltTx)
}
}

View File

@ -1,6 +1,7 @@
import { CONFIG } from './config'
import { startValidateCommunities } from './federation/validateCommunities'
import { createServer } from './server/createServer'
import { sendTransactionsToDltConnector } from './tasks/sendTransactionsToDltConnector'
async function main() {
const { app } = await createServer()
@ -13,6 +14,10 @@ async function main() {
console.log(`GraphIQL available at http://localhost:${CONFIG.PORT}`)
}
})
// task is running the whole time for transmitting transaction via dlt-connector to iota
// can be notified with InterruptiveSleepManager.getInstance().interrupt(TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY)
// that a new transaction or user was stored in db
void sendTransactionsToDltConnector()
void startValidateCommunities(Number(CONFIG.FEDERATION_VALIDATE_COMMUNITY_TIMER))
}

View File

@ -0,0 +1,101 @@
import { DltTransaction } from '@entity/DltTransaction'
import { DltUser } from '@entity/DltUser'
import { Transaction } from '@entity/Transaction'
import { User } from '@entity/User'
import { DltConnectorClient } from '@dltConnector/DltConnectorClient'
import { backendLogger as logger } from '@/server/logger'
import {
InterruptiveSleepManager,
TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY,
} from '@/util/InterruptiveSleepManager'
let running = true
export const stopSendTransactionsToDltConnector = (): void => {
running = false
}
export async function sendTransactionsToDltConnector(): Promise<void> {
const dltConnector = DltConnectorClient.getInstance()
if (!dltConnector) {
logger.info('sending to DltConnector currently not configured...')
running = false
return
}
logger.info('start sendTransactionsToDltConnector task')
// eslint-disable-next-line no-unmodified-loop-condition
while (running) {
try {
// loop while work could be found
while (true) {
const pendingTransaction = await findNextPendingTransaction()
if (pendingTransaction instanceof User) {
const result = await dltConnector.registerAddress(pendingTransaction)
if (result?.succeed && result.recipe) {
const dltUser = DltUser.create()
dltUser.userId = pendingTransaction.id
dltUser.messageId = result.recipe.messageIdHex
// wait until saved, necessary before next call to findNextPendingTransaction
await DltUser.save(dltUser)
logger.info('store dltUser: messageId=%s in dltTx=%d', dltUser.messageId, dltUser.id)
}
} else if (pendingTransaction instanceof Transaction) {
const result = await dltConnector.transmitTransaction(pendingTransaction)
if (result?.succeed && result.recipe) {
const dltTransaction = DltTransaction.create()
dltTransaction.transactionId = pendingTransaction.id
dltTransaction.messageId = result.recipe.messageIdHex
// wait until saved, necessary before next call to findNextPendingTransaction
await DltTransaction.save(dltTransaction)
logger.info(
'store dltTransaction: messageId=%s in dltTx=%d',
dltTransaction.messageId,
dltTransaction.id,
)
}
} else {
// nothing to do, break inner loop and sleep until new work has arrived
break
}
}
await InterruptiveSleepManager.getInstance().sleep(
TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY,
1000,
)
} catch (e) {
logger.error(`error while sending to dlt-connector or writing messageId`, e)
}
}
}
async function findNextPendingTransaction(): Promise<Transaction | User | null> {
const lastTransactionPromise: Promise<Transaction | undefined> = Transaction.createQueryBuilder()
.select()
.leftJoin(DltTransaction, 'dltTransaction', 'transaction.id = dltTransaction.transactionId')
.where('dltTransaction.transactionId IS NULL')
// eslint-disable-next-line camelcase
.orderBy({ balance_date: 'ASC', id: 'ASC' })
.limit(1)
.getRawOne()
const lastUserPromise: Promise<User | undefined> = User.createQueryBuilder()
.leftJoin(DltUser, 'dltUser', 'user.id = dltUser.userId')
.where('dltUser.userId IS NULL')
// eslint-disable-next-line camelcase
.orderBy({ created_at: 'ASC', id: 'ASC' })
.limit(1)
.getRawOne()
const results = await Promise.all([lastTransactionPromise, lastUserPromise])
if (results[0] && results[1]) {
return results[0].balanceDate < results[1].createdAt ? results[0] : results[1]
} else if (results[0]) {
return results[0]
} else if (results[1]) {
return results[1]
}
return null
}

View File

@ -0,0 +1,31 @@
/**
* Sleep, that can be interrupted
* call sleep only for msSteps and than check if interrupt was called
*/
export class InterruptiveSleep {
private interruptSleep = false
private msSteps = 10
constructor(msSteps: number) {
this.msSteps = msSteps
}
public interrupt(): void {
this.interruptSleep = true
}
private static _sleep(ms: number) {
return new Promise((resolve) => {
setTimeout(resolve, ms)
})
}
public async sleep(ms: number): Promise<void> {
let waited = 0
this.interruptSleep = false
while (waited < ms && !this.interruptSleep) {
await InterruptiveSleep._sleep(this.msSteps)
waited += this.msSteps
}
}
}

View File

@ -0,0 +1,67 @@
import { LogError } from '@/server/LogError'
import { InterruptiveSleep } from './InterruptiveSleep'
// Source: https://refactoring.guru/design-patterns/singleton/typescript/example
// and ../federation/client/FederationClientFactory.ts
/**
* Managing Instances of interruptive sleep it is inspired from conditions from c++ multithreading
* It is used for separate worker threads which will go to sleep after they haven't anything todo left,
* but with this Manager and InterruptiveSleep Object it sleeps only stepSize and check if something interrupted his sleep,
* so he can check for new work
*/
export const TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY = 'transmitToIota'
// eslint-disable-next-line @typescript-eslint/no-extraneous-class
export class InterruptiveSleepManager {
// eslint-disable-next-line no-use-before-define
private static instance: InterruptiveSleepManager
private interruptiveSleep: Map<string, InterruptiveSleep> = new Map<string, InterruptiveSleep>()
private stepSizeMilliseconds = 10
/**
* The Singleton's constructor should always be private to prevent direct
* construction calls with the `new` operator.
*/
// eslint-disable-next-line no-useless-constructor, @typescript-eslint/no-empty-function
private constructor() {}
/**
* The static method that controls the access to the singleton instance.
*
* This implementation let you subclass the Singleton class while keeping
* just one instance of each subclass around.
*/
public static getInstance(): InterruptiveSleepManager {
if (!InterruptiveSleepManager.instance) {
InterruptiveSleepManager.instance = new InterruptiveSleepManager()
}
return InterruptiveSleepManager.instance
}
/**
* only for new created InterruptiveSleepManager Entries!
* @param step size in ms in which new! InterruptiveSleepManager check if they where triggered
*/
public setStepSize(ms: number) {
this.stepSizeMilliseconds = ms
}
public interrupt(key: string): void {
const interruptiveSleep = this.interruptiveSleep.get(key)
if (interruptiveSleep) {
interruptiveSleep.interrupt()
}
}
public sleep(key: string, ms: number): Promise<void> {
if (!this.interruptiveSleep.has(key)) {
this.interruptiveSleep.set(key, new InterruptiveSleep(this.stepSizeMilliseconds))
}
const interruptiveSleep = this.interruptiveSleep.get(key)
if (!interruptiveSleep) {
throw new LogError('map entry not exist after setting it')
}
return interruptiveSleep.sleep(ms)
}
}