Memory allocation failed - continuous data feed stream #116
At first glance, I don't think this is related to pg-copy-streams directly. In the code above you're making a new stream every time you receive a message instead of using a single stream and copying all the messages in; you might want to reuse a single stream instance rather than creating a new one for each message received. The main issue is that you need a way to throttle these messages as you receive them, and then read them off and write them into postgres (via pg-copy-streams or any other method) at a rate your database can handle.

Think of it like pouring water into a cup with a hole at the bottom: if you pour in more water than can flow out the bottom, your cup will overflow. The cup in this situation is your application's heap. Typically this is handled with some kind of work queue or pub/sub system in which the consumers read messages as fast as they can process them. So you'd need some system to receive the messages from the fire hose, put them into a queue (something like SQS, Google Pub/Sub, RabbitMQ, or Kafka if you're feeling fancy) and then have consumers read from that queue. The queue functions as the cup in this scenario, but message queues are made to be very large cups that can hold a lot of incoming data and let consumers read from them as they are able.
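A minimal sketch of the single-stream idea, assuming an already-connected node-postgres `client` and a pausable `feed` event emitter (both names are stand-ins, not from the original code):

```js
// One long-lived COPY stream shared by all incoming messages.
const copyFrom = require('pg-copy-streams').from

const stream = client.query(copyFrom('COPY my_table (colA, colB, colC) FROM STDIN'))

feed.on('message', (row) => {
  const ok = stream.write(row.join('\t') + '\n')
  if (!ok) {
    // The cup is full: stop pouring until postgres drains what it has.
    feed.pause()
    stream.once('drain', () => feed.resume())
  }
})
```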
Thank you for answering! Extremely helpful. Yes, I agree the crux of the issue is what you so well described as the cup and the water: more data is being produced than can be consumed. I also believe your guidance regarding the message queue is the direction I will take. Although this might be beyond the scope of pg-copy-streams, you seem to understand this, so maybe you can give me a pointer on this concern: if I follow the diagram you describe, where I have the firehose -> queue -> worker, how can I ensure the worker only consumes as much as it can handle? In other words, if the worker subscribes to updates in the queue and updates occur at an extremely fast pace, how do I avoid running into the same problem? You have already helped me tremendously with these comments and I greatly appreciate it! Also, cheers for the amazing work on node-pg and this package!
If you only have one postgres server, you should measure the ingestion throughput it can accept. If that is lower than the maximum throughput you are receiving in your 24/7 window, there is no way you will be able to keep pace. You also need a queuing solution for the moments when you will have to do some maintenance on your postgres server. Depending on the volume of messages we are talking about, redis streams could be a solution.
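Regarding the worker-pacing question: with a queue the worker pulls batches instead of having messages pushed at it, so it can never hold more than it asked for. A rough sketch using redis streams with a consumer group (ioredis assumed; the stream name, group name, and `copyIntoPostgres` helper are hypothetical):

```js
// Pull-based consumer: ask for at most COUNT entries, process them, ask
// again. The worker can never be handed data faster than it can write out.
const Redis = require('ioredis')
const redis = new Redis()

async function workerLoop () {
  while (true) {
    const res = await redis.xreadgroup(
      'GROUP', 'ingesters', 'worker-1',
      'COUNT', 100, 'BLOCK', 5000,
      'STREAMS', 'feed', '>'
    )
    if (!res) continue // blocked read timed out; poll again
    const [, entries] = res[0]
    await copyIntoPostgres(entries) // COPY this batch and await completion
    for (const [id] of entries) {
      await redis.xack('feed', 'ingesters', id)
    }
  }
}
```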
Some remarks on the code: ideally, I think, you would wait until the first COPY operation is totally finished before switching pending to flowing. That algorithm would, I think, be a lot more pg-friendly; with the current code you might be creating congestion in postgres, with too many COPY operations on the same table at the same time.
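A minimal sketch of that pending/flowing switch (illustrative names only; `runCopy` stands in for whatever COPYs a batch of rows and resolves once postgres confirms it):

```js
// Buffer rows while a COPY is in flight; start the next COPY only once the
// previous one has fully finished. `feed` and `runCopy` are hypothetical.
let pending = []
let copying = false

async function flush () {
  if (copying || pending.length === 0) return
  copying = true
  const batch = pending
  pending = [] // new rows accumulate here while we copy
  try {
    await runCopy(batch)
  } finally {
    copying = false
    flush() // drain whatever arrived during the COPY
  }
}

feed.on('message', (rows) => {
  pending.push(...rows)
  flush()
})
```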
Keep us posted!
@jeromew: I found out what happens if you always have a 🔥: if you do not give PostgreSQL some breathing room between COPY operations, you're going to run into 🔒 locking issues 🔒. I'm interested if anyone has ideas for the best way to handle this entirely. I'm going to dump some tips for anyone working on this in the future.

**Pro-tip: Safely create async batches**

Under high load, a new event may arrive before the previous event's call to `producer.createBatch()` has resolved, so concurrent callers should share the same pending promise:

```js
// `producer` and `maxSizeInBytes` come from the surrounding batching code;
// `nextBatch` holds the in-flight promise so concurrent callers share it.
let nextBatch

const getBatch = async () => {
  // If another batch is already pending, return the promise that will
  // resolve to that batch. You can .then()/await the same promise multiple
  // times; you cannot resolve()/reject() multiple values.
  if (nextBatch) {
    return nextBatch
  }
  try {
    nextBatch = producer.createBatch({ maxSizeInBytes })
    const batch = await nextBatch
    return batch
  } finally {
    nextBatch = undefined
  }
}
```

**Anti-pattern: Immediately unpipe/piping another stream into rotating**
@jmealo thanks for the report. As I understand it, you have been trying to use this library to do a long-running, high-throughput ingestion job.
Did I understand correctly where you are coming from? Can you give a bit more explanation of the concurrent jobs you are trying to execute during ingestion?
For info, during ingestion the COPY operation takes a ROW EXCLUSIVE lock, just like INSERT, UPDATE and DELETE. cf https://stackoverflow.com/questions/61643550/what-locks-does-postgresql-copy-method-require
I am closing this issue in favor of #118, in case some people want to share their chunking / long-ingestion batching solutions.
Hi! Thanks for the work on this package.
I am using it to insert a continuous data feed that comes in the form of an array. Let me describe:
The table I want to insert into has - for example - three columns (colA, colB, colC).
The data feed comes in the form of an array that may contain one or multiple rows, and the messages arrive more or less continuously, but not always. What I mean by that is I may receive one message like this:
[colA,colB,colC]
then immediately after one like this:
[colA,colB,colC,colA,colB,colC,colA,colB,colC,colA,colB,colC]
In order to insert messages like the second one, I have to split them into rows of the three columns and insert those. Note that at high-traffic times I may be receiving thousands of these messages, each containing maybe hundreds of rows. This is what the code looks like:
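A hypothetical reconstruction of the pattern described below - a new `client.query` and a new stream per message (`feed`, `client`, and the table name are assumed names, not the original snippet):

```js
// Hypothetical reconstruction: a brand-new COPY stream is created for
// every incoming message.
const copyFrom = require('pg-copy-streams').from

feed.on('message', (arr) => {
  const stream = client.query(copyFrom('COPY my_table (colA, colB, colC) FROM STDIN'))
  // Split the flat [colA, colB, colC, colA, ...] array into rows of three.
  for (let i = 0; i < arr.length; i += 3) {
    stream.write(arr.slice(i, i + 3).join('\t') + '\n')
  }
  stream.end()
})
```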
This code is working fine for feeds without so many incoming messages. But for one feed with thousands of messages per second, I am getting the following error, at which point the script crashes:
I feel like I am doing something clearly wrong - maybe by instantiating a new `client.query` and a new `stream` every time a new message comes? I do that because every message that comes should be inserted - I can't wait until the feed finishes, because it never finishes; it runs 24/7. My main concern is to know whether there is something I can do with the code to address this issue, or whether I should look for another solution that addresses my use case (streaming thousands of continuous messages into the db).
Maybe this package is not intended for this type of continuous feed? Any help or guidance would be greatly appreciated.
Cheers.