kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 146a0189abf66eac40c6f8170004cadcdebbc85a
parent cf700b34f74aa853b71dba0add1566ae755265b5
Author: Joris Vink <joris@coders.se>
Date:   Sun, 29 Jun 2014 14:15:40 +0200

More work on the background task implementation.

Tasks are now assigned to available threads instead
of a global task list.

You can now pass messages between your page handler
and the created task using the kore_task_channel_*
functions.

Only one task per time can be assigned to a request
but I feel this is probably a bad design choice.

Preferably we'd want to be able to start tasks
regardless of being in a page handler or not,
this not only ads flexibility but seems like
a better choice overall as it opens a lot more
possibilities about how tasks can be used.

Diffstat:
contrib/tasks/kore_tasks.c | 185++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------
includes/http.h | 3+--
includes/kore_tasks.h | 18+++++++++---------
src/connection.c | 2+-
src/http.c | 7++++++-
src/linux.c | 26++++++++++++++------------
6 files changed, 172 insertions(+), 69 deletions(-)

diff --git a/contrib/tasks/kore_tasks.c b/contrib/tasks/kore_tasks.c @@ -27,64 +27,78 @@ #include "kore_tasks.h" static u_int8_t threads; -static pthread_mutex_t task_lock; -static pthread_cond_t task_broadcast; +static pthread_mutex_t task_thread_lock; -static TAILQ_HEAD(, kore_task) task_list; static TAILQ_HEAD(, kore_task_thread) task_threads; static void *task_thread(void *); -static void task_thread_spawn(void); +static void task_channel_read(int, void *, u_int32_t); +static void task_channel_write(int, void *, u_int32_t); +static void task_thread_spawn(struct kore_task_thread **); + +#define THREAD_FD_ASSIGN(t, f, i, o) \ + do { \ + if (pthread_self() == t) { \ + f = i; \ + kore_debug("fd for thread"); \ + } else { \ + f = o; \ + kore_debug("fd for worker"); \ + } \ + } while (0); void kore_task_init(void) { threads = 0; - TAILQ_INIT(&task_list); TAILQ_INIT(&task_threads); - - pthread_mutex_init(&task_lock, NULL); - pthread_cond_init(&task_broadcast, NULL); - - task_thread_spawn(); + pthread_mutex_init(&task_thread_lock, NULL); } void -kore_task_setup(struct http_request *req) +kore_task_create(struct http_request *req, void (*entry)(struct kore_task *)) { - int i; - - for (i = 0; i < HTTP_TASK_MAX; i++) - req->tasks[i] = NULL; -} + struct kore_task_thread *tt; -void -kore_task_create(struct http_request *req, int idx, - void (*entry)(struct kore_task *)) -{ - if (idx >= HTTP_TASK_MAX) - fatal("kore_task_create: idx > HTTP_TASK_MAX"); - if (req->tasks[idx] != NULL) + if (req->task != NULL) return; req->flags |= HTTP_REQUEST_SLEEPING; - req->tasks[idx] = kore_malloc(sizeof(struct kore_task)); - req->tasks[idx]->owner = req; - req->tasks[idx]->entry = entry; - req->tasks[idx]->type = KORE_TYPE_TASK; + req->task = kore_malloc(sizeof(struct kore_task)); + req->task->owner = req; + req->task->entry = entry; + req->task->type = KORE_TYPE_TASK; - if (socketpair(AF_UNIX, SOCK_STREAM, 0, req->tasks[idx]->fds) == -1) + if (socketpair(AF_UNIX, SOCK_STREAM, 0, req->task->fds) == -1) fatal("kore_task_create: socketpair() %s", errno_s); - kore_platform_schedule_read(req->tasks[idx]->fds[0], req->tasks[idx]); + kore_platform_schedule_read(req->task->fds[0], req->task); + + pthread_mutex_lock(&task_thread_lock); + if (TAILQ_EMPTY(&task_threads)) + task_thread_spawn(&tt); + else + tt = TAILQ_FIRST(&task_threads); + + pthread_mutex_unlock(&task_thread_lock); + pthread_mutex_lock(&(tt->lock)); + + req->task->thread = tt; + TAILQ_INSERT_TAIL(&(tt->tasks), req->task, list); - pthread_mutex_lock(&task_lock); - TAILQ_INSERT_TAIL(&task_list, req->tasks[idx], list); - pthread_mutex_unlock(&task_lock); + pthread_mutex_unlock(&(tt->lock)); + pthread_cond_signal(&(tt->cond)); +} - pthread_cond_broadcast(&task_broadcast); +void +kore_task_destroy(struct kore_task *t) +{ + if (t->fds[1] != -1) + close(t->fds[1]); + close(t->fds[0]); + kore_mem_free(t); } void @@ -93,12 +107,36 @@ kore_task_finish(struct kore_task *t) kore_debug("kore_task_finished: %p", t); close(t->fds[1]); + t->fds[1] = -1; } void kore_task_channel_write(struct kore_task *t, void *data, u_int32_t len) { + int fd; + kore_debug("kore_task_channel_write: %p <- %p (%ld)", t, data, len); + + THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]); + task_channel_write(fd, &len, sizeof(len)); + task_channel_write(fd, data, len); +} + +u_int32_t +kore_task_channel_read(struct kore_task *t, void *out, u_int32_t len) +{ + int fd; + u_int32_t dlen; + + kore_debug("kore_task_channel_read: %p -> %p (%ld)", t, out, len); + + THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]); + task_channel_read(fd, &dlen, sizeof(dlen)); + if (dlen > len) + fatal("task_channel_read: buffer too small, wanted %d", dlen); + task_channel_read(fd, out, dlen); + + return (dlen); } void @@ -109,23 +147,68 @@ kore_task_handle(struct kore_task *t, int finished) kore_debug("kore_task_handle: %p, %d", t, finished); if (finished) { - close(t->fds[0]); + /* XXX - How do we deal with req being gone? */ req->flags &= ~HTTP_REQUEST_SLEEPING; - kore_mem_free(t); } } static void -task_thread_spawn(void) +task_channel_write(int fd, void *data, u_int32_t len) +{ + ssize_t r; + u_int8_t *d; + u_int32_t offset; + + d = data; + offset = 0; + while (offset != len) { + r = write(fd, d + offset, len - offset); + if (r == -1 && errno == EINTR) + continue; + if (r == -1) + fatal("task_channel_write: %s", errno_s); + offset += r; + } +} + +static void +task_channel_read(int fd, void *out, u_int32_t len) +{ + ssize_t r; + u_int8_t *d; + u_int32_t offset; + + d = out; + offset = 0; + while (offset != len) { + r = read(fd, d + offset, len - offset); + if (r == -1 && errno == EINTR) + continue; + if (r == -1) + fatal("task_channel_read: %s", errno_s); + if (r == 0) + fatal("task_channel_read: unexpected eof"); + + offset += r; + } +} + +static void +task_thread_spawn(struct kore_task_thread **out) { struct kore_task_thread *tt; tt = kore_malloc(sizeof(*tt)); tt->idx = threads++; - TAILQ_INSERT_TAIL(&task_threads, tt, list); + + TAILQ_INIT(&(tt->tasks)); + pthread_cond_init(&(tt->cond), NULL); + pthread_mutex_init(&(tt->lock), NULL); if (pthread_create(&(tt->tid), NULL, task_thread, tt) != 0) fatal("pthread_create: %s", errno_s); + + *out = tt; } static void * @@ -136,20 +219,34 @@ task_thread(void *arg) kore_debug("task_thread: #%d starting", tt->idx); + pthread_mutex_lock(&(tt->lock)); + + pthread_mutex_lock(&task_thread_lock); + TAILQ_INSERT_TAIL(&task_threads, tt, list); + pthread_mutex_unlock(&task_thread_lock); + for (;;) { - pthread_mutex_lock(&task_lock); - if (TAILQ_EMPTY(&task_list)) - pthread_cond_wait(&task_broadcast, &task_lock); + if (TAILQ_EMPTY(&(tt->tasks))) + pthread_cond_wait(&(tt->cond), &(tt->lock)); kore_debug("task_thread#%d: woke up", tt->idx); - t = TAILQ_FIRST(&task_list); - TAILQ_REMOVE(&task_list, t, list); - pthread_mutex_unlock(&task_lock); + t = TAILQ_FIRST(&(tt->tasks)); + TAILQ_REMOVE(&(tt->tasks), t, list); + pthread_mutex_unlock(&(tt->lock)); + + pthread_mutex_lock(&task_thread_lock); + TAILQ_REMOVE(&task_threads, tt, list); + pthread_mutex_unlock(&task_thread_lock); kore_debug("task_thread#%d: executing %p", tt->idx, t); - t->thread = tt; t->entry(t); + + pthread_mutex_lock(&task_thread_lock); + TAILQ_INSERT_HEAD(&task_threads, tt, list); + pthread_mutex_unlock(&task_thread_lock); + + pthread_mutex_lock(&(tt->lock)); } pthread_exit(NULL); diff --git a/includes/http.h b/includes/http.h @@ -121,7 +121,6 @@ struct http_file { #define HTTP_REQUEST_SLEEPING 0x04 #define HTTP_PGSQL_MAX 20 -#define HTTP_TASK_MAX 10 struct kore_pgsql; struct kore_task; @@ -143,8 +142,8 @@ struct http_request { u_int8_t *multipart_body; struct kore_module_handle *hdlr; + struct kore_task *task; struct kore_pgsql *pgsql[HTTP_PGSQL_MAX]; - struct kore_task *tasks[HTTP_TASK_MAX]; TAILQ_HEAD(, http_header) req_headers; TAILQ_HEAD(, http_header) resp_headers; diff --git a/includes/kore_tasks.h b/includes/kore_tasks.h @@ -19,18 +19,20 @@ struct kore_task { u_int8_t type; - int fds[2]; void *owner; - void *thread; void (*entry)(struct kore_task *); - TAILQ_ENTRY(kore_task) list; + struct kore_task_thread *thread; + TAILQ_ENTRY(kore_task) list; }; struct kore_task_thread { - u_int8_t idx; - pthread_t tid; + u_int8_t idx; + pthread_t tid; + pthread_mutex_t lock; + pthread_cond_t cond; + TAILQ_HEAD(, kore_task) tasks; TAILQ_ENTRY(kore_task_thread) list; }; @@ -38,13 +40,11 @@ struct kore_task_thread { void kore_task_init(void); void kore_task_finish(struct kore_task *); void kore_task_destroy(struct kore_task *); -void kore_task_setup(struct http_request *); void kore_task_handle(struct kore_task *, int); -void kore_task_create(struct http_request *, int, +void kore_task_create(struct http_request *, void (*entry)(struct kore_task *)); +u_int32_t kore_task_channel_read(struct kore_task *, void *, u_int32_t); void kore_task_channel_write(struct kore_task *, void *, u_int32_t); -void kore_task_channel_read(struct kore_task *, - u_int8_t **, u_int32_t *); #endif diff --git a/src/connection.c b/src/connection.c @@ -111,7 +111,7 @@ kore_connection_handle(struct connection *c) const u_char *data; char cn[X509_CN_LENGTH]; - kore_debug("kore_connection_handle(%p)", c); + kore_debug("kore_connection_handle(%p) -> %d", c, c->state); kore_connection_stop_idletimer(c); diff --git a/src/http.c b/src/http.c @@ -142,7 +142,7 @@ http_request_new(struct connection *c, struct spdy_stream *s, char *host, } #if defined(KORE_USE_TASKS) - kore_task_setup(req); + req->task = NULL; #endif http_request_count++; @@ -319,6 +319,11 @@ http_request_free(struct http_request *req) kore_pgsql_cleanup(req); #endif +#if defined(KORE_USE_TASKS) + if (req->task != NULL) + kore_task_destroy(req->task); +#endif + if (req->method == HTTP_METHOD_POST && req->post_data != NULL) kore_buf_free(req->post_data); if (req->method == HTTP_METHOD_POST && req->multipart_body != NULL) diff --git a/src/linux.c b/src/linux.c @@ -97,24 +97,26 @@ kore_platform_event_wait(void) if (events[i].events & EPOLLERR || events[i].events & EPOLLHUP) { - if (type == KORE_TYPE_LISTENER) + switch (type) { + case KORE_TYPE_LISTENER: fatal("failed on listener socket"); - + /* NOTREACHED */ #if defined(KORE_USE_PGSQL) - if (type == KORE_TYPE_PGSQL_CONN) { + case KORE_TYPE_PGSQL_CONN: kore_pgsql_handle(events[i].data.ptr, 1); - continue; - } + break; #endif - #if defined(KORE_USE_TASKS) - if (type == KORE_TYPE_TASK) { + case KORE_TYPE_TASK: kore_task_handle(events[i].data.ptr, 1); - continue; - } + break; #endif - c = (struct connection *)events[i].data.ptr; - kore_connection_disconnect(c); + default: + c = (struct connection *)events[i].data.ptr; + kore_connection_disconnect(c); + break; + } + continue; } @@ -151,7 +153,7 @@ kore_platform_event_wait(void) kore_pgsql_handle(events[i].data.ptr, 0); break; #endif -#if defined(KORE_USE_TASK) +#if defined(KORE_USE_TASKS) case KORE_TYPE_TASK: kore_task_handle(events[i].data.ptr, 0); break;