Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket updates #44

Merged
merged 10 commits into from
Nov 10, 2020
12 changes: 6 additions & 6 deletions include/crow/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ namespace crow
///Set the server's log level

///
/// Possible values are:
/// crow::LogLevel::Debug (0)
/// crow::LogLevel::Info (1)
/// crow::LogLevel::Warning (2)
/// crow::LogLevel::Error (3)
/// crow::LogLevel::Critical (4)
/// Possible values are:<br>
/// crow::LogLevel::Debug (0)<br>
/// crow::LogLevel::Info (1)<br>
/// crow::LogLevel::Warning (2)<br>
/// crow::LogLevel::Error (3)<br>
/// crow::LogLevel::Critical (4)<br>
self_t& loglevel(crow::LogLevel level)
{
crow::logger::setLogLevel(level);
Expand Down
30 changes: 21 additions & 9 deletions include/crow/socket_adaptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,31 +108,43 @@ namespace crow

bool is_open()
{
return raw_socket().is_open();
return ssl_socket_ ? raw_socket().is_open() : false;
}

void close()
{
boost::system::error_code ec;
raw_socket().close(ec);
if (is_open())
{
boost::system::error_code ec;
raw_socket().close(ec);
}
}

void shutdown_readwrite()
{
boost::system::error_code ec;
raw_socket().shutdown(boost::asio::socket_base::shutdown_type::shutdown_both, ec);
if (is_open())
{
boost::system::error_code ec;
raw_socket().shutdown(boost::asio::socket_base::shutdown_type::shutdown_both, ec);
}
}

void shutdown_write()
{
boost::system::error_code ec;
raw_socket().shutdown(boost::asio::socket_base::shutdown_type::shutdown_send, ec);
if (is_open())
{
boost::system::error_code ec;
raw_socket().shutdown(boost::asio::socket_base::shutdown_type::shutdown_send, ec);
}
}

void shutdown_read()
{
boost::system::error_code ec;
raw_socket().shutdown(boost::asio::socket_base::shutdown_type::shutdown_receive, ec);
if (is_open())
{
boost::system::error_code ec;
raw_socket().shutdown(boost::asio::socket_base::shutdown_type::shutdown_receive, ec);
}
}

boost::asio::io_service& get_io_service()
Expand Down
152 changes: 123 additions & 29 deletions include/crow/websocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ namespace crow
Payload,
};

///A base class for websocket connection.
struct connection
{
virtual void send_binary(const std::string& msg) = 0;
virtual void send_text(const std::string& msg) = 0;
virtual void send_ping(const std::string& msg) = 0;
virtual void send_pong(const std::string& msg) = 0;
virtual void close(const std::string& msg = "quit") = 0;
virtual ~connection(){}

Expand All @@ -32,10 +35,35 @@ namespace crow
void* userdata_;
};

// 0 1 2 3 -byte
// 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 -bit
// +-+-+-+-+-------+-+-------------+-------------------------------+
// |F|R|R|R| opcode|M| Payload len | Extended payload length |
// |I|S|S|S| (4) |A| (7) | (16/64) |
// |N|V|V|V| |S| | (if payload len==126/127) |
// | |1|2|3| |K| | |
// +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
// | Extended payload length continued, if payload len == 127 |
// + - - - - - - - - - - - - - - - +-------------------------------+
// | |Masking-key, if MASK set to 1 |
// +-------------------------------+-------------------------------+
// | Masking-key (continued) | Payload Data |
// +-------------------------------- - - - - - - - - - - - - - - - +
// : Payload Data continued ... :
// + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
// | Payload Data continued ... |
// +---------------------------------------------------------------+

