move some code into interaction

This commit is contained in:
einhornimmond 2025-11-05 14:17:23 +01:00
parent cbd9870825
commit f09c8e917d
8 changed files with 205 additions and 229 deletions

View File

@ -1,63 +0,0 @@
export type Loader<T, ContextType> = (context: ContextType, offset: number, count: number) => Promise<T[]>
export interface Orderable<ContextType> {
getDate(): Date
// return count of new loaded items
ensureFilled(context: ContextType, batchSize: number): Promise<number>
pushToBlockchain(context: ContextType): Promise<void>
isEmpty(): boolean
get length(): number
}
export class OrderedContainer<T, ContextType> implements Orderable<ContextType> {
private items: T[] = []
private offset = 0
constructor(
private readonly loader: Loader<T, ContextType>,
private readonly getDateHandler: (item: T) => Date,
private readonly pushToBlockchainHandler: (context: ContextType, item: T) => Promise<void>
) {}
async ensureFilled(context: ContextType, batchSize: number): Promise<number> {
if (this.items.length === 0) {
this.items = await this.loader(context, this.offset, batchSize)
this.offset += this.items.length
return this.items.length
}
return 0
}
async pushToBlockchain(context: ContextType): Promise<void> {
return this.pushToBlockchainHandler(context, this.shift())
}
peek(): T {
if (this.isEmpty()) {
throw new Error(`[peek] No items, please call this only if isEmpty returns false`)
}
return this.items[0]
}
shift(): T {
const item = this.items.shift()
if (!item) {
throw new Error(`[shift] No items, shift return undefined`)
}
return item
}
get length(): number {
return this.items.length
}
getDate(): Date {
return this.getDateHandler(this.peek())
}
isEmpty(): boolean {
return this.items.length === 0
}
}

View File

