Skip to content

Commit

Permalink
MDEV-26851 Add interface to monitor connections in Galera
Browse files Browse the repository at this point in the history
  • Loading branch information
janlindstrom committed Oct 8, 2024
1 parent 8a5fc7c commit 2b47991
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 4 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Authors from Codership Oy:
* Daniele Sciascia <[email protected]>, Codership Oy
* Philip Stoev <[email protected]>, Codership Oy
* Mario Karuza <[email protected]>, Codership Oy
* Jan Lindström <[email protected]>, Codership Oy
[Codership employees, add name and email/username above this line, but leave this line intact]

Other contributors:
Expand Down
2 changes: 2 additions & 0 deletions galera/src/galera-sym.map
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
wsrep_init_config_service_v1;
wsrep_deinit_config_service_v1;
wsrep_node_isolation_mode_set_v1;
wsrep_init_connection_monitor_service_v1;
wsrep_deinit_connection_monitor_service_v1;

local: *;
};
13 changes: 12 additions & 1 deletion galera/src/wsrep_provider.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2010-2021 Codership Oy <[email protected]>
// Copyright (C) 2010-2024 Codership Oy <[email protected]>
//

#include "wsrep_api.h"
Expand All @@ -23,6 +23,7 @@
#include "gu_event_service.hpp"
#include "wsrep_config_service.h"
#include "wsrep_node_isolation.h"
#include "wsrep_connection_monitor_service.h"

#include <cassert>

Expand Down Expand Up @@ -1941,4 +1942,14 @@ wsrep_node_isolation_mode_set_v1(enum wsrep_node_isolation_mode mode)
return WSREP_NODE_ISOLATION_SUCCESS;
}

extern "C"
int wsrep_init_connection_monitor_service_v1(wsrep_connection_monitor_service_v1_t *connection_monitor_service)
{
return gu::init_connection_monitor_service_v1(connection_monitor_service);
}

extern "C" void wsrep_deinit_connection_monitor_service_v1()
{
gu::deinit_connection_monitor_service_v1();
}

72 changes: 71 additions & 1 deletion galerautils/src/gu_asio.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2014-2020 Codership Oy <[email protected]>
// Copyright (C) 2014-2024 Codership Oy <[email protected]>
//

#include "gu_config.hpp"
Expand Down Expand Up @@ -53,6 +53,8 @@ static wsrep_tls_service_v1_t* gu_tls_service(0);

static wsrep_allowlist_service_v1_t* gu_allowlist_service(0);

static wsrep_connection_monitor_service_v1_t* gu_connection_monitor_service(0);

//
// AsioIpAddress wrapper
//
Expand Down Expand Up @@ -955,3 +957,71 @@ void gu::deinit_allowlist_service_v1()
std::atomic<enum wsrep_node_isolation_mode> gu::gu_asio_node_isolation_mode{
WSREP_NODE_ISOLATION_NOT_ISOLATED
};

//
// ConnectionMonitor
//

static std::mutex gu_connection_monitor_service_init_mutex;
static size_t gu_connection_monitor_service_usage;

int gu::init_connection_monitor_service_v1(wsrep_connection_monitor_service_v1_t* connection_monitor)
{
log_info << "init_connection_monitor_service_v1";
std::lock_guard<std::mutex> lock(gu_connection_monitor_service_init_mutex);
++gu_connection_monitor_service_usage;
if (gu_connection_monitor_service)
{
assert(gu_connection_monitor_service == connection_monitor);
return 0;
}
gu_connection_monitor_service = connection_monitor;
return 0;
}

void gu::deinit_connection_monitor_service_v1()
{
log_info << "deinit_connection_monitor_service_v1";
std::lock_guard<std::mutex> lock(gu_connection_monitor_service_init_mutex);
assert(gu_connection_monitor_service_usage > 0);
--gu_connection_monitor_service_usage;
if (gu_connection_monitor_service_usage == 0)
gu_connection_monitor_service = 0;
}

void gu::connection_monitor_connect(wsrep_connection_key_t id,
const std::string& scheme,
const std::string& local_addr,
const std::string& remote_uuid,
const std::string& remote_addr)
{
if (gu_connection_monitor_service == nullptr)
{
return; // No action
}

wsrep_buf_t const remote = {remote_uuid.c_str(), remote_uuid.length() };
wsrep_buf_t const lscheme = {scheme.c_str(), scheme.length() };
wsrep_buf_t const raddr = {remote_addr.c_str(), remote_addr.length() };
wsrep_buf_t const laddr = {local_addr.c_str(), local_addr.length() };

gu_connection_monitor_service->connection_monitor_connect_cb(
gu_connection_monitor_service->context,
id,
&lscheme,
&laddr,
&remote,
&raddr);
}

