kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 3e5c17b8a32c7fb77b9975876a14737372aa4af4
parent cff44cd5f31e10ae0aa2229754759253d99e512f
Author: Joris Vink <joris@coders.se>
Date:   Wed, 26 Jun 2013 11:18:32 +0200

refactor code quite a bit.

Diffstat:
Makefile | 3++-
includes/kore.h | 30++++++++++++++++++++----------
modules/example/module.conf | 8+++++---
src/bsd.c | 46++++++++++++++++------------------------------
src/config.c | 23++++++++++++++++++++---
src/connection.c | 246+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/http.c | 4++--
src/kore.c | 400++++++++-----------------------------------------------------------------------
src/linux.c | 47+++++++++++++----------------------------------
src/utils.c | 2+-
src/worker.c | 196+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
11 files changed, 559 insertions(+), 446 deletions(-)

diff --git a/Makefile b/Makefile @@ -4,7 +4,8 @@ CC=gcc BIN=kore S_SRC+= src/kore.c src/buf.c src/config.c src/net.c src/spdy.c src/http.c \ - src/accesslog.c src/domain.c src/module.c src/utils.c src/zlib_dict.c + src/accesslog.c src/domain.c src/module.c src/utils.c \ + src/worker.c src/connection.c src/zlib_dict.c S_OBJS= $(S_SRC:.c=.o) CFLAGS+=-I/usr/local/ssl/include diff --git a/includes/kore.h b/includes/kore.h @@ -140,7 +140,7 @@ struct buf_vec { u_int32_t length; }; -extern pid_t mypid; +extern pid_t kore_pid; extern int kore_debug; extern int server_port; extern char *server_ip; @@ -148,6 +148,7 @@ extern char *chroot_path; extern char *runas_user; extern char *kore_module_onload; extern char *kore_pidfile; +extern char *config_file; extern u_int16_t cpu_count; extern u_int8_t worker_count; @@ -157,34 +158,43 @@ extern struct kore_worker *worker; extern struct kore_worker_h kore_workers; extern struct kore_domain_h domains; extern struct kore_domain *primary_dom; +extern struct passwd *pw; +void kore_signal(int); void kore_worker_init(void); -void kore_worker_wait(int); -void kore_event_init(void); -void kore_event_wait(int); +void kore_worker_connection_add(struct connection *); +void kore_worker_connection_move(struct connection *c); + +void kore_platform_event_init(void); +void kore_platform_event_wait(int); +void kore_platform_worker_wait(int); +void kore_platform_proctitle(char *); +void kore_platform_event_schedule(int, int, int, void *); +void kore_platform_worker_setcpu(struct kore_worker *); + void kore_platform_init(void); void kore_accesslog_init(void); int kore_accesslog_wait(void); -void kore_set_proctitle(char *); void kore_worker_spawn(u_int16_t); void kore_accesslog_worker_init(void); void kore_worker_entry(struct kore_worker *); -void kore_worker_setcpu(struct kore_worker *); -void kore_event_schedule(int, int, int, void *); -int kore_connection_handle(struct connection *); -int kore_server_accept(struct listener *, struct connection **); int kore_ssl_sni_cb(SSL *, int *, void *); int kore_ssl_npn_cb(SSL *, const u_char **, unsigned int *, void *); +int kore_connection_handle(struct connection *); +void kore_connection_remove(struct connection *); +void kore_connection_disconnect(struct connection *); +int kore_connection_accept(struct listener *, struct connection **); + u_int64_t kore_time_ms(void); void kore_log_init(void); void *kore_malloc(size_t); +void kore_parse_config(void); void *kore_calloc(size_t, size_t); void *kore_realloc(void *, size_t); time_t kore_date_to_time(char *); char *kore_time_to_date(time_t); char *kore_strdup(const char *); -void kore_parse_config(const char *); void kore_log(int, const char *, ...); void kore_strlcpy(char *, const char *, size_t); void kore_server_disconnect(struct connection *); diff --git a/modules/example/module.conf b/modules/example/module.conf @@ -1,10 +1,12 @@ # Example Kore configuration # Server configuration. -bind 10.211.55.3 443 +#bind 10.211.55.3 443 +bind 92.243.14.169 443 # The path worker processes will chroot too after starting. -chroot /home/joris/src/kore +#chroot /home/joris/src/kore +chroot /tmp # Worker processes will run as the specified user. runas joris @@ -40,7 +42,7 @@ load modules/example/example.module # handler path module_callback # Example domain that responds to 10.211.55.33. -domain 10.211.55.3 { +domain 92.243.14.169 { certfile cert/server.crt certkey cert/server.key accesslog /var/log/kore_access.log diff --git a/src/bsd.c b/src/bsd.c @@ -57,22 +57,7 @@ kore_platform_init(void) } void -kore_worker_init(void) -{ - u_int16_t i; - - if (worker_count == 0) - fatal("no workers specified"); - - kore_debug("kore_worker_init(): starting %d workers", worker_count); - - TAILQ_INIT(&kore_workers); - for (i = 0; i < worker_count; i++) - kore_worker_spawn(0); -} - -void -kore_worker_wait(int final) +kore_platform_worker_wait(int final) { pid_t pid; int status; @@ -116,12 +101,12 @@ kore_worker_wait(int final) } void -kore_worker_setcpu(struct kore_worker *kw) +kore_platform_worker_setcpu(struct kore_worker *kw) { } void -kore_event_init(void) +kore_platform_event_init(void) { if ((kfd = kqueue()) == -1) fatal("kqueue(): %s", errno_s); @@ -129,11 +114,11 @@ kore_event_init(void) nchanges = 0; events = kore_calloc(KQUEUE_EVENTS, sizeof(struct kevent)); changelist = kore_calloc(KQUEUE_EVENTS, sizeof(struct kevent)); - kore_event_schedule(server.fd, EVFILT_READ, EV_ADD, &server); + kore_platform_event_schedule(server.fd, EVFILT_READ, EV_ADD, &server); } void -kore_event_wait(int quit) +kore_platform_event_wait(int quit) { struct connection *c; int n, i, *fd; @@ -158,20 +143,20 @@ kore_event_wait(int quit) fatal("error on server socket"); c = (struct connection *)events[i].udata; - kore_server_disconnect(c); + kore_connection_disconnect(c); continue; } if (*fd == server.fd) { if (!quit) { - kore_server_accept(&server, &c); + kore_connection_accept(&server, &c); if (c == NULL) continue; - kore_event_schedule(c->fd, EVFILT_READ, + kore_plaform_event_schedule(c->fd, EVFILT_READ, EV_ADD, c); - kore_event_schedule(c->fd, EVFILT_WRITE, - EV_ADD | EV_ONESHOT, c); + kore_platform_event_schedule(c->fd, + EVFILT_WRITE, EV_ADD | EV_ONESHOT, c); } } else { c = (struct connection *)events[i].udata; @@ -181,11 +166,12 @@ kore_event_wait(int quit) c->flags |= CONN_WRITE_POSSIBLE; if (!kore_connection_handle(c)) { - kore_server_disconnect(c); + kore_connection_disconnect(c); } else { if (!TAILQ_EMPTY(&(c->send_queue))) { - kore_event_schedule(c->fd, EVFILT_WRITE, - EV_ADD | EV_ONESHOT, c); + kore_platform_event_schedule(c->fd, + EVFILT_WRITE, EV_ADD | EV_ONESHOT, + c); } } } @@ -193,7 +179,7 @@ kore_event_wait(int quit) } void -kore_event_schedule(int fd, int type, int flags, void *data) +kore_platform_event_schedule(int fd, int type, int flags, void *data) { if (nchanges >= KQUEUE_EVENTS) { kore_log(LOG_WARNING, "cannot schedule %d (%d) on %d", @@ -205,7 +191,7 @@ kore_event_schedule(int fd, int type, int flags, void *data) } void -kore_set_proctitle(char *title) +kore_platform_proctitle(char *title) { setproctitle("%s", title); } diff --git a/src/config.c b/src/config.c @@ -28,6 +28,7 @@ #include <ctype.h> #include <errno.h> #include <fcntl.h> +#include <pwd.h> #include <stdarg.h> #include <stdio.h> #include <stdlib.h> @@ -72,17 +73,21 @@ static struct { { NULL, NULL }, }; +char *config_file = NULL; static struct kore_domain *current_domain = NULL; void -kore_parse_config(const char *config_path) +kore_parse_config(void) { FILE *fp; int i, lineno; char buf[BUFSIZ], *p, *t, *argv[5]; - if ((fp = fopen(config_path, "r")) == NULL) - fatal("configuration given cannot be opened: %s", config_path); + if (config_file == NULL) + fatal("specify a configuration file with -c"); + + if ((fp = fopen(config_file, "r")) == NULL) + fatal("configuration given cannot be opened: %s", config_file); lineno = 1; while (fgets(buf, sizeof(buf), fp) != NULL) { @@ -117,6 +122,18 @@ kore_parse_config(const char *config_path) lineno++; } + + if (!kore_module_loaded()) + fatal("no site module was loaded"); + + if (server_ip == NULL || server_port == 0) + fatal("missing a correct bind directive in configuration"); + if (chroot_path == NULL) + fatal("missing a chroot path"); + if (runas_user == NULL) + fatal("missing a username to run as"); + if ((pw = getpwnam(runas_user)) == NULL) + fatal("user '%s' does not exist", runas_user); } static int diff --git a/src/connection.c b/src/connection.c @@ -0,0 +1,246 @@ +/* + * Copyright (c) 2013 Joris Vink <joris@coders.se> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/param.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/queue.h> +#include <sys/wait.h> + +#include <netinet/in.h> +#include <arpa/inet.h> + +#include <openssl/err.h> +#include <openssl/ssl.h> + +#include <pwd.h> +#include <errno.h> +#include <grp.h> +#include <fcntl.h> +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sched.h> +#include <syslog.h> +#include <unistd.h> +#include <time.h> +#include <regex.h> +#include <zlib.h> +#include <unistd.h> + +#include "spdy.h" +#include "kore.h" +#include "http.h" + +static int kore_connection_nonblock(int); + +int +kore_connection_accept(struct listener *l, struct connection **out) +{ + socklen_t len; + struct connection *c; + + kore_debug("kore_connection_accept(%p)", l); + + *out = NULL; + len = sizeof(struct sockaddr_in); + c = (struct connection *)kore_malloc(sizeof(*c)); + if ((c->fd = accept(l->fd, (struct sockaddr *)&(c->sin), &len)) == -1) { + free(c); + kore_debug("accept(): %s", errno_s); + return (KORE_RESULT_ERROR); + } + + if (!kore_connection_nonblock(c->fd)) { + close(c->fd); + free(c); + return (KORE_RESULT_ERROR); + } + + c->owner = l; + c->ssl = NULL; + c->flags = 0; + c->inflate_started = 0; + c->deflate_started = 0; + c->client_stream_id = 0; + c->proto = CONN_PROTO_UNKNOWN; + c->state = CONN_STATE_SSL_SHAKE; + + TAILQ_INIT(&(c->send_queue)); + TAILQ_INIT(&(c->recv_queue)); + TAILQ_INIT(&(c->spdy_streams)); + kore_worker_connection_add(c); + + *out = c; + return (KORE_RESULT_OK); +} + +void +kore_connection_disconnect(struct connection *c) +{ + if (c->state != CONN_STATE_DISCONNECTING) { + kore_debug("preparing %p for disconnection", c); + c->state = CONN_STATE_DISCONNECTING; + kore_worker_connection_move(c); + } +} + +int +kore_connection_handle(struct connection *c) +{ + int r; + u_int32_t len; + const u_char *data; + + kore_debug("kore_connection_handle(%p)", c); + + switch (c->state) { + case CONN_STATE_SSL_SHAKE: + if (c->ssl == NULL) { + c->ssl = SSL_new(primary_dom->ssl_ctx); + if (c->ssl == NULL) { + kore_debug("SSL_new(): %s", ssl_errno_s); + return (KORE_RESULT_ERROR); + } + + SSL_set_fd(c->ssl, c->fd); + } + + r = SSL_accept(c->ssl); + if (r <= 0) { + r = SSL_get_error(c->ssl, r); + switch (r) { + case SSL_ERROR_WANT_READ: + case SSL_ERROR_WANT_WRITE: + return (KORE_RESULT_OK); + default: + kore_debug("SSL_accept(): %s", ssl_errno_s); + return (KORE_RESULT_ERROR); + } + } + + r = SSL_get_verify_result(c->ssl); + if (r != X509_V_OK) { + kore_debug("SSL_get_verify_result(): %s", ssl_errno_s); + return (KORE_RESULT_ERROR); + } + + SSL_get0_next_proto_negotiated(c->ssl, &data, &len); + if (data) { + if (!memcmp(data, "spdy/3", 6)) + kore_debug("using SPDY/3"); + c->proto = CONN_PROTO_SPDY; + net_recv_queue(c, SPDY_FRAME_SIZE, 0, + NULL, spdy_frame_recv); + } else { + kore_debug("using HTTP/1.1"); + c->proto = CONN_PROTO_HTTP; + net_recv_queue(c, HTTP_HEADER_MAX_LEN, + NETBUF_CALL_CB_ALWAYS, NULL, + http_header_recv); + } + + c->state = CONN_STATE_ESTABLISHED; + /* FALLTHROUGH */ + case CONN_STATE_ESTABLISHED: + if (c->flags & CONN_READ_POSSIBLE) { + if (!net_recv_flush(c)) + return (KORE_RESULT_ERROR); + } + + if (c->flags & CONN_WRITE_POSSIBLE) { + if (!net_send_flush(c)) + return (KORE_RESULT_ERROR); + } + break; + case CONN_STATE_DISCONNECTING: + break; + default: + kore_debug("unknown state on %d (%d)", c->fd, c->state); + break; + } + + return (KORE_RESULT_OK); +} + +void +kore_connection_remove(struct connection *c) +{ + struct netbuf *nb, *next; + struct spdy_stream *s, *snext; + + kore_debug("kore_connection_remove(%p)", c); + + if (c->ssl != NULL) + SSL_free(c->ssl); + + close(c->fd); + if (c->inflate_started) + inflateEnd(&(c->z_inflate)); + if (c->deflate_started) + deflateEnd(&(c->z_deflate)); + + for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; nb = next) { + next = TAILQ_NEXT(nb, list); + TAILQ_REMOVE(&(c->send_queue), nb, list); + free(nb->buf); + free(nb); + } + + for (nb = TAILQ_FIRST(&(c->recv_queue)); nb != NULL; nb = next) { + next = TAILQ_NEXT(nb, list); + TAILQ_REMOVE(&(c->recv_queue), nb, list); + free(nb->buf); + free(nb); + } + + for (s = TAILQ_FIRST(&(c->spdy_streams)); s != NULL; s = snext) { + snext = TAILQ_NEXT(s, list); + TAILQ_REMOVE(&(c->spdy_streams), s, list); + + if (s->hblock != NULL) { + if (s->hblock->header_block != NULL) + free(s->hblock->header_block); + free(s->hblock); + } + + free(s); + } + + free(c); +} + +static int +kore_connection_nonblock(int fd) +{ + int flags; + + kore_debug("kore_connection_nonblock(%d)", fd); + + if ((flags = fcntl(fd, F_GETFL, 0)) == -1) { + kore_debug("fcntl(): F_GETFL %s", errno_s); + return (KORE_RESULT_ERROR); + } + + flags |= O_NONBLOCK; + if (fcntl(fd, F_SETFL, flags) == -1) { + kore_debug("fcntl(): F_SETFL %s", errno_s); + return (KORE_RESULT_ERROR); + } + + return (KORE_RESULT_OK); +} diff --git a/src/http.c b/src/http.c @@ -138,10 +138,10 @@ http_process(void) case KORE_RESULT_OK: r = net_send_flush(req->owner); if (r == KORE_RESULT_ERROR) - kore_server_disconnect(req->owner); + kore_connection_disconnect(req->owner); break; case KORE_RESULT_ERROR: - kore_server_disconnect(req->owner); + kore_connection_disconnect(req->owner); break; case KORE_RESULT_RETRY: break; diff --git a/src/kore.c b/src/kore.c @@ -47,16 +47,11 @@ #include "http.h" volatile sig_atomic_t sig_recv; -static TAILQ_HEAD(, connection) disconnected; -static TAILQ_HEAD(, connection) worker_clients; -static struct passwd *pw = NULL; -static u_int16_t workerid = 0; struct listener server; -pid_t mypid = -1; +struct passwd *pw = NULL; +pid_t kore_pid = -1; u_int16_t cpu_count = 1; -struct kore_worker_h kore_workers; -struct kore_worker *worker = NULL; int kore_debug = 0; int server_port = 0; u_int8_t worker_count = 0; @@ -66,11 +61,9 @@ char *chroot_path = NULL; char *kore_pidfile = KORE_PIDFILE_DEFAULT; static void usage(void); -static void kore_signal(int); -static void kore_write_mypid(void); -static int kore_socket_nonblock(int); +static void kore_server_start(void); +static void kore_write_kore_pid(void); static void kore_server_sslstart(void); -static void kore_server_final_disconnect(struct connection *); static int kore_server_bind(struct listener *, const char *, int); static void @@ -85,13 +78,11 @@ main(int argc, char *argv[]) { int ch; struct kore_worker *kw, *next; - char *config_file; if (getuid() != 0) fatal("kore must be started as root"); kore_debug = 0; - config_file = NULL; while ((ch = getopt(argc, argv, "c:d")) != -1) { switch (ch) { case 'c': @@ -108,49 +99,20 @@ main(int argc, char *argv[]) argc -= optind; argv += optind; - if (config_file == NULL) - fatal("please specify a configuration file to use (-c)"); - - mypid = getpid(); - + kore_pid = getpid(); kore_domain_init(); kore_server_sslstart(); - kore_parse_config(config_file); - if (!kore_module_loaded()) - fatal("no site module was loaded"); - - if (server_ip == NULL || server_port == 0) - fatal("missing a correct bind directive in configuration"); - if (chroot_path == NULL) - fatal("missing a chroot path"); - if (runas_user == NULL) - fatal("missing a username to run as"); - if ((pw = getpwnam(runas_user)) == NULL) - fatal("user '%s' does not exist", runas_user); - + kore_parse_config(); kore_log_init(); kore_platform_init(); kore_accesslog_init(); - if (!kore_server_bind(&server, server_ip, server_port)) - fatal("cannot bind to %s:%d", server_ip, server_port); - if (daemon(1, 1) == -1) - fatal("cannot daemon(): %s", errno_s); - - mypid = getpid(); - kore_write_mypid(); - - kore_log(LOG_NOTICE, "kore is starting up"); - kore_worker_init(); - kore_set_proctitle("kore [parent]"); - sig_recv = 0; signal(SIGHUP, kore_signal); signal(SIGQUIT, kore_signal); - free(server_ip); - free(runas_user); + kore_server_start(); for (;;) { if (sig_recv != 0) { @@ -170,7 +132,7 @@ main(int argc, char *argv[]) if (!kore_accesslog_wait()) break; - kore_worker_wait(0); + kore_platform_worker_wait(0); } for (kw = TAILQ_FIRST(&kore_workers); kw != NULL; kw = next) { @@ -181,7 +143,7 @@ main(int argc, char *argv[]) kore_log(LOG_NOTICE, "waiting for workers to drain and finish"); while (!TAILQ_EMPTY(&kore_workers)) - kore_worker_wait(1); + kore_platform_worker_wait(1); kore_log(LOG_NOTICE, "server shutting down"); unlink(kore_pidfile); @@ -191,238 +153,6 @@ main(int argc, char *argv[]) } int -kore_server_accept(struct listener *l, struct connection **out) -{ - socklen_t len; - struct connection *c; - - kore_debug("kore_server_accept(%p)", l); - - *out = NULL; - len = sizeof(struct sockaddr_in); - c = (struct connection *)kore_malloc(sizeof(*c)); - if ((c->fd = accept(l->fd, (struct sockaddr *)&(c->sin), &len)) == -1) { - free(c); - kore_debug("accept(): %s", errno_s); - return (KORE_RESULT_ERROR); - } - - if (!kore_socket_nonblock(c->fd)) { - close(c->fd); - free(c); - return (KORE_RESULT_ERROR); - } - - c->owner = l; - c->ssl = NULL; - c->flags = 0; - c->inflate_started = 0; - c->deflate_started = 0; - c->client_stream_id = 0; - c->proto = CONN_PROTO_UNKNOWN; - c->state = CONN_STATE_SSL_SHAKE; - - TAILQ_INIT(&(c->send_queue)); - TAILQ_INIT(&(c->recv_queue)); - TAILQ_INIT(&(c->spdy_streams)); - TAILQ_INSERT_TAIL(&worker_clients, c, list); - - *out = c; - return (KORE_RESULT_OK); -} - -void -kore_server_disconnect(struct connection *c) -{ - if (c->state != CONN_STATE_DISCONNECTING) { - kore_debug("preparing %p for disconnection", c); - c->state = CONN_STATE_DISCONNECTING; - TAILQ_REMOVE(&worker_clients, c, list); - TAILQ_INSERT_TAIL(&disconnected, c, list); - } -} - -int -kore_connection_handle(struct connection *c) -{ - int r; - u_int32_t len; - const u_char *data; - - kore_debug("kore_connection_handle(%p)", c); - - switch (c->state) { - case CONN_STATE_SSL_SHAKE: - if (c->ssl == NULL) { - c->ssl = SSL_new(primary_dom->ssl_ctx); - if (c->ssl == NULL) { - kore_debug("SSL_new(): %s", ssl_errno_s); - return (KORE_RESULT_ERROR); - } - - SSL_set_fd(c->ssl, c->fd); - } - - r = SSL_accept(c->ssl); - if (r <= 0) { - r = SSL_get_error(c->ssl, r); - switch (r) { - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - return (KORE_RESULT_OK); - default: - kore_debug("SSL_accept(): %s", ssl_errno_s); - return (KORE_RESULT_ERROR); - } - } - - r = SSL_get_verify_result(c->ssl); - if (r != X509_V_OK) { - kore_debug("SSL_get_verify_result(): %s", ssl_errno_s); - return (KORE_RESULT_ERROR); - } - - SSL_get0_next_proto_negotiated(c->ssl, &data, &len); - if (data) { - if (!memcmp(data, "spdy/3", 6)) - kore_debug("using SPDY/3"); - c->proto = CONN_PROTO_SPDY; - net_recv_queue(c, SPDY_FRAME_SIZE, 0, - NULL, spdy_frame_recv); - } else { - kore_debug("using HTTP/1.1"); - c->proto = CONN_PROTO_HTTP; - net_recv_queue(c, HTTP_HEADER_MAX_LEN, - NETBUF_CALL_CB_ALWAYS, NULL, - http_header_recv); - } - - c->state = CONN_STATE_ESTABLISHED; - /* FALLTHROUGH */ - case CONN_STATE_ESTABLISHED: - if (c->flags & CONN_READ_POSSIBLE) { - if (!net_recv_flush(c)) - return (KORE_RESULT_ERROR); - } - - if (c->flags & CONN_WRITE_POSSIBLE) { - if (!net_send_flush(c)) - return (KORE_RESULT_ERROR); - } - break; - case CONN_STATE_DISCONNECTING: - break; - default: - kore_debug("unknown state on %d (%d)", c->fd, c->state); - break; - } - - return (KORE_RESULT_OK); -} - -void -kore_worker_spawn(u_int16_t cpu) -{ - struct kore_worker *kw; - - kw = (struct kore_worker *)kore_malloc(sizeof(*kw)); - kw->id = workerid++; - kw->cpu = cpu; - kw->pid = fork(); - if (kw->pid == -1) - fatal("could not spawn worker child: %s", errno_s); - - if (kw->pid == 0) { - kw->pid = getpid(); - kore_worker_entry(kw); - /* NOTREACHED */ - } - - TAILQ_INSERT_TAIL(&kore_workers, kw, list); -} - -void -kore_worker_entry(struct kore_worker *kw) -{ - int quit; - char buf[16]; - struct connection *c, *cnext; - struct kore_worker *k, *next; - - worker = kw; - - if (chroot(chroot_path) == -1) - fatal("cannot chroot(): %s", errno_s); - if (chdir("/") == -1) - fatal("cannot chdir(): %s", errno_s); - if (setgroups(1, &pw->pw_gid) || setresgid(pw->pw_gid, pw->pw_gid, - pw->pw_gid) || setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) - fatal("unable to drop privileges"); - - snprintf(buf, sizeof(buf), "kore [wrk %d]", kw->id); - kore_set_proctitle(buf); - kore_worker_setcpu(kw); - - for (k = TAILQ_FIRST(&kore_workers); k != NULL; k = next) { - next = TAILQ_NEXT(k, list); - TAILQ_REMOVE(&kore_workers, k, list); - free(k); - } - - mypid = kw->pid; - - sig_recv = 0; - signal(SIGHUP, kore_signal); - signal(SIGQUIT, kore_signal); - signal(SIGPIPE, SIG_IGN); - - http_init(); - TAILQ_INIT(&disconnected); - TAILQ_INIT(&worker_clients); - - quit = 0; - kore_event_init(); - kore_accesslog_worker_init(); - - kore_log(LOG_NOTICE, "worker %d started (cpu#%d)", kw->id, kw->cpu); - for (;;) { - if (sig_recv != 0) { - if (sig_recv == SIGHUP) - kore_module_reload(); - else if (sig_recv == SIGQUIT) - quit = 1; - sig_recv = 0; - } - - kore_event_wait(quit); - http_process(); - - for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) { - cnext = TAILQ_NEXT(c, list); - kore_server_final_disconnect(c); - } - - if (quit && http_request_count == 0) - break; - } - - for (c = TAILQ_FIRST(&worker_clients); c != NULL; c = cnext) { - cnext = TAILQ_NEXT(c, list); - net_send_flush(c); - kore_server_final_disconnect(c); - } - - for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) { - cnext = TAILQ_NEXT(c, list); - net_send_flush(c); - kore_server_final_disconnect(c); - } - - kore_debug("worker %d shutting down", kw->id); - exit(0); -} - -int kore_ssl_npn_cb(SSL *ssl, const u_char **data, unsigned int *len, void *arg) { kore_debug("kore_ssl_npn_cb(): sending protocols"); @@ -451,6 +181,12 @@ kore_ssl_sni_cb(SSL *ssl, int *ad, void *arg) return (SSL_TLSEXT_ERR_NOACK); } +void +kore_signal(int sig) +{ + sig_recv = sig; +} + static void kore_server_sslstart(void) { @@ -460,6 +196,26 @@ kore_server_sslstart(void) SSL_load_error_strings(); } +static void +kore_server_start(void) +{ + if (!kore_server_bind(&server, server_ip, server_port)) + fatal("cannot bind to %s:%d", server_ip, server_port); + if (daemon(1, 1) == -1) + fatal("cannot daemon(): %s", errno_s); + + kore_pid = getpid(); + kore_write_kore_pid(); + + kore_log(LOG_NOTICE, "kore is starting up"); + kore_platform_proctitle("kore [parent]"); + + kore_worker_init(); + + free(server_ip); + free(runas_user); +} + static int kore_server_bind(struct listener *l, const char *ip, int port) { @@ -472,11 +228,6 @@ kore_server_bind(struct listener *l, const char *ip, int port) return (KORE_RESULT_ERROR); } - if (!kore_socket_nonblock(l->fd)) { - close(l->fd); - return (KORE_RESULT_ERROR); - } - on = 1; if (setsockopt(l->fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&on, sizeof(on)) == -1) { @@ -506,89 +257,14 @@ kore_server_bind(struct listener *l, const char *ip, int port) } static void -kore_server_final_disconnect(struct connection *c) -{ - struct netbuf *nb, *next; - struct spdy_stream *s, *snext; - - kore_debug("kore_server_final_disconnect(%p)", c); - - if (c->ssl != NULL) - SSL_free(c->ssl); - - TAILQ_REMOVE(&disconnected, c, list); - close(c->fd); - if (c->inflate_started) - inflateEnd(&(c->z_inflate)); - if (c->deflate_started) - deflateEnd(&(c->z_deflate)); - - for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; nb = next) { - next = TAILQ_NEXT(nb, list); - TAILQ_REMOVE(&(c->send_queue), nb, list); - free(nb->buf); - free(nb); - } - - for (nb = TAILQ_FIRST(&(c->recv_queue)); nb != NULL; nb = next) { - next = TAILQ_NEXT(nb, list); - TAILQ_REMOVE(&(c->recv_queue), nb, list); - free(nb->buf); - free(nb); - } - - for (s = TAILQ_FIRST(&(c->spdy_streams)); s != NULL; s = snext) { - snext = TAILQ_NEXT(s, list); - TAILQ_REMOVE(&(c->spdy_streams), s, list); - - if (s->hblock != NULL) { - if (s->hblock->header_block != NULL) - free(s->hblock->header_block); - free(s->hblock); - } - - free(s); - } - - free(c); -} - -static int -kore_socket_nonblock(int fd) -{ - int flags; - - kore_debug("kore_socket_nonblock(%d)", fd); - - if ((flags = fcntl(fd, F_GETFL, 0)) == -1) { - kore_debug("fcntl(): F_GETFL %s", errno_s); - return (KORE_RESULT_ERROR); - } - - flags |= O_NONBLOCK; - if (fcntl(fd, F_SETFL, flags) == -1) { - kore_debug("fcntl(): F_SETFL %s", errno_s); - return (KORE_RESULT_ERROR); - } - - return (KORE_RESULT_OK); -} - -static void -kore_write_mypid(void) +kore_write_kore_pid(void) { FILE *fp; if ((fp = fopen(kore_pidfile, "w+")) == NULL) { - kore_debug("kore_write_mypid(): fopen() %s", errno_s); + kore_debug("kore_write_kore_pid(): fopen() %s", errno_s); } else { - fprintf(fp, "%d\n", mypid); + fprintf(fp, "%d\n", kore_pid); fclose(fp); } } - -static void -kore_signal(int sig) -{ - sig_recv = sig; -} diff --git a/src/linux.c b/src/linux.c @@ -61,29 +61,7 @@ kore_platform_init(void) } void -kore_worker_init(void) -{ - u_int16_t i, cpu; - - if (worker_count == 0) - fatal("no workers specified"); - - kore_debug("kore_worker_init(): system has %d cpu's", cpu_count); - kore_debug("kore_worker_init(): starting %d workers", worker_count); - if (worker_count > cpu_count) - kore_debug("kore_worker_init(): more workers then cpu's"); - - cpu = 0; - TAILQ_INIT(&kore_workers); - for (i = 0; i < worker_count; i++) { - kore_worker_spawn(cpu++); - if (cpu == cpu_count) - cpu = 0; - } -} - -void -kore_worker_wait(int final) +kore_platform_worker_wait(int final) { int r; siginfo_t info; @@ -128,7 +106,7 @@ kore_worker_wait(int final) } void -kore_worker_setcpu(struct kore_worker *kw) +kore_platform_worker_setcpu(struct kore_worker *kw) { cpu_set_t cpuset; @@ -143,17 +121,17 @@ kore_worker_setcpu(struct kore_worker *kw) } void -kore_event_init(void) +kore_platform_event_init(void) { if ((efd = epoll_create(1000)) == -1) fatal("epoll_create(): %s", errno_s); events = kore_calloc(EPOLL_EVENTS, sizeof(struct epoll_event)); - kore_event_schedule(server.fd, EPOLLIN, 0, &server); + kore_platform_event_schedule(server.fd, EPOLLIN, 0, &server); } void -kore_event_wait(int quit) +kore_platform_event_wait(int quit) { struct connection *c; int n, i, *fd; @@ -177,17 +155,17 @@ kore_event_wait(int quit) fatal("error on server socket"); c = (struct connection *)events[i].data.ptr; - kore_server_disconnect(c); + kore_connection_disconnect(c); continue; } if (*fd == server.fd) { if (!quit) { - kore_server_accept(&server, &c); + kore_connection_accept(&server, &c); if (c == NULL) continue; - kore_event_schedule(c->fd, + kore_platform_event_schedule(c->fd, EPOLLIN | EPOLLOUT | EPOLLET, 0, c); } } else { @@ -198,17 +176,18 @@ kore_event_wait(int quit) c->flags |= CONN_WRITE_POSSIBLE; if (!kore_connection_handle(c)) - kore_server_disconnect(c); + kore_connection_disconnect(c); } } } void -kore_event_schedule(int fd, int type, int flags, void *udata) +kore_platform_event_schedule(int fd, int type, int flags, void *udata) { struct epoll_event evt; - kore_debug("kore_event(%d, %d, %d, %p)", fd, type, flags, udata); + kore_debug("kore_platform_event(%d, %d, %d, %p)", + fd, type, flags, udata); evt.events = type; evt.data.ptr = udata; @@ -223,7 +202,7 @@ kore_event_schedule(int fd, int type, int flags, void *udata) } void -kore_set_proctitle(char *title) +kore_platform_proctitle(char *title) { if (prctl(PR_SET_NAME, title) == -1) kore_debug("prctl(): %s", errno_s); diff --git a/src/utils.c b/src/utils.c @@ -114,7 +114,7 @@ kore_debug_internal(char *file, int line, const char *fmt, ...) vsnprintf(buf, sizeof(buf), fmt, args); va_end(args); - printf("[%d] %s:%d - %s\n", mypid, file, line, buf); + printf("[%d] %s:%d - %s\n", kore_pid, file, line, buf); } void diff --git a/src/worker.c b/src/worker.c @@ -0,0 +1,196 @@ +/* + * Copyright (c) 2013 Joris Vink <joris@coders.se> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/param.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/queue.h> +#include <sys/wait.h> + +#include <netinet/in.h> +#include <arpa/inet.h> + +#include <openssl/err.h> +#include <openssl/ssl.h> + +#include <pwd.h> +#include <errno.h> +#include <grp.h> +#include <fcntl.h> +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sched.h> +#include <syslog.h> +#include <unistd.h> +#include <time.h> +#include <regex.h> +#include <zlib.h> +#include <unistd.h> + +#include "spdy.h" +#include "kore.h" +#include "http.h" + +static u_int16_t workerid = 0; +static TAILQ_HEAD(, connection) disconnected; +static TAILQ_HEAD(, connection) worker_clients; + +struct kore_worker_h kore_workers; +struct kore_worker *worker = NULL; + +extern volatile sig_atomic_t sig_recv; + +void +kore_worker_init(void) +{ + u_int16_t i, cpu; + + if (worker_count == 0) + fatal("no workers specified"); + + kore_debug("kore_worker_init(): system has %d cpu's", cpu_count); + kore_debug("kore_worker_init(): starting %d workers", worker_count); + if (worker_count > cpu_count) + kore_debug("kore_worker_init(): more workers then cpu's"); + + cpu = 0; + TAILQ_INIT(&kore_workers); + for (i = 0; i < worker_count; i++) { + kore_worker_spawn(cpu++); + if (cpu == cpu_count) + cpu = 0; + } +} + +void +kore_worker_spawn(u_int16_t cpu) +{ + struct kore_worker *kw; + + kw = (struct kore_worker *)kore_malloc(sizeof(*kw)); + kw->id = workerid++; + kw->cpu = cpu; + kw->pid = fork(); + if (kw->pid == -1) + fatal("could not spawn worker child: %s", errno_s); + + if (kw->pid == 0) { + kw->pid = getpid(); + kore_worker_entry(kw); + /* NOTREACHED */ + } + + TAILQ_INSERT_TAIL(&kore_workers, kw, list); +} + +void +kore_worker_entry(struct kore_worker *kw) +{ + int quit; + char buf[16]; + struct connection *c, *cnext; + struct kore_worker *k, *next; + + worker = kw; + + if (chroot(chroot_path) == -1) + fatal("cannot chroot(): %s", errno_s); + if (chdir("/") == -1) + fatal("cannot chdir(): %s", errno_s); + if (setgroups(1, &pw->pw_gid) || setresgid(pw->pw_gid, pw->pw_gid, + pw->pw_gid) || setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) + fatal("unable to drop privileges"); + + snprintf(buf, sizeof(buf), "kore [wrk %d]", kw->id); + kore_platform_proctitle(buf); + kore_platform_worker_setcpu(kw); + + for (k = TAILQ_FIRST(&kore_workers); k != NULL; k = next) { + next = TAILQ_NEXT(k, list); + TAILQ_REMOVE(&kore_workers, k, list); + free(k); + } + + kore_pid = kw->pid; + + sig_recv = 0; + signal(SIGHUP, kore_signal); + signal(SIGQUIT, kore_signal); + signal(SIGPIPE, SIG_IGN); + + http_init(); + TAILQ_INIT(&disconnected); + TAILQ_INIT(&worker_clients); + + quit = 0; + kore_platform_event_init(); + kore_accesslog_worker_init(); + + kore_log(LOG_NOTICE, "worker %d started (cpu#%d)", kw->id, kw->cpu); + for (;;) { + if (sig_recv != 0) { + if (sig_recv == SIGHUP) + kore_module_reload(); + else if (sig_recv == SIGQUIT) + quit = 1; + sig_recv = 0; + } + + kore_platform_event_wait(quit); + http_process(); + + for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) { + cnext = TAILQ_NEXT(c, list); + TAILQ_REMOVE(&disconnected, c, list); + kore_connection_remove(c); + } + + if (quit && http_request_count == 0) + break; + } + + for (c = TAILQ_FIRST(&worker_clients); c != NULL; c = cnext) { + cnext = TAILQ_NEXT(c, list); + net_send_flush(c); + TAILQ_REMOVE(&worker_clients, c, list); + kore_connection_remove(c); + } + + for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) { + cnext = TAILQ_NEXT(c, list); + net_send_flush(c); + TAILQ_REMOVE(&disconnected, c, list); + kore_connection_remove(c); + } + + kore_debug("worker %d shutting down", kw->id); + exit(0); +} + +void +kore_worker_connection_add(struct connection *c) +{ + TAILQ_INSERT_TAIL(&worker_clients, c, list); +} + +void +kore_worker_connection_move(struct connection *c) +{ + TAILQ_REMOVE(&worker_clients, c, list); + TAILQ_INSERT_TAIL(&disconnected, c, list); +}