Skip to content

Commit

Permalink
fix(filecoin-api): parallel put to piece accept queue (#1560)
Browse files Browse the repository at this point in the history
Simply uses `p-map` to put items to the queue in parallel and speed up
execution time of this lambda (which is currently timing out).
  • Loading branch information
alanshaw authored Oct 8, 2024
1 parent 9e2b1d4 commit e7cbb6d
Showing 1 changed file with 24 additions and 17 deletions.
41 changes: 24 additions & 17 deletions packages/filecoin-api/src/aggregator/events.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Aggregator, Dealer } from '@web3-storage/filecoin-client'
import { Aggregate, Piece } from '@web3-storage/data-segment'
import { CBOR } from '@ucanto/core'
import map from 'p-map'

import {
getBufferedPieces,
Expand Down Expand Up @@ -225,24 +226,30 @@ export const handleAggregateInsertToPieceAcceptQueue = async (
}

// TODO: Batch per a maximum to queue
for (const piece of pieces) {
const inclusionProof = aggregateBuilder.resolveProof(piece.link)
if (inclusionProof.error) {
return inclusionProof
}
const addMessage = await context.pieceAcceptQueue.add({
piece: piece.link,
aggregate: aggregateBuilder.link,
group: bufferStoreRes.ok.buffer.group,
inclusion: {
subtree: inclusionProof.ok[0],
index: inclusionProof.ok[1],
},
})
const results = await map(
pieces,
/** @returns {Promise<import('@ucanto/interface').Result<import('@ucanto/interface').Unit, RangeError|import('../types.js').QueueAddError>>} */
async piece => {
const inclusionProof = aggregateBuilder.resolveProof(piece.link)
if (inclusionProof.error) return inclusionProof

const addMessage = await context.pieceAcceptQueue.add({
piece: piece.link,
aggregate: aggregateBuilder.link,
group: bufferStoreRes.ok.buffer.group,
inclusion: {
subtree: inclusionProof.ok[0],
index: inclusionProof.ok[1],
},
})
if (addMessage.error) return addMessage

if (addMessage.error) {
return addMessage
}
return { ok: {} }
},
{ concurrency: 10 }
)
for (const r of results) {
if (r.error) return r
}

return {
Expand Down

0 comments on commit e7cbb6d

Please sign in to comment.