@ -5,26 +5,11 @@ import {
Profiler
} from 'gradido-blockchain-js'
import { Logger } from 'log4js'
import {
CreatedUserDb,
loadDeletedTransactionLinks,
loadTransactionLinks,
loadTransactions,
loadUsers,
TransactionDb,
TransactionLinkDb
} from './database'
import { addRegisterAddressTransaction, addTransaction } from './blockchain'
import { generateKeyPairUserAccount } from './keyPair'
import { transactionDbToTransaction, transactionLinkDbToTransaction, userDbToTransaction } from './convert'
import { Orderable, OrderedContainer } from './OrderedContainer'
import { Context } from './Context'
import { bootstrap } from './bootstrap'
import { heapStats } from 'bun:jsc'
import { onShutdown } from '../../../../shared/src/helper/onShutdown'
import { sleep } from 'bun'
import { syncDbWithBlockchainContext } from './interaction/syncDbWithBlockchain/syncDbWithBlockchain.context'
const publicKeyUserIdMap = new Map<string, string>()
async function main() {
const timeUsed = new Profiler()
@ -40,48 +25,13 @@ async function main() {
// synchronize to blockchain
const BATCH_SIZE = 100
const users = new OrderedContainer(
(context: Context, offset: number, count: number) => getNextUsers(context, offset, count),
(user: CreatedUserDb) => user.createdAt,
(context: Context, user: CreatedUserDb) => pushRegisterAddressTransaction(context, user)
)
const transactions = new OrderedContainer(
getNextTransactions,
(transaction: TransactionDb) => transaction.balanceDate,
(context: Context, transaction: TransactionDb) => pushTransaction(context, transaction)
)
const transactionLinks = new OrderedContainer(
getNextTransactionLinks,
(transactionLink: TransactionLinkDb) => transactionLink.createdAt,
(context: Context, transactionLink: TransactionLinkDb) => pushTransactionLink(context, transactionLink)
)
const deletedTransactionLinks = new OrderedContainer(
getNextDeletedTransactionLinks,
(transaction: TransactionDb) => transaction.balanceDate,
(context: Context, transaction: TransactionDb) => pushTransaction(context, transaction)
)
try {
await synchronizeToBlockchain(context, [users, transactions, transactionLinks, deletedTransactionLinks], BATCH_SIZE)
await syncDbWithBlockchainContext(context, BATCH_SIZE)
} catch (e) {
console.error(e)
throw e
}
context.logger.info(`${timeUsed.string()} for synchronizing to blockchain`)
// timeUsed.reset()
/*context.communities.forEach((communityContext) => {
const f = new Filter()
// hotfix for bug in gradido_blockchain for Filter::ALL_TRANSACTIONS
f.pagination.size = 0
const transactions = communityContext.blockchain.findAll(f)
context.logger.info(`Community '${communityContext.communityId}', transactions: ${transactions.size()}`)
// logBlogchain(context.logger, communityContext.blockchain)
})
context.logger.info(`${timeUsed.string()} for logging blockchains`)
*/
const runtimeStats = heapStats()
context.logger.info(
`Memory Statistics: heap size: ${bytesToMbyte(runtimeStats.heapSize)} MByte, heap capacity: ${bytesToMbyte(runtimeStats.heapCapacity)} MByte, extra memory: ${bytesToMbyte(runtimeStats.extraMemorySize)} MByte`
@ -94,120 +44,6 @@ function bytesToMbyte(bytes: number): string {
return (bytes / 1024 / 1024).toFixed(4)
}
async function synchronizeToBlockchain(
context: Context,
containers: Orderable<Context>[],
batchSize: number
): Promise<void> {
const timeUsed = new Profiler()
while (true) {
timeUsed.reset()
const results = await Promise.all(containers.map(c => c.ensureFilled(context, batchSize)))
const loadedItemsCount = results.reduce((acc, c) => acc + c, 0)
// log only, if at least one new item was loaded
if (loadedItemsCount && context.logger.isInfoEnabled()) {
context.logger.info(`${loadedItemsCount} new items loaded from db in ${timeUsed.string()}`)
}
// remove empty containers
const available = containers.filter(c => !c.isEmpty())
if (available.length === 0) {
break
}
// find container with smallest date
if (available.length > 0) {
available.sort((a, b) => a.getDate().getTime() - b.getDate().getTime())
}
try {
await available[0].pushToBlockchain(context)
// await sleep(1)
} catch (e) {
context.logger.error(e)
// logBlogchain(context.logger, context.communities.values().next().value!.blockchain)
throw e
}
}
}
// ---------------- load from db graiddo backend transactions format -----------------------------------------------
/// load next max ${count} users and calculate key pair for calculating signatures later
async function getNextUsers(context: Context, offset: number, count: number): Promise<CreatedUserDb[]> {
const timeUsed = new Profiler()
const users = await loadUsers(context.db, offset, count)
for (const user of users) {
const communityContext = context.getCommunityContextByUuid(user.communityUuid)
const { userKeyPair, accountKeyPair } = await generateKeyPairUserAccount(user, context.cache, communityContext.topicId)
publicKeyUserIdMap.set(userKeyPair.convertToHex(), user.gradidoId)
publicKeyUserIdMap.set(accountKeyPair.convertToHex(), user.gradidoId)
}
if(users.length !== 0) {
context.logger.info(`${timeUsed.string()} for loading ${users.length} users from db and calculate ed25519 KeyPairs for them`)
}
return users
}
// load next max ${count} transactions (contain also redeem transaction link transactions)
async function getNextTransactions(context: Context, offset: number, count: number): Promise<TransactionDb[]> {
const timeUsed = new Profiler()
const transactions = await loadTransactions(context.db, offset, count)
if(transactions.length !== 0) {
context.logger.debug(`${timeUsed.string()} for loading ${transactions.length} transactions from db`)
}
return transactions
}
// load next max ${count} transaction links (freshly created links, in blockchain format this is a separate transaction)
async function getNextTransactionLinks(context: Context, offset: number, count: number): Promise<TransactionLinkDb[]> {
const timeUsed = new Profiler()
const transactionLinks = await loadTransactionLinks(context.db, offset, count)
if(transactionLinks.length !== 0) {
context.logger.debug(`${timeUsed.string()} for loading ${transactionLinks.length} transaction links from db`)
}
return transactionLinks
}
// load next max ${count} deleted transaction links (in blockchain format this is a separate transaction)
async function getNextDeletedTransactionLinks(context: Context, offset: number, count: number): Promise<TransactionDb[]> {
const timeUsed = new Profiler()
const deletedTransactionLinks = await loadDeletedTransactionLinks(context.db, offset, count)
if(deletedTransactionLinks.length !== 0) {
context.logger.debug(`${timeUsed.string()} for loading ${deletedTransactionLinks.length} deleted transaction links from db`)
}
return deletedTransactionLinks
}
// ---------------- put into in memory blockchain -----------------------------------------------
async function pushRegisterAddressTransaction(context: Context, user: CreatedUserDb): Promise<void> {
const communityContext = context.getCommunityContextByUuid(user.communityUuid)
const transaction = userDbToTransaction(user, communityContext.topicId)
return await addRegisterAddressTransaction(communityContext.blockchain, transaction)
}
async function pushTransaction(context: Context, transactionDb: TransactionDb): Promise<void> {
const senderCommunityContext = context.getCommunityContextByUuid(transactionDb.user.communityUuid)
// context.logger.info(`before adding non register address and non link transaction:`)
// logBlogchain(context.logger, senderCommunityContext.blockchain)
const recipientCommunityContext = context.getCommunityContextByUuid(transactionDb.linkedUser.communityUuid)
// CreationTransactionRole will check that community topic id belongs to home community
context.cache.setHomeCommunityTopicId(senderCommunityContext.topicId)
const transaction = transactionDbToTransaction(transactionDb, senderCommunityContext.topicId, recipientCommunityContext.topicId)
await addTransaction(senderCommunityContext.blockchain, recipientCommunityContext.blockchain, transaction)
// const firstTransaction = senderCommunityContext.blockchain.findOne(Filter.FIRST_TRANSACTION)
// console.log(`first transaction: ${firstTransaction?.getConfirmedTransaction()?.toJson(true)}`)
}
async function pushTransactionLink(context: Context, transactionLinkDb: TransactionLinkDb): Promise<void> {
const communityContext = context.getCommunityContextByUuid(transactionLinkDb.user.communityUuid)
const transaction = transactionLinkDbToTransaction(transactionLinkDb, communityContext.topicId)
await addTransaction(communityContext.blockchain, communityContext.blockchain, transaction)
}
// ---------------- utils ----------------------------------------------------------------------
function logBlogchain(logger: Logger, blockchain: InMemoryBlockchain) {
const f = new Filter()
f.pagination.size = 0

View File

@ -0,0 +1,67 @@
import { Context } from '../../Context'
import { getLogger, Logger } from 'log4js'
import { LOG4JS_BASE_CATEGORY } from '../../../../config/const'
import { Profiler } from 'gradido-blockchain-js'
export abstract class AbstractSyncRole<T> {
private items: T[] = []
private offset = 0
protected logger: Logger
constructor(protected readonly context: Context) {
this.logger = getLogger(`${LOG4JS_BASE_CATEGORY}.migrations.db-v2.7.0_to_blockchain-v3.5.interaction.syncDbWithBlockchain`)
}
abstract getDate(): Date
abstract loadFromDb(offset: number, count: number): Promise<T[]>
abstract pushToBlockchain(item: T): Promise<void>
abstract itemTypeName(): string
// return count of new loaded items
async ensureFilled(batchSize: number): Promise<number>
{
if (this.items.length === 0) {
let timeUsed: Profiler | undefined
if (this.logger.isDebugEnabled()) {
timeUsed = new Profiler()
}
this.items = await this.loadFromDb(this.offset, batchSize)
this.offset += this.items.length
if (timeUsed && this.items.length) {
this.logger.debug(`${timeUsed.string()} for loading ${this.items.length} ${this.itemTypeName()} from db`)
}
return this.items.length
}
return 0
}
async toBlockchain(): Promise<void> {
if (this.isEmpty()) {
throw new Error(`[toBlockchain] No items, please call this only if isEmpty returns false`)
}
await this.pushToBlockchain(this.shift())
}
peek(): T {
if (this.isEmpty()) {
throw new Error(`[peek] No items, please call this only if isEmpty returns false`)
}
return this.items[0]
}
shift(): T {
const item = this.items.shift()
if (!item) {
throw new Error(`[shift] No items, shift return undefined`)
}
return item
}
get length(): number {
return this.items.length
}
isEmpty(): boolean {
return this.items.length === 0
}
}

View File

@ -0,0 +1,13 @@
import { loadDeletedTransactionLinks } from '../../database'
import { TransactionsSyncRole } from './TransactionsSync.role'
import { TransactionDb } from '../../database'
export class DeletedTransactionLinksSyncRole extends TransactionsSyncRole {
itemTypeName(): string {
return 'deletedTransactionLinks'
}
async loadFromDb(offset: number, count: number): Promise<TransactionDb[]> {
return await loadDeletedTransactionLinks(this.context.db, offset, count)
}
}

View File

@ -0,0 +1,25 @@
import { TransactionLinkDb, loadTransactionLinks } from '../../database'
import { transactionLinkDbToTransaction } from '../../convert'
import { addTransaction } from '../../blockchain'
import { AbstractSyncRole } from './AbstractSync.role'
export class TransactionLinksSyncRole extends AbstractSyncRole<TransactionLinkDb> {
getDate(): Date {
return this.peek().createdAt
}
itemTypeName(): string {
return 'transactionLinks'
}
async loadFromDb(offset: number, count: number): Promise<TransactionLinkDb[]> {
return await loadTransactionLinks(this.context.db, offset, count)
}
async pushToBlockchain(item: TransactionLinkDb): Promise<void> {
const communityContext = this.context.getCommunityContextByUuid(item.user.communityUuid)
const transaction = transactionLinkDbToTransaction(item, communityContext.topicId)
await addTransaction(communityContext.blockchain, communityContext.blockchain, transaction)
}
}

View File

@ -0,0 +1,27 @@
import { TransactionDb, loadTransactions } from '../../database'
import { transactionDbToTransaction } from '../../convert'
import { addTransaction } from '../../blockchain'
import { AbstractSyncRole } from './AbstractSync.role'
export class TransactionsSyncRole extends AbstractSyncRole<TransactionDb> {
getDate(): Date {
return this.peek().balanceDate
}
itemTypeName(): string {
return 'transactions'
}
async loadFromDb(offset: number, count: number): Promise<TransactionDb[]> {
return await loadTransactions(this.context.db, offset, count)
}
async pushToBlockchain(item: TransactionDb): Promise<void> {
const senderCommunityContext = this.context.getCommunityContextByUuid(item.user.communityUuid)
const recipientCommunityContext = this.context.getCommunityContextByUuid(item.linkedUser.communityUuid)
this.context.cache.setHomeCommunityTopicId(senderCommunityContext.topicId)
const transaction = transactionDbToTransaction(item, senderCommunityContext.topicId, recipientCommunityContext.topicId)
await addTransaction(senderCommunityContext.blockchain, recipientCommunityContext.blockchain, transaction)
}
}

View File

@ -0,0 +1,32 @@
import { CreatedUserDb } from '../../database'
import { AbstractSyncRole } from './AbstractSync.role'
import { loadUsers } from '../../database'
import { generateKeyPairUserAccount } from '../../keyPair'
import { userDbToTransaction } from '../../convert'
import { addRegisterAddressTransaction } from '../../blockchain'
export class UsersSyncRole extends AbstractSyncRole<CreatedUserDb> {
getDate(): Date {
return this.peek().createdAt
}
itemTypeName(): string {
return 'users'
}
async loadFromDb(offset: number, count: number): Promise<CreatedUserDb[]> {
const users = await loadUsers(this.context.db, offset, count)
for (const user of users) {
const communityContext = this.context.getCommunityContextByUuid(user.communityUuid)
await generateKeyPairUserAccount(user, this.context.cache, communityContext.topicId)
}
return users
}
async pushToBlockchain(item: CreatedUserDb): Promise<void> {
const communityContext = this.context.getCommunityContextByUuid(item.communityUuid)
const transaction = userDbToTransaction(item, communityContext.topicId)
return await addRegisterAddressTransaction(communityContext.blockchain, transaction)
}
}

View File

@ -0,0 +1,39 @@
import { Context } from '../../Context'
import { Profiler } from 'gradido-blockchain-js'
import { TransactionsSyncRole } from './TransactionsSync.role'
import { DeletedTransactionLinksSyncRole } from './DeletedTransactionLinksSync.role'
import { TransactionLinksSyncRole } from './TransactionLinksSync.role'
import { UsersSyncRole } from './UsersSync.role'
export async function syncDbWithBlockchainContext(context: Context, batchSize: number) {
const timeUsed = new Profiler()
const containers = [
new UsersSyncRole(context),
new TransactionsSyncRole(context),
new DeletedTransactionLinksSyncRole(context),
new TransactionLinksSyncRole(context)
]
while (true) {
timeUsed.reset()
const results = await Promise.all(containers.map(c => c.ensureFilled(batchSize)))
const loadedItemsCount = results.reduce((acc, c) => acc + c, 0)
// log only, if at least one new item was loaded
if (loadedItemsCount && context.logger.isInfoEnabled()) {
context.logger.info(`${loadedItemsCount} new items loaded from db in ${timeUsed.string()}`)
}
// remove empty containers
const available = containers.filter(c => !c.isEmpty())
if (available.length === 0) {
break
}
// sort by date, to ensure container on index 0 is the one with the smallest date
if (available.length > 0) {
available.sort((a, b) => a.getDate().getTime() - b.getDate().getTime())
}
await available[0].toBlockchain()
}
}