mirror of
https://github.com/IT4Change/gradido.git
synced 2026-03-01 12:44:43 +00:00
introduce redis-semaphore along with a running docker redis server
This commit is contained in:
parent
d177081df5
commit
a2ee080ed5
@ -43,7 +43,6 @@ import {
|
|||||||
import { UpdateUnconfirmedContributionContext } from '@/interactions/updateUnconfirmedContribution/UpdateUnconfirmedContribution.context'
|
import { UpdateUnconfirmedContributionContext } from '@/interactions/updateUnconfirmedContribution/UpdateUnconfirmedContribution.context'
|
||||||
import { LogError } from '@/server/LogError'
|
import { LogError } from '@/server/LogError'
|
||||||
import { Context, getClientTimezoneOffset, getUser } from '@/server/context'
|
import { Context, getClientTimezoneOffset, getUser } from '@/server/context'
|
||||||
// import { TRANSACTIONS_LOCK } from 'database'
|
|
||||||
import { fullName } from 'core'
|
import { fullName } from 'core'
|
||||||
import { calculateDecay, Decay } from 'shared'
|
import { calculateDecay, Decay } from 'shared'
|
||||||
|
|
||||||
@ -61,11 +60,9 @@ import { extractGraphQLFields } from './util/extractGraphQLFields'
|
|||||||
import { findContributions } from './util/findContributions'
|
import { findContributions } from './util/findContributions'
|
||||||
import { getLastTransaction } from 'database'
|
import { getLastTransaction } from 'database'
|
||||||
import { contributionTransaction } from '@/apis/dltConnector'
|
import { contributionTransaction } from '@/apis/dltConnector'
|
||||||
import { Redis } from 'ioredis'
|
|
||||||
import { Mutex } from 'redis-semaphore'
|
import { Mutex } from 'redis-semaphore'
|
||||||
|
|
||||||
const db = AppDatabase.getInstance()
|
const db = AppDatabase.getInstance()
|
||||||
const redisClient = new Redis()
|
|
||||||
const createLogger = () => getLogger(`${LOG4JS_BASE_CATEGORY_NAME}.graphql.resolver.ContributionResolver`)
|
const createLogger = () => getLogger(`${LOG4JS_BASE_CATEGORY_NAME}.graphql.resolver.ContributionResolver`)
|
||||||
|
|
||||||
@Resolver(() => Contribution)
|
@Resolver(() => Contribution)
|
||||||
@ -440,8 +437,7 @@ export class ContributionResolver {
|
|||||||
const logger = createLogger()
|
const logger = createLogger()
|
||||||
logger.addContext('contribution', id)
|
logger.addContext('contribution', id)
|
||||||
// acquire lock
|
// acquire lock
|
||||||
// const releaseLock = await TRANSACTIONS_LOCK.acquire()
|
const mutex = new Mutex (db.getRedisClient(), 'TRANSACTIONS_LOCK')
|
||||||
const mutex = new Mutex(redisClient, 'TRANSACTIONS_LOCK')
|
|
||||||
await mutex.acquire()
|
await mutex.acquire()
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|||||||
@ -32,8 +32,6 @@ import { listTransactionLinksAdmin } from '@/seeds/graphql/queries'
|
|||||||
import { transactionLinks } from '@/seeds/transactionLink/index'
|
import { transactionLinks } from '@/seeds/transactionLink/index'
|
||||||
import { bibiBloxberg } from '@/seeds/users/bibi-bloxberg'
|
import { bibiBloxberg } from '@/seeds/users/bibi-bloxberg'
|
||||||
import { peterLustig } from '@/seeds/users/peter-lustig'
|
import { peterLustig } from '@/seeds/users/peter-lustig'
|
||||||
import { TRANSACTIONS_LOCK } from 'database'
|
|
||||||
|
|
||||||
import { LOG4JS_BASE_CATEGORY_NAME } from '@/config/const'
|
import { LOG4JS_BASE_CATEGORY_NAME } from '@/config/const'
|
||||||
import { getLogger } from 'config-schema/test/testSetup'
|
import { getLogger } from 'config-schema/test/testSetup'
|
||||||
import { transactionLinkCode } from './TransactionLinkResolver'
|
import { transactionLinkCode } from './TransactionLinkResolver'
|
||||||
@ -46,8 +44,8 @@ jest.mock('@/password/EncryptorUtils')
|
|||||||
CONFIG.DLT_CONNECTOR = false
|
CONFIG.DLT_CONNECTOR = false
|
||||||
|
|
||||||
// mock semaphore to allow use fake timers
|
// mock semaphore to allow use fake timers
|
||||||
jest.mock('database/src/util/TRANSACTIONS_LOCK')
|
// jest.mock('database/src/util/TRANSACTIONS_LOCK')
|
||||||
TRANSACTIONS_LOCK.acquire = jest.fn().mockResolvedValue(jest.fn())
|
// TRANSACTIONS_LOCK.acquire = jest.fn().mockResolvedValue(jest.fn())
|
||||||
|
|
||||||
let mutate: ApolloServerTestClient['mutate']
|
let mutate: ApolloServerTestClient['mutate']
|
||||||
let query: ApolloServerTestClient['query']
|
let query: ApolloServerTestClient['query']
|
||||||
|
|||||||
@ -50,7 +50,6 @@ import { Redis } from 'ioredis'
|
|||||||
import { Mutex } from 'redis-semaphore'
|
import { Mutex } from 'redis-semaphore'
|
||||||
|
|
||||||
const db = AppDatabase.getInstance()
|
const db = AppDatabase.getInstance()
|
||||||
const redisClient = new Redis()
|
|
||||||
const createLogger = () => getLogger(`${LOG4JS_BASE_CATEGORY_NAME}.graphql.resolver.TransactionResolver`)
|
const createLogger = () => getLogger(`${LOG4JS_BASE_CATEGORY_NAME}.graphql.resolver.TransactionResolver`)
|
||||||
|
|
||||||
export const executeTransaction = async (
|
export const executeTransaction = async (
|
||||||
@ -63,7 +62,7 @@ export const executeTransaction = async (
|
|||||||
): Promise<boolean> => {
|
): Promise<boolean> => {
|
||||||
// acquire lock
|
// acquire lock
|
||||||
// const releaseLock = await TRANSACTIONS_LOCK.acquire()
|
// const releaseLock = await TRANSACTIONS_LOCK.acquire()
|
||||||
const mutex = new Mutex(redisClient, 'TRANSACTIONS_LOCK')
|
const mutex = new Mutex(db.getRedisClient(), 'TRANSACTIONS_LOCK')
|
||||||
await mutex.acquire()
|
await mutex.acquire()
|
||||||
|
|
||||||
const receivedCallDate = new Date()
|
const receivedCallDate = new Date()
|
||||||
|
|||||||
@ -22,7 +22,9 @@ import { bibiBloxberg } from '@/seeds/users/bibi-bloxberg'
|
|||||||
import { bobBaumeister } from '@/seeds/users/bob-baumeister'
|
import { bobBaumeister } from '@/seeds/users/bob-baumeister'
|
||||||
import { peterLustig } from '@/seeds/users/peter-lustig'
|
import { peterLustig } from '@/seeds/users/peter-lustig'
|
||||||
import { CONFIG } from '@/config'
|
import { CONFIG } from '@/config'
|
||||||
import { TRANSACTIONS_LOCK } from 'database'
|
// import { TRANSACTIONS_LOCK } from 'database'
|
||||||
|
import { Mutex } from 'redis-semaphore'
|
||||||
|
import { AppDatabase } from 'database'
|
||||||
|
|
||||||
jest.mock('@/password/EncryptorUtils')
|
jest.mock('@/password/EncryptorUtils')
|
||||||
|
|
||||||
@ -35,28 +37,34 @@ let testEnv: {
|
|||||||
mutate: ApolloServerTestClient['mutate']
|
mutate: ApolloServerTestClient['mutate']
|
||||||
query: ApolloServerTestClient['query']
|
query: ApolloServerTestClient['query']
|
||||||
con: DataSource
|
con: DataSource
|
||||||
|
db: AppDatabase
|
||||||
}
|
}
|
||||||
|
let mutex: Mutex
|
||||||
beforeAll(async () => {
|
beforeAll(async () => {
|
||||||
testEnv = await testEnvironment()
|
testEnv = await testEnvironment()
|
||||||
mutate = testEnv.mutate
|
mutate = testEnv.mutate
|
||||||
con = testEnv.con
|
con = testEnv.con
|
||||||
|
mutex = new Mutex(testEnv.db.getRedisClient(), 'TRANSACTIONS_LOCK')
|
||||||
await cleanDB()
|
await cleanDB()
|
||||||
})
|
})
|
||||||
|
|
||||||
afterAll(async () => {
|
afterAll(async () => {
|
||||||
await cleanDB()
|
await cleanDB()
|
||||||
await con.destroy()
|
await con.destroy()
|
||||||
|
await testEnv.db.getRedisClient().quit()
|
||||||
})
|
})
|
||||||
|
|
||||||
type RunOrder = { [key: number]: { start: number, end: number } }
|
type RunOrder = { [key: number]: { start: number, end: number } }
|
||||||
async function fakeWork(runOrder: RunOrder, index: number) {
|
async function fakeWork(runOrder: RunOrder, index: number) {
|
||||||
const releaseLock = await TRANSACTIONS_LOCK.acquire()
|
// const releaseLock = await TRANSACTIONS_LOCK.acquire()
|
||||||
|
await mutex.acquire()
|
||||||
|
|
||||||
const startDate = new Date()
|
const startDate = new Date()
|
||||||
await new Promise((resolve) => setTimeout(resolve, Math.random() * 50))
|
await new Promise((resolve) => setTimeout(resolve, Math.random() * 50))
|
||||||
const endDate = new Date()
|
const endDate = new Date()
|
||||||
runOrder[index] = { start: startDate.getTime(), end: endDate.getTime() }
|
runOrder[index] = { start: startDate.getTime(), end: endDate.getTime() }
|
||||||
releaseLock()
|
// releaseLock()
|
||||||
|
await mutex.release()
|
||||||
}
|
}
|
||||||
|
|
||||||
describe('semaphore', () => {
|
describe('semaphore', () => {
|
||||||
|
|||||||
@ -8,7 +8,7 @@ import { slowDown } from 'express-slow-down'
|
|||||||
import helmet from 'helmet'
|
import helmet from 'helmet'
|
||||||
import { Logger, getLogger } from 'log4js'
|
import { Logger, getLogger } from 'log4js'
|
||||||
import { DataSource } from 'typeorm'
|
import { DataSource } from 'typeorm'
|
||||||
|
import { Redis } from 'ioredis'
|
||||||
import { GRADIDO_REALM, LOG4JS_BASE_CATEGORY_NAME } from '@/config/const'
|
import { GRADIDO_REALM, LOG4JS_BASE_CATEGORY_NAME } from '@/config/const'
|
||||||
import { AppDatabase } from 'database'
|
import { AppDatabase } from 'database'
|
||||||
import { context as serverContext } from './context'
|
import { context as serverContext } from './context'
|
||||||
@ -23,6 +23,7 @@ interface ServerDef {
|
|||||||
apollo: ApolloServer
|
apollo: ApolloServer
|
||||||
app: Express
|
app: Express
|
||||||
con: DataSource
|
con: DataSource
|
||||||
|
db: AppDatabase
|
||||||
}
|
}
|
||||||
|
|
||||||
export const createServer = async (
|
export const createServer = async (
|
||||||
@ -104,5 +105,5 @@ export const createServer = async (
|
|||||||
)
|
)
|
||||||
logger.debug('createServer...successful')
|
logger.debug('createServer...successful')
|
||||||
|
|
||||||
return { apollo, app, con: db.getDataSource() }
|
return { apollo, app, con: db.getDataSource(), db }
|
||||||
}
|
}
|
||||||
|
|||||||
@ -35,7 +35,7 @@ export const testEnvironment = async (testLogger = getLogger('apollo'), testI18n
|
|||||||
const testClient = createTestClient(server.apollo)
|
const testClient = createTestClient(server.apollo)
|
||||||
const mutate = testClient.mutate
|
const mutate = testClient.mutate
|
||||||
const query = testClient.query
|
const query = testClient.query
|
||||||
return { mutate, query, con }
|
return { mutate, query, con, db: server.db }
|
||||||
}
|
}
|
||||||
|
|
||||||
export const resetEntity = async (entity: any) => {
|
export const resetEntity = async (entity: any) => {
|
||||||
|
|||||||
@ -14,13 +14,12 @@ import { LOG4JS_BASE_CATEGORY_NAME } from '../../config/const'
|
|||||||
import { PendingTransactionState } from 'shared'
|
import { PendingTransactionState } from 'shared'
|
||||||
// import { LogError } from '@/server/LogError'
|
// import { LogError } from '@/server/LogError'
|
||||||
import { calculateSenderBalance } from '../../util/calculateSenderBalance'
|
import { calculateSenderBalance } from '../../util/calculateSenderBalance'
|
||||||
import { TRANSACTIONS_LOCK, getLastTransaction } from 'database'
|
// import { TRANSACTIONS_LOCK, getLastTransaction } from 'database'
|
||||||
|
import { getLastTransaction } from 'database'
|
||||||
import { getLogger } from 'log4js'
|
import { getLogger } from 'log4js'
|
||||||
import { Redis } from 'ioredis'
|
|
||||||
import { Mutex } from 'redis-semaphore'
|
import { Mutex } from 'redis-semaphore'
|
||||||
|
|
||||||
const db = AppDatabase.getInstance()
|
const db = AppDatabase.getInstance()
|
||||||
const redisClient = new Redis()
|
|
||||||
const logger = getLogger(
|
const logger = getLogger(
|
||||||
`${LOG4JS_BASE_CATEGORY_NAME}.graphql.logic.settlePendingSenderTransaction`,
|
`${LOG4JS_BASE_CATEGORY_NAME}.graphql.logic.settlePendingSenderTransaction`,
|
||||||
)
|
)
|
||||||
@ -33,7 +32,7 @@ export async function settlePendingSenderTransaction(
|
|||||||
// TODO: synchronisation with TRANSACTION_LOCK of federation-modul necessary!!!
|
// TODO: synchronisation with TRANSACTION_LOCK of federation-modul necessary!!!
|
||||||
// acquire lock
|
// acquire lock
|
||||||
// const releaseLock = await TRANSACTIONS_LOCK.acquire()
|
// const releaseLock = await TRANSACTIONS_LOCK.acquire()
|
||||||
const mutex = new Mutex(redisClient, 'TRANSACTIONS_LOCK')
|
const mutex = new Mutex(db.getRedisClient(), 'TRANSACTIONS_LOCK')
|
||||||
await mutex.acquire()
|
await mutex.acquire()
|
||||||
|
|
||||||
const queryRunner = db.getDataSource().createQueryRunner()
|
const queryRunner = db.getDataSource().createQueryRunner()
|
||||||
|
|||||||
@ -5,12 +5,14 @@ import { getLogger } from 'log4js'
|
|||||||
import { latestDbVersion } from '.'
|
import { latestDbVersion } from '.'
|
||||||
import { CONFIG } from './config'
|
import { CONFIG } from './config'
|
||||||
import { LOG4JS_BASE_CATEGORY_NAME } from './config/const'
|
import { LOG4JS_BASE_CATEGORY_NAME } from './config/const'
|
||||||
|
import Redis from 'ioredis'
|
||||||
|
|
||||||
const logger = getLogger(`${LOG4JS_BASE_CATEGORY_NAME}.AppDatabase`)
|
const logger = getLogger(`${LOG4JS_BASE_CATEGORY_NAME}.AppDatabase`)
|
||||||
|
|
||||||
export class AppDatabase {
|
export class AppDatabase {
|
||||||
private static instance: AppDatabase
|
private static instance: AppDatabase
|
||||||
private dataSource: DBDataSource | undefined
|
private dataSource: DBDataSource | undefined
|
||||||
|
private redisClient: Redis
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The Singleton's constructor should always be private to prevent direct
|
* The Singleton's constructor should always be private to prevent direct
|
||||||
@ -88,10 +90,24 @@ export class AppDatabase {
|
|||||||
}
|
}
|
||||||
// check for correct database version
|
// check for correct database version
|
||||||
await this.checkDBVersion()
|
await this.checkDBVersion()
|
||||||
|
|
||||||
|
this.redisClient = new Redis(CONFIG.REDIS_URL)
|
||||||
|
console.log('Redis status=', this.redisClient.status)
|
||||||
|
logger.info('Redis status=', this.redisClient.status)
|
||||||
}
|
}
|
||||||
|
|
||||||
public async destroy(): Promise<void> {
|
public async destroy(): Promise<void> {
|
||||||
await this.dataSource?.destroy()
|
await this.dataSource?.destroy()
|
||||||
|
if (this.redisClient) {
|
||||||
|
this.redisClient.quit()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public getRedisClient(): Redis {
|
||||||
|
if (!this.redisClient) {
|
||||||
|
throw new Error('Redis client not initialized')
|
||||||
|
}
|
||||||
|
return this.redisClient
|
||||||
}
|
}
|
||||||
|
|
||||||
// ######################################
|
// ######################################
|
||||||
|
|||||||
@ -24,5 +24,6 @@ const database = {
|
|||||||
}
|
}
|
||||||
const PRODUCTION = process.env.NODE_ENV === 'production' || false
|
const PRODUCTION = process.env.NODE_ENV === 'production' || false
|
||||||
const nodeEnv = process.env.NODE_ENV || 'development'
|
const nodeEnv = process.env.NODE_ENV || 'development'
|
||||||
|
const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379'
|
||||||
|
|
||||||
export const CONFIG = { ...database, NODE_ENV: nodeEnv, PRODUCTION, ...defaults }
|
export const CONFIG = { ...database, NODE_ENV: nodeEnv, PRODUCTION, REDIS_URL, ...defaults }
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user