///A websocket connection.
template <typename Adaptor>
class Connection : public connection
{
public:
/// Constructor for a connection.

///
/// Requires a request with an "Upgrade: websocket" header.<br>
/// Automatically handles the handshake.
Connection(const crow::request& req, Adaptor&& adaptor,
std::function<void(crow::websocket::connection&)> open_handler,
std::function<void(crow::websocket::connection&, const std::string&, bool)> message_handler,
Expand Down Expand Up @@ -72,19 +100,40 @@ namespace crow
start(crow::utility::base64encode((char*)digest, 20));
}

/// Send data through the socket.
template<typename CompletionHandler>
void dispatch(CompletionHandler handler)
{
adaptor_.get_io_service().dispatch(handler);
}

/// Send data through the socket and return immediately.
template<typename CompletionHandler>
void post(CompletionHandler handler)
{
adaptor_.get_io_service().post(handler);
}

void send_pong(const std::string& msg)
/// Send a "Ping" message.

///
/// Usually invoked to check if the other point is still online.
void send_ping(const std::string& msg) override
{
dispatch([this, msg]{
char buf[3] = "\x89\x00";
buf[1] += msg.size();
The-EDev marked this conversation as resolved.
Show resolved Hide resolved
write_buffers_.emplace_back(buf, buf+2);
write_buffers_.emplace_back(msg);
do_write();
});
}

/// Send a "Pong" message.

///
/// Usually automatically invoked as a response to a "Ping" message.
void send_pong(const std::string& msg) override
{
dispatch([this, msg]{
char buf[3] = "\x8A\x00";
Expand All @@ -95,6 +144,7 @@ namespace crow
});
}

/// Send a binary encoded message.
void send_binary(const std::string& msg) override
{
dispatch([this, msg]{
Expand All @@ -105,6 +155,7 @@ namespace crow
});
}

/// Send a plaintext message.
void send_text(const std::string& msg) override
{
dispatch([this, msg]{
Expand All @@ -115,6 +166,10 @@ namespace crow
});
}

/// Send a close signal.

///
/// Sets a flag to destroy the object once the message is sent.
void close(const std::string& msg) override
{
dispatch([this, msg]{
Expand All @@ -134,6 +189,7 @@ namespace crow

protected:

/// Generate the websocket headers using an opcode and the message size (in bytes).
std::string build_header(int opcode, size_t size)
{
char buf[2+8] = "\x80\x00";
Expand All @@ -157,6 +213,10 @@ namespace crow
}
}

/// Send the HTTP upgrade response.

///
/// Finishes the handshake process, then starts reading messages from the socket.
void start(std::string&& hello)
{
static std::string header = "HTTP/1.1 101 Switching Protocols\r\n"
Expand All @@ -174,15 +234,23 @@ namespace crow
do_read();
}

/// Read a websocket message.

///
/// Involves:<br>
/// Handling headers (opcodes, size).<br>
/// Unmasking the payload.<br>
/// Reading the actual payload.<br>
void do_read()
{
is_reading = true;
switch(state_)
{
case WebSocketReadState::MiniHeader:
{
mini_header_ = 0;
//boost::asio::async_read(adaptor_.socket(), boost::asio::buffer(&mini_header_, 1),
adaptor_.socket().async_read_some(boost::asio::buffer(&mini_header_, 2),
adaptor_.socket().async_read_some(boost::asio::buffer(&mini_header_, 2),
[this](const boost::system::error_code& ec, std::size_t
#ifdef CROW_ENABLE_DEBUG
bytes_transferred
Expand All @@ -200,8 +268,11 @@ namespace crow
}
#endif

if (!ec && ((mini_header_ & 0x80) == 0x80))
if (!ec)
{
if ((mini_header_ & 0x80) == 0x80)
has_mask_ = true;

if ((mini_header_ & 0x7f) == 127)
{
state_ = WebSocketReadState::Len64;
Expand Down Expand Up @@ -300,34 +371,42 @@ namespace crow
}
break;
case WebSocketReadState::Mask:
boost::asio::async_read(adaptor_.socket(), boost::asio::buffer((char*)&mask_, 4),
[this](const boost::system::error_code& ec, std::size_t
#ifdef CROW_ENABLE_DEBUG
bytes_transferred
#endif
)
{
is_reading = false;
if (has_mask_)
{
boost::asio::async_read(adaptor_.socket(), boost::asio::buffer((char*)&mask_, 4),
[this](const boost::system::error_code& ec, std::size_t
#ifdef CROW_ENABLE_DEBUG
if (!ec && bytes_transferred != 4)
bytes_transferred
#endif
)
{
throw std::runtime_error("WebSocket:Mask:async_read fail:asio bug?");
}
is_reading = false;
#ifdef CROW_ENABLE_DEBUG
if (!ec && bytes_transferred != 4)
{
throw std::runtime_error("WebSocket:Mask:async_read fail:asio bug?");
}
#endif

if (!ec)
{
state_ = WebSocketReadState::Payload;
do_read();
}
else
{
close_connection_ = true;
if (error_handler_)
error_handler_(*this);
adaptor_.close();
}
});
if (!ec)
{
state_ = WebSocketReadState::Payload;
do_read();
}
else
{
close_connection_ = true;
if (error_handler_)
error_handler_(*this);
adaptor_.close();
}
});
}
else
{
state_ = WebSocketReadState::Payload;
do_read();
}
break;
case WebSocketReadState::Payload:
{
Expand Down Expand Up @@ -365,21 +444,30 @@ namespace crow
}
}

/// Check if the FIN bit is set.
bool is_FIN()
{
return mini_header_ & 0x8000;
}

/// Extract the opcode from the header.
int opcode()
{
return (mini_header_ & 0x0f00) >> 8;
}

/// Process the payload fragment.

///
/// Unmasks the fragment, checks the opcode, merges fragments into 1 message body, and calls the appropriate handler.
void handle_fragment()
{
for(decltype(fragment_.length()) i = 0; i < fragment_.length(); i ++)
if (has_mask_)
{
fragment_[i] ^= ((char*)&mask_)[i%4];
for(decltype(fragment_.length()) i = 0; i < fragment_.length(); i ++)
{
fragment_[i] ^= ((char*)&mask_)[i%4];
}
}
switch(opcode())
{
Expand Down Expand Up @@ -454,6 +542,10 @@ namespace crow
fragment_.clear();
}

/// Send the buffers' data through the socket.

///
/// Also destroyes the object if the Close flag is set.
void do_write()
{
if (sending_buffers_.empty())
Expand Down Expand Up @@ -485,6 +577,7 @@ namespace crow
}
}

/// Destroy the Connection.
void check_destroy()
{
//if (has_sent_close_ && has_recv_close_)
Expand All @@ -509,6 +602,7 @@ namespace crow
uint64_t remaining_length_{0};
bool close_connection_{false};
bool is_reading{false};
bool has_mask_{false};
uint32_t mask_;
uint16_t mini_header_;
bool has_sent_close_{false};
Expand Down
Loading