feat: add partial param v1 #25126

Open · wants to merge 6 commits into base: main

Changes from 1 commit
30 changes: 20 additions & 10 deletions influxdb3/tests/server/query.rs
@@ -1003,10 +1003,12 @@ async fn api_v1_query_chunked() {
 "values": [
 [1, "a", 0.9],
 [2, "a", 0.89],
-]
+],
+"partial": true
 }
 ],
-"statement_id": 0
+"statement_id": 0,
+"partial": true
Contributor commented on lines +1006 to +1011: In this test case, it worked correctly.
}
]
}),
@@ -1044,10 +1046,12 @@ async fn api_v1_query_chunked() {
 [1, "a", 0.9],
 [2, "a", 0.89],
 [3, "a", 0.85]
-]
+],
+"partial": true
 }
 ],
-"statement_id": 0
+"statement_id": 0,
+"partial": true
Contributor commented: In this test case it did not work correctly, because there are no more results for the cpu measurement. I think in this case the second `partial: true` should be there, i.e. for the statement, but not the first, i.e. for the measurement.
}
]
}),
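The reviewer's expected semantics can be sketched as a small helper. This is a hypothetical illustration, not code from the PR: the series-level and statement-level flags are derived from two independent conditions, whereas the diff under review derives both from the same check.

```rust
/// Hypothetical helper capturing the reviewer's expected semantics:
/// a series is "partial" only when more rows remain for that
/// measurement; a statement is "partial" when more rows remain for
/// the statement as a whole. `None` means the key is omitted.
fn partial_flags(
    more_rows_for_series: bool,
    more_rows_for_statement: bool,
) -> (Option<bool>, Option<bool>) {
    (
        more_rows_for_series.then_some(true),
        more_rows_for_statement.then_some(true),
    )
}

fn main() {
    // Mid-stream chunk: both the measurement and the statement continue.
    assert_eq!(partial_flags(true, true), (Some(true), Some(true)));
    // cpu is exhausted, but the statement still has rows (e.g. for mem):
    // only the statement-level flag should be set.
    assert_eq!(partial_flags(false, true), (None, Some(true)));
    // Final chunk: neither flag should be serialized.
    assert_eq!(partial_flags(false, false), (None, None));
}
```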
@@ -1086,10 +1090,12 @@ async fn api_v1_query_chunked() {
 "values": [
 [1, "a", 0.9],
 [2, "a", 0.89],
-]
+],
+"partial": true
 }
 ],
-"statement_id": 0
+"statement_id": 0,
+"partial": true
Contributor commented on lines +1092 to +1097: This one is correct ✅
}
]
}),
@@ -1102,10 +1108,12 @@ async fn api_v1_query_chunked() {
"columns": ["time","host","usage"],
"values": [
[3, "a", 0.85]
-]
+],
+"partial": true
Contributor commented: This one is not correct, as there are no more records for cpu.
}
],
-"statement_id": 0
+"statement_id": 0,
+"partial": true
Contributor commented on lines +1113 to +1114: This one is correct, as there are more records for the statement.
}
]
}),
@@ -1119,10 +1127,12 @@ async fn api_v1_query_chunked() {
"values": [
[4, "a", 0.5],
[5, "a", 0.6],
-]
+],
+"partial": true
 }
 ],
-"statement_id": 0
+"statement_id": 0,
+"partial": true
Contributor commented on lines +1128 to +1133: Both of these are incorrect, as there are no more records for the measurement or statement.
}
]
}),
31 changes: 26 additions & 5 deletions influxdb3_server/src/http/v1.rs
@@ -22,7 +22,10 @@ use chrono::{format::SecondsFormat, DateTime};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{ready, stream::Fuse, Stream, StreamExt};
use hyper::http::HeaderValue;
-use hyper::{header::ACCEPT, header::CONTENT_TYPE, Body, Request, Response, StatusCode};
+use hyper::{
+    header::ACCEPT, header::CONTENT_TYPE, header::TRANSFER_ENCODING, Body, Request, Response,
+    StatusCode,
+};
use influxdb3_write::WriteBuffer;
use iox_time::TimeProvider;
use observability_deps::tracing::info;
@@ -35,6 +38,7 @@ use crate::QueryExecutor;
use super::{Error, HttpApi, Result};

const DEFAULT_CHUNK_SIZE: usize = 10_000;
const TRANSFER_ENCODING_CHUNKED: &str = "chunked";

impl<W, Q, T> HttpApi<W, Q, T>
where
@@ -74,11 +78,17 @@
QueryResponseStream::new(0, stream, chunk_size, format, epoch).map_err(QueryError)?;
let body = Body::wrap_stream(stream);

-Ok(Response::builder()
+let mut builder = Response::builder()
 .status(StatusCode::OK)
-.header(CONTENT_TYPE, format.as_content_type())
-.body(body)
-.unwrap())
+.header(CONTENT_TYPE, format.as_content_type());
+
+// Check if the response is chunked.
+// If it is, add the "Transfer-Encoding: chunked" header to the response builder.
+if chunked {
+    builder = builder.header(TRANSFER_ENCODING, TRANSFER_ENCODING_CHUNKED);
+}
+
+Ok(builder.body(body).unwrap())
}
}
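The branch above can be exercised in isolation. A minimal sketch, with plain string pairs standing in for hyper's header map; the constant name matches the one added in this diff, and the rest is hypothetical:

```rust
const TRANSFER_ENCODING_CHUNKED: &str = "chunked";

/// Build response headers, adding "Transfer-Encoding: chunked" only
/// when the client asked for a chunked response.
fn response_headers(content_type: &str, chunked: bool) -> Vec<(String, String)> {
    let mut headers = vec![("Content-Type".to_string(), content_type.to_string())];
    if chunked {
        headers.push((
            "Transfer-Encoding".to_string(),
            TRANSFER_ENCODING_CHUNKED.to_string(),
        ));
    }
    headers
}

fn main() {
    // Unchunked responses carry only the content type.
    assert_eq!(response_headers("application/json", false).len(), 1);
    // Chunked responses additionally advertise the transfer encoding.
    let hdrs = response_headers("application/json", true);
    assert_eq!(
        hdrs[1],
        ("Transfer-Encoding".to_string(), "chunked".to_string())
    );
}
```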

@@ -303,6 +313,8 @@ impl From<QueryResponse> for Bytes {
struct StatementResponse {
statement_id: usize,
series: Vec<Series>,
#[serde(skip_serializing_if = "Option::is_none")]
partial: Option<bool>,
}

/// The records produced for a single time series (measurement)
@@ -311,6 +323,8 @@ struct Series {
name: String,
columns: Vec<String>,
values: Vec<Row>,
#[serde(skip_serializing_if = "Option::is_none")]
partial: Option<bool>,
}
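With `skip_serializing_if = "Option::is_none"`, serde emits the `partial` key only when the flag is set, so existing consumers of the v1 response see no new field unless a chunk really is partial. A hand-rolled sketch of that behavior (no serde; `series_json` is a hypothetical helper, not part of the PR):

```rust
/// Emit a minimal series object, writing "partial" only when the flag
/// is Some, mirroring what the serde attribute does for the real structs.
fn series_json(name: &str, partial: Option<bool>) -> String {
    let mut out = format!("{{\"name\":\"{name}\"");
    if let Some(p) = partial {
        out.push_str(&format!(",\"partial\":{p}"));
    }
    out.push('}');
    out
}

fn main() {
    assert_eq!(
        series_json("cpu", Some(true)),
        r#"{"name":"cpu","partial":true}"#
    );
    // A None flag disappears from the payload entirely.
    assert_eq!(series_json("cpu", None), r#"{"name":"cpu"}"#);
}
```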

/// A single row, or record in a time series
@@ -535,18 +549,23 @@ impl QueryResponseStream {
/// Flush a single chunk, or time series, when operating in chunked mode
fn flush_one(&mut self) -> QueryResponse {
let columns = self.columns();

let partial = self.buffer.can_flush().then_some(true);

// this unwrap is okay because we only ever call flush_one
// after calling can_flush on the buffer:
let (name, values) = self.buffer.flush_one().unwrap();
let series = vec![Series {
name,
columns,
values,
partial,
}];
QueryResponse {
results: vec![StatementResponse {
statement_id: self.statement_id,
series,
partial,
}],
format: self.format,
}
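Note that `flush_one` derives both the series-level and the statement-level `partial` from the same `can_flush()` result, which lines up with the incorrect series-level flags called out in the test review above. The `then_some` idiom it relies on simply maps a bool into an optional flag:

```rust
fn main() {
    // bool::then_some turns "more data is buffered" into an optional
    // flag: Some(true) when the stream can flush again, None otherwise.
    let can_flush = true;
    assert_eq!(can_flush.then_some(true), Some(true));
    let can_flush = false;
    assert_eq!(can_flush.then_some(true), None);
}
```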
@@ -563,12 +582,14 @@
name,
columns: columns.clone(),
values,
partial: None,
})
.collect();
Ok(QueryResponse {
results: vec![StatementResponse {
statement_id: self.statement_id,
series,
partial: None,
}],
format: self.format,
})