correct path, add migration into dlt-connector/package.json

This commit is contained in:
Einhornimmond 2025-11-01 08:21:31 +01:00
parent 74bc448b96
commit bf39551416
11 changed files with 81 additions and 36 deletions

View File

@ -8,7 +8,7 @@
"pattern": "yyyy-MM-dd",
"layout":
{
"type": "pattern", "pattern": "%d{ISO8601} %p %c [%f : %l] - %m"
"type": "pattern", "pattern": "%d{ISO8601} %p %c - %m"
},
"compress": true,
"keepFileExt" : true,
@ -21,7 +21,7 @@
"pattern": "yyyy-MM-dd",
"layout":
{
"type": "pattern", "pattern": "%d{ISO8601} %p %c [topicId=%X{topicId}] [%f : %l] - %m"
"type": "pattern", "pattern": "%d{ISO8601} %p %c [topicId=%X{topicId}] - %m"
},
"compress": true,
"keepFileExt" : true,
@ -53,7 +53,7 @@
"type": "stdout",
"layout":
{
"type": "pattern", "pattern": "%d{ISO8601} %p %c [%f : %l] - %m"
"type": "pattern", "pattern": "%d{ISO8601} %p %c - %m"
}
}
},

View File

@ -10,6 +10,7 @@
"start": "bun run src/index.ts",
"build": "bun build src/index.ts --outdir=build --target=bun --external=gradido-blockchain-js --minify",
"dev": "bun run --watch src/index.ts",
"migrate": "cross-env MIMALLOC_SHOW_STATS=1 bun src/migrations/db-v2.7.0_to_blockchain-v3.5",
"test": "bun test",
"test:debug": "bun test --inspect-brk",
"typecheck": "tsc --noEmit",

View File

