Skip to content

Commit

Permalink
fix grpc client and server caused crash && fix pro-node initialize (#30)
Browse files Browse the repository at this point in the history
* fix build_ppc script

* fix pro-node initialize

* fix grpc client and server caused crash

* update  gateway-router-table when the node become unreachable
  • Loading branch information
cyjseagull authored Sep 10, 2024
1 parent 7c679e7 commit a2b1126
Show file tree
Hide file tree
Showing 43 changed files with 394 additions and 132 deletions.
2 changes: 1 addition & 1 deletion cpp/ppc-framework/front/IFront.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class IFront : virtual public IFrontClient
virtual ppc::protocol::Message::Ptr peek(std::string const& topic) = 0;

virtual void asyncGetAgencies(
std::function<void(bcos::Error::Ptr, std::vector<std::string>)> callback) = 0;
std::function<void(bcos::Error::Ptr, std::set<std::string>)> callback) = 0;

/**
* @brief register the nodeInfo to the gateway
Expand Down
2 changes: 1 addition & 1 deletion cpp/ppc-framework/gateway/IGateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class IGateway

virtual void asyncGetPeers(std::function<void(bcos::Error::Ptr, std::string)> callback) = 0;
virtual void asyncGetAgencies(
std::function<void(bcos::Error::Ptr, std::vector<std::string>)> callback) = 0;
std::function<void(bcos::Error::Ptr, std::set<std::string>)> callback) = 0;

virtual bcos::Error::Ptr registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0;
virtual bcos::Error::Ptr unRegisterNodeInfo(bcos::bytesConstRef nodeID) = 0;
Expand Down
4 changes: 4 additions & 0 deletions cpp/ppc-framework/protocol/GrpcConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,13 @@ class GrpcConfig

bool enableHealthCheck() const { return m_enableHealthCheck; }
void setEnableHealthCheck(bool enableHealthCheck) { m_enableHealthCheck = enableHealthCheck; }
void setEnableDnslookup(bool enableDnslookup) { m_enableDnslookup = enableDnslookup; }

bool enableDnslookup() const { return m_enableDnslookup; }

protected:
bool m_enableHealthCheck = false;
std::string m_loadBalancePolicy = "round_robin";
bool m_enableDnslookup = false;
};
} // namespace ppc::protocol
11 changes: 6 additions & 5 deletions cpp/tools/build_ppc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ default_version="v1.1.0"
compatibility_version=${default_version}
command="deploy"

disable_ra2018="false"
disable_ra2018="true"

LOG_WARN() {
local content=${1}
Expand Down Expand Up @@ -420,9 +420,9 @@ generate_node_config_ini() {
; the threadPoolSize
thread_count = 4
; the gatewayService endpoint information
service.gateway_target =
gateway_target =
; the components
service.components =
components =
nodeid=${nodeid}
[crypto]
Expand Down Expand Up @@ -524,8 +524,9 @@ generate_node_config_ini() {
; MB
max_log_file_size=200
; LineID, TimeStamp, ProcessID, ThreadName, ThreadID and Message
format=%Severity%|ppcs-psi4ef|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message%
enable_rotate_by_hour=false
;format=%Severity%|ppcs-psi4ef|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message%
format=%Severity%|%TimeStamp%|%Message%
enable_rotate_by_hour=true
log_name_pattern=ppcs-psi4ef.log
; Y,m,d,H,M,S are supported, N is the sequence number log_%Y%m%d.%H%M%S.%N.log
rotate_name_pattern=log_%Y%m%d.%H%M.log
Expand Down
5 changes: 3 additions & 2 deletions cpp/tools/build_wedpr_cem.sh
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,9 @@ generate_config_ini() {
; MB
max_log_file_size=200
; LineID, TimeStamp, ProcessID, ThreadName, ThreadID and Message
format=%Severity%|ppcs-psi4ef|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message%
enable_rotate_by_hour=false
;format=%Severity%|ppcs-psi4ef|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message%
format=%Severity%|%TimeStamp%|%Message%
enable_rotate_by_hour=true
log_name_pattern=ppcs-psi4ef.log
; Y,m,d,H,M,S are supported, N is the sequence number log_%Y%m%d.%H%M%S.%N.log
rotate_name_pattern=log_%Y%m%d.%H%M.log
Expand Down
4 changes: 2 additions & 2 deletions cpp/tools/ppc-builder/build_ppc.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# Note: here can't be refactored by autopep
import sys
sys.path.append("src/")
from controller import commandline_helper
from common import utilities
import traceback
import sys
sys.path.append("src/")


def main():
Expand Down
4 changes: 2 additions & 2 deletions cpp/tools/ppc-builder/conf/config-example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ holding_msg_minutes = 30
# configuration for the ppc-node
[[agency.node]]
# disable the ra2018 psi or not, default enable ra2018
disable_ra2018 = false
disable_ra2018 = true
deploy_ip=["127.0.0.1:2"]
# node name, Notice: node_name in the same agency and group must be unique
node_name = "node0"
Expand Down Expand Up @@ -128,7 +128,7 @@ holding_msg_minutes = 30
# configuration for the ppc-node
[[agency.node]]
# disable the ra2018 psi or not, default enable ra2018
disable_ra2018 = false
disable_ra2018 = true
deploy_ip=["127.0.0.1:2"]
# node name, Notice: node_name in the same agency and group must be unique
node_name = "node0"
Expand Down
14 changes: 7 additions & 7 deletions cpp/tools/ppc-builder/src/config/ppc_node_config_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def __generate_single_node_inner_config__(self, tpl_config_path, node_path, priv
config_content, node_config.rpc_config, node_index)
# load the transport config
self.__generate_transport_config__(config_content,
node_config, node_id, ip)
node_config, node_id, ip, node_index)
# load the storage config
self.__generate_storage_config__(
config_content, node_config.storage_config)
Expand Down Expand Up @@ -192,7 +192,7 @@ def __generate_hdfs_storage_config__(self, config_content, hdfs_storage_config):
hdfs_storage_config.name_node_port)
config_content[section_name]["token"] = hdfs_storage_config.token

def __generate_transport_config__(self, config_content, node_config, node_id, deploy_ip):
def __generate_transport_config__(self, config_content, node_config, node_id, deploy_ip, node_index):
"""_summary_
Args:
Expand All @@ -203,18 +203,18 @@ def __generate_transport_config__(self, config_content, node_config, node_id, de
; the threadPoolSize
thread_count = 4
; the gatewayService endpoint information
service.gateway_target =
gateway_target =
; the components
service.components =
components =
nodeid=
"""
section = "transport"
config_content[section]["listen_ip"] = node_config.grpc_listen_ip
config_content[section]["listen_port"] = str(
node_config.grpc_listen_port)
node_config.grpc_listen_port + node_index)
config_content[section]["host_ip"] = deploy_ip
config_content[section]["service.gateway_target"] = node_config.gateway_config.gateway_grpc_target
config_content[section]["service.components"] = node_config.components
config_content[section]["gateway_target"] = node_config.gateway_config.gateway_grpc_target
config_content[section]["components"] = node_config.components
config_content[section]["nodeid"] = node_id

def __generate_ra2018psi_config__(self, config_content, ra2018psi_config):
Expand Down
5 changes: 3 additions & 2 deletions cpp/tools/ppc-builder/src/tpl/config.ini.gateway
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@
; MB
max_log_file_size=200
; LineID, TimeStamp, ProcessID, ThreadName, ThreadID and Message
format=%Severity%|ppcs-psi4ef|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message%
enable_rotate_by_hour=false
;format=%Severity%|ppcs-gateway|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message%
format=%Severity%|%TimeStamp%|%Message%
enable_rotate_by_hour=true
log_name_pattern=ppcs-gateway.log
; Y,m,d,H,M,S are supported, N is the sequence number log_%Y%m%d.%H%M%S.%N.log
rotate_name_pattern=log_%Y%m%d.%H%M.log
Expand Down
11 changes: 6 additions & 5 deletions cpp/tools/ppc-builder/src/tpl/config.ini.node
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
; the private key path for the psi-server
private_key_path = conf/node.pem
; disable the ra2018 or not, default enable ra2018
disable_ra2018 = false
disable_ra2018 = true
; the path that allows programs to access
; data_location = data
; task_timeout_minutes = 180
Expand Down Expand Up @@ -38,9 +38,9 @@
; the threadPoolSize
thread_count = 4
; the gatewayService endpoint information
service.gateway_target =
gateway_target =
; the components
service.components =
components =
nodeid=

[storage]
Expand Down Expand Up @@ -101,8 +101,9 @@
; MB
max_log_file_size=200
; LineID, TimeStamp, ProcessID, ThreadName, ThreadID and Message
format=%Severity%|ppcs-psi4ef|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message%
enable_rotate_by_hour=false
;format=%Severity%|ppcs-psi4ef|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message%
format=%Severity%|%TimeStamp%|%Message%
enable_rotate_by_hour=true
log_name_pattern=ppcs-psi4ef.log
; Y,m,d,H,M,S are supported, N is the sequence number log_%Y%m%d.%H%M%S.%N.log
rotate_name_pattern=log_%Y%m%d.%H%M.log
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-computing/ppc-cem/tests/data/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
max_log_file_size=200
; LineID, TimeStamp, ProcessID, ThreadName, ThreadID and Message
#format=%Severity%|ppcs-psi4ef|system-id|%TimeStamp%|%ThreadName%-%ThreadID%|%Message%
#enable_rotate_by_hour=false
#enable_rotate_by_hour=true
#log_name_pattern=ppcs-psi4ef.log
; Y,m,d,H,M,S are supported, N is the sequence number log_%Y%m%d.%H%M%S.%N.log
#rotate_name_pattern=log_%Y%m%d.%H%M.log
Expand Down
14 changes: 9 additions & 5 deletions cpp/wedpr-helper/ppc-tools/src/config/PPCConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,23 +129,27 @@ void PPCConfig::loadFrontConfig(bool requireTransport,
}
m_frontConfig->setNodeID(nodeID);
m_frontConfig->setThreadPoolSize(threadCount);

PPCConfig_LOG(INFO) << LOG_DESC("loadFrontConfig and not require the transport")
<< printFrontDesc(m_frontConfig);
if (!requireTransport)
{
return;
}

loadEndpointConfig(m_frontConfig->mutableSelfEndPoint(), true, "transport", pt);
// the gateway targets
auto gatewayTargets = pt.get<std::string>("transport.service.gateway_target", "");
auto gatewayTargets = pt.get<std::string>("transport.gateway_target", "");
if (gatewayTargets.empty())
{
BOOST_THROW_EXCEPTION(InvalidConfig() << errinfo_comment(
"Must specify the transport.service.gateway_target!"));
BOOST_THROW_EXCEPTION(
InvalidConfig() << errinfo_comment("Must specify the transport.gateway_target!"));
}
m_frontConfig->setGatewayGrpcTarget(gatewayTargets);
// the components
auto components = pt.get<std::string>("transport.service.components", "");
auto components = pt.get<std::string>("transport.components", "");
boost::split(m_frontConfig->mutableComponents(), components, boost::is_any_of(","));

PPCConfig_LOG(INFO) << LOG_DESC("loadFrontConfig") << printFrontDesc(m_frontConfig);
}

void PPCConfig::setPrivateKey(bcos::bytes const& _privateKey)
Expand Down
2 changes: 1 addition & 1 deletion cpp/wedpr-helper/ppc-utilities/Utilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ inline std::string generateUUID()
return boost::uuids::to_string(uuid_gen());
}
template <typename T>
inline std::string printVector(std::vector<T> const& list)
inline std::string printVector(T const& list)
{
std::stringstream oss;
for (auto const& it : list)
Expand Down
8 changes: 8 additions & 0 deletions cpp/wedpr-initializer/Initializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,10 @@ void Initializer::registerRpcHandler(ppc::rpc::RpcInterface::Ptr const& _rpc)

void Initializer::start()
{
if (m_transport)
{
m_transport->start();
}
if (m_ppcFront)
{
m_ppcFront->start();
Expand Down Expand Up @@ -455,6 +459,10 @@ void Initializer::start()

void Initializer::stop()
{
if (m_transport)
{
m_transport->stop();
}
// stop the network firstly
if (m_ppcFront)
{
Expand Down
12 changes: 11 additions & 1 deletion cpp/wedpr-protocol/grpc/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,23 @@ inline grpc::ChannelArguments toChannelConfig(ppc::protocol::GrpcConfig::Ptr con
{
return args;
}
args.SetLoadBalancingPolicyName(grpcConfig->loadBalancePolicy());
// TODO: when enable round_robin load-balance policy, the program will be exited on dns resolver
// args.SetLoadBalancingPolicyName(grpcConfig->loadBalancePolicy());
if (grpcConfig->enableHealthCheck())
{
args.SetServiceConfigJSON(
"{\"healthCheckConfig\": "
"{\"serviceName\": \"\"}}");
}
// disable dns lookup
if (!grpcConfig->enableDnslookup())
{
args.SetInt("grpc.enable_dns_srv_lookup", 0);
}
else
{
args.SetInt("grpc.enable_dns_srv_lookup", 1);
}
return args;
}
} // namespace ppc::protocol
1 change: 1 addition & 0 deletions cpp/wedpr-protocol/grpc/client/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@

#define GRPC_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GRPC][CLIENT]"
#define GATEWAY_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GATEWAY][CLIENT]"
#define FRONT_CLIENT_LOG(LEVEL) BCOS_LOG(LEVEL) << "[FRONT][CLIENT]"
#define HEALTH_LOG(LEVEL) BCOS_LOG(LEVEL) << "[HEALTH]"
7 changes: 4 additions & 3 deletions cpp/wedpr-protocol/grpc/client/FrontClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* @date 2024-09-02
*/
#include "FrontClient.h"
#include "Common.h"
#include "protobuf/src/RequestConverter.h"
#include "wedpr-protocol/protobuf/src/Common.h"

Expand All @@ -33,8 +34,8 @@ void FrontClient::onReceiveMessage(ppc::protocol::Message::Ptr const& msg, Recei
msg->encode(encodedData);
receivedMsg.set_data(encodedData.data(), encodedData.size());

ClientContext context;
auto context = std::make_shared<ClientContext>();
auto response = std::make_shared<Error>();
m_stub->async()->onReceiveMessage(&context, &receivedMsg, response.get(),
[response, callback](Status status) { callback(toError(status, std::move(*response))); });
m_stub->async()->onReceiveMessage(context.get(), &receivedMsg, response.get(),
[response, callback](Status status) { callback(toError(status, *response)); });
}
Loading

0 comments on commit a2b1126

Please sign in to comment.