kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit f9b3cfcee406be84b4bab53d5e310625ab545adc
parent 9ad263e2870964f8ae54fd0511885877b1faf466
Author: Joris Vink <joris@coders.se>
Date:   Thu, 30 May 2013 20:38:25 +0200

reschedule events if we cannot lock the connection at the time being.

Diffstat:
src/kore.c | 60++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
1 file changed, 56 insertions(+), 4 deletions(-)

diff --git a/src/kore.c b/src/kore.c @@ -47,12 +47,23 @@ #define EPOLL_EVENTS 500 +#define RESCHEDULE_AT_ERROR 1 +#define RESCHEDULE_AT_NORMAL 2 + +struct reschedule { + struct connection *c; + int reason; + int events; + TAILQ_ENTRY(reschedule) list; +}; + static int efd = -1; static SSL_CTX *ssl_ctx = NULL; volatile sig_atomic_t sig_recv; static TAILQ_HEAD(, connection) disconnected; static TAILQ_HEAD(, kore_worker) kore_workers; +static TAILQ_HEAD(, reschedule) reschedule_list; static struct kore_worker *last_worker = NULL; int server_port = 0; @@ -69,6 +80,7 @@ static int kore_socket_nonblock(int); static int kore_server_sslstart(void); static void kore_event(int, int, void *); static int kore_server_accept(struct listener *); +static void kore_reschedule(struct connection *, int, int); static int kore_connection_handle(struct connection *, int); static void kore_server_final_disconnect(struct connection *); static int kore_server_bind(struct listener *, const char *, int); @@ -82,6 +94,7 @@ main(int argc, char *argv[]) struct epoll_event *events; int n, i, *fd; struct connection *c, *cnext; + struct reschedule *sched, *snext; if (argc != 2) fatal("Usage: kore [config file]"); @@ -116,6 +129,7 @@ main(int argc, char *argv[]) fatal("unable to drop privileges"); TAILQ_INIT(&disconnected); + TAILQ_INIT(&reschedule_list); pthread_mutex_init(&disconnect_lock, NULL); kore_worker_init(); @@ -151,8 +165,8 @@ main(int argc, char *argv[]) c = (struct connection *)events[i].data.ptr; if (pthread_mutex_trylock(&(c->lock))) { - // Reschedule the client. - kore_log("resched on error"); + kore_reschedule(c, + RESCHEDULE_AT_ERROR, 0); } else { kore_server_disconnect(c); pthread_mutex_unlock(&(c->lock)); @@ -165,8 +179,9 @@ main(int argc, char *argv[]) } else { c = (struct connection *)events[i].data.ptr; if (pthread_mutex_trylock(&(c->lock))) { - // Reschedule the client. - kore_log("resched on normal"); + kore_reschedule(c, + RESCHEDULE_AT_NORMAL, + events[i].events); } else { if (!kore_connection_handle(c, events[i].events)) @@ -176,6 +191,27 @@ main(int argc, char *argv[]) } } + for (sched = TAILQ_FIRST(&reschedule_list); sched != NULL; + sched = snext) { + snext = TAILQ_NEXT(sched, list); + if (pthread_mutex_trylock(&(sched->c->lock))) + continue; + + if (sched->reason == RESCHEDULE_AT_ERROR) { + kore_server_disconnect(sched->c); + } else { + if (!kore_connection_handle(sched->c, + sched->events)) + kore_server_disconnect(sched->c); + } + + pthread_mutex_unlock(&(sched->c->lock)); + + kore_log("handled rescheduled %p", sched->c); + TAILQ_REMOVE(&reschedule_list, sched, list); + free(sched); + } + if (pthread_mutex_trylock(&disconnect_lock)) continue; @@ -356,6 +392,7 @@ kore_server_final_disconnect(struct connection *c) SSL_free(c->ssl); } + kore_log("kore_server_final_disconnect(%p) succeeded", c); TAILQ_REMOVE(&disconnected, c, list); close(c->fd); if (c->inflate_started) @@ -617,6 +654,21 @@ kore_event(int fd, int flags, void *udata) } static void +kore_reschedule(struct connection *c, int reason, int events) +{ + struct reschedule *sched; + + kore_log("kore_reschedule(%p, %d, %d)", c, reason, events); + + sched = (struct reschedule *)kore_malloc(sizeof(*sched)); + sched->c = c; + sched->reason = reason; + sched->events = events; + + TAILQ_INSERT_TAIL(&reschedule_list, sched, list); +} + +static void kore_signal(int sig) { sig_recv = sig;