kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 7b6c03ca5b6d6cc47d87788efba7bf91151fe647
parent baac693f2f90860a4697d7f8aec10f7dabc160d0
Author: Joris Vink <joris@coders.se>
Date:   Fri,  4 Jul 2014 11:28:17 +0200

Task improvements.

Synchronize access to state/result properly so one
can access these from inside the task as well.

Introduce KORE_TASK_STATE_ABORT which will be set
when a task needs to be abort. You can use this
to create tasks that run in a loop until aborted.

Diffstat:
contrib/examples/task_curl/example.c | 2+-
includes/tasks.h | 12++++++++++--
src/http.c | 24+++++++++++++++---------
src/tasks.c | 53++++++++++++++++++++++++++++++++++++++++++++++-------
4 files changed, 72 insertions(+), 19 deletions(-)

diff --git a/contrib/examples/task_curl/example.c b/contrib/examples/task_curl/example.c @@ -94,7 +94,7 @@ page_handler(struct http_request *req) * When we come back here, our background task is finished * and we can check its result. */ - if (req->task->result != KORE_RESULT_OK) { + if (kore_task_result(req->task) != KORE_RESULT_OK) { kore_task_destroy(req->task); http_response(req, 500, NULL, 0); return (KORE_RESULT_OK); diff --git a/includes/tasks.h b/includes/tasks.h @@ -20,13 +20,15 @@ #define KORE_TASK_STATE_CREATED 1 #define KORE_TASK_STATE_RUNNING 2 #define KORE_TASK_STATE_FINISHED 3 +#define KORE_TASK_STATE_ABORT 4 struct http_request; struct kore_task { u_int8_t type; - volatile u_int8_t state; - volatile int result; + int state; + int result; + pthread_rwlock_t lock; struct http_request *req; int fds[2]; @@ -61,4 +63,10 @@ void kore_task_create(struct kore_task **, u_int32_t kore_task_channel_read(struct kore_task *, void *, u_int32_t); void kore_task_channel_write(struct kore_task *, void *, u_int32_t); +void kore_task_set_state(struct kore_task *, int); +void kore_task_set_result(struct kore_task *, int); + +int kore_task_state(struct kore_task *); +int kore_task_result(struct kore_task *); + #endif diff --git a/src/http.c b/src/http.c @@ -164,14 +164,7 @@ http_process(void) next = TAILQ_NEXT(req, list); if (req->flags & HTTP_REQUEST_DELETE) { -#if defined(KORE_USE_TASKS) - if (req->task != NULL && - req->task->state != KORE_TASK_STATE_FINISHED) - continue; -#endif - TAILQ_REMOVE(&http_requests, req, list); http_request_free(req); - http_request_count--; continue; } @@ -275,8 +268,18 @@ http_request_free(struct http_request *req) struct http_arg *q, *qnext; struct http_header *hdr, *next; +#if defined(KORE_USE_TASKS) + if (req->task != NULL && !kore_task_finished(req->task)) { + if (kore_task_state(req->task) != KORE_TASK_STATE_ABORT) + kore_task_set_state(req->task, KORE_TASK_STATE_ABORT); + return; + } +#endif + kore_debug("http_request_free: %p->%p", req->owner, req); + TAILQ_REMOVE(&http_requests, req, list); + for (hdr = TAILQ_FIRST(&(req->resp_headers)); hdr != NULL; hdr = next) { next = TAILQ_NEXT(hdr, list); @@ -333,6 +336,7 @@ http_request_free(struct http_request *req) kore_mem_free(req->hdlr_extra); kore_pool_put(&http_request_pool, req); + http_request_count--; } void @@ -1082,8 +1086,10 @@ http_response_normal(struct http_request *req, struct connection *c, } } - kore_buf_appendf(buf, "Content-length: %d\r\n", len); - kore_buf_append(buf, "\r\n", 2); + if (len > 0) { + kore_buf_appendf(buf, "Content-length: %d\r\n", len); + kore_buf_append(buf, "\r\n", 2); + } htext = kore_buf_release(buf, &hlen); net_send_queue(c, htext, hlen, NULL); diff --git a/src/tasks.c b/src/tasks.c @@ -65,6 +65,7 @@ kore_task_create(struct kore_task **out, int (*entry)(struct kore_task *)) t->entry = entry; t->type = KORE_TYPE_TASK; t->state = KORE_TASK_STATE_CREATED; + pthread_rwlock_init(&(t->lock), NULL); if (socketpair(AF_UNIX, SOCK_STREAM, 0,t->fds) == -1) fatal("kore_task_create: socketpair() %s", errno_s); @@ -117,16 +118,14 @@ kore_task_destroy(struct kore_task *t) close(t->fds[0]); close(t->fds[1]); /* This might already be closed. */ + pthread_rwlock_destroy(&(t->lock)); kore_mem_free(t); } int kore_task_finished(struct kore_task *t) { - if (t->state == KORE_TASK_STATE_FINISHED) - return (1); - - return (0); + return ((kore_task_state(t) == KORE_TASK_STATE_FINISHED)); } void @@ -175,7 +174,7 @@ kore_task_handle(struct kore_task *t, int finished) kore_debug("kore_task_handle: %p, %d", t, finished); if (finished) { - t->state = KORE_TASK_STATE_FINISHED; + kore_task_set_state(t, KORE_TASK_STATE_FINISHED); if (t->req != NULL) { t->req->flags &= ~HTTP_REQUEST_SLEEPING; if (t->req->flags & HTTP_REQUEST_DELETE) @@ -184,6 +183,46 @@ kore_task_handle(struct kore_task *t, int finished) } } +int +kore_task_state(struct kore_task *t) +{ + int s; + + pthread_rwlock_rdlock(&(t->lock)); + s = t->state; + pthread_rwlock_unlock(&(t->lock)); + + return (s); +} + +void +kore_task_set_state(struct kore_task *t, int state) +{ + pthread_rwlock_wrlock(&(t->lock)); + t->state = state; + pthread_rwlock_unlock(&(t->lock)); +} + +int +kore_task_result(struct kore_task *t) +{ + int r; + + pthread_rwlock_rdlock(&(t->lock)); + r = t->result; + pthread_rwlock_unlock(&(t->lock)); + + return (r); +} + +void +kore_task_set_result(struct kore_task *t, int result) +{ + pthread_rwlock_wrlock(&(t->lock)); + t->result = result; + pthread_rwlock_unlock(&(t->lock)); +} + static void task_channel_write(int fd, void *data, u_int32_t len) { @@ -273,8 +312,8 @@ task_thread(void *arg) kore_debug("task_thread#%d: executing %p", tt->idx, t); - t->state = KORE_TASK_STATE_RUNNING; - t->result = t->entry(t); + kore_task_set_state(t, KORE_TASK_STATE_RUNNING); + kore_task_set_result(t, t->entry(t)); kore_task_finish(t); pthread_mutex_lock(&task_thread_lock);