kore

Kore is a web application platform for writing scalable, concurrent web-based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit efc7b3d9a63845bf5fd8451c49346f935efaa030
parent 155c7dfbde713a244a68707c680d117660eb9c0d
Author: Joris Vink <joris@coders.se>
Date:   Wed,  3 Nov 2021 17:23:05 +0100

Improve how the parent handles workers.

- Make sure we drain a worker's log channel when that worker dies,
  so we can flush out any lingering log messages.

- Get rid of the raise() calls the parent used to signal itself that
  it should terminate. Instead, depend on the new kore_quit flag.

- Always attempt to reap children one way or the other.

Diffstat:
include/kore/kore.h | 1+
src/kore.c | 27++++++++++++++++++---------
src/msg.c | 16++--------------
src/worker.c | 64++++++++++++++++++++++++++++++++--------------------------------
4 files changed, 53 insertions(+), 55 deletions(-)

diff --git a/include/kore/kore.h b/include/kore/kore.h @@ -709,6 +709,7 @@ extern char *config_file; #endif extern pid_t kore_pid; +extern int kore_quit; extern int kore_quiet; extern int kore_debug; extern int skip_chroot; diff --git a/src/kore.c b/src/kore.c @@ -54,6 +54,7 @@ volatile sig_atomic_t sig_recv; struct kore_server_list kore_servers; u_int8_t nlisteners; int kore_argc = 0; +int kore_quit = 0; pid_t kore_pid = -1; u_int16_t cpu_count = 1; int kore_debug = 0; @@ -80,6 +81,7 @@ static void version(void); static void kore_write_kore_pid(void); static void kore_proctitle_setup(void); static void kore_server_sslstart(void); +static void kore_server_shutdown(void); static void kore_server_start(int, char *[]); static void kore_call_parent_configure(int, char **); @@ -271,11 +273,7 @@ main(int argc, char *argv[]) kore_signal_setup(); kore_server_start(argc, argv); - - if (!kore_quiet) - kore_log(LOG_INFO, "server shutting down"); - - kore_worker_shutdown(); + kore_server_shutdown(); rcall = kore_runtime_getcall(parent_teardown_hook); if (rcall != NULL) { @@ -858,7 +856,7 @@ kore_server_start(int argc, char *argv[]) u_int32_t tmp; struct kore_server *srv; u_int64_t netwait; - int quit, last_sig; + int last_sig; #if defined(KORE_SINGLE_BINARY) struct kore_runtime_call *rcall; #endif @@ -951,7 +949,6 @@ kore_server_start(int argc, char *argv[]) kore_platform_event_init(); kore_msg_parent_init(); - quit = 0; worker_max_connections = tmp; kore_timer_init(); @@ -963,7 +960,7 @@ kore_server_start(int argc, char *argv[]) kore_msg_unregister(KORE_PYTHON_SEND_OBJ); #endif - while (quit != 1) { + while (kore_quit != 1) { if (sig_recv != 0) { last_sig = sig_recv; @@ -975,7 +972,7 @@ kore_server_start(int argc, char *argv[]) case SIGINT: case SIGQUIT: case SIGTERM: - quit = 1; + kore_quit = 1; kore_worker_dispatch_signal(sig_recv); continue; case SIGUSR1: @@ -998,8 +995,20 @@ kore_server_start(int argc, char *argv[]) kore_platform_event_wait(netwait); 
kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT); kore_timer_run(kore_time_ms()); + kore_worker_reap(); } + kore_worker_dispatch_signal(SIGQUIT); +} + +static void +kore_server_shutdown(void) +{ + if (!kore_quiet) + kore_log(LOG_INFO, "server shutting down"); + + kore_worker_shutdown(); + #if !defined(KORE_NO_HTTP) kore_accesslog_gather(NULL, kore_time_ms(), 1); #endif diff --git a/src/msg.c b/src/msg.c @@ -33,9 +33,8 @@ struct msg_type { }; static struct msg_type *msg_type_lookup(u_int8_t); -static int msg_recv_packet(struct netbuf *); static int msg_recv_data(struct netbuf *); -static void msg_disconnected_parent(struct connection *); +static int msg_recv_packet(struct netbuf *); static void msg_disconnected_worker(struct connection *); static void msg_type_shutdown(struct kore_msg *, const void *); @@ -113,7 +112,6 @@ kore_msg_worker_init(void) worker->msg[1]->write = net_write; worker->msg[1]->proto = CONN_PROTO_MSG; worker->msg[1]->state = CONN_STATE_ESTABLISHED; - worker->msg[1]->disconnect = msg_disconnected_parent; worker->msg[1]->handle = kore_connection_handle; worker->msg[1]->evt.flags = KORE_EVENT_WRITE; @@ -246,16 +244,6 @@ msg_recv_data(struct netbuf *nb) } static void -msg_disconnected_parent(struct connection *c) -{ - if (!kore_quiet) - kore_log(LOG_ERR, "parent gone, shutting down"); - - if (kill(worker->pid, SIGQUIT) == -1) - kore_log(LOG_ERR, "failed to send SIGQUIT: %s", errno_s); -} - -static void msg_disconnected_worker(struct connection *c) { c->hdlr_extra = NULL; @@ -269,7 +257,7 @@ msg_type_shutdown(struct kore_msg *msg, const void *data) "shutdown requested by worker %u, going down", msg->src); } - (void)raise(SIGQUIT); + kore_quit = 1; } #if !defined(KORE_NO_HTTP) diff --git a/src/worker.c b/src/worker.c @@ -297,9 +297,13 @@ kore_worker_shutdown(void) kw->pid = 0; kw->running = 0; + kw->msg[0]->evt.flags |= KORE_EVENT_READ; + net_recv_flush(kw->msg[0]); + if (!kore_quiet) { - kore_log(LOG_NOTICE, "worker %s exited", - 
kore_worker_name(kw->id)); + kore_log(LOG_NOTICE, + "worker %s exited (%d)", + kore_worker_name(kw->id), status); } } } @@ -610,7 +614,6 @@ kore_worker_entry(struct kore_worker *kw) kore_free(rcall); } - kore_msg_send(KORE_MSG_PARENT, KORE_MSG_SHUTDOWN, NULL, 0); kore_server_cleanup(); kore_platform_event_cleanup(); @@ -642,24 +645,19 @@ kore_worker_reap(void) pid_t pid; int status; - for (;;) { - pid = waitpid(WAIT_ANY, &status, WNOHANG); + pid = waitpid(WAIT_ANY, &status, WNOHANG); - if (pid == -1) { - if (errno == ECHILD) - return; - if (errno == EINTR) - continue; - kore_log(LOG_ERR, - "failed to wait for children: %s", errno_s); + if (pid == -1) { + if (errno == ECHILD || errno == EINTR) return; - } + kore_log(LOG_ERR, "%s: waitpid(): %s", __func__, errno_s); + return; + } - if (pid == 0) - return; + if (pid == 0) + return; - worker_reaper(pid, status); - } + worker_reaper(pid, status); } void @@ -785,6 +783,9 @@ worker_reaper(pid_t pid, int status) if (kw->pid != pid) continue; + kw->msg[0]->evt.flags |= KORE_EVENT_READ; + net_recv_flush(kw->msg[0]); + if (!kore_quiet) { kore_log(LOG_NOTICE, "worker %s (%d) exited with status %d", @@ -819,10 +820,7 @@ worker_reaper(pid_t pid, int status) kore_log(LOG_CRIT, "keymgr or acme process gone, stopping"); kw->pid = 0; - if (raise(SIGTERM) != 0) { - kore_log(LOG_WARNING, - "failed to raise SIGTERM signal"); - } + kore_quit = 1; break; } @@ -844,22 +842,24 @@ worker_reaper(pid_t pid, int status) kw->pid = 0; kore_log(LOG_NOTICE, "worker policy is 'terminate', stopping"); - if (raise(SIGTERM) != 0) { - kore_log(LOG_WARNING, - "failed to raise SIGTERM signal"); - } + kore_quit = 1; break; } - kore_log(LOG_NOTICE, "restarting worker %d", kw->id); - kw->restarted = 1; - kore_msg_parent_remove(kw); + if (kore_quit == 0) { + kore_log(LOG_NOTICE, "restarting worker %d", kw->id); + kw->restarted = 1; + kore_msg_parent_remove(kw); - if (!kore_worker_spawn(idx, kw->id, kw->cpu)) - (void)raise(SIGQUIT); + if 
(!kore_worker_spawn(idx, kw->id, kw->cpu)) { + kore_quit = 1; + kore_log(LOG_ERR, "failed to restart worker"); + } else { + kore_msg_parent_add(kw); + } - kore_msg_parent_add(kw); - break; + break; + } } }