From e7cbb6dc7930b7b19335286bf1908d2ed3cb9437 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Tue, 8 Oct 2024 11:33:10 +0100 Subject: [PATCH] fix(filecoin-api): parallel put to piece accept queue (#1560) Simply uses `p-map` to put items to the queue in parallel and speed up execution time of this lambda (which is currently timing out). --- .../filecoin-api/src/aggregator/events.js | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/packages/filecoin-api/src/aggregator/events.js b/packages/filecoin-api/src/aggregator/events.js index d29b25b8c..0726bc291 100644 --- a/packages/filecoin-api/src/aggregator/events.js +++ b/packages/filecoin-api/src/aggregator/events.js @@ -1,6 +1,7 @@ import { Aggregator, Dealer } from '@web3-storage/filecoin-client' import { Aggregate, Piece } from '@web3-storage/data-segment' import { CBOR } from '@ucanto/core' +import map from 'p-map' import { getBufferedPieces, @@ -225,24 +226,30 @@ export const handleAggregateInsertToPieceAcceptQueue = async ( } // TODO: Batch per a maximum to queue - for (const piece of pieces) { - const inclusionProof = aggregateBuilder.resolveProof(piece.link) - if (inclusionProof.error) { - return inclusionProof - } - const addMessage = await context.pieceAcceptQueue.add({ - piece: piece.link, - aggregate: aggregateBuilder.link, - group: bufferStoreRes.ok.buffer.group, - inclusion: { - subtree: inclusionProof.ok[0], - index: inclusionProof.ok[1], - }, - }) + const results = await map( + pieces, + /** @returns {Promise>} */ + async piece => { + const inclusionProof = aggregateBuilder.resolveProof(piece.link) + if (inclusionProof.error) return inclusionProof + + const addMessage = await context.pieceAcceptQueue.add({ + piece: piece.link, + aggregate: aggregateBuilder.link, + group: bufferStoreRes.ok.buffer.group, + inclusion: { + subtree: inclusionProof.ok[0], + index: inclusionProof.ok[1], + }, + }) + if (addMessage.error) return addMessage - if (addMessage.error) { - return addMessage - } + return { ok: {} } + }, + { concurrency: 10 } + ) + for (const r of results) { + if (r.error) return r } return {