Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add partial param v1 #25126

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

JeanArhancet
Copy link
Contributor

@JeanArhancet JeanArhancet commented Jul 7, 2024

Closes #25014

This pull request includes the following changes:

  • Adds the "Transfer-Encoding: chunked" header if the response is chunked.
  • Adds support for the partial parameter in the JSON response.

Check-list:

  • I've read the contributing section of the project README.
  • Signed CLA (if not already signed).

Copy link
Contributor

@hiltontj hiltontj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is not quite right. I have tried out the OSS version to see the behaviour of the partial parameter, and here is what I get:

curl -G 'http://localhost:8086/query?db=mydb&chunked=true&chunk_size=2' --data-urlencode 'q=SELECT * FROM "mymeas"'

{"results":[{"statement_id":0,"series":[{"name":"mymeas","columns":["time","myfield","mytag"],"values":[["2016-05-19T18:37:55Z",90,"1"],["2016-05-19T18:37:56Z",90,"1"]],"partial":true}],"partial":true}]}
{"results":[{"statement_id":0,"series":[{"name":"mymeas","columns":["time","myfield","mytag"],"values":[["2016-05-19T18:37:57Z",90,"1"],["2016-05-19T18:37:58Z",90,"1"]]}]}]}

You can see "partial": true only appears on the first line. It indicates whether there are more data points for that series, or statement, respectively (but since we only will support single statements, I think we would only ever see it appear in both places, or neither).

It does not appear in the second line because there are no more data points after that line.

In some of your tests, it is working as intended, but in others, you can see that it is appearing on lines where it should not, i.e., lines after which there are no more data points for that series. This might be because you are using the can_flush function to determine the value. I believe that function just indicates if it is ready to flush data to the output stream, based on data in the buffer, the chunk size specified, etc.


This requirement may be difficult to satisfy, the closest I can think of is by using a subset of the check that can_flush does, by just checking if the ChunkBuffer's series only has a single entry: that would indicate that it is still working on the current series, but I am not sure that would produce the desired result in all cases.

@JeanArhancet
Copy link
Contributor Author

@hiltontj Thanks for your feedback.

Do you have an example in the tests where the behavior is not as expected? From what I see, the partial is set to true for the first line but not for the second.

Copy link
Contributor

@hiltontj hiltontj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JeanArhancet - should have done this in my original review, but here are comments in line for each test, based on my understanding of how the partial response parameter should work.

Comment on lines +1006 to +1011
],
"partial": true
}
],
"statement_id": 0
"statement_id": 0,
"partial": true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this test case, it worked correctly

Comment on lines 1049 to 1054
],
"partial": true
}
],
"statement_id": 0
"statement_id": 0,
"partial": true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this test case it did not work correctly, because there are no more results for the cpu measurement. I think in this case, the second partial: true should be there, i.e., for the statement, but not the first, i.e., for the measurement.

Comment on lines +1093 to +1098
],
"partial": true
}
],
"statement_id": 0
"statement_id": 0,
"partial": true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is correct ✅

Comment on lines 1111 to 1112
],
"partial": true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is not correct, as there are no more records for cpu.

Comment on lines +1115 to +1116
"statement_id": 0,
"partial": true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is correct, as there are more records for the statement.

Comment on lines +1130 to +1135
],
"partial": true
}
],
"statement_id": 0
"statement_id": 0,
"partial": true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both of these are incorrect, as there are no more records for the measurement or statement.

@JeanArhancet
Copy link
Contributor Author

@hiltontj I believe I've fixed the issue. Could you please review the changes and let me know your thoughts?

@hiltontj
Copy link
Contributor

@JeanArhancet - it looks like you've made it so the existing test exhibits the correct behaviour, but I am not convinced that this will work properly for all cases.

The reason for my concern is that in this PR, the partial parameter's value is always being determined by what is currently in the buffer, but should also be factoring in what has yet to be buffered. That is, what the input RecordBatchStream will yield on the next call to poll_next.

I wonder if using a peekable Stream would give a solution.

@JeanArhancet
Copy link
Contributor Author

@JeanArhancet - it looks like you've made it so the existing test exhibits the correct behaviour, but I am not convinced that this will work properly for all cases.

The reason for my concern is that in this PR, the partial parameter's value is always being determined by what is currently in the buffer, but should also be factoring in what has yet to be buffered. That is, what the input RecordBatchStream will yield on the next call to poll_next.

I wonder if using a peekable Stream would give a solution.

Thanks for the clarification. I'm not entirely sure I understand correctly. Are both partial_results and partial_series incorrect in their current logic? Should I convert the QueryResponseStream input from Fuse<SendableRecordBatchStream> to Peekable<Fuse<SendableRecordBatchStream>> to address this issue?