void gu::connection_monitor_disconnect(wsrep_connection_key_t id)
{
if (gu_connection_monitor_service == nullptr)
{
return; // No action
}

gu_connection_monitor_service->connection_monitor_disconnect_cb(
gu_connection_monitor_service->context,
id);
}
12 changes: 12 additions & 0 deletions galerautils/src/gu_asio.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "wsrep_tls_service.h"
#include "wsrep_allowlist_service.h"
#include "wsrep_node_isolation.h"
#include "wsrep_connection_monitor_service.h"

#include <netinet/tcp.h> // tcp_info

Expand Down Expand Up @@ -808,6 +809,17 @@ namespace gu
extern std::atomic<enum wsrep_node_isolation_mode>
gu_asio_node_isolation_mode;

/* Init/deinit global connection monitoring service hooks */
int init_connection_monitor_service_v1(wsrep_connection_monitor_service_v1_t*);
void deinit_connection_monitor_service_v1();
/* Connection monitor connect callback */
void connection_monitor_connect(wsrep_connection_key_t id,
const std::string& scheme,
const std::string& local_addr,
const std::string& remote_uuid,
const std::string& remote_addr);
/* Connection monitor disconnect callback */
void connection_monitor_disconnect(wsrep_connection_key_t id);
}

#endif // GU_ASIO_HPP
18 changes: 18 additions & 0 deletions galerautils/src/gu_asio_stream_react.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ void gu::AsioStreamReact::close() try
{
GU_ASIO_DEBUG(debug_print() << "Socket not open on close");
}

gu::connection_monitor_disconnect((wsrep_connection_key_t)this);
socket_.close();
}
// Catch all the possible exceptions here, not only asio ones.
Expand Down Expand Up @@ -198,11 +200,20 @@ void gu::AsioStreamReact::connect(const gu::URI& uri) try
socket_.connect(resolve_result->endpoint());
connected_ = true;
prepare_engine(false);
assign_addresses();

auto result(engine_->client_handshake());
switch (result)
{
case AsioStreamEngine::success:
{
gu::connection_monitor_connect((wsrep_connection_key_t)this,
scheme_,
local_addr_,
"", // remote uuid
remote_addr_);
return;
}
case AsioStreamEngine::want_read:
case AsioStreamEngine::want_write:
case AsioStreamEngine::eof:
Expand Down Expand Up @@ -389,6 +400,13 @@ void gu::AsioStreamReact::connect_handler(
set_socket_options(socket_);
prepare_engine(true);
assign_addresses();

gu::connection_monitor_connect((wsrep_connection_key_t)this,
scheme_,
local_addr_,
"", // remote uuid
remote_addr_);

GU_ASIO_DEBUG(debug_print()
<< " AsioStreamReact::connect_handler: init handshake");
auto result(engine_->client_handshake());
Expand Down
2 changes: 1 addition & 1 deletion gcomm/src/asio_tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class gcomm::AsioTcpSocket :
virtual std::string local_addr() const GALERA_OVERRIDE;
virtual std::string remote_addr() const GALERA_OVERRIDE;
virtual State state() const GALERA_OVERRIDE { return state_; }
virtual SocketId id() const GALERA_OVERRIDE { return &socket_; }
virtual SocketId id() const GALERA_OVERRIDE { return socket_.get(); }
virtual SocketStats stats() const GALERA_OVERRIDE;
private:
// AsioSocketHandler interface
Expand Down
16 changes: 16 additions & 0 deletions gcomm/src/gmcast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,14 @@ void gcomm::GMCast::gmcast_connect(const std::string& remote_addr)
segment_,
group_name_);

std::ostringstream os;
os << peer->remote_uuid();
gu::connection_monitor_connect((wsrep_connection_key_t)tp->id(),
get_scheme(pnet_, use_ssl_, dynamic_socket_),
peer->local_addr(),
os.str(),
remote_addr);

std::pair<ProtoMap::iterator, bool> ret =
proto_map_->insert(std::make_pair(tp->id(), peer));

Expand Down Expand Up @@ -676,6 +684,14 @@ void gcomm::GMCast::handle_established(Proto* est)
// UUID checks are handled during protocol handshake
assert(est->remote_uuid() != uuid());

std::ostringstream os;
os << est->remote_uuid();
gu::connection_monitor_connect((wsrep_connection_key_t)est->socket()->id(),
get_scheme(pnet_, use_ssl_, dynamic_socket_),
est->local_addr(),
os.str(),
est->remote_addr());

if (is_evicted(est->remote_uuid()))
{
log_warn << "Closing connection to evicted node " << est->remote_uuid();
Expand Down
1 change: 1 addition & 0 deletions gcomm/src/gmcast_proto.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class gcomm::gmcast::Proto
SocketPtr socket() const { return tp_; }

const std::string& remote_addr() const { return remote_addr_; }
const std::string& local_addr() const { return local_addr_; }
const std::string& mcast_addr() const { return mcast_addr_; }
const LinkMap& link_map() const { return link_map_; }

Expand Down
2 changes: 1 addition & 1 deletion wsrep/src

0 comments on commit 2b47991

Please sign in to comment.