kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 0de28488a69ba1170922b9d790abef5cbda19984
parent ae0a8ece7534d05e8151db669ae1fdf61c51702c
Author: Joris Vink <joris@coders.se>
Date:   Tue,  4 Jun 2013 11:55:38 +0200

move from multithreads to single threaded worker processes.

Diffstat:
Makefile | 2+-
includes/http.h | 2++
includes/kore.h | 13+++----------
src/http.c | 55+++++++++++++++++++++++++++++++++++++++++++++++++++++--
src/kore.c | 428+++++++++++++++++++++++++++++++++++--------------------------------------------
src/utils.c | 2+-
6 files changed, 251 insertions(+), 251 deletions(-)

diff --git a/Makefile b/Makefile @@ -11,7 +11,7 @@ CFLAGS+=-I/usr/local/ssl/include CFLAGS+=-Wall -Wstrict-prototypes -Wmissing-prototypes CFLAGS+=-Wmissing-declarations -Wshadow -Wpointer-arith -Wcast-qual CFLAGS+=-D_GNU_SOURCE=1 -Wsign-compare -Iincludes -g -LDFLAGS=-rdynamic -Llibs -lssl -lcrypto -lpthread -ldl -lz +LDFLAGS=-rdynamic -Llibs -lssl -lcrypto -ldl -lz light: $(S_OBJS) $(CC) $(CFLAGS) $(S_OBJS) $(LDFLAGS) -o $(BIN) diff --git a/includes/http.h b/includes/http.h @@ -56,6 +56,8 @@ struct http_request { TAILQ_ENTRY(http_request) list; }; +void http_init(void); +void http_process(void); time_t http_date_to_time(char *); void http_request_free(struct http_request *); int http_response(struct http_request *, int, diff --git a/includes/kore.h b/includes/kore.h @@ -78,7 +78,6 @@ struct connection { void *owner; SSL *ssl; int flags; - pthread_mutex_t lock; u_int8_t inflate_started; z_stream z_inflate; @@ -108,13 +107,8 @@ struct kore_module_handle { }; struct kore_worker { - u_int8_t id; - pthread_t pctx; - pthread_mutex_t lock; - pthread_cond_t cond; - u_int32_t load; - - TAILQ_HEAD(, http_request) requests; + u_int16_t id; + pid_t pid; TAILQ_ENTRY(kore_worker) list; }; @@ -138,6 +132,7 @@ extern char *chroot_path; extern char *runas_user; extern char *kore_module_onload; extern u_int8_t worker_count; +extern pid_t mypid; void *kore_malloc(size_t); void *kore_calloc(size_t, size_t); @@ -158,8 +153,6 @@ int kore_module_domain_new(char *); void *kore_module_handler_find(char *, char *); int kore_module_handler_new(char *, char *, char *, int); -void kore_worker_delegate(struct http_request *); - void fatal(const char *, ...); void kore_log_internal(char *, int, const char *, ...); diff --git a/src/http.c b/src/http.c @@ -42,6 +42,14 @@ static int http_post_data_recv(struct netbuf *); static int http_send_done(struct netbuf *); +static TAILQ_HEAD(, http_request) http_requests; + +void +http_init(void) +{ + TAILQ_INIT(&http_requests); +} + int http_request_new(struct connection *c, struct spdy_stream *s, char *host, char *method, char *path, struct http_request **out) @@ -76,12 +84,55 @@ http_request_new(struct connection *c, struct spdy_stream *s, char *host, if (out != NULL) *out = req; - kore_worker_delegate(req); - + TAILQ_INSERT_TAIL(&http_requests, req, list); return (KORE_RESULT_OK); } void +http_process(void) +{ + struct http_request *req, *next; + int r, (*hdlr)(struct http_request *); + + for (req = TAILQ_FIRST(&http_requests); req != NULL; req = next) { + next = TAILQ_NEXT(req, list); + + if (req->flags & HTTP_REQUEST_DELETE) { + TAILQ_REMOVE(&http_requests, req, list); + http_request_free(req); + continue; + } + + if (!(req->flags & HTTP_REQUEST_COMPLETE)) + continue; + + hdlr = kore_module_handler_find(req->host, req->path); + if (hdlr == NULL) + r = http_generic_404(req); + else + r = hdlr(req); + + switch (r) { + case KORE_RESULT_OK: + r = net_send_flush(req->owner); + if (r == KORE_RESULT_ERROR) + kore_server_disconnect(req->owner); + break; + case KORE_RESULT_ERROR: + kore_server_disconnect(req->owner); + break; + case KORE_RESULT_RETRY: + break; + } + + if (r != KORE_RESULT_RETRY) { + TAILQ_REMOVE(&http_requests, req, list); + http_request_free(req); + } + } +} + +void http_response_header_add(struct http_request *req, char *header, char *value) { struct http_header *hdr; diff --git a/src/kore.c b/src/kore.c @@ -19,6 +19,7 @@ #include <sys/socket.h> #include <sys/queue.h> #include <sys/epoll.h> +#include <sys/wait.h> #include <netinet/in.h> #include <arpa/inet.h> @@ -38,7 +39,6 @@ #include <time.h> #include <regex.h> #include <zlib.h> -#include <pthread.h> #include <unistd.h> #include "spdy.h" @@ -47,41 +47,33 @@ #define EPOLL_EVENTS 500 -#define RESCHEDULE_AT_ERROR 1 -#define RESCHEDULE_AT_NORMAL 2 - -struct reschedule { - struct connection *c; - int reason; - int events; - TAILQ_ENTRY(reschedule) list; -}; - static int efd = -1; static SSL_CTX *ssl_ctx = NULL; volatile sig_atomic_t sig_recv; +static struct listener server; static TAILQ_HEAD(, connection) disconnected; +static TAILQ_HEAD(, connection) worker_clients; static TAILQ_HEAD(, kore_worker) kore_workers; -static TAILQ_HEAD(, reschedule) reschedule_list; -static struct kore_worker *last_worker = NULL; -static pthread_mutex_t disconnect_lock; -static pthread_rwlock_t module_lock; +static struct passwd *pw = NULL; +static u_int16_t workerid = 0; int server_port = 0; char *server_ip = NULL; char *chroot_path = NULL; char *runas_user = NULL; u_int8_t worker_count = 0; +pid_t mypid = -1; static void kore_signal(int); +static void kore_worker_wait(int); static void kore_worker_init(void); -static void *kore_worker_entry(void *); +static void kore_worker_spawn(void); static int kore_socket_nonblock(int); static int kore_server_sslstart(void); static void kore_event(int, int, void *); static int kore_server_accept(struct listener *); -static void kore_reschedule(struct connection *, int, int); +static void kore_worker_entry(struct kore_worker *); static int kore_connection_handle(struct connection *, int); static void kore_server_final_disconnect(struct connection *); static int kore_server_bind(struct listener *, const char *, int); @@ -90,14 +82,9 @@ static int kore_ssl_npn_cb(SSL *, const u_char **, unsigned int *, void *); int main(int argc, char *argv[]) { - struct passwd *pw; - struct kore_worker *kw; - struct listener server; - struct epoll_event *events; - int n, i, *fd; - struct connection *c, *cnext; - struct reschedule *sched, *snext; + struct kore_worker *kw, *next; + mypid = getpid(); if (argc != 2) fatal("Usage: kore [config file]"); @@ -116,12 +103,10 @@ main(int argc, char *argv[]) if (!kore_server_bind(&server, server_ip, server_port)) fatal("cannot bind to %s:%d", server_ip, server_port); - if (!kore_server_sslstart()) - fatal("cannot initiate SSL"); - - if ((efd = epoll_create(1000)) == -1) - fatal("epoll_create(): %s", errno_s); + if (daemon(1, 1) == -1) + fatal("cannot daemon(): %s", errno_s); + mypid = getpid(); if (chroot(chroot_path) == -1) fatal("chroot(%s): %s", chroot_path, errno_s); if (chdir("/") == -1) @@ -130,112 +115,46 @@ main(int argc, char *argv[]) pw->pw_gid) || setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) fatal("unable to drop privileges"); - TAILQ_INIT(&disconnected); - TAILQ_INIT(&reschedule_list); - pthread_mutex_init(&disconnect_lock, NULL); - pthread_rwlock_init(&module_lock, NULL); - kore_worker_init(); sig_recv = 0; + signal(SIGINT, kore_signal); signal(SIGHUP, kore_signal); - kore_event(server.fd, EPOLLIN, &server); - events = kore_calloc(EPOLL_EVENTS, sizeof(struct epoll_event)); for (;;) { - if (sig_recv == SIGHUP) { - if (!pthread_rwlock_trywrlock(&module_lock)) { - kore_module_reload(); - sig_recv = 0; - pthread_rwlock_unlock(&module_lock); - } - } - - n = epoll_wait(efd, events, EPOLL_EVENTS, 1000); - if (n == -1) { - if (errno == EINTR) - continue; - fatal("epoll_wait(): %s", errno_s); - } - - TAILQ_FOREACH(kw, &kore_workers, list) { - if (pthread_mutex_trylock(&(kw->lock))) - continue; - pthread_cond_signal(&(kw->cond)); - pthread_mutex_unlock(&(kw->lock)); - } - - if (n > 0) - kore_log("main(): %d sockets available", n); - - for (i = 0; i < n; i++) { - fd = (int *)events[i].data.ptr; - - if (events[i].events & EPOLLERR || - events[i].events & EPOLLHUP) { - if (*fd == server.fd) - fatal("error on server socket"); - - c = (struct connection *)events[i].data.ptr; - if (pthread_mutex_trylock(&(c->lock))) { - kore_reschedule(c, - RESCHEDULE_AT_ERROR, 0); - } else { - kore_server_disconnect(c); - pthread_mutex_unlock(&(c->lock)); + if (sig_recv != 0) { + kore_log("signal %d received", sig_recv); + if (sig_recv == SIGHUP) { + TAILQ_FOREACH(kw, &kore_workers, list) { + if (kill(kw->pid, SIGHUP) == -1) { + kore_log("kill(%d, SIGHUP): %s", + kw->pid, errno_s); + } } - continue; - } - - if (*fd == server.fd) { - kore_server_accept(&server); - } else { - c = (struct connection *)events[i].data.ptr; - if (pthread_mutex_trylock(&(c->lock))) { - kore_reschedule(c, - RESCHEDULE_AT_NORMAL, - events[i].events); - } else { - if (!kore_connection_handle(c, - events[i].events)) - kore_server_disconnect(c); - pthread_mutex_unlock(&(c->lock)); - } - } - } - - for (sched = TAILQ_FIRST(&reschedule_list); sched != NULL; - sched = snext) { - snext = TAILQ_NEXT(sched, list); - if (pthread_mutex_trylock(&(sched->c->lock))) - continue; - - if (sched->reason == RESCHEDULE_AT_ERROR) { - kore_server_disconnect(sched->c); - } else { - if (!kore_connection_handle(sched->c, - sched->events)) - kore_server_disconnect(sched->c); + } else if (sig_recv == SIGINT) { + break; } - - pthread_mutex_unlock(&(sched->c->lock)); - - TAILQ_REMOVE(&reschedule_list, sched, list); - free(sched); + sig_recv = 0; } - if (pthread_mutex_trylock(&disconnect_lock)) - continue; + kore_worker_wait(0); + sleep(1); + } - for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) { - cnext = TAILQ_NEXT(c, list); - kore_server_final_disconnect(c); - } + for (kw = TAILQ_FIRST(&kore_workers); kw != NULL; kw = next) { + next = TAILQ_NEXT(kw, list); + if (kill(kw->pid, SIGINT) == -1) + kore_log("kill(%d, SIGINT): %s", kw->pid, errno_s); + } - pthread_mutex_unlock(&disconnect_lock); + while (!TAILQ_EMPTY(&kore_workers)) { + kore_log("waiting for workers to drain and finish"); + kore_worker_wait(1); } + kore_log("server shutting down"); close(server.fd); + return (0); } @@ -245,36 +164,11 @@ kore_server_disconnect(struct connection *c) if (c->state != CONN_STATE_DISCONNECTING) { kore_log("preparing %p for disconnection", c); c->state = CONN_STATE_DISCONNECTING; - - pthread_mutex_lock(&disconnect_lock); + TAILQ_REMOVE(&worker_clients, c, list); TAILQ_INSERT_TAIL(&disconnected, c, list); - pthread_mutex_unlock(&disconnect_lock); } } -void -kore_worker_delegate(struct http_request *req) -{ - struct kore_worker *kw; - - if (last_worker != NULL) { - kw = TAILQ_NEXT(last_worker, list); - if (kw == NULL) - kw = TAILQ_FIRST(&kore_workers); - } else { - kw = TAILQ_FIRST(&kore_workers); - } - - last_worker = kw; - - pthread_mutex_lock(&(kw->lock)); - kore_log("assigning request %p to worker %d:%d", req, kw->id, kw->load); - kw->load++; - TAILQ_INSERT_TAIL(&(kw->requests), req, list); - pthread_cond_signal(&(kw->cond)); - pthread_mutex_unlock(&(kw->lock)); -} - static int kore_server_sslstart(void) { @@ -310,6 +204,8 @@ kore_server_sslstart(void) static int kore_server_bind(struct listener *l, const char *ip, int port) { + int on; + kore_log("kore_server_bind(%p, %s, %d)", l, ip, port); if ((l->fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { @@ -322,6 +218,14 @@ kore_server_bind(struct listener *l, const char *ip, int port) return (KORE_RESULT_ERROR); } + on = 1; + if (setsockopt(l->fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&on, + sizeof(on)) == -1) { + kore_log("setsockopt(): %s", errno_s); + close(l->fd); + return (KORE_RESULT_ERROR); + } + memset(&(l->sin), 0, sizeof(l->sin)); l->sin.sin_family = AF_INET; l->sin.sin_port = htons(port); @@ -372,11 +276,12 @@ kore_server_accept(struct listener *l) c->client_stream_id = 0; c->proto = CONN_PROTO_UNKNOWN; c->state = CONN_STATE_SSL_SHAKE; - pthread_mutex_init(&(c->lock), NULL); TAILQ_INIT(&(c->send_queue)); TAILQ_INIT(&(c->recv_queue)); TAILQ_INIT(&(c->spdy_streams));; + TAILQ_INSERT_TAIL(&worker_clients, c, list); + kore_event(c->fd, EPOLLIN | EPOLLOUT | EPOLLET, c); return (KORE_RESULT_OK); @@ -388,15 +293,11 @@ kore_server_final_disconnect(struct connection *c) struct netbuf *nb, *next; struct spdy_stream *s, *snext; - if (pthread_mutex_trylock(&(c->lock))) { - kore_log("delaying disconnection of %p", c); - return; - } + kore_log("kore_server_final_disconnect(%p)", c); if (c->ssl != NULL) SSL_free(c->ssl); - kore_log("kore_server_final_disconnect(%p) succeeded", c); TAILQ_REMOVE(&disconnected, c, list); close(c->fd); if (c->inflate_started) @@ -431,7 +332,6 @@ kore_server_final_disconnect(struct connection *c) free(s); } - pthread_mutex_destroy(&(c->lock)); free(c); } @@ -522,102 +422,169 @@ static void kore_worker_init(void) { u_int8_t i; - struct kore_worker *kw; + + if (worker_count == 0) + fatal("no workers specified"); kore_log("kore_worker_init(): starting %d workers", worker_count); TAILQ_INIT(&kore_workers); - for (i = 0; i < worker_count; i++) { - kw = (struct kore_worker *)kore_malloc(sizeof(*kw)); - kw->id = i; - kw->load = 0; - pthread_cond_init(&(kw->cond), NULL); - pthread_mutex_init(&(kw->lock), NULL); - TAILQ_INIT(&(kw->requests)); - TAILQ_INSERT_TAIL(&kore_workers, kw, list); - - if (pthread_create(&(kw->pctx), NULL, kore_worker_entry, kw)) - kore_log("failed to spawn worker: %s", errno_s); + for (i = 0; i < worker_count; i++) + kore_worker_spawn(); +} + +static void +kore_worker_spawn(void) +{ + struct kore_worker *kw; + + kw = (struct kore_worker *)kore_malloc(sizeof(*kw)); + kw->id = workerid++; + kw->pid = fork(); + if (kw->pid == -1) + fatal("could not spawn worker child: %s", errno_s); + + if (kw->pid == 0) { + kw->pid = getpid(); + kore_worker_entry(kw); + /* NOTREACHED */ } - if (i == 0) - fatal("No workers spawned, check logs for errors."); + TAILQ_INSERT_TAIL(&kore_workers, kw, list); } -static void * -kore_worker_entry(void *arg) +static void +kore_worker_wait(int final) { - struct http_request *req; - struct kore_worker *kw = (struct kore_worker *)arg; - int r, (*hdlr)(struct http_request *); + int r; + siginfo_t info; + struct kore_worker *kw, *next; - pthread_mutex_lock(&(kw->lock)); - for (;;) { - pthread_cond_wait(&(kw->cond), &(kw->lock)); + memset(&info, 0, sizeof(info)); + r = waitid(P_ALL, 0, &info, WEXITED | WNOHANG); + if (r == -1) { + kore_log("waitid(): %s", errno_s); + return; + } - while (!TAILQ_EMPTY(&(kw->requests))) { - req = TAILQ_FIRST(&(kw->requests)); - TAILQ_REMOVE(&(kw->requests), req, list); - pthread_mutex_unlock(&(kw->lock)); + if (info.si_pid == 0) + return; - if (req->flags & HTTP_REQUEST_DELETE) { - pthread_mutex_lock(&(kw->lock)); - kw->load--; - http_request_free(req); - continue; - } + for (kw = TAILQ_FIRST(&kore_workers); kw != NULL; kw = next) { + next = TAILQ_NEXT(kw, list); + if (kw->pid != info.si_pid) + continue; - if (!(req->flags & HTTP_REQUEST_COMPLETE)) { - pthread_mutex_lock(&(kw->lock)); - TAILQ_INSERT_TAIL(&(kw->requests), req, list); - continue; - } + TAILQ_REMOVE(&kore_workers, kw, list); + kore_log("worker %d (%d)-> status %d (%d)", + kw->id, info.si_pid, info.si_status, info.si_code); + free(kw); + + if (final) + continue; + + if (info.si_code == CLD_EXITED || + info.si_code == CLD_KILLED || + info.si_code == CLD_DUMPED) { + kore_log("worker died, respawning new one"); + kore_worker_spawn(); + } + } +} + +static void +kore_worker_entry(struct kore_worker *kw) +{ + struct epoll_event *events; + int n, i, *fd; + struct connection *c, *cnext; + struct kore_worker *k, *next; + + for (k = TAILQ_FIRST(&kore_workers); k != NULL; k = next) { + next = TAILQ_NEXT(k, list); + TAILQ_REMOVE(&kore_workers, k, list); + free(k); + } + + mypid = kw->pid; + + if (!kore_server_sslstart()) + fatal("cannot initiate SSL"); + if ((efd = epoll_create(1000)) == -1) + fatal("epoll_create(): %s", errno_s); + + sig_recv = 0; + signal(SIGHUP, kore_signal); + signal(SIGINT, kore_signal); + + http_init(); + TAILQ_INIT(&disconnected); + TAILQ_INIT(&worker_clients); + + kore_event(server.fd, EPOLLIN, &server); + events = kore_calloc(EPOLL_EVENTS, sizeof(struct epoll_event)); + for (;;) { + if (sig_recv != 0) { + if (sig_recv == SIGHUP) + kore_module_reload(); + else if (sig_recv == SIGINT) + break; + sig_recv = 0; + } - if (pthread_mutex_trylock(&(req->owner->lock))) { - pthread_mutex_lock(&(kw->lock)); - TAILQ_INSERT_TAIL(&(kw->requests), req, list); + n = epoll_wait(efd, events, EPOLL_EVENTS, 100); + if (n == -1) { + if (errno == EINTR) continue; - } + fatal("epoll_wait(): %s", errno_s); + } + + if (n > 0) + kore_log("main(): %d sockets available", n); + + for (i = 0; i < n; i++) { + fd = (int *)events[i].data.ptr; - if (pthread_rwlock_tryrdlock(&module_lock)) { - pthread_mutex_unlock(&(req->owner->lock)); - pthread_mutex_lock(&(kw->lock)); - TAILQ_INSERT_TAIL(&(kw->requests), req, list); + if (events[i].events & EPOLLERR || + events[i].events & EPOLLHUP) { + if (*fd == server.fd) + fatal("error on server socket"); + + c = (struct connection *)events[i].data.ptr; + kore_server_disconnect(c); continue; } - hdlr = kore_module_handler_find(req->host, req->path); - if (hdlr == NULL) - r = http_generic_404(req); - else - r = hdlr(req); - pthread_rwlock_unlock(&module_lock); - - switch (r) { - case KORE_RESULT_OK: - r = net_send_flush(req->owner); - if (r == KORE_RESULT_ERROR) - kore_server_disconnect(req->owner); - break; - case KORE_RESULT_ERROR: - kore_server_disconnect(req->owner); - break; - case KORE_RESULT_RETRY: - TAILQ_INSERT_TAIL(&(kw->requests), req, list); - break; + if (*fd == server.fd) { + kore_server_accept(&server); + } else { + c = (struct connection *)events[i].data.ptr; + if (!kore_connection_handle(c, + events[i].events)) + kore_server_disconnect(c); } + } - pthread_mutex_unlock(&(req->owner->lock)); + http_process(); - if (r != KORE_RESULT_RETRY) { - pthread_mutex_lock(&(kw->lock)); - http_request_free(req); - kw->load--; - } + for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) { + cnext = TAILQ_NEXT(c, list); + kore_server_final_disconnect(c); } } - pthread_exit(NULL); + for (c = TAILQ_FIRST(&worker_clients); c != NULL; c = cnext) { + cnext = TAILQ_NEXT(c, list); + kore_server_final_disconnect(c); + } + + for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) { + cnext = TAILQ_NEXT(c, list); + kore_server_final_disconnect(c); + } + + kore_log("worker %d shutting down", kw->id); + exit(0); } static int @@ -672,19 +639,6 @@ kore_event(int fd, int flags, void *udata) } static void -kore_reschedule(struct connection *c, int reason, int events) -{ - struct reschedule *sched; - - sched = (struct reschedule *)kore_malloc(sizeof(*sched)); - sched->c = c; - sched->reason = reason; - sched->events = events; - - TAILQ_INSERT_TAIL(&reschedule_list, sched, list); -} - -static void kore_signal(int sig) { sig_recv = sig; diff --git a/src/utils.c b/src/utils.c @@ -112,7 +112,7 @@ kore_log_internal(char *file, int line, const char *fmt, ...) vsnprintf(buf, sizeof(buf), fmt, args); va_end(args); - printf("%s:%d - %s\n", file, line, buf); + printf("[%d] %s:%d - %s\n", mypid, file, line, buf); } void