From 0600c1de8025d6540f452321a101a50ff126a681 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 7 Oct 2024 17:45:52 -0500 Subject: [PATCH 1/9] GH-819 Provide lambda for socket executor --- .../libfc/include/fc/network/listener.hpp | 40 ++++++++++++++----- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/libraries/libfc/include/fc/network/listener.hpp b/libraries/libfc/include/fc/network/listener.hpp index a1f6287fa9..1a867fa8d4 100644 --- a/libraries/libfc/include/fc/network/listener.hpp +++ b/libraries/libfc/include/fc/network/listener.hpp @@ -58,30 +58,43 @@ struct listener_base { /// @note Users should use fc::create_listener() instead, this class is the implementation /// detail for fc::create_listener(). /// +/// @tparam Protocol either \c boost::asio::ip::tcp or \c boost::asio::local::stream_protocol +/// @tparam Executor The executor for the acceptor and acceptor timer +/// @tparam SocketExecutor Lambda that returns executor to be used for the newly accepted socket: +/// ExecutorType(const endpoint_type&) +/// @tparam CreateSession Callback to call after each accepted connection: void(Socket&& socket) +/// ///////////////////////////////////////////////////////////////////////////////////////////// -template -struct listener : listener_base, std::enable_shared_from_this> { +template +struct listener : listener_base, std::enable_shared_from_this> { private: typename Protocol::acceptor acceptor_; boost::asio::deadline_timer accept_error_timer_; boost::posix_time::time_duration accept_timeout_; logger& logger_; std::string extra_listening_log_info_; + SocketExecutor socket_executor_; CreateSession create_session_; public: using endpoint_type = typename Protocol::endpoint; listener(Executor& executor, logger& logger, boost::posix_time::time_duration accept_timeout, const std::string& local_address, const endpoint_type& endpoint, - const std::string& extra_listening_log_info, const CreateSession& create_session) + const std::string& extra_listening_log_info, + SocketExecutor socket_executor, CreateSession create_session) : listener_base(local_address), acceptor_(executor, endpoint), accept_error_timer_(executor), accept_timeout_(accept_timeout), logger_(logger), extra_listening_log_info_(extra_listening_log_info), - create_session_(create_session) {} + socket_executor_(std::move(socket_executor)), create_session_(std::move(create_session)) {} const auto& acceptor() const { return acceptor_; } void do_accept() { - acceptor_.async_accept([self = this->shared_from_this()](boost::system::error_code ec, auto&& peer_socket) { + boost::system::error_code ec; + auto ep = acceptor_.local_endpoint(ec); + if (ec) { + fc_wlog(logger_, "Unable to retrieve local_endpoint of acceptor, error: ${e}", ("e", ec.message())); + } + acceptor_.async_accept(socket_executor_(ep), [self = this->shared_from_this()](boost::system::error_code ec, auto&& peer_socket) { self->on_accept(ec, std::forward(peer_socket)); }); } @@ -152,16 +165,23 @@ struct listener : listener_base, std::enable_shared_from_this +template void create_listener(Executor& executor, logger& logger, boost::posix_time::time_duration accept_timeout, const std::string& address, const std::string& extra_listening_log_info, + const SocketExecutor& socket_executor, const CreateSession& create_session) { using tcp = boost::asio::ip::tcp; if constexpr (std::is_same_v) { @@ -186,8 +206,8 @@ void create_listener(Executor& executor, logger& logger, boost::posix_time::time auto create_listener = [&](const auto& endpoint) { const auto& ip_addr = endpoint.address(); try { - auto listener = std::make_shared>( - executor, logger, accept_timeout, address, endpoint, extra_listening_log_info, create_session); + auto listener = std::make_shared>( + executor, logger, accept_timeout, address, endpoint, extra_listening_log_info, socket_executor, create_session); listener->log_listening(endpoint, address); listener->do_accept(); ++listened; @@ -256,8 +276,8 @@ void create_listener(Executor& executor, logger& logger, boost::posix_time::time fs::remove(sock_path); } - auto listener = std::make_shared>( - executor, logger, accept_timeout, address, endpoint, extra_listening_log_info, create_session); + auto listener = std::make_shared>( + executor, logger, accept_timeout, address, endpoint, extra_listening_log_info, socket_executor, create_session); listener->log_listening(endpoint, address); listener->do_accept(); } From 65f76dc23ebd8740269987d4c4806e65750a0851 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 7 Oct 2024 17:46:21 -0500 Subject: [PATCH 2/9] GH-819 Update to provide required lambda for socket executor --- plugins/http_plugin/http_plugin.cpp | 4 ++- plugins/net_plugin/net_plugin.cpp | 7 +++- .../state_history_plugin.cpp | 36 ++++++++++--------- 3 files changed, 28 insertions(+), 19 deletions(-) diff --git a/plugins/http_plugin/http_plugin.cpp b/plugins/http_plugin/http_plugin.cpp index 892950e445..48e0ebf8f6 100644 --- a/plugins/http_plugin/http_plugin.cpp +++ b/plugins/http_plugin/http_plugin.cpp @@ -233,7 +233,9 @@ namespace eosio { }; fc::create_listener(plugin_state->thread_pool.get_executor(), logger(), accept_timeout, address, - extra_listening_log_info, create_session); + extra_listening_log_info, + [this](const auto&) -> boost::asio::io_context& { return plugin_state->thread_pool.get_executor(); }, + create_session); } void create_beast_server(const std::string& address, api_category_set categories) { diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 0d3eeab6cc..afab2dcffd 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -4514,7 +4514,12 @@ namespace eosio { fc::create_listener( my->thread_pool.get_executor(), logger, accept_timeout, listen_addr, extra_listening_log_info, - [my = my, addr = p2p_addr, block_sync_rate_limit = block_sync_rate_limit](tcp::socket&& socket) { fc_dlog( logger, "start listening on ${addr} with peer sync throttle ${limit}", ("addr", addr)("limit", block_sync_rate_limit)); my->create_session(std::move(socket), addr, block_sync_rate_limit); }); + [my = my](const auto&) { return boost::asio::make_strand(my->thread_pool.get_executor()); }, + [my = my, addr = p2p_addr, block_sync_rate_limit = block_sync_rate_limit](tcp::socket&& socket) { + fc_dlog( logger, "start listening on ${addr} with peer sync throttle ${limit}", + ("addr", addr)("limit", block_sync_rate_limit)); + my->create_session(std::move(socket), addr, block_sync_rate_limit); + }); } catch (const plugin_config_exception& e) { fc_elog( logger, "${msg}", ("msg", e.top_message())); app().quit(); diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index 8a65d7c0a8..360b8d02ed 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -101,25 +101,27 @@ struct state_history_plugin_impl { void create_listener(const std::string& address) { const boost::posix_time::milliseconds accept_timeout(200); // connections set must only be modified by main thread; run listener on ship thread so sockets use default executor of the ship thread - fc::create_listener(thread_pool.get_executor(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) { - boost::asio::post(app().get_io_service(), [this, socket{std::move(socket)}]() mutable { - catch_and_log([this, &socket]() { - connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(), - trace_log, chain_state_log, finality_data_log, - [this](const chain::block_num_type block_num) { - return get_block_id(block_num); - }, - [this](const chain::block_id_type& block_id) { - return chain_plug->chain().fetch_block_by_id(block_id); - }, - [this](session_base* conn) { - boost::asio::post(app().get_io_service(), [conn, this]() { - connections.erase(connections.find(conn)); - }); - }, _log)); + fc::create_listener(thread_pool.get_executor(), _log, accept_timeout, address, "", + [this](const auto&) { return boost::asio::make_strand(thread_pool.get_executor()); }, + [this](Protocol::socket&& socket) { + boost::asio::post(app().get_io_service(), [this, socket{std::move(socket)}]() mutable { + catch_and_log([this, &socket]() { + connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(), + trace_log, chain_state_log, finality_data_log, + [this](const chain::block_num_type block_num) { + return get_block_id(block_num); + }, + [this](const chain::block_id_type& block_id) { + return chain_plug->chain().fetch_block_by_id(block_id); + }, + [this](session_base* conn) { + boost::asio::post(app().get_io_service(), [conn, this]() { + connections.erase(connections.find(conn)); + }); + }, _log)); + }); }); }); - }); } void listen(){ From e677e41635689684828c8fc7a305a019ad36cb80 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 8 Oct 2024 08:34:05 -0500 Subject: [PATCH 3/9] GH-819 Add better comments --- .../libfc/include/fc/network/listener.hpp | 45 +++++++++++-------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/libraries/libfc/include/fc/network/listener.hpp b/libraries/libfc/include/fc/network/listener.hpp index 1a867fa8d4..db46fd6d33 100644 --- a/libraries/libfc/include/fc/network/listener.hpp +++ b/libraries/libfc/include/fc/network/listener.hpp @@ -60,20 +60,23 @@ struct listener_base { /// /// @tparam Protocol either \c boost::asio::ip::tcp or \c boost::asio::local::stream_protocol /// @tparam Executor The executor for the acceptor and acceptor timer -/// @tparam SocketExecutor Lambda that returns executor to be used for the newly accepted socket: -/// ExecutorType(const endpoint_type&) -/// @tparam CreateSession Callback to call after each accepted connection: void(Socket&& socket) +/// @tparam SocketExecutorFn Function that returns executor to be used for the newly accepted socket: +/// ExecutorType(const endpoint_type&) +/// @tparam CreateSession Callback Function to call after each accepted connection, provides newly created socket +/// and the Executor of the newly accepted socket: void(Socket&& socket, ExecutorType& ex) +/// Note that the ExecutorType provided by SocketExecutorFn can be retrieved from the socket +/// via socket.get_executor(). /// ///////////////////////////////////////////////////////////////////////////////////////////// -template -struct listener : listener_base, std::enable_shared_from_this> { +template +struct listener : listener_base, std::enable_shared_from_this> { private: typename Protocol::acceptor acceptor_; boost::asio::deadline_timer accept_error_timer_; boost::posix_time::time_duration accept_timeout_; logger& logger_; std::string extra_listening_log_info_; - SocketExecutor socket_executor_; + SocketExecutorFn socket_executor_fn_; CreateSession create_session_; public: @@ -81,10 +84,10 @@ struct listener : listener_base, std::enable_shared_from_this(local_address), acceptor_(executor, endpoint), accept_error_timer_(executor), accept_timeout_(accept_timeout), logger_(logger), extra_listening_log_info_(extra_listening_log_info), - socket_executor_(std::move(socket_executor)), create_session_(std::move(create_session)) {} + socket_executor_fn_(std::move(socket_executor_fn)), create_session_(std::move(create_session)) {} const auto& acceptor() const { return acceptor_; } @@ -94,7 +97,8 @@ struct listener : listener_base, std::enable_shared_from_thisshared_from_this()](boost::system::error_code ec, auto&& peer_socket) { + acceptor_.async_accept(socket_executor_fn_(ep), + [self = this->shared_from_this()](boost::system::error_code ec, auto&& peer_socket) { self->on_accept(ec, std::forward(peer_socket)); }); } @@ -165,7 +169,7 @@ struct listener : listener_base, std::enable_shared_from_this, std::enable_shared_from_this +template void create_listener(Executor& executor, logger& logger, boost::posix_time::time_duration accept_timeout, const std::string& address, const std::string& extra_listening_log_info, - const SocketExecutor& socket_executor, + const SocketExecutorFn& socket_executor_fn, const CreateSession& create_session) { using tcp = boost::asio::ip::tcp; if constexpr (std::is_same_v) { @@ -206,8 +213,8 @@ void create_listener(Executor& executor, logger& logger, boost::posix_time::time auto create_listener = [&](const auto& endpoint) { const auto& ip_addr = endpoint.address(); try { - auto listener = std::make_shared>( - executor, logger, accept_timeout, address, endpoint, extra_listening_log_info, socket_executor, create_session); + auto listener = std::make_shared>( + executor, logger, accept_timeout, address, endpoint, extra_listening_log_info, socket_executor_fn, create_session); listener->log_listening(endpoint, address); listener->do_accept(); ++listened; @@ -276,8 +283,8 @@ void create_listener(Executor& executor, logger& logger, boost::posix_time::time fs::remove(sock_path); } - auto listener = std::make_shared>( - executor, logger, accept_timeout, address, endpoint, extra_listening_log_info, socket_executor, create_session); + auto listener = std::make_shared>( + executor, logger, accept_timeout, address, endpoint, extra_listening_log_info, socket_executor_fn, create_session); listener->log_listening(endpoint, address); listener->do_accept(); } From b249840884c4bc48e8605453a5be0c36bd415022 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 8 Oct 2024 08:54:34 -0500 Subject: [PATCH 4/9] GH-819 Use socket executor which was a strand provided to the socket --- .../include/eosio/state_history_plugin/session.hpp | 12 ++++++------ .../state_history_plugin/state_history_plugin.cpp | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp b/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp index 324c5db818..53f7ac8ed5 100644 --- a/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp +++ b/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp @@ -28,17 +28,17 @@ class session_base { virtual ~session_base() = default; }; -template +template requires std::is_same_v || std::is_same_v class session final : public session_base { using coro_throwing_stream = boost::asio::use_awaitable_t<>::as_default_on_t>; using coro_nonthrowing_steadytimer = boost::asio::as_tuple_t>::as_default_on_t; public: - session(SocketType&& s, Executor&& st, chain::controller& controller, - std::optional& trace_log, std::optional& chain_state_log, std::optional& finality_data_log, - GetBlockID&& get_block_id, GetBlock&& get_block, OnDone&& on_done, fc::logger& logger) : - strand(std::move(st)), stream(std::move(s)), wake_timer(strand), controller(controller), + session(SocketType&& s, chain::controller& controller, + std::optional& trace_log, std::optional& chain_state_log, std::optional& finality_data_log, + GetBlockID&& get_block_id, GetBlock&& get_block, OnDone&& on_done, fc::logger& logger) : + strand(s.get_executor()), stream(std::move(s)), wake_timer(strand), controller(controller), trace_log(trace_log), chain_state_log(chain_state_log), finality_data_log(finality_data_log), get_block_id(get_block_id), get_block(get_block), on_done(on_done), logger(logger), remote_endpoint_string(get_remote_endpoint_string()) { fc_ilog(logger, "incoming state history connection from ${a}", ("a", remote_endpoint_string)); @@ -307,7 +307,7 @@ class session final : public session_base { private: ///these items must only ever be touched by the session's strand - Executor strand; + SocketType::executor_type strand; coro_throwing_stream stream; coro_nonthrowing_steadytimer wake_timer; unsigned coros_running = 0; diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index 360b8d02ed..a627937e39 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -106,7 +106,7 @@ struct state_history_plugin_impl { [this](Protocol::socket&& socket) { boost::asio::post(app().get_io_service(), [this, socket{std::move(socket)}]() mutable { catch_and_log([this, &socket]() { - connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(), + connections.emplace(new session(std::move(socket), chain_plug->chain(), trace_log, chain_state_log, finality_data_log, [this](const chain::block_num_type block_num) { return get_block_id(block_num); From 7698f36b86fa3405c5266049954d1cdbc44a9d5e Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 8 Oct 2024 08:55:14 -0500 Subject: [PATCH 5/9] GH-819 Use socket executor which was provided as a strand to the socket --- plugins/net_plugin/net_plugin.cpp | 40 +++++++++++++++---------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index afab2dcffd..24e0dab7e8 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -918,8 +918,8 @@ namespace eosio { std::atomic remote_endpoint_port{0}; public: - boost::asio::io_context::strand strand; - std::shared_ptr socket; // only accessed through strand after construction + boost::asio::strand strand; + std::shared_ptr socket; // only accessed through strand after construction fc::message_buffer<1024*1024> pending_message_buffer; std::size_t outstanding_read_bytes{0}; // accessed only from strand threads @@ -1268,8 +1268,8 @@ namespace eosio { connection::connection( const string& endpoint, const string& listen_address ) : peer_addr( endpoint ), - strand( my_impl->thread_pool.get_executor() ), - socket( new tcp::socket( my_impl->thread_pool.get_executor() ) ), + strand( boost::asio::make_strand(my_impl->thread_pool.get_executor()) ), + socket( new tcp::socket( strand ) ), listen_address( listen_address ), log_p2p_address( endpoint ), connection_id( ++my_impl->current_connection_id ), @@ -1286,7 +1286,7 @@ namespace eosio { connection::connection(tcp::socket&& s, const string& listen_address, size_t block_sync_rate_limit) : peer_addr(), block_sync_rate_limit(block_sync_rate_limit), - strand( my_impl->thread_pool.get_executor() ), + strand( s.get_executor() ), socket( new tcp::socket( std::move(s) ) ), listen_address( listen_address ), connection_id( ++my_impl->current_connection_id ), @@ -1445,7 +1445,7 @@ namespace eosio { void connection::close( bool reconnect, bool shutdown ) { set_state(connection_state::closing); - strand.post( [self = shared_from_this(), reconnect, shutdown]() { + boost::asio::post(strand, [self = shared_from_this(), reconnect, shutdown]() { self->_close( reconnect, shutdown ); }); } @@ -1583,7 +1583,7 @@ namespace eosio { void connection::send_handshake() { if (closed()) return; - strand.post( [c = shared_from_this()]() { + boost::asio::post(strand, [c = shared_from_this()]() { fc::unique_lock g_conn( c->conn_mtx ); if( c->populate_handshake( c->last_handshake_sent ) ) { static_assert( std::is_same_vsent_handshake_count ), int16_t>, "INT16_MAX based on int16_t" ); @@ -1676,7 +1676,7 @@ namespace eosio { std::vector bufs; buffer_queue.fill_out_buffer( bufs ); - strand.post( [c{std::move(c)}, bufs{std::move(bufs)}]() { + boost::asio::post(strand, [c{std::move(c)}, bufs{std::move(bufs)}]() { boost::asio::async_write( *c->socket, bufs, boost::asio::bind_executor( c->strand, [c, socket=c->socket]( boost::system::error_code ec, std::size_t w ) { try { @@ -2152,7 +2152,7 @@ namespace eosio { sync_source = new_sync_source; request_sent = true; sync_active_time = std::chrono::steady_clock::now(); - new_sync_source->strand.post( [new_sync_source, start, end, fork_head_num=chain_info.fork_head_num, lib=chain_info.lib_num]() { + boost::asio::post(new_sync_source->strand, [new_sync_source, start, end, fork_head_num=chain_info.fork_head_num, lib=chain_info.lib_num]() { peer_ilog( new_sync_source, "requesting range ${s} to ${e}, fhead ${h}, lib ${lib}", ("s", start)("e", end)("h", fork_head_num)("lib", lib) ); new_sync_source->request_sync_blocks( start, end ); } ); @@ -2736,7 +2736,7 @@ namespace eosio { send_buffer_type sb = buff_factory.get_send_buffer( b ); - cp->strand.post( [cp, bnum, sb{std::move(sb)}]() { + boost::asio::post(cp->strand, [cp, bnum, sb{std::move(sb)}]() { cp->latest_blk_time = std::chrono::steady_clock::now(); bool has_block = cp->peer_lib_num >= bnum; if( !has_block ) { @@ -2754,7 +2754,7 @@ namespace eosio { my_impl->connections.for_each_block_connection( [exclude_peer, msg{std::move(msg)}]( auto& cp ) { if( !cp->current() ) return true; if( cp->connection_id == exclude_peer ) return true; - cp->strand.post( [cp, msg]() { + boost::asio::post(cp->strand, [cp, msg]() { if (cp->protocol_version >= proto_savanna) { if (vote_logger.is_enabled(fc::log_level::debug)) peer_dlog(cp, "sending vote msg"); @@ -2787,7 +2787,7 @@ namespace eosio { send_buffer_type sb = buff_factory.get_send_buffer( trx ); fc_dlog( logger, "sending trx: ${id}, to connection - ${cid}", ("id", trx->id())("cid", cp->connection_id) ); - cp->strand.post( [cp, sb{std::move(sb)}]() { + boost::asio::post(cp->strand, [cp, sb{std::move(sb)}]() { cp->enqueue_buffer( sb, no_reason ); } ); } ); @@ -2835,7 +2835,7 @@ namespace eosio { } } connection_ptr c = shared_from_this(); - strand.post([c]() { + boost::asio::post(strand, [c]() { my_impl->connections.connect(c); }); return true; @@ -2912,7 +2912,7 @@ namespace eosio { }); connection_ptr new_connection = std::make_shared(std::move(socket), listen_address, limit); - new_connection->strand.post([new_connection, this]() { + boost::asio::post(new_connection->strand, [new_connection, this]() { if (new_connection->start_session()) { connections.add(new_connection); } @@ -3798,7 +3798,7 @@ namespace eosio { // may have come in on a different connection and posted into dispatcher strand before this one if( my_impl->dispatcher.have_block( id ) || cc.block_exists( id ) ) { // thread-safe my_impl->dispatcher.add_peer_block( id, c->connection_id ); - c->strand.post( [c, id, ptr{std::move(ptr)}]() { + boost::asio::post(c->strand, [c, id, ptr{std::move(ptr)}]() { const fc::microseconds age(fc::time_point::now() - ptr->timestamp); my_impl->sync_master->sync_recv_block( c, id, block_header::num_from_id(id), false, age ); }); @@ -3826,7 +3826,7 @@ namespace eosio { ("cid", cid)("n", ptr->block_num())("id", id.str().substr(8,16))); } if( exception ) { - c->strand.post( [c, id, blk_num=ptr->block_num(), close_mode]() { + boost::asio::post(c->strand, [c, id, blk_num=ptr->block_num(), close_mode]() { my_impl->sync_master->rejected_block( c, blk_num, close_mode ); my_impl->dispatcher.rejected_block( id ); }); @@ -3867,7 +3867,7 @@ namespace eosio { fc::microseconds age(fc::time_point::now() - block->timestamp); try { if( blk_num <= lib || cc.validated_block_exists(blk_id) ) { - c->strand.post( [sync_master = my_impl->sync_master.get(), + boost::asio::post(c->strand, [sync_master = my_impl->sync_master.get(), &dispatcher = my_impl->dispatcher, c, blk_id, blk_num, latency = age]() { dispatcher.add_peer_block( blk_id, c->connection_id ); sync_master->sync_recv_block( c, blk_id, blk_num, true, latency ); @@ -3929,14 +3929,14 @@ namespace eosio { }); } }); - c->strand.post( [sync_master = my_impl->sync_master.get(), + boost::asio::post(c->strand, [sync_master = my_impl->sync_master.get(), &dispatcher = my_impl->dispatcher, c, blk_id, blk_num, latency = age]() { dispatcher.recv_block( c, blk_id, blk_num ); sync_master->sync_recv_block( c, blk_id, blk_num, true, latency ); }); } else { - c->strand.post( [sync_master = my_impl->sync_master.get(), &dispatcher = my_impl->dispatcher, c, + boost::asio::post(c->strand, [sync_master = my_impl->sync_master.get(), &dispatcher = my_impl->dispatcher, c, block{std::move(block)}, blk_id, blk_num, reason]() mutable { if( reason == unlinkable || reason == no_reason ) { dispatcher.add_unlinkable_block( std::move(block), blk_id ); @@ -3981,7 +3981,7 @@ namespace eosio { auto current_time = std::chrono::steady_clock::now(); my->connections.for_each_connection( [current_time]( const connection_ptr& c ) { if( c->socket_is_open() ) { - c->strand.post([c, current_time]() { + boost::asio::post(c->strand, [c, current_time]() { c->check_heartbeat(current_time); } ); } From 6b5ab762d11e3b43f4ef63baf0e304af191dc721 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 8 Oct 2024 10:14:29 -0500 Subject: [PATCH 6/9] GH-819 Simplify code by running listener on the main thread --- .../state_history_plugin.cpp | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index a627937e39..71c2f6c225 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -100,26 +100,24 @@ struct state_history_plugin_impl { template void create_listener(const std::string& address) { const boost::posix_time::milliseconds accept_timeout(200); - // connections set must only be modified by main thread; run listener on ship thread so sockets use default executor of the ship thread - fc::create_listener(thread_pool.get_executor(), _log, accept_timeout, address, "", + // connections set must only be modified by main thread; run listener on main thread so callback is on main thread + fc::create_listener(app().get_io_service(), _log, accept_timeout, address, "", [this](const auto&) { return boost::asio::make_strand(thread_pool.get_executor()); }, [this](Protocol::socket&& socket) { - boost::asio::post(app().get_io_service(), [this, socket{std::move(socket)}]() mutable { - catch_and_log([this, &socket]() { - connections.emplace(new session(std::move(socket), chain_plug->chain(), - trace_log, chain_state_log, finality_data_log, - [this](const chain::block_num_type block_num) { - return get_block_id(block_num); - }, - [this](const chain::block_id_type& block_id) { - return chain_plug->chain().fetch_block_by_id(block_id); - }, - [this](session_base* conn) { - boost::asio::post(app().get_io_service(), [conn, this]() { - connections.erase(connections.find(conn)); - }); - }, _log)); - }); + catch_and_log([this, &socket]() { + connections.emplace(new session(std::move(socket), chain_plug->chain(), + trace_log, chain_state_log, finality_data_log, + [this](const chain::block_num_type block_num) { + return get_block_id(block_num); + }, + [this](const chain::block_id_type& block_id) { + return chain_plug->chain().fetch_block_by_id(block_id); + }, + [this](session_base* conn) { + boost::asio::post(app().get_io_service(), [conn, this]() { + connections.erase(connections.find(conn)); + }); + }, _log)); }); }); } From db2dcc42d53b09a6475607d9b4e0786ea4ead0ef Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 8 Oct 2024 11:44:35 -0500 Subject: [PATCH 7/9] Revert "GH-819 Simplify code by running listener on the main thread" This reverts commit 6b5ab762d11e3b43f4ef63baf0e304af191dc721. --- .../state_history_plugin.cpp | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index 71c2f6c225..a627937e39 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -100,24 +100,26 @@ struct state_history_plugin_impl { template void create_listener(const std::string& address) { const boost::posix_time::milliseconds accept_timeout(200); - // connections set must only be modified by main thread; run listener on main thread so callback is on main thread - fc::create_listener(app().get_io_service(), _log, accept_timeout, address, "", + // connections set must only be modified by main thread; run listener on ship thread so sockets use default executor of the ship thread + fc::create_listener(thread_pool.get_executor(), _log, accept_timeout, address, "", [this](const auto&) { return boost::asio::make_strand(thread_pool.get_executor()); }, [this](Protocol::socket&& socket) { - catch_and_log([this, &socket]() { - connections.emplace(new session(std::move(socket), chain_plug->chain(), - trace_log, chain_state_log, finality_data_log, - [this](const chain::block_num_type block_num) { - return get_block_id(block_num); - }, - [this](const chain::block_id_type& block_id) { - return chain_plug->chain().fetch_block_by_id(block_id); - }, - [this](session_base* conn) { - boost::asio::post(app().get_io_service(), [conn, this]() { - connections.erase(connections.find(conn)); - }); - }, _log)); + boost::asio::post(app().get_io_service(), [this, socket{std::move(socket)}]() mutable { + catch_and_log([this, &socket]() { + connections.emplace(new session(std::move(socket), chain_plug->chain(), + trace_log, chain_state_log, finality_data_log, + [this](const chain::block_num_type block_num) { + return get_block_id(block_num); + }, + [this](const chain::block_id_type& block_id) { + return chain_plug->chain().fetch_block_by_id(block_id); + }, + [this](session_base* conn) { + boost::asio::post(app().get_io_service(), [conn, this]() { + connections.erase(connections.find(conn)); + }); + }, _log)); + }); }); }); } From abab173b533578a3ea412639d3285865098eea54 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 8 Oct 2024 11:47:29 -0500 Subject: [PATCH 8/9] GH-819 Update comments --- plugins/state_history_plugin/state_history_plugin.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index a627937e39..b504a6dc41 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -100,10 +100,11 @@ struct state_history_plugin_impl { template void create_listener(const std::string& address) { const boost::posix_time::milliseconds accept_timeout(200); - // connections set must only be modified by main thread; run listener on ship thread so sockets use default executor of the ship thread + // run listener on ship thread so that thread_pool.stop() will shutdown the listener since this captures `this` fc::create_listener(thread_pool.get_executor(), _log, accept_timeout, address, "", [this](const auto&) { return boost::asio::make_strand(thread_pool.get_executor()); }, [this](Protocol::socket&& socket) { + // connections set must only be modified by the main thread boost::asio::post(app().get_io_service(), [this, socket{std::move(socket)}]() mutable { catch_and_log([this, &socket]() { connections.emplace(new session(std::move(socket), chain_plug->chain(), From fea909b7f2a9f480e3ed1ff6c5d4385dfab79aac Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 8 Oct 2024 15:07:58 -0500 Subject: [PATCH 9/9] GH-819 Add requires --- libraries/libfc/include/fc/network/listener.hpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libraries/libfc/include/fc/network/listener.hpp b/libraries/libfc/include/fc/network/listener.hpp index db46fd6d33..e476d916e2 100644 --- a/libraries/libfc/include/fc/network/listener.hpp +++ b/libraries/libfc/include/fc/network/listener.hpp @@ -69,6 +69,8 @@ struct listener_base { /// ///////////////////////////////////////////////////////////////////////////////////////////// template +requires (std::is_same_v || std::is_same_v) && + std::invocable struct listener : listener_base, std::enable_shared_from_this> { private: typename Protocol::acceptor acceptor_; @@ -186,6 +188,8 @@ struct listener : listener_base, std::enable_shared_from_this +requires (std::is_same_v || std::is_same_v) && + std::invocable void create_listener(Executor& executor, logger& logger, boost::posix_time::time_duration accept_timeout, const std::string& address, const std::string& extra_listening_log_info, const SocketExecutorFn& socket_executor_fn,