@hiltontj
Copy link
Contributor

Thanks for the clarification. I'm not entirely sure I understand correctly. Are both partial_results and partial_series incorrect in their current logic?

I think they are correct with respect to the data that has been buffered, but because they don't factor in what is still in the Stream, then in the situation where there is a query that produces a stream of many RecordBatches, they may not be correct.

The current tests are at the integration level and don't really give you a lot of control over the underlying RecordBatch stream. You may be better off adding one or more unit tests where you can structure the size and number of batches in the stream.

Should I convert the QueryResponseStream input from Fuse<SendableRecordBatchStream> to Peekable<Fuse<SendableRecordBatchStream>> to address this issue?

Yes, what I was hinting at would involve doing this. I think trying to set up a unit test where you can control the RecordBatches in the stream and better confirm that it is working as intended (or not) would be a more useful first step before trying to implement this though.

@hiltontj
Copy link
Contributor

hiltontj commented Jul 12, 2024

Re: the unit test suggestion, the iter function would be an easy way to manually set up a stream of RecordBatches from a Vec<RecordBatch>.

Edit: then you could adapt that into a SendableRecordBatchStream to create a QueryResponseStream for your tests.

@JeanArhancet
Copy link
Contributor Author

@hiltontj I've added a unit test in the latest commit. The test fails because the latest response contains the partial flag set to true, but it should be set to none since it's the last response.

Copy link
Contributor

@hiltontj hiltontj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JeanArhancet - this is a great start, I left a comment in-line.

async fn test_partial_flag() {
let batch = create_test_record_batch();
let schema = batch.schema();
let input_stream = stream::iter(vec![Ok(batch.clone())]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your input stream should have more than one record batch, as that will test the behaviour of the partial parameter when there are batches remaining in the stream, yet to be seen with poll_next.

If you can augment your create_test_record_batch method to generate batches for different measurements and with different sizes, then you could come up with and test different kinds of input streams, e.g.,

┌──────────────┐  ┌──────────────┐  ┌──────────────┐ 
│ RecordBatch0 │  │ RecordBatch1 │  │ RecordBatch2 │
│ meas: 'cpu'  │  │ meas: 'cpu'  │  │ meas: 'mem'  │
│ rows: 2      │  │ rows: 3      │  │ rows: 2      │
└──────────────┘, └──────────────┘, └──────────────┘

With the chunk size of 2, this should give the following chunks:

┌──────────────────────┐  ┌──────────────────────┐  ┌──────────────────────┐  ┌──────────────────────┐
│ Chunk0               │  │ Chunk1               │  │ Chunk2               │  │ Chunk3               │
│ meas: 'cpu'          │  │ meas: 'cpu'          │  │ meas: 'cpu'          │  │ meas: 'mem'          │
│ rows: 2              │  │ rows: 2              │  │ rows: 1              │  │ rows: 2              │  
│ partial(meas): true  │  │ partial(meas): true  │  │ partial(meas): false │  │ partial(meas): false │
│ partial(stmt): true  │  │ partial(stmt): true  │  │ partial(stmt): true  │  │ partial(stmt): false │
└──────────────────────┘, └──────────────────────┘, └──────────────────────┘, └──────────────────────┘

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your suggestion. I have updated the test to align with that. With this new test, we can see that my implementation is not correct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think that in order to make this work, the streaming/buffering logic needs to be refactored/rethought a bit. It would be great if you are willing to take a stab at it!

I updated the original issue description to make a note of this.

Copy link
Contributor Author

@JeanArhancet JeanArhancet Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hiltontj I tried to solve this issue ( see commit 25f05a6) but was not successful.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, perhaps peekable is not the way to go. I wonder if the streaming logic, i.e., in the impl Stream for QueryResponseStream just needs to be re-ordered.

Right now, it always checks if it can flush before polling the input stream, which means that it could flush the entire buffer before polling the input stream again, and which seems to be the cause of the problem we are up against here.

If, instead, we just polled the input before flushing, and then, when flushing, always leave at most one chunk in the buffer. That way, there is always enough data in the buffer, when streaming chunks, to decide what partial should be.

The exception would be, when the input stream is done, and there are no more record batches, then you would want to flush the entire buffer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hiltontj Thanks for your feedback and description, but I'm a bit lost concerning the correct implementation and what I need to refactor. I don't think I will find the right approach 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hiltontj I've just pushed another unit test to verify the existence of the partial flag when the stream size is 1

@JeanArhancet JeanArhancet force-pushed the feat/add-partial-param-v1 branch 2 times, most recently from c30d160 to ab971ce Compare August 1, 2024 07:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support partial parameter in the chunked v1 /query API response
2 participants