kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

worker.c (23289B)



      1 /*
      2  * Copyright (c) 2013-2022 Joris Vink <joris@coders.se>
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 #include <sys/param.h>
     18 #include <sys/types.h>
     19 #include <sys/stat.h>
     20 #include <sys/shm.h>
     21 #include <sys/wait.h>
     22 #include <sys/time.h>
     23 #include <sys/resource.h>
     24 #include <sys/socket.h>
     25 
     26 #include <fcntl.h>
     27 #include <grp.h>
     28 #include <pwd.h>
     29 #include <signal.h>
     30 #include <unistd.h>
     31 
     32 #include "kore.h"
     33 
     34 #if defined(KORE_USE_ACME)
     35 #include "acme.h"
     36 #endif
     37 
     38 #if !defined(KORE_NO_HTTP)
     39 #include "http.h"
     40 #endif
     41 
     42 #if defined(KORE_USE_PGSQL)
     43 #include "pgsql.h"
     44 #endif
     45 
     46 #if defined(KORE_USE_TASKS)
     47 #include "tasks.h"
     48 #endif
     49 
     50 #if defined(KORE_USE_PYTHON)
     51 #include "python_api.h"
     52 #endif
     53 
     54 #if defined(KORE_USE_CURL)
     55 #include "curl.h"
     56 #endif
     57 
     58 #if defined(__linux__)
     59 #include "seccomp.h"
     60 #endif
     61 
     62 #define WORKER_SOLO_COUNT	3
     63 
     64 #define WORKER(id)						\
     65 	(struct kore_worker *)((u_int8_t *)kore_workers +	\
     66 	    (sizeof(struct kore_worker) * id))
     67 
     68 struct wlock {
     69 	volatile int		lock;
     70 	pid_t			current;
     71 };
     72 
     73 static int	worker_trylock(void);
     74 static void	worker_unlock(void);
     75 static void	worker_reaper(pid_t, int);
     76 static void	worker_runtime_teardown(void);
     77 static void	worker_runtime_configure(void);
     78 static void	worker_domain_check(struct kore_domain *);
     79 
     80 static struct kore_runtime_call	*worker_runtime_signal(void);
     81 
     82 static inline int	worker_acceptlock_obtain(void);
     83 static inline void	worker_acceptlock_release(void);
     84 static void		worker_accept_avail(struct kore_msg *, const void *);
     85 
     86 static void	worker_entropy_recv(struct kore_msg *, const void *);
     87 static void	worker_keymgr_response(struct kore_msg *, const void *);
     88 
     89 static pid_t				worker_pgrp;
     90 static int				accept_avail;
     91 static struct kore_worker		*kore_workers;
     92 static int				worker_no_lock;
     93 static int				shm_accept_key;
     94 static struct wlock			*accept_lock;
     95 
     96 struct kore_worker		*worker = NULL;
     97 u_int8_t			worker_set_affinity = 1;
     98 u_int32_t			worker_accept_threshold = 16;
     99 u_int32_t			worker_rlimit_nofiles = 768;
    100 u_int32_t			worker_max_connections = 512;
    101 u_int32_t			worker_active_connections = 0;
    102 u_int8_t			worker_no_accept[KORE_WORKER_MAX];
    103 int				worker_policy = KORE_WORKER_POLICY_RESTART;
    104 
    105 int
    106 kore_worker_init(void)
    107 {
    108 	size_t			len;
    109 	struct kore_worker	*kw;
    110 	u_int16_t		idx, id, cpu;
    111 
    112 	worker_no_lock = 0;
    113 
    114 	if (worker_count == 0)
    115 		worker_count = cpu_count;
    116 
    117 	/* Account for the keymgr/acme even if we don't end up starting it. */
    118 	worker_count += 2;
    119 
    120 	len = sizeof(*accept_lock) +
    121 	    (sizeof(struct kore_worker) * worker_count);
    122 
    123 	shm_accept_key = shmget(IPC_PRIVATE, len, IPC_CREAT | IPC_EXCL | 0700);
    124 	if (shm_accept_key == -1)
    125 		fatal("kore_worker_init(): shmget() %s", errno_s);
    126 	if ((accept_lock = shmat(shm_accept_key, NULL, 0)) == (void *)-1)
    127 		fatal("kore_worker_init(): shmat() %s", errno_s);
    128 
    129 	accept_lock->lock = 0;
    130 	accept_lock->current = 0;
    131 
    132 	kore_workers = (struct kore_worker *)((u_int8_t *)accept_lock +
    133 	    sizeof(*accept_lock));
    134 	memset(kore_workers, 0, sizeof(struct kore_worker) * worker_count);
    135 
    136 	if ((worker_count - 2) > cpu_count)
    137 		kore_log(LOG_NOTICE, "more worker processes than cpu cores");
    138 
    139 	/* Setup log buffers. */
    140 	for (idx = KORE_WORKER_BASE; idx < worker_count; idx++) {
    141 		kw = WORKER(idx);
    142 		kw->lb.offset = 0;
    143 	}
    144 
    145 	if (!kore_quiet)
    146 		kore_log(LOG_INFO, "starting worker processes");
    147 
    148 	if ((worker_pgrp = getpgrp()) == -1)
    149 		fatal("%s: getpgrp(): %s", __func__, errno_s);
    150 
    151 	/* Now start all the workers. */
    152 	id = 1;
    153 	cpu = 1;
    154 	for (idx = KORE_WORKER_BASE; idx < worker_count; idx++) {
    155 		if (cpu >= cpu_count)
    156 			cpu = 0;
    157 		if (!kore_worker_spawn(idx, id++, cpu++))
    158 			return (KORE_RESULT_ERROR);
    159 	}
    160 
    161 	if (kore_keymgr_active) {
    162 #if defined(KORE_USE_ACME)
    163 		/* The ACME process is only started if we need it. */
    164 		if (acme_domains) {
    165 			if (!kore_worker_spawn(KORE_WORKER_ACME_IDX,
    166 			    KORE_WORKER_ACME, 0))
    167 				return (KORE_RESULT_ERROR);
    168 		}
    169 #endif
    170 
    171 		/* Now we can start the keymgr. */
    172 		if (!kore_worker_spawn(KORE_WORKER_KEYMGR_IDX,
    173 		    KORE_WORKER_KEYMGR, 0))
    174 			return (KORE_RESULT_ERROR);
    175 	}
    176 
    177 	if (!kore_quiet)
    178 		kore_log(LOG_INFO, "all worker processes started");
    179 
    180 	return (KORE_RESULT_OK);
    181 }
    182 
    183 int
    184 kore_worker_spawn(u_int16_t idx, u_int16_t id, u_int16_t cpu)
    185 {
    186 	struct kore_worker	*kw;
    187 	int			cnt, maxcnt;
    188 #if defined(__linux__)
    189 	int			status;
    190 #endif
    191 
    192 	if (idx >= KORE_WORKER_MAX)
    193 		fatal("%u > KORE_WORKER_MAX", idx);
    194 
    195 	kw = WORKER(idx);
    196 	kw->id = id;
    197 	kw->cpu = cpu;
    198 	kw->running = 1;
    199 
    200 	kw->ready = 0;
    201 	kw->has_lock = 0;
    202 	kw->active_route = NULL;
    203 	kw->no_accept = worker_no_accept[id];
    204 
    205 	if (socketpair(AF_UNIX, SOCK_STREAM, 0, kw->pipe) == -1)
    206 		fatal("socketpair(): %s", errno_s);
    207 
    208 	if (!kore_connection_nonblock(kw->pipe[0], 0) ||
    209 	    !kore_connection_nonblock(kw->pipe[1], 0))
    210 		fatal("could not set pipe fds to nonblocking: %s", errno_s);
    211 
    212 	maxcnt = 50;
    213 
    214 	switch (id) {
    215 	case KORE_WORKER_KEYMGR:
    216 #if defined(KORE_USE_ACME)
    217 		maxcnt = 600;
    218 #endif
    219 		kw->ps = &keymgr_privsep;
    220 		break;
    221 #if defined(KORE_USE_ACME)
    222 	case KORE_WORKER_ACME:
    223 		kw->ps = &acme_privsep;
    224 		break;
    225 #endif
    226 	default:
    227 		kw->ps = &worker_privsep;
    228 		break;
    229 	}
    230 
    231 	kw->pid = fork();
    232 	if (kw->pid == -1)
    233 		fatal("could not spawn worker child: %s", errno_s);
    234 
    235 	if (kw->pid == 0) {
    236 		kw->pid = getpid();
    237 		kore_worker_entry(kw);
    238 		exit(1);
    239 	} else {
    240 		for (cnt = 0; cnt < maxcnt; cnt++) {
    241 			if (kw->ready == 1)
    242 				break;
    243 			usleep(100000);
    244 #if defined(__linux__)
    245 			/*
    246 			 * If seccomp_tracing is enabled, make sure we
    247 			 * handle the SIGSTOP from the child processes.
    248 			 */
    249 			if (kore_seccomp_tracing) {
    250 				if (waitpid(kw->pid, &status, WNOHANG) > 0)
    251 					kore_seccomp_trace(kw->pid, status);
    252 			}
    253 #endif
    254 		}
    255 
    256 		if (kw->ready == 0) {
    257 			kore_log(LOG_NOTICE,
    258 			    "worker %d failed to start, shutting down",
    259 			    kw->id);
    260 
    261 			return (KORE_RESULT_ERROR);
    262 		}
    263 	}
    264 
    265 	return (KORE_RESULT_OK);
    266 }
    267 
    268 struct kore_worker *
    269 kore_worker_data(u_int8_t idx)
    270 {
    271 	if (idx >= worker_count)
    272 		fatal("idx %u too large for worker count", idx);
    273 
    274 	return (WORKER(idx));
    275 }
    276 
    277 struct kore_worker *
    278 kore_worker_data_byid(u_int16_t id)
    279 {
    280 	struct kore_worker	*kw;
    281 	u_int16_t		idx;
    282 
    283 	for (idx = 0; idx < worker_count; idx++) {
    284 		kw = WORKER(idx);
    285 		if (kw->id == id)
    286 			return (kw);
    287 	}
    288 
    289 	return (NULL);
    290 }
    291 
    292 void
    293 kore_worker_shutdown(void)
    294 {
    295 	struct kore_worker	*kw;
    296 	pid_t			pid;
    297 	int			status;
    298 	u_int16_t		idx, done;
    299 
    300 	if (!kore_quiet) {
    301 		kore_log(LOG_NOTICE,
    302 		    "waiting for workers to drain and shutdown");
    303 	}
    304 
    305 	for (;;) {
    306 		for (idx = 0; idx < worker_count; idx++) {
    307 			kw = WORKER(idx);
    308 			if (kw->running == 0)
    309 				continue;
    310 
    311 			if (kw->pid != 0) {
    312 				pid = waitpid(kw->pid, &status, 0);
    313 				if (pid == -1 && errno != ECHILD)
    314 					continue;
    315 
    316 #if defined(__linux__)
    317 				kore_seccomp_trace(kw->pid, status);
    318 #endif
    319 
    320 				kw->pid = 0;
    321 				kw->running = 0;
    322 
    323 				kw->msg[0]->evt.flags |= KORE_EVENT_READ;
    324 				net_recv_flush(kw->msg[0]);
    325 
    326 				if (!kore_quiet) {
    327 					kore_log(LOG_NOTICE,
    328 					    "worker %s exited (%d)",
    329 					    kore_worker_name(kw->id), status);
    330 				}
    331 			}
    332 		}
    333 
    334 		done = 0;
    335 		for (idx = 0; idx < worker_count; idx++) {
    336 			kw = WORKER(idx);
    337 			if (kw->running == 0) {
    338 				done++;
    339 				continue;
    340 			}
    341 		}
    342 
    343 		if (done == worker_count)
    344 			break;
    345 	}
    346 
    347 	if (shmctl(shm_accept_key, IPC_RMID, NULL) == -1) {
    348 		kore_log(LOG_NOTICE,
    349 		    "failed to deleted shm segment: %s", errno_s);
    350 	}
    351 }
    352 
    353 void
    354 kore_worker_dispatch_signal(int sig)
    355 {
    356 	u_int16_t		idx;
    357 	struct kore_worker	*kw;
    358 
    359 	for (idx = 0; idx < worker_count; idx++) {
    360 		kw = WORKER(idx);
    361 
    362 		if (kw->pid == -1 || kw->pid == 0)
    363 			continue;
    364 
    365 		if (kill(kw->pid, sig) == -1) {
    366 			kore_log(LOG_WARNING, "kill(%d, %d): %s",
    367 			    kw->pid, sig, errno_s);
    368 		}
    369 	}
    370 }
    371 
    372 void
    373 kore_worker_privsep(void)
    374 {
    375 	rlim_t			fd;
    376 	struct rlimit		rl;
    377 	struct passwd		*pw;
    378 
    379 	if (worker == NULL)
    380 		fatalx("%s called with no worker", __func__);
    381 
    382 	if (worker_set_affinity == 1)
    383 		kore_platform_worker_setcpu(worker);
    384 
    385 	pw = NULL;
    386 
    387 	/* Must happen before chroot. */
    388 	if (worker->ps->skip_runas == 0) {
    389 		if (worker->ps->runas == NULL) {
    390 			fatalx("no runas user given for %s",
    391 			    kore_worker_name(worker->id));
    392 		}
    393 
    394 		if ((pw = getpwnam(worker->ps->runas)) == NULL) {
    395 			fatalx("cannot getpwnam(\"%s\") for user: %s",
    396 			    worker->ps->runas, errno_s);
    397 		}
    398 	}
    399 
    400 	if (worker->ps->skip_chroot == 0) {
    401 		if (chroot(worker->ps->root) == -1) {
    402 			fatalx("cannot chroot(\"%s\"): %s",
    403 			    worker->ps->root, errno_s);
    404 		}
    405 
    406 		if (chdir("/") == -1)
    407 			fatalx("cannot chdir(\"/\"): %s", errno_s);
    408 	} else {
    409 		if (chdir(worker->ps->root) == -1) {
    410 			fatalx("cannot chdir(\"%s\"): %s",
    411 			    worker->ps->root, errno_s);
    412 		}
    413 	}
    414 
    415 	if (getrlimit(RLIMIT_NOFILE, &rl) == -1) {
    416 		kore_log(LOG_WARNING, "getrlimit(RLIMIT_NOFILE): %s", errno_s);
    417 	} else {
    418 		for (fd = 0; fd < rl.rlim_cur; fd++) {
    419 			if (fcntl(fd, F_GETFD, NULL) != -1) {
    420 				worker_rlimit_nofiles++;
    421 			}
    422 		}
    423 	}
    424 
    425 	rl.rlim_cur = worker_rlimit_nofiles;
    426 	rl.rlim_max = worker_rlimit_nofiles;
    427 	if (setrlimit(RLIMIT_NOFILE, &rl) == -1) {
    428 		kore_log(LOG_ERR, "setrlimit(RLIMIT_NOFILE, %u): %s",
    429 		    worker_rlimit_nofiles, errno_s);
    430 	}
    431 
    432 	if (worker->ps->skip_runas == 0) {
    433 		if (setgroups(1, &pw->pw_gid) ||
    434 #if defined(__MACH__) || defined(NetBSD)
    435 		    setgid(pw->pw_gid) || setegid(pw->pw_gid) ||
    436 		    setuid(pw->pw_uid) || seteuid(pw->pw_uid))
    437 #else
    438 		    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
    439 		    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
    440 #endif
    441 			fatalx("cannot drop privileges (%s)", errno_s);
    442 	}
    443 
    444 	kore_platform_sandbox();
    445 }
    446 
    447 void
    448 kore_worker_no_accept(u_int8_t idx)
    449 {
    450 	worker_no_accept[idx] = 1;
    451 }
    452 
    453 void
    454 kore_worker_entry(struct kore_worker *kw)
    455 {
    456 	struct kore_runtime_call	*sigcall;
    457 	u_int64_t			last_seed;
    458 	int				quit, had_lock, sig;
    459 	u_int64_t			netwait, now, next_timeo;
    460 
    461 	worker = kw;
    462 
    463 	if (!kore_foreground)
    464 		closelog();
    465 
    466 #if defined(__linux__)
    467 	kore_seccomp_traceme();
    468 #endif
    469 
    470 	kore_platform_proctitle(kore_worker_name(kw->id));
    471 
    472 	kore_pid = kw->pid;
    473 	kore_signal_setup();
    474 
    475 	if (kw->id == KORE_WORKER_KEYMGR) {
    476 		kore_keymgr_run();
    477 		exit(0);
    478 	}
    479 
    480 #if defined(KORE_USE_ACME)
    481 	if (kw->id == KORE_WORKER_ACME) {
    482 		kore_acme_run();
    483 		exit(0);
    484 	}
    485 #endif
    486 
    487 	net_init();
    488 	kore_connection_init();
    489 	kore_platform_event_init();
    490 	kore_msg_worker_init();
    491 
    492 #if defined(KORE_USE_TASKS)
    493 	kore_task_init();
    494 #endif
    495 
    496 	kore_worker_privsep();
    497 
    498 #if !defined(KORE_NO_HTTP)
    499 	http_init();
    500 	kore_filemap_resolve_paths();
    501 	kore_accesslog_worker_init();
    502 #endif
    503 	kore_timer_init();
    504 	kore_fileref_init();
    505 	kore_tls_keymgr_init();
    506 
    507 	quit = 0;
    508 	had_lock = 0;
    509 	next_timeo = 0;
    510 	accept_avail = 1;
    511 	worker_active_connections = 0;
    512 
    513 	last_seed = 0;
    514 
    515 	if (kore_keymgr_active) {
    516 		kore_msg_register(KORE_MSG_CRL, worker_keymgr_response);
    517 		kore_msg_register(KORE_MSG_ENTROPY_RESP, worker_entropy_recv);
    518 		kore_msg_register(KORE_MSG_CERTIFICATE, worker_keymgr_response);
    519 
    520 		if (worker->restarted) {
    521 			kore_msg_send(KORE_WORKER_KEYMGR,
    522 			    KORE_MSG_CERTIFICATE_REQ, NULL, 0);
    523 		}
    524 #if defined(KORE_USE_ACME)
    525 		kore_msg_register(KORE_ACME_CHALLENGE_SET_CERT,
    526 		    worker_keymgr_response);
    527 		kore_msg_register(KORE_ACME_CHALLENGE_CLEAR_CERT,
    528 		    worker_keymgr_response);
    529 #endif
    530 	}
    531 
    532 	kore_msg_register(KORE_MSG_ACCEPT_AVAILABLE, worker_accept_avail);
    533 
    534 	if (nlisteners == 0)
    535 		worker_no_lock = 1;
    536 
    537 	worker_runtime_configure();
    538 
    539 	kore_module_onload();
    540 	kore_domain_callback(worker_domain_check);
    541 
    542 	kore_worker_started();
    543 	worker->restarted = 0;
    544 
    545 	sigcall = worker_runtime_signal();
    546 
    547 	for (;;) {
    548 		now = kore_time_ms();
    549 
    550 		if (kore_keymgr_active &&
    551 		    (now - last_seed) > KORE_RESEED_TIME) {
    552 			kore_msg_send(KORE_WORKER_KEYMGR,
    553 			    KORE_MSG_ENTROPY_REQ, NULL, 0);
    554 			last_seed = now;
    555 		}
    556 
    557 		if (worker->no_accept == 0 &&
    558 		    !worker->has_lock && accept_avail) {
    559 			if (worker_acceptlock_obtain()) {
    560 				accept_avail = 0;
    561 				if (had_lock == 0) {
    562 					kore_platform_enable_accept();
    563 					had_lock = 1;
    564 				}
    565 			}
    566 		}
    567 
    568 		netwait = kore_timer_next_run(now);
    569 
    570 		if (netwait == KORE_WAIT_INFINITE) {
    571 			if (sig_recv != 0)
    572 				netwait = 10;
    573 #if !defined(KORE_NO_HTTP)
    574 			if (http_request_count > 0)
    575 				netwait = 100;
    576 #endif
    577 		}
    578 
    579 #if defined(KORE_USE_PYTHON)
    580 		if (kore_python_coro_pending())
    581 			netwait = 0;
    582 #endif
    583 
    584 		kore_platform_event_wait(netwait);
    585 		now = kore_time_ms();
    586 
    587 		if (worker->no_accept == 0) {
    588 			if (worker->has_lock)
    589 				worker_acceptlock_release();
    590 
    591 			if (!worker->has_lock) {
    592 				if (had_lock == 1) {
    593 					had_lock = 0;
    594 					kore_platform_disable_accept();
    595 				}
    596 			}
    597 		}
    598 
    599 		sig = sig_recv;
    600 		if (sig != 0) {
    601 			switch (sig) {
    602 			case SIGHUP:
    603 				kore_module_reload(1);
    604 				break;
    605 			case SIGQUIT:
    606 			case SIGINT:
    607 			case SIGTERM:
    608 				quit = 1;
    609 				break;
    610 			case SIGCHLD:
    611 #if defined(KORE_USE_PYTHON)
    612 				kore_python_proc_reap();
    613 #endif
    614 				break;
    615 			default:
    616 				break;
    617 			}
    618 
    619 			if (sigcall != NULL)
    620 				kore_runtime_signal(sigcall, sig);
    621 
    622 			if (sig == sig_recv)
    623 				sig_recv = 0;
    624 		}
    625 
    626 		if (quit)
    627 			break;
    628 
    629 		kore_timer_run(now);
    630 #if defined(KORE_USE_CURL)
    631 		kore_curl_run_scheduled();
    632 		kore_curl_do_timeout();
    633 #endif
    634 #if !defined(KORE_NO_HTTP)
    635 		if (worker->no_accept == 0)
    636 			http_process();
    637 #endif
    638 #if defined(KORE_USE_PYTHON)
    639 		kore_python_coro_run();
    640 #endif
    641 		if (next_timeo <= now) {
    642 			kore_connection_check_timeout(now);
    643 			next_timeo = now + 500;
    644 		}
    645 
    646 		kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT);
    647 	}
    648 
    649 	worker_runtime_teardown();
    650 	kore_server_cleanup();
    651 
    652 	kore_platform_event_cleanup();
    653 	kore_connection_cleanup();
    654 	kore_domain_cleanup();
    655 	kore_tls_cleanup();
    656 	kore_module_cleanup();
    657 #if !defined(KORE_NO_HTTP)
    658 	http_cleanup();
    659 #endif
    660 	net_cleanup();
    661 
    662 #if defined(KORE_USE_PYTHON)
    663 	kore_python_cleanup();
    664 #endif
    665 
    666 #if defined(KORE_USE_PGSQL)
    667 	kore_pgsql_sys_cleanup();
    668 #endif
    669 
    670 	kore_mem_cleanup();
    671 	exit(0);
    672 }
    673 
    674 void
    675 kore_worker_reap(void)
    676 {
    677 	pid_t			pid;
    678 	int			status;
    679 
    680 	for (;;) {
    681 		pid = waitpid(-worker_pgrp, &status, WNOHANG);
    682 
    683 		if (pid == -1) {
    684 			if (errno != ECHILD && errno != EINTR) {
    685 				kore_log(LOG_ERR,
    686 				    "%s: waitpid(): %s", __func__, errno_s);
    687 			}
    688 			break;
    689 		}
    690 
    691 		if (pid == 0)
    692 			break;
    693 
    694 		worker_reaper(pid, status);
    695 	}
    696 }
    697 
    698 void
    699 kore_worker_make_busy(void)
    700 {
    701 	if (worker->no_accept)
    702 		fatal("%s called with no_accept set", __func__);
    703 
    704 	if (worker_count == WORKER_SOLO_COUNT || worker_no_lock == 1)
    705 		return;
    706 
    707 	if (worker->has_lock) {
    708 		worker_unlock();
    709 		worker->has_lock = 0;
    710 		kore_msg_send(KORE_MSG_WORKER_ALL,
    711 		    KORE_MSG_ACCEPT_AVAILABLE, NULL, 0);
    712 	}
    713 }
    714 
    715 int
    716 kore_worker_keymgr_response_verify(struct kore_msg *msg, const void *data,
    717     struct kore_domain **out)
    718 {
    719 	struct kore_server		*srv;
    720 	struct kore_domain		*dom;
    721 	const struct kore_x509_msg	*req;
    722 
    723 	if (msg->length < sizeof(*req)) {
    724 		kore_log(LOG_WARNING,
    725 		    "short keymgr message (%zu)", msg->length);
    726 		return (KORE_RESULT_ERROR);
    727 	}
    728 
    729 	req = (const struct kore_x509_msg *)data;
    730 	if (msg->length != (sizeof(*req) + req->data_len)) {
    731 		kore_log(LOG_WARNING,
    732 		    "invalid keymgr payload (%zu)", msg->length);
    733 		return (KORE_RESULT_ERROR);
    734 	}
    735 
    736 	if (req->domain[KORE_DOMAINNAME_LEN] != '\0') {
    737 		kore_log(LOG_WARNING, "domain not NUL-terminated");
    738 		return (KORE_RESULT_ERROR);
    739 
    740 	}
    741 
    742 	if (out == NULL)
    743 		return (KORE_RESULT_OK);
    744 
    745 	dom = NULL;
    746 
    747 	LIST_FOREACH(srv, &kore_servers, list) {
    748 		dom = NULL;
    749 
    750 		if (srv->tls == 0)
    751 			continue;
    752 
    753 		TAILQ_FOREACH(dom, &srv->domains, list) {
    754 			if (!strcmp(dom->domain, req->domain))
    755 				break;
    756 		}
    757 
    758 		if (dom != NULL)
    759 			break;
    760 	}
    761 
    762 	if (dom == NULL) {
    763 		kore_log(LOG_WARNING,
    764 		    "got keymgr response for domain that does not exist");
    765 		return (KORE_RESULT_ERROR);
    766 	}
    767 
    768 	*out = dom;
    769 
    770 	return (KORE_RESULT_OK);
    771 }
    772 
    773 void
    774 kore_worker_started(void)
    775 {
    776 	const char	*chroot;
    777 
    778 	if (worker->ps->skip_chroot)
    779 		chroot = "root";
    780 	else
    781 		chroot = "chroot";
    782 
    783 	if (!kore_quiet) {
    784 		kore_log(LOG_NOTICE,
    785 		    "started (#%d %s=%s%s%s)",
    786 		    getpid(), chroot, worker->ps->root,
    787 		    worker->ps->skip_runas ? "" : " user=",
    788 		    worker->ps->skip_runas ? "" : worker->ps->runas);
    789 	}
    790 
    791 	worker->ready = 1;
    792 }
    793 
    794 static void
    795 worker_runtime_configure(void)
    796 {
    797 	struct kore_runtime_call	*rcall;
    798 
    799 	rcall = NULL;
    800 
    801 #if defined(KORE_USE_PYTHON)
    802 	rcall = kore_runtime_getcall(KORE_PYTHON_WORKER_START_HOOK);
    803 #endif
    804 
    805 	if (rcall == NULL)
    806 		rcall = kore_runtime_getcall("kore_worker_configure");
    807 
    808 	if (rcall != NULL) {
    809 		kore_runtime_execute(rcall);
    810 		kore_free(rcall);
    811 	}
    812 }
    813 
    814 static struct kore_runtime_call *
    815 worker_runtime_signal(void)
    816 {
    817 	struct kore_runtime_call	*rcall;
    818 
    819 	rcall = NULL;
    820 
    821 #if defined(KORE_USE_PYTHON)
    822 	rcall = kore_runtime_getcall(KORE_PYTHON_SIGNAL_HOOK);
    823 #endif
    824 
    825 	if (rcall == NULL)
    826 		rcall = kore_runtime_getcall("kore_worker_signal");
    827 
    828 	return (rcall);
    829 }
    830 
    831 static void
    832 worker_runtime_teardown(void)
    833 {
    834 	struct kore_runtime_call	*rcall;
    835 
    836 	rcall = NULL;
    837 
    838 #if defined(KORE_USE_PYTHON)
    839 	rcall = kore_runtime_getcall(KORE_PYTHON_WORKER_STOP_HOOK);
    840 #endif
    841 
    842 	if (rcall == NULL)
    843 		rcall = kore_runtime_getcall("kore_worker_teardown");
    844 
    845 	if (rcall != NULL) {
    846 		kore_runtime_execute(rcall);
    847 		kore_free(rcall);
    848 	}
    849 }
    850 
    851 static void
    852 worker_domain_check(struct kore_domain *dom)
    853 {
    854 	struct stat	st;
    855 
    856 	if (dom->cafile != NULL) {
    857 		if (stat(dom->cafile, &st) == -1)
    858 			fatalx("'%s': %s", dom->cafile, errno_s);
    859 		if (access(dom->cafile, R_OK) == -1)
    860 			fatalx("'%s': not readable", dom->cafile);
    861 	}
    862 }
    863 
    864 static void
    865 worker_reaper(pid_t pid, int status)
    866 {
    867 	u_int16_t		idx;
    868 	struct kore_worker	*kw;
    869 	const char		*func;
    870 
    871 #if defined(__linux__)
    872 	if (kore_seccomp_trace(pid, status))
    873 		return;
    874 #endif
    875 
    876 	for (idx = 0; idx < worker_count; idx++) {
    877 		kw = WORKER(idx);
    878 		if (kw->pid != pid)
    879 			continue;
    880 
    881 		kw->msg[0]->evt.flags |= KORE_EVENT_READ;
    882 		net_recv_flush(kw->msg[0]);
    883 
    884 		if (!kore_quiet) {
    885 			kore_log(LOG_NOTICE,
    886 			    "worker %s (%d) exited with status %d",
    887 			    kore_worker_name(kw->id), pid, status);
    888 		}
    889 
    890 		kw->running = 0;
    891 
    892 		if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
    893 			kw->pid = 0;
    894 			break;
    895 		}
    896 
    897 		func = "none";
    898 #if !defined(KORE_NO_HTTP)
    899 		if (kw->active_route != NULL)
    900 			func = kw->active_route->func;
    901 #endif
    902 		kore_log(LOG_NOTICE,
    903 		    "worker %d (pid: %d) (hdlr: %s) gone",
    904 		    kw->id, kw->pid, func);
    905 
    906 #if defined(__linux__)
    907 		if (WIFSIGNALED(status) && WTERMSIG(status) == SIGSYS) {
    908 			kore_log(LOG_NOTICE,
    909 			    "worker %d died from sandbox violation", kw->id);
    910 		}
    911 #endif
    912 
    913 		if (kw->id == KORE_WORKER_KEYMGR ||
    914 		    kw->id == KORE_WORKER_ACME) {
    915 			kore_log(LOG_CRIT,
    916 			    "keymgr or acme process gone, stopping");
    917 			kw->pid = 0;
    918 			kore_quit = KORE_QUIT_FATAL;
    919 			break;
    920 		}
    921 
    922 		if (kw->pid == accept_lock->current &&
    923 		    worker_no_lock == 0)
    924 			worker_unlock();
    925 
    926 #if !defined(KORE_NO_HTTP)
    927 		if (kw->active_route != NULL) {
    928 			kw->active_route->errors++;
    929 			kore_log(LOG_NOTICE,
    930 			    "hdlr %s has caused %d error(s)",
    931 			    kw->active_route->func,
    932 			    kw->active_route->errors);
    933 		}
    934 #endif
    935 
    936 		if (worker_policy == KORE_WORKER_POLICY_TERMINATE) {
    937 			kw->pid = 0;
    938 			kore_log(LOG_NOTICE,
    939 			    "worker policy is 'terminate', stopping");
    940 			kore_quit = KORE_QUIT_FATAL;
    941 			break;
    942 		}
    943 
    944 		if (kore_quit == KORE_QUIT_NONE) {
    945 			kore_log(LOG_NOTICE, "restarting worker %d", kw->id);
    946 			kw->restarted = 1;
    947 			kore_msg_parent_remove(kw);
    948 
    949 			if (!kore_worker_spawn(idx, kw->id, kw->cpu)) {
    950 				kore_quit = KORE_QUIT_FATAL;
    951 				kore_log(LOG_ERR, "failed to restart worker");
    952 			} else {
    953 				kore_msg_parent_add(kw);
    954 			}
    955 
    956 			break;
    957 		}
    958 	}
    959 }
    960 
    961 static inline void
    962 worker_acceptlock_release(void)
    963 {
    964 	if (worker->no_accept)
    965 		fatal("%s called with no_accept set", __func__);
    966 
    967 	if (worker_count == WORKER_SOLO_COUNT || worker_no_lock == 1)
    968 		return;
    969 
    970 	if (worker->has_lock != 1)
    971 		return;
    972 
    973 	if (worker_active_connections < worker_max_connections) {
    974 #if !defined(KORE_NO_HTTP)
    975 		if (http_request_count < http_request_limit)
    976 			return;
    977 #else
    978 		return;
    979 #endif
    980 	}
    981 
    982 #if defined(WORKER_DEBUG)
    983 	kore_log(LOG_DEBUG, "worker busy, releasing lock");
    984 #endif
    985 
    986 	worker_unlock();
    987 	worker->has_lock = 0;
    988 
    989 	kore_msg_send(KORE_MSG_WORKER_ALL, KORE_MSG_ACCEPT_AVAILABLE, NULL, 0);
    990 }
    991 
    992 static inline int
    993 worker_acceptlock_obtain(void)
    994 {
    995 	int		r;
    996 
    997 	if (worker->no_accept)
    998 		fatal("%s called with no_accept set", __func__);
    999 
   1000 	if (worker->has_lock == 1)
   1001 		return (1);
   1002 
   1003 	if (worker_count == WORKER_SOLO_COUNT || worker_no_lock == 1) {
   1004 		worker->has_lock = 1;
   1005 		return (1);
   1006 	}
   1007 
   1008 	if (worker_active_connections >= worker_max_connections)
   1009 		return (0);
   1010 
   1011 #if !defined(KORE_NO_HTTP)
   1012 	if (http_request_count >= http_request_limit)
   1013 		return (0);
   1014 #endif
   1015 
   1016 	r = 0;
   1017 	if (worker_trylock()) {
   1018 		r = 1;
   1019 		worker->has_lock = 1;
   1020 #if defined(WORKER_DEBUG)
   1021 		kore_log(LOG_DEBUG, "got lock");
   1022 #endif
   1023 	}
   1024 
   1025 	return (r);
   1026 }
   1027 
   1028 static int
   1029 worker_trylock(void)
   1030 {
   1031 	if (worker->no_accept)
   1032 		fatal("%s called with no_accept set", __func__);
   1033 
   1034 	if (!__sync_bool_compare_and_swap(&(accept_lock->lock), 0, 1))
   1035 		return (0);
   1036 
   1037 	accept_lock->current = worker->pid;
   1038 
   1039 	return (1);
   1040 }
   1041 
   1042 static void
   1043 worker_unlock(void)
   1044 {
   1045 	if (worker->no_accept)
   1046 		fatal("%s called with no_accept set", __func__);
   1047 
   1048 	accept_lock->current = 0;
   1049 	if (!__sync_bool_compare_and_swap(&(accept_lock->lock), 1, 0))
   1050 		kore_log(LOG_NOTICE, "worker_unlock(): wasn't locked");
   1051 }
   1052 
   1053 static void
   1054 worker_accept_avail(struct kore_msg *msg, const void *data)
   1055 {
   1056 	if (worker->no_accept)
   1057 		fatal("%s called with no_accept set", __func__);
   1058 
   1059 	accept_avail = 1;
   1060 }
   1061 
   1062 static void
   1063 worker_entropy_recv(struct kore_msg *msg, const void *data)
   1064 {
   1065 	if (msg->length != 1024) {
   1066 		kore_log(LOG_WARNING,
   1067 		    "invalid entropy response (got:%zu - wanted:1024)",
   1068 		    msg->length);
   1069 	}
   1070 
   1071 	kore_tls_seed(data, msg->length);
   1072 }
   1073 
   1074 static void
   1075 worker_keymgr_response(struct kore_msg *msg, const void *data)
   1076 {
   1077 	struct kore_domain		*dom;
   1078 	const struct kore_x509_msg	*req;
   1079 
   1080 	if (!kore_worker_keymgr_response_verify(msg, data, &dom))
   1081 		return;
   1082 
   1083 	req = (const struct kore_x509_msg *)data;
   1084 
   1085 	switch (msg->id) {
   1086 	case KORE_MSG_CERTIFICATE:
   1087 		kore_tls_domain_setup(dom, KORE_PEM_CERT_CHAIN,
   1088 		    req->data, req->data_len);
   1089 		break;
   1090 	case KORE_MSG_CRL:
   1091 		kore_tls_domain_crl(dom, req->data, req->data_len);
   1092 		break;
   1093 #if defined(KORE_USE_ACME)
   1094 	case KORE_ACME_CHALLENGE_SET_CERT:
   1095 		if (dom->tls_ctx == NULL) {
   1096 			kore_tls_domain_setup(dom, KORE_DER_CERT_DATA,
   1097 			    req->data, req->data_len);
   1098 		}
   1099 
   1100 		kore_free(dom->acme_cert);
   1101 		dom->acme_cert_len = req->data_len;
   1102 		dom->acme_cert = kore_calloc(1, req->data_len);
   1103 		memcpy(dom->acme_cert, req->data, req->data_len);
   1104 
   1105 		kore_log(LOG_NOTICE, "[%s] tls-alpn-01 challenge active",
   1106 		    dom->domain);
   1107 		dom->acme_challenge = 1;
   1108 		break;
   1109 	case KORE_ACME_CHALLENGE_CLEAR_CERT:
   1110 		dom->acme_cert_len = 0;
   1111 		dom->acme_challenge = 0;
   1112 
   1113 		kore_free(dom->acme_cert);
   1114 		dom->acme_cert = NULL;
   1115 
   1116 		kore_log(LOG_NOTICE, "[%s] tls-alpn-01 challenge disabled",
   1117 		    dom->domain);
   1118 		break;
   1119 #endif
   1120 	default:
   1121 		kore_log(LOG_WARNING, "unknown keymgr request %u", msg->id);
   1122 		break;
   1123 	}
   1124 }