kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 659e19f92f4e4e5584c62f72aefa2ba07562ad53
parent 20f02ced2309775c0e9247e482bc169c379d1984
Author: Joris Vink <joris@coders.se>
Date:   Sat, 27 Jul 2013 20:56:15 +0200

add IPv6 support and support for multiple listeners.

Diffstat:
includes/kore.h | 37+++++++++++++++++++++++++++++--------
modules/example/module.conf | 7++++---
src/accesslog.c | 22+++++++++++++++++++---
src/bsd.c | 43++++++++++++++++++++++++++++++-------------
src/config.c | 19+++----------------
src/connection.c | 18++++++++++++++----
src/kore.c | 128+++++++++++++++++++++++++++++++++++++++++++++++--------------------------------
src/linux.c | 34++++++++++++++++++++++++----------
8 files changed, 199 insertions(+), 109 deletions(-)

diff --git a/includes/kore.h b/includes/kore.h @@ -75,11 +75,25 @@ struct netbuf { TAILQ_ENTRY(netbuf) list; }; +#define KORE_TYPE_LISTENER 1 +#define KORE_TYPE_CONNECTION 2 + struct listener { + u_int8_t type; + int fd; - struct sockaddr_in sin; + u_int8_t addrtype; + + union { + struct sockaddr_in ipv4; + struct sockaddr_in6 ipv6; + } addr; + + LIST_ENTRY(listener) list; }; +LIST_HEAD(listener_head, listener); + #define CONN_STATE_UNKNOWN 0 #define CONN_STATE_SSL_SHAKE 1 #define CONN_STATE_ESTABLISHED 2 @@ -98,14 +112,20 @@ struct listener { #define KORE_IDLE_TIMER_MAX 20000 struct connection { + u_int8_t type; int fd; u_int8_t state; u_int8_t proto; - struct sockaddr_in sin; void *owner; SSL *ssl; u_int8_t flags; + u_int8_t addrtype; + union { + struct sockaddr_in ipv4; + struct sockaddr_in6 ipv6; + } addr; + struct { u_int64_t length; u_int64_t start; @@ -202,8 +222,6 @@ struct kore_pool { extern pid_t kore_pid; extern int kore_debug; -extern int server_port; -extern char *server_ip; extern char *chroot_path; extern char *runas_user; extern char *kore_module_onload; @@ -211,13 +229,14 @@ extern char *kore_pidfile; extern char *config_file; extern char *kore_ssl_cipher_list; +extern u_int8_t nlisteners; extern u_int64_t spdy_idle_time; extern u_int16_t cpu_count; extern u_int8_t worker_count; extern u_int32_t worker_max_connections; extern u_int32_t worker_active_connections; -extern struct listener server; +extern struct listener_head listeners; extern struct kore_worker *worker; extern struct kore_domain_h domains; extern struct kore_domain *primary_dom; @@ -229,10 +248,13 @@ void kore_worker_wait(int); void kore_worker_init(void); void kore_worker_shutdown(void); void kore_worker_dispatch_signal(int); +void kore_worker_spawn(u_int16_t, u_int16_t); +void kore_worker_entry(struct kore_worker *); void kore_worker_connection_add(struct connection *); void kore_worker_connection_move(struct connection *); void kore_worker_connection_remove(struct connection *); +void kore_platform_init(void); void kore_platform_event_init(void); void kore_platform_event_wait(void); void kore_platform_proctitle(char *); @@ -241,13 +263,12 @@ void kore_platform_disable_accept(void); void kore_platform_event_schedule(int, int, int, void *); void kore_platform_worker_setcpu(struct kore_worker *); -void kore_platform_init(void); void kore_accesslog_init(void); int kore_accesslog_wait(void); -void kore_worker_spawn(u_int16_t, u_int16_t); void kore_accesslog_worker_init(void); -void kore_worker_entry(struct kore_worker *); + int kore_ssl_sni_cb(SSL *, int *, void *); +int kore_server_bind(const char *, const char *); int kore_ssl_npn_cb(SSL *, const u_char **, unsigned int *, void *); void kore_connection_init(void); diff --git a/modules/example/module.conf b/modules/example/module.conf @@ -1,7 +1,8 @@ # Example Kore configuration # Server configuration. -bind 10.211.55.3 443 +bind 127.0.0.1 443 +bind ::1 443 # The path worker processes will chroot too after starting. chroot /home/joris/src/kore @@ -50,8 +51,8 @@ load modules/example/example.module # Syntax: # handler path module_callback -# Example domain that responds to 10.211.55.3. -domain 10.211.55.3 { +# Example domain that responds to localhost. +domain localhost { certfile cert/server.crt certkey cert/server.key accesslog /var/log/kore_access.log diff --git a/src/accesslog.c b/src/accesslog.c @@ -29,7 +29,8 @@ struct kore_log_packet { u_int16_t time_req; u_int16_t worker_id; u_int16_t worker_cpu; - struct in_addr src; + u_int8_t addrtype; + u_int8_t addr[sizeof(struct in6_addr)]; char host[KORE_DOMAINNAME_LEN]; char path[HTTP_URI_LEN]; char agent[HTTP_USERAGENT_LEN]; @@ -59,6 +60,7 @@ kore_accesslog_wait(void) struct kore_domain *dom; struct pollfd pfd[1]; struct kore_log_packet logpacket; + char addr[INET6_ADDRSTRLEN]; char *method, buf[4096], *tbuf; pfd[0].fd = accesslog_fd[0]; @@ -97,10 +99,14 @@ kore_accesslog_wait(void) else method = "POST"; + if (inet_ntop(logpacket.addrtype, &(logpacket.addr), + addr, sizeof(addr)) == NULL) + kore_strlcpy(addr, "unknown", sizeof(addr)); + time(&now); tbuf = kore_time_to_date(now); snprintf(buf, sizeof(buf), "[%s] %s %d %s %s (w#%d) (%dms) (%s)\n", - tbuf, inet_ntoa(logpacket.src), logpacket.status, method, + tbuf, addr, logpacket.status, method, logpacket.path, logpacket.worker_id, logpacket.time_req, logpacket.agent); slen = strlen(buf); @@ -124,11 +130,21 @@ kore_accesslog(struct http_request *req) ssize_t len; struct kore_log_packet logpacket; + logpacket.addrtype = req->owner->addrtype; + if (logpacket.addrtype == AF_INET) { + memcpy(logpacket.addr, + &(req->owner->addr.ipv4.sin_addr), + sizeof(req->owner->addr.ipv4.sin_addr)); + } else { + memcpy(logpacket.addr, + &(req->owner->addr.ipv6.sin6_addr), + sizeof(req->owner->addr.ipv6.sin6_addr)); + } + logpacket.status = req->status; logpacket.method = req->method; logpacket.worker_id = worker->id; logpacket.worker_cpu = worker->cpu; - logpacket.src = req->owner->sin.sin_addr; logpacket.time_req = req->end - req->start; kore_strlcpy(logpacket.host, req->host, sizeof(logpacket.host)); kore_strlcpy(logpacket.path, req->path, sizeof(logpacket.path)); diff --git a/src/bsd.c b/src/bsd.c @@ -39,24 +39,30 @@ kore_platform_worker_setcpu(struct kore_worker *kw) void kore_platform_event_init(void) { + struct listener *l; + if ((kfd = kqueue()) == -1) fatal("kqueue(): %s", errno_s); nchanges = 0; - event_count = worker_max_connections + 1; + event_count = worker_max_connections + nlisteners; events = kore_calloc(event_count, sizeof(struct kevent)); changelist = kore_calloc(event_count, sizeof(struct kevent)); - kore_platform_event_schedule(server.fd, - EVFILT_READ, EV_ADD | EV_DISABLE, &server); + LIST_FOREACH(l, &listeners, list) { + kore_platform_event_schedule(l->fd, + EVFILT_READ, EV_ADD | EV_DISABLE, l); + } } void kore_platform_event_wait(void) { + struct listener *l; struct connection *c; + u_int8_t type; struct timespec timeo; - int n, i, *fd; + int n, i; timeo.tv_sec = 0; timeo.tv_nsec = 100000000; @@ -72,11 +78,14 @@ kore_platform_event_wait(void) kore_debug("main(): %d sockets available", n); for (i = 0; i < n; i++) { - fd = (int *)events[i].udata; + if (events[i].udata == NULL) + fatal("events[%d].udata == NULL", i); + + type = *(u_int8_t *)events[i].udata; if (events[i].flags & EV_EOF || events[i].flags & EV_ERROR) { - if (*fd == server.fd) + if (type == KORE_TYPE_LISTENER) fatal("error on server socket"); c = (struct connection *)events[i].udata; @@ -84,9 +93,13 @@ kore_platform_event_wait(void) continue; } - if (*fd == server.fd) { - while (worker->accepted < worker->accept_treshold) { - kore_connection_accept(&server, &c); + if (type == KORE_TYPE_LISTENER) { + l = (struct listener *)events[i].udata; + + while ((worker->accepted < worker->accept_treshold) && + (worker_active_connections < + worker_max_connections)) { + kore_connection_accept(l, &c); if (c == NULL) break; @@ -133,15 +146,19 @@ kore_platform_event_schedule(int fd, int type, int flags, void *data) void kore_platform_enable_accept(void) { - kore_platform_event_schedule(server.fd, - EVFILT_READ, EV_ENABLE, &server); + struct listener *l; + + LIST_FOREACH(l, &listeners, list) + kore_platform_event_schedule(l->fd, EVFILT_READ, EV_ENABLE, l); } void kore_platform_disable_accept(void) { - kore_platform_event_schedule(server.fd, - EVFILT_READ, EV_DISABLE, NULL); + struct listener *l; + + LIST_FOREACH(l, &listeners, list) + kore_platform_event_schedule(l->fd, EVFILT_READ, EV_DISABLE, l); } void diff --git a/src/config.c b/src/config.c @@ -113,8 +113,8 @@ kore_parse_config(void) if (!kore_module_loaded()) fatal("no site module was loaded"); - if (server_ip == NULL || server_port == 0) - fatal("missing a correct bind directive in configuration"); + if (LIST_EMPTY(&listeners)) + fatal("no listeners defined"); if (chroot_path == NULL) fatal("missing a chroot path"); if (runas_user == NULL) @@ -126,23 +126,10 @@ kore_parse_config(void) static int configure_bind(char **argv) { - int err; - if (argv[1] == NULL || argv[2] == NULL) return (KORE_RESULT_ERROR); - if (server_ip != NULL || server_port != 0) { - kore_debug("duplicate bind directive seen"); - return (KORE_RESULT_ERROR); - } - server_ip = kore_strdup(argv[1]); - server_port = kore_strtonum(argv[2], 1, 65535, &err); - if (err != KORE_RESULT_OK) { - kore_debug("%s is an invalid port number", argv[2]); - return (KORE_RESULT_ERROR); - } - - return (KORE_RESULT_OK); + return (kore_server_bind(argv[1], argv[2])); } static int diff --git a/src/connection.c b/src/connection.c @@ -34,16 +34,26 @@ kore_connection_init(void) int kore_connection_accept(struct listener *l, struct connection **out) { - socklen_t len; struct connection *c; + struct sockaddr *sin; + socklen_t len; kore_debug("kore_connection_accept(%p)", l); *out = NULL; - len = sizeof(struct sockaddr_in); - c = kore_pool_get(&connection_pool); - if ((c->fd = accept(l->fd, (struct sockaddr *)&(c->sin), &len)) == -1) { + c->type = KORE_TYPE_CONNECTION; + + c->addrtype = l->addrtype; + if (c->addrtype == AF_INET) { + len = sizeof(struct sockaddr_in); + sin = (struct sockaddr *)&(c->addr.ipv4); + } else { + len = sizeof(struct sockaddr_in6); + sin = (struct sockaddr *)&(c->addr.ipv6); + } + + if ((c->fd = accept(l->fd, sin, &len)) == -1) { kore_pool_put(&connection_pool, c); kore_debug("accept(): %s", errno_s); return (KORE_RESULT_ERROR); diff --git a/src/kore.c b/src/kore.c @@ -16,20 +16,20 @@ #include <sys/socket.h> +#include <netdb.h> #include <signal.h> #include "kore.h" volatile sig_atomic_t sig_recv; -struct listener server; +struct listener_head listeners; +u_int8_t nlisteners; struct passwd *pw = NULL; pid_t kore_pid = -1; u_int16_t cpu_count = 1; int kore_debug = 0; -int server_port = 0; u_int8_t worker_count = 0; -char *server_ip = NULL; char *runas_user = NULL; char *chroot_path = NULL; char *kore_pidfile = KORE_PIDFILE_DEFAULT; @@ -39,7 +39,6 @@ static void usage(void); static void kore_server_start(void); static void kore_write_kore_pid(void); static void kore_server_sslstart(void); -static int kore_server_bind(struct listener *, const char *, int); static void usage(void) @@ -52,6 +51,7 @@ int main(int argc, char *argv[]) { int ch; + struct listener *l; if (getuid() != 0) fatal("kore must be started as root"); @@ -78,6 +78,9 @@ main(int argc, char *argv[]) kore_pid = getpid(); + nlisteners = 0; + LIST_INIT(&listeners); + kore_log_init(); kore_mem_init(); kore_domain_init(); @@ -96,7 +99,9 @@ main(int argc, char *argv[]) kore_log(LOG_NOTICE, "server shutting down"); kore_worker_shutdown(); unlink(kore_pidfile); - close(server.fd); + + LIST_FOREACH(l, &listeners, list) + close(l->fd); kore_log(LOG_NOTICE, "goodbye"); return (0); @@ -131,6 +136,72 @@ kore_ssl_sni_cb(SSL *ssl, int *ad, void *arg) return (SSL_TLSEXT_ERR_NOACK); } +int +kore_server_bind(const char *ip, const char *port) +{ + struct listener *l; + int on, r; + struct addrinfo *results; + + kore_debug("kore_server_bind(%s, %s)", ip, port); + + r = getaddrinfo(ip, port, NULL, &results); + if (r != 0) + fatal("getaddrinfo(%s): %s", ip, gai_strerror(r)); + + l = kore_malloc(sizeof(struct listener)); + l->type = KORE_TYPE_LISTENER; + l->addrtype = results->ai_family; + + if (l->addrtype != AF_INET && l->addrtype != AF_INET6) + fatal("getaddrinfo(): unknown address family %d", l->addrtype); + + if ((l->fd = socket(results->ai_family, SOCK_STREAM, 0)) == -1) { + kore_mem_free(l); + freeaddrinfo(results); + kore_debug("socket(): %s", errno_s); + return (KORE_RESULT_ERROR); + } + + if (!kore_connection_nonblock(l->fd)) { + kore_mem_free(l); + freeaddrinfo(results); + return (KORE_RESULT_ERROR); + } + + on = 1; + if (setsockopt(l->fd, SOL_SOCKET, + SO_REUSEADDR, (const char *)&on, sizeof(on)) == -1) { + close(l->fd); + kore_mem_free(l); + freeaddrinfo(results); + kore_debug("setsockopt(): %s", errno_s); + return (KORE_RESULT_ERROR); + } + + if (bind(l->fd, results->ai_addr, results->ai_addrlen) == -1) { + close(l->fd); + kore_mem_free(l); + freeaddrinfo(results); + kore_debug("bind(): %s", errno_s); + return (KORE_RESULT_ERROR); + } + + freeaddrinfo(results); + + if (listen(l->fd, 5000) == -1) { + close(l->fd); + kore_mem_free(l); + kore_debug("listen(): %s", errno_s); + return (KORE_RESULT_ERROR); + } + + nlisteners++; + LIST_INSERT_HEAD(&listeners, l, list); + + return (KORE_RESULT_OK); +} + void kore_signal(int sig) { @@ -149,10 +220,6 @@ kore_server_sslstart(void) static void kore_server_start(void) { - if (!kore_server_bind(&server, server_ip, server_port)) - fatal("cannot bind to %s:%d", server_ip, server_port); - - kore_mem_free(server_ip); kore_mem_free(runas_user); if (daemon(1, 1) == -1) @@ -183,49 +250,6 @@ kore_server_start(void) } } -static int -kore_server_bind(struct listener *l, const char *ip, int port) -{ - int on; - - kore_debug("kore_server_bind(%p, %s, %d)", l, ip, port); - - if ((l->fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - kore_debug("socket(): %s", errno_s); - return (KORE_RESULT_ERROR); - } - - if (!kore_connection_nonblock(l->fd)) - return (KORE_RESULT_ERROR); - - on = 1; - if (setsockopt(l->fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&on, - sizeof(on)) == -1) { - kore_debug("setsockopt(): %s", errno_s); - close(l->fd); - return (KORE_RESULT_ERROR); - } - - memset(&(l->sin), 0, sizeof(l->sin)); - l->sin.sin_family = AF_INET; - l->sin.sin_port = htons(port); - l->sin.sin_addr.s_addr = inet_addr(ip); - - if (bind(l->fd, (struct sockaddr *)&(l->sin), sizeof(l->sin)) == -1) { - close(l->fd); - kore_debug("bind(): %s", errno_s); - return (KORE_RESULT_ERROR); - } - - if (listen(l->fd, 5000) == -1) { - close(l->fd); - kore_debug("listen(): %s", errno_s); - return (KORE_RESULT_ERROR); - } - - return (KORE_RESULT_OK); -} - static void kore_write_kore_pid(void) { diff --git a/src/linux.c b/src/linux.c @@ -59,7 +59,7 @@ kore_platform_event_init(void) if ((efd = epoll_create(10000)) == -1) fatal("epoll_create(): %s", errno_s); - event_count = worker_max_connections + 1; + event_count = worker_max_connections + nlisteners; events = kore_calloc(event_count, sizeof(struct epoll_event)); } @@ -67,7 +67,9 @@ void kore_platform_event_wait(void) { struct connection *c; - int n, i, *fd; + struct listener *l; + u_int8_t type; + int n, i; n = epoll_wait(efd, events, event_count, 100); if (n == -1) { @@ -80,23 +82,28 @@ kore_platform_event_wait(void) kore_debug("main(): %d sockets available", n); for (i = 0; i < n; i++) { - fd = (int *)events[i].data.ptr; + if (events[i].data.ptr == NULL) + fatal("events[%d].data.ptr == NULL", i); + + type = *(u_int8_t *)events[i].data.ptr; if (events[i].events & EPOLLERR || events[i].events & EPOLLHUP) { - if (*fd == server.fd) - fatal("error on server socket"); + if (type == KORE_TYPE_LISTENER) + fatal("failed on listener socket"); c = (struct connection *)events[i].data.ptr; kore_connection_disconnect(c); continue; } - if (*fd == server.fd) { + if (type == KORE_TYPE_LISTENER) { + l = (struct listener *)events[i].data.ptr; + while ((worker->accepted < worker->accept_treshold) && (worker_active_connections < worker_max_connections)) { - kore_connection_accept(&server, &c); + kore_connection_accept(l, &c); if (c == NULL) break; @@ -142,14 +149,21 @@ kore_platform_event_schedule(int fd, int type, int flags, void *udata) void kore_platform_enable_accept(void) { - kore_platform_event_schedule(server.fd, EPOLLIN, 0, &server); + struct listener *l; + + LIST_FOREACH(l, &listeners, list) + kore_platform_event_schedule(l->fd, EPOLLIN, 0, l); } void kore_platform_disable_accept(void) { - if (epoll_ctl(efd, EPOLL_CTL_DEL, server.fd, NULL) == -1) - fatal("kore_platform_disable_accept: %s", errno_s); + struct listener *l; + + LIST_FOREACH(l, &listeners, list) { + if (epoll_ctl(efd, EPOLL_CTL_DEL, l->fd, NULL) == -1) + fatal("kore_platform_disable_accept: %s", errno_s); + } } void