@ -39,7 +39,7 @@ export async function checkHomeCommunity(
// wait for backend server
await isPortOpenRetry(backend.url)
// ask backend for home community
let homeCommunity = await backend.getHomeCommunityDraft()
let homeCommunity = await backend.getHomeCommunityDraft()
// on missing topicId, create one
if (!homeCommunity.hieroTopicId) {
const topicId = await hiero.createTopic(homeCommunity.name)

View File

@ -1,13 +1,13 @@
import { SQL } from 'bun'
import { InMemoryBlockchain } from 'gradido-blockchain-js'
import { Logger } from 'log4js'
import { getLogger, Logger } from 'log4js'
import { HieroId, Uuidv4 } from '../../schemas/typeGuard.schema'
import { KeyPairCacheManager } from '../../cache/KeyPairCacheManager'
import { loadConfig } from '../../bootstrap/init'
import { CONFIG } from '../../config'
import { LOG4JS_BASE_CATEGORY } from '../../config/const'
export type CommunityContext = {
communityId: string
@ -29,8 +29,9 @@ export class Context {
}
static create(): Context {
loadConfig()
return new Context(
loadConfig(),
getLogger(`${LOG4JS_BASE_CATEGORY}.migrations.db-v2.7.0_to_blockchain-v3.5`),
new SQL({
adapter: 'mysql',
hostname: CONFIG.MYSQL_HOST,

View File

@ -56,7 +56,7 @@ export async function addCommunityRootTransaction(blockchain: InMemoryBlockchain
export async function addRegisterAddressTransaction(blockchain: InMemoryBlockchain, transaction: Transaction): Promise<void> {
const registerAddressRole = new RegisterAddressTransactionRole(transaction)
if(addToBlockchain(await registerAddressRole.getGradidoTransactionBuilder(), blockchain, new Timestamp(transaction.createdAt))) {
logger.info(`Register Address Transaction added for user ${transaction.user.account!.userUuid}`)
logger.debug(`Register Address Transaction added for user ${transaction.user.account!.userUuid}`)
} else {
throw new Error(`Register Address Transaction not added for user ${transaction.user.account!.userUuid}`)
}
@ -71,7 +71,7 @@ export async function addTransaction(
if (transaction.type === InputTransactionType.GRADIDO_CREATION) {
const creationTransactionRole = new CreationTransactionRole(transaction)
if(addToBlockchain(await creationTransactionRole.getGradidoTransactionBuilder(), senderBlockchain, createdAtTimestamp)) {
logger.info(`Creation Transaction added for user ${transaction.user.account!.userUuid}`)
logger.debug(`Creation Transaction added for user ${transaction.user.account!.userUuid}`)
} else {
throw new Error(`Creation Transaction not added for user ${transaction.user.account!.userUuid}`)
}
@ -79,7 +79,7 @@ export async function addTransaction(
const transferTransactionRole = new TransferTransactionRole(transaction)
// will crash with cross group transaction
if(addToBlockchain(await transferTransactionRole.getGradidoTransactionBuilder(), senderBlockchain, createdAtTimestamp)) {
logger.info(`Transfer Transaction added for user ${transaction.user.account!.userUuid}`)
logger.debug(`Transfer Transaction added for user ${transaction.user.account!.userUuid}`)
} else {
throw new Error(`Transfer Transaction not added for user ${transaction.user.account!.userUuid}`)
}

View File

@ -50,6 +50,7 @@ export function transactionDbToTransaction(
&& transactionDb.typeId !== TransactionTypeId.RECEIVE) {
throw new Error('not implemented')
}
const user = {
communityTopicId: communityTopicId,
account: { userUuid: transactionDb.user.gradidoId, accountNr: 0 },

View File

@ -3,6 +3,8 @@ import { amountSchema, memoSchema, uuidv4Schema, identifierSeedSchema, gradidoAm
import { dateSchema, booleanSchema } from '../../schemas/typeConverter.schema'
import * as v from 'valibot'
import { GradidoUnit } from 'gradido-blockchain-js'
import { LOG4JS_BASE_CATEGORY } from '../../config/const'
import { getLogger } from 'log4js'
export const createdUserDbSchema = v.object({
gradidoId: uuidv4Schema,
@ -59,6 +61,8 @@ export type CreatedUserDb = v.InferOutput<typeof createdUserDbSchema>
export type TransactionLinkDb = v.InferOutput<typeof transactionLinkDbSchema>
export type CommunityDb = v.InferOutput<typeof communityDbSchema>
const logger = getLogger(`${LOG4JS_BASE_CATEGORY}.migrations.db-v2.7.0_to_blockchain-v3.6.blockchain`)
// queries
export async function loadCommunities(db: SQL): Promise<CommunityDb[]> {
const result = await db`
@ -89,10 +93,6 @@ export async function loadUsers(db: SQL, offset: number, count: number): Promise
ORDER by created_at ASC
LIMIT ${offset}, ${count}
`
const totalCount = await db`
SELECT COUNT(*) as count FROM users
`
console.log(`Loaded ${result.length} users, total ${totalCount[0].count} with offset ${offset} and count ${count}`)
return result.map((row: any) => {
return v.parse(createdUserDbSchema, row)
})
@ -100,9 +100,9 @@ export async function loadUsers(db: SQL, offset: number, count: number): Promise
export async function loadTransactions(db: SQL, offset: number, count: number): Promise<TransactionDb[]> {
const result = await db`
SELECT t.type_id, t.amount, t.balance_date, t.memo, t.creation_date,
u.gradido_id AS user_gradido_id, u.community_uuid AS user_community_uuid,
lu.gradido_id AS linked_user_gradido_id, lu.community_uuid AS linked_user_community_uuid,
SELECT t.id, t.type_id, t.amount, t.balance_date, t.memo, t.creation_date,
u.gradido_id AS user_gradido_id, u.community_uuid AS user_community_uuid, u.created_at as user_created_at,
lu.gradido_id AS linked_user_gradido_id, lu.community_uuid AS linked_user_community_uuid, lu.created_at as linked_user_created_at,
tl.code as transaction_link_code
FROM transactions as t
LEFT JOIN users u ON t.user_id = u.id
@ -113,6 +113,17 @@ export async function loadTransactions(db: SQL, offset: number, count: number):
`
return result.map((row: any) => {
// console.log(row)
// check for consistent data beforehand
const userCreatedAt = new Date(row.user_created_at)
const linkedUserCreatedAd = new Date(row.linked_user_created_at)
const balanceDate = new Date(row.balance_date)
if (userCreatedAt.getTime() > balanceDate.getTime() ||
linkedUserCreatedAd.getTime() > balanceDate.getTime()
){
logger.error(`table row: `, row)
throw new Error('at least one user was created after transaction balance date, logic error!')
}
let amount = GradidoUnit.fromString(row.amount)
if (row.type_id === TransactionTypeId.SEND) {
amount = amount.mul(new GradidoUnit(-1))

View File

@ -4,7 +4,8 @@ import {
SearchDirection_ASC,
HieroTransactionId,
Timestamp,
InteractionSerialize
InteractionSerialize,
Profiler
} from 'gradido-blockchain-js'
import { Logger } from 'log4js'
@ -16,23 +17,20 @@ import { Orderable, OrderedContainer } from './OrderedContainer'
import { Context } from './Context'
import { bootstrap } from './bootstrap'
import { RegisterAddressTransactionRole } from '../../interactions/sendToHiero/RegisterAddressTransaction.role'
import { heapStats } from 'bun:jsc'
const publicKeyUserIdMap = new Map<string, string>()
async function main() {
const timeUsed = new Profiler()
// prepare in memory blockchains
const context = await bootstrap()
const startTime = Date.now()
await getNextUsers(context, 0, 110)
const endTime = Date.now()
console.log(`getNextUsers took ${endTime - startTime} ms`)
// synchronize to blockchain
const BATCH_SIZE = 10
const BATCH_SIZE = 100
const users = new OrderedContainer(
(context: Context, offset: number, count: number) => loadUsers(context.db, offset, count),
(context: Context, offset: number, count: number) => getNextUsers(context, offset, count),
(user: CreatedUserDb) => user.createdAt,
(context: Context, user: CreatedUserDb) => pushRegisterAddressTransaction(context, user)
)
@ -46,22 +44,40 @@ async function main() {
// const deletedTransactionLinks = new OrderedContainer(getNextDeletedTransactionLinks, (transactionLink) => transactionLink.balanceDate)
await synchronizeToBlockchain(context, [users, transactions], BATCH_SIZE)
// log blockchains
for(const community of context.communities.values()) {
context.logger.info(`Community '${community.communityId}', blockchain`)
logBlogchain(context.logger, community.blockchain)
}
context.logger.info(`${timeUsed.string()} for synchronizing to blockchain`)
timeUsed.reset()
context.communities.forEach((communityContext) => {
const f = new Filter()
// hotfix for bug in gradido_blockchain for Filter::ALL_TRANSACTIONS
f.pagination.size = 0
const transactions = communityContext.blockchain.findAll(f)
context.logger.info(`Community '${communityContext.communityId}', transactions: ${transactions.size()}`)
// logBlogchain(context.logger, communityContext.blockchain)
})
context.logger.info(`${timeUsed.string()} for logging blockchains`)
context.db.close()
const runtimeStats = heapStats()
/*
heapSize: 24254530,
heapCapacity: 32191922,
extraMemorySize: 7003858
*/
context.logger.info(
`Memory Statistics: heap size: ${bytesToMbyte(runtimeStats.heapSize)} MByte, heap capacity: ${bytesToMbyte(runtimeStats.heapCapacity)} MByte, extra memory: ${bytesToMbyte(runtimeStats.extraMemorySize)} MByte`
)
return Promise.resolve()
}
function bytesToMbyte(bytes: number): string {
return (bytes / 1024 / 1024).toFixed(4)
}
async function synchronizeToBlockchain(
context: Context,
containers: Orderable<Context>[],
batchSize: number
): Promise<void> {
let rounds = 20
let rounds = 200
while (rounds-- > 0) {
await Promise.all(containers.map(c => c.ensureFilled(context, batchSize)))
// console.log(`filled containers, rounds left: ${rounds}`)
@ -119,6 +135,7 @@ async function synchronizeToBlockchain(
/// load next max ${count} users and calculate key pair for calculating signatures later
async function getNextUsers(context: Context, offset: number, count: number): Promise<CreatedUserDb[]> {
const timeUsed = new Profiler()
const users = await loadUsers(context.db, offset, count)
for (const user of users) {
const communityContext = context.getCommunityContextByUuid(user.communityUuid)
@ -126,22 +143,36 @@ async function getNextUsers(context: Context, offset: number, count: number): Pr
publicKeyUserIdMap.set(userKeyPair.convertToHex(), user.gradidoId)
publicKeyUserIdMap.set(accountKeyPair.convertToHex(), user.gradidoId)
}
if(users.length !== 0) {
context.logger.info(`${timeUsed.string()} for loading ${users.length} users from db and calculate ed25519 KeyPairs for them`)
}
return users
}
// load next max ${count} transactions (contain also redeem transaction link transactions)
async function getNextTransactions(context: Context, offset: number, count: number): Promise<TransactionDb[]> {
return loadTransactions(context.db, offset, count)
const timeUsed = new Profiler()
const transactions = await loadTransactions(context.db, offset, count)
if(transactions.length !== 0) {
context.logger.info(`${timeUsed.string()} for loading ${transactions.length} transactions from db`)
}
return transactions
}
// load next max ${count} transaction links (freshly created links, in blockchain format this is a separate transaction)
async function getNextTransactionLinks(context: Context, offset: number, count: number): Promise<TransactionLinkDb[]> {
return loadTransactionLinks(context.db, offset, count)
const timeUsed = new Profiler()
const transactionLinks = await loadTransactionLinks(context.db, offset, count)
context.logger.info(`${timeUsed.string()} for loading ${transactionLinks.length} transaction links from db`)
return transactionLinks
}
// load next max ${count} deleted transaction links (in blockchain format this is a separate transaction)
async function getNextDeletedTransactionLinks(context: Context, offset: number, count: number): Promise<TransactionDb[]> {
return loadDeletedTransactionLinks(context.db, offset, count)
const timeUsed = new Profiler()
const deletedTransactionLinks = await loadDeletedTransactionLinks(context.db, offset, count)
context.logger.info(`${timeUsed.string()} for loading ${deletedTransactionLinks.length} deleted transaction links from db`)
return deletedTransactionLinks
}
// ---------------- put into in memory blockchain -----------------------------------------------

View File

@ -56,7 +56,7 @@ export async function generateKeyPairUserAccount(
}
}).getKey()
const accountKeyPair = await cache.getKeyPair(accountKeyPairKey, () => Promise.resolve(accountKeyPairRole.generateKeyPair()))
logger.info(`Key Pairs for user and account added, user: ${userKeyPairKey}, account: ${accountKeyPairKey}`)
//logger.info(`Key Pairs for user and account added, user: ${userKeyPairKey}, account: ${accountKeyPairKey}`)
return {
userKeyPair: userKeyPair.getPublicKey()!,
accountKeyPair: accountKeyPair.getPublicKey()!