From f09c8e917d83345bba7055df1fec681d33298f17 Mon Sep 17 00:00:00 2001 From: einhornimmond Date: Wed, 5 Nov 2025 14:17:23 +0100 Subject: [PATCH] move some code into interaction --- .../OrderedContainer.ts | 63 ------- .../db-v2.7.0_to_blockchain-v3.5/index.ts | 168 +----------------- .../syncDbWithBlockchain/AbstractSync.role.ts | 67 +++++++ .../DeletedTransactionLinksSync.role.ts | 13 ++ .../TransactionLinksSync.role.ts | 25 +++ .../TransactionsSync.role.ts | 27 +++ .../syncDbWithBlockchain/UsersSync.role.ts | 32 ++++ .../syncDbWithBlockchain.context.ts | 39 ++++ 8 files changed, 205 insertions(+), 229 deletions(-) delete mode 100644 dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/OrderedContainer.ts create mode 100644 dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/AbstractSync.role.ts create mode 100644 dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/DeletedTransactionLinksSync.role.ts create mode 100644 dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/TransactionLinksSync.role.ts create mode 100644 dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/TransactionsSync.role.ts create mode 100644 dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/UsersSync.role.ts create mode 100644 dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/syncDbWithBlockchain.context.ts diff --git a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/OrderedContainer.ts b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/OrderedContainer.ts deleted file mode 100644 index 47595b3d0..000000000 --- a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/OrderedContainer.ts +++ /dev/null @@ -1,63 +0,0 @@ - -export type Loader = (context: ContextType, offset: number, count: number) => Promise - -export interface Orderable { - getDate(): Date - // return count of new loaded items - ensureFilled(context: ContextType, batchSize: number): Promise - pushToBlockchain(context: ContextType): Promise - isEmpty(): boolean - get length(): number -} - -export class OrderedContainer implements Orderable { - private items: T[] = [] - private offset = 0 - - constructor( - private readonly loader: Loader, - private readonly getDateHandler: (item: T) => Date, - private readonly pushToBlockchainHandler: (context: ContextType, item: T) => Promise - ) {} - - - async ensureFilled(context: ContextType, batchSize: number): Promise { - if (this.items.length === 0) { - this.items = await this.loader(context, this.offset, batchSize) - this.offset += this.items.length - return this.items.length - } - return 0 - } - - async pushToBlockchain(context: ContextType): Promise { - return this.pushToBlockchainHandler(context, this.shift()) - } - - peek(): T { - if (this.isEmpty()) { - throw new Error(`[peek] No items, please call this only if isEmpty returns false`) - } - return this.items[0] - } - - shift(): T { - const item = this.items.shift() - if (!item) { - throw new Error(`[shift] No items, shift return undefined`) - } - return item - } - - get length(): number { - return this.items.length - } - - getDate(): Date { - return this.getDateHandler(this.peek()) - } - - isEmpty(): boolean { - return this.items.length === 0 - } -} \ No newline at end of file diff --git a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/index.ts b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/index.ts index 1c309d021..14a85e9bf 100644 --- a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/index.ts +++ b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/index.ts @@ -5,26 +5,11 @@ import { Profiler } from 'gradido-blockchain-js' import { Logger } from 'log4js' -import { - CreatedUserDb, - loadDeletedTransactionLinks, - loadTransactionLinks, - loadTransactions, - loadUsers, - TransactionDb, - TransactionLinkDb -} from './database' -import { addRegisterAddressTransaction, addTransaction } from './blockchain' -import { generateKeyPairUserAccount } from './keyPair' -import { transactionDbToTransaction, transactionLinkDbToTransaction, userDbToTransaction } from './convert' -import { Orderable, OrderedContainer } from './OrderedContainer' -import { Context } from './Context' import { bootstrap } from './bootstrap' import { heapStats } from 'bun:jsc' import { onShutdown } from '../../../../shared/src/helper/onShutdown' -import { sleep } from 'bun' +import { syncDbWithBlockchainContext } from './interaction/syncDbWithBlockchain/syncDbWithBlockchain.context' -const publicKeyUserIdMap = new Map() async function main() { const timeUsed = new Profiler() @@ -40,48 +25,13 @@ async function main() { // synchronize to blockchain const BATCH_SIZE = 100 - const users = new OrderedContainer( - (context: Context, offset: number, count: number) => getNextUsers(context, offset, count), - (user: CreatedUserDb) => user.createdAt, - (context: Context, user: CreatedUserDb) => pushRegisterAddressTransaction(context, user) - ) - - const transactions = new OrderedContainer( - getNextTransactions, - (transaction: TransactionDb) => transaction.balanceDate, - (context: Context, transaction: TransactionDb) => pushTransaction(context, transaction) - ) - - const transactionLinks = new OrderedContainer( - getNextTransactionLinks, - (transactionLink: TransactionLinkDb) => transactionLink.createdAt, - (context: Context, transactionLink: TransactionLinkDb) => pushTransactionLink(context, transactionLink) - ) - - const deletedTransactionLinks = new OrderedContainer( - getNextDeletedTransactionLinks, - (transaction: TransactionDb) => transaction.balanceDate, - (context: Context, transaction: TransactionDb) => pushTransaction(context, transaction) - ) - try { - await synchronizeToBlockchain(context, [users, transactions, transactionLinks, deletedTransactionLinks], BATCH_SIZE) + await syncDbWithBlockchainContext(context, BATCH_SIZE) } catch (e) { console.error(e) throw e } context.logger.info(`${timeUsed.string()} for synchronizing to blockchain`) - // timeUsed.reset() - /*context.communities.forEach((communityContext) => { - const f = new Filter() - // hotfix for bug in gradido_blockchain for Filter::ALL_TRANSACTIONS - f.pagination.size = 0 - const transactions = communityContext.blockchain.findAll(f) - context.logger.info(`Community '${communityContext.communityId}', transactions: ${transactions.size()}`) - // logBlogchain(context.logger, communityContext.blockchain) - }) - context.logger.info(`${timeUsed.string()} for logging blockchains`) - */ const runtimeStats = heapStats() context.logger.info( `Memory Statistics: heap size: ${bytesToMbyte(runtimeStats.heapSize)} MByte, heap capacity: ${bytesToMbyte(runtimeStats.heapCapacity)} MByte, extra memory: ${bytesToMbyte(runtimeStats.extraMemorySize)} MByte` @@ -94,120 +44,6 @@ function bytesToMbyte(bytes: number): string { return (bytes / 1024 / 1024).toFixed(4) } -async function synchronizeToBlockchain( - context: Context, - containers: Orderable[], - batchSize: number -): Promise { - const timeUsed = new Profiler() - while (true) { - timeUsed.reset() - const results = await Promise.all(containers.map(c => c.ensureFilled(context, batchSize))) - const loadedItemsCount = results.reduce((acc, c) => acc + c, 0) - // log only, if at least one new item was loaded - if (loadedItemsCount && context.logger.isInfoEnabled()) { - context.logger.info(`${loadedItemsCount} new items loaded from db in ${timeUsed.string()}`) - } - - // remove empty containers - const available = containers.filter(c => !c.isEmpty()) - if (available.length === 0) { - break - } - - // find container with smallest date - if (available.length > 0) { - available.sort((a, b) => a.getDate().getTime() - b.getDate().getTime()) - } - - try { - await available[0].pushToBlockchain(context) - // await sleep(1) - } catch (e) { - context.logger.error(e) - // logBlogchain(context.logger, context.communities.values().next().value!.blockchain) - throw e - } - } -} - -// ---------------- load from db graiddo backend transactions format ----------------------------------------------- - -/// load next max ${count} users and calculate key pair for calculating signatures later -async function getNextUsers(context: Context, offset: number, count: number): Promise { - const timeUsed = new Profiler() - const users = await loadUsers(context.db, offset, count) - for (const user of users) { - const communityContext = context.getCommunityContextByUuid(user.communityUuid) - const { userKeyPair, accountKeyPair } = await generateKeyPairUserAccount(user, context.cache, communityContext.topicId) - publicKeyUserIdMap.set(userKeyPair.convertToHex(), user.gradidoId) - publicKeyUserIdMap.set(accountKeyPair.convertToHex(), user.gradidoId) - } - if(users.length !== 0) { - context.logger.info(`${timeUsed.string()} for loading ${users.length} users from db and calculate ed25519 KeyPairs for them`) - } - return users -} - -// load next max ${count} transactions (contain also redeem transaction link transactions) -async function getNextTransactions(context: Context, offset: number, count: number): Promise { - const timeUsed = new Profiler() - const transactions = await loadTransactions(context.db, offset, count) - if(transactions.length !== 0) { - context.logger.debug(`${timeUsed.string()} for loading ${transactions.length} transactions from db`) - } - return transactions -} - -// load next max ${count} transaction links (freshly created links, in blockchain format this is a separate transaction) -async function getNextTransactionLinks(context: Context, offset: number, count: number): Promise { - const timeUsed = new Profiler() - const transactionLinks = await loadTransactionLinks(context.db, offset, count) - if(transactionLinks.length !== 0) { - context.logger.debug(`${timeUsed.string()} for loading ${transactionLinks.length} transaction links from db`) - } - return transactionLinks -} - -// load next max ${count} deleted transaction links (in blockchain format this is a separate transaction) -async function getNextDeletedTransactionLinks(context: Context, offset: number, count: number): Promise { - const timeUsed = new Profiler() - const deletedTransactionLinks = await loadDeletedTransactionLinks(context.db, offset, count) - if(deletedTransactionLinks.length !== 0) { - context.logger.debug(`${timeUsed.string()} for loading ${deletedTransactionLinks.length} deleted transaction links from db`) - } - return deletedTransactionLinks -} - -// ---------------- put into in memory blockchain ----------------------------------------------- - -async function pushRegisterAddressTransaction(context: Context, user: CreatedUserDb): Promise { - const communityContext = context.getCommunityContextByUuid(user.communityUuid) - const transaction = userDbToTransaction(user, communityContext.topicId) - return await addRegisterAddressTransaction(communityContext.blockchain, transaction) -} - -async function pushTransaction(context: Context, transactionDb: TransactionDb): Promise { - const senderCommunityContext = context.getCommunityContextByUuid(transactionDb.user.communityUuid) - // context.logger.info(`before adding non register address and non link transaction:`) - // logBlogchain(context.logger, senderCommunityContext.blockchain) - const recipientCommunityContext = context.getCommunityContextByUuid(transactionDb.linkedUser.communityUuid) - // CreationTransactionRole will check that community topic id belongs to home community - context.cache.setHomeCommunityTopicId(senderCommunityContext.topicId) - const transaction = transactionDbToTransaction(transactionDb, senderCommunityContext.topicId, recipientCommunityContext.topicId) - await addTransaction(senderCommunityContext.blockchain, recipientCommunityContext.blockchain, transaction) - // const firstTransaction = senderCommunityContext.blockchain.findOne(Filter.FIRST_TRANSACTION) - // console.log(`first transaction: ${firstTransaction?.getConfirmedTransaction()?.toJson(true)}`) -} - -async function pushTransactionLink(context: Context, transactionLinkDb: TransactionLinkDb): Promise { - const communityContext = context.getCommunityContextByUuid(transactionLinkDb.user.communityUuid) - const transaction = transactionLinkDbToTransaction(transactionLinkDb, communityContext.topicId) - await addTransaction(communityContext.blockchain, communityContext.blockchain, transaction) -} - -// ---------------- utils ---------------------------------------------------------------------- - function logBlogchain(logger: Logger, blockchain: InMemoryBlockchain) { const f = new Filter() f.pagination.size = 0 diff --git a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/AbstractSync.role.ts b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/AbstractSync.role.ts new file mode 100644 index 000000000..44fd0b471 --- /dev/null +++ b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/AbstractSync.role.ts @@ -0,0 +1,67 @@ +import { Context } from '../../Context' +import { getLogger, Logger } from 'log4js' +import { LOG4JS_BASE_CATEGORY } from '../../../../config/const' +import { Profiler } from 'gradido-blockchain-js' + +export abstract class AbstractSyncRole { + private items: T[] = [] + private offset = 0 + protected logger: Logger + + constructor(protected readonly context: Context) { + this.logger = getLogger(`${LOG4JS_BASE_CATEGORY}.migrations.db-v2.7.0_to_blockchain-v3.5.interaction.syncDbWithBlockchain`) + } + + abstract getDate(): Date + abstract loadFromDb(offset: number, count: number): Promise + abstract pushToBlockchain(item: T): Promise + abstract itemTypeName(): string + + // return count of new loaded items + async ensureFilled(batchSize: number): Promise + { + if (this.items.length === 0) { + let timeUsed: Profiler | undefined + if (this.logger.isDebugEnabled()) { + timeUsed = new Profiler() + } + this.items = await this.loadFromDb(this.offset, batchSize) + this.offset += this.items.length + if (timeUsed && this.items.length) { + this.logger.debug(`${timeUsed.string()} for loading ${this.items.length} ${this.itemTypeName()} from db`) + } + return this.items.length + } + return 0 + } + + async toBlockchain(): Promise { + if (this.isEmpty()) { + throw new Error(`[toBlockchain] No items, please call this only if isEmpty returns false`) + } + await this.pushToBlockchain(this.shift()) + } + + peek(): T { + if (this.isEmpty()) { + throw new Error(`[peek] No items, please call this only if isEmpty returns false`) + } + return this.items[0] + } + + shift(): T { + const item = this.items.shift() + if (!item) { + throw new Error(`[shift] No items, shift return undefined`) + } + return item + } + + get length(): number { + return this.items.length + } + + isEmpty(): boolean { + return this.items.length === 0 + } +} diff --git a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/DeletedTransactionLinksSync.role.ts b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/DeletedTransactionLinksSync.role.ts new file mode 100644 index 000000000..d4e8eaa6e --- /dev/null +++ b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/DeletedTransactionLinksSync.role.ts @@ -0,0 +1,13 @@ +import { loadDeletedTransactionLinks } from '../../database' +import { TransactionsSyncRole } from './TransactionsSync.role' +import { TransactionDb } from '../../database' + +export class DeletedTransactionLinksSyncRole extends TransactionsSyncRole { + itemTypeName(): string { + return 'deletedTransactionLinks' + } + + async loadFromDb(offset: number, count: number): Promise { + return await loadDeletedTransactionLinks(this.context.db, offset, count) + } +} \ No newline at end of file diff --git a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/TransactionLinksSync.role.ts b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/TransactionLinksSync.role.ts new file mode 100644 index 000000000..c4d5e94d7 --- /dev/null +++ b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/TransactionLinksSync.role.ts @@ -0,0 +1,25 @@ +import { TransactionLinkDb, loadTransactionLinks } from '../../database' +import { transactionLinkDbToTransaction } from '../../convert' +import { addTransaction } from '../../blockchain' +import { AbstractSyncRole } from './AbstractSync.role' + +export class TransactionLinksSyncRole extends AbstractSyncRole { + getDate(): Date { + return this.peek().createdAt + } + + itemTypeName(): string { + return 'transactionLinks' + } + + async loadFromDb(offset: number, count: number): Promise { + return await loadTransactionLinks(this.context.db, offset, count) + } + + async pushToBlockchain(item: TransactionLinkDb): Promise { + const communityContext = this.context.getCommunityContextByUuid(item.user.communityUuid) + const transaction = transactionLinkDbToTransaction(item, communityContext.topicId) + await addTransaction(communityContext.blockchain, communityContext.blockchain, transaction) + } +} + \ No newline at end of file diff --git a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/TransactionsSync.role.ts b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/TransactionsSync.role.ts new file mode 100644 index 000000000..2f05e4378 --- /dev/null +++ b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/TransactionsSync.role.ts @@ -0,0 +1,27 @@ +import { TransactionDb, loadTransactions } from '../../database' +import { transactionDbToTransaction } from '../../convert' +import { addTransaction } from '../../blockchain' +import { AbstractSyncRole } from './AbstractSync.role' + +export class TransactionsSyncRole extends AbstractSyncRole { + getDate(): Date { + return this.peek().balanceDate + } + + itemTypeName(): string { + return 'transactions' + } + + async loadFromDb(offset: number, count: number): Promise { + return await loadTransactions(this.context.db, offset, count) + } + + async pushToBlockchain(item: TransactionDb): Promise { + const senderCommunityContext = this.context.getCommunityContextByUuid(item.user.communityUuid) + const recipientCommunityContext = this.context.getCommunityContextByUuid(item.linkedUser.communityUuid) + this.context.cache.setHomeCommunityTopicId(senderCommunityContext.topicId) + const transaction = transactionDbToTransaction(item, senderCommunityContext.topicId, recipientCommunityContext.topicId) + await addTransaction(senderCommunityContext.blockchain, recipientCommunityContext.blockchain, transaction) + } +} + \ No newline at end of file diff --git a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/UsersSync.role.ts b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/UsersSync.role.ts new file mode 100644 index 000000000..ca4799b68 --- /dev/null +++ b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/UsersSync.role.ts @@ -0,0 +1,32 @@ +import { CreatedUserDb } from '../../database' +import { AbstractSyncRole } from './AbstractSync.role' +import { loadUsers } from '../../database' +import { generateKeyPairUserAccount } from '../../keyPair' +import { userDbToTransaction } from '../../convert' +import { addRegisterAddressTransaction } from '../../blockchain' + + +export class UsersSyncRole extends AbstractSyncRole { + getDate(): Date { + return this.peek().createdAt + } + + itemTypeName(): string { + return 'users' + } + + async loadFromDb(offset: number, count: number): Promise { + const users = await loadUsers(this.context.db, offset, count) + for (const user of users) { + const communityContext = this.context.getCommunityContextByUuid(user.communityUuid) + await generateKeyPairUserAccount(user, this.context.cache, communityContext.topicId) + } + return users + } + + async pushToBlockchain(item: CreatedUserDb): Promise { + const communityContext = this.context.getCommunityContextByUuid(item.communityUuid) + const transaction = userDbToTransaction(item, communityContext.topicId) + return await addRegisterAddressTransaction(communityContext.blockchain, transaction) + } +} diff --git a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/syncDbWithBlockchain.context.ts b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/syncDbWithBlockchain.context.ts new file mode 100644 index 000000000..edef30caa --- /dev/null +++ b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/syncDbWithBlockchain.context.ts @@ -0,0 +1,39 @@ +import { Context } from '../../Context' +import { Profiler } from 'gradido-blockchain-js' +import { TransactionsSyncRole } from './TransactionsSync.role' +import { DeletedTransactionLinksSyncRole } from './DeletedTransactionLinksSync.role' +import { TransactionLinksSyncRole } from './TransactionLinksSync.role' +import { UsersSyncRole } from './UsersSync.role' + +export async function syncDbWithBlockchainContext(context: Context, batchSize: number) { + const timeUsed = new Profiler() + const containers = [ + new UsersSyncRole(context), + new TransactionsSyncRole(context), + new DeletedTransactionLinksSyncRole(context), + new TransactionLinksSyncRole(context) + ] + + while (true) { + timeUsed.reset() + const results = await Promise.all(containers.map(c => c.ensureFilled(batchSize))) + const loadedItemsCount = results.reduce((acc, c) => acc + c, 0) + // log only, if at least one new item was loaded + if (loadedItemsCount && context.logger.isInfoEnabled()) { + context.logger.info(`${loadedItemsCount} new items loaded from db in ${timeUsed.string()}`) + } + + // remove empty containers + const available = containers.filter(c => !c.isEmpty()) + if (available.length === 0) { + break + } + + // sort by date, to ensure container on index 0 is the one with the smallest date + if (available.length > 0) { + available.sort((a, b) => a.getDate().getTime() - b.getDate().getTime()) + } + await available[0].toBlockchain() + } +} +