Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid busy loop on insufficient bytes on connection #207

Merged
merged 1 commit into from
Sep 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub const DEFAULT_NAME: &str = "POSTGRESQL_DEFAULT_NAME";
#[derive(Debug, Clone, Copy, Default)]
pub enum PgWireConnectionState {
#[default]
AwaitingSslRequest,
AwaitingStartup,
AuthenticationInProgress,
ReadyForQuery,
Expand Down
13 changes: 11 additions & 2 deletions src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ pub mod terminate;
#[derive(Debug)]
pub enum PgWireFrontendMessage {
Startup(startup::Startup),
SslRequest(startup::SslRequest),
// when client has no ssl configured, it skip this message.
// our decoder will return a `SslRequest(None)` for this case.
SslRequest(Option<startup::SslRequest>),
PasswordMessageFamily(startup::PasswordMessageFamily),

Query(simplequery::Query),
Expand Down Expand Up @@ -111,7 +113,13 @@ impl PgWireFrontendMessage {
pub fn encode(&self, buf: &mut BytesMut) -> PgWireResult<()> {
match self {
Self::Startup(msg) => msg.encode(buf),
Self::SslRequest(msg) => msg.encode(buf),
Self::SslRequest(msg) => {
if let Some(msg) = msg {
msg.encode(buf)
} else {
Ok(())
}
}
Self::PasswordMessageFamily(msg) => msg.encode(buf),

Self::Query(msg) => msg.encode(buf),
Expand All @@ -135,6 +143,7 @@ impl PgWireFrontendMessage {
pub fn decode(buf: &mut BytesMut) -> PgWireResult<Option<Self>> {
if buf.remaining() > 1 {
let first_byte = buf[0];

match first_byte {
// Password, SASLInitialResponse, SASLResponse can only be
// decoded under certain context
Expand Down
86 changes: 40 additions & 46 deletions src/tokio.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::io::Error as IOError;
use std::sync::Arc;

use bytes::BytesMut;
use bytes::Buf;
use futures::{SinkExt, StreamExt};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
Expand Down Expand Up @@ -34,17 +34,31 @@ impl<S> Decoder for PgWireMessageServerCodec<S> {

fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.client_info.state() {
PgWireConnectionState::AwaitingStartup => {
if let Some(request) = SslRequest::decode(src)? {
return Ok(Some(PgWireFrontendMessage::SslRequest(request)));
PgWireConnectionState::AwaitingSslRequest => {
if src.remaining() >= SslRequest::BODY_SIZE {
self.client_info
.set_state(PgWireConnectionState::AwaitingStartup);

if let Some(request) = SslRequest::decode(src)? {
return Ok(Some(PgWireFrontendMessage::SslRequest(Some(request))));
} else {
// this is not a real message, but to indicate that
// client will not init ssl handshake
return Ok(Some(PgWireFrontendMessage::SslRequest(None)));
}
}

Ok(None)
}

PgWireConnectionState::AwaitingStartup => {
if let Some(startup) = Startup::decode(src)? {
return Ok(Some(PgWireFrontendMessage::Startup(startup)));
Ok(Some(PgWireFrontendMessage::Startup(startup)))
} else {
Ok(None)
}

Ok(None)
}

_ => PgWireFrontendMessage::decode(src),
}
}
Expand Down Expand Up @@ -256,54 +270,34 @@ enum SslNegotiationType {
None,
}

async fn check_ssl_negotiation(tcp_socket: &TcpStream) -> Result<SslNegotiationType, IOError> {
let mut buf = [0u8; SslRequest::BODY_SIZE];
loop {
let n = tcp_socket.peek(&mut buf).await?;

// the tcp_stream has ended
if n == 0 {
return Ok(SslNegotiationType::None);
}

if n >= SslRequest::BODY_SIZE {
break;
}
}
if buf[0] == 0x16 {
return Ok(SslNegotiationType::Direct);
}
async fn check_ssl_direct_negotiation(tcp_socket: &TcpStream) -> Result<bool, IOError> {
let mut buf = [0u8; 1];
let n = tcp_socket.peek(&mut buf).await?;

let mut buf = BytesMut::from(buf.as_slice());
if let Ok(Some(_)) = SslRequest::decode(&mut buf) {
return Ok(SslNegotiationType::Postgres);
}
Ok(SslNegotiationType::None)
Ok(n > 0 && buf[0] == 0x16)
}

async fn peek_for_sslrequest<ST>(
socket: &mut Framed<TcpStream, PgWireMessageServerCodec<ST>>,
ssl_supported: bool,
) -> Result<SslNegotiationType, IOError> {
let mut negotiation_type = check_ssl_negotiation(socket.get_ref()).await?;
match negotiation_type {
SslNegotiationType::Postgres => {
// consume request
socket.next().await;

let response = if ssl_supported {
PgWireBackendMessage::SslResponse(SslResponse::Accept)
} else {
negotiation_type = SslNegotiationType::None;
PgWireBackendMessage::SslResponse(SslResponse::Refuse)
};
socket.send(response).await?;
if check_ssl_direct_negotiation(socket.get_ref()).await? {
Ok(SslNegotiationType::Direct)
} else if let Some(Ok(PgWireFrontendMessage::SslRequest(Some(_)))) = socket.next().await {
if ssl_supported {
socket
.send(PgWireBackendMessage::SslResponse(SslResponse::Accept))
.await?;
Ok(SslNegotiationType::Postgres)
} else {
socket
.send(PgWireBackendMessage::SslResponse(SslResponse::Refuse))
.await?;
Ok(SslNegotiationType::None)
}
SslNegotiationType::Direct => {}
SslNegotiationType::None => {}
} else {
Ok(SslNegotiationType::None)
}

Ok(negotiation_type)
}

async fn do_process_socket<S, A, Q, EQ, C>(
Expand Down
Loading