kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 25f1ab9865dbcaffb6a8c303f41275d0dbf626e3
parent 98de763632989c89c25987c797bffd581600ffb0
Author: Joris Vink <joris@coders.se>
Date:   Mon, 17 Jun 2013 23:39:17 +0200

Add BSD kqueue(2) support. Compile with make bsd (or make linux for linux)

Diffstat:
Makefile | 17+++++++++++++----
example.conf | 2+-
example/Makefile | 2+-
example/src/example.c | 1-
includes/kore.h | 24++++++++++++++++++++++--
src/bsd.c | 206+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/kore.c | 467++++++++++++++++++++++++++-----------------------------------------------------
src/linux.c | 230+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/net.c | 1-
9 files changed, 625 insertions(+), 325 deletions(-)

diff --git a/Makefile b/Makefile @@ -3,20 +3,29 @@ CC=gcc BIN=kore -S_SRC= src/kore.c src/buf.c src/config.c src/net.c src/spdy.c src/http.c \ +S_SRC+= src/kore.c src/buf.c src/config.c src/net.c src/spdy.c src/http.c \ src/module.c src/utils.c src/zlib_dict.c S_OBJS= $(S_SRC:.c=.o) CFLAGS+=-I/usr/local/ssl/include CFLAGS+=-Wall -Wstrict-prototypes -Wmissing-prototypes CFLAGS+=-Wmissing-declarations -Wshadow -Wpointer-arith -Wcast-qual -CFLAGS+=-D_GNU_SOURCE=1 -Wsign-compare -Iincludes -g -LDFLAGS=-rdynamic -Llibs -lssl -lcrypto -ldl -lz +CFLAGS+=-Wsign-compare -Iincludes -g +LDFLAGS+=-rdynamic -Llibs -lssl -lcrypto -lz + +default: + @echo "Please specify a build target [linux | bsd]" + +linux: + @LDFLAGS=-ldl CFLAGS=-D_GNU_SOURCE=1 S_SRC=src/linux.c make kore + +bsd: + @S_SRC=src/bsd.c make kore kore: $(S_OBJS) $(CC) $(CFLAGS) $(S_OBJS) $(LDFLAGS) -o $(BIN) -.c.o: $< +.c.o: $(CC) $(CFLAGS) -c $< -o $@ clean: diff --git a/example.conf b/example.conf @@ -3,7 +3,7 @@ # Server configuration. bind 10.211.55.3 443 certfile cert/server.crt -certkey certe/server.key +certkey cert/server.key # The path worker processes will chroot too after starting. chroot /home/joris/src/kore diff --git a/example/Makefile b/example/Makefile @@ -37,7 +37,7 @@ html_inject: tools/html_inject.c .css.c: tools/html_inject $< `basename $<` > $@ -.c.o: $< +.c.o: $(CC) -fPIC $(CFLAGS) -c $< -o $@ clean: diff --git a/example/src/example.c b/example/src/example.c @@ -18,7 +18,6 @@ #include <sys/types.h> #include <sys/socket.h> #include <sys/queue.h> -#include <sys/epoll.h> #include <netinet/in.h> #include <arpa/inet.h> diff --git a/includes/kore.h b/includes/kore.h @@ -111,6 +111,8 @@ struct kore_worker { TAILQ_ENTRY(kore_worker) list; }; +TAILQ_HEAD(kore_worker_h, kore_worker); + #define KORE_BUF_INITIAL 128 #define KORE_BUF_INCREMENT KORE_BUF_INITIAL @@ -125,6 +127,7 @@ struct buf_vec { u_int32_t length; }; +extern pid_t mypid; extern int kore_debug; extern int server_port; extern char *server_ip; @@ -134,8 +137,25 @@ extern char *kore_module_onload; extern char *kore_pidfile; extern char *kore_certfile; extern char *kore_certkey; -extern u_int8_t worker_count; -extern pid_t mypid; + +extern u_int16_t cpu_count; +extern u_int8_t worker_count; + +extern struct listener server; +extern struct kore_worker_h kore_workers; + +void kore_init(void); +void kore_worker_init(void); +void kore_worker_wait(int); +void kore_event_init(void); +void kore_event_wait(int); +void kore_set_proctitle(char *); +void kore_worker_spawn(u_int16_t); +void kore_worker_entry(struct kore_worker *); +void kore_worker_setcpu(struct kore_worker *); +void kore_event_schedule(int, int, int, void *); +int kore_connection_handle(struct connection *); +int kore_server_accept(struct listener *, struct connection **); void kore_log_init(void); void *kore_malloc(size_t); diff --git a/src/bsd.c b/src/bsd.c @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2013 Joris Vink <joris@coders.se> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/param.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/queue.h> +#include <sys/event.h> +#include <sys/wait.h> + +#include <netinet/in.h> +#include <arpa/inet.h> + +#include <openssl/err.h> +#include <openssl/ssl.h> + +#include <errno.h> +#include <fcntl.h> +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <syslog.h> +#include <unistd.h> +#include <time.h> +#include <regex.h> +#include <zlib.h> +#include <unistd.h> + +#include "spdy.h" +#include "kore.h" +#include "http.h" + +#define KQUEUE_EVENTS 500 +static int kfd = -1; +static struct kevent *events; +static int nchanges; +static struct kevent *changelist; + +void +kore_init(void) +{ + cpu_count = 0; +} + +void +kore_worker_init(void) +{ + u_int16_t i; + + if (worker_count == 0) + fatal("no workers specified"); + + kore_debug("kore_worker_init(): starting %d workers", worker_count); + + TAILQ_INIT(&kore_workers); + for (i = 0; i < worker_count; i++) + kore_worker_spawn(0); +} + +void +kore_worker_wait(int final) +{ + pid_t pid; + int status; + struct kore_worker k, *kw, *next; + + if (final) + pid = waitpid(WAIT_ANY, &status, 0); + else + pid = waitpid(WAIT_ANY, &status, WNOHANG); + + if (pid == -1) { + kore_debug("waitpid(): %s", errno_s); + return; + } + + if (pid == 0) + return; + + for (kw = TAILQ_FIRST(&kore_workers); kw != NULL; kw = next) { + next = TAILQ_NEXT(kw, list); + if (kw->pid != pid) + continue; + + k = *kw; + TAILQ_REMOVE(&kore_workers, kw, list); + kore_log(LOG_NOTICE, "worker %d (%d)-> status %d", + kw->id, pid, status); + free(kw); + + if (final) + continue; + + if (WEXITSTATUS(status) || WTERMSIG(status) || + WCOREDUMP(status)) { + kore_log(LOG_NOTICE, + "worker %d (pid: %d) gone, respawning new one", + k.id, k.pid); + kore_worker_spawn(0); + } + } +} + +void +kore_worker_setcpu(struct kore_worker *kw) +{ +} + +void +kore_event_init(void) +{ + if ((kfd = kqueue()) == -1) + fatal("kqueue(): %s", errno_s); + + nchanges = 0; + events = kore_calloc(KQUEUE_EVENTS, sizeof(struct kevent)); + changelist = kore_calloc(KQUEUE_EVENTS, sizeof(struct kevent)); + kore_event_schedule(server.fd, EVFILT_READ, EV_ADD, &server); +} + +void +kore_event_wait(int quit) +{ + struct connection *c; + int n, i, *fd; + + n = kevent(kfd, changelist, nchanges, events, KQUEUE_EVENTS, NULL); + if (n == -1) { + if (errno == EINTR) + return; + fatal("kevent(): %s", errno_s); + } + + nchanges = 0; + if (n > 0) + kore_debug("main(): %d sockets available", n); + + for (i = 0; i < n; i++) { + fd = (int *)events[i].udata; + + if (events[i].flags & EV_EOF || + events[i].flags & EV_ERROR) { + if (*fd == server.fd) + fatal("error on server socket"); + + c = (struct connection *)events[i].udata; + kore_server_disconnect(c); + continue; + } + + if (*fd == server.fd) { + if (!quit) { + kore_server_accept(&server, &c); + if (c == NULL) + continue; + + kore_event_schedule(c->fd, EVFILT_READ, + EV_ADD, c); + kore_event_schedule(c->fd, EVFILT_WRITE, + EV_ADD | EV_ONESHOT, c); + } + } else { + c = (struct connection *)events[i].udata; + if (events[i].filter == EVFILT_READ) + c->flags |= CONN_READ_POSSIBLE; + if (events[i].filter == EVFILT_WRITE) + c->flags |= CONN_WRITE_POSSIBLE; + + if (!kore_connection_handle(c)) { + kore_server_disconnect(c); + } else { + if (!TAILQ_EMPTY(&(c->send_queue))) { + kore_event_schedule(c->fd, EVFILT_WRITE, + EV_ADD | EV_ONESHOT, c); + } + } + } + } +} + +void +kore_event_schedule(int fd, int type, int flags, void *data) +{ + EV_SET(&changelist[nchanges], fd, type, flags, 0, 0, data); + nchanges++; +} + +void +kore_set_proctitle(char *title) +{ + setproctitle("%s", title); +} diff --git a/src/kore.c b/src/kore.c @@ -18,8 +18,6 @@ #include <sys/types.h> #include <sys/socket.h> #include <sys/queue.h> -#include <sys/epoll.h> -#include <sys/prctl.h> #include <sys/wait.h> #include <netinet/in.h> @@ -48,21 +46,18 @@ #include "kore.h" #include "http.h" -#define EPOLL_EVENTS 500 - -static int efd = -1; static SSL_CTX *ssl_ctx = NULL; volatile sig_atomic_t sig_recv; -static struct listener server; static TAILQ_HEAD(, connection) disconnected; static TAILQ_HEAD(, connection) worker_clients; -static TAILQ_HEAD(, kore_worker) kore_workers; static struct passwd *pw = NULL; static u_int16_t workerid = 0; -static u_int16_t cpu_count = 1; +struct listener server; pid_t mypid = -1; +u_int16_t cpu_count = 1; +struct kore_worker_h kore_workers; int kore_debug = 0; int server_port = 0; u_int8_t worker_count = 0; @@ -75,17 +70,9 @@ char *kore_pidfile = KORE_PIDFILE_DEFAULT; static void usage(void); static void kore_signal(int); -static void kore_worker_wait(int); -static void kore_worker_init(void); static void kore_write_mypid(void); static int kore_socket_nonblock(int); static int kore_server_sslstart(void); -static void kore_event(int, int, void *); -static void kore_worker_spawn(u_int16_t); -static int kore_server_accept(struct listener *); -static void kore_worker_entry(struct kore_worker *); -static void kore_worker_setcpu(struct kore_worker *); -static int kore_connection_handle(struct connection *, int); static void kore_server_final_disconnect(struct connection *); static int kore_server_bind(struct listener *, const char *, int); static int kore_ssl_npn_cb(SSL *, const u_char **, unsigned int *, void *); @@ -146,10 +133,7 @@ main(int argc, char *argv[]) if (kore_certfile == NULL || kore_certkey == NULL) fatal("missing certificate information"); - if ((cpu_count = sysconf(_SC_NPROCESSORS_ONLN)) == -1) { - kore_debug("could not get number of cpu's falling back to 1"); - cpu_count = 1; - } + kore_init(); if (!kore_server_sslstart()) fatal("cannot initiate SSL"); @@ -163,9 +147,7 @@ main(int argc, char *argv[]) kore_log(LOG_NOTICE, "kore is starting up"); kore_worker_init(); - - if (prctl(PR_SET_NAME, "kore [main]")) - kore_debug("cannot set process title"); + kore_set_proctitle("kore [parent]"); sig_recv = 0; signal(SIGHUP, kore_signal); @@ -213,102 +195,15 @@ main(int argc, char *argv[]) return (0); } -void -kore_server_disconnect(struct connection *c) -{ - if (c->state != CONN_STATE_DISCONNECTING) { - kore_debug("preparing %p for disconnection", c); - c->state = CONN_STATE_DISCONNECTING; - TAILQ_REMOVE(&worker_clients, c, list); - TAILQ_INSERT_TAIL(&disconnected, c, list); - } -} - -static int -kore_server_sslstart(void) -{ - kore_debug("kore_server_sslstart()"); - - SSL_library_init(); - SSL_load_error_strings(); - ssl_ctx = SSL_CTX_new(SSLv23_server_method()); - if (ssl_ctx == NULL) { - kore_debug("SSL_ctx_new(): %s", ssl_errno_s); - return (KORE_RESULT_ERROR); - } - - if (!SSL_CTX_use_certificate_chain_file(ssl_ctx, kore_certfile)) { - kore_debug("SSL_CTX_use_certificate_file(): %s", ssl_errno_s); - return (KORE_RESULT_ERROR); - } - - if (!SSL_CTX_use_PrivateKey_file(ssl_ctx, kore_certkey, - SSL_FILETYPE_PEM)) { - kore_debug("SSL_CTX_use_PrivateKey_file(): %s", ssl_errno_s); - return (KORE_RESULT_ERROR); - } - - SSL_CTX_set_mode(ssl_ctx, SSL_MODE_AUTO_RETRY); - SSL_CTX_set_mode(ssl_ctx, SSL_MODE_ENABLE_PARTIAL_WRITE); - SSL_CTX_set_options(ssl_ctx, SSL_OP_NO_SSLv2); - SSL_CTX_set_next_protos_advertised_cb(ssl_ctx, kore_ssl_npn_cb, NULL); - - return (KORE_RESULT_OK); -} - -static int -kore_server_bind(struct listener *l, const char *ip, int port) -{ - int on; - - kore_debug("kore_server_bind(%p, %s, %d)", l, ip, port); - - if ((l->fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - kore_debug("socket(): %s", errno_s); - return (KORE_RESULT_ERROR); - } - - if (!kore_socket_nonblock(l->fd)) { - close(l->fd); - return (KORE_RESULT_ERROR); - } - - on = 1; - if (setsockopt(l->fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&on, - sizeof(on)) == -1) { - kore_debug("setsockopt(): %s", errno_s); - close(l->fd); - return (KORE_RESULT_ERROR); - } - - memset(&(l->sin), 0, sizeof(l->sin)); - l->sin.sin_family = AF_INET; - l->sin.sin_port = htons(port); - l->sin.sin_addr.s_addr = inet_addr(ip); - - if (bind(l->fd, (struct sockaddr *)&(l->sin), sizeof(l->sin)) == -1) { - close(l->fd); - kore_debug("bind(): %s", errno_s); - return (KORE_RESULT_ERROR); - } - - if (listen(l->fd, 50) == -1) { - close(l->fd); - kore_debug("listen(): %s", errno_s); - return (KORE_RESULT_ERROR); - } - - return (KORE_RESULT_OK); -} - -static int -kore_server_accept(struct listener *l) +int +kore_server_accept(struct listener *l, struct connection **out) { socklen_t len; struct connection *c; kore_debug("kore_server_accept(%p)", l); + *out = NULL; len = sizeof(struct sockaddr_in); c = (struct connection *)kore_malloc(sizeof(*c)); if ((c->fd = accept(l->fd, (struct sockaddr *)&(c->sin), &len)) == -1) { @@ -334,75 +229,32 @@ kore_server_accept(struct listener *l) TAILQ_INIT(&(c->send_queue)); TAILQ_INIT(&(c->recv_queue)); - TAILQ_INIT(&(c->spdy_streams));; + TAILQ_INIT(&(c->spdy_streams)); TAILQ_INSERT_TAIL(&worker_clients, c, list); - kore_event(c->fd, EPOLLIN | EPOLLOUT | EPOLLET, c); - + *out = c; return (KORE_RESULT_OK); } -static void -kore_server_final_disconnect(struct connection *c) +void +kore_server_disconnect(struct connection *c) { - struct netbuf *nb, *next; - struct spdy_stream *s, *snext; - - kore_debug("kore_server_final_disconnect(%p)", c); - - if (c->ssl != NULL) - SSL_free(c->ssl); - - TAILQ_REMOVE(&disconnected, c, list); - close(c->fd); - if (c->inflate_started) - inflateEnd(&(c->z_inflate)); - if (c->deflate_started) - deflateEnd(&(c->z_deflate)); - - for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; nb = next) { - next = TAILQ_NEXT(nb, list); - TAILQ_REMOVE(&(c->send_queue), nb, list); - free(nb->buf); - free(nb); - } - - for (nb = TAILQ_FIRST(&(c->recv_queue)); nb != NULL; nb = next) { - next = TAILQ_NEXT(nb, list); - TAILQ_REMOVE(&(c->recv_queue), nb, list); - free(nb->buf); - free(nb); - } - - for (s = TAILQ_FIRST(&(c->spdy_streams)); s != NULL; s = snext) { - snext = TAILQ_NEXT(s, list); - TAILQ_REMOVE(&(c->spdy_streams), s, list); - - if (s->hblock != NULL) { - if (s->hblock->header_block != NULL) - free(s->hblock->header_block); - free(s->hblock); - } - - free(s); + if (c->state != CONN_STATE_DISCONNECTING) { + kore_debug("preparing %p for disconnection", c); + c->state = CONN_STATE_DISCONNECTING; + TAILQ_REMOVE(&worker_clients, c, list); + TAILQ_INSERT_TAIL(&disconnected, c, list); } - - free(c); } -static int -kore_connection_handle(struct connection *c, int flags) +int +kore_connection_handle(struct connection *c) { int r; u_int32_t len; const u_char *data; - kore_debug("kore_connection_handle(%p, %d)", c, flags); - - if (flags & EPOLLIN) - c->flags |= CONN_READ_POSSIBLE; - if (flags & EPOLLOUT) - c->flags |= CONN_WRITE_POSSIBLE; + kore_debug("kore_connection_handle(%p)", c); switch (c->state) { case CONN_STATE_SSL_SHAKE: @@ -473,29 +325,7 @@ kore_connection_handle(struct connection *c, int flags) return (KORE_RESULT_OK); } -static void -kore_worker_init(void) -{ - u_int16_t i, cpu; - - if (worker_count == 0) - fatal("no workers specified"); - - kore_debug("kore_worker_init(): system has %d cpu's", cpu_count); - kore_debug("kore_worker_init(): starting %d workers", worker_count); - if (worker_count > cpu_count) - kore_debug("kore_worker_init(): more workers then cpu's"); - - cpu = 0; - TAILQ_INIT(&kore_workers); - for (i = 0; i < worker_count; i++) { - kore_worker_spawn(cpu++); - if (cpu == cpu_count) - cpu = 0; - } -} - -static void +void kore_worker_spawn(u_int16_t cpu) { struct kore_worker *kw; @@ -516,78 +346,13 @@ kore_worker_spawn(u_int16_t cpu) TAILQ_INSERT_TAIL(&kore_workers, kw, list); } -static void -kore_worker_wait(int final) -{ - int r; - siginfo_t info; - struct kore_worker k, *kw, *next; - - memset(&info, 0, sizeof(info)); - if (final) - r = waitid(P_ALL, 0, &info, WEXITED); - else - r = waitid(P_ALL, 0, &info, WEXITED | WNOHANG); - if (r == -1) { - kore_debug("waitid(): %s", errno_s); - return; - } - - if (info.si_pid == 0) - return; - - for (kw = TAILQ_FIRST(&kore_workers); kw != NULL; kw = next) { - next = TAILQ_NEXT(kw, list); - if (kw->pid != info.si_pid) - continue; - - k = *kw; - TAILQ_REMOVE(&kore_workers, kw, list); - kore_log(LOG_NOTICE, "worker %d (%d)-> status %d (%d)", - kw->id, info.si_pid, info.si_status, info.si_code); - free(kw); - - if (final) - continue; - - if (info.si_code == CLD_EXITED || - info.si_code == CLD_KILLED || - info.si_code == CLD_DUMPED) { - kore_log(LOG_NOTICE, - "worker %d (pid: %d) gone, respawning new one", - k.id, k.pid); - kore_worker_spawn(k.cpu); - } - } -} - -static void -kore_worker_setcpu(struct kore_worker *kw) -{ - cpu_set_t cpuset; - - CPU_ZERO(&cpuset); - CPU_SET(kw->cpu, &cpuset); - if (sched_setaffinity(0, sizeof(cpu_set_t), &cpuset) == -1) { - kore_debug("kore_worker_setcpu(): %s", errno_s); - } else { - kore_debug("kore_worker_setcpu(): worker %d on cpu %d", - kw->id, kw->cpu); - } -} - -static void +void kore_worker_entry(struct kore_worker *kw) { + int quit; char buf[16]; - struct epoll_event *events; struct connection *c, *cnext; struct kore_worker *k, *next; - int n, i, *fd, quit; - - snprintf(buf, sizeof(buf), "kore [wrk %d]", kw->id); - if (prctl(PR_SET_NAME, buf) == -1) - kore_debug("cannot set process title"); if (chroot(chroot_path) == -1) fatal("cannot chroot(): %s", errno_s); @@ -597,6 +362,8 @@ kore_worker_entry(struct kore_worker *kw) pw->pw_gid) || setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) fatal("unable to drop privileges"); + snprintf(buf, sizeof(buf), "kore [wrk %d]", kw->id); + kore_set_proctitle(buf); kore_worker_setcpu(kw); for (k = TAILQ_FIRST(&kore_workers); k != NULL; k = next) { @@ -606,8 +373,6 @@ kore_worker_entry(struct kore_worker *kw) } mypid = kw->pid; - if ((efd = epoll_create(1000)) == -1) - fatal("epoll_create(): %s", errno_s); sig_recv = 0; signal(SIGHUP, kore_signal); @@ -618,8 +383,7 @@ kore_worker_entry(struct kore_worker *kw) TAILQ_INIT(&worker_clients); quit = 0; - kore_event(server.fd, EPOLLIN, &server); - events = kore_calloc(EPOLL_EVENTS, sizeof(struct epoll_event)); + kore_event_init(); kore_log(LOG_NOTICE, "worker %d going to work (CPU: %d)", kw->id, kw->cpu); @@ -632,40 +396,7 @@ kore_worker_entry(struct kore_worker *kw) sig_recv = 0; } - n = epoll_wait(efd, events, EPOLL_EVENTS, 100); - if (n == -1) { - if (errno == EINTR) - continue; - fatal("epoll_wait(): %s", errno_s); - } - - if (n > 0) - kore_debug("main(): %d sockets available", n); - - for (i = 0; i < n; i++) { - fd = (int *)events[i].data.ptr; - - if (events[i].events & EPOLLERR || - events[i].events & EPOLLHUP) { - if (*fd == server.fd) - fatal("error on server socket"); - - c = (struct connection *)events[i].data.ptr; - kore_server_disconnect(c); - continue; - } - - if (*fd == server.fd) { - if (!quit) - kore_server_accept(&server); - } else { - c = (struct connection *)events[i].data.ptr; - if (!kore_connection_handle(c, - events[i].events)) - kore_server_disconnect(c); - } - } - + kore_event_wait(quit); http_process(); for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) { @@ -694,6 +425,131 @@ kore_worker_entry(struct kore_worker *kw) } static int +kore_server_sslstart(void) +{ + kore_debug("kore_server_sslstart()"); + + SSL_library_init(); + SSL_load_error_strings(); + ssl_ctx = SSL_CTX_new(SSLv23_server_method()); + if (ssl_ctx == NULL) { + kore_debug("SSL_ctx_new(): %s", ssl_errno_s); + return (KORE_RESULT_ERROR); + } + + if (!SSL_CTX_use_certificate_chain_file(ssl_ctx, kore_certfile)) { + kore_debug("SSL_CTX_use_certificate_file(): %s", ssl_errno_s); + return (KORE_RESULT_ERROR); + } + + if (!SSL_CTX_use_PrivateKey_file(ssl_ctx, kore_certkey, + SSL_FILETYPE_PEM)) { + kore_debug("SSL_CTX_use_PrivateKey_file(): %s", ssl_errno_s); + return (KORE_RESULT_ERROR); + } + + SSL_CTX_set_mode(ssl_ctx, SSL_MODE_AUTO_RETRY); + SSL_CTX_set_mode(ssl_ctx, SSL_MODE_ENABLE_PARTIAL_WRITE); + SSL_CTX_set_options(ssl_ctx, SSL_OP_NO_SSLv2); + SSL_CTX_set_next_protos_advertised_cb(ssl_ctx, kore_ssl_npn_cb, NULL); + + return (KORE_RESULT_OK); +} + +static int +kore_server_bind(struct listener *l, const char *ip, int port) +{ + int on; + + kore_debug("kore_server_bind(%p, %s, %d)", l, ip, port); + + if ((l->fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { + kore_debug("socket(): %s", errno_s); + return (KORE_RESULT_ERROR); + } + + if (!kore_socket_nonblock(l->fd)) { + close(l->fd); + return (KORE_RESULT_ERROR); + } + + on = 1; + if (setsockopt(l->fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&on, + sizeof(on)) == -1) { + kore_debug("setsockopt(): %s", errno_s); + close(l->fd); + return (KORE_RESULT_ERROR); + } + + memset(&(l->sin), 0, sizeof(l->sin)); + l->sin.sin_family = AF_INET; + l->sin.sin_port = htons(port); + l->sin.sin_addr.s_addr = inet_addr(ip); + + if (bind(l->fd, (struct sockaddr *)&(l->sin), sizeof(l->sin)) == -1) { + close(l->fd); + kore_debug("bind(): %s", errno_s); + return (KORE_RESULT_ERROR); + } + + if (listen(l->fd, 50) == -1) { + close(l->fd); + kore_debug("listen(): %s", errno_s); + return (KORE_RESULT_ERROR); + } + + return (KORE_RESULT_OK); +} + +static void +kore_server_final_disconnect(struct connection *c) +{ + struct netbuf *nb, *next; + struct spdy_stream *s, *snext; + + kore_debug("kore_server_final_disconnect(%p)", c); + + if (c->ssl != NULL) + SSL_free(c->ssl); + + TAILQ_REMOVE(&disconnected, c, list); + close(c->fd); + if (c->inflate_started) + inflateEnd(&(c->z_inflate)); + if (c->deflate_started) + deflateEnd(&(c->z_deflate)); + + for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; nb = next) { + next = TAILQ_NEXT(nb, list); + TAILQ_REMOVE(&(c->send_queue), nb, list); + free(nb->buf); + free(nb); + } + + for (nb = TAILQ_FIRST(&(c->recv_queue)); nb != NULL; nb = next) { + next = TAILQ_NEXT(nb, list); + TAILQ_REMOVE(&(c->recv_queue), nb, list); + free(nb->buf); + free(nb); + } + + for (s = TAILQ_FIRST(&(c->spdy_streams)); s != NULL; s = snext) { + snext = TAILQ_NEXT(s, list); + TAILQ_REMOVE(&(c->spdy_streams), s, list); + + if (s->hblock != NULL) { + if (s->hblock->header_block != NULL) + free(s->hblock->header_block); + free(s->hblock); + } + + free(s); + } + + free(c); +} + +static int kore_socket_nonblock(int fd) { int flags; @@ -726,25 +582,6 @@ kore_ssl_npn_cb(SSL *ssl, const u_char **data, unsigned int *len, void *arg) } static void -kore_event(int fd, int flags, void *udata) -{ - struct epoll_event evt; - - kore_debug("kore_event(%d, %d, %p)", fd, flags, udata); - - evt.events = flags; - evt.data.ptr = udata; - if (epoll_ctl(efd, EPOLL_CTL_ADD, fd, &evt) == -1) { - if (errno == EEXIST) { - if (epoll_ctl(efd, EPOLL_CTL_MOD, fd, &evt) == -1) - fatal("epoll_ctl() MOD: %s", errno_s); - } else { - fatal("epoll_ctl() ADD: %s", errno_s); - } - } -} - -static void kore_write_mypid(void) { FILE *fp; diff --git a/src/linux.c b/src/linux.c @@ -0,0 +1,230 @@ +/* + * Copyright (c) 2013 Joris Vink <joris@coders.se> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/param.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/queue.h> +#include <sys/epoll.h> +#include <sys/prctl.h> +#include <sys/wait.h> + +#include <netinet/in.h> +#include <arpa/inet.h> + +#include <openssl/err.h> +#include <openssl/ssl.h> + +#include <errno.h> +#include <fcntl.h> +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sched.h> +#include <syslog.h> +#include <unistd.h> +#include <time.h> +#include <regex.h> +#include <zlib.h> +#include <unistd.h> + +#include "spdy.h" +#include "kore.h" +#include "http.h" + +#define EPOLL_EVENTS 500 + +static int efd = -1; +static struct epoll_event *events = NULL; + +void +kore_init(void) +{ + if ((cpu_count = sysconf(_SC_NPROCESSORS_ONLN)) == -1) { + kore_debug("could not get number of cpu's falling back to 1"); + cpu_count = 1; + } +} + +void +kore_worker_init(void) +{ + u_int16_t i, cpu; + + if (worker_count == 0) + fatal("no workers specified"); + + kore_debug("kore_worker_init(): system has %d cpu's", cpu_count); + kore_debug("kore_worker_init(): starting %d workers", worker_count); + if (worker_count > cpu_count) + kore_debug("kore_worker_init(): more workers then cpu's"); + + cpu = 0; + TAILQ_INIT(&kore_workers); + for (i = 0; i < worker_count; i++) { + kore_worker_spawn(cpu++); + if (cpu == cpu_count) + cpu = 0; + } +} + +void +kore_worker_wait(int final) +{ + int r; + siginfo_t info; + struct kore_worker k, *kw, *next; + + memset(&info, 0, sizeof(info)); + if (final) + r = waitid(P_ALL, 0, &info, WEXITED); + else + r = waitid(P_ALL, 0, &info, WEXITED | WNOHANG); + if (r == -1) { + kore_debug("waitid(): %s", errno_s); + return; + } + + if (info.si_pid == 0) + return; + + for (kw = TAILQ_FIRST(&kore_workers); kw != NULL; kw = next) { + next = TAILQ_NEXT(kw, list); + if (kw->pid != info.si_pid) + continue; + + k = *kw; + TAILQ_REMOVE(&kore_workers, kw, list); + kore_log(LOG_NOTICE, "worker %d (%d)-> status %d (%d)", + kw->id, info.si_pid, info.si_status, info.si_code); + free(kw); + + if (final) + continue; + + if (info.si_code == CLD_EXITED || + info.si_code == CLD_KILLED || + info.si_code == CLD_DUMPED) { + kore_log(LOG_NOTICE, + "worker %d (pid: %d) gone, respawning new one", + k.id, k.pid); + kore_worker_spawn(k.cpu); + } + } +} + +void +kore_worker_setcpu(struct kore_worker *kw) +{ + cpu_set_t cpuset; + + CPU_ZERO(&cpuset); + CPU_SET(kw->cpu, &cpuset); + if (sched_setaffinity(0, sizeof(cpu_set_t), &cpuset) == -1) { + kore_debug("kore_worker_setcpu(): %s", errno_s); + } else { + kore_debug("kore_worker_setcpu(): worker %d on cpu %d", + kw->id, kw->cpu); + } +} + +void +kore_event_init(void) +{ + if ((efd = epoll_create(1000)) == -1) + fatal("epoll_create(): %s", errno_s); + + events = kore_calloc(EPOLL_EVENTS, sizeof(struct epoll_event)); + kore_event_schedule(server.fd, EPOLLIN, 0, &server); +} + +void +kore_event_wait(int quit) +{ + struct connection *c; + int n, i, *fd; + + n = epoll_wait(efd, events, EPOLL_EVENTS, 100); + if (n == -1) { + if (errno == EINTR) + return; + fatal("epoll_wait(): %s", errno_s); + } + + if (n > 0) + kore_debug("main(): %d sockets available", n); + + for (i = 0; i < n; i++) { + fd = (int *)events[i].data.ptr; + + if (events[i].events & EPOLLERR || + events[i].events & EPOLLHUP) { + if (*fd == server.fd) + fatal("error on server socket"); + + c = (struct connection *)events[i].data.ptr; + kore_server_disconnect(c); + continue; + } + + if (*fd == server.fd) { + if (!quit) { + kore_server_accept(&server, &c); + if (c == NULL) + continue; + + kore_event_schedule(c->fd, + EPOLLIN | EPOLLOUT | EPOLLET, 0, c); + } + } else { + c = (struct connection *)events[i].data.ptr; + if (events[i].events & EPOLLIN) + c->flags |= CONN_READ_POSSIBLE; + if (events[i].events & EPOLLOUT) + c->flags |= CONN_WRITE_POSSIBLE; + + if (!kore_connection_handle(c)) + kore_server_disconnect(c); + } + } +} + +void +kore_event_schedule(int fd, int type, int flags, void *udata) +{ + struct epoll_event evt; + + kore_debug("kore_event(%d, %d, %d, %p)", fd, type, flags, udata); + + evt.events = type; + evt.data.ptr = udata; + if (epoll_ctl(efd, EPOLL_CTL_ADD, fd, &evt) == -1) { + if (errno == EEXIST) { + if (epoll_ctl(efd, EPOLL_CTL_MOD, fd, &evt) == -1) + fatal("epoll_ctl() MOD: %s", errno_s); + } else { + fatal("epoll_ctl() ADD: %s", errno_s); + } + } +} + +void +kore_set_proctitle(char *title) +{ + if (prctl(PR_SET_NAME, title) == -1) + kore_debug("prctl(): %s", errno_s); +} diff --git a/src/net.c b/src/net.c @@ -18,7 +18,6 @@ #include <sys/types.h> #include <sys/socket.h> #include <sys/queue.h> -#include <sys/epoll.h> #include <netinet/in.h> #include <arpa/inet.h>