Skip to content

Commit

Permalink
Add DEDICATED_DECOMPRESSOR CompressOption
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Hultman authored and Alex Hultman committed Sep 28, 2021
1 parent 7a3bc47 commit 415fec3
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 14 deletions.
3 changes: 2 additions & 1 deletion examples/EchoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ int main() {
.passphrase = "1234"
}).ws<PerSocketData>("/*", {
/* Settings */
.compression = uWS::DEDICATED_COMPRESSOR_4KB,
.compression = uWS::CompressOptions(uWS::DEDICATED_COMPRESSOR_4KB | uWS::DEDICATED_DECOMPRESSOR),
.maxPayloadLength = 100 * 1024 * 1024,
.idleTimeout = 16,
.maxBackpressure = 100 * 1024 * 1024,
Expand All @@ -30,6 +30,7 @@ int main() {
.upgrade = nullptr,
.open = [](auto */*ws*/) {
/* Open event here, you may access ws->getUserData() which points to a PerSocketData struct */

},
.message = [](auto *ws, std::string_view message, uWS::OpCode opCode) {
ws->send(message, opCode, true);
Expand Down
2 changes: 1 addition & 1 deletion src/App.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ struct TemplatedApp {
unsigned int numSubscribers(std::string_view topic) {
Topic *t = topicTree->lookupTopic(topic);
if (t) {
return t->size();
return (unsigned int) t->size();
}

return 0;
Expand Down
5 changes: 4 additions & 1 deletion src/HttpResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,11 @@ struct HttpResponse : public AsyncSocket<SSL> {
CompressOptions compressOptions = CompressOptions::DISABLED;
if (secWebSocketExtensions.length() && webSocketContextData->compression != DISABLED) {

/* We always want shared inflation */
/* We always want shared inflation, (or the full 15) */
int wantedInflationWindow = 0;
if (webSocketContextData->compression & DEDICATED_DECOMPRESSOR) {
wantedInflationWindow = 15;
}

/* Map from selected compressor */
int wantedCompressionWindow = (webSocketContextData->compression & 0xFF00) >> 8;
Expand Down
20 changes: 13 additions & 7 deletions src/PerMessageDeflate.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@

/* We always define these options no matter if ZLIB is enabled or not */
namespace uWS {
/* Compressor mode is HIGH8(windowBits), LOW8(memLevel) */
/* Compressor mode is 16 low bit where HIGH8(windowBits), LOW8(memLevel) */
enum CompressOptions : uint32_t {
DISABLED = 0,
SHARED_COMPRESSOR = 1,
/* Highest bit is shared compressor */
SHARED_COMPRESSOR = (uint32_t)1 << (uint32_t)31,
/* Second highest bit is DEDICATED_DECOMPRESSOR */
DEDICATED_DECOMPRESSOR = (uint32_t)1 << (uint32_t)30,
/* Lowest 16 bit describe compressor */
DEDICATED_COMPRESSOR_3KB = 9 << 8 | 1,
DEDICATED_COMPRESSOR_4KB = 9 << 8 | 2,
DEDICATED_COMPRESSOR_8KB = 10 << 8 | 3,
Expand Down Expand Up @@ -59,15 +63,15 @@ namespace uWS {
#if defined(UWS_NO_ZLIB) || defined(UWS_MOCK_ZLIB)
struct ZlibContext {};
struct InflationStream {
std::optional<std::string_view> inflate(ZlibContext *zlibContext, std::string_view compressed, size_t maxPayloadLength) {
std::optional<std::string_view> inflate(ZlibContext * /*zlibContext*/, std::string_view compressed, size_t maxPayloadLength, bool /*reset*/) {
return compressed.substr(0, std::min(maxPayloadLength, compressed.length()));
}
};
struct DeflationStream {
std::string_view deflate(ZlibContext *zlibContext, std::string_view raw, bool reset) {
std::string_view deflate(ZlibContext * /*zlibContext*/, std::string_view raw, bool /*reset*/) {
return raw;
}
DeflationStream(int compressOptions) {
DeflationStream(CompressOptions /*compressOptions*/) {
}
};
#else
Expand Down Expand Up @@ -201,7 +205,7 @@ struct InflationStream {
}

/* Zero length inflates are possible and valid */
std::optional<std::string_view> inflate(ZlibContext *zlibContext, std::string_view compressed, size_t maxPayloadLength) {
std::optional<std::string_view> inflate(ZlibContext *zlibContext, std::string_view compressed, size_t maxPayloadLength, bool reset) {

#ifdef UWS_USE_LIBDEFLATE
/* Try fast path first */
Expand Down Expand Up @@ -242,7 +246,9 @@ struct InflationStream {

} while (inflationStream.avail_out == 0 && zlibContext->dynamicInflationBuffer.length() <= maxPayloadLength);

inflateReset(&inflationStream);
if (reset) {
inflateReset(&inflationStream);
}

if ((err != Z_BUF_ERROR && err != Z_OK) || zlibContext->dynamicInflationBuffer.length() > maxPayloadLength) {
return std::nullopt;
Expand Down
18 changes: 16 additions & 2 deletions src/WebSocketContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ struct WebSocketContext {
webSocketData->compressionStatus = WebSocketData::CompressionStatus::ENABLED;

LoopData *loopData = (LoopData *) us_loop_ext(us_socket_context_loop(SSL, us_socket_context(SSL, (us_socket_t *) s)));
auto inflatedFrame = loopData->inflationStream->inflate(loopData->zlibContext, {data, length}, webSocketContextData->maxPayloadLength);
/* Decompress using shared or dedicated decompressor */
std::optional<std::string_view> inflatedFrame;
if (webSocketData->inflationStream) {
inflatedFrame = webSocketData->inflationStream->inflate(loopData->zlibContext, {data, length}, webSocketContextData->maxPayloadLength, false);
} else {
inflatedFrame = loopData->inflationStream->inflate(loopData->zlibContext, {data, length}, webSocketContextData->maxPayloadLength, true);
}

if (!inflatedFrame.has_value()) {
forceClose(webSocketState, s, ERR_TOO_BIG_MESSAGE_INFLATION);
return true;
Expand Down Expand Up @@ -124,7 +131,14 @@ struct WebSocketContext {
)
);

auto inflatedFrame = loopData->inflationStream->inflate(loopData->zlibContext, {webSocketData->fragmentBuffer.data(), webSocketData->fragmentBuffer.length() - 9}, webSocketContextData->maxPayloadLength);
/* Decompress using shared or dedicated decompressor */
std::optional<std::string_view> inflatedFrame;
if (webSocketData->inflationStream) {
inflatedFrame = webSocketData->inflationStream->inflate(loopData->zlibContext, {webSocketData->fragmentBuffer.data(), webSocketData->fragmentBuffer.length() - 9}, webSocketContextData->maxPayloadLength, false);
} else {
inflatedFrame = loopData->inflationStream->inflate(loopData->zlibContext, {webSocketData->fragmentBuffer.data(), webSocketData->fragmentBuffer.length() - 9}, webSocketContextData->maxPayloadLength, true);
}

if (!inflatedFrame.has_value()) {
forceClose(webSocketState, s, ERR_TOO_BIG_MESSAGE_INFLATION);
return true;
Expand Down
14 changes: 12 additions & 2 deletions src/WebSocketData.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,34 @@ struct WebSocketData : AsyncSocketData<false>, WebSocketState<true> {

/* We might have a dedicated compressor */
DeflationStream *deflationStream = nullptr;
/* And / or a dedicated decompressor */
InflationStream *inflationStream = nullptr;

/* We could be a subscriber */
Subscriber *subscriber = nullptr;
public:
WebSocketData(bool perMessageDeflate, CompressOptions compressOptions, BackPressure &&backpressure) : AsyncSocketData<false>(std::move(backpressure)), WebSocketState<true>() {
compressionStatus = perMessageDeflate ? ENABLED : DISABLED;

/* Initialize the dedicated sliding window */
if (perMessageDeflate && (compressOptions != CompressOptions::SHARED_COMPRESSOR)) {
/* Initialize the dedicated sliding window(s) */
if (perMessageDeflate && (0 == (compressOptions & CompressOptions::SHARED_COMPRESSOR))) {
deflationStream = new DeflationStream(compressOptions);
}

if (perMessageDeflate && (compressOptions & CompressOptions::DEDICATED_DECOMPRESSOR)) {
inflationStream = new InflationStream();
}
}

~WebSocketData() {
if (deflationStream) {
delete deflationStream;
}

if (inflationStream) {
delete inflationStream;
}

if (subscriber) {
delete subscriber;
}
Expand Down

0 comments on commit 415fec3

Please sign in to comment.