use semaphore to lock transactions

This commit is contained in:
Ulf Gebhardt 2022-12-06 11:25:37 +01:00
parent bf2e7fb79d
commit 7c528d5f9b
Signed by: ulfgebhardt
GPG Key ID: DA6B843E748679C9

View File

@ -40,6 +40,11 @@ import { sendTransactionLinkRedeemedEmail } from '@/mailer/sendTransactionLinkRe
import { Event, EventTransactionReceive, EventTransactionSend } from '@/event/Event'
import { eventProtocol } from '@/event/EventProtocolEmitter'
import { Semaphore } from 'await-semaphore'
const CONCURRENT_TRANSACTIONS = 1
const LOCK_TRANSACTIONS = new Semaphore(CONCURRENT_TRANSACTIONS)
export const executeTransaction = async (
amount: Decimal,
memo: string,
@ -51,141 +56,149 @@ export const executeTransaction = async (
`executeTransaction(amount=${amount}, memo=${memo}, sender=${sender}, recipient=${recipient})...`,
)
if (sender.id === recipient.id) {
logger.error(`Sender and Recipient are the same.`)
throw new Error('Sender and Recipient are the same.')
}
if (memo.length > MEMO_MAX_CHARS) {
logger.error(`memo text is too long: memo.length=${memo.length} > ${MEMO_MAX_CHARS}`)
throw new Error(`memo text is too long (${MEMO_MAX_CHARS} characters maximum)`)
}
if (memo.length < MEMO_MIN_CHARS) {
logger.error(`memo text is too short: memo.length=${memo.length} < ${MEMO_MIN_CHARS}`)
throw new Error(`memo text is too short (${MEMO_MIN_CHARS} characters minimum)`)
}
// validate amount
const receivedCallDate = new Date()
const sendBalance = await calculateBalance(
sender.id,
amount.mul(-1),
receivedCallDate,
transactionLink,
)
logger.debug(`calculated Balance=${sendBalance}`)
if (!sendBalance) {
logger.error(`user hasn't enough GDD or amount is < 0 : balance=${sendBalance}`)
throw new Error("user hasn't enough GDD or amount is < 0")
}
const queryRunner = getConnection().createQueryRunner()
await queryRunner.connect()
await queryRunner.startTransaction('REPEATABLE READ')
logger.debug(`open Transaction to write...`)
// acquire lock
const releaseLock = await LOCK_TRANSACTIONS.acquire()
try {
// transaction
const transactionSend = new dbTransaction()
transactionSend.typeId = TransactionTypeId.SEND
transactionSend.memo = memo
transactionSend.userId = sender.id
transactionSend.linkedUserId = recipient.id
transactionSend.amount = amount.mul(-1)
transactionSend.balance = sendBalance.balance
transactionSend.balanceDate = receivedCallDate
transactionSend.decay = sendBalance.decay.decay
transactionSend.decayStart = sendBalance.decay.start
transactionSend.previous = sendBalance.lastTransactionId
transactionSend.transactionLinkId = transactionLink ? transactionLink.id : null
await queryRunner.manager.insert(dbTransaction, transactionSend)
logger.debug(`sendTransaction inserted: ${dbTransaction}`)
const transactionReceive = new dbTransaction()
transactionReceive.typeId = TransactionTypeId.RECEIVE
transactionReceive.memo = memo
transactionReceive.userId = recipient.id
transactionReceive.linkedUserId = sender.id
transactionReceive.amount = amount
const receiveBalance = await calculateBalance(recipient.id, amount, receivedCallDate)
transactionReceive.balance = receiveBalance ? receiveBalance.balance : amount
transactionReceive.balanceDate = receivedCallDate
transactionReceive.decay = receiveBalance ? receiveBalance.decay.decay : new Decimal(0)
transactionReceive.decayStart = receiveBalance ? receiveBalance.decay.start : null
transactionReceive.previous = receiveBalance ? receiveBalance.lastTransactionId : null
transactionReceive.linkedTransactionId = transactionSend.id
transactionReceive.transactionLinkId = transactionLink ? transactionLink.id : null
await queryRunner.manager.insert(dbTransaction, transactionReceive)
logger.debug(`receive Transaction inserted: ${dbTransaction}`)
// Save linked transaction id for send
transactionSend.linkedTransactionId = transactionReceive.id
await queryRunner.manager.update(dbTransaction, { id: transactionSend.id }, transactionSend)
logger.debug(`send Transaction updated: ${transactionSend}`)
if (transactionLink) {
logger.info(`transactionLink: ${transactionLink}`)
transactionLink.redeemedAt = receivedCallDate
transactionLink.redeemedBy = recipient.id
await queryRunner.manager.update(
dbTransactionLink,
{ id: transactionLink.id },
transactionLink,
)
if (sender.id === recipient.id) {
logger.error(`Sender and Recipient are the same.`)
throw new Error('Sender and Recipient are the same.')
}
await queryRunner.commitTransaction()
logger.info(`commit Transaction successful...`)
if (memo.length > MEMO_MAX_CHARS) {
logger.error(`memo text is too long: memo.length=${memo.length} > ${MEMO_MAX_CHARS}`)
throw new Error(`memo text is too long (${MEMO_MAX_CHARS} characters maximum)`)
}
const eventTransactionSend = new EventTransactionSend()
eventTransactionSend.userId = transactionSend.userId
eventTransactionSend.xUserId = transactionSend.linkedUserId
eventTransactionSend.transactionId = transactionSend.id
eventTransactionSend.amount = transactionSend.amount.mul(-1)
await eventProtocol.writeEvent(new Event().setEventTransactionSend(eventTransactionSend))
if (memo.length < MEMO_MIN_CHARS) {
logger.error(`memo text is too short: memo.length=${memo.length} < ${MEMO_MIN_CHARS}`)
throw new Error(`memo text is too short (${MEMO_MIN_CHARS} characters minimum)`)
}
const eventTransactionReceive = new EventTransactionReceive()
eventTransactionReceive.userId = transactionReceive.userId
eventTransactionReceive.xUserId = transactionReceive.linkedUserId
eventTransactionReceive.transactionId = transactionReceive.id
eventTransactionReceive.amount = transactionReceive.amount
await eventProtocol.writeEvent(new Event().setEventTransactionReceive(eventTransactionReceive))
} catch (e) {
await queryRunner.rollbackTransaction()
logger.error(`Transaction was not successful: ${e}`)
throw new Error(`Transaction was not successful: ${e}`)
} finally {
await queryRunner.release()
}
logger.debug(`prepare Email for transaction received...`)
// send notification email
// TODO: translate
await sendTransactionReceivedEmail({
senderFirstName: sender.firstName,
senderLastName: sender.lastName,
recipientFirstName: recipient.firstName,
recipientLastName: recipient.lastName,
email: recipient.emailContact.email,
senderEmail: sender.emailContact.email,
amount,
overviewURL: CONFIG.EMAIL_LINK_OVERVIEW,
})
if (transactionLink) {
await sendTransactionLinkRedeemedEmail({
senderFirstName: recipient.firstName,
senderLastName: recipient.lastName,
recipientFirstName: sender.firstName,
recipientLastName: sender.lastName,
email: sender.emailContact.email,
senderEmail: recipient.emailContact.email,
// validate amount
const receivedCallDate = new Date()
const sendBalance = await calculateBalance(
sender.id,
amount.mul(-1),
receivedCallDate,
transactionLink,
)
logger.debug(`calculated Balance=${sendBalance}`)
if (!sendBalance) {
logger.error(`user hasn't enough GDD or amount is < 0 : balance=${sendBalance}`)
throw new Error("user hasn't enough GDD or amount is < 0")
}
const queryRunner = getConnection().createQueryRunner()
await queryRunner.connect()
await queryRunner.startTransaction('REPEATABLE READ')
logger.debug(`open Transaction to write...`)
try {
// transaction
const transactionSend = new dbTransaction()
transactionSend.typeId = TransactionTypeId.SEND
transactionSend.memo = memo
transactionSend.userId = sender.id
transactionSend.linkedUserId = recipient.id
transactionSend.amount = amount.mul(-1)
transactionSend.balance = sendBalance.balance
transactionSend.balanceDate = receivedCallDate
transactionSend.decay = sendBalance.decay.decay
transactionSend.decayStart = sendBalance.decay.start
transactionSend.previous = sendBalance.lastTransactionId
transactionSend.transactionLinkId = transactionLink ? transactionLink.id : null
await queryRunner.manager.insert(dbTransaction, transactionSend)
logger.debug(`sendTransaction inserted: ${dbTransaction}`)
const transactionReceive = new dbTransaction()
transactionReceive.typeId = TransactionTypeId.RECEIVE
transactionReceive.memo = memo
transactionReceive.userId = recipient.id
transactionReceive.linkedUserId = sender.id
transactionReceive.amount = amount
const receiveBalance = await calculateBalance(recipient.id, amount, receivedCallDate)
transactionReceive.balance = receiveBalance ? receiveBalance.balance : amount
transactionReceive.balanceDate = receivedCallDate
transactionReceive.decay = receiveBalance ? receiveBalance.decay.decay : new Decimal(0)
transactionReceive.decayStart = receiveBalance ? receiveBalance.decay.start : null
transactionReceive.previous = receiveBalance ? receiveBalance.lastTransactionId : null
transactionReceive.linkedTransactionId = transactionSend.id
transactionReceive.transactionLinkId = transactionLink ? transactionLink.id : null
await queryRunner.manager.insert(dbTransaction, transactionReceive)
logger.debug(`receive Transaction inserted: ${dbTransaction}`)
// Save linked transaction id for send
transactionSend.linkedTransactionId = transactionReceive.id
await queryRunner.manager.update(dbTransaction, { id: transactionSend.id }, transactionSend)
logger.debug(`send Transaction updated: ${transactionSend}`)
if (transactionLink) {
logger.info(`transactionLink: ${transactionLink}`)
transactionLink.redeemedAt = receivedCallDate
transactionLink.redeemedBy = recipient.id
await queryRunner.manager.update(
dbTransactionLink,
{ id: transactionLink.id },
transactionLink,
)
}
await queryRunner.commitTransaction()
logger.info(`commit Transaction successful...`)
const eventTransactionSend = new EventTransactionSend()
eventTransactionSend.userId = transactionSend.userId
eventTransactionSend.xUserId = transactionSend.linkedUserId
eventTransactionSend.transactionId = transactionSend.id
eventTransactionSend.amount = transactionSend.amount.mul(-1)
await eventProtocol.writeEvent(new Event().setEventTransactionSend(eventTransactionSend))
const eventTransactionReceive = new EventTransactionReceive()
eventTransactionReceive.userId = transactionReceive.userId
eventTransactionReceive.xUserId = transactionReceive.linkedUserId
eventTransactionReceive.transactionId = transactionReceive.id
eventTransactionReceive.amount = transactionReceive.amount
await eventProtocol.writeEvent(
new Event().setEventTransactionReceive(eventTransactionReceive),
)
} catch (e) {
await queryRunner.rollbackTransaction()
logger.error(`Transaction was not successful: ${e}`)
throw new Error(`Transaction was not successful: ${e}`)
} finally {
await queryRunner.release()
}
logger.debug(`prepare Email for transaction received...`)
// send notification email
// TODO: translate
await sendTransactionReceivedEmail({
senderFirstName: sender.firstName,
senderLastName: sender.lastName,
recipientFirstName: recipient.firstName,
recipientLastName: recipient.lastName,
email: recipient.emailContact.email,
senderEmail: sender.emailContact.email,
amount,
memo,
overviewURL: CONFIG.EMAIL_LINK_OVERVIEW,
})
if (transactionLink) {
await sendTransactionLinkRedeemedEmail({
senderFirstName: recipient.firstName,
senderLastName: recipient.lastName,
recipientFirstName: sender.firstName,
recipientLastName: sender.lastName,
email: sender.emailContact.email,
senderEmail: recipient.emailContact.email,
amount,
memo,
overviewURL: CONFIG.EMAIL_LINK_OVERVIEW,
})
}
logger.info(`finished executeTransaction successfully`)
return true
} finally {
releaseLock()
}
logger.info(`finished executeTransaction successfully`)
return true
}
@Resolver()