kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit be4b1c7e7be915bb6cd68e24b7e2574d897a5500
parent 5090ebea20215fe14b5d5250eba6b7b9ddd13b8a
Author: Joris Vink <joris@coders.se>
Date:   Wed,  2 Jul 2014 12:19:38 +0200

Move actual code out of contrib into src/.

Diffstat:
Makefile | 4++--
contrib/postgres/kore_pgsql.c | 346-------------------------------------------------------------------------------
contrib/tasks/kore_tasks.c | 291------------------------------------------------------------------------------
src/pgsql.c | 346+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/tasks.c | 291++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 639 insertions(+), 639 deletions(-)

diff --git a/Makefile b/Makefile @@ -23,13 +23,13 @@ ifneq ("$(KORE_PEDANTIC_MALLOC)", "") endif ifneq ("$(PGSQL)", "") - S_SRC+=contrib/postgres/kore_pgsql.c + S_SRC+=src/pgsql.c LDFLAGS+=-L$(shell pg_config --libdir) -lpq CFLAGS+=-I$(shell pg_config --includedir) -DKORE_USE_PGSQL endif ifneq ("$(TASKS)", "") - S_SRC+=contrib/tasks/kore_tasks.c + S_SRC+=src/tasks.c LDFLAGS+=-lpthread CFLAGS+=-DKORE_USE_TASKS endif diff --git a/contrib/postgres/kore_pgsql.c b/contrib/postgres/kore_pgsql.c @@ -1,346 +0,0 @@ -/* - * Copyright (c) 2014 Joris Vink <joris@coders.se> - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - */ - -#include <sys/param.h> -#include <sys/queue.h> - -#include <libpq-fe.h> - -#include "kore.h" -#include "http.h" -#include "kore_pgsql.h" - -struct pgsql_job { - u_int8_t idx; - struct http_request *req; - u_int64_t start; - char *query; - - TAILQ_ENTRY(pgsql_job) list; -}; - -#define PGSQL_CONN_MAX 2 -#define PGSQL_CONN_FREE 0x01 - -struct pgsql_conn { - u_int8_t type; - u_int8_t flags; - - PGconn *db; - struct pgsql_job *job; - TAILQ_ENTRY(pgsql_conn) list; -}; - -static void pgsql_conn_cleanup(struct pgsql_conn *); -static int pgsql_conn_create(struct http_request *, int); -static void pgsql_read_result(struct http_request *, int, - struct pgsql_conn *); - -static TAILQ_HEAD(, pgsql_conn) pgsql_conn_free; -static u_int16_t pgsql_conn_count; -char *pgsql_conn_string = NULL; - -void -kore_pgsql_init(void) -{ - pgsql_conn_count = 0; - TAILQ_INIT(&pgsql_conn_free); -} - -int -kore_pgsql_query(struct http_request *req, char *query, int idx) -{ - int fd; - struct pgsql_conn *conn; - - if (idx >= HTTP_PGSQL_MAX) - fatal("kore_pgsql_query: %d > %d", idx, HTTP_PGSQL_MAX); - if (req->pgsql[idx] != NULL) - fatal("kore_pgsql_query: %d already exists", idx); - - if (TAILQ_EMPTY(&pgsql_conn_free)) { - if (pgsql_conn_count >= PGSQL_CONN_MAX) - return (KORE_RESULT_ERROR); - } - - req->pgsql[idx] = kore_malloc(sizeof(struct kore_pgsql)); - req->pgsql[idx]->state = KORE_PGSQL_STATE_INIT; - req->pgsql[idx]->result = NULL; - req->pgsql[idx]->error = NULL; - req->pgsql[idx]->conn = NULL; - - if (TAILQ_EMPTY(&pgsql_conn_free)) { - if (pgsql_conn_create(req, idx) == KORE_RESULT_ERROR) - return (KORE_RESULT_ERROR); - } - - req->flags |= HTTP_REQUEST_SLEEPING; - conn = TAILQ_FIRST(&pgsql_conn_free); - if (!(conn->flags & PGSQL_CONN_FREE)) - fatal("received a pgsql conn that was not free?"); - - conn->flags &= ~PGSQL_CONN_FREE; - TAILQ_REMOVE(&pgsql_conn_free, conn, list); - - req->pgsql[idx]->conn = conn; - conn->job = kore_malloc(sizeof(struct pgsql_job)); - conn->job->query = kore_strdup(query); - conn->job->start = kore_time_ms(); - conn->job->req = req; - conn->job->idx = idx; - - if (!PQsendQuery(conn->db, query)) { - pgsql_conn_cleanup(conn); - return (KORE_RESULT_ERROR); - } - - fd = PQsocket(conn->db); - if (fd < 0) - fatal("PQsocket returned < 0 fd on open connection"); - - kore_platform_schedule_read(fd, conn); - req->pgsql[idx]->state = KORE_PGSQL_STATE_WAIT; - kore_debug("query '%s' for %p sent on %p", query, req, conn); - - return (KORE_RESULT_OK); -} - -void -kore_pgsql_handle(void *c, int err) -{ - int i; - struct http_request *req; - struct pgsql_conn *conn = (struct pgsql_conn *)c; - - if (err) { - pgsql_conn_cleanup(conn); - return; - } - - i = conn->job->idx; - req = conn->job->req; - kore_debug("kore_pgsql_handle: %p (%d) (%d)", - req, i, req->pgsql[i]->state); - - if (!PQconsumeInput(conn->db)) { - req->pgsql[i]->state = KORE_PGSQL_STATE_ERROR; - req->pgsql[i]->error = kore_strdup(PQerrorMessage(conn->db)); - } else { - pgsql_read_result(req, i, conn); - } - - if (req->pgsql[i]->state == KORE_PGSQL_STATE_WAIT) { - req->flags |= HTTP_REQUEST_SLEEPING; - } else { - req->flags &= ~HTTP_REQUEST_SLEEPING; - http_process_request(req, 1); - } -} - -void -kore_pgsql_continue(struct http_request *req, int i) -{ - int fd; - struct pgsql_conn *conn; - - kore_debug("kore_pgsql_continue: %p->%p (%d) (%d)", - req->owner, req, i, req->pgsql[i]->state); - - if (req->pgsql[i]->error) { - kore_mem_free(req->pgsql[i]->error); - req->pgsql[i]->error = NULL; - } - - if (req->pgsql[i]->result) - PQclear(req->pgsql[i]->result); - - conn = req->pgsql[i]->conn; - switch (req->pgsql[i]->state) { - case KORE_PGSQL_STATE_INIT: - case KORE_PGSQL_STATE_WAIT: - break; - case KORE_PGSQL_STATE_DONE: - req->pgsql[i]->conn = NULL; - req->flags &= ~HTTP_REQUEST_SLEEPING; - req->pgsql[i]->state = KORE_PGSQL_STATE_COMPLETE; - - kore_mem_free(conn->job->query); - kore_mem_free(conn->job); - - conn->job = NULL; - conn->flags |= PGSQL_CONN_FREE; - TAILQ_INSERT_TAIL(&pgsql_conn_free, conn, list); - - fd = PQsocket(conn->db); - kore_platform_disable_read(fd); - - http_process_request(req, 0); - break; - case KORE_PGSQL_STATE_ERROR: - case KORE_PGSQL_STATE_RESULT: - kore_pgsql_handle(conn, 0); - break; - default: - fatal("unknown pgsql state"); - } -} - -void -kore_pgsql_cleanup(struct http_request *req) -{ - int i; - struct pgsql_conn *conn; - - for (i = 0; i < HTTP_PGSQL_MAX; i++) { - if (req->pgsql[i] == NULL) - continue; - - if (req->pgsql[i]->result != NULL) { - kore_log(LOG_NOTICE, "cleaning up leaked pgsql result"); - PQclear(req->pgsql[i]->result); - } - - if (req->pgsql[i]->error != NULL) - kore_mem_free(req->pgsql[i]->error); - - if (req->pgsql[i]->conn != NULL) { - conn = req->pgsql[i]->conn; - while (PQgetResult(conn->db) != NULL) - ; - } - - req->pgsql[i]->conn = NULL; - kore_mem_free(req->pgsql[i]); - req->pgsql[i] = NULL; - } -} - -void -kore_pgsql_logerror(struct kore_pgsql *pgsql) -{ - kore_log(LOG_NOTICE, "pgsql error: %s", - (pgsql->error) ? pgsql->error : "unknown"); -} - -int -kore_pgsql_ntuples(struct kore_pgsql *pgsql) -{ - return (PQntuples(pgsql->result)); -} - -char * -kore_pgsql_getvalue(struct kore_pgsql *pgsql, int row, int col) -{ - return (PQgetvalue(pgsql->result, row, col)); -} - -static int -pgsql_conn_create(struct http_request *req, int idx) -{ - struct pgsql_conn *conn; - - if (pgsql_conn_string == NULL) - fatal("pgsql_conn_create: no connection string"); - - pgsql_conn_count++; - conn = kore_malloc(sizeof(*conn)); - kore_debug("pgsql_conn_create(): %p", conn); - memset(conn, 0, sizeof(*conn)); - - conn->db = PQconnectdb(pgsql_conn_string); - if (conn->db == NULL || (PQstatus(conn->db) != CONNECTION_OK)) { - req->pgsql[idx]->state = KORE_PGSQL_STATE_ERROR; - req->pgsql[idx]->error = kore_strdup(PQerrorMessage(conn->db)); - pgsql_conn_cleanup(conn); - return (KORE_RESULT_ERROR); - } - - conn->job = NULL; - conn->flags = PGSQL_CONN_FREE; - conn->type = KORE_TYPE_PGSQL_CONN; - TAILQ_INSERT_TAIL(&pgsql_conn_free, conn, list); - - return (KORE_RESULT_OK); -} - -static void -pgsql_conn_cleanup(struct pgsql_conn *conn) -{ - int i; - struct http_request *req; - - kore_debug("pgsql_conn_cleanup(): %p", conn); - - if (conn->flags & PGSQL_CONN_FREE) - TAILQ_REMOVE(&pgsql_conn_free, conn, list); - - if (conn->job) { - i = conn->job->idx; - req = conn->job->req; - - req->pgsql[i]->conn = NULL; - req->pgsql[i]->state = KORE_PGSQL_STATE_ERROR; - req->pgsql[i]->error = kore_strdup(PQerrorMessage(conn->db)); - req->flags &= ~HTTP_REQUEST_SLEEPING; - - kore_mem_free(conn->job->query); - kore_mem_free(conn->job); - conn->job = NULL; - } - - if (conn->db != NULL) - PQfinish(conn->db); - - pgsql_conn_count--; - kore_mem_free(conn); -} - -static void -pgsql_read_result(struct http_request *req, int i, struct pgsql_conn *conn) -{ - if (PQisBusy(conn->db)) { - req->pgsql[i]->state = KORE_PGSQL_STATE_WAIT; - return; - } - - req->pgsql[i]->result = PQgetResult(conn->db); - if (req->pgsql[i]->result == NULL) { - req->pgsql[i]->state = KORE_PGSQL_STATE_DONE; - return; - } - - switch (PQresultStatus(req->pgsql[i]->result)) { - case PGRES_COPY_OUT: - case PGRES_COPY_IN: - case PGRES_NONFATAL_ERROR: - case PGRES_COPY_BOTH: - break; - case PGRES_COMMAND_OK: - req->pgsql[i]->state = KORE_PGSQL_STATE_DONE; - break; - case PGRES_TUPLES_OK: - case PGRES_SINGLE_TUPLE: - req->pgsql[i]->state = KORE_PGSQL_STATE_RESULT; - break; - case PGRES_EMPTY_QUERY: - case PGRES_BAD_RESPONSE: - case PGRES_FATAL_ERROR: - req->pgsql[i]->state = KORE_PGSQL_STATE_ERROR; - req->pgsql[i]->error = - kore_strdup(PQresultErrorMessage(req->pgsql[i]->result)); - break; - } -} diff --git a/contrib/tasks/kore_tasks.c b/contrib/tasks/kore_tasks.c @@ -1,291 +0,0 @@ -/* - * Copyright (c) 2014 Joris Vink <joris@coders.se> - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - */ - -#include <sys/param.h> -#include <sys/queue.h> -#include <sys/socket.h> - -#include <pthread.h> -#include <stdio.h> -#include <stdlib.h> - -#include "kore.h" -#include "http.h" -#include "kore_tasks.h" - -static u_int8_t threads; -static pthread_mutex_t task_thread_lock; - -static TAILQ_HEAD(, kore_task_thread) task_threads; - -static void *task_thread(void *); -static void task_channel_read(int, void *, u_int32_t); -static void task_channel_write(int, void *, u_int32_t); -static void task_thread_spawn(struct kore_task_thread **); - -#define THREAD_FD_ASSIGN(t, f, i, o) \ - do { \ - if (pthread_self() == t) { \ - f = i; \ - } else { \ - f = o; \ - } \ - } while (0); - -void -kore_task_init(void) -{ - threads = 0; - - TAILQ_INIT(&task_threads); - pthread_mutex_init(&task_thread_lock, NULL); -} - -void -kore_task_create(struct kore_task **out, int (*entry)(struct kore_task *)) -{ - struct kore_task *t; - - t = kore_malloc(sizeof(struct kore_task)); - - t->req = NULL; - t->entry = entry; - t->type = KORE_TYPE_TASK; - t->state = KORE_TASK_STATE_CREATED; - - if (socketpair(AF_UNIX, SOCK_STREAM, 0,t->fds) == -1) - fatal("kore_task_create: socketpair() %s", errno_s); - - if (out != NULL) - *out = t; -} - -void -kore_task_run(struct kore_task *t) -{ - struct kore_task_thread *tt; - - pthread_mutex_lock(&task_thread_lock); - if (TAILQ_EMPTY(&task_threads)) - task_thread_spawn(&tt); - else - tt = TAILQ_FIRST(&task_threads); - - pthread_mutex_unlock(&task_thread_lock); - pthread_mutex_lock(&(tt->lock)); - - t->thread = tt; - TAILQ_INSERT_TAIL(&(tt->tasks), t, list); - - pthread_mutex_unlock(&(tt->lock)); - pthread_cond_signal(&(tt->cond)); -} - -void -kore_task_bind_request(struct kore_task *t, struct http_request *req) -{ - kore_debug("kore_task_bind_request: %p bound to %p", req, t); - - t->req = req; - req->task = t; - req->flags |= HTTP_REQUEST_SLEEPING; - - kore_platform_schedule_read(t->fds[0], t); -} - -void -kore_task_destroy(struct kore_task *t) -{ - kore_debug("kore_task_destroy: %p", t); - - if (t->req != NULL) - t->req->task = NULL; - - close(t->fds[0]); - close(t->fds[1]); /* This might already be closed. */ - - kore_mem_free(t); -} - -int -kore_task_finished(struct kore_task *t) -{ - if (t->state == KORE_TASK_STATE_FINISHED) - return (1); - - return (0); -} - -void -kore_task_finish(struct kore_task *t) -{ - kore_debug("kore_task_finished: %p (%d)", t, t->result); - close(t->fds[1]); -} - -void -kore_task_channel_write(struct kore_task *t, void *data, u_int32_t len) -{ - int fd; - - kore_debug("kore_task_channel_write: %p <- %p (%ld)", t, data, len); - - THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]); - task_channel_write(fd, &len, sizeof(len)); - task_channel_write(fd, data, len); -} - -u_int32_t -kore_task_channel_read(struct kore_task *t, void *out, u_int32_t len) -{ - int fd; - u_int32_t dlen, bytes; - - kore_debug("kore_task_channel_read: %p -> %p (%ld)", t, out, len); - - THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]); - task_channel_read(fd, &dlen, sizeof(dlen)); - - if (dlen > len) - bytes = len; - else - bytes = dlen; - - task_channel_read(fd, out, bytes); - - return (dlen); -} - -void -kore_task_handle(struct kore_task *t, int finished) -{ - kore_debug("kore_task_handle: %p, %d", t, finished); - - if (finished) { - t->state = KORE_TASK_STATE_FINISHED; - if (t->req != NULL) { - t->req->flags &= ~HTTP_REQUEST_SLEEPING; - if (t->req->flags & HTTP_REQUEST_DELETE) - kore_task_destroy(t); - } - } -} - -static void -task_channel_write(int fd, void *data, u_int32_t len) -{ - ssize_t r; - u_int8_t *d; - u_int32_t offset; - - d = data; - offset = 0; - while (offset != len) { - r = write(fd, d + offset, len - offset); - if (r == -1 && errno == EINTR) - continue; - if (r == -1) - fatal("task_channel_write: %s", errno_s); - offset += r; - } -} - -static void -task_channel_read(int fd, void *out, u_int32_t len) -{ - ssize_t r; - u_int8_t *d; - u_int32_t offset; - - d = out; - offset = 0; - while (offset != len) { - r = read(fd, d + offset, len - offset); - if (r == -1 && errno == EINTR) - continue; - if (r == -1) - fatal("task_channel_read: %s", errno_s); - if (r == 0) - fatal("task_channel_read: unexpected eof"); - - offset += r; - } -} - -static void -task_thread_spawn(struct kore_task_thread **out) -{ - struct kore_task_thread *tt; - - tt = kore_malloc(sizeof(*tt)); - tt->idx = threads++; - - TAILQ_INIT(&(tt->tasks)); - pthread_cond_init(&(tt->cond), NULL); - pthread_mutex_init(&(tt->lock), NULL); - - if (pthread_create(&(tt->tid), NULL, task_thread, tt) != 0) - fatal("pthread_create: %s", errno_s); - - *out = tt; -} - -static void * -task_thread(void *arg) -{ - struct kore_task *t; - struct kore_task_thread *tt = arg; - - kore_debug("task_thread: #%d starting", tt->idx); - - pthread_mutex_lock(&(tt->lock)); - - pthread_mutex_lock(&task_thread_lock); - TAILQ_INSERT_TAIL(&task_threads, tt, list); - pthread_mutex_unlock(&task_thread_lock); - - for (;;) { - if (TAILQ_EMPTY(&(tt->tasks))) - pthread_cond_wait(&(tt->cond), &(tt->lock)); - - kore_debug("task_thread#%d: woke up", tt->idx); - - t = TAILQ_FIRST(&(tt->tasks)); - TAILQ_REMOVE(&(tt->tasks), t, list); - pthread_mutex_unlock(&(tt->lock)); - - pthread_mutex_lock(&task_thread_lock); - TAILQ_REMOVE(&task_threads, tt, list); - pthread_mutex_unlock(&task_thread_lock); - - kore_debug("task_thread#%d: executing %p", tt->idx, t); - - t->state = KORE_TASK_STATE_RUNNING; - t->result = t->entry(t); - kore_task_finish(t); - - pthread_mutex_lock(&task_thread_lock); - TAILQ_INSERT_HEAD(&task_threads, tt, list); - pthread_mutex_unlock(&task_thread_lock); - - pthread_mutex_lock(&(tt->lock)); - } - - pthread_exit(NULL); - - /* NOTREACHED */ - return (NULL); -} diff --git a/src/pgsql.c b/src/pgsql.c @@ -0,0 +1,346 @@ +/* + * Copyright (c) 2014 Joris Vink <joris@coders.se> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/param.h> +#include <sys/queue.h> + +#include <libpq-fe.h> + +#include "kore.h" +#include "http.h" +#include "kore_pgsql.h" + +struct pgsql_job { + u_int8_t idx; + struct http_request *req; + u_int64_t start; + char *query; + + TAILQ_ENTRY(pgsql_job) list; +}; + +#define PGSQL_CONN_MAX 2 +#define PGSQL_CONN_FREE 0x01 + +struct pgsql_conn { + u_int8_t type; + u_int8_t flags; + + PGconn *db; + struct pgsql_job *job; + TAILQ_ENTRY(pgsql_conn) list; +}; + +static void pgsql_conn_cleanup(struct pgsql_conn *); +static int pgsql_conn_create(struct http_request *, int); +static void pgsql_read_result(struct http_request *, int, + struct pgsql_conn *); + +static TAILQ_HEAD(, pgsql_conn) pgsql_conn_free; +static u_int16_t pgsql_conn_count; +char *pgsql_conn_string = NULL; + +void +kore_pgsql_init(void) +{ + pgsql_conn_count = 0; + TAILQ_INIT(&pgsql_conn_free); +} + +int +kore_pgsql_query(struct http_request *req, char *query, int idx) +{ + int fd; + struct pgsql_conn *conn; + + if (idx >= HTTP_PGSQL_MAX) + fatal("kore_pgsql_query: %d > %d", idx, HTTP_PGSQL_MAX); + if (req->pgsql[idx] != NULL) + fatal("kore_pgsql_query: %d already exists", idx); + + if (TAILQ_EMPTY(&pgsql_conn_free)) { + if (pgsql_conn_count >= PGSQL_CONN_MAX) + return (KORE_RESULT_ERROR); + } + + req->pgsql[idx] = kore_malloc(sizeof(struct kore_pgsql)); + req->pgsql[idx]->state = KORE_PGSQL_STATE_INIT; + req->pgsql[idx]->result = NULL; + req->pgsql[idx]->error = NULL; + req->pgsql[idx]->conn = NULL; + + if (TAILQ_EMPTY(&pgsql_conn_free)) { + if (pgsql_conn_create(req, idx) == KORE_RESULT_ERROR) + return (KORE_RESULT_ERROR); + } + + req->flags |= HTTP_REQUEST_SLEEPING; + conn = TAILQ_FIRST(&pgsql_conn_free); + if (!(conn->flags & PGSQL_CONN_FREE)) + fatal("received a pgsql conn that was not free?"); + + conn->flags &= ~PGSQL_CONN_FREE; + TAILQ_REMOVE(&pgsql_conn_free, conn, list); + + req->pgsql[idx]->conn = conn; + conn->job = kore_malloc(sizeof(struct pgsql_job)); + conn->job->query = kore_strdup(query); + conn->job->start = kore_time_ms(); + conn->job->req = req; + conn->job->idx = idx; + + if (!PQsendQuery(conn->db, query)) { + pgsql_conn_cleanup(conn); + return (KORE_RESULT_ERROR); + } + + fd = PQsocket(conn->db); + if (fd < 0) + fatal("PQsocket returned < 0 fd on open connection"); + + kore_platform_schedule_read(fd, conn); + req->pgsql[idx]->state = KORE_PGSQL_STATE_WAIT; + kore_debug("query '%s' for %p sent on %p", query, req, conn); + + return (KORE_RESULT_OK); +} + +void +kore_pgsql_handle(void *c, int err) +{ + int i; + struct http_request *req; + struct pgsql_conn *conn = (struct pgsql_conn *)c; + + if (err) { + pgsql_conn_cleanup(conn); + return; + } + + i = conn->job->idx; + req = conn->job->req; + kore_debug("kore_pgsql_handle: %p (%d) (%d)", + req, i, req->pgsql[i]->state); + + if (!PQconsumeInput(conn->db)) { + req->pgsql[i]->state = KORE_PGSQL_STATE_ERROR; + req->pgsql[i]->error = kore_strdup(PQerrorMessage(conn->db)); + } else { + pgsql_read_result(req, i, conn); + } + + if (req->pgsql[i]->state == KORE_PGSQL_STATE_WAIT) { + req->flags |= HTTP_REQUEST_SLEEPING; + } else { + req->flags &= ~HTTP_REQUEST_SLEEPING; + http_process_request(req, 1); + } +} + +void +kore_pgsql_continue(struct http_request *req, int i) +{ + int fd; + struct pgsql_conn *conn; + + kore_debug("kore_pgsql_continue: %p->%p (%d) (%d)", + req->owner, req, i, req->pgsql[i]->state); + + if (req->pgsql[i]->error) { + kore_mem_free(req->pgsql[i]->error); + req->pgsql[i]->error = NULL; + } + + if (req->pgsql[i]->result) + PQclear(req->pgsql[i]->result); + + conn = req->pgsql[i]->conn; + switch (req->pgsql[i]->state) { + case KORE_PGSQL_STATE_INIT: + case KORE_PGSQL_STATE_WAIT: + break; + case KORE_PGSQL_STATE_DONE: + req->pgsql[i]->conn = NULL; + req->flags &= ~HTTP_REQUEST_SLEEPING; + req->pgsql[i]->state = KORE_PGSQL_STATE_COMPLETE; + + kore_mem_free(conn->job->query); + kore_mem_free(conn->job); + + conn->job = NULL; + conn->flags |= PGSQL_CONN_FREE; + TAILQ_INSERT_TAIL(&pgsql_conn_free, conn, list); + + fd = PQsocket(conn->db); + kore_platform_disable_read(fd); + + http_process_request(req, 0); + break; + case KORE_PGSQL_STATE_ERROR: + case KORE_PGSQL_STATE_RESULT: + kore_pgsql_handle(conn, 0); + break; + default: + fatal("unknown pgsql state"); + } +} + +void +kore_pgsql_cleanup(struct http_request *req) +{ + int i; + struct pgsql_conn *conn; + + for (i = 0; i < HTTP_PGSQL_MAX; i++) { + if (req->pgsql[i] == NULL) + continue; + + if (req->pgsql[i]->result != NULL) { + kore_log(LOG_NOTICE, "cleaning up leaked pgsql result"); + PQclear(req->pgsql[i]->result); + } + + if (req->pgsql[i]->error != NULL) + kore_mem_free(req->pgsql[i]->error); + + if (req->pgsql[i]->conn != NULL) { + conn = req->pgsql[i]->conn; + while (PQgetResult(conn->db) != NULL) + ; + } + + req->pgsql[i]->conn = NULL; + kore_mem_free(req->pgsql[i]); + req->pgsql[i] = NULL; + } +} + +void +kore_pgsql_logerror(struct kore_pgsql *pgsql) +{ + kore_log(LOG_NOTICE, "pgsql error: %s", + (pgsql->error) ? pgsql->error : "unknown"); +} + +int +kore_pgsql_ntuples(struct kore_pgsql *pgsql) +{ + return (PQntuples(pgsql->result)); +} + +char * +kore_pgsql_getvalue(struct kore_pgsql *pgsql, int row, int col) +{ + return (PQgetvalue(pgsql->result, row, col)); +} + +static int +pgsql_conn_create(struct http_request *req, int idx) +{ + struct pgsql_conn *conn; + + if (pgsql_conn_string == NULL) + fatal("pgsql_conn_create: no connection string"); + + pgsql_conn_count++; + conn = kore_malloc(sizeof(*conn)); + kore_debug("pgsql_conn_create(): %p", conn); + memset(conn, 0, sizeof(*conn)); + + conn->db = PQconnectdb(pgsql_conn_string); + if (conn->db == NULL || (PQstatus(conn->db) != CONNECTION_OK)) { + req->pgsql[idx]->state = KORE_PGSQL_STATE_ERROR; + req->pgsql[idx]->error = kore_strdup(PQerrorMessage(conn->db)); + pgsql_conn_cleanup(conn); + return (KORE_RESULT_ERROR); + } + + conn->job = NULL; + conn->flags = PGSQL_CONN_FREE; + conn->type = KORE_TYPE_PGSQL_CONN; + TAILQ_INSERT_TAIL(&pgsql_conn_free, conn, list); + + return (KORE_RESULT_OK); +} + +static void +pgsql_conn_cleanup(struct pgsql_conn *conn) +{ + int i; + struct http_request *req; + + kore_debug("pgsql_conn_cleanup(): %p", conn); + + if (conn->flags & PGSQL_CONN_FREE) + TAILQ_REMOVE(&pgsql_conn_free, conn, list); + + if (conn->job) { + i = conn->job->idx; + req = conn->job->req; + + req->pgsql[i]->conn = NULL; + req->pgsql[i]->state = KORE_PGSQL_STATE_ERROR; + req->pgsql[i]->error = kore_strdup(PQerrorMessage(conn->db)); + req->flags &= ~HTTP_REQUEST_SLEEPING; + + kore_mem_free(conn->job->query); + kore_mem_free(conn->job); + conn->job = NULL; + } + + if (conn->db != NULL) + PQfinish(conn->db); + + pgsql_conn_count--; + kore_mem_free(conn); +} + +static void +pgsql_read_result(struct http_request *req, int i, struct pgsql_conn *conn) +{ + if (PQisBusy(conn->db)) { + req->pgsql[i]->state = KORE_PGSQL_STATE_WAIT; + return; + } + + req->pgsql[i]->result = PQgetResult(conn->db); + if (req->pgsql[i]->result == NULL) { + req->pgsql[i]->state = KORE_PGSQL_STATE_DONE; + return; + } + + switch (PQresultStatus(req->pgsql[i]->result)) { + case PGRES_COPY_OUT: + case PGRES_COPY_IN: + case PGRES_NONFATAL_ERROR: + case PGRES_COPY_BOTH: + break; + case PGRES_COMMAND_OK: + req->pgsql[i]->state = KORE_PGSQL_STATE_DONE; + break; + case PGRES_TUPLES_OK: + case PGRES_SINGLE_TUPLE: + req->pgsql[i]->state = KORE_PGSQL_STATE_RESULT; + break; + case PGRES_EMPTY_QUERY: + case PGRES_BAD_RESPONSE: + case PGRES_FATAL_ERROR: + req->pgsql[i]->state = KORE_PGSQL_STATE_ERROR; + req->pgsql[i]->error = + kore_strdup(PQresultErrorMessage(req->pgsql[i]->result)); + break; + } +} diff --git a/src/tasks.c b/src/tasks.c @@ -0,0 +1,291 @@ +/* + * Copyright (c) 2014 Joris Vink <joris@coders.se> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/param.h> +#include <sys/queue.h> +#include <sys/socket.h> + +#include <pthread.h> +#include <stdio.h> +#include <stdlib.h> + +#include "kore.h" +#include "http.h" +#include "kore_tasks.h" + +static u_int8_t threads; +static pthread_mutex_t task_thread_lock; + +static TAILQ_HEAD(, kore_task_thread) task_threads; + +static void *task_thread(void *); +static void task_channel_read(int, void *, u_int32_t); +static void task_channel_write(int, void *, u_int32_t); +static void task_thread_spawn(struct kore_task_thread **); + +#define THREAD_FD_ASSIGN(t, f, i, o) \ + do { \ + if (pthread_self() == t) { \ + f = i; \ + } else { \ + f = o; \ + } \ + } while (0); + +void +kore_task_init(void) +{ + threads = 0; + + TAILQ_INIT(&task_threads); + pthread_mutex_init(&task_thread_lock, NULL); +} + +void +kore_task_create(struct kore_task **out, int (*entry)(struct kore_task *)) +{ + struct kore_task *t; + + t = kore_malloc(sizeof(struct kore_task)); + + t->req = NULL; + t->entry = entry; + t->type = KORE_TYPE_TASK; + t->state = KORE_TASK_STATE_CREATED; + + if (socketpair(AF_UNIX, SOCK_STREAM, 0,t->fds) == -1) + fatal("kore_task_create: socketpair() %s", errno_s); + + if (out != NULL) + *out = t; +} + +void +kore_task_run(struct kore_task *t) +{ + struct kore_task_thread *tt; + + pthread_mutex_lock(&task_thread_lock); + if (TAILQ_EMPTY(&task_threads)) + task_thread_spawn(&tt); + else + tt = TAILQ_FIRST(&task_threads); + + pthread_mutex_unlock(&task_thread_lock); + pthread_mutex_lock(&(tt->lock)); + + t->thread = tt; + TAILQ_INSERT_TAIL(&(tt->tasks), t, list); + + pthread_mutex_unlock(&(tt->lock)); + pthread_cond_signal(&(tt->cond)); +} + +void +kore_task_bind_request(struct kore_task *t, struct http_request *req) +{ + kore_debug("kore_task_bind_request: %p bound to %p", req, t); + + t->req = req; + req->task = t; + req->flags |= HTTP_REQUEST_SLEEPING; + + kore_platform_schedule_read(t->fds[0], t); +} + +void +kore_task_destroy(struct kore_task *t) +{ + kore_debug("kore_task_destroy: %p", t); + + if (t->req != NULL) + t->req->task = NULL; + + close(t->fds[0]); + close(t->fds[1]); /* This might already be closed. */ + + kore_mem_free(t); +} + +int +kore_task_finished(struct kore_task *t) +{ + if (t->state == KORE_TASK_STATE_FINISHED) + return (1); + + return (0); +} + +void +kore_task_finish(struct kore_task *t) +{ + kore_debug("kore_task_finished: %p (%d)", t, t->result); + close(t->fds[1]); +} + +void +kore_task_channel_write(struct kore_task *t, void *data, u_int32_t len) +{ + int fd; + + kore_debug("kore_task_channel_write: %p <- %p (%ld)", t, data, len); + + THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]); + task_channel_write(fd, &len, sizeof(len)); + task_channel_write(fd, data, len); +} + +u_int32_t +kore_task_channel_read(struct kore_task *t, void *out, u_int32_t len) +{ + int fd; + u_int32_t dlen, bytes; + + kore_debug("kore_task_channel_read: %p -> %p (%ld)", t, out, len); + + THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]); + task_channel_read(fd, &dlen, sizeof(dlen)); + + if (dlen > len) + bytes = len; + else + bytes = dlen; + + task_channel_read(fd, out, bytes); + + return (dlen); +} + +void +kore_task_handle(struct kore_task *t, int finished) +{ + kore_debug("kore_task_handle: %p, %d", t, finished); + + if (finished) { + t->state = KORE_TASK_STATE_FINISHED; + if (t->req != NULL) { + t->req->flags &= ~HTTP_REQUEST_SLEEPING; + if (t->req->flags & HTTP_REQUEST_DELETE) + kore_task_destroy(t); + } + } +} + +static void +task_channel_write(int fd, void *data, u_int32_t len) +{ + ssize_t r; + u_int8_t *d; + u_int32_t offset; + + d = data; + offset = 0; + while (offset != len) { + r = write(fd, d + offset, len - offset); + if (r == -1 && errno == EINTR) + continue; + if (r == -1) + fatal("task_channel_write: %s", errno_s); + offset += r; + } +} + +static void +task_channel_read(int fd, void *out, u_int32_t len) +{ + ssize_t r; + u_int8_t *d; + u_int32_t offset; + + d = out; + offset = 0; + while (offset != len) { + r = read(fd, d + offset, len - offset); + if (r == -1 && errno == EINTR) + continue; + if (r == -1) + fatal("task_channel_read: %s", errno_s); + if (r == 0) + fatal("task_channel_read: unexpected eof"); + + offset += r; + } +} + +static void +task_thread_spawn(struct kore_task_thread **out) +{ + struct kore_task_thread *tt; + + tt = kore_malloc(sizeof(*tt)); + tt->idx = threads++; + + TAILQ_INIT(&(tt->tasks)); + pthread_cond_init(&(tt->cond), NULL); + pthread_mutex_init(&(tt->lock), NULL); + + if (pthread_create(&(tt->tid), NULL, task_thread, tt) != 0) + fatal("pthread_create: %s", errno_s); + + *out = tt; +} + +static void * +task_thread(void *arg) +{ + struct kore_task *t; + struct kore_task_thread *tt = arg; + + kore_debug("task_thread: #%d starting", tt->idx); + + pthread_mutex_lock(&(tt->lock)); + + pthread_mutex_lock(&task_thread_lock); + TAILQ_INSERT_TAIL(&task_threads, tt, list); + pthread_mutex_unlock(&task_thread_lock); + + for (;;) { + if (TAILQ_EMPTY(&(tt->tasks))) + pthread_cond_wait(&(tt->cond), &(tt->lock)); + + kore_debug("task_thread#%d: woke up", tt->idx); + + t = TAILQ_FIRST(&(tt->tasks)); + TAILQ_REMOVE(&(tt->tasks), t, list); + pthread_mutex_unlock(&(tt->lock)); + + pthread_mutex_lock(&task_thread_lock); + TAILQ_REMOVE(&task_threads, tt, list); + pthread_mutex_unlock(&task_thread_lock); + + kore_debug("task_thread#%d: executing %p", tt->idx, t); + + t->state = KORE_TASK_STATE_RUNNING; + t->result = t->entry(t); + kore_task_finish(t); + + pthread_mutex_lock(&task_thread_lock); + TAILQ_INSERT_HEAD(&task_threads, tt, list); + pthread_mutex_unlock(&task_thread_lock); + + pthread_mutex_lock(&(tt->lock)); + } + + pthread_exit(NULL); + + /* NOTREACHED */ + return (NULL); +}