kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit b9bd2e9a149dc1aa715fc6945c9fbfee492c67ed
parent 146a0189abf66eac40c6f8170004cadcdebbc85a
Author: Joris Vink <joris@coders.se>
Date:   Sun, 29 Jun 2014 20:20:13 +0200

Move tasks away from http_requests.

Instead if a task is used from inside a request
you MUST call kore_task_bind_request() on it.

This way we can move forward for tasks that
don't belong to page handlers.

Also, some bug fixes for removing http_requests
that are indeed linked to a currently running task.

Diffstat:
contrib/tasks/kore_tasks.c | 71++++++++++++++++++++++++++++++++++++++++++++++++-----------------------
includes/kore_tasks.h | 22+++++++++++++++++-----
src/connection.c | 7+++++--
src/http.c | 12+++++-------
4 files changed, 75 insertions(+), 37 deletions(-)

diff --git a/contrib/tasks/kore_tasks.c b/contrib/tasks/kore_tasks.c @@ -40,10 +40,8 @@ static void task_thread_spawn(struct kore_task_thread **); do { \ if (pthread_self() == t) { \ f = i; \ - kore_debug("fd for thread"); \ } else { \ f = o; \ - kore_debug("fd for worker"); \ } \ } while (0); @@ -57,24 +55,20 @@ kore_task_init(void) } void -kore_task_create(struct http_request *req, void (*entry)(struct kore_task *)) +kore_task_create(struct kore_task **out, void (*entry)(struct kore_task *)) { + struct kore_task *t; struct kore_task_thread *tt; - if (req->task != NULL) - return; - - req->flags |= HTTP_REQUEST_SLEEPING; + t = kore_malloc(sizeof(struct kore_task)); + t->entry = entry; + t->type = KORE_TYPE_TASK; + t->state = KORE_TASK_STATE_CREATED; - req->task = kore_malloc(sizeof(struct kore_task)); - req->task->owner = req; - req->task->entry = entry; - req->task->type = KORE_TYPE_TASK; - - if (socketpair(AF_UNIX, SOCK_STREAM, 0, req->task->fds) == -1) + if (socketpair(AF_UNIX, SOCK_STREAM, 0,t->fds) == -1) fatal("kore_task_create: socketpair() %s", errno_s); - kore_platform_schedule_read(req->task->fds[0], req->task); + kore_platform_schedule_read(t->fds[0], t); pthread_mutex_lock(&task_thread_lock); if (TAILQ_EMPTY(&task_threads)) @@ -85,29 +79,55 @@ kore_task_create(struct http_request *req, void (*entry)(struct kore_task *)) pthread_mutex_unlock(&task_thread_lock); pthread_mutex_lock(&(tt->lock)); - req->task->thread = tt; - TAILQ_INSERT_TAIL(&(tt->tasks), req->task, list); + t->thread = tt; + TAILQ_INSERT_TAIL(&(tt->tasks), t, list); pthread_mutex_unlock(&(tt->lock)); pthread_cond_signal(&(tt->cond)); + + if (out != NULL) + *out = t; +} + +void +kore_task_bind_request(struct kore_task *t, struct http_request *req) +{ + kore_debug("kore_task_bind_request: %p bound to %p", req, t); + + t->req = req; + req->task = t; + req->flags |= HTTP_REQUEST_SLEEPING; } void kore_task_destroy(struct kore_task *t) { - if (t->fds[1] != -1) - close(t->fds[1]); + kore_debug("kore_task_destroy: %p", t); + + if (t->req != NULL) + t->req->task = NULL; + close(t->fds[0]); + close(t->fds[1]); /* This might already be closed. */ + kore_mem_free(t); } +int +kore_task_finished(struct kore_task *t) +{ + if (t->state == KORE_TASK_STATE_FINISHED) + return (1); + + return (0); +} + void kore_task_finish(struct kore_task *t) { kore_debug("kore_task_finished: %p", t); close(t->fds[1]); - t->fds[1] = -1; } void @@ -142,13 +162,15 @@ kore_task_channel_read(struct kore_task *t, void *out, u_int32_t len) void kore_task_handle(struct kore_task *t, int finished) { - struct http_request *req = t->owner; - kore_debug("kore_task_handle: %p, %d", t, finished); if (finished) { - /* XXX - How do we deal with req being gone? */ - req->flags &= ~HTTP_REQUEST_SLEEPING; + t->state = KORE_TASK_STATE_FINISHED; + if (t->req != NULL) { + t->req->flags &= ~HTTP_REQUEST_SLEEPING; + if (t->req->flags & HTTP_REQUEST_DELETE) + kore_task_destroy(t); + } } } @@ -240,7 +262,10 @@ task_thread(void *arg) pthread_mutex_unlock(&task_thread_lock); kore_debug("task_thread#%d: executing %p", tt->idx, t); + + t->state = KORE_TASK_STATE_RUNNING; t->entry(t); + kore_task_finish(t); pthread_mutex_lock(&task_thread_lock); TAILQ_INSERT_HEAD(&task_threads, tt, list); diff --git a/includes/kore_tasks.h b/includes/kore_tasks.h @@ -17,11 +17,19 @@ #ifndef __H_KORE_TASKS #define __H_KORE_TASKS +#define KORE_TASK_STATE_CREATED 1 +#define KORE_TASK_STATE_RUNNING 2 +#define KORE_TASK_STATE_FINISHED 3 + +struct http_request; + struct kore_task { - u_int8_t type; - int fds[2]; - void *owner; - void (*entry)(struct kore_task *); + u_int8_t type; + u_int8_t state; + + struct http_request *req; + int fds[2]; + void (*entry)(struct kore_task *); struct kore_task_thread *thread; TAILQ_ENTRY(kore_task) list; @@ -40,8 +48,12 @@ struct kore_task_thread { void kore_task_init(void); void kore_task_finish(struct kore_task *); void kore_task_destroy(struct kore_task *); +int kore_task_finished(struct kore_task *); void kore_task_handle(struct kore_task *, int); -void kore_task_create(struct http_request *, + +void kore_task_bind_request(struct kore_task *, + struct http_request *); +void kore_task_create(struct kore_task **, void (*entry)(struct kore_task *)); u_int32_t kore_task_channel_read(struct kore_task *, void *, u_int32_t); diff --git a/src/connection.c b/src/connection.c @@ -225,9 +225,9 @@ kore_connection_handle(struct connection *c) void kore_connection_remove(struct connection *c) { - struct http_request *req; struct netbuf *nb, *next; struct spdy_stream *s, *snext; + struct http_request *req, *rnext; kore_debug("kore_connection_remove(%p)", c); @@ -249,8 +249,11 @@ kore_connection_remove(struct connection *c) if (c->deflate_started) deflateEnd(&(c->z_deflate)); - TAILQ_FOREACH(req, &(c->http_requests), olist) + for (req = TAILQ_FIRST(&(c->http_requests)); req != NULL; req = rnext) { + rnext = TAILQ_NEXT(req, olist); req->flags |= HTTP_REQUEST_DELETE; + TAILQ_REMOVE(&(c->http_requests), req, olist); + } for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; nb = next) { next = TAILQ_NEXT(nb, list); diff --git a/src/http.c b/src/http.c @@ -164,6 +164,11 @@ http_process(void) next = TAILQ_NEXT(req, list); if (req->flags & HTTP_REQUEST_DELETE) { +#if defined(KORE_USE_TASKS) + if (req->task != NULL && + req->task->state != KORE_TASK_STATE_FINISHED) + continue; +#endif TAILQ_REMOVE(&http_requests, req, list); http_request_free(req); http_request_count--; @@ -272,8 +277,6 @@ http_request_free(struct http_request *req) kore_debug("http_request_free: %p->%p", req->owner, req); - TAILQ_REMOVE(&(req->owner->http_requests), req, olist); - for (hdr = TAILQ_FIRST(&(req->resp_headers)); hdr != NULL; hdr = next) { next = TAILQ_NEXT(hdr, list); @@ -319,11 +322,6 @@ http_request_free(struct http_request *req) kore_pgsql_cleanup(req); #endif -#if defined(KORE_USE_TASKS) - if (req->task != NULL) - kore_task_destroy(req->task); -#endif - if (req->method == HTTP_METHOD_POST && req->post_data != NULL) kore_buf_free(req->post_data); if (req->method == HTTP_METHOD_POST && req->multipart_body != NULL)