mirror of
https://github.com/IT4Change/gradido.git
synced 2026-02-06 09:56:05 +00:00
import code from dlt main branch, rename
This commit is contained in:
parent
0ed023b014
commit
4c389635f1
@ -2,17 +2,19 @@ import { ClientBuilder } from '@iota/client'
|
||||
import { MessageWrapper } from '@iota/client/lib/types'
|
||||
|
||||
import { CONFIG } from '@/config'
|
||||
|
||||
const client = new ClientBuilder().node(CONFIG.IOTA_API_URL).build()
|
||||
|
||||
/**
|
||||
* send data message onto iota tangle
|
||||
* use CONFIG.IOTA_COMMUNITY_ALIAS for index
|
||||
* @param {string | Uint8Array} message - the message as utf based string, will be converted to hex automatically from @iota/client
|
||||
* @param {string | Uint8Array} topic - the iota topic to which the message will be sended
|
||||
* @return {Promise<MessageWrapper>} the iota message typed
|
||||
*/
|
||||
function sendMessage(message: string | Uint8Array): Promise<MessageWrapper> {
|
||||
return client.message().index(CONFIG.IOTA_COMMUNITY_ALIAS).data(message).submit()
|
||||
function sendMessage(
|
||||
message: string | Uint8Array,
|
||||
topic: string | Uint8Array,
|
||||
): Promise<MessageWrapper> {
|
||||
return client.message().index(topic).data(message).submit()
|
||||
}
|
||||
|
||||
/**
|
||||
@ -24,7 +26,16 @@ function receiveMessage(messageId: string): Promise<MessageWrapper> {
|
||||
return client.getMessage().data(messageId)
|
||||
}
|
||||
|
||||
export { sendMessage, receiveMessage }
|
||||
function receiveAllMessagesForTopic(topic: string | Uint8Array): Promise<string[]> {
|
||||
return client.getMessage().index(topic)
|
||||
}
|
||||
|
||||
async function getIotaMilestone(messageId: string): Promise<number | undefined> {
|
||||
const metadata = await client.getMessage().metadata(messageId)
|
||||
return metadata.referencedByMilestoneIndex
|
||||
}
|
||||
|
||||
export { sendMessage, receiveMessage, receiveAllMessagesForTopic, getIotaMilestone }
|
||||
|
||||
/**
|
||||
* example for message:
|
||||
|
||||
1
dlt-connector/src/data/const.ts
Normal file
1
dlt-connector/src/data/const.ts
Normal file
@ -0,0 +1 @@
|
||||
export const TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY = 'transmitToIota'
|
||||
@ -1,12 +1,13 @@
|
||||
import { TransactionDraft } from '@input/TransactionDraft'
|
||||
import { Resolver, Arg, Mutation } from 'type-graphql'
|
||||
|
||||
import { TransactionDraft } from '@input/TransactionDraft'
|
||||
|
||||
import { TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY } from '@/data/const'
|
||||
import { TransactionRepository } from '@/data/Transaction.repository'
|
||||
import { CreateTransactionRecipeContext } from '@/interactions/backendToDb/transaction/CreateTransationRecipe.context'
|
||||
import { BackendTransactionLoggingView } from '@/logging/BackendTransactionLogging.view'
|
||||
import { logger } from '@/logging/logger'
|
||||
import { TransactionLoggingView } from '@/logging/TransactionLogging.view'
|
||||
import { InterruptiveSleepManager } from '@/manager/InterruptiveSleepManager'
|
||||
import { LogError } from '@/server/LogError'
|
||||
|
||||
import { TransactionError } from '../model/TransactionError'
|
||||
@ -48,6 +49,7 @@ export class TransactionResolver {
|
||||
// we can store the transaction and with that automatic the backend transaction
|
||||
await transactionRecipe.save()
|
||||
}
|
||||
InterruptiveSleepManager.getInstance().interrupt(TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY)
|
||||
return new TransactionResult(new TransactionRecipe(transactionRecipe))
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
} catch (error: any) {
|
||||
|
||||
@ -2,16 +2,24 @@
|
||||
import { CONFIG } from '@/config'
|
||||
|
||||
import createServer from './server/createServer'
|
||||
import { stopTransmitToIota, transmitToIota } from './tasks/transmitToIota'
|
||||
|
||||
async function main() {
|
||||
// eslint-disable-next-line no-console
|
||||
console.log(`DLT_CONNECTOR_PORT=${CONFIG.DLT_CONNECTOR_PORT}`)
|
||||
const { app } = await createServer()
|
||||
|
||||
// loop run all the time, check for new transaction for sending to iota
|
||||
void transmitToIota()
|
||||
app.listen(CONFIG.DLT_CONNECTOR_PORT, () => {
|
||||
// eslint-disable-next-line no-console
|
||||
console.log(`Server is running at http://localhost:${CONFIG.DLT_CONNECTOR_PORT}`)
|
||||
})
|
||||
|
||||
process.on('exit', () => {
|
||||
// Add shutdown logic here.
|
||||
stopTransmitToIota()
|
||||
})
|
||||
}
|
||||
|
||||
main().catch((e) => {
|
||||
|
||||
@ -3,6 +3,7 @@ import { Transaction } from '@entity/Transaction'
|
||||
|
||||
import { CONFIG } from '@/config'
|
||||
import { AccountFactory } from '@/data/Account.factory'
|
||||
import { TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY } from '@/data/const'
|
||||
import { KeyPair } from '@/data/KeyPair'
|
||||
import { Mnemonic } from '@/data/Mnemonic'
|
||||
import { TransactionErrorType } from '@/graphql/enum/TransactionErrorType'
|
||||
@ -10,6 +11,7 @@ import { CommunityDraft } from '@/graphql/input/CommunityDraft'
|
||||
import { TransactionError } from '@/graphql/model/TransactionError'
|
||||
import { CommunityLoggingView } from '@/logging/CommunityLogging.view'
|
||||
import { logger } from '@/logging/logger'
|
||||
import { InterruptiveSleepManager } from '@/manager/InterruptiveSleepManager'
|
||||
import { getDataSource } from '@/typeorm/DataSource'
|
||||
|
||||
import { CreateTransactionRecipeContext } from '../transaction/CreateTransationRecipe.context'
|
||||
@ -36,12 +38,14 @@ export class HomeCommunityRole extends CommunityRole {
|
||||
|
||||
public async store(): Promise<Community> {
|
||||
try {
|
||||
return await getDataSource().transaction(async (transactionalEntityManager) => {
|
||||
const community = await getDataSource().transaction(async (transactionalEntityManager) => {
|
||||
const community = await transactionalEntityManager.save(this.self)
|
||||
await transactionalEntityManager.save(this.transactionRecipe)
|
||||
logger.debug('store home community', new CommunityLoggingView(community))
|
||||
return community
|
||||
})
|
||||
InterruptiveSleepManager.getInstance().interrupt(TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY)
|
||||
return community
|
||||
} catch (error) {
|
||||
logger.error('error saving home community into db: %s', error)
|
||||
throw new TransactionError(
|
||||
|
||||
@ -0,0 +1,41 @@
|
||||
import { Transaction } from '@entity/Transaction'
|
||||
|
||||
import { logger } from '@/logging/logger'
|
||||
import { TransactionLoggingView } from '@/logging/TransactionLogging.view'
|
||||
|
||||
/**
|
||||
* @DCI-Context
|
||||
* Context for sending transaction recipe to iota
|
||||
* send every transaction only once to iota!
|
||||
*/
|
||||
export class TransmitToIotaContext {
|
||||
// eslint-disable-next-line no-useless-constructor
|
||||
public constructor(private transaction: Transaction) {
|
||||
|
||||
}
|
||||
|
||||
public async run(): Promise<void> {
|
||||
logger.info('transmit to iota', new TransactionLoggingView(this.transaction))
|
||||
const recipeController = new TransactionRecipe(recipe)
|
||||
const { transaction, body } = recipeController.getGradidoTransaction()
|
||||
const messageBuffer = GradidoTransaction.encode(transaction).finish()
|
||||
|
||||
if (body.type === CrossGroupType.LOCAL) {
|
||||
const resultMessage = await iotaSendMessage(
|
||||
messageBuffer,
|
||||
Buffer.from(recipe.community.iotaTopic, 'hex'),
|
||||
)
|
||||
recipe.iotaMessageId = Buffer.from(resultMessage.messageId, 'hex')
|
||||
logger.info('transmitted Gradido Transaction to Iota', {
|
||||
id: recipe.id,
|
||||
messageId: resultMessage.messageId,
|
||||
})
|
||||
await getDataSource().manager.save(recipe)
|
||||
} else {
|
||||
throw new TransactionError(
|
||||
TransactionErrorType.NOT_IMPLEMENTED_YET,
|
||||
'other as crossGroupType Local not implemented yet',
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
63
dlt-connector/src/manager/InterruptiveSleepManager.ts
Normal file
63
dlt-connector/src/manager/InterruptiveSleepManager.ts
Normal file
@ -0,0 +1,63 @@
|
||||
import { LogError } from '@/server/LogError'
|
||||
|
||||
import { InterruptiveSleep } from '../utils/InterruptiveSleep'
|
||||
|
||||
// Source: https://refactoring.guru/design-patterns/singleton/typescript/example
|
||||
// and ../federation/client/FederationClientFactory.ts
|
||||
/**
|
||||
* A Singleton class defines the `getInstance` method that lets clients access
|
||||
* the unique singleton instance.
|
||||
*/
|
||||
// eslint-disable-next-line @typescript-eslint/no-extraneous-class
|
||||
export class InterruptiveSleepManager {
|
||||
// eslint-disable-next-line no-use-before-define
|
||||
private static instance: InterruptiveSleepManager
|
||||
private interruptiveSleep: Map<string, InterruptiveSleep> = new Map<string, InterruptiveSleep>()
|
||||
private stepSizeMilliseconds = 10
|
||||
|
||||
/**
|
||||
* The Singleton's constructor should always be private to prevent direct
|
||||
* construction calls with the `new` operator.
|
||||
*/
|
||||
// eslint-disable-next-line no-useless-constructor, @typescript-eslint/no-empty-function
|
||||
private constructor() {}
|
||||
|
||||
/**
|
||||
* The static method that controls the access to the singleton instance.
|
||||
*
|
||||
* This implementation let you subclass the Singleton class while keeping
|
||||
* just one instance of each subclass around.
|
||||
*/
|
||||
public static getInstance(): InterruptiveSleepManager {
|
||||
if (!InterruptiveSleepManager.instance) {
|
||||
InterruptiveSleepManager.instance = new InterruptiveSleepManager()
|
||||
}
|
||||
return InterruptiveSleepManager.instance
|
||||
}
|
||||
|
||||
/**
|
||||
* only for new created InterruptiveSleepManager Entries!
|
||||
* @param step size in ms in which new! InterruptiveSleepManager check if they where triggered
|
||||
*/
|
||||
public setStepSize(ms: number) {
|
||||
this.stepSizeMilliseconds = ms
|
||||
}
|
||||
|
||||
public interrupt(key: string): void {
|
||||
const interruptiveSleep = this.interruptiveSleep.get(key)
|
||||
if (interruptiveSleep) {
|
||||
interruptiveSleep.interrupt()
|
||||
}
|
||||
}
|
||||
|
||||
public sleep(key: string, ms: number): Promise<void> {
|
||||
if (!this.interruptiveSleep.has(key)) {
|
||||
this.interruptiveSleep.set(key, new InterruptiveSleep(this.stepSizeMilliseconds))
|
||||
}
|
||||
const cond = this.interruptiveSleep.get(key)
|
||||
if (!cond) {
|
||||
throw new LogError('map entry not exist after setting it')
|
||||
}
|
||||
return cond.sleep(ms)
|
||||
}
|
||||
}
|
||||
50
dlt-connector/src/tasks/transmitToIota.ts
Normal file
50
dlt-connector/src/tasks/transmitToIota.ts
Normal file
@ -0,0 +1,50 @@
|
||||
import { TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY } from '@/data/const'
|
||||
import { TransactionRepository } from '@/data/Transaction.repository'
|
||||
import { TransmitToIotaContext } from '@/interactions/backendToDb/transmitToIota/TransmitToIota.context'
|
||||
import { InterruptiveSleepManager } from '@/manager/InterruptiveSleepManager'
|
||||
|
||||
import { logger } from '../logging/logger'
|
||||
|
||||
function sleep(ms: number) {
|
||||
return new Promise((resolve) => {
|
||||
setTimeout(resolve, ms)
|
||||
})
|
||||
}
|
||||
|
||||
let running = true
|
||||
|
||||
export const stopTransmitToIota = (): void => {
|
||||
running = false
|
||||
}
|
||||
/**
|
||||
* check for pending transactions:
|
||||
* - if one found call TransmitToIotaContext
|
||||
* - if not, wait 1000 ms and try again
|
||||
* if a new transaction was added, the sleep will be interrupted
|
||||
*/
|
||||
export const transmitToIota = async (): Promise<void> => {
|
||||
logger.info('start iota message transmitter')
|
||||
// eslint-disable-next-line no-unmodified-loop-condition
|
||||
while (running) {
|
||||
try {
|
||||
while (true) {
|
||||
const recipe = await TransactionRepository.getNextPendingTransaction()
|
||||
if (!recipe) break
|
||||
const transmitToIotaContext = new TransmitToIotaContext(recipe)
|
||||
await transmitToIotaContext.run()
|
||||
}
|
||||
|
||||
await InterruptiveSleepManager.getInstance().sleep(
|
||||
TRANSMIT_TO_IOTA_INTERRUPTIVE_SLEEP_KEY,
|
||||
// 1000,
|
||||
1000,
|
||||
)
|
||||
} catch (error) {
|
||||
logger.error('error while transmitting to iota, retry in 10 seconds ', error)
|
||||
await sleep(10000)
|
||||
}
|
||||
}
|
||||
logger.info(
|
||||
'end iota message transmitter, no further transaction will be transmitted. !!! Please restart Server !!!',
|
||||
)
|
||||
}
|
||||
31
dlt-connector/src/utils/InterruptiveSleep.ts
Normal file
31
dlt-connector/src/utils/InterruptiveSleep.ts
Normal file
@ -0,0 +1,31 @@
|
||||
/**
|
||||
* Sleep, that can be interrupted
|
||||
* call sleep only for msSteps and than check if interrupt was called
|
||||
*/
|
||||
export class InterruptiveSleep {
|
||||
private interruptSleep = false
|
||||
private msSteps = 10
|
||||
|
||||
constructor(msSteps: number) {
|
||||
this.msSteps = msSteps
|
||||
}
|
||||
|
||||
public interrupt(): void {
|
||||
this.interruptSleep = true
|
||||
}
|
||||
|
||||
private static _sleep(ms: number) {
|
||||
return new Promise((resolve) => {
|
||||
setTimeout(resolve, ms)
|
||||
})
|
||||
}
|
||||
|
||||
public async sleep(ms: number): Promise<void> {
|
||||
let waited = 0
|
||||
this.interruptSleep = false
|
||||
while (waited < ms && !this.interruptSleep) {
|
||||
await InterruptiveSleep._sleep(this.msSteps)
|
||||
waited += this.msSteps
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user