kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 4238431b9e3761b3aadd532a7cafe3c5148dfc00
parent cd80685d9d9cddee69d3b54e59c640f87b2031f5
Author: Joris Vink <joris@coders.se>
Date:   Fri, 22 Mar 2019 09:49:50 +0100

Add worker_death_policy setting.

By default kore will restart worker processes if they terminate
unexpected. However in certain scenarios you may want to bring down
an entire kore instance if a worker process fails.

By setting worker_death_policy to "terminate" the Kore server will
completely stop if a worker exits unexpected.

Diffstat:
conf/kore.conf.example | 8++++++++
include/kore/kore.h | 7+++++--
src/config.c | 17+++++++++++++++++
src/kore.c | 11++++++++---
src/worker.c | 55+++++++++++++++++++++++++++++++++++++++----------------
5 files changed, 77 insertions(+), 21 deletions(-)

diff --git a/conf/kore.conf.example b/conf/kore.conf.example @@ -53,6 +53,14 @@ workers 4 # before releasing the lock to others. #worker_accept_threshold 16 +# What should the Kore parent process do if a worker +# process unexpectly exits. The default policy is that +# the worker process is automatically restarted. +# +# If you want the kore server to exit if a worker dies +# you can swap the policy to "terminate-server". +#worker_death_policy restart-worker + # Workers bind themselves to a single CPU by default. # Turn this off by setting this option to 0 #worker_set_affinity 1 diff --git a/include/kore/kore.h b/include/kore/kore.h @@ -486,7 +486,9 @@ struct kore_timer { TAILQ_ENTRY(kore_timer) list; }; -#define KORE_WORKER_KEYMGR 0 +#define KORE_WORKER_KEYMGR 0 +#define KORE_WORKER_POLICY_RESTART 1 +#define KORE_WORKER_POLICY_TERMINATE 2 /* Reserved message ids, registered on workers. */ #define KORE_MSG_WEBSOCKET 1 @@ -557,6 +559,7 @@ extern u_int8_t nlisteners; extern u_int16_t cpu_count; extern u_int8_t worker_count; extern const char *kore_version; +extern int worker_policy; extern u_int8_t worker_set_affinity; extern u_int32_t worker_rlimit_nofiles; extern u_int32_t worker_max_connections; @@ -575,7 +578,7 @@ extern struct kore_pool nb_pool; void kore_signal(int); void kore_shutdown(void); void kore_signal_setup(void); -void kore_worker_wait(int); +void kore_worker_reap(void); void kore_worker_init(void); void kore_worker_make_busy(void); void kore_worker_shutdown(void); diff --git a/src/config.c b/src/config.c @@ -62,6 +62,7 @@ static int configure_pidfile(char *); static int configure_rlimit_nofiles(char *); static int configure_max_connections(char *); static int configure_accept_threshold(char *); +static int configure_death_policy(char *); static int configure_set_affinity(char *); static int configure_socket_backlog(char *); @@ -146,6 +147,7 @@ static struct { { "worker_max_connections", configure_max_connections }, { "worker_rlimit_nofiles", configure_rlimit_nofiles }, { "worker_accept_threshold", configure_accept_threshold }, + { "worker_death_policy", configure_death_policy }, { "worker_set_affinity", configure_set_affinity }, { "pidfile", configure_pidfile }, { "socket_backlog", configure_socket_backlog }, @@ -1313,6 +1315,21 @@ configure_accept_threshold(char *option) } static int +configure_death_policy(char *option) +{ + if (!strcmp(option, "restart")) { + worker_policy = KORE_WORKER_POLICY_RESTART; + } else if (!strcmp(option, "terminate")) { + worker_policy = KORE_WORKER_POLICY_TERMINATE; + } else { + printf("bad value for worker_death_policy: %s\n", option); + return (KORE_RESULT_ERROR); + } + + return (KORE_RESULT_OK); +} + +static int configure_set_affinity(char *option) { int err; diff --git a/src/kore.c b/src/kore.c @@ -596,9 +596,9 @@ static void kore_server_start(int argc, char *argv[]) { u_int32_t tmp; - int quit; struct kore_runtime_call *rcall; u_int64_t netwait; + int quit, last_sig; if (foreground == 0) { if (daemon(1, 0) == -1) @@ -662,6 +662,8 @@ kore_server_start(int argc, char *argv[]) while (quit != 1) { if (sig_recv != 0) { + last_sig = sig_recv; + switch (sig_recv) { case SIGHUP: kore_worker_dispatch_signal(sig_recv); @@ -677,13 +679,16 @@ kore_server_start(int argc, char *argv[]) kore_worker_dispatch_signal(sig_recv); break; case SIGCHLD: - kore_worker_wait(0); + kore_worker_reap(); break; default: break; } - sig_recv = 0; + if (sig_recv == last_sig) + sig_recv = 0; + else + continue; } netwait = kore_timer_next_run(kore_time_ms()); diff --git a/src/worker.c b/src/worker.c @@ -94,6 +94,7 @@ u_int32_t worker_accept_threshold = 16; u_int32_t worker_rlimit_nofiles = 768; u_int32_t worker_max_connections = 512; u_int32_t worker_active_connections = 0; +int worker_policy = KORE_WORKER_POLICY_RESTART; void kore_worker_init(void) @@ -191,6 +192,8 @@ void kore_worker_shutdown(void) { struct kore_worker *kw; + pid_t pid; + int status; u_int16_t id, done; if (!kore_quiet) { @@ -199,12 +202,20 @@ kore_worker_shutdown(void) } for (;;) { + for (id = 0; id < worker_count; id++) { + kw = WORKER(id); + if (kw->pid != 0) { + pid = waitpid(kw->pid, &status, 0); + if (pid == -1) + continue; + kw->pid = 0; + } + } + done = 0; for (id = 0; id < worker_count; id++) { kw = WORKER(id); - if (kw->pid != 0) - kore_worker_wait(1); - else + if (kw->pid == 0) done++; } @@ -501,7 +512,7 @@ kore_worker_entry(struct kore_worker *kw) } void -kore_worker_wait(int final) +kore_worker_reap(void) { u_int16_t id; pid_t pid; @@ -509,14 +520,20 @@ kore_worker_wait(int final) const char *func; int status; - if (final) - pid = waitpid(WAIT_ANY, &status, 0); - else + for (;;) { pid = waitpid(WAIT_ANY, &status, WNOHANG); - if (pid == -1) { - kore_debug("waitpid(): %s", errno_s); - return; + if (pid == -1) { + if (errno == ECHILD) + return; + if (errno == EINTR) + continue; + kore_log(LOG_ERR, + "failed to wait for children: %s", errno_s); + return; + } + + break; } if (pid == 0) @@ -527,16 +544,11 @@ kore_worker_wait(int final) if (kw->pid != pid) continue; - if (final == 0 || (final == 1 && !kore_quiet)) { + if (!kore_quiet) { kore_log(LOG_NOTICE, "worker %d (%d)-> status %d", kw->id, pid, status); } - if (final) { - kw->pid = 0; - break; - } - if (WEXITSTATUS(status) || WTERMSIG(status) || WCOREDUMP(status)) { func = "none"; @@ -574,6 +586,17 @@ kore_worker_wait(int final) } #endif + if (worker_policy == KORE_WORKER_POLICY_TERMINATE) { + kw->pid = 0; + kore_log(LOG_NOTICE, + "worker policy is 'terminate', stopping"); + if (raise(SIGTERM) != 0) { + kore_log(LOG_WARNING, + "failed to raise SIGTERM signal"); + } + break; + } + kore_log(LOG_NOTICE, "restarting worker %d", kw->id); kw->restarted = 1; kore_msg_parent_remove(kw);