kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 8565b47800e61e5aa28fef5eae99e9efdf925c46
parent f021c8db44b41aaaaf1a57155cbfcc93d4c06df3
Author: Joris Vink <joris@coders.se>
Date:   Thu, 14 Aug 2014 22:05:34 +0200

Attach tasks/pgsqls to http_requests once more.

This way if an http request is removed while tasks or
pgsqls are still active they are free'd out and cancelled
properly.

Diffstat:
examples/tasks/src/tasks.c | 36++++++++++++++++++++++++++----------
includes/http.h | 5+++--
includes/pgsql.h | 2++
includes/tasks.h | 4+++-
src/http.c | 39+++++++++++++++++++++++++++++++++++----
src/pgsql.c | 6++++++
src/tasks.c | 18++++++------------
7 files changed, 81 insertions(+), 29 deletions(-)

diff --git a/examples/tasks/src/tasks.c b/examples/tasks/src/tasks.c @@ -38,18 +38,25 @@ int post_back(struct http_request *); int page_handler(struct http_request *); size_t curl_write_cb(char *, size_t, size_t, void *); +struct rstate { + struct kore_task task; +}; + int page_handler(struct http_request *req) { u_int32_t len; + struct rstate *state; char *user, result[64]; /* * Lets check if a task has been created yet, this is important * as we only want to fire this off once and we will be called * again once it has been created. + * + * In this example, we'll store our state with our task in hdlr_extra. */ - if (req->task == NULL) { + if (req->hdlr_extra == NULL) { /* Grab the user argument */ http_populate_arguments(req); if (!http_argument_get_string("user", &user, &len)) { @@ -58,6 +65,13 @@ page_handler(struct http_request *req) } /* + * Allocate rstate and bind it to the hdlr_extra field. + * Kore automatically frees this when freeing the result. + */ + state = kore_malloc(sizeof(*state)); + req->hdlr_extra = state; + + /* * Create a new task that will execute the run_curl() * function and bind it to our request. * @@ -65,20 +79,22 @@ page_handler(struct http_request *req) * the page handler for that request to refire after the * task has completed or when it writes on the task channel. */ - kore_task_create(&req->task, run_curl); - kore_task_bind_request(req->task, req); + kore_task_create(&state->task, run_curl); + kore_task_bind_request(&state->task, req); /* * Start the task and write the user we received in our * GET request to its channel. */ - kore_task_run(req->task); - kore_task_channel_write(req->task, user, len); + kore_task_run(&state->task); + kore_task_channel_write(&state->task, user, len); /* * Tell Kore to retry us later. */ return (KORE_RESULT_RETRY); + } else { + state = req->hdlr_extra; } /* @@ -88,7 +104,7 @@ page_handler(struct http_request *req) * In order to distuingish between the two we can inspect the * state of the task. */ - if (kore_task_state(req->task) != KORE_TASK_STATE_FINISHED) { + if (kore_task_state(&state->task) != KORE_TASK_STATE_FINISHED) { http_request_sleep(req); return (KORE_RESULT_RETRY); } @@ -96,8 +112,8 @@ page_handler(struct http_request *req) /* * Task is finished, check the result. */ - if (kore_task_result(req->task) != KORE_RESULT_OK) { - kore_task_destroy(req->task); + if (kore_task_result(&state->task) != KORE_RESULT_OK) { + kore_task_destroy(&state->task); http_response(req, 500, NULL, 0); return (KORE_RESULT_OK); } @@ -110,7 +126,7 @@ page_handler(struct http_request *req) * larger then the buffer you passed this is a sign of truncation * and should be treated carefully. */ - len = kore_task_channel_read(req->task, result, sizeof(result)); + len = kore_task_channel_read(&state->task, result, sizeof(result)); if (len > sizeof(result)) { http_response(req, 500, NULL, 0); } else { @@ -118,7 +134,7 @@ page_handler(struct http_request *req) } /* We good, destroy the task. */ - kore_task_destroy(req->task); + kore_task_destroy(&state->task); return (KORE_RESULT_OK); } diff --git a/includes/http.h b/includes/http.h @@ -176,9 +176,10 @@ struct http_request { void *hdlr_extra; char *query_string; u_int8_t *multipart_body; - struct kore_module_handle *hdlr; - struct kore_task *task; + + LIST_HEAD(, kore_task) tasks; + LIST_HEAD(, kore_pgsql) pgsqls; TAILQ_HEAD(, http_header) req_headers; TAILQ_HEAD(, http_header) resp_headers; diff --git a/includes/pgsql.h b/includes/pgsql.h @@ -33,6 +33,8 @@ struct kore_pgsql { char *error; PGresult *result; struct pgsql_conn *conn; + + LIST_ENTRY(kore_pgsql) rlist; }; extern u_int16_t pgsql_conn_max; diff --git a/includes/tasks.h b/includes/tasks.h @@ -37,7 +37,9 @@ struct kore_task { int (*entry)(struct kore_task *); struct kore_task_thread *thread; + TAILQ_ENTRY(kore_task) list; + LIST_ENTRY(kore_task) rlist; }; struct kore_task_thread { @@ -59,7 +61,7 @@ void kore_task_handle(struct kore_task *, int); void kore_task_bind_request(struct kore_task *, struct http_request *); -void kore_task_create(struct kore_task **, +void kore_task_create(struct kore_task *, int (*entry)(struct kore_task *)); u_int32_t kore_task_channel_read(struct kore_task *, void *, u_int32_t); diff --git a/src/http.c b/src/http.c @@ -23,6 +23,10 @@ #include "kore.h" #include "http.h" +#if defined(KORE_USE_PGSQL) +#include "pgsql.h" +#endif + #if defined(KORE_USE_TASKS) #include "tasks.h" #endif @@ -144,7 +148,11 @@ http_request_new(struct connection *c, struct spdy_stream *s, const char *host, } #if defined(KORE_USE_TASKS) - req->task = NULL; + LIST_INIT(&(req->tasks)); +#endif + +#if defined(KORE_USE_PGSQL) + LIST_INIT(&(req->pgsqls)); #endif http_request_count++; @@ -296,18 +304,41 @@ http_response_header(struct http_request *req, void http_request_free(struct http_request *req) { +#if defined(KORE_USE_TASKS) + struct kore_task *t, *nt; + int pending_tasks; +#endif +#if defined(KORE_USE_PGSQL) + struct kore_pgsql *pgsql; +#endif struct http_file *f, *fnext; struct http_arg *q, *qnext; struct http_header *hdr, *next; #if defined(KORE_USE_TASKS) - if (req->task != NULL && !kore_task_finished(req->task)) { - if (kore_task_state(req->task) != KORE_TASK_STATE_ABORT) - kore_task_set_state(req->task, KORE_TASK_STATE_ABORT); + pending_tasks = 0; + for (t = LIST_FIRST(&(req->tasks)); t != NULL; t = nt) { + nt = LIST_NEXT(t, rlist); + if (!kore_task_finished(t)) { + pending_tasks++; + } else { + kore_task_destroy(t); + } + } + + if (pending_tasks) { + kore_debug("http_request_free %d pending tasks", pending_tasks); return; } #endif +#if defined(KORE_USE_PGSQL) + while (!LIST_EMPTY(&(req->pgsqls))) { + pgsql = LIST_FIRST(&(req->pgsqls)); + kore_pgsql_cleanup(pgsql); + } +#endif + kore_debug("http_request_free: %p->%p", req->owner, req); TAILQ_REMOVE(&http_requests, req, list); diff --git a/src/pgsql.c b/src/pgsql.c @@ -90,6 +90,8 @@ kore_pgsql_async(struct kore_pgsql *pgsql, struct http_request *req, conn->job->pgsql = pgsql; conn->job->req = req; + LIST_INSERT_HEAD(&(req->pgsqls), pgsql, rlist); + if (!PQsendQuery(conn->db, query)) { pgsql_conn_cleanup(conn); return (KORE_RESULT_ERROR); @@ -172,6 +174,8 @@ kore_pgsql_continue(struct http_request *req, struct kore_pgsql *pgsql) void kore_pgsql_cleanup(struct kore_pgsql *pgsql) { + kore_debug("kore_pgsql_cleanup(%p)", pgsql); + if (pgsql->result != NULL) PQclear(pgsql->result); @@ -187,6 +191,8 @@ kore_pgsql_cleanup(struct kore_pgsql *pgsql) pgsql->result = NULL; pgsql->error = NULL; pgsql->conn = NULL; + + LIST_REMOVE(pgsql, rlist); } void diff --git a/src/tasks.c b/src/tasks.c @@ -58,12 +58,8 @@ kore_task_init(void) } void -kore_task_create(struct kore_task **out, int (*entry)(struct kore_task *)) +kore_task_create(struct kore_task *t, int (*entry)(struct kore_task *)) { - struct kore_task *t; - - t = kore_malloc(sizeof(struct kore_task)); - t->req = NULL; t->entry = entry; t->type = KORE_TYPE_TASK; @@ -72,9 +68,6 @@ kore_task_create(struct kore_task **out, int (*entry)(struct kore_task *)) if (socketpair(AF_UNIX, SOCK_STREAM, 0,t->fds) == -1) fatal("kore_task_create: socketpair() %s", errno_s); - - if (out != NULL) - *out = t; } void @@ -104,7 +97,7 @@ kore_task_bind_request(struct kore_task *t, struct http_request *req) kore_debug("kore_task_bind_request: %p bound to %p", req, t); t->req = req; - req->task = t; + LIST_INSERT_HEAD(&(req->tasks), t, rlist); http_request_sleep(req); kore_platform_schedule_read(t->fds[0], t); @@ -115,14 +108,15 @@ kore_task_destroy(struct kore_task *t) { kore_debug("kore_task_destroy: %p", t); - if (t->req != NULL) - t->req->task = NULL; + if (t->req != NULL) { + t->req = NULL; + LIST_REMOVE(t, rlist); + } close(t->fds[0]); close(t->fds[1]); /* This might already be closed. */ pthread_rwlock_destroy(&(t->lock)); - kore_mem_free(t); } int