From 205021473b591838afef17ed4ed3d7ec3131669b Mon Sep 17 00:00:00 2001 From: michalbiesek Date: Wed, 3 Jul 2019 10:00:51 +0200 Subject: [PATCH] Add support for graceful shutdown for twemcache/slimcache - add new admin command "shutdown" to support this operation - add support for SIGTERM so that the teardown methods are called properly Co-authored-by: Piotr Balcer --- deps/ccommon/include/channel/cc_channel.h | 1 + src/core/admin/admin.c | 18 ++++++++++++ src/core/admin/admin.h | 2 +- src/core/core.c | 28 ++++++++++++++++-- src/core/core.h | 3 +- src/core/data/server.c | 8 ++++-- src/core/data/worker.c | 6 ++-- src/core/data/worker.h | 1 + src/protocol/admin/parse.c | 8 ++++++ src/protocol/admin/request.h | 3 +- src/server/cdb/main.c | 3 +- src/server/pingserver/main.c | 3 +- src/server/rds/main.c | 3 +- src/server/slimcache/main.c | 34 +++++++++++++++++++--- src/server/slimrds/main.c | 3 +- src/server/twemcache/main.c | 35 ++++++++++++++++++++--- test/protocol/admin/check_admin.c | 25 ++++++++++++++++ 17 files changed, 160 insertions(+), 24 deletions(-) diff --git a/deps/ccommon/include/channel/cc_channel.h b/deps/ccommon/include/channel/cc_channel.h index 1546d87c5..10721a089 100644 --- a/deps/ccommon/include/channel/cc_channel.h +++ b/deps/ccommon/include/channel/cc_channel.h @@ -111,6 +111,7 @@ enum { CHANNEL_OPEN, /* opening */ CHANNEL_ESTABLISHED, CHANNEL_TERM, /* to be closed, don't need a closing state yet */ + CHANNEL_TERM_RESET_DB, CHANNEL_ERROR, /* unrecoverable error occurred */ CHANNEL_SENTINEL diff --git a/src/core/admin/admin.c b/src/core/admin/admin.c index 8449c8074..3275dc84a 100644 --- a/src/core/admin/admin.c +++ b/src/core/admin/admin.c @@ -142,6 +142,12 @@ _admin_post_read(struct buf_sock *s) goto done; } + if (req.type == REQ_SHUTDOWN) { + log_info("peer called shutdown"); + s->ch->state = CHANNEL_TERM_RESET_DB; + goto done; + } + admin_response_reset(&rsp); admin_process_request(&rsp, &req); @@ -193,6 +199,12 @@ _admin_event(void *arg, 
uint32_t events) NOT_REACHED(); } + if (s->ch->state == CHANNEL_TERM_RESET_DB) { + _admin_close(s); + raise(SIGTERM); + return; + } + if (s->ch->state == CHANNEL_TERM || s->ch->state == CHANNEL_ERROR) { _admin_close(s); } @@ -310,6 +322,12 @@ core_admin_register(uint64_t intvl_ms, timeout_cb_fn cb, void *arg) return timing_wheel_insert(tw, &delay, true, cb, arg); } +void +core_admin_unregister(struct timeout_event *tev) +{ + timing_wheel_remove(tw, &tev); +} + static rstatus_i _admin_evwait(void) { diff --git a/src/core/admin/admin.h b/src/core/admin/admin.h index 9c1d36e8c..d4079ee92 100644 --- a/src/core/admin/admin.h +++ b/src/core/admin/admin.h @@ -39,5 +39,5 @@ void core_admin_teardown(void); /* add a periodic action to be executed on the admin thread, which uses timing wheel */ struct timeout_event * core_admin_register(uint64_t intvl_ms, timeout_cb_fn cb, void *arg); - +void core_admin_unregister(struct timeout_event *tev); void core_admin_evloop(void); diff --git a/src/core/core.c b/src/core/core.c index a4ea7433b..fe1acba8f 100644 --- a/src/core/core.c +++ b/src/core/core.c @@ -9,10 +9,10 @@ #include #include +static pthread_t worker, server; /* thread handles shared by core_run and core_destroy */ void -core_run(void *arg_worker) +core_run(void *arg_worker, void *arg_server) { - pthread_t worker, server; int ret; if (!admin_init || !server_init || !worker_init) { @@ -26,7 +26,7 @@ core_run(void *arg_worker) goto error; } - ret = pthread_create(&server, NULL, core_server_evloop, NULL); + ret = pthread_create(&server, NULL, core_server_evloop, arg_server); if (ret != 0) { log_crit("pthread create failed for server thread: %s", strerror(ret)); goto error; @@ -37,3 +37,25 @@ core_run(void *arg_worker) error: exit(EX_OSERR); } + +void core_destroy(void) /* joins the server thread, then the worker thread; exits with EX_OSERR if a join fails */ +{ + int ret; + + if (!server_init || !worker_init) { + log_crit("cannot destroy: server/worker have to be initialized"); + return; + } + + ret = pthread_join(server, NULL); + if (ret != 0) { + log_crit("pthread join failed for server thread: %s", strerror(ret)); + 
exit(EX_OSERR); + } + + ret = pthread_join(worker, NULL); + if (ret != 0) { + log_crit("pthread join failed for worker thread: %s", strerror(ret)); + exit(EX_OSERR); + } +} diff --git a/src/core/core.h b/src/core/core.h index c05542c13..f405e6c95 100644 --- a/src/core/core.h +++ b/src/core/core.h @@ -9,4 +9,5 @@ #include "data/server.h" #include "data/worker.h" -void core_run(void *arg_worker); +void core_run(void *arg_worker, void *arg_server); +void core_destroy(void); diff --git a/src/core/data/server.c b/src/core/data/server.c index ef98dc55d..3a7860b39 100644 --- a/src/core/data/server.c +++ b/src/core/data/server.c @@ -357,12 +357,14 @@ _server_evwait(void) void * core_server_evloop(void *arg) { - for(;;) { + bool *running = arg; + + while (__atomic_load_n(running, __ATOMIC_ACQUIRE)) { if (_server_evwait() != CC_OK) { log_crit("server core event loop exited due to failure"); - break; + exit(1); } } - exit(1); + return NULL; } diff --git a/src/core/data/worker.c b/src/core/data/worker.c index ea234ce44..0615b95e9 100644 --- a/src/core/data/worker.c +++ b/src/core/data/worker.c @@ -295,12 +295,12 @@ core_worker_evloop(void *arg) { processor = arg; - for(;;) { + while (__atomic_load_n(&processor->running, __ATOMIC_ACQUIRE)) { if (_worker_evwait() != CC_OK) { log_crit("worker core event loop exited due to failure"); - break; + exit(1); } } - exit(1); + return NULL; } diff --git a/src/core/data/worker.h b/src/core/data/worker.h index 4aa01693f..879494d0f 100644 --- a/src/core/data/worker.h +++ b/src/core/data/worker.h @@ -42,6 +42,7 @@ struct data_processor { data_fn read; data_fn write; data_fn error; + bool running; }; void core_worker_setup(worker_options_st *options, worker_metrics_st *metrics); diff --git a/src/protocol/admin/parse.c b/src/protocol/admin/parse.c index ecf40befb..121b3cd89 100644 --- a/src/protocol/admin/parse.c +++ b/src/protocol/admin/parse.c @@ -49,6 +49,14 @@ _get_req_type(struct request *req, struct bstring *type) break; } + break; + + 
case 8: + if (str8cmp(type->data, 's', 'h', 'u', 't', 'd', 'o', 'w', 'n')) { + req->type = REQ_SHUTDOWN; + break; + } + break; } diff --git a/src/protocol/admin/request.h b/src/protocol/admin/request.h index 866b9bb2d..1519e32cf 100644 --- a/src/protocol/admin/request.h +++ b/src/protocol/admin/request.h @@ -25,7 +25,8 @@ ACTION( REQ_UNKNOWN, "" )\ ACTION( REQ_STATS, "stats" )\ ACTION( REQ_VERSION, "version" )\ - ACTION( REQ_QUIT, "quit" ) + ACTION( REQ_QUIT, "quit" )\ + ACTION( REQ_SHUTDOWN, "shutdown" ) #define GET_TYPE(_name, _str) _name, typedef enum request_type { diff --git a/src/server/cdb/main.c b/src/server/cdb/main.c index 201a14c74..7007b977d 100644 --- a/src/server/cdb/main.c +++ b/src/server/cdb/main.c @@ -23,6 +23,7 @@ struct data_processor worker_processor = { cdb_process_read, cdb_process_write, cdb_process_error, + .running = true }; static void @@ -239,7 +240,7 @@ main(int argc, char **argv) setup(); option_print_all((struct option *)&setting, nopt); - core_run(&worker_processor); + core_run(&worker_processor, &worker_processor.running); exit(EX_OK); } diff --git a/src/server/pingserver/main.c b/src/server/pingserver/main.c index f823f3c30..7556c0194 100644 --- a/src/server/pingserver/main.c +++ b/src/server/pingserver/main.c @@ -17,6 +17,7 @@ struct data_processor worker_processor = { pingserver_process_read, pingserver_process_write, pingserver_process_error, + .running = true }; static void @@ -195,7 +196,7 @@ main(int argc, char **argv) setup(); option_print_all((struct option *)&setting, nopt); - core_run(&worker_processor); + core_run(&worker_processor, &worker_processor.running); exit(EX_OK); } diff --git a/src/server/rds/main.c b/src/server/rds/main.c index 888a64a4b..870a57592 100644 --- a/src/server/rds/main.c +++ b/src/server/rds/main.c @@ -17,6 +17,7 @@ struct data_processor worker_processor = { rds_process_read, rds_process_write, rds_process_error, + .running = true }; static void @@ -197,7 +198,7 @@ main(int argc, char **argv) 
setup(); option_print_all((struct option *)&setting, nopt); - core_run(&worker_processor); + core_run(&worker_processor, &worker_processor.running); exit(EX_OK); } diff --git a/src/server/slimcache/main.c b/src/server/slimcache/main.c index e0d38836a..7719606d9 100644 --- a/src/server/slimcache/main.c +++ b/src/server/slimcache/main.c @@ -13,10 +13,19 @@ #include #include +enum slimcache_timeout_event_type { + DLOG_TIMEOUT_EV, + KLOG_TIMEOUT_EV, + MAX_TIMEOUT_EV +}; + +static struct timeout_event *slimcache_tev[MAX_TIMEOUT_EV]; + struct data_processor worker_processor = { slimcache_process_read, slimcache_process_write, - slimcache_process_error + slimcache_process_error, + .running = true }; static void @@ -80,6 +89,18 @@ teardown(void) log_teardown(); } +static void +_shutdown(int signo) +{ + log_stderr("_shutdown received signal %d", signo); + __atomic_store_n(&worker_processor.running, false, __ATOMIC_RELEASE); + core_destroy(); + for (int i = DLOG_TIMEOUT_EV; i < MAX_TIMEOUT_EV; ++i) { + core_admin_unregister(slimcache_tev[i]); + } + exit(EX_OK); +} + static void setup(void) { @@ -91,6 +112,11 @@ setup(void) exit(EX_OSERR); /* only failure comes from NOMEM */ } + if (signal_override(SIGTERM, "perform shutdown", 0, 0, _shutdown) < 0) { + log_stderr("cannot override signal"); + exit(EX_OSERR); + } + /* Setup logging first */ log_setup(&stats.log); if (debug_setup(&setting.debug) < 0) { @@ -134,13 +160,13 @@ setup(void) /* adding recurring events to maintenance/admin thread */ intvl = option_uint(&setting.slimcache.dlog_intvl); - if (core_admin_register(intvl, debug_log_flush, NULL) == NULL) { + if ((slimcache_tev[DLOG_TIMEOUT_EV] = core_admin_register(intvl, debug_log_flush, NULL)) == NULL) { log_stderr("Could not register timed event to flush debug log"); goto error; } intvl = option_uint(&setting.slimcache.klog_intvl); - if (core_admin_register(intvl, klog_flush, NULL) == NULL) { + if ((slimcache_tev[KLOG_TIMEOUT_EV] = core_admin_register(intvl, klog_flush, 
NULL)) == NULL) { log_error("Could not register timed event to flush command log"); goto error; } @@ -213,7 +239,7 @@ main(int argc, char **argv) setup(); option_print_all((struct option *)&setting, nopt); - core_run(&worker_processor); + core_run(&worker_processor, &worker_processor.running); exit(EX_OK); } diff --git a/src/server/slimrds/main.c b/src/server/slimrds/main.c index 702476948..2c354212d 100644 --- a/src/server/slimrds/main.c +++ b/src/server/slimrds/main.c @@ -17,6 +17,7 @@ struct data_processor worker_processor = { slimrds_process_read, slimrds_process_write, slimrds_process_error, + .running = true }; static void @@ -197,7 +198,7 @@ main(int argc, char **argv) setup(); option_print_all((struct option *)&setting, nopt); - core_run(&worker_processor); + core_run(&worker_processor, &worker_processor.running); exit(EX_OK); } diff --git a/src/server/twemcache/main.c b/src/server/twemcache/main.c index a0dc85a89..50a9383b3 100644 --- a/src/server/twemcache/main.c +++ b/src/server/twemcache/main.c @@ -13,10 +13,20 @@ #include #include +enum twemcache_timeout_event_type { + DLOG_TIMEOUT_EV, + KLOG_TIMEOUT_EV, + STATS_TIMEOUT_EV, + MAX_TIMEOUT_EV +}; + +static struct timeout_event *twemcache_tev[MAX_TIMEOUT_EV]; + struct data_processor worker_processor = { twemcache_process_read, twemcache_process_write, twemcache_process_error, + .running = true }; static void @@ -77,6 +87,18 @@ teardown(void) log_teardown(); } +static void +_shutdown(int signo) +{ + log_stderr("_shutdown received signal %d", signo); + __atomic_store_n(&worker_processor.running, false, __ATOMIC_RELEASE); + core_destroy(); + for (int i = DLOG_TIMEOUT_EV; i < MAX_TIMEOUT_EV; ++i) { + core_admin_unregister(twemcache_tev[i]); + } + exit(EX_OK); +} + static void setup(void) { @@ -88,6 +110,11 @@ setup(void) exit(EX_OSERR); /* only failure comes from NOMEM */ } + if (signal_override(SIGTERM, "perform shutdown", 0, 0, _shutdown) < 0) { + log_stderr("cannot override signal"); + exit(EX_OSERR); + } 
+ /* Setup logging first */ log_setup(&stats.log); if (debug_setup(&setting.debug) != CC_OK) { @@ -132,19 +159,19 @@ setup(void) /* adding recurring events to maintenance/admin thread */ intvl = option_uint(&setting.twemcache.dlog_intvl); - if (core_admin_register(intvl, debug_log_flush, NULL) == NULL) { + if ((twemcache_tev[DLOG_TIMEOUT_EV] = core_admin_register(intvl, debug_log_flush, NULL)) == NULL) { log_stderr("Could not register timed event to flush debug log"); goto error; } intvl = option_uint(&setting.twemcache.klog_intvl); - if (core_admin_register(intvl, klog_flush, NULL) == NULL) { + if ((twemcache_tev[KLOG_TIMEOUT_EV] = core_admin_register(intvl, klog_flush, NULL)) == NULL) { log_error("Could not register timed event to flush command log"); goto error; } intvl = option_uint(&setting.twemcache.stats_intvl); - if (core_admin_register(intvl, stats_dump, NULL) == NULL) { + if ((twemcache_tev[STATS_TIMEOUT_EV] = core_admin_register(intvl, stats_dump, NULL)) == NULL) { log_error("Could not register timed event to dump stats"); goto error; } @@ -216,7 +243,7 @@ main(int argc, char **argv) setup(); option_print_all((struct option *)&setting, nopt); - core_run(&worker_processor); + core_run(&worker_processor, &worker_processor.running); exit(EX_OK); } diff --git a/test/protocol/admin/check_admin.c b/test/protocol/admin/check_admin.c index 4c0edc90e..4d27f11ec 100644 --- a/test/protocol/admin/check_admin.c +++ b/test/protocol/admin/check_admin.c @@ -69,6 +69,30 @@ START_TEST(test_quit) } END_TEST +START_TEST(test_shutdown) +{ +#define SERIALIZED "shutdown\r\n" + int ret; + int len = sizeof(SERIALIZED) - 1; + + test_reset(); + + /* compose */ + req->type = REQ_SHUTDOWN; + ret = admin_compose_req(&buf, req); + ck_assert_msg(ret == len, "expected: %d, returned: %d", len, ret); + ck_assert_int_eq(cc_bcmp(buf->rpos, SERIALIZED, ret), 0); + + /* parse */ + admin_request_reset(req); + ret = admin_parse_req(req, buf); + ck_assert_int_eq(ret, PARSE_OK); + 
ck_assert(req->state == REQ_PARSED); + ck_assert(req->type == REQ_SHUTDOWN); +#undef SERIALIZED +} +END_TEST + START_TEST(test_stats) { #define SERIALIZED "stats\r\n" @@ -132,6 +156,7 @@ admin_suite(void) tcase_add_test(tc_basic_req, test_quit); tcase_add_test(tc_basic_req, test_stats); tcase_add_test(tc_basic_req, test_version); + tcase_add_test(tc_basic_req, test_shutdown); return s; }