diff --git a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/blockchain.ts b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/blockchain.ts index fd89d5ea0..fceecf541 100644 --- a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/blockchain.ts +++ b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/blockchain.ts @@ -5,10 +5,13 @@ import { HieroTransactionId, InMemoryBlockchain, InteractionSerialize, + Pagination, + Profiler, + SearchDirection_DESC, Timestamp, + TransactionType_DEFERRED_TRANSFER, } from 'gradido-blockchain-js' import { getLogger } from 'log4js' -import * as v from 'valibot' import { LOG4JS_BASE_CATEGORY } from '../../config/const' import { InputTransactionType } from '../../data/InputTransactionType.enum' import { LinkedTransactionKeyPairRole } from '../../interactions/resolveKeyPair/LinkedTransactionKeyPair.role' @@ -20,11 +23,14 @@ import { RegisterAddressTransactionRole } from '../../interactions/sendToHiero/R import { TransferTransactionRole } from '../../interactions/sendToHiero/TransferTransaction.role' import { Community, Transaction } from '../../schemas/transaction.schema' import { identifierSeedSchema } from '../../schemas/typeGuard.schema' +import { AbstractTransactionRole } from '../../interactions/sendToHiero/AbstractTransaction.role' +import * as v from 'valibot' const logger = getLogger( `${LOG4JS_BASE_CATEGORY}.migrations.db-v2.7.0_to_blockchain-v3.6.blockchain`, ) export const defaultHieroAccount = new HieroAccountId(0, 0, 2) +let transactionAddedToBlockchainSum = 0 function addToBlockchain( builder: GradidoTransactionBuilder, @@ -36,7 +42,7 @@ function addToBlockchain( const transactionId = new HieroTransactionId(createdAtTimestamp, defaultHieroAccount) const interactionSerialize = new InteractionSerialize(transactionId) - try { + try { const result = blockchain.createAndAddConfirmedTransaction( transaction, interactionSerialize.run(), @@ -67,115 +73,77 @@ export async function addCommunityRootTransaction( } } -export async function addRegisterAddressTransaction( - blockchain: InMemoryBlockchain, - transaction: Transaction, -): Promise { - const registerAddressRole = new RegisterAddressTransactionRole(transaction) - if ( - addToBlockchain( - await registerAddressRole.getGradidoTransactionBuilder(), - blockchain, - new Timestamp(transaction.createdAt), - ) - ) { - logger.debug( - `Register Address Transaction added for user ${transaction.user.account!.userUuid}`, - ) - } else { - throw new Error( - `Register Address Transaction not added for user ${transaction.user.account!.userUuid}`, - ) - } -} - export async function addTransaction( senderBlockchain: InMemoryBlockchain, _recipientBlockchain: InMemoryBlockchain, transaction: Transaction, ): Promise { + + let debugTmpStr = '' + const createdAtTimestamp = new Timestamp(transaction.createdAt) + let role: AbstractTransactionRole if (transaction.type === InputTransactionType.GRADIDO_CREATION) { - const creationTransactionRole = new CreationTransactionRole(transaction) - if ( - addToBlockchain( - await creationTransactionRole.getGradidoTransactionBuilder(), - senderBlockchain, - createdAtTimestamp, - ) - ) { - logger.debug(`Creation Transaction added for user ${transaction.user.account!.userUuid}`) - } else { - throw new Error( - `Creation Transaction not added for user ${transaction.user.account!.userUuid}`, - ) - } + role = new CreationTransactionRole(transaction) } else if (transaction.type === InputTransactionType.GRADIDO_TRANSFER) { - const transferTransactionRole = new TransferTransactionRole(transaction) - // will crash with cross group transaction - if ( - addToBlockchain( - await transferTransactionRole.getGradidoTransactionBuilder(), - senderBlockchain, - createdAtTimestamp, - ) - ) { - logger.debug(`Transfer Transaction added for user ${transaction.user.account!.userUuid}`) - } else { - throw new Error( - `Transfer Transaction not added for user ${transaction.user.account!.userUuid}`, - ) - } + role = new TransferTransactionRole(transaction) + } else if (transaction.type == InputTransactionType.REGISTER_ADDRESS) { + role = new RegisterAddressTransactionRole(transaction) } else if (transaction.type === InputTransactionType.GRADIDO_DEFERRED_TRANSFER) { - const transferTransactionRole = new DeferredTransferTransactionRole(transaction) - if ( - addToBlockchain( - await transferTransactionRole.getGradidoTransactionBuilder(), - senderBlockchain, - createdAtTimestamp, - ) - ) { - logger.debug( - `Deferred Transfer Transaction added for user ${transaction.user.account!.userUuid}`, - ) - } else { - throw new Error( - `Deferred Transfer Transaction not added for user ${transaction.user.account!.userUuid}`, - ) - } + role = new DeferredTransferTransactionRole(transaction) } else if (transaction.type === InputTransactionType.GRADIDO_REDEEM_DEFERRED_TRANSFER) { const seedKeyPairRole = new LinkedTransactionKeyPairRole( v.parse(identifierSeedSchema, transaction.user.seed), ) const f = new Filter() f.involvedPublicKey = seedKeyPairRole.generateKeyPair().getPublicKey() - const deferredTransaction = senderBlockchain.findOne(f) - if (!deferredTransaction) { + f.transactionType = TransactionType_DEFERRED_TRANSFER + const deferredTransactions = senderBlockchain.findAll(f) + if (!deferredTransactions) { throw new Error( `redeem deferred transfer: couldn't find parent deferred transfer on Gradido Node for ${JSON.stringify(transaction, null, 2)} and public key from seed: ${f.involvedPublicKey?.convertToHex()}`, ) } + if (deferredTransactions.size() != 1) { + logger.error( + `redeem deferred transfer: found ${deferredTransactions.size()} parent deferred transfer on Gradido Node for ${JSON.stringify(transaction, null, 2)} and public key from seed: ${f.involvedPublicKey?.convertToHex()}`, + ) + for(let i = 0; i < deferredTransactions.size(); i++) { + logger.error(`deferred transaction ${i}: ${deferredTransactions.get(i)?.getConfirmedTransaction()?.toJson(true)}`) + } + throw new Error( + `redeem deferred transfer: found ${deferredTransactions.size()} parent deferred transfer on Gradido Node for ${JSON.stringify(transaction, null, 2)} and public key from seed: ${f.involvedPublicKey?.convertToHex()}`, + ) + } + const deferredTransaction = deferredTransactions.get(0)! const confirmedDeferredTransaction = deferredTransaction.getConfirmedTransaction() if (!confirmedDeferredTransaction) { throw new Error('redeem deferred transfer: invalid TransactionEntry') } - const redeemTransactionRole = new RedeemDeferredTransferTransactionRole( + debugTmpStr += `\nconfirmed deferred transaction: ${confirmedDeferredTransaction.toJson(true)} with filter: ${f.toJson(true)}` + role = new RedeemDeferredTransferTransactionRole( transaction, confirmedDeferredTransaction, ) - const involvedUser = transaction.user.account - ? transaction.user.account.userUuid - : transaction.linkedUser?.account?.userUuid - if ( - addToBlockchain( - await redeemTransactionRole.getGradidoTransactionBuilder(), - senderBlockchain, - createdAtTimestamp, - ) - ) { - logger.debug(`Redeem Deferred Transfer Transaction added for user ${involvedUser}`) - } else { - throw new Error(`Redeem Deferred Transfer Transaction not added for user ${involvedUser}`) - } + } else { + throw new Error(`Transaction type ${transaction.type} not supported`) + } + const involvedUser = transaction.user.account + ? transaction.user.account.userUuid + : transaction.linkedUser?.account?.userUuid + if (addToBlockchain(await role.getGradidoTransactionBuilder(), senderBlockchain, createdAtTimestamp)) { + logger.debug(`${transaction.type} Transaction added for user ${involvedUser}`) + transactionAddedToBlockchainSum++ + } else { + logger.error(debugTmpStr) + /*const f = new Filter() + f.searchDirection = SearchDirection_DESC + f.pagination = new Pagination(15) + const transactions = senderBlockchain.findAll(f) + for(let i = transactions.size() - 1; i >= 0; i--) { + logger.error(`transaction ${i}: ${transactions.get(i)?.getConfirmedTransaction()?.toJson(true)}`) + }*/ + logger.error(`transaction: ${JSON.stringify(transaction, null, 2)}`) + throw new Error(`${transaction.type} Transaction not added for user ${involvedUser}, after ${transactionAddedToBlockchainSum} transactions`) } } diff --git a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/bootstrap.ts b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/bootstrap.ts index 45eff5ffd..0f45d2197 100644 --- a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/bootstrap.ts +++ b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/bootstrap.ts @@ -4,13 +4,14 @@ import { HieroId, hieroIdSchema } from '../../schemas/typeGuard.schema' import { addCommunityRootTransaction } from './blockchain' import { Context } from './Context' import { communityDbToCommunity } from './convert' -import { loadCommunities } from './database' +import { loadCommunities, loadContributionLinkModeratorCache } from './database' import { generateKeyPairCommunity } from './keyPair' import { CommunityContext } from './valibot.schema' export async function bootstrap(): Promise { const context = await Context.create() context.communities = await bootstrapCommunities(context) + await loadContributionLinkModeratorCache(context.db) return context } diff --git a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/database.ts b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/database.ts index 87cb3eb4a..bfdcfe70e 100644 --- a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/database.ts +++ b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/database.ts @@ -1,4 +1,4 @@ -import { asc, eq, inArray, isNotNull, sql } from 'drizzle-orm' +import { and, asc, eq, inArray, isNotNull, lt, sql } from 'drizzle-orm' import { alias } from 'drizzle-orm/mysql-core' import { MySql2Database } from 'drizzle-orm/mysql2' import { GradidoUnit } from 'gradido-blockchain-js' @@ -7,6 +7,8 @@ import * as v from 'valibot' import { LOG4JS_BASE_CATEGORY } from '../../config/const' import { communitiesTable, + contributionsTable, + eventsTable, transactionLinksTable, transactionsTable, usersTable, @@ -27,6 +29,24 @@ const logger = getLogger( `${LOG4JS_BASE_CATEGORY}.migrations.db-v2.7.0_to_blockchain-v3.6.blockchain`, ) +const contributionLinkModerators = new Map() + +export async function loadContributionLinkModeratorCache(db: MySql2Database): Promise { + const result = await db + .select({ + event: eventsTable, + user: usersTable, + }) + .from(eventsTable) + .leftJoin(usersTable, eq(eventsTable.actingUserId, usersTable.id)) + .where(eq(eventsTable.type, 'ADMIN_CONTRIBUTION_LINK_CREATE')) + .orderBy(asc(eventsTable.id)) + + result.map((row: any) => { + contributionLinkModerators.set(row.event.involvedContributionLinkId, v.parse(createdUserDbSchema, row.user)) + }) +} + // queries export async function loadCommunities(db: MySql2Database): Promise { const result = await db @@ -103,16 +123,32 @@ export async function loadTransactions( .limit(count) .offset(offset) - return result.map((row: any) => { + return await Promise.all(result.map(async (row: any) => { // console.log(row) try { + const user = v.parse(createdUserDbSchema, row.user) + let linkedUser: CreatedUserDb | null | undefined = null + if (!row.linkedUser) { + const contribution = await db + .select({contributionLinkId: contributionsTable.contributionLinkId}) + .from(contributionsTable) + .where(eq(contributionsTable.transactionId, row.transaction.id)) + .limit(1) + if (contribution && contribution.length > 0 && contribution[0].contributionLinkId) { + linkedUser = contributionLinkModerators.get(contribution[0].contributionLinkId) + } + } else { + linkedUser = v.parse(createdUserDbSchema, row.linkedUser) + } + if (!linkedUser) { + throw new Error(`linked user not found for transaction ${row.transaction.id}`) + } + // check for consistent data beforehand - const userCreatedAt = new Date(row.user.createdAt) - const linkedUserCreatedAd = new Date(row.linkedUser.createdAt) const balanceDate = new Date(row.transaction.balanceDate) if ( - userCreatedAt.getTime() > balanceDate.getTime() || - linkedUserCreatedAd.getTime() > balanceDate.getTime() + user.createdAt.getTime() > balanceDate.getTime() || + linkedUser?.createdAt.getTime() > balanceDate.getTime() ) { logger.error(`table row: `, row) throw new Error( @@ -127,8 +163,8 @@ export async function loadTransactions( return v.parse(transactionDbSchema, { ...row.transaction, transactionLinkCode: row.transactionLink ? row.transactionLink.code : null, - user: row.user, - linkedUser: row.linkedUser, + user, + linkedUser, }) } catch (e) { logger.error(`table row: ${JSON.stringify(row, null, 2)}`) @@ -137,7 +173,7 @@ export async function loadTransactions( } throw e } - }) + })) } export async function loadTransactionLinks( @@ -170,7 +206,10 @@ export async function loadDeletedTransactionLinks( .select() .from(transactionLinksTable) .leftJoin(usersTable, eq(transactionLinksTable.userId, usersTable.id)) - .where(isNotNull(transactionLinksTable.deletedAt)) + .where(and( + isNotNull(transactionLinksTable.deletedAt), + lt(transactionLinksTable.deletedAt, transactionLinksTable.validUntil) + )) .orderBy(asc(transactionLinksTable.deletedAt), asc(transactionLinksTable.id)) .limit(count) .offset(offset) diff --git a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/drizzle.schema.ts b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/drizzle.schema.ts index e08231f4d..c984bb24e 100644 --- a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/drizzle.schema.ts +++ b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/drizzle.schema.ts @@ -1,5 +1,6 @@ import { sql } from 'drizzle-orm' import { + bigint, char, datetime, decimal, @@ -24,6 +25,22 @@ export const communitiesTable = mysqlTable( (table) => [unique('uuid_key').on(table.communityUuid)], ) +export const contributionsTable = mysqlTable("contributions", { + id: int().autoincrement().notNull(), + contributionLinkId: int("contribution_link_id").default(sql`NULL`), + confirmedBy: int("confirmed_by").default(sql`NULL`), + confirmedAt: datetime("confirmed_at", { mode: 'string'}).default(sql`NULL`), + deletedAt: datetime("deleted_at", { mode: 'string'}).default(sql`NULL`), + transactionId: int("transaction_id").default(sql`NULL`), +}) + +export const eventsTable = mysqlTable("events", { + id: int().autoincrement().notNull(), + type: varchar({ length: 100 }).notNull(), + actingUserId: int("acting_user_id").notNull(), + involvedContributionLinkId: int("involved_contribution_link_id").default(sql`NULL`), +}) + export const usersTable = mysqlTable( 'users', { diff --git a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/index.ts b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/index.ts index 875c59eba..09da8abf8 100644 --- a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/index.ts +++ b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/index.ts @@ -4,7 +4,7 @@ import { exportAllCommunities } from './binaryExport' import { bootstrap } from './bootstrap' import { syncDbWithBlockchainContext } from './interaction/syncDbWithBlockchain/syncDbWithBlockchain.context' -const BATCH_SIZE = 100 +const BATCH_SIZE = 250 async function main() { // prepare in memory blockchains diff --git a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/UsersSync.role.ts b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/UsersSync.role.ts index d6b40938f..4c1b9ca2c 100644 --- a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/UsersSync.role.ts +++ b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/UsersSync.role.ts @@ -1,4 +1,4 @@ -import { addRegisterAddressTransaction } from '../../blockchain' +import { addTransaction } from '../../blockchain' import { userDbToTransaction } from '../../convert' import { loadUsers } from '../../database' import { generateKeyPairUserAccount } from '../../keyPair' @@ -26,6 +26,6 @@ export class UsersSyncRole extends AbstractSyncRole { async pushToBlockchain(item: CreatedUserDb): Promise { const communityContext = this.context.getCommunityContextByUuid(item.communityUuid) const transaction = userDbToTransaction(item, communityContext.topicId) - return await addRegisterAddressTransaction(communityContext.blockchain, transaction) + return await addTransaction(communityContext.blockchain, communityContext.blockchain, transaction) } } diff --git a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/syncDbWithBlockchain.context.ts b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/syncDbWithBlockchain.context.ts index e67cd70da..c62187181 100644 --- a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/syncDbWithBlockchain.context.ts +++ b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/interaction/syncDbWithBlockchain/syncDbWithBlockchain.context.ts @@ -6,21 +6,23 @@ import { TransactionsSyncRole } from './TransactionsSync.role' import { UsersSyncRole } from './UsersSync.role' export async function syncDbWithBlockchainContext(context: Context, batchSize: number) { - const timeUsed = new Profiler() + const timeUsedDB = new Profiler() + const timeUsedBlockchain = new Profiler() const containers = [ new UsersSyncRole(context), new TransactionsSyncRole(context), new DeletedTransactionLinksSyncRole(context), new TransactionLinksSyncRole(context), ] - + let transactionsCount = 0 + let transactionsCountSinceLastLog = 0 while (true) { - timeUsed.reset() + timeUsedDB.reset() const results = await Promise.all(containers.map((c) => c.ensureFilled(batchSize))) const loadedItemsCount = results.reduce((acc, c) => acc + c, 0) // log only, if at least one new item was loaded - if (loadedItemsCount && context.logger.isInfoEnabled()) { - context.logger.info(`${loadedItemsCount} new items loaded from db in ${timeUsed.string()}`) + if (loadedItemsCount && context.logger.isDebugEnabled()) { + context.logger.debug(`${loadedItemsCount} new items loaded from db in ${timeUsedDB.string()}`) } // remove empty containers @@ -34,5 +36,14 @@ export async function syncDbWithBlockchainContext(context: Context, batchSize: n available.sort((a, b) => a.getDate().getTime() - b.getDate().getTime()) } await available[0].toBlockchain() + process.stdout.write(`successfully added to blockchain: ${transactionsCount}\r`) + transactionsCount++ + transactionsCountSinceLastLog++ + if (transactionsCountSinceLastLog >= batchSize) { + context.logger.debug(`${transactionsCountSinceLastLog} transactions added to blockchain in ${timeUsedBlockchain.string()}`) + timeUsedBlockchain.reset() + transactionsCountSinceLastLog = 0 + } } + process.stdout.write(`\n`) } diff --git a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/valibot.schema.ts b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/valibot.schema.ts index 26fc985c7..1fc99ec3b 100644 --- a/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/valibot.schema.ts +++ b/dlt-connector/src/migrations/db-v2.7.0_to_blockchain-v3.5/valibot.schema.ts @@ -21,7 +21,7 @@ export const userDbSchema = v.object({ communityUuid: uuidv4Schema, }) -export const transactionDbSchema = v.object({ +export const transactionDbSchema = v.pipe(v.object({ typeId: v.enum(TransactionTypeId), amount: gradidoAmountSchema, balanceDate: dateSchema, @@ -30,7 +30,12 @@ export const transactionDbSchema = v.object({ user: userDbSchema, linkedUser: userDbSchema, transactionLinkCode: v.nullish(identifierSeedSchema), -}) +}), v.custom((value: any) => { + if (value.user && value.linkedUser && !value.transactionLinkCode && value.user.gradidoId === value.linkedUser.gradidoId) { + throw new Error(`expect user to be different from linkedUser: ${JSON.stringify(value, null, 2)}`) + } + return value +})) export const transactionLinkDbSchema = v.object({ user: userDbSchema,