From 4c389635f14c09f0e77e971094de0f722865e4f3 Mon Sep 17 00:00:00 2001 From: einhorn_b Date: Tue, 23 Jan 2024 12:15:26 +0100 Subject: [PATCH] import code from dlt main branch, rename --- dlt-connector/src/client/IotaClient.ts | 21 +++++-- dlt-connector/src/data/const.ts | 1 + .../graphql/resolver/TransactionsResolver.ts | 6 +- dlt-connector/src/index.ts | 8 +++ .../community/HomeCommunity.role.ts | 6 +- .../transmitToIota/TransmitToIota.context.ts | 41 ++++++++++++ .../src/manager/InterruptiveSleepManager.ts | 63 +++++++++++++++++++ dlt-connector/src/tasks/transmitToIota.ts | 50 +++++++++++++++ dlt-connector/src/utils/InterruptiveSleep.ts | 31 +++++++++ 9 files changed, 219 insertions(+), 8 deletions(-) create mode 100644 dlt-connector/src/data/const.ts create mode 100644 dlt-connector/src/interactions/backendToDb/transmitToIota/TransmitToIota.context.ts create mode 100644 dlt-connector/src/manager/InterruptiveSleepManager.ts create mode 100644 dlt-connector/src/tasks/transmitToIota.ts create mode 100644 dlt-connector/src/utils/InterruptiveSleep.ts diff --git a/dlt-connector/src/client/IotaClient.ts b/dlt-connector/src/client/IotaClient.ts index f6e6b1772..0e5f24739 100644 --- a/dlt-connector/src/client/IotaClient.ts +++ b/dlt-connector/src/client/IotaClient.ts @@ -2,17 +2,19 @@ import { ClientBuilder } from '@iota/client' import { MessageWrapper } from '@iota/client/lib/types' import { CONFIG } from '@/config' - const client = new ClientBuilder().node(CONFIG.IOTA_API_URL).build() /** * send data message onto iota tangle - * use CONFIG.IOTA_COMMUNITY_ALIAS for index * @param {string | Uint8Array} message - the message as utf based string, will be converted to hex automatically from @iota/client + * @param {string | Uint8Array} topic - the iota topic to which the message will be sended * @return {Promise} the iota message typed */ -function sendMessage(message: string | Uint8Array): Promise { - return client.message().index(CONFIG.IOTA_COMMUNITY_ALIAS).data(message).submit() +function sendMessage( + message: string | Uint8Array, + topic: string | Uint8Array, +): Promise { + return client.message().index(topic).data(message).submit() } /** @@ -24,7 +26,16 @@ function receiveMessage(messageId: string): Promise { return client.getMessage().data(messageId) } -export { sendMessage, receiveMessage } +function receiveAllMessagesForTopic(topic: string | Uint8Array): Promise { + return client.getMessage().index(topic) +} + +async function getIotaMilestone(messageId: string): Promise { + const metadata = await client.getMessage().metadata(messageId) + return metadata.referencedByMilestoneIndex +} + +export { sendMessage, receiveMessage, receiveAllMessagesForTopic, getIotaMilestone } /** * example for message: diff --git a/dlt-connector/src/data/const.ts b/dlt-connector/src/data/const.ts new file mode 100644 index 000000000..82470e8d4 --- /dev/null +++ b/dlt-connector/src/data/const.ts @@ -0,0 +1 @@ +export const TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY = 'transmitToIota' diff --git a/dlt-connector/src/graphql/resolver/TransactionsResolver.ts b/dlt-connector/src/graphql/resolver/TransactionsResolver.ts index 6a5017fb1..5cd122644 100755 --- a/dlt-connector/src/graphql/resolver/TransactionsResolver.ts +++ b/dlt-connector/src/graphql/resolver/TransactionsResolver.ts @@ -1,12 +1,13 @@ +import { TransactionDraft } from '@input/TransactionDraft' import { Resolver, Arg, Mutation } from 'type-graphql' -import { TransactionDraft } from '@input/TransactionDraft' - +import { TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY } from '@/data/const' import { TransactionRepository } from '@/data/Transaction.repository' import { CreateTransactionRecipeContext } from '@/interactions/backendToDb/transaction/CreateTransationRecipe.context' import { BackendTransactionLoggingView } from '@/logging/BackendTransactionLogging.view' import { logger } from '@/logging/logger' import { TransactionLoggingView } from '@/logging/TransactionLogging.view' +import { InterruptiveSleepManager } from '@/manager/InterruptiveSleepManager' import { LogError } from '@/server/LogError' import { TransactionError } from '../model/TransactionError' @@ -48,6 +49,7 @@ export class TransactionResolver { // we can store the transaction and with that automatic the backend transaction await transactionRecipe.save() } + InterruptiveSleepManager.getInstance().interrupt(TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY) return new TransactionResult(new TransactionRecipe(transactionRecipe)) // eslint-disable-next-line @typescript-eslint/no-explicit-any } catch (error: any) { diff --git a/dlt-connector/src/index.ts b/dlt-connector/src/index.ts index c72978b35..b998430fd 100644 --- a/dlt-connector/src/index.ts +++ b/dlt-connector/src/index.ts @@ -2,16 +2,24 @@ import { CONFIG } from '@/config' import createServer from './server/createServer' +import { stopTransmitToIota, transmitToIota } from './tasks/transmitToIota' async function main() { // eslint-disable-next-line no-console console.log(`DLT_CONNECTOR_PORT=${CONFIG.DLT_CONNECTOR_PORT}`) const { app } = await createServer() + // loop run all the time, check for new transaction for sending to iota + void transmitToIota() app.listen(CONFIG.DLT_CONNECTOR_PORT, () => { // eslint-disable-next-line no-console console.log(`Server is running at http://localhost:${CONFIG.DLT_CONNECTOR_PORT}`) }) + + process.on('exit', () => { + // Add shutdown logic here. + stopTransmitToIota() + }) } main().catch((e) => { diff --git a/dlt-connector/src/interactions/backendToDb/community/HomeCommunity.role.ts b/dlt-connector/src/interactions/backendToDb/community/HomeCommunity.role.ts index 7a4798368..647c5d397 100644 --- a/dlt-connector/src/interactions/backendToDb/community/HomeCommunity.role.ts +++ b/dlt-connector/src/interactions/backendToDb/community/HomeCommunity.role.ts @@ -3,6 +3,7 @@ import { Transaction } from '@entity/Transaction' import { CONFIG } from '@/config' import { AccountFactory } from '@/data/Account.factory' +import { TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY } from '@/data/const' import { KeyPair } from '@/data/KeyPair' import { Mnemonic } from '@/data/Mnemonic' import { TransactionErrorType } from '@/graphql/enum/TransactionErrorType' @@ -10,6 +11,7 @@ import { CommunityDraft } from '@/graphql/input/CommunityDraft' import { TransactionError } from '@/graphql/model/TransactionError' import { CommunityLoggingView } from '@/logging/CommunityLogging.view' import { logger } from '@/logging/logger' +import { InterruptiveSleepManager } from '@/manager/InterruptiveSleepManager' import { getDataSource } from '@/typeorm/DataSource' import { CreateTransactionRecipeContext } from '../transaction/CreateTransationRecipe.context' @@ -36,12 +38,14 @@ export class HomeCommunityRole extends CommunityRole { public async store(): Promise { try { - return await getDataSource().transaction(async (transactionalEntityManager) => { + const community = await getDataSource().transaction(async (transactionalEntityManager) => { const community = await transactionalEntityManager.save(this.self) await transactionalEntityManager.save(this.transactionRecipe) logger.debug('store home community', new CommunityLoggingView(community)) return community }) + InterruptiveSleepManager.getInstance().interrupt(TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY) + return community } catch (error) { logger.error('error saving home community into db: %s', error) throw new TransactionError( diff --git a/dlt-connector/src/interactions/backendToDb/transmitToIota/TransmitToIota.context.ts b/dlt-connector/src/interactions/backendToDb/transmitToIota/TransmitToIota.context.ts new file mode 100644 index 000000000..fb38428a2 --- /dev/null +++ b/dlt-connector/src/interactions/backendToDb/transmitToIota/TransmitToIota.context.ts @@ -0,0 +1,41 @@ +import { Transaction } from '@entity/Transaction' + +import { logger } from '@/logging/logger' +import { TransactionLoggingView } from '@/logging/TransactionLogging.view' + +/** + * @DCI-Context + * Context for sending transaction recipe to iota + * send every transaction only once to iota! + */ +export class TransmitToIotaContext { + // eslint-disable-next-line no-useless-constructor + public constructor(private transaction: Transaction) { + + } + + public async run(): Promise { + logger.info('transmit to iota', new TransactionLoggingView(this.transaction)) + const recipeController = new TransactionRecipe(recipe) + const { transaction, body } = recipeController.getGradidoTransaction() + const messageBuffer = GradidoTransaction.encode(transaction).finish() + + if (body.type === CrossGroupType.LOCAL) { + const resultMessage = await iotaSendMessage( + messageBuffer, + Buffer.from(recipe.community.iotaTopic, 'hex'), + ) + recipe.iotaMessageId = Buffer.from(resultMessage.messageId, 'hex') + logger.info('transmitted Gradido Transaction to Iota', { + id: recipe.id, + messageId: resultMessage.messageId, + }) + await getDataSource().manager.save(recipe) + } else { + throw new TransactionError( + TransactionErrorType.NOT_IMPLEMENTED_YET, + 'other as crossGroupType Local not implemented yet', + ) + } + } +} diff --git a/dlt-connector/src/manager/InterruptiveSleepManager.ts b/dlt-connector/src/manager/InterruptiveSleepManager.ts new file mode 100644 index 000000000..3fc5a9b88 --- /dev/null +++ b/dlt-connector/src/manager/InterruptiveSleepManager.ts @@ -0,0 +1,63 @@ +import { LogError } from '@/server/LogError' + +import { InterruptiveSleep } from '../utils/InterruptiveSleep' + +// Source: https://refactoring.guru/design-patterns/singleton/typescript/example +// and ../federation/client/FederationClientFactory.ts +/** + * A Singleton class defines the `getInstance` method that lets clients access + * the unique singleton instance. + */ +// eslint-disable-next-line @typescript-eslint/no-extraneous-class +export class InterruptiveSleepManager { + // eslint-disable-next-line no-use-before-define + private static instance: InterruptiveSleepManager + private interruptiveSleep: Map = new Map() + private stepSizeMilliseconds = 10 + + /** + * The Singleton's constructor should always be private to prevent direct + * construction calls with the `new` operator. + */ + // eslint-disable-next-line no-useless-constructor, @typescript-eslint/no-empty-function + private constructor() {} + + /** + * The static method that controls the access to the singleton instance. + * + * This implementation let you subclass the Singleton class while keeping + * just one instance of each subclass around. + */ + public static getInstance(): InterruptiveSleepManager { + if (!InterruptiveSleepManager.instance) { + InterruptiveSleepManager.instance = new InterruptiveSleepManager() + } + return InterruptiveSleepManager.instance + } + + /** + * only for new created InterruptiveSleepManager Entries! + * @param step size in ms in which new! InterruptiveSleepManager check if they where triggered + */ + public setStepSize(ms: number) { + this.stepSizeMilliseconds = ms + } + + public interrupt(key: string): void { + const interruptiveSleep = this.interruptiveSleep.get(key) + if (interruptiveSleep) { + interruptiveSleep.interrupt() + } + } + + public sleep(key: string, ms: number): Promise { + if (!this.interruptiveSleep.has(key)) { + this.interruptiveSleep.set(key, new InterruptiveSleep(this.stepSizeMilliseconds)) + } + const cond = this.interruptiveSleep.get(key) + if (!cond) { + throw new LogError('map entry not exist after setting it') + } + return cond.sleep(ms) + } +} diff --git a/dlt-connector/src/tasks/transmitToIota.ts b/dlt-connector/src/tasks/transmitToIota.ts new file mode 100644 index 000000000..5a0545bc8 --- /dev/null +++ b/dlt-connector/src/tasks/transmitToIota.ts @@ -0,0 +1,50 @@ +import { TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY } from '@/data/const' +import { TransactionRepository } from '@/data/Transaction.repository' +import { TransmitToIotaContext } from '@/interactions/backendToDb/transmitToIota/TransmitToIota.context' +import { InterruptiveSleepManager } from '@/manager/InterruptiveSleepManager' + +import { logger } from '../logging/logger' + +function sleep(ms: number) { + return new Promise((resolve) => { + setTimeout(resolve, ms) + }) +} + +let running = true + +export const stopTransmitToIota = (): void => { + running = false +} +/** + * check for pending transactions: + * - if one found call TransmitToIotaContext + * - if not, wait 1000 ms and try again + * if a new transaction was added, the sleep will be interrupted + */ +export const transmitToIota = async (): Promise => { + logger.info('start iota message transmitter') + // eslint-disable-next-line no-unmodified-loop-condition + while (running) { + try { + while (true) { + const recipe = await TransactionRepository.getNextPendingTransaction() + if (!recipe) break + const transmitToIotaContext = new TransmitToIotaContext(recipe) + await transmitToIotaContext.run() + } + + await InterruptiveSleepManager.getInstance().sleep( + TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY, + // 1000, + 1000, + ) + } catch (error) { + logger.error('error while transmitting to iota, retry in 10 seconds ', error) + await sleep(10000) + } + } + logger.info( + 'end iota message transmitter, no further transaction will be transmitted. !!! Please restart Server !!!', + ) +} diff --git a/dlt-connector/src/utils/InterruptiveSleep.ts b/dlt-connector/src/utils/InterruptiveSleep.ts new file mode 100644 index 000000000..c21e57db9 --- /dev/null +++ b/dlt-connector/src/utils/InterruptiveSleep.ts @@ -0,0 +1,31 @@ +/** + * Sleep, that can be interrupted + * call sleep only for msSteps and than check if interrupt was called + */ +export class InterruptiveSleep { + private interruptSleep = false + private msSteps = 10 + + constructor(msSteps: number) { + this.msSteps = msSteps + } + + public interrupt(): void { + this.interruptSleep = true + } + + private static _sleep(ms: number) { + return new Promise((resolve) => { + setTimeout(resolve, ms) + }) + } + + public async sleep(ms: number): Promise { + let waited = 0 + this.interruptSleep = false + while (waited < ms && !this.interruptSleep) { + await InterruptiveSleep._sleep(this.msSteps) + waited += this.msSteps + } + } +}