From 4ec489f3be1e5d59f7bb9d2543f1963d5ab63679 Mon Sep 17 00:00:00 2001 From: ashkuc Date: Wed, 6 Dec 2023 21:20:18 +0100 Subject: [PATCH] Added event for force tokens rescan - fixes --- .../src/subscribers/pg.events.listener.ts | 44 +++-------------- .../src/subscribers/pg.payload.parsers.ts | 18 +++++-- .../src/subscribers/subscribers.module.ts | 2 + .../crawler/src/subscribers/token.rescaner.ts | 47 +++++++++++++++++++ common/sdk/sdk.service.ts | 1 - 5 files changed, 69 insertions(+), 43 deletions(-) create mode 100644 apps/crawler/src/subscribers/token.rescaner.ts diff --git a/apps/crawler/src/subscribers/pg.events.listener.ts b/apps/crawler/src/subscribers/pg.events.listener.ts index a872b67..db6f491 100644 --- a/apps/crawler/src/subscribers/pg.events.listener.ts +++ b/apps/crawler/src/subscribers/pg.events.listener.ts @@ -6,9 +6,8 @@ import { parseBlockRangePayload, parseTokenRangePayload, } from './pg.payload.parsers'; -import { TokenService } from '../services/token/token.service'; -import { EventName } from '@common/constants'; -import { BlocksRepository } from '@unique-nft/harvester/src/database/repositories/private.repositories'; +import { BlockService } from '../services/block.service'; +import { TokenReScanner } from './token.rescaner'; enum PG_EVENTS_CHANNELS { FORCE_RESCAN_BLOCK = 'force_rescan_block', @@ -33,8 +32,8 @@ export class PgEventsListener implements OnApplicationBootstrap { constructor( private blocksSubscriberService: BlocksSubscriberService, - private tokenService: TokenService, - private blocksRepository: BlocksRepository, + private blockService: BlockService, + private tokenReScanner: TokenReScanner, ) { const config = { host: process.env.POSTGRES_HOST, @@ -99,39 +98,10 @@ export class PgEventsListener implements OnApplicationBootstrap { } private async handleRescanTokens(payload: string) { - const { collectionId, tokenIds } = parseTokenRangePayload(payload); + const { collectionId, tokenIds, blockNumber } = + parseTokenRangePayload(payload); - this.logger.log( - `Going to force rescan tokens for collection ${collectionId} and tokens: ${tokenIds.join( - ', ', - )}`, - ); - - const lastBlock = await this.blocksRepository.findOne({ - order: { id: 'DESC' }, - }); - - for (const tokenId of tokenIds) { - try { - this.logger.log(`Rescan for token ${collectionId}/${tokenId}`); - - await this.tokenService.update({ - blockNumber: lastBlock.id, - collectionId, - tokenId, - eventName: EventName.TOKEN_PROPERTY_SET, - blockTimestamp: lastBlock.timestamp.getTime(), - blockHash: lastBlock.hash, - data: [], - }); - - this.logger.log(`Rescan for token ${collectionId}/${tokenId} finished`); - } catch (e: any) { - this.logger.error( - `Rescan for token ${collectionId}/${tokenId} failed: ${e.message}`, - ); - } - } + await this.tokenReScanner.rescanTokens(collectionId, tokenIds, blockNumber); } private async startFastRescan(payload: string) { diff --git a/apps/crawler/src/subscribers/pg.payload.parsers.ts b/apps/crawler/src/subscribers/pg.payload.parsers.ts index e63b9d6..b9dd7a0 100644 --- a/apps/crawler/src/subscribers/pg.payload.parsers.ts +++ b/apps/crawler/src/subscribers/pg.payload.parsers.ts @@ -34,15 +34,16 @@ export const parseBlockRangePayload = ( }; /** - * Parse string like "1/1,2,3,4,5" to object with collectionId and tokenIds + * Parse string like "1/1,2,3,4,5@block_number" to object with collectionId, tokenIds and blockNumber * @param payload */ export const parseTokenRangePayload = (payload: string) => { - const [collectionIdString, tokenIdsString] = payload.split('/'); + const [collectionIdString, tokenIdsString, blockNumberString] = + payload.split(/[\/@]/); - if (!collectionIdString || !tokenIdsString) { + if (!collectionIdString || !tokenIdsString || !blockNumberString) { throw new Error( - `Invalid token range payload ${payload}, expected format: collectionId/tokenId,tokenId,tokenId`, + `Invalid token range payload ${payload}, expected format: collectionId/tokenId,tokenId@blockNumber`, ); } @@ -53,10 +54,17 @@ export const parseTokenRangePayload = (payload: string) => { ); } + const blockNumber = parseInt(blockNumberString, 10); + if (isNaN(blockNumber)) { + throw new Error( + `Invalid token range payload ${payload}, blockNumber is not a number`, + ); + } + const tokenIds = tokenIdsString .split(',') .map((n) => parseInt(n.trim(), 10)) .filter((n) => !isNaN(n)); - return { collectionId, tokenIds }; + return { collectionId, tokenIds, blockNumber }; }; diff --git a/apps/crawler/src/subscribers/subscribers.module.ts b/apps/crawler/src/subscribers/subscribers.module.ts index d7f6250..f9ea517 100644 --- a/apps/crawler/src/subscribers/subscribers.module.ts +++ b/apps/crawler/src/subscribers/subscribers.module.ts @@ -8,6 +8,7 @@ import { SubscribersService } from './subscribers.service'; import { BlocksRepository } from '@unique-nft/harvester/src/database/repositories/private.repositories'; import { HarvesterStoreService } from './processor/harvester-store.service'; import { PgEventsListener } from './pg.events.listener'; +import { TokenReScanner } from './token.rescaner'; @Module({ imports: [ConfigModule, ServicesModule], @@ -19,6 +20,7 @@ import { PgEventsListener } from './pg.events.listener'; HarvesterStoreService, BlocksRepository, PgEventsListener, + TokenReScanner, ], exports: [SubscribersService], }) diff --git a/apps/crawler/src/subscribers/token.rescaner.ts b/apps/crawler/src/subscribers/token.rescaner.ts new file mode 100644 index 0000000..0123727 --- /dev/null +++ b/apps/crawler/src/subscribers/token.rescaner.ts @@ -0,0 +1,47 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Reader } from '@unique-nft/harvester'; +import { EventName } from '@common/constants'; +import { TokenService } from '../services/token/token.service'; + +@Injectable() +export class TokenReScanner { + logger = new Logger(TokenReScanner.name); + + constructor(private reader: Reader, private tokenService: TokenService) {} + + async rescanTokens( + collectionId: number, + tokenIds: number[], + blockNumber: number, + ): Promise { + this.logger.log( + `Going to force rescan tokens for collection ${collectionId} and tokens: ${tokenIds.join( + ', ', + )}`, + ); + + const block = await this.reader.getBlock(blockNumber); + + for (const tokenId of tokenIds) { + try { + this.logger.log(`Rescan for token ${collectionId}/${tokenId}`); + + await this.tokenService.update({ + blockNumber: block.id, + collectionId, + tokenId, + eventName: EventName.TOKEN_PROPERTY_SET, + blockTimestamp: block.timestamp.getTime(), + blockHash: block.hash, + data: [], + }); + + this.logger.log(`Rescan for token ${collectionId}/${tokenId} finished`); + } catch (e: any) { + this.logger.error( + `Rescan for token ${collectionId}/${tokenId} failed: ${e.message}; ${e.stack}`, + ); + } + } + } +} diff --git a/common/sdk/sdk.service.ts b/common/sdk/sdk.service.ts index eaac384..5b27967 100644 --- a/common/sdk/sdk.service.ts +++ b/common/sdk/sdk.service.ts @@ -14,7 +14,6 @@ import { TokenBalanceRequest } from '@unique-nft/substrate-client/refungible'; import { ChainProperties } from '@unique-nft/substrate-client/types'; import { ITotalIssuance } from '@common/constants'; -import * as console from 'console'; export interface ISpecSystemVersion { spec_version: number;