commit 0dda6f996fcadccb415ab5836495bfe41c776df5
parent 315f964abdac8347d4d5d18b478b9c91ad874747
Author: Joris Vink <joris@coders.se>
Date: Wed, 26 Jun 2013 16:37:22 +0200
Add a form of synchronization between what worker will be accepting
new connections and which ones will not be notified for it.
Fixes the thundering herd problem, and nicely spreads out load between
all the workers equally. A configuration option (workers_max_connections)
is available to tweak how many connections a worker will have before
giving up the accept lock.
Two ways are added to this commit for access locking:
- Locking via semaphores.
- Locking via GCC's builtin atomic methods.
The default is running with semaphores disabled (OpenBSD cannot do
sem_init() with pshared set to 1, which is required).
If you want to use semaphores add KORE_USE_SEMAPHORES to CFLAGS,
and -lpthread to LDFLAGS in the Makefile.
Other fixes:
- BSD: add a timeout to kevent().
- Merge kore_worker_wait together, linux knows waitpid() as well.
- Send the correct SIGQUIT signal to workers instead of SIGINT.
- Fix kore_time_ms().
- Log fatal worker messages in syslog.
- Refactor code even more.
- Do not free our own kore_worker structure.
Diffstat:
10 files changed, 333 insertions(+), 186 deletions(-)
diff --git a/Makefile b/Makefile
@@ -18,7 +18,7 @@ default:
@echo "Please specify a build target [linux | bsd]"
linux:
- @LDFLAGS=-ldl CFLAGS=-D_GNU_SOURCE=1 S_SRC=src/linux.c make kore
+ @LDFLAGS="-ldl" CFLAGS="-D_GNU_SOURCE=1" S_SRC=src/linux.c make kore
bsd:
@S_SRC=src/bsd.c make kore
diff --git a/includes/kore.h b/includes/kore.h
@@ -112,8 +112,6 @@ struct kore_worker {
TAILQ_ENTRY(kore_worker) list;
};
-TAILQ_HEAD(kore_worker_h, kore_worker);
-
struct kore_domain {
char *domain;
char *certfile;
@@ -152,23 +150,28 @@ extern char *config_file;
extern u_int16_t cpu_count;
extern u_int8_t worker_count;
+extern u_int32_t worker_max_connections;
extern struct listener server;
extern struct kore_worker *worker;
-extern struct kore_worker_h kore_workers;
extern struct kore_domain_h domains;
extern struct kore_domain *primary_dom;
extern struct passwd *pw;
void kore_signal(int);
+void kore_worker_wait(int);
void kore_worker_init(void);
+void kore_worker_shutdown(void);
+void kore_worker_dispatch_signal(int);
void kore_worker_connection_add(struct connection *);
-void kore_worker_connection_move(struct connection *c);
+void kore_worker_connection_move(struct connection *);
+void kore_worker_connection_remove(struct connection *);
void kore_platform_event_init(void);
-void kore_platform_event_wait(int);
-void kore_platform_worker_wait(int);
+void kore_platform_event_wait(void);
void kore_platform_proctitle(char *);
+void kore_platform_enable_accept(void);
+void kore_platform_disable_accept(void);
void kore_platform_event_schedule(int, int, int, void *);
void kore_platform_worker_setcpu(struct kore_worker *);
@@ -181,6 +184,7 @@ void kore_worker_entry(struct kore_worker *);
int kore_ssl_sni_cb(SSL *, int *, void *);
int kore_ssl_npn_cb(SSL *, const u_char **, unsigned int *, void *);
+int kore_connection_nonblock(int);
int kore_connection_handle(struct connection *);
void kore_connection_remove(struct connection *);
void kore_connection_disconnect(struct connection *);
diff --git a/modules/example/module.conf b/modules/example/module.conf
@@ -1,8 +1,7 @@
# Example Kore configuration
# Server configuration.
-#bind 10.211.55.3 443
-bind 92.243.14.169 443
+bind 10.211.55.3 443
# The path worker processes will chroot too after starting.
#chroot /home/joris/src/kore
@@ -15,6 +14,10 @@ runas joris
# kore will automatically distribute all workers on them.
workers 2
+# The number of active connections each worker can handle.
+# You might have to tweak this number based on your hardware.
+worker_max_connections 50
+
# Store the main process its pid in this file.
#pidfile /var/run/kore.pid
@@ -42,7 +45,7 @@ load modules/example/example.module
# handler path module_callback
# Example domain that responds to 10.211.55.33.
-domain 92.243.14.169 {
+domain 10.211.55.3 {
certfile cert/server.crt
certkey cert/server.key
accesslog /var/log/kore_access.log
diff --git a/src/bsd.c b/src/bsd.c
@@ -34,7 +34,6 @@
#include <stdlib.h>
#include <string.h>
#include <syslog.h>
-#include <unistd.h>
#include <time.h>
#include <regex.h>
#include <zlib.h>
@@ -57,50 +56,6 @@ kore_platform_init(void)
}
void
-kore_platform_worker_wait(int final)
-{
- pid_t pid;
- int status;
- struct kore_worker k, *kw, *next;
-
- if (final)
- pid = waitpid(WAIT_ANY, &status, 0);
- else
- pid = waitpid(WAIT_ANY, &status, WNOHANG);
-
- if (pid == -1) {
- kore_debug("waitpid(): %s", errno_s);
- return;
- }
-
- if (pid == 0)
- return;
-
- for (kw = TAILQ_FIRST(&kore_workers); kw != NULL; kw = next) {
- next = TAILQ_NEXT(kw, list);
- if (kw->pid != pid)
- continue;
-
- k = *kw;
- TAILQ_REMOVE(&kore_workers, kw, list);
- kore_log(LOG_NOTICE, "worker %d (%d)-> status %d",
- kw->id, pid, status);
- free(kw);
-
- if (final)
- continue;
-
- if (WEXITSTATUS(status) || WTERMSIG(status) ||
- WCOREDUMP(status)) {
- kore_log(LOG_NOTICE,
- "worker %d (pid: %d) gone, respawning new one",
- k.id, k.pid);
- kore_worker_spawn(0);
- }
- }
-}
-
-void
kore_platform_worker_setcpu(struct kore_worker *kw)
{
}
@@ -114,16 +69,21 @@ kore_platform_event_init(void)
nchanges = 0;
events = kore_calloc(KQUEUE_EVENTS, sizeof(struct kevent));
changelist = kore_calloc(KQUEUE_EVENTS, sizeof(struct kevent));
- kore_platform_event_schedule(server.fd, EVFILT_READ, EV_ADD, &server);
+
+ kore_platform_event_schedule(server.fd,
+ EVFILT_READ, EV_ADD | EV_DISABLE, &server);
}
void
-kore_platform_event_wait(int quit)
+kore_platform_event_wait(void)
{
struct connection *c;
+ struct timespec timeo;
int n, i, *fd;
- n = kevent(kfd, changelist, nchanges, events, KQUEUE_EVENTS, NULL);
+ timeo.tv_sec = 0;
+ timeo.tv_nsec = 100000000;
+ n = kevent(kfd, changelist, nchanges, events, KQUEUE_EVENTS, &timeo);
if (n == -1) {
if (errno == EINTR)
return;
@@ -148,16 +108,14 @@ kore_platform_event_wait(int quit)
}
if (*fd == server.fd) {
- if (!quit) {
- kore_connection_accept(&server, &c);
- if (c == NULL)
- continue;
-
- kore_platform_event_schedule(c->fd,
- EVFILT_READ, EV_ADD, c);
- kore_platform_event_schedule(c->fd,
- EVFILT_WRITE, EV_ADD | EV_ONESHOT, c);
- }
+ kore_connection_accept(&server, &c);
+ if (c == NULL)
+ continue;
+
+ kore_platform_event_schedule(c->fd,
+ EVFILT_READ, EV_ADD, c);
+ kore_platform_event_schedule(c->fd,
+ EVFILT_WRITE, EV_ADD | EV_ONESHOT, c);
} else {
c = (struct connection *)events[i].udata;
if (events[i].filter == EVFILT_READ)
@@ -191,6 +149,20 @@ kore_platform_event_schedule(int fd, int type, int flags, void *data)
}
void
+kore_platform_enable_accept(void)
+{
+ kore_platform_event_schedule(server.fd,
+ EVFILT_READ, EV_ENABLE, &server);
+}
+
+void
+kore_platform_disable_accept(void)
+{
+ kore_platform_event_schedule(server.fd,
+ EVFILT_READ, EV_DISABLE, NULL);
+}
+
+void
kore_platform_proctitle(char *title)
{
setproctitle("%s", title);
diff --git a/src/config.c b/src/config.c
@@ -51,26 +51,28 @@ static int configure_pidfile(char **);
static int configure_accesslog(char **);
static int configure_certfile(char **);
static int configure_certkey(char **);
+static int configure_max_connections(char **);
static void domain_sslstart(void);
static struct {
const char *name;
int (*configure)(char **);
} config_names[] = {
- { "bind", configure_bind },
- { "load", configure_load },
- { "onload", configure_onload },
- { "static", configure_handler },
- { "dynamic", configure_handler },
- { "domain", configure_domain },
- { "chroot", configure_chroot },
- { "runas", configure_runas },
- { "workers", configure_workers },
- { "pidfile", configure_pidfile },
- { "accesslog", configure_accesslog },
- { "certfile", configure_certfile },
- { "certkey", configure_certkey },
- { NULL, NULL },
+ { "bind", configure_bind },
+ { "load", configure_load },
+ { "onload", configure_onload },
+ { "static", configure_handler },
+ { "dynamic", configure_handler },
+ { "domain", configure_domain },
+ { "chroot", configure_chroot },
+ { "runas", configure_runas },
+ { "workers", configure_workers },
+ { "worker_max_connections", configure_max_connections },
+ { "pidfile", configure_pidfile },
+ { "accesslog", configure_accesslog },
+ { "certfile", configure_certfile },
+ { "certkey", configure_certkey },
+ { NULL, NULL },
};
char *config_file = NULL;
@@ -371,6 +373,21 @@ configure_certkey(char **argv)
return (KORE_RESULT_OK);
}
+static int
+configure_max_connections(char **argv)
+{
+ int err;
+
+ if (argv[1] == NULL)
+ return (KORE_RESULT_ERROR);
+
+ worker_max_connections = kore_strtonum(argv[1], 1, 65535, &err);
+ if (err != KORE_RESULT_OK)
+ return (KORE_RESULT_ERROR);
+
+ return (KORE_RESULT_OK);
+}
+
static void
domain_sslstart(void)
{
diff --git a/src/connection.c b/src/connection.c
@@ -36,7 +36,6 @@
#include <string.h>
#include <sched.h>
#include <syslog.h>
-#include <unistd.h>
#include <time.h>
#include <regex.h>
#include <zlib.h>
@@ -46,8 +45,6 @@
#include "kore.h"
#include "http.h"
-static int kore_connection_nonblock(int);
-
int
kore_connection_accept(struct listener *l, struct connection **out)
{
@@ -187,8 +184,8 @@ kore_connection_remove(struct connection *c)
if (c->ssl != NULL)
SSL_free(c->ssl);
-
close(c->fd);
+
if (c->inflate_started)
inflateEnd(&(c->z_inflate));
if (c->deflate_started)
@@ -221,10 +218,11 @@ kore_connection_remove(struct connection *c)
free(s);
}
+ kore_worker_connection_remove(c);
free(c);
}
-static int
+int
kore_connection_nonblock(int fd)
{
int flags;
diff --git a/src/kore.c b/src/kore.c
@@ -28,7 +28,6 @@
#include <pwd.h>
#include <errno.h>
-#include <grp.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
@@ -36,7 +35,6 @@
#include <string.h>
#include <sched.h>
#include <syslog.h>
-#include <unistd.h>
#include <time.h>
#include <regex.h>
#include <zlib.h>
@@ -77,7 +75,6 @@ int
main(int argc, char *argv[])
{
int ch;
- struct kore_worker *kw, *next;
if (getuid() != 0)
fatal("kore must be started as root");
@@ -114,41 +111,12 @@ main(int argc, char *argv[])
kore_server_start();
- for (;;) {
- if (sig_recv != 0) {
- if (sig_recv == SIGHUP) {
- kore_module_reload();
- TAILQ_FOREACH(kw, &kore_workers, list) {
- if (kill(kw->pid, SIGHUP) == -1) {
- kore_debug("kill(%d, HUP): %s",
- kw->pid, errno_s);
- }
- }
- } else if (sig_recv == SIGQUIT) {
- break;
- }
- sig_recv = 0;
- }
-
- if (!kore_accesslog_wait())
- break;
- kore_platform_worker_wait(0);
- }
-
- for (kw = TAILQ_FIRST(&kore_workers); kw != NULL; kw = next) {
- next = TAILQ_NEXT(kw, list);
- if (kill(kw->pid, SIGINT) == -1)
- kore_debug("kill(%d, SIGINT): %s", kw->pid, errno_s);
- }
-
- kore_log(LOG_NOTICE, "waiting for workers to drain and finish");
- while (!TAILQ_EMPTY(&kore_workers))
- kore_platform_worker_wait(1);
-
kore_log(LOG_NOTICE, "server shutting down");
+ kore_worker_shutdown();
unlink(kore_pidfile);
close(server.fd);
+ kore_log(LOG_NOTICE, "goodbye");
return (0);
}
@@ -201,6 +169,10 @@ kore_server_start(void)
{
if (!kore_server_bind(&server, server_ip, server_port))
fatal("cannot bind to %s:%d", server_ip, server_port);
+
+ free(server_ip);
+ free(runas_user);
+
if (daemon(1, 1) == -1)
fatal("cannot daemon(): %s", errno_s);
@@ -212,8 +184,22 @@ kore_server_start(void)
kore_worker_init();
- free(server_ip);
- free(runas_user);
+ for (;;) {
+ if (sig_recv != 0) {
+ if (sig_recv == SIGHUP || sig_recv == SIGQUIT) {
+ kore_worker_dispatch_signal(sig_recv);
+ if (sig_recv == SIGHUP)
+ kore_module_reload();
+ if (sig_recv == SIGQUIT)
+ break;
+ }
+ sig_recv = 0;
+ }
+
+ if (!kore_accesslog_wait())
+ break;
+ kore_worker_wait(0);
+ }
}
static int
@@ -228,6 +214,9 @@ kore_server_bind(struct listener *l, const char *ip, int port)
return (KORE_RESULT_ERROR);
}
+ if (!kore_connection_nonblock(l->fd))
+ return (KORE_RESULT_ERROR);
+
on = 1;
if (setsockopt(l->fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&on,
sizeof(on)) == -1) {
@@ -247,7 +236,7 @@ kore_server_bind(struct listener *l, const char *ip, int port)
return (KORE_RESULT_ERROR);
}
- if (listen(l->fd, 50) == -1) {
+ if (listen(l->fd, 5000) == -1) {
close(l->fd);
kore_debug("listen(): %s", errno_s);
return (KORE_RESULT_ERROR);
diff --git a/src/linux.c b/src/linux.c
@@ -36,7 +36,6 @@
#include <string.h>
#include <sched.h>
#include <syslog.h>
-#include <unistd.h>
#include <time.h>
#include <regex.h>
#include <zlib.h>
@@ -61,51 +60,6 @@ kore_platform_init(void)
}
void
-kore_platform_worker_wait(int final)
-{
- int r;
- siginfo_t info;
- struct kore_worker k, *kw, *next;
-
- memset(&info, 0, sizeof(info));
- if (final)
- r = waitid(P_ALL, 0, &info, WEXITED);
- else
- r = waitid(P_ALL, 0, &info, WEXITED | WNOHANG);
- if (r == -1) {
- kore_debug("waitid(): %s", errno_s);
- return;
- }
-
- if (info.si_pid == 0)
- return;
-
- for (kw = TAILQ_FIRST(&kore_workers); kw != NULL; kw = next) {
- next = TAILQ_NEXT(kw, list);
- if (kw->pid != info.si_pid)
- continue;
-
- k = *kw;
- TAILQ_REMOVE(&kore_workers, kw, list);
- kore_log(LOG_NOTICE, "worker %d (%d)-> status %d (%d)",
- kw->id, info.si_pid, info.si_status, info.si_code);
- free(kw);
-
- if (final)
- continue;
-
- if (info.si_code == CLD_EXITED ||
- info.si_code == CLD_KILLED ||
- info.si_code == CLD_DUMPED) {
- kore_log(LOG_NOTICE,
- "worker %d (pid: %d) gone, respawning new one",
- k.id, k.pid);
- kore_worker_spawn(k.cpu);
- }
- }
-}
-
-void
kore_platform_worker_setcpu(struct kore_worker *kw)
{
cpu_set_t cpuset;
@@ -123,20 +77,19 @@ kore_platform_worker_setcpu(struct kore_worker *kw)
void
kore_platform_event_init(void)
{
- if ((efd = epoll_create(1000)) == -1)
+ if ((efd = epoll_create(10000)) == -1)
fatal("epoll_create(): %s", errno_s);
events = kore_calloc(EPOLL_EVENTS, sizeof(struct epoll_event));
- kore_platform_event_schedule(server.fd, EPOLLIN, 0, &server);
}
void
-kore_platform_event_wait(int quit)
+kore_platform_event_wait(void)
{
struct connection *c;
- int n, i, *fd;
+ int n, i, a, *fd;
- n = epoll_wait(efd, events, EPOLL_EVENTS, 100);
+ n = epoll_wait(efd, events, EPOLL_EVENTS, 10);
if (n == -1) {
if (errno == EINTR)
return;
@@ -160,10 +113,10 @@ kore_platform_event_wait(int quit)
}
if (*fd == server.fd) {
- if (!quit) {
+ for (a = 0; a < 10; a++) {
kore_connection_accept(&server, &c);
if (c == NULL)
- continue;
+ break;
kore_platform_event_schedule(c->fd,
EPOLLIN | EPOLLOUT | EPOLLET, 0, c);
@@ -202,6 +155,19 @@ kore_platform_event_schedule(int fd, int type, int flags, void *udata)
}
void
+kore_platform_enable_accept(void)
+{
+ kore_platform_event_schedule(server.fd, EPOLLIN, 0, &server);
+}
+
+void
+kore_platform_disable_accept(void)
+{
+ if (epoll_ctl(efd, EPOLL_CTL_DEL, server.fd, NULL) == -1)
+ fatal("kore_platform_disable_accept: %s", errno_s);
+}
+
+void
kore_platform_proctitle(char *title)
{
if (prctl(PR_SET_NAME, title) == -1)
diff --git a/src/utils.c b/src/utils.c
@@ -310,7 +310,7 @@ kore_time_ms(void)
if (gettimeofday(&tv, NULL) == -1)
return (0);
- return (tv.tv_sec * 1000 + (tv.tv_usec / 100));
+ return (tv.tv_sec * 1000 + (tv.tv_usec / 1000));
}
void
@@ -323,6 +323,7 @@ fatal(const char *fmt, ...)
vsnprintf(buf, sizeof(buf), fmt, args);
va_end(args);
+ kore_log(LOG_ERR, "%s", buf);
printf("kore: %s\n", buf);
exit(1);
}
diff --git a/src/worker.c b/src/worker.c
@@ -18,6 +18,8 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/queue.h>
+#include <sys/ipc.h>
+#include <sys/shm.h>
#include <sys/wait.h>
#include <netinet/in.h>
@@ -30,30 +32,50 @@
#include <errno.h>
#include <grp.h>
#include <fcntl.h>
+#include <regex.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#include <sched.h>
#include <syslog.h>
-#include <unistd.h>
+#include <semaphore.h>
#include <time.h>
-#include <regex.h>
-#include <zlib.h>
#include <unistd.h>
+#include <zlib.h>
#include "spdy.h"
#include "kore.h"
#include "http.h"
+#define KORE_SHM_KEY 15000
+
+#if defined(KORE_USE_SEMAPHORE)
+#define kore_trylock sem_trywait
+#define kore_unlock sem_post
+static sem_t *kore_accept_lock;
+#else
+#define kore_trylock kore_internal_trylock
+#define kore_unlock kore_internal_unlock
+static int *kore_accept_lock;
+static int kore_internal_trylock(int *);
+static int kore_internal_unlock(int *);
+#endif
+
+static void kore_worker_acceptlock_obtain(void);
+static void kore_worker_acceptlock_release(void);
+
static u_int16_t workerid = 0;
static TAILQ_HEAD(, connection) disconnected;
static TAILQ_HEAD(, connection) worker_clients;
+static TAILQ_HEAD(, kore_worker) kore_workers;
+static int shm_accept_key;
-struct kore_worker_h kore_workers;
-struct kore_worker *worker = NULL;
+static u_int32_t worker_active_connections = 0;
+static u_int8_t worker_has_acceptlock = 0;
-extern volatile sig_atomic_t sig_recv;
+extern volatile sig_atomic_t sig_recv;
+struct kore_worker *worker = NULL;
+u_int32_t worker_max_connections = 250;
void
kore_worker_init(void)
@@ -63,6 +85,20 @@ kore_worker_init(void)
if (worker_count == 0)
fatal("no workers specified");
+ shm_accept_key = shmget(KORE_SHM_KEY,
+ sizeof(*kore_accept_lock), IPC_CREAT | IPC_EXCL | 0700);
+ if (shm_accept_key == -1)
+ fatal("kore_worker_init(): shmget() %s", errno_s);
+ if ((kore_accept_lock = shmat(shm_accept_key, NULL, 0)) == NULL)
+ fatal("kore_worker_init(): shmat() %s", errno_s);
+
+#if defined(KORE_USE_SEMAPHORE)
+ if (sem_init(kore_accept_lock, 1, 1) == -1)
+ fatal("kore_worker_init(): sem_init() %s", errno_s);
+#else
+ *kore_accept_lock = 0;
+#endif
+
kore_debug("kore_worker_init(): system has %d cpu's", cpu_count);
kore_debug("kore_worker_init(): starting %d workers", worker_count);
if (worker_count > cpu_count)
@@ -99,9 +135,34 @@ kore_worker_spawn(u_int16_t cpu)
}
void
+kore_worker_shutdown(void)
+{
+ kore_log(LOG_NOTICE, "waiting for workers to drain and shutdown");
+ while (!TAILQ_EMPTY(&kore_workers))
+ kore_worker_wait(1);
+
+ if (shmctl(shm_accept_key, IPC_RMID, NULL) == -1) {
+ kore_log(LOG_NOTICE,
+ "failed to deleted shm segment: %s", errno_s);
+ }
+}
+
+void
+kore_worker_dispatch_signal(int sig)
+{
+ struct kore_worker *kw;
+
+ TAILQ_FOREACH(kw, &kore_workers, list) {
+ if (kill(kw->pid, sig) == -1)
+ kore_debug("kill(%d, %d): %s", kw->pid, sig, errno_s);
+ }
+}
+
+void
kore_worker_entry(struct kore_worker *kw)
{
int quit;
+ u_int32_t lowat;
char buf[16];
struct connection *c, *cnext;
struct kore_worker *k, *next;
@@ -122,6 +183,9 @@ kore_worker_entry(struct kore_worker *kw)
for (k = TAILQ_FIRST(&kore_workers); k != NULL; k = next) {
next = TAILQ_NEXT(k, list);
+ if (k == worker)
+ continue;
+
TAILQ_REMOVE(&kore_workers, k, list);
free(k);
}
@@ -141,6 +205,7 @@ kore_worker_entry(struct kore_worker *kw)
kore_platform_event_init();
kore_accesslog_worker_init();
+ lowat = worker_max_connections / 10;
kore_log(LOG_NOTICE, "worker %d started (cpu#%d)", kw->id, kw->cpu);
for (;;) {
if (sig_recv != 0) {
@@ -151,7 +216,17 @@ kore_worker_entry(struct kore_worker *kw)
sig_recv = 0;
}
- kore_platform_event_wait(quit);
+ if (!quit && !worker_has_acceptlock &&
+ worker_active_connections < lowat)
+ kore_worker_acceptlock_obtain();
+
+ kore_platform_event_wait();
+
+ if (worker_has_acceptlock &&
+ (worker_active_connections >= worker_max_connections ||
+ quit == 1))
+ kore_worker_acceptlock_release();
+
http_process();
for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) {
@@ -186,6 +261,7 @@ void
kore_worker_connection_add(struct connection *c)
{
TAILQ_INSERT_TAIL(&worker_clients, c, list);
+ worker_active_connections++;
}
void
@@ -194,3 +270,124 @@ kore_worker_connection_move(struct connection *c)
TAILQ_REMOVE(&worker_clients, c, list);
TAILQ_INSERT_TAIL(&disconnected, c, list);
}
+
+void
+kore_worker_connection_remove(struct connection *c)
+{
+ worker_active_connections--;
+}
+
+void
+kore_worker_wait(int final)
+{
+ pid_t pid;
+ int status;
+ struct kore_worker k, *kw, *next;
+
+ if (final)
+ pid = waitpid(WAIT_ANY, &status, 0);
+ else
+ pid = waitpid(WAIT_ANY, &status, WNOHANG);
+
+ if (pid == -1) {
+ kore_debug("waitpid(): %s", errno_s);
+ return;
+ }
+
+ if (pid == 0)
+ return;
+
+ for (kw = TAILQ_FIRST(&kore_workers); kw != NULL; kw = next) {
+ next = TAILQ_NEXT(kw, list);
+ if (kw->pid != pid)
+ continue;
+
+ k = *kw;
+ TAILQ_REMOVE(&kore_workers, kw, list);
+ kore_log(LOG_NOTICE, "worker %d (%d)-> status %d",
+ kw->id, pid, status);
+ free(kw);
+
+ if (final)
+ continue;
+
+ if (WEXITSTATUS(status) || WTERMSIG(status) ||
+ WCOREDUMP(status)) {
+ kore_log(LOG_NOTICE,
+ "worker %d (pid: %d) gone, respawning new one",
+ k.id, k.pid);
+ kore_worker_spawn(k.cpu);
+ }
+ }
+}
+
+static void
+kore_worker_acceptlock_obtain(void)
+{
+ int ret;
+
+ if (worker_count == 1 && !worker_has_acceptlock) {
+ worker_has_acceptlock = 1;
+ kore_platform_enable_accept();
+ return;
+ }
+
+ ret = kore_trylock(kore_accept_lock);
+ if (ret == -1) {
+ if (errno == EAGAIN)
+ return;
+ kore_log(LOG_WARNING, "kore_worker_acceptlock(): %s", errno_s);
+ } else {
+ worker_has_acceptlock = 1;
+ kore_platform_enable_accept();
+ kore_log(LOG_NOTICE, "obtained accept lock (%d/%d)",
+ worker_active_connections, worker_max_connections);
+ }
+}
+
+static void
+kore_worker_acceptlock_release(void)
+{
+ if (worker_count == 1)
+ return;
+
+ if (worker_has_acceptlock != 1) {
+ kore_log(LOG_NOTICE,
+ "kore_worker_acceptlock_release() != 1");
+ return;
+ }
+
+ if (kore_unlock(kore_accept_lock) == -1) {
+ kore_log(LOG_NOTICE,
+ "kore_worker_acceptlock_release(): %s", errno_s);
+ } else {
+ worker_has_acceptlock = 0;
+ kore_platform_disable_accept();
+ kore_log(LOG_NOTICE, "released %d/%d",
+ worker_active_connections, worker_max_connections);
+ }
+}
+
+#if !defined(KORE_USE_SEMAPHORE)
+
+static int
+kore_internal_trylock(int *lock)
+{
+ errno = EAGAIN;
+ if (__sync_val_compare_and_swap(lock, 0, 1) == 1)
+ return (-1);
+
+ errno = 0;
+ return (0);
+}
+
+static int
+kore_internal_unlock(int *lock)
+{
+ if (__sync_val_compare_and_swap(lock, 1, 0) != 1)
+ kore_log(LOG_NOTICE, "kore_internal_unlock(): wasnt locked");
+
+ return (0);
+}
+
+#endif