synchronize concurrency

This commit is contained in:
Claus-Peter Huebner 2023-07-25 00:56:51 +02:00
parent 82eb0db086
commit d1b5000d8f
2 changed files with 63 additions and 29 deletions

View File

@ -3,40 +3,56 @@ import { DltTransaction } from '@entity/DltTransaction'
import { DltConnectorClient } from '@/apis/DltConnectorClient'
import { backendLogger as logger } from '@/server/logger'
import { Monitor } from '@/util/Monitor'
export async function sendTransactionsToDltConnector(): Promise<void> {
const dltConnector = DltConnectorClient.getInstance()
if (dltConnector) {
const dltTransactions = await DltTransaction.find({
where: { messageId: IsNull() },
relations: ['transaction'],
order: { createdAt: 'ASC', id: 'ASC' },
})
for (const dltTx of dltTransactions) {
logger.debug('sending dltTx=', dltTx)
if (dltTx.transaction && (dltTx.transaction ?? false)) {
try {
const messageId = await dltConnector.transmitTransaction(dltTx.transaction)
logger.debug('received messageId=', messageId)
const dltMessageId = Buffer.from(messageId, 'hex')
logger.debug('dltMessageId as Buffer=', dltMessageId)
if (dltMessageId.length !== 32) {
logger.error(
'Error dlt message id is invalid: %s, should by 32 Bytes long in binary after converting from hex',
dltMessageId,
)
return
// check if this logic is still occupied, no concurrecy allowed
if (!Monitor.isLocked) {
// mark this block for occuption to prevent concurrency
Monitor.lockIt()
try {
const dltConnector = DltConnectorClient.getInstance()
if (dltConnector) {
const dltTransactions = await DltTransaction.find({
where: { messageId: IsNull() },
relations: ['transaction'],
order: { createdAt: 'ASC', id: 'ASC' },
})
for (const dltTx of dltTransactions) {
logger.debug('sending dltTx=', dltTx)
if (dltTx.transaction && (dltTx.transaction ?? false)) {
try {
const messageId = await dltConnector.transmitTransaction(dltTx.transaction)
logger.debug('received messageId=', messageId)
const dltMessageId = Buffer.from(messageId, 'hex')
logger.debug('dltMessageId as Buffer=', dltMessageId)
if (dltMessageId.length !== 32) {
logger.error(
'Error dlt message id is invalid: %s, should by 32 Bytes long in binary after converting from hex',
dltMessageId,
)
return
}
dltTx.messageId = dltMessageId.toString()
await DltTransaction.save(dltTx)
logger.info('store messageId=%s in dltTx=%d', dltTx.messageId, dltTx.id)
} catch (e) {
logger.error(
`error while sending to dlt-connector or writing messageId of dltTx=${dltTx.id}`,
e,
)
}
}
dltTx.messageId = dltMessageId.toString()
await DltTransaction.save(dltTx)
logger.info('store messageId=%s in dltTx=%d', dltTx.messageId, dltTx.id)
} catch (e) {
logger.error(
`error while sending to dlt-connector or writing messageId of dltTx=${dltTx.id}`,
e,
)
}
}
} catch (e) {
logger.error('error on sending transactions to dlt-connector.', e)
} finally {
// releae Monitor occuption
Monitor.releaseIt()
}
} else {
logger.info('sendTransactionsToDltConnector currently locked by monitor...')
}
}

View File

@ -0,0 +1,18 @@
export class Monitor {
private static lock = false
// eslint-disable-next-line no-useless-constructor, @typescript-eslint/no-empty-function
private constructor() {}
public static isLocked = (): boolean => {
return Monitor.lock
}
public static lockIt(): void {
Monitor.lock = true
}
public static releaseIt(): void {
Monitor.lock = false
}
}