Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC]Add support for graceful shutdown for twemcache/slimcache #225

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deps/ccommon/include/channel/cc_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ enum {
CHANNEL_OPEN, /* opening */
CHANNEL_ESTABLISHED,
CHANNEL_TERM, /* to be closed, don't need a closing state yet */
CHANNEL_TERM_RESET_DB,
CHANNEL_ERROR, /* unrecoverable error occurred */

CHANNEL_SENTINEL
Expand Down
18 changes: 18 additions & 0 deletions src/core/admin/admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ _admin_post_read(struct buf_sock *s)
goto done;
}

if (req.type == REQ_SHUTDOWN) {
log_info("peer called shutdown");
s->ch->state = CHANNEL_TERM_RESET_DB;
goto done;
}

admin_response_reset(&rsp);

admin_process_request(&rsp, &req);
Expand Down Expand Up @@ -193,6 +199,12 @@ _admin_event(void *arg, uint32_t events)
NOT_REACHED();
}

if (s->ch->state == CHANNEL_TERM_RESET_DB) {
_admin_close(s);
raise(SIGTERM);
return;
}

if (s->ch->state == CHANNEL_TERM || s->ch->state == CHANNEL_ERROR) {
_admin_close(s);
}
Expand Down Expand Up @@ -310,6 +322,12 @@ core_admin_register(uint64_t intvl_ms, timeout_cb_fn cb, void *arg)
return timing_wheel_insert(tw, &delay, true, cb, arg);
}

void
core_admin_unregister(struct timeout_event *tev)
{
timing_wheel_remove(tw, &tev);
}

static rstatus_i
_admin_evwait(void)
{
Expand Down
2 changes: 1 addition & 1 deletion src/core/admin/admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ void core_admin_teardown(void);
/* add a periodic action to be executed on the admin thread, which uses timing wheel */
struct timeout_event *
core_admin_register(uint64_t intvl_ms, timeout_cb_fn cb, void *arg);

void core_admin_unregister(struct timeout_event *tev);
void core_admin_evloop(void);
28 changes: 25 additions & 3 deletions src/core/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
#include <string.h>
#include <sysexits.h>

pthread_t worker, server;
void
core_run(void *arg_worker)
core_run(void *arg_worker, void *arg_server)
{
pthread_t worker, server;
int ret;

if (!admin_init || !server_init || !worker_init) {
Expand All @@ -26,7 +26,7 @@ core_run(void *arg_worker)
goto error;
}

ret = pthread_create(&server, NULL, core_server_evloop, NULL);
ret = pthread_create(&server, NULL, core_server_evloop, arg_server);
if (ret != 0) {
log_crit("pthread create failed for server thread: %s", strerror(ret));
goto error;
Expand All @@ -37,3 +37,25 @@ core_run(void *arg_worker)
error:
exit(EX_OSERR);
}

void core_destroy(void)
{
int ret;

if (!server_init || !worker_init) {
log_crit("cannot run: server/worker have to be initialized");
return;
}

ret = pthread_join(server, NULL);
if (ret != 0) {
log_crit("pthread join failed for worker thread: %s", strerror(ret));
exit(EX_OSERR);
}

ret = pthread_join(worker, NULL);
if (ret != 0) {
log_crit("pthread join failed for server thread: %s", strerror(ret));
exit(EX_OSERR);
}
}
3 changes: 2 additions & 1 deletion src/core/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@
#include "data/server.h"
#include "data/worker.h"

void core_run(void *arg_worker);
void core_run(void *arg_worker, void *arg_server);
void core_destroy(void);
8 changes: 5 additions & 3 deletions src/core/data/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,14 @@ _server_evwait(void)
void *
core_server_evloop(void *arg)
{
for(;;) {
bool *running = arg;

while (__atomic_load_n(running, __ATOMIC_ACQUIRE)) {
if (_server_evwait() != CC_OK) {
log_crit("server core event loop exited due to failure");
break;
exit(1);
}
}

exit(1);
return NULL;
}
6 changes: 3 additions & 3 deletions src/core/data/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,12 @@ core_worker_evloop(void *arg)
{
processor = arg;

for(;;) {
while (__atomic_load_n(&processor->running, __ATOMIC_ACQUIRE)) {
if (_worker_evwait() != CC_OK) {
log_crit("worker core event loop exited due to failure");
break;
exit(1);
}
}

exit(1);
return NULL;
}
1 change: 1 addition & 0 deletions src/core/data/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct data_processor {
data_fn read;
data_fn write;
data_fn error;
bool running;
};

void core_worker_setup(worker_options_st *options, worker_metrics_st *metrics);
Expand Down
8 changes: 8 additions & 0 deletions src/protocol/admin/parse.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ _get_req_type(struct request *req, struct bstring *type)
break;
}

break;

case 8:
if (str8cmp(type->data, 's', 'h', 'u', 't', 'd', 'o', 'w', 'n')) {
req->type = REQ_SHUTDOWN;
break;
}

break;
}

Expand Down
3 changes: 2 additions & 1 deletion src/protocol/admin/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
ACTION( REQ_UNKNOWN, "" )\
ACTION( REQ_STATS, "stats" )\
ACTION( REQ_VERSION, "version" )\
ACTION( REQ_QUIT, "quit" )
ACTION( REQ_QUIT, "quit" )\
ACTION( REQ_SHUTDOWN, "shutdown" )

#define GET_TYPE(_name, _str) _name,
typedef enum request_type {
Expand Down
3 changes: 2 additions & 1 deletion src/server/cdb/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ struct data_processor worker_processor = {
cdb_process_read,
cdb_process_write,
cdb_process_error,
.running = true
};

static void
Expand Down Expand Up @@ -239,7 +240,7 @@ main(int argc, char **argv)
setup();
option_print_all((struct option *)&setting, nopt);

core_run(&worker_processor);
core_run(&worker_processor, &worker_processor.running);

exit(EX_OK);
}
3 changes: 2 additions & 1 deletion src/server/pingserver/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct data_processor worker_processor = {
pingserver_process_read,
pingserver_process_write,
pingserver_process_error,
.running = true
};

static void
Expand Down Expand Up @@ -195,7 +196,7 @@ main(int argc, char **argv)
setup();
option_print_all((struct option *)&setting, nopt);

core_run(&worker_processor);
core_run(&worker_processor, &worker_processor.running);

exit(EX_OK);
}
3 changes: 2 additions & 1 deletion src/server/rds/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct data_processor worker_processor = {
rds_process_read,
rds_process_write,
rds_process_error,
.running = true
};

static void
Expand Down Expand Up @@ -197,7 +198,7 @@ main(int argc, char **argv)
setup();
option_print_all((struct option *)&setting, nopt);

core_run(&worker_processor);
core_run(&worker_processor, &worker_processor.running);

exit(EX_OK);
}
34 changes: 30 additions & 4 deletions src/server/slimcache/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,19 @@
#include <sys/socket.h>
#include <sysexits.h>

enum slimcache_timeout_event_type {
DLOG_TIMEOUT_EV,
KLOG_TIMEOUT_EV,
MAX_TIMEOUT_EV
};

static struct timeout_event *slimcache_tev[MAX_TIMEOUT_EV];

struct data_processor worker_processor = {
slimcache_process_read,
slimcache_process_write,
slimcache_process_error
slimcache_process_error,
.running = true
};

static void
Expand Down Expand Up @@ -80,6 +89,18 @@ teardown(void)
log_teardown();
}

static void
_shutdown(int signo)
{
log_stderr("_shutdown received signal %d", signo);
__atomic_store_n(&worker_processor.running, false, __ATOMIC_RELEASE);
core_destroy();
for (int i = DLOG_TIMEOUT_EV; i < MAX_TIMEOUT_EV; ++i) {
core_admin_unregister(slimcache_tev[i]);
}
exit(EX_OK);
}

static void
setup(void)
{
Expand All @@ -91,6 +112,11 @@ setup(void)
exit(EX_OSERR); /* only failure comes from NOMEM */
}

if (signal_override(SIGTERM, "perform shutdown", 0, 0, _shutdown) < 0) {
log_stderr("cannot override signal");
exit(EX_OSERR);
}

/* Setup logging first */
log_setup(&stats.log);
if (debug_setup(&setting.debug) < 0) {
Expand Down Expand Up @@ -134,13 +160,13 @@ setup(void)

/* adding recurring events to maintenance/admin thread */
intvl = option_uint(&setting.slimcache.dlog_intvl);
if (core_admin_register(intvl, debug_log_flush, NULL) == NULL) {
if ((slimcache_tev[DLOG_TIMEOUT_EV] = core_admin_register(intvl, debug_log_flush, NULL)) == NULL) {
log_stderr("Could not register timed event to flush debug log");
goto error;
}

intvl = option_uint(&setting.slimcache.klog_intvl);
if (core_admin_register(intvl, klog_flush, NULL) == NULL) {
if ((slimcache_tev[KLOG_TIMEOUT_EV] = core_admin_register(intvl, klog_flush, NULL)) == NULL) {
log_error("Could not register timed event to flush command log");
goto error;
}
Expand Down Expand Up @@ -213,7 +239,7 @@ main(int argc, char **argv)
setup();
option_print_all((struct option *)&setting, nopt);

core_run(&worker_processor);
core_run(&worker_processor, &worker_processor.running);

exit(EX_OK);
}
3 changes: 2 additions & 1 deletion src/server/slimrds/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct data_processor worker_processor = {
slimrds_process_read,
slimrds_process_write,
slimrds_process_error,
.running = true
};

static void
Expand Down Expand Up @@ -197,7 +198,7 @@ main(int argc, char **argv)
setup();
option_print_all((struct option *)&setting, nopt);

core_run(&worker_processor);
core_run(&worker_processor, &worker_processor.running);

exit(EX_OK);
}
Loading