feat: event-replay new_block events handling
csgui committed Jul 6, 2023
1 parent 5e63a44 commit 30eb324
Showing 4 changed files with 357 additions and 3 deletions.
103 changes: 103 additions & 0 deletions src/datastore/pg-write-store.ts
@@ -539,6 +539,109 @@ export class PgWriteStore extends PgStore {
return result.count;
}

async insertBlockBatch(sql: PgSqlClient, blocks: DbBlock[]) {
const values: BlockInsertValues[] = blocks.map(block => ({
block_hash: block.block_hash,
index_block_hash: block.index_block_hash,
parent_index_block_hash: block.parent_index_block_hash,
parent_block_hash: block.parent_block_hash,
parent_microblock_hash: block.parent_microblock_hash,
parent_microblock_sequence: block.parent_microblock_sequence,
block_height: block.block_height,
burn_block_time: block.burn_block_time,
burn_block_hash: block.burn_block_hash,
burn_block_height: block.burn_block_height,
miner_txid: block.miner_txid,
canonical: block.canonical,
execution_cost_read_count: block.execution_cost_read_count,
execution_cost_read_length: block.execution_cost_read_length,
execution_cost_runtime: block.execution_cost_runtime,
execution_cost_write_count: block.execution_cost_write_count,
execution_cost_write_length: block.execution_cost_write_length,
}));
await sql`
INSERT INTO blocks ${sql(values)}
`;
}

async insertMicroblock(sql: PgSqlClient, microblocks: DbMicroblock[]): Promise<void> {
const values: MicroblockInsertValues[] = microblocks.map(mb => ({
canonical: mb.canonical,
microblock_canonical: mb.microblock_canonical,
microblock_hash: mb.microblock_hash,
microblock_sequence: mb.microblock_sequence,
microblock_parent_hash: mb.microblock_parent_hash,
parent_index_block_hash: mb.parent_index_block_hash,
block_height: mb.block_height,
parent_block_height: mb.parent_block_height,
parent_block_hash: mb.parent_block_hash,
index_block_hash: mb.index_block_hash,
block_hash: mb.block_hash,
parent_burn_block_height: mb.parent_burn_block_height,
parent_burn_block_hash: mb.parent_burn_block_hash,
parent_burn_block_time: mb.parent_burn_block_time,
}));
const mbResult = await sql`
INSERT INTO microblocks ${sql(values)}
`;
if (mbResult.count !== microblocks.length) {
throw new Error(
`Unexpected row count after inserting microblocks: ${mbResult.count} vs ${values.length}`
);
}
}

async insertTxBatch(sql: PgSqlClient, txs: DbTx[]): Promise<void> {
const values: TxInsertValues[] = txs.map(tx => ({
tx_id: tx.tx_id,
raw_tx: tx.raw_result,
tx_index: tx.tx_index,
index_block_hash: tx.index_block_hash,
parent_index_block_hash: tx.parent_index_block_hash,
block_hash: tx.block_hash,
parent_block_hash: tx.parent_block_hash,
block_height: tx.block_height,
burn_block_time: tx.burn_block_time,
parent_burn_block_time: tx.parent_burn_block_time,
type_id: tx.type_id,
anchor_mode: tx.anchor_mode,
status: tx.status,
canonical: tx.canonical,
post_conditions: tx.post_conditions,
nonce: tx.nonce,
fee_rate: tx.fee_rate,
sponsored: tx.sponsored,
sponsor_nonce: tx.sponsor_nonce ?? null,
sponsor_address: tx.sponsor_address ?? null,
sender_address: tx.sender_address,
origin_hash_mode: tx.origin_hash_mode,
microblock_canonical: tx.microblock_canonical,
microblock_sequence: tx.microblock_sequence,
microblock_hash: tx.microblock_hash,
token_transfer_recipient_address: tx.token_transfer_recipient_address ?? null,
token_transfer_amount: tx.token_transfer_amount ?? null,
token_transfer_memo: tx.token_transfer_memo ?? null,
smart_contract_clarity_version: tx.smart_contract_clarity_version ?? null,
smart_contract_contract_id: tx.smart_contract_contract_id ?? null,
smart_contract_source_code: tx.smart_contract_source_code ?? null,
contract_call_contract_id: tx.contract_call_contract_id ?? null,
contract_call_function_name: tx.contract_call_function_name ?? null,
contract_call_function_args: tx.contract_call_function_args ?? null,
poison_microblock_header_1: tx.poison_microblock_header_1 ?? null,
poison_microblock_header_2: tx.poison_microblock_header_2 ?? null,
coinbase_payload: tx.coinbase_payload ?? null,
coinbase_alt_recipient: tx.coinbase_alt_recipient ?? null,
raw_result: tx.raw_result,
event_count: tx.event_count,
execution_cost_read_count: tx.execution_cost_read_count,
execution_cost_read_length: tx.execution_cost_read_length,
execution_cost_runtime: tx.execution_cost_runtime,
execution_cost_write_count: tx.execution_cost_write_count,
execution_cost_write_length: tx.execution_cost_write_length,
}));
await sql`INSERT INTO txs ${sql(values)}`;
}

async updateBurnchainRewardSlotHolders({
burnchainBlockHash,
burnchainBlockHeight,
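A minimal sketch of the postgres.js bulk-insert helper that the new insertBlockBatch/insertTxBatch methods rely on: interpolating an array of row objects with sql(values) inside an INSERT expands into a single multi-row VALUES list. The table name, row shape, and connection string below are illustrative placeholders, not part of this commit.

import postgres from 'postgres';

// Hypothetical row shape and table, shown only to illustrate the bulk-insert helper.
interface ExampleRow {
  block_hash: string;
  block_height: number;
  canonical: boolean;
}

async function bulkInsertExample(): Promise<void> {
  const sql = postgres('postgres://localhost/example_db'); // assumed local database
  const rows: ExampleRow[] = [
    { block_hash: '0x01', block_height: 1, canonical: true },
    { block_hash: '0x02', block_height: 2, canonical: true },
  ];
  // postgres.js expands sql(rows) into one parameterized multi-row statement, roughly:
  // INSERT INTO example_blocks ("block_hash", "block_height", "canonical") VALUES (...), (...)
  await sql`INSERT INTO example_blocks ${sql(rows)}`;
  await sql.end();
}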
8 changes: 5 additions & 3 deletions src/event-replay/parquet-based/event-replay.ts
@@ -1,4 +1,3 @@

import * as tty from 'tty';

import { PgWriteStore } from '../../datastore/pg-write-store';
@@ -7,6 +6,7 @@ import { logger } from '../../logger';
import { createTimeTracker } from './helpers';
import { insertNewBurnBlockEvents } from './importers/new_burn_block_importer';
import { insertNewBlockEvents } from './importers/new_block_importer';
import { DatasetStore } from './dataset/store';

const MIGRATIONS_TABLE = 'pgmigrations';

@@ -20,6 +20,8 @@ const run = async (wipeDB: boolean = false, disableIndexes: boolean = false) =>
isEventReplay: true,
});

const dataset = await DatasetStore.connect();

if (wipeDB) {
await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });
}
@@ -56,8 +58,8 @@ const run = async (wipeDB: boolean = false, disableIndexes: boolean = false) =>

try {
await Promise.all([
-      insertNewBurnBlockEvents(db, timeTracker),
-      // insertNewBlockEvents(db, timeTracker)
+      insertNewBurnBlockEvents(db, dataset, timeTracker),
+      insertNewBlockEvents(db, dataset, timeTracker)
]);
} catch (err) {
throw err;
144 changes: 144 additions & 0 deletions src/event-replay/parquet-based/importers/new_block_importer.ts
@@ -0,0 +1,144 @@
import { Readable, Writable, Transform } from 'stream';
import { pipeline } from 'stream/promises';
import { PgWriteStore } from '../../../datastore/pg-write-store';
import { parseNewBlockMessage } from '../../../event-stream/event-server';
import { DbBlock, DbMicroblock, DbTx } from '../../../datastore/common';
import { logger } from '../../../logger';
import { TimeTracker } from '../helpers';
import { getApiConfiguredChainID, batchIterate } from '../../../helpers';
import { CoreNodeBlockMessage } from '../../../event-stream/core-node-message';
import { DatasetStore } from '../dataset/store';

const batchInserters: BatchInserter[] = [];

const chainID = getApiConfiguredChainID();

interface BatchInserter<T = any> {
push(entries: T[]): Promise<void>;
flush(): Promise<void>;
}

function createBatchInserter<T>({
batchSize,
insertFn,
}: {
batchSize: number;
insertFn: (entries: T[]) => Promise<void>;
}): BatchInserter<T> {
let entryBuffer: T[] = [];
return {
async push(entries: T[]) {
entries.length === 1
? entryBuffer.push(entries[0])
: entries.forEach(e => entryBuffer.push(e));
if (entryBuffer.length === batchSize) {
await insertFn(entryBuffer);
entryBuffer.length = 0;
} else if (entryBuffer.length > batchSize) {
for (const batch of batchIterate(entryBuffer, batchSize)) {
await insertFn(batch);
}
entryBuffer.length = 0;
}
},
async flush() {
logger.info('Flushing remaining data...');
if (entryBuffer.length > 0) {
await insertFn(entryBuffer);
entryBuffer = [];
}
},
};
}

const populateBatchInserters = async (db: PgWriteStore) => {
const dbBlockBatchInserter = createBatchInserter<DbBlock>({
batchSize: 100,
insertFn: (entries) => {
logger.info('Inserting blocks...');
return db.insertBlockBatch(db.sql, entries);
},
});
batchInserters.push(dbBlockBatchInserter);

const dbMicroblockBatchInserter = createBatchInserter<DbMicroblock>({
batchSize: 200,
insertFn: (entries) => {
logger.info('Inserting microblocks...');
return db.insertMicroblock(db.sql, entries);
},
});
batchInserters.push(dbMicroblockBatchInserter);

const dbTxBatchInserter = createBatchInserter<DbTx>({
batchSize: 1000,
insertFn: (entries) => {
logger.info('Inserting txs...');
return db.insertTxBatch(db.sql, entries);
},
});
batchInserters.push(dbTxBatchInserter);

return new Writable({
objectMode: true,
write: async (data: CoreNodeBlockMessage, _encoding, next) => {

let dbData;
try {
dbData = parseNewBlockMessage(chainID, data);
} catch (err) {
logger.error('Error when parsing new_block event');
console.error(err);

throw err;
}

const insertTxs = async (dbData: any) => {
for (const entry of dbData.txs) {
await dbTxBatchInserter.push([entry.tx]);
}
};

await Promise.all([
// Insert blocks
dbBlockBatchInserter.push([dbData.block]),
// Insert microblocks
dbMicroblockBatchInserter.push(dbData.microblocks),
// Insert Txs
insertTxs(dbData)
]);

next();
}
});
}

const transformDataToJSON = async () => {
return new Transform({
objectMode: true,
transform: async (data, _encoding, callback) => {
callback(null, JSON.parse(data.payload));
}
});
};

export const insertNewBlockEvents = async (db: PgWriteStore, dataset: DatasetStore, timeTracker: TimeTracker) => {
logger.info(`Inserting NEW_BLOCK events to db...`);

await timeTracker.track('insertNewBlockEvents', async () => {
const payload = await dataset.newBlockEventsOrderedPayloadStream();
const toJSON = await transformDataToJSON();
const insertBatchData = await populateBatchInserters(db);

await pipeline(
Readable.from(payload),
toJSON,
insertBatchData
.on('finish', async () => {
for (const batchInserter of batchInserters) {
await batchInserter.flush();
}
})
)
});
};
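For reference, a self-contained sketch of the stream shape this importer uses: a Readable source of raw payload rows, a Transform that parses each payload, and an object-mode Writable that buffers entries and writes them in batches. The sample data, batch size, and flush strategy are placeholder assumptions; the commit's importer flushes its BatchInserters in a 'finish' handler rather than in the Writable's final callback.

import { Readable, Transform, Writable } from 'stream';
import { pipeline } from 'stream/promises';

async function replayPipelineSketch(): Promise<void> {
  // Placeholder payload rows standing in for the parquet-backed dataset stream.
  const source = Readable.from([{ payload: '{"n":1}' }, { payload: '{"n":2}' }]);

  const toJSON = new Transform({
    objectMode: true,
    transform(row, _encoding, callback) {
      callback(null, JSON.parse(row.payload));
    },
  });

  const buffer: unknown[] = [];
  const batchWriter = new Writable({
    objectMode: true,
    write(entry, _encoding, next) {
      buffer.push(entry);
      if (buffer.length >= 100) {
        // A real importer would await the database insert here before continuing.
        buffer.length = 0;
      }
      next();
    },
    final(done) {
      // Flush whatever remains once the upstream readable ends.
      buffer.length = 0;
      done();
    },
  });

  // pipeline() wires the streams together and propagates errors and backpressure.
  await pipeline(source, toJSON, batchWriter);
}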
105 changes: 105 additions & 0 deletions src/event-stream/event-server.ts
@@ -939,3 +939,108 @@ export async function startEventServer(opts: {
});
return eventStreamServer;
}

export function parseNewBlockMessage(chainId: ChainID, msg: CoreNodeBlockMessage) {
const parsedTxs: CoreNodeParsedTxMessage[] = [];
const blockData: CoreNodeMsgBlockData = {
...msg,
};
msg.transactions.forEach(item => {
const parsedTx = parseMessageTransaction(chainId, item, blockData, msg.events);
if (parsedTx) {
parsedTxs.push(parsedTx);
}
});

// calculate total execution cost of the block
const totalCost = msg.transactions.reduce(
(prev, cur) => {
return {
execution_cost_read_count: prev.execution_cost_read_count + cur.execution_cost.read_count,
execution_cost_read_length:
prev.execution_cost_read_length + cur.execution_cost.read_length,
execution_cost_runtime: prev.execution_cost_runtime + cur.execution_cost.runtime,
execution_cost_write_count:
prev.execution_cost_write_count + cur.execution_cost.write_count,
execution_cost_write_length:
prev.execution_cost_write_length + cur.execution_cost.write_length,
};
},
{
execution_cost_read_count: 0,
execution_cost_read_length: 0,
execution_cost_runtime: 0,
execution_cost_write_count: 0,
execution_cost_write_length: 0,
}
);

const dbBlock: DbBlock = {
canonical: true,
block_hash: msg.block_hash,
index_block_hash: msg.index_block_hash,
parent_index_block_hash: msg.parent_index_block_hash,
parent_block_hash: msg.parent_block_hash,
parent_microblock_hash: msg.parent_microblock,
parent_microblock_sequence: msg.parent_microblock_sequence,
block_height: msg.block_height,
burn_block_time: msg.burn_block_time,
burn_block_hash: msg.burn_block_hash,
burn_block_height: msg.burn_block_height,
miner_txid: msg.miner_txid,
execution_cost_read_count: totalCost.execution_cost_read_count,
execution_cost_read_length: totalCost.execution_cost_read_length,
execution_cost_runtime: totalCost.execution_cost_runtime,
execution_cost_write_count: totalCost.execution_cost_write_count,
execution_cost_write_length: totalCost.execution_cost_write_length,
};

const dbMinerRewards: DbMinerReward[] = [];
for (const minerReward of msg.matured_miner_rewards) {
const dbMinerReward: DbMinerReward = {
canonical: true,
block_hash: minerReward.from_stacks_block_hash,
index_block_hash: msg.index_block_hash,
from_index_block_hash: minerReward.from_index_consensus_hash,
mature_block_height: msg.block_height,
recipient: minerReward.recipient,
miner_address: minerReward.miner_address ?? minerReward.recipient,
coinbase_amount: BigInt(minerReward.coinbase_amount),
tx_fees_anchored: BigInt(minerReward.tx_fees_anchored),
tx_fees_streamed_confirmed: BigInt(minerReward.tx_fees_streamed_confirmed),
tx_fees_streamed_produced: BigInt(minerReward.tx_fees_streamed_produced),
};
dbMinerRewards.push(dbMinerReward);
}

const dbMicroblocks = parseMicroblocksFromTxs({
parentIndexBlockHash: msg.parent_index_block_hash,
txs: msg.transactions,
parentBurnBlock: {
height: msg.parent_burn_block_height,
hash: msg.parent_burn_block_hash,
time: msg.parent_burn_block_timestamp,
},
}).map(mb => {
const microblock: DbMicroblock = {
...mb,
canonical: true,
microblock_canonical: true,
block_height: msg.block_height,
parent_block_height: msg.block_height - 1,
parent_block_hash: msg.parent_block_hash,
index_block_hash: msg.index_block_hash,
block_hash: msg.block_hash,
};
return microblock;
});

const dbData: DataStoreBlockUpdateData = {
block: dbBlock,
microblocks: dbMicroblocks,
minerRewards: dbMinerRewards,
txs: parseDataStoreTxEventData(parsedTxs, msg.events, msg, chainId),
};

return dbData;
}
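The block-level execution cost computed in parseNewBlockMessage is a field-wise sum of the per-transaction costs. A worked sketch with made-up numbers, mirroring the reduce above (the cost fields follow the execution_cost shape on the core node's transaction entries):

interface ExecutionCost {
  read_count: number;
  read_length: number;
  runtime: number;
  write_count: number;
  write_length: number;
}

// Two made-up transaction costs; not taken from any real block.
const txCosts: ExecutionCost[] = [
  { read_count: 5, read_length: 120, runtime: 1000, write_count: 2, write_length: 40 },
  { read_count: 3, read_length: 80, runtime: 500, write_count: 1, write_length: 10 },
];

const blockTotal = txCosts.reduce(
  (prev, cur) => ({
    read_count: prev.read_count + cur.read_count,
    read_length: prev.read_length + cur.read_length,
    runtime: prev.runtime + cur.runtime,
    write_count: prev.write_count + cur.write_count,
    write_length: prev.write_length + cur.write_length,
  }),
  { read_count: 0, read_length: 0, runtime: 0, write_count: 0, write_length: 0 }
);
// blockTotal: { read_count: 8, read_length: 200, runtime: 1500, write_count: 3, write_length: 50 }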
