commit 9243f409cc5384fde7042dfc86d874b57848c274
parent 8478d8df548310e1883f7da7ceb59830eb9698dd
Author: Joris Vink <joris@coders.se>
Date: Thu, 30 May 2013 19:36:42 +0200
move to a worker based threading approach where we delegate http requests to workers in a round robin basis (later this should be swapped to find the laziest worker and assign the request to that instead).
Diffstat:
7 files changed, 218 insertions(+), 65 deletions(-)
diff --git a/Makefile b/Makefile
@@ -11,7 +11,7 @@ CFLAGS+=-I/usr/local/ssl/include
CFLAGS+=-Wall -Wstrict-prototypes -Wmissing-prototypes
CFLAGS+=-Wmissing-declarations -Wshadow -Wpointer-arith -Wcast-qual
CFLAGS+=-D_GNU_SOURCE=1 -Wsign-compare -Iincludes -g
-LDFLAGS=-rdynamic -Llibs -lssl -lcrypto -ldl -lz
+LDFLAGS=-rdynamic -Llibs -lssl -lcrypto -lpthread -ldl -lz
light: $(S_OBJS)
$(CC) $(CFLAGS) $(S_OBJS) $(LDFLAGS) -o $(BIN)
diff --git a/example.conf b/example.conf
@@ -4,6 +4,7 @@
bind 10.211.55.3 443
chroot /tmp
runas joris
+workers 10
# Load our site module now (containing all the goodies).
load example/example.module
diff --git a/includes/http.h b/includes/http.h
@@ -39,6 +39,7 @@ struct http_arg {
#define HTTP_METHOD_POST 1
#define HTTP_REQUEST_COMPLETE 0x01
+#define HTTP_REQUEST_DELETE 0x02
struct http_request {
u_int8_t method;
@@ -55,8 +56,6 @@ struct http_request {
TAILQ_ENTRY(http_request) list;
};
-void http_init(void);
-void http_process(void);
time_t http_date_to_time(char *);
void http_request_free(struct http_request *);
int http_response(struct http_request *, int,
@@ -66,6 +65,7 @@ void http_response_header_add(struct http_request *, char *, char *);
int http_request_new(struct connection *, struct spdy_stream *,
char *, char *, char *, struct http_request **);
+int http_generic_404(struct http_request *);
int http_header_recv(struct netbuf *);
char *http_post_data_text(struct http_request *);
int http_populate_arguments(struct http_request *);
diff --git a/includes/kore.h b/includes/kore.h
@@ -22,8 +22,13 @@
#define errno_s strerror(errno)
#define ssl_errno_s ERR_error_string(ERR_get_error(), NULL)
+
+#if defined(KORE_DEBUG)
#define kore_log(fmt, ...) \
kore_log_internal(__FILE__, __LINE__, fmt, ##__VA_ARGS__)
+#else
+#define kore_log(fmt, ...)
+#endif
#define NETBUF_RECV 0
#define NETBUF_SEND 1
@@ -71,6 +76,7 @@ struct connection {
void *owner;
SSL *ssl;
int flags;
+ pthread_mutex_t lock;
u_int8_t inflate_started;
z_stream z_inflate;
@@ -80,7 +86,7 @@ struct connection {
TAILQ_HEAD(, netbuf) send_queue;
TAILQ_HEAD(, netbuf) recv_queue;
- u_int32_t client_stream_id;
+ u_int32_t client_stream_id;
TAILQ_HEAD(, spdy_stream) spdy_streams;
TAILQ_ENTRY(connection) list;
@@ -99,6 +105,17 @@ struct kore_module_handle {
TAILQ_ENTRY(kore_module_handle) list;
};
+struct kore_worker {
+ u_int8_t id;
+ pthread_t pctx;
+ pthread_mutex_t lock;
+ pthread_cond_t cond;
+ u_int32_t load;
+
+ TAILQ_HEAD(, http_request) requests;
+ TAILQ_ENTRY(kore_worker) list;
+};
+
#define KORE_BUF_INITIAL 128
#define KORE_BUF_INCREMENT KORE_BUF_INITIAL
@@ -117,6 +134,7 @@ extern int server_port;
extern char *server_ip;
extern char *chroot_path;
extern char *runas_user;
+extern u_int8_t worker_count;
void *kore_malloc(size_t);
void *kore_calloc(size_t, size_t);
@@ -137,6 +155,8 @@ int kore_module_domain_new(char *);
void *kore_module_handler_find(char *, char *);
int kore_module_handler_new(char *, char *, char *, int);
+void kore_worker_delegate(struct http_request *);
+
void fatal(const char *, ...);
void kore_log_internal(char *, int, const char *, ...);
diff --git a/src/config.c b/src/config.c
@@ -44,6 +44,7 @@ static int configure_handler(char **);
static int configure_domain(char **);
static int configure_chroot(char **);
static int configure_runas(char **);
+static int configure_workers(char **);
static struct {
const char *name;
@@ -56,6 +57,7 @@ static struct {
{ "domain", configure_domain },
{ "chroot", configure_chroot },
{ "runas", configure_runas },
+ { "workers", configure_workers },
{ NULL, NULL },
};
@@ -209,3 +211,25 @@ configure_runas(char **argv)
runas_user = kore_strdup(argv[1]);
return (KORE_RESULT_OK);
}
+
+static int
+configure_workers(char **argv)
+{
+ int err;
+
+ if (worker_count != 0) {
+ kore_log("duplicate worker directive specified");
+ return (KORE_RESULT_ERROR);
+ }
+
+ if (argv[1] == NULL)
+ return (KORE_RESULT_ERROR);
+
+ worker_count = kore_strtonum(argv[1], 1, 255, &err);
+ if (err != KORE_RESULT_OK) {
+ kore_log("%s is not a correct worker number", argv[1]);
+ return (KORE_RESULT_ERROR);
+ }
+
+ return (KORE_RESULT_OK);
+}
diff --git a/src/http.c b/src/http.c
@@ -39,15 +39,7 @@
#include "kore.h"
#include "http.h"
-TAILQ_HEAD(, http_request) http_requests;
-
-static int http_generic_404(struct http_request *);
static int http_post_data_recv(struct netbuf *);
-void
-http_init(void)
-{
- TAILQ_INIT(&http_requests);
-}
int
http_request_new(struct connection *c, struct spdy_stream *s, char *host,
@@ -80,11 +72,11 @@ http_request_new(struct connection *c, struct spdy_stream *s, char *host,
return (KORE_RESULT_ERROR);
}
- TAILQ_INSERT_TAIL(&http_requests, req, list);
-
if (out != NULL)
*out = req;
+ kore_worker_delegate(req);
+
return (KORE_RESULT_OK);
}
@@ -225,40 +217,6 @@ http_request_header_get(struct http_request *req, char *header, char **out)
return (r);
}
-void
-http_process(void)
-{
- struct http_request *req, *next;
- int r, (*hdlr)(struct http_request *);
-
- if (TAILQ_EMPTY(&http_requests))
- return;
-
- kore_log("http_process()");
- for (req = TAILQ_FIRST(&http_requests); req != NULL; req = next) {
- next = TAILQ_NEXT(req, list);
- if (!(req->flags & HTTP_REQUEST_COMPLETE))
- continue;
-
- hdlr = kore_module_handler_find(req->host, req->path);
- if (hdlr == NULL)
- r = http_generic_404(req);
- else
- r = hdlr(req);
-
- if (r != KORE_RESULT_ERROR) {
- net_send_flush(req->owner);
- if (req->owner->proto == CONN_PROTO_HTTP)
- kore_server_disconnect(req->owner);
- } else {
- kore_server_disconnect(req->owner);
- }
-
- TAILQ_REMOVE(&http_requests, req, list);
- http_request_free(req);
- }
-}
-
int
http_header_recv(struct netbuf *nb)
{
@@ -363,8 +321,7 @@ http_header_recv(struct netbuf *nb)
if (req->method == HTTP_METHOD_POST) {
if (!http_request_header_get(req, "content-length", &p)) {
kore_log("POST but no content-length");
- TAILQ_REMOVE(&http_requests, req, list);
- http_request_free(req);
+ req->flags |= HTTP_REQUEST_DELETE;
return (KORE_RESULT_ERROR);
}
@@ -372,8 +329,7 @@ http_header_recv(struct netbuf *nb)
if (v == KORE_RESULT_ERROR) {
free(p);
kore_log("content-length invalid: %s", p);
- TAILQ_REMOVE(&http_requests, req, list);
- http_request_free(req);
+ req->flags |= HTTP_REQUEST_DELETE;
return (KORE_RESULT_ERROR);
}
@@ -462,7 +418,7 @@ http_post_data_text(struct http_request *req)
return (text);
}
-static int
+int
http_generic_404(struct http_request *req)
{
kore_log("http_generic_404(%s, %d, %s)",
diff --git a/src/kore.c b/src/kore.c
@@ -38,6 +38,7 @@
#include <time.h>
#include <regex.h>
#include <zlib.h>
+#include <pthread.h>
#include <unistd.h>
#include "spdy.h"
@@ -51,13 +52,19 @@ static SSL_CTX *ssl_ctx = NULL;
volatile sig_atomic_t sig_recv;
static TAILQ_HEAD(, connection) disconnected;
+static TAILQ_HEAD(, kore_worker) kore_workers;
+static struct kore_worker *last_worker = NULL;
-int server_port = 0;
-char *server_ip = NULL;
-char *chroot_path = NULL;
-char *runas_user = NULL;
+int server_port = 0;
+char *server_ip = NULL;
+char *chroot_path = NULL;
+char *runas_user = NULL;
+u_int8_t worker_count = 0;
+pthread_mutex_t disconnect_lock;
static void kore_signal(int);
+static void kore_worker_init(void);
+static void *kore_worker_entry(void *);
static int kore_socket_nonblock(int);
static int kore_server_sslstart(void);
static void kore_event(int, int, void *);
@@ -108,8 +115,10 @@ main(int argc, char *argv[])
pw->pw_gid) || setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
fatal("unable to drop privileges");
- http_init();
TAILQ_INIT(&disconnected);
+ pthread_mutex_init(&disconnect_lock, NULL);
+
+ kore_worker_init();
sig_recv = 0;
signal(SIGHUP, kore_signal);
@@ -141,7 +150,13 @@ main(int argc, char *argv[])
fatal("error on server socket");
c = (struct connection *)events[i].data.ptr;
- kore_server_disconnect(c);
+ if (pthread_mutex_trylock(&(c->lock))) {
+ // Reschedule the client.
+ kore_log("resched on error");
+ } else {
+ kore_server_disconnect(c);
+ pthread_mutex_unlock(&(c->lock));
+ }
continue;
}
@@ -149,19 +164,28 @@ main(int argc, char *argv[])
kore_server_accept(&server);
} else {
c = (struct connection *)events[i].data.ptr;
- if (!kore_connection_handle(c,
- events[i].events))
- kore_server_disconnect(c);
+ if (pthread_mutex_trylock(&(c->lock))) {
+ // Reschedule the client.
+ kore_log("resched on normal");
+ } else {
+ if (!kore_connection_handle(c,
+ events[i].events))
+ kore_server_disconnect(c);
+ pthread_mutex_unlock(&(c->lock));
+ }
}
}
- http_process();
+ if (pthread_mutex_trylock(&disconnect_lock))
+ continue;
for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) {
cnext = TAILQ_NEXT(c, list);
TAILQ_REMOVE(&disconnected, c, list);
kore_server_final_disconnect(c);
}
+
+ pthread_mutex_unlock(&disconnect_lock);
}
close(server.fd);
@@ -174,10 +198,36 @@ kore_server_disconnect(struct connection *c)
if (c->state != CONN_STATE_DISCONNECTING) {
kore_log("preparing %p for disconnection", c);
c->state = CONN_STATE_DISCONNECTING;
+
+ pthread_mutex_lock(&disconnect_lock);
TAILQ_INSERT_TAIL(&disconnected, c, list);
+ pthread_mutex_unlock(&disconnect_lock);
}
}
+void
+kore_worker_delegate(struct http_request *req)
+{
+ struct kore_worker *kw;
+
+ if (last_worker != NULL) {
+ kw = TAILQ_NEXT(last_worker, list);
+ if (kw == NULL)
+ kw = TAILQ_FIRST(&kore_workers);
+ } else {
+ kw = TAILQ_FIRST(&kore_workers);
+ }
+
+ last_worker = kw;
+
+ pthread_mutex_lock(&(kw->lock));
+ kore_log("assigning request %p to worker %d:%d", req, kw->id, kw->load);
+ kw->load++;
+ TAILQ_INSERT_TAIL(&(kw->requests), req, list);
+ pthread_mutex_unlock(&(kw->lock));
+ pthread_cond_signal(&(kw->cond));
+}
+
static int
kore_server_sslstart(void)
{
@@ -275,6 +325,7 @@ kore_server_accept(struct listener *l)
c->client_stream_id = 0;
c->proto = CONN_PROTO_UNKNOWN;
c->state = CONN_STATE_SSL_SHAKE;
+ pthread_mutex_init(&(c->lock), NULL);
TAILQ_INIT(&(c->send_queue));
TAILQ_INIT(&(c->recv_queue));
@@ -290,12 +341,23 @@ kore_server_final_disconnect(struct connection *c)
struct netbuf *nb, *next;
struct spdy_stream *s, *snext;
+ if (pthread_mutex_trylock(&(c->lock))) {
+ kore_log("delaying disconnection of %p", c);
+ return;
+ }
+
kore_log("kore_server_final_disconnect(%p)", c);
- close(c->fd);
- if (c->ssl != NULL)
+ if (c->ssl != NULL) {
+ if (SSL_shutdown(c->ssl) == 0) {
+ pthread_mutex_unlock(&(c->lock));
+ return;
+ }
+
SSL_free(c->ssl);
+ }
+ close(c->fd);
if (c->inflate_started)
inflateEnd(&(c->z_inflate));
if (c->deflate_started)
@@ -328,6 +390,7 @@ kore_server_final_disconnect(struct connection *c)
free(s);
}
+ pthread_mutex_destroy(&(c->lock));
free(c);
}
@@ -404,6 +467,8 @@ kore_connection_handle(struct connection *c, int flags)
return (KORE_RESULT_ERROR);
}
break;
+ case CONN_STATE_DISCONNECTING:
+ break;
default:
kore_log("unknown state on %d (%d)", c->fd, c->state);
break;
@@ -412,6 +477,93 @@ kore_connection_handle(struct connection *c, int flags)
return (KORE_RESULT_OK);
}
+static void
+kore_worker_init(void)
+{
+ u_int8_t i;
+ struct kore_worker *kw;
+
+ kore_log("kore_worker_init(): starting %d workers", worker_count);
+
+ TAILQ_INIT(&kore_workers);
+ for (i = 0; i < worker_count; i++) {
+ kw = (struct kore_worker *)kore_malloc(sizeof(*kw));
+ kw->id = i;
+ kw->load = 0;
+ pthread_cond_init(&(kw->cond), NULL);
+ pthread_mutex_init(&(kw->lock), NULL);
+ TAILQ_INIT(&(kw->requests));
+ TAILQ_INSERT_TAIL(&kore_workers, kw, list);
+
+ if (pthread_create(&(kw->pctx), NULL, kore_worker_entry, kw))
+ kore_log("failed to spawn worker: %s", errno_s);
+ }
+
+ if (i == 0)
+ fatal("No workers spawned, check logs for errors.");
+}
+
+static void *
+kore_worker_entry(void *arg)
+{
+ u_int8_t retry;
+ struct http_request *req, *next;
+ struct kore_worker *kw = (struct kore_worker *)arg;
+ int r, (*hdlr)(struct http_request *);
+
+ pthread_mutex_lock(&(kw->lock));
+ for (;;) {
+ if (retry == 0) {
+ pthread_cond_wait(&(kw->cond), &(kw->lock));
+ kore_log("worker %d woke up with %d reqs",
+ kw->id, kw->load);
+ }
+
+ retry = 0;
+ for (req = TAILQ_FIRST(&(kw->requests)); req != NULL;
+ req = next) {
+ next = TAILQ_NEXT(req, list);
+ if (req->flags & HTTP_REQUEST_DELETE) {
+ TAILQ_REMOVE(&(kw->requests), req, list);
+ http_request_free(req);
+ continue;
+ }
+
+ if (!(req->flags & HTTP_REQUEST_COMPLETE))
+ continue;
+
+ if (pthread_mutex_trylock(&(req->owner->lock))) {
+ retry = 1;
+ continue;
+ }
+
+ hdlr = kore_module_handler_find(req->host, req->path);
+ if (hdlr == NULL)
+ r = http_generic_404(req);
+ else
+ r = hdlr(req);
+
+ if (r != KORE_RESULT_ERROR) {
+ r = net_send_flush(req->owner);
+ if (r == KORE_RESULT_ERROR ||
+ req->owner->proto == CONN_PROTO_HTTP)
+ kore_server_disconnect(req->owner);
+ } else {
+ kore_server_disconnect(req->owner);
+ }
+
+ pthread_mutex_unlock(&(req->owner->lock));
+
+ TAILQ_REMOVE(&(kw->requests), req, list);
+ http_request_free(req);
+
+ kw->load--;
+ }
+ }
+
+ pthread_exit(NULL);
+}
+
static int
kore_socket_nonblock(int fd)
{