Skip to content

Commit

Permalink
Merge pull request #24 from Gooddbird/dev_b
Browse files Browse the repository at this point in the history
fix bug that async rpc call timeout isn't useful
  • Loading branch information
Gooddbird authored Jul 2, 2022
2 parents 368d7c4 + 9884802 commit ffa8c08
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 22 deletions.
4 changes: 2 additions & 2 deletions conf/test_http_server.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
<max_connect_timeout>75</max_connect_timeout>

<!--count of io threads, at least 1-->
<iothread_num>10</iothread_num>
<iothread_num>2</iothread_num>

<time_wheel>
<bucket_num>3</bucket_num>
Expand All @@ -47,4 +47,4 @@
<protocal>HTTP</protocal>
</server>

</root>
</root>
2 changes: 1 addition & 1 deletion testcases/test_http_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class AsyncRPCTestServlet: public tinyrpc::HttpServlet {
QueryService_Stub stub(async_channel.get());

tinyrpc::TinyPbRpcController rpc_controller;
rpc_controller.SetTimeout(5000);
rpc_controller.SetTimeout(2000);

AppDebugLog << "AsyncRPCTestServlet begin to call RPC async";
stub.query_age(&rpc_controller, &rpc_req, &rpc_res, NULL);
Expand Down
6 changes: 5 additions & 1 deletion testcases/test_rpc_server1.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ class QueryServiceImpl : public QueryService {
::google::protobuf::Closure* done) {

AppInfoLog << "QueryServiceImpl.query_age, req={"<< request->ShortDebugString() << "}";
AppInfoLog << "QueryServiceImpl.query_age, sleep 6 s begin";
sleep(6);
AppInfoLog << "QueryServiceImpl.query_age, sleep 6 s end";

response->set_ret_code(0);
response->set_res_info("OK");
response->set_req_no(request->req_no());
Expand All @@ -110,7 +114,7 @@ class QueryServiceImpl : public QueryService {
done->Run();
}

// AppInfoLog << "QueryServiceImpl.query_age, req={"<< request->ShortDebugString() << "}, res={" << response->ShortDebugString() << "}";
AppInfoLog << "QueryServiceImpl.query_age, req={"<< request->ShortDebugString() << "}, res={" << response->ShortDebugString() << "}";
}

};
Expand Down
1 change: 1 addition & 0 deletions tinyrpc/comm/error_code.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const int ERROR_SERVICE_NOT_FOUND = SYS_ERROR_PREFIX(0008); // not found serv
const int ERROR_METHOD_NOT_FOUND = SYS_ERROR_PREFIX(0009); // not found method

const int ERROR_PARSE_SERVICE_NAME = SYS_ERROR_PREFIX(0010); // not found service name
const int ERROR_ASYNC_RPC_CALL_SINGLE_IOTHREAD = SYS_ERROR_PREFIX(0011); // not supoort async rpc call when only have single iothread

} // namespace tinyrpc

Expand Down
5 changes: 5 additions & 0 deletions tinyrpc/net/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ void Reactor::loop() {
ErrorLog << "socket [" << fd << "] occur other unknow event:[" << one_event.events << "], need unregister this socket";
delEventInLoopThread(fd);
} else {
// if timer event, direct excute
if (fd == m_timer_fd) {
read_cb();
continue;
}
if (one_event.events & EPOLLIN) {
// DebugLog << "socket [" << fd << "] occur read event";
Mutex::Lock lock(m_mutex);
Expand Down
24 changes: 21 additions & 3 deletions tinyrpc/net/tcp/io_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "tinyrpc/net/tcp/tcp_server.h"
#include "tinyrpc/net/tcp/tcp_connection_time_wheel.h"
#include "tinyrpc/coroutine/coroutine.h"
#include "tinyrpc/coroutine/coroutine_pool.h"
#include "tinyrpc/comm/config.h"


Expand Down Expand Up @@ -149,17 +150,34 @@ void IOThreadPool::addTaskByIndex(int index, std::function<void()> cb) {
}
}

void IOThreadPool::addCoroutineRandomThread(Coroutine::ptr cor, bool self /* = false*/) {
void IOThreadPool::addCoroutineToRandomThread(Coroutine::ptr cor, bool self /* = false*/) {
srand(time(0));
int i = 0;
while (1) {
i = rand() % (m_size - 1);
i = rand() % (m_size);
if (!self && m_io_threads[i]->getPthreadId() == t_cur_io_thread->getPthreadId()) {
continue;
i++;
if (i == m_size) {
i -= 2;
}
}
break;
}
m_io_threads[i]->getReactor()->addCoroutine(cor, true);
// if (m_io_threads[m_index]->getPthreadId() == t_cur_io_thread->getPthreadId()) {
// m_index++;
// if (m_index == m_size || m_index == -1) {
// m_index = 0;
// }
// }
}


Coroutine::ptr IOThreadPool::addCoroutineToRandomThread(std::function<void()> cb, bool self/* = false*/) {
Coroutine::ptr cor = GetCoroutinePool()->getCoroutineInstanse();
cor->setCallBack(cb);
addCoroutineToRandomThread(cor, self);
return cor;
}


Expand Down
6 changes: 5 additions & 1 deletion tinyrpc/net/tcp/io_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,13 @@ class IOThreadPool {

void addTaskByIndex(int index, std::function<void()> cb);

void addCoroutineToRandomThread(Coroutine::ptr cor, bool self = false);

// add a coroutine to random thread in io thread pool
// self = false, means random thread cann't be current thread
void addCoroutineRandomThread(Coroutine::ptr cor, bool self = false);
// please free cor, or causes memory leak
// call returnCoroutine(cor) to free coroutine
Coroutine::ptr addCoroutineToRandomThread(std::function<void()> cb, bool self = false);

private:
int m_size {0};
Expand Down
2 changes: 1 addition & 1 deletion tinyrpc/net/tcp/tcp_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ int TcpClient::sendAndRecvTinyPb(const std::string& msg_no, TinyPbStruct::pb_ptr
}

while (!m_connection->getResPackageData(msg_no, res)) {

DebugLog << "redo getResPackageData";
m_connection->input();

if (m_connection->getOverTimerFlag()) {
Expand Down
13 changes: 9 additions & 4 deletions tinyrpc/net/timer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void Timer::addTimerEvent(TimerEvent::ptr event, bool need_reset /*=true*/) {
}
m_pending_events.emplace(event->m_arrive_time, event);
if (is_reset && need_reset) {
// DebugLog << "need reset timer";
DebugLog << "need reset timer";
resetArriveTime();
}
// DebugLog << "add timer event succ";
Expand Down Expand Up @@ -114,17 +114,16 @@ void Timer::onTimer() {
int64_t now = getNowMs();
auto it = m_pending_events.begin();
std::vector<TimerEvent::ptr> tmps;
std::vector<std::function<void()>> tasks;
std::vector<std::pair<int64_t, std::function<void()>>> tasks;
for (it = m_pending_events.begin(); it != m_pending_events.end(); ++it) {
if ((*it).first <= now && !((*it).second->m_is_cancled)) {
tmps.push_back((*it).second);
tasks.push_back((*it).second->m_task);
tasks.push_back(std::make_pair((*it).second->m_arrive_time, (*it).second->m_task));
} else {
break;
}
}

m_reactor->addTask(tasks);
m_pending_events.erase(m_pending_events.begin(), it);
for (auto i = tmps.begin(); i != tmps.end(); ++i) {
if ((*i)->m_is_repeated) {
Expand All @@ -134,6 +133,12 @@ void Timer::onTimer() {
}

resetArriveTime();

// m_reactor->addTask(tasks);
for (auto i : tasks) {
// DebugLog << "excute timeevent:" << i.first;
i.second();
}
}

}
Expand Down
14 changes: 9 additions & 5 deletions tinyrpc/net/tinypb/tinypb_rpc_async_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ void TinyPbRpcAsyncChannel::CallMethod(const google::protobuf::MethodDescriptor*
google::protobuf::Message* response,
google::protobuf::Closure* done) {

if (GetServer()->getIOThreadPool()->getIOThreadPoolSize() <= 1) {
ErrorLog << "Error! must have at least 2 iothread when call TinyPbRpcAsyncChannel";
TinyPbRpcController* rpc_controller = dynamic_cast<TinyPbRpcController*>(controller);
rpc_controller->SetError(ERROR_ASYNC_RPC_CALL_SINGLE_IOTHREAD, "Error! must have at least 2 iothread when call TinyPbRpcAsyncChannel");
m_promise.set_value(true);
return;
}

std::shared_ptr<TinyPbRpcAsyncChannel> s_ptr = shared_from_this();
std::shared_ptr<const google::protobuf::MethodDescriptor> method_ptr;
method_ptr.reset(method);
Expand All @@ -61,11 +69,7 @@ void TinyPbRpcAsyncChannel::CallMethod(const google::protobuf::MethodDescriptor*
DebugLog << "excute rpc call method by this thread finish";
s_ptr->m_promise.set_value(true);
};

m_cor = GetCoroutinePool()->getCoroutineInstanse();
m_cor->setCallBack(cb);

GetServer()->getIOThreadPool()->addCoroutineRandomThread(m_cor, false);
m_cor = GetServer()->getIOThreadPool()->addCoroutineToRandomThread(cb, false);
}

std::future<bool> TinyPbRpcAsyncChannel::getFuture() {
Expand Down
5 changes: 3 additions & 2 deletions tinyrpc/net/tinypb/tinypb_rpc_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

namespace tinyrpc {

TinyPbRpcChannel::TinyPbRpcChannel(NetAddress::ptr addr) {
m_client = std::make_shared<TcpClient>(addr);
TinyPbRpcChannel::TinyPbRpcChannel(NetAddress::ptr addr) : m_addr(addr) {

}

void TinyPbRpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method,
Expand All @@ -24,6 +24,7 @@ void TinyPbRpcChannel::CallMethod(const google::protobuf::MethodDescriptor* meth
google::protobuf::Message* response,
google::protobuf::Closure* done) {

m_client = std::make_shared<TcpClient>(m_addr);
TinyPbStruct pb_struct;
TinyPbRpcController* rpc_controller = dynamic_cast<TinyPbRpcController*>(controller);
rpc_controller->SetLocalAddr(m_client->getLocalAddr());
Expand Down
5 changes: 3 additions & 2 deletions tinyrpc/net/tinypb/tinypb_rpc_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

#include <memory>
#include <google/protobuf/service.h>
#include "../net_address.h"
#include "../tcp/tcp_client.h"
#include "tinyrpc/net/net_address.h"
#include "tinyrpc/net//tcp/tcp_client.h"

namespace tinyrpc {

Expand All @@ -22,6 +22,7 @@ void CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::Closure* done);

private:
NetAddress::ptr m_addr;
TcpClient::ptr m_client;

};
Expand Down

0 comments on commit ffa8c08

Please sign in to comment.