kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 59f7e85f459cb4bd2e2d5f0394a546d1ac598b00
parent add6d724e304158f289021ac8398323ed2841536
Author: Joris Vink <joris@coders.se>
Date:   Fri, 24 Mar 2017 12:53:07 +0100

Decouple pgsql from the http layer.

When the pgsql layer was introduced it was tightly coupled with the
http layer in order to make async work fluently.

The time has come to split these up and follow the same method we
used for tasks, allowing either http requests to be tied to a pgsql
data structure or a simple callback function.

This also reworks the internal queueing of pgsql requests until
connections to the db are available again.

The following API functions were changes:
	- kore_pgsql_query_init() -> kore_pgsql_setup()
		no longer takes an http_request parameter.
	- NEW kore_pgsql_init()
		must be called before operating on an kore_pgsql structure.
	- NEW kore_pgsql_bind_request()
		binds an http_request to a kore_pgsql data structure.
	- NEW kore_pgsql_bind_callback()
		binds a callback to a kore_pgsql data structure.

With all of this you can now build kore with PGSQL=1 NOHTTP=1.

The pgsql/ example has been updated to reflect these changes and
new features.

Diffstat:
examples/pgsql/conf/pgsql.conf | 6+++++-
examples/pgsql/src/init.c | 47+++++++++++++++++++++++++++++++++++++++++++++++
examples/pgsql/src/pgsql.c | 88+++++++++++++++++++++++++++++++++++++++++++++----------------------------------
examples/pgsql/src/pgsql_cb.c | 136+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
includes/http.h | 7++++++-
includes/pgsql.h | 18++++++++++++------
includes/python_methods.h | 7++++---
src/http.c | 36+++++++++++++++++++++++++++++++++---
src/pgsql.c | 197++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------
src/python.c | 15++++++++++-----
src/worker.c | 2+-
11 files changed, 433 insertions(+), 126 deletions(-)

diff --git a/examples/pgsql/conf/pgsql.conf b/examples/pgsql/conf/pgsql.conf @@ -1,8 +1,10 @@ # Placeholder configuration -bind 127.0.0.1 8888 load ./pgsql.so init +bind 127.0.0.1 8888 +bind 127.0.0.1 8889 connection_new + tls_dhparam dh2048.pem http_keepalive_time 0 @@ -10,5 +12,7 @@ http_keepalive_time 0 domain 127.0.0.1 { certfile cert/server.pem certkey cert/key.pem + static / page + static /hello hello } diff --git a/examples/pgsql/src/init.c b/examples/pgsql/src/init.c @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2017 Joris Vink <joris@coders.se> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <kore/kore.h> +#include <kore/pgsql.h> + +#if !defined(KORE_NO_HTTP) +#include <kore/http.h> +#endif + +int init(int); + +#if !defined(KORE_NO_HTTP) +int hello(struct http_request *); +#endif + +/* Called when our module is loaded (see config) */ +int +init(int state) +{ + /* Register our database. */ + kore_pgsql_register("db", "host=/tmp dbname=test"); + + return (KORE_RESULT_OK); +} + +#if !defined(KORE_NO_HTTP) +int +hello(struct http_request *req) +{ + http_response(req, HTTP_STATUS_OK, "hello", 5); + return (KORE_RESULT_OK); +} +#endif diff --git a/examples/pgsql/src/pgsql.c b/examples/pgsql/src/pgsql.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Joris Vink <joris@coders.se> + * Copyright (c) 2014-2017 Joris Vink <joris@coders.se> * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -28,11 +28,10 @@ * The state machine framework present in Kore makes it trivial * to get going into dropping from your page handler into the right * state that you are currently in. - * - * The example connects to a local pgsql database (test) using a table - * called "coders" (which has 2 columns): name, surname. */ +#if !defined(KORE_NO_HTTP) + #include <kore/kore.h> #include <kore/http.h> #include <kore/pgsql.h> @@ -44,15 +43,14 @@ #define REQ_STATE_ERROR 4 #define REQ_STATE_DONE 5 -int init(int); -int page(struct http_request *); +int page(struct http_request *); -static int request_perform_init(struct http_request *); -static int request_perform_query(struct http_request *); -static int request_db_wait(struct http_request *); -static int request_db_read(struct http_request *); -static int request_error(struct http_request *); -static int request_done(struct http_request *); +static int request_perform_init(struct http_request *); +static int request_perform_query(struct http_request *); +static int request_db_wait(struct http_request *); +static int request_db_read(struct http_request *); +static int request_error(struct http_request *); +static int request_done(struct http_request *); struct http_state mystates[] = { { "REQ_STATE_INIT", request_perform_init }, @@ -70,22 +68,12 @@ struct rstate { struct kore_pgsql sql; }; -/* Called when our module is loaded (see config) */ -int -init(int state) -{ - /* Register our database. */ - kore_pgsql_register("db", "host=/tmp dbname=test"); - - return (KORE_RESULT_OK); -} - /* Page handler entry point (see config) */ int page(struct http_request *req) { /* Drop into our state machine. */ - kore_log(LOG_NOTICE, "page start"); + kore_log(LOG_NOTICE, "%p: page start", (void *)req); return (http_state_run(mystates, mystates_size, req)); } @@ -96,16 +84,30 @@ request_perform_init(struct http_request *req) struct rstate *state; /* Setup our state context (if not yet set). */ - if (req->hdlr_extra == NULL) { - state = kore_malloc(sizeof(*state)); - req->hdlr_extra = state; + if (!http_state_exists(req)) { + state = http_state_create(req, sizeof(*state)); + + /* + * Initialize the kore_pgsql data structure and bind it + * to this request so we can be put to sleep / woken up + * by the pgsql layer when required. + */ + kore_pgsql_init(&state->sql); + kore_pgsql_bind_request(&state->sql, req); } else { - state = req->hdlr_extra; + state = http_state_get(req); } - /* Initialize our kore_pgsql data structure. */ - if (!kore_pgsql_query_init(&state->sql, req, "db", KORE_PGSQL_ASYNC)) { - /* If the state was still INIT, we'll try again later. */ + /* + * Setup the query to be asynchronous in nature, aka just fire it + * off and return back to us. + */ + if (!kore_pgsql_setup(&state->sql, "db", KORE_PGSQL_ASYNC)) { + /* + * If the state was still in INIT we need to go to sleep and + * wait until the pgsql layer wakes us up again when there + * an available connection to the database. + */ if (state->sql.state == KORE_PGSQL_STATE_INIT) { req->fsm_state = REQ_STATE_INIT; return (HTTP_STATE_RETRY); @@ -114,6 +116,9 @@ request_perform_init(struct http_request *req) kore_pgsql_logerror(&state->sql); req->fsm_state = REQ_STATE_ERROR; } else { + /* + * The initial setup was complete, go for query. + */ req->fsm_state = REQ_STATE_QUERY; } @@ -124,13 +129,14 @@ request_perform_init(struct http_request *req) int request_perform_query(struct http_request *req) { - struct rstate *state = req->hdlr_extra; + struct rstate *state = http_state_get(req); /* We want to move to read result after this. */ req->fsm_state = REQ_STATE_DB_WAIT; /* Fire off the query. */ - if (!kore_pgsql_query(&state->sql, "SELECT * FROM coders")) { + if (!kore_pgsql_query(&state->sql, + "SELECT * FROM coders, pg_sleep(5)")) { /* * Let the state machine continue immediately since we * have an error anyway. @@ -150,7 +156,7 @@ request_perform_query(struct http_request *req) int request_db_wait(struct http_request *req) { - struct rstate *state = req->hdlr_extra; + struct rstate *state = http_state_get(req); kore_log(LOG_NOTICE, "request_db_wait: %d", state->sql.state); @@ -173,7 +179,7 @@ request_db_wait(struct http_request *req) break; default: /* This MUST be present in order to advance the pgsql state */ - kore_pgsql_continue(req, &state->sql); + kore_pgsql_continue(&state->sql); break; } @@ -190,7 +196,7 @@ request_db_read(struct http_request *req) { char *name; int i, rows; - struct rstate *state = req->hdlr_extra; + struct rstate *state = http_state_get(req); /* We have sql data to read! */ rows = kore_pgsql_ntuples(&state->sql); @@ -200,7 +206,7 @@ request_db_read(struct http_request *req) } /* Continue processing our query results. */ - kore_pgsql_continue(req, &state->sql); + kore_pgsql_continue(&state->sql); /* Back to our DB waiting state. */ req->fsm_state = REQ_STATE_DB_WAIT; @@ -211,9 +217,11 @@ request_db_read(struct http_request *req) int request_error(struct http_request *req) { - struct rstate *state = req->hdlr_extra; + struct rstate *state = http_state_get(req); kore_pgsql_cleanup(&state->sql); + http_state_cleanup(req); + http_response(req, 500, NULL, 0); return (HTTP_STATE_COMPLETE); @@ -223,10 +231,14 @@ request_error(struct http_request *req) int request_done(struct http_request *req) { - struct rstate *state = req->hdlr_extra; + struct rstate *state = http_state_get(req); kore_pgsql_cleanup(&state->sql); + http_state_cleanup(req); + http_response(req, 200, NULL, 0); return (HTTP_STATE_COMPLETE); } + +#endif /* !KORE_NO_HTTP */ diff --git a/examples/pgsql/src/pgsql_cb.c b/examples/pgsql/src/pgsql_cb.c @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2017 Joris Vink <joris@coders.se> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +/* + * This is the same as pgsql.c except the query is fired off when + * a new connection is made to Kore on port 8889. + * + * Instead of binding an http_request to the pgsql data structure we + * use a callback function that is called for every state change. + * + * We pass the connection as an argument to this function. + */ + +#include <kore/kore.h> +#include <kore/http.h> +#include <kore/pgsql.h> + +void connection_del(struct connection *c); +void connection_new(struct connection *); + +void db_state_change(struct kore_pgsql *, void *); +void db_init(struct connection *, struct kore_pgsql *); +void db_results(struct kore_pgsql *, struct connection *); + +void +connection_new(struct connection *c) +{ + struct kore_pgsql *pgsql; + + c->disconnect = connection_del; + c->proto = CONN_PROTO_UNKNOWN; + c->state = CONN_STATE_ESTABLISHED; + + pgsql = kore_calloc(1, sizeof(*pgsql)); + + kore_pgsql_init(pgsql); + kore_pgsql_bind_callback(pgsql, db_state_change, c); + + c->hdlr_extra = pgsql; + printf("new connection %p\n", (void *)c); + + db_init(c, pgsql); +} + +void +db_init(struct connection *c, struct kore_pgsql *pgsql) +{ + if (!kore_pgsql_setup(pgsql, "db", KORE_PGSQL_ASYNC)) { + if (pgsql->state == KORE_PGSQL_STATE_INIT) { + printf("\twaiting for available pgsql connection\n"); + return; + } + + kore_pgsql_logerror(pgsql); + kore_connection_disconnect(c); + return; + } + + printf("\tgot pgsql connection\n"); + if (!kore_pgsql_query(pgsql, "SELECT * FROM coders, pg_sleep(5)")) { + kore_pgsql_logerror(pgsql); + kore_connection_disconnect(c); + return; + } + printf("\tquery fired off!\n"); +} + +void +connection_del(struct connection *c) +{ + printf("%p: disconnecting\n", (void *)c); + + if (c->hdlr_extra != NULL) + kore_pgsql_cleanup(c->hdlr_extra); + + kore_free(c->hdlr_extra); + c->hdlr_extra = NULL; +} + +void +db_state_change(struct kore_pgsql *pgsql, void *arg) +{ + struct connection *c = arg; + + printf("%p: state change on pgsql %d\n", arg, pgsql->state); + + switch (pgsql->state) { + case KORE_PGSQL_STATE_INIT: + db_init(c, pgsql); + break; + case KORE_PGSQL_STATE_WAIT: + break; + case KORE_PGSQL_STATE_COMPLETE: + kore_connection_disconnect(c); + break; + case KORE_PGSQL_STATE_ERROR: + kore_pgsql_logerror(pgsql); + kore_connection_disconnect(c); + break; + case KORE_PGSQL_STATE_RESULT: + db_results(pgsql, c); + break; + default: + kore_pgsql_continue(pgsql); + break; + } +} + +void +db_results(struct kore_pgsql *pgsql, struct connection *c) +{ + char *name; + int i, rows; + + rows = kore_pgsql_ntuples(pgsql); + for (i = 0; i < rows; i++) { + name = kore_pgsql_getvalue(pgsql, i, 0); + net_send_queue(c, name, strlen(name)); + } + + net_send_flush(c); + kore_pgsql_continue(pgsql); +} diff --git a/includes/http.h b/includes/http.h @@ -175,7 +175,6 @@ struct http_file { #define HTTP_REQUEST_COMPLETE 0x0001 #define HTTP_REQUEST_DELETE 0x0002 #define HTTP_REQUEST_SLEEPING 0x0004 -#define HTTP_REQUEST_PGSQL_QUEUE 0x0010 #define HTTP_REQUEST_EXPECT_BODY 0x0020 #define HTTP_REQUEST_RETAIN_EXTRA 0x0040 #define HTTP_REQUEST_NO_CONTENT_LENGTH 0x0080 @@ -204,6 +203,7 @@ struct http_request { size_t http_body_offset; size_t content_length; void *hdlr_extra; + size_t state_len; char *query_string; struct kore_module_handle *hdlr; @@ -273,6 +273,11 @@ void http_response_cookie(struct http_request *, const char *, const char *, const char *, time_t, u_int32_t, struct http_cookie **); +void *http_state_get(struct http_request *); +int http_state_exists(struct http_request *); +void http_state_cleanup(struct http_request *); +void *http_state_create(struct http_request *, size_t); + int http_argument_urldecode(char *); int http_header_recv(struct netbuf *); void http_populate_get(struct http_request *); diff --git a/includes/pgsql.h b/includes/pgsql.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Joris Vink <joris@coders.se> + * Copyright (c) 2014-2017 Joris Vink <joris@coders.se> * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -54,18 +54,25 @@ struct kore_pgsql { PGresult *result; struct pgsql_conn *conn; + struct http_request *req; + void *arg; + void (*cb)(struct kore_pgsql *, void *); + LIST_ENTRY(kore_pgsql) rlist; }; extern u_int16_t pgsql_conn_max; -void kore_pgsql_init(void); +void kore_pgsql_sys_init(void); void kore_pgsql_sys_cleanup(void); -int kore_pgsql_query_init(struct kore_pgsql *, struct http_request *, - const char *, int); +void kore_pgsql_init(struct kore_pgsql *); +void kore_pgsql_bind_request(struct kore_pgsql *, struct http_request *); +void kore_pgsql_bind_callback(struct kore_pgsql *, + void (*cb)(struct kore_pgsql *, void *), void *); +int kore_pgsql_setup(struct kore_pgsql *, const char *, int); void kore_pgsql_handle(void *, int); void kore_pgsql_cleanup(struct kore_pgsql *); -void kore_pgsql_continue(struct http_request *, struct kore_pgsql *); +void kore_pgsql_continue(struct kore_pgsql *); int kore_pgsql_query(struct kore_pgsql *, const char *); int kore_pgsql_query_params(struct kore_pgsql *, const char *, int, int, ...); @@ -75,7 +82,6 @@ int kore_pgsql_register(const char *, const char *); int kore_pgsql_ntuples(struct kore_pgsql *); int kore_pgsql_nfields(struct kore_pgsql *); void kore_pgsql_logerror(struct kore_pgsql *); -void kore_pgsql_queue_remove(struct http_request *); char *kore_pgsql_fieldname(struct kore_pgsql *, int); char *kore_pgsql_getvalue(struct kore_pgsql *, int, int); int kore_pgsql_getlength(struct kore_pgsql *, int, int); diff --git a/includes/python_methods.h b/includes/python_methods.h @@ -189,9 +189,10 @@ static PyTypeObject pyhttp_file_type = { #if defined(KORE_USE_PGSQL) -#define PYKORE_PGSQL_INITIALIZE 1 -#define PYKORE_PGSQL_QUERY 2 -#define PYKORE_PGSQL_WAIT 3 +#define PYKORE_PGSQL_PREINIT 1 +#define PYKORE_PGSQL_INITIALIZE 2 +#define PYKORE_PGSQL_QUERY 3 +#define PYKORE_PGSQL_WAIT 4 struct pykore_pgsql { PyObject_HEAD diff --git a/src/http.c b/src/http.c @@ -456,9 +456,6 @@ http_request_free(struct http_request *req) pgsql = LIST_FIRST(&(req->pgsqls)); kore_pgsql_cleanup(pgsql); } - - if (req->flags & HTTP_REQUEST_PGSQL_QUEUE) - kore_pgsql_queue_remove(req); #endif kore_debug("http_request_free: %p->%p", req->owner, req); @@ -1288,6 +1285,39 @@ http_state_run(struct http_state *states, u_int8_t elm, return (KORE_RESULT_OK); } +int +http_state_exists(struct http_request *req) +{ + return (req->hdlr_extra != NULL); +} + +void * +http_state_create(struct http_request *req, size_t len) +{ + if (req->hdlr_extra != NULL) { + if (req->state_len != len) + fatal("http_state_create: state already set"); + } else { + req->state_len = len; + req->hdlr_extra = kore_calloc(1, len); + } + + return (req->hdlr_extra); +} + +void * +http_state_get(struct http_request *req) +{ + return (req->hdlr_extra); +} + +void +http_state_cleanup(struct http_request *req) +{ + kore_free(req->hdlr_extra); + req->hdlr_extra = NULL; +} + static int multipart_find_data(struct kore_buf *in, struct kore_buf *out, size_t *olen, struct http_request *req, const void *needle, size_t len) diff --git a/src/pgsql.c b/src/pgsql.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014-2016 Joris Vink <joris@coders.se> + * Copyright (c) 2014-2017 Joris Vink <joris@coders.se> * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -21,19 +21,21 @@ #include <pg_config.h> #include "kore.h" + +#if !defined(KORE_NO_HTTP) #include "http.h" +#endif + #include "pgsql.h" -struct pgsql_job { - struct http_request *req; +struct pgsql_wait { struct kore_pgsql *pgsql; - - TAILQ_ENTRY(pgsql_job) list; + TAILQ_ENTRY(pgsql_wait) list; }; -struct pgsql_wait { - struct http_request *req; - TAILQ_ENTRY(pgsql_wait) list; +struct pgsql_job { + struct kore_pgsql *pgsql; + TAILQ_ENTRY(pgsql_job) list; }; #define PGSQL_CONN_MAX 2 @@ -43,7 +45,8 @@ struct pgsql_wait { static void pgsql_queue_wakeup(void); static void pgsql_cancel(struct kore_pgsql *); static void pgsql_set_error(struct kore_pgsql *, const char *); -static void pgsql_queue_add(struct http_request *); +static void pgsql_queue_add(struct kore_pgsql *); +static void pgsql_queue_remove(struct kore_pgsql *); static void pgsql_conn_release(struct kore_pgsql *); static void pgsql_conn_cleanup(struct pgsql_conn *); static void pgsql_read_result(struct kore_pgsql *); @@ -52,8 +55,7 @@ static void pgsql_schedule(struct kore_pgsql *); static struct pgsql_conn *pgsql_conn_create(struct kore_pgsql *, struct pgsql_db *); static struct pgsql_conn *pgsql_conn_next(struct kore_pgsql *, - struct pgsql_db *, - struct http_request *); + struct pgsql_db *); static struct kore_pool pgsql_job_pool; static struct kore_pool pgsql_wait_pool; @@ -64,7 +66,7 @@ static u_int16_t pgsql_conn_count; u_int16_t pgsql_conn_max = PGSQL_CONN_MAX; void -kore_pgsql_init(void) +kore_pgsql_sys_init(void) { pgsql_conn_count = 0; TAILQ_INIT(&pgsql_conn_free); @@ -91,23 +93,31 @@ kore_pgsql_sys_cleanup(void) } } -int -kore_pgsql_query_init(struct kore_pgsql *pgsql, struct http_request *req, - const char *dbname, int flags) +void +kore_pgsql_init(struct kore_pgsql *pgsql) { - struct pgsql_db *db; - memset(pgsql, 0, sizeof(*pgsql)); - pgsql->flags = flags; pgsql->state = KORE_PGSQL_STATE_INIT; +} + +int +kore_pgsql_setup(struct kore_pgsql *pgsql, const char *dbname, int flags) +{ + struct pgsql_db *db; - if ((req == NULL && (flags & KORE_PGSQL_ASYNC)) || - ((flags & KORE_PGSQL_ASYNC) && (flags & KORE_PGSQL_SYNC))) { + if ((flags & KORE_PGSQL_ASYNC) && (flags & KORE_PGSQL_SYNC)) { pgsql_set_error(pgsql, "invalid query init parameters"); return (KORE_RESULT_ERROR); } + if (pgsql->req == NULL && pgsql->cb == NULL) { + pgsql_set_error(pgsql, "nothing was bound"); + return (KORE_RESULT_ERROR); + } + db = NULL; + pgsql->flags |= flags; + LIST_FOREACH(db, &pgsql_db_conn_strings, rlist) { if (!strcmp(db->name, dbname)) break; @@ -118,22 +128,45 @@ kore_pgsql_query_init(struct kore_pgsql *pgsql, struct http_request *req, return (KORE_RESULT_ERROR); } - if ((pgsql->conn = pgsql_conn_next(pgsql, db, req)) == NULL) + if ((pgsql->conn = pgsql_conn_next(pgsql, db)) == NULL) return (KORE_RESULT_ERROR); if (pgsql->flags & KORE_PGSQL_ASYNC) { pgsql->conn->job = kore_pool_get(&pgsql_job_pool); - pgsql->conn->job->req = req; pgsql->conn->job->pgsql = pgsql; - - http_request_sleep(req); - pgsql->flags |= PGSQL_LIST_INSERTED; - LIST_INSERT_HEAD(&(req->pgsqls), pgsql, rlist); } return (KORE_RESULT_OK); } +#if !defined(KORE_NO_HTTP) +void +kore_pgsql_bind_request(struct kore_pgsql *pgsql, struct http_request *req) +{ + if (pgsql->req != NULL || pgsql->cb != NULL) + fatal("kore_pgsql_bind_request: already bound"); + + pgsql->req = req; + pgsql->flags |= PGSQL_LIST_INSERTED; + + LIST_INSERT_HEAD(&(req->pgsqls), pgsql, rlist); +} +#endif + +void +kore_pgsql_bind_callback(struct kore_pgsql *pgsql, + void (*cb)(struct kore_pgsql *, void *), void *arg) +{ + if (pgsql->req != NULL) + fatal("kore_pgsql_bind_callback: already bound"); + + if (pgsql->cb != NULL) + fatal("kore_pgsql_bind_callback: already bound"); + + pgsql->cb = cb; + pgsql->arg = arg; +} + int kore_pgsql_query(struct kore_pgsql *pgsql, const char *query) { @@ -264,7 +297,6 @@ kore_pgsql_register(const char *dbname, const char *connstring) void kore_pgsql_handle(void *c, int err) { - struct http_request *req; struct kore_pgsql *pgsql; struct pgsql_conn *conn = (struct pgsql_conn *)c; @@ -273,9 +305,7 @@ kore_pgsql_handle(void *c, int err) return; } - req = conn->job->req; pgsql = conn->job->pgsql; - kore_debug("kore_pgsql_handle: %p (%d)", req, pgsql->state); if (!PQconsumeInput(conn->db)) { pgsql->state = KORE_PGSQL_STATE_ERROR; @@ -285,18 +315,25 @@ kore_pgsql_handle(void *c, int err) } if (pgsql->state == KORE_PGSQL_STATE_WAIT) { - http_request_sleep(req); +#if !defined(KORE_NO_HTTP) + if (pgsql->req != NULL) + http_request_sleep(pgsql->req); +#endif + if (pgsql->cb != NULL) + pgsql->cb(pgsql, pgsql->arg); } else { - http_request_wakeup(req); +#if !defined(KORE_NO_HTTP) + if (pgsql->req != NULL) + http_request_wakeup(pgsql->req); +#endif + if (pgsql->cb != NULL) + pgsql->cb(pgsql, pgsql->arg); } } void -kore_pgsql_continue(struct http_request *req, struct kore_pgsql *pgsql) +kore_pgsql_continue(struct kore_pgsql *pgsql) { - kore_debug("kore_pgsql_continue: %p->%p (%d)", - req->owner, req, pgsql->state); - if (pgsql->error) { kore_free(pgsql->error); pgsql->error = NULL; @@ -312,7 +349,10 @@ kore_pgsql_continue(struct http_request *req, struct kore_pgsql *pgsql) case KORE_PGSQL_STATE_WAIT: break; case KORE_PGSQL_STATE_DONE: - http_request_wakeup(req); +#if !defined(KORE_NO_HTTP) + if (pgsql->req != NULL) + http_request_wakeup(pgsql->req); +#endif pgsql_conn_release(pgsql); break; case KORE_PGSQL_STATE_ERROR: @@ -329,6 +369,8 @@ kore_pgsql_cleanup(struct kore_pgsql *pgsql) { kore_debug("kore_pgsql_cleanup(%p)", pgsql); + pgsql_queue_remove(pgsql); + if (pgsql->result != NULL) PQclear(pgsql->result); @@ -385,25 +427,8 @@ kore_pgsql_getvalue(struct kore_pgsql *pgsql, int row, int col) return (PQgetvalue(pgsql->result, row, col)); } -void -kore_pgsql_queue_remove(struct http_request *req) -{ - struct pgsql_wait *pgw, *next; - - for (pgw = TAILQ_FIRST(&pgsql_wait_queue); pgw != NULL; pgw = next) { - next = TAILQ_NEXT(pgw, list); - if (pgw->req != req) - continue; - - TAILQ_REMOVE(&pgsql_wait_queue, pgw, list); - kore_pool_put(&pgsql_wait_pool, pgw); - return; - } -} - static struct pgsql_conn * -pgsql_conn_next(struct kore_pgsql *pgsql, struct pgsql_db *db, - struct http_request *req) +pgsql_conn_next(struct kore_pgsql *pgsql, struct pgsql_db *db) { struct pgsql_conn *conn; @@ -419,7 +444,7 @@ pgsql_conn_next(struct kore_pgsql *pgsql, struct pgsql_db *db, if (conn == NULL) { if (pgsql_conn_count >= pgsql_conn_max) { if (pgsql->flags & KORE_PGSQL_ASYNC) { - pgsql_queue_add(req); + pgsql_queue_add(pgsql); } else { pgsql_set_error(pgsql, "no available connection"); @@ -460,34 +485,67 @@ pgsql_schedule(struct kore_pgsql *pgsql) kore_platform_schedule_read(fd, pgsql->conn); pgsql->state = KORE_PGSQL_STATE_WAIT; pgsql->flags |= KORE_PGSQL_SCHEDULED; + +#if !defined(KORE_NO_HTTP) + if (pgsql->req != NULL) + http_request_sleep(pgsql->req); +#endif + if (pgsql->cb != NULL) + pgsql->cb(pgsql, pgsql->arg); } static void -pgsql_queue_add(struct http_request *req) +pgsql_queue_add(struct kore_pgsql *pgsql) { struct pgsql_wait *pgw; - http_request_sleep(req); +#if !defined(KORE_NO_HTTP) + if (pgsql->req != NULL) + http_request_sleep(pgsql->req); +#endif pgw = kore_pool_get(&pgsql_wait_pool); - pgw->req = req; - pgw->req->flags |= HTTP_REQUEST_PGSQL_QUEUE; - + pgw->pgsql = pgsql; TAILQ_INSERT_TAIL(&pgsql_wait_queue, pgw, list); } static void -pgsql_queue_wakeup(void) +pgsql_queue_remove(struct kore_pgsql *pgsql) { struct pgsql_wait *pgw, *next; for (pgw = TAILQ_FIRST(&pgsql_wait_queue); pgw != NULL; pgw = next) { next = TAILQ_NEXT(pgw, list); - if (pgw->req->flags & HTTP_REQUEST_DELETE) + if (pgw->pgsql != pgsql) continue; - http_request_wakeup(pgw->req); - pgw->req->flags &= ~HTTP_REQUEST_PGSQL_QUEUE; + TAILQ_REMOVE(&pgsql_wait_queue, pgw, list); + kore_pool_put(&pgsql_wait_pool, pgw); + return; + } +} + +static void +pgsql_queue_wakeup(void) +{ + struct pgsql_wait *pgw, *next; + + for (pgw = TAILQ_FIRST(&pgsql_wait_queue); pgw != NULL; pgw = next) { + next = TAILQ_NEXT(pgw, list); + +#if !defined(KORE_NO_HTTP) + if (pgw->pgsql->req != NULL) { + if (pgw->pgsql->req->flags & HTTP_REQUEST_DELETE) { + TAILQ_REMOVE(&pgsql_wait_queue, pgw, list); + kore_pool_put(&pgsql_wait_pool, pgw); + continue; + } + + http_request_wakeup(pgw->pgsql->req); + } +#endif + if (pgw->pgsql->cb != NULL) + pgw->pgsql->cb(pgw->pgsql, pgw->pgsql->arg); TAILQ_REMOVE(&pgsql_wait_queue, pgw, list); kore_pool_put(&pgsql_wait_pool, pgw); @@ -555,13 +613,15 @@ pgsql_conn_release(struct kore_pgsql *pgsql) pgsql->conn = NULL; pgsql->state = KORE_PGSQL_STATE_COMPLETE; + if (pgsql->cb != NULL) + pgsql->cb(pgsql, pgsql->arg); + pgsql_queue_wakeup(); } static void pgsql_conn_cleanup(struct pgsql_conn *conn) { - struct http_request *req; struct kore_pgsql *pgsql; kore_debug("pgsql_conn_cleanup(): %p", conn); @@ -570,10 +630,11 @@ pgsql_conn_cleanup(struct pgsql_conn *conn) TAILQ_REMOVE(&pgsql_conn_free, conn, list); if (conn->job) { - req = conn->job->req; pgsql = conn->job->pgsql; - http_request_wakeup(req); - +#if !defined(KORE_NO_HTTP) + if (pgsql->req != NULL) + http_request_wakeup(pgsql->req); +#endif pgsql->conn = NULL; pgsql_set_error(pgsql, PQerrorMessage(conn->db)); diff --git a/src/python.c b/src/python.c @@ -1250,7 +1250,7 @@ pykore_pgsql_alloc(struct http_request *req, const char *db, const char *query) pysql->result = NULL; pysql->db = kore_strdup(db); pysql->query = kore_strdup(query); - pysql->state = PYKORE_PGSQL_INITIALIZE; + pysql->state = PYKORE_PGSQL_PREINIT; memset(&pysql->sql, 0, sizeof(pysql->sql)); @@ -1261,9 +1261,14 @@ static PyObject * pykore_pgsql_iternext(struct pykore_pgsql *pysql) { switch (pysql->state) { + case PYKORE_PGSQL_PREINIT: + kore_pgsql_init(&pysql->sql); + kore_pgsql_bind_request(&pysql->sql, pysql->req); + pysql->state = PYKORE_PGSQL_INITIALIZE; + /* fallthrough */ case PYKORE_PGSQL_INITIALIZE: - if (!kore_pgsql_query_init(&pysql->sql, pysql->req, - pysql->db, KORE_PGSQL_ASYNC)) { + if (!kore_pgsql_setup(&pysql->sql, pysql->db, + KORE_PGSQL_ASYNC)) { if (pysql->sql.state == KORE_PGSQL_STATE_INIT) break; kore_pgsql_logerror(&pysql->sql); @@ -1304,7 +1309,7 @@ wait_again: return (NULL); goto wait_again; default: - kore_pgsql_continue(pysql->req, &pysql->sql); + kore_pgsql_continue(&pysql->sql); goto wait_again; } break; @@ -1385,7 +1390,7 @@ pykore_pgsql_result(struct pykore_pgsql *pysql) } pysql->result = list; - kore_pgsql_continue(pysql->req, &pysql->sql); + kore_pgsql_continue(&pysql->sql); return (KORE_RESULT_OK); } diff --git a/src/worker.c b/src/worker.c @@ -338,7 +338,7 @@ kore_worker_entry(struct kore_worker *kw) kore_msg_worker_init(); #if defined(KORE_USE_PGSQL) - kore_pgsql_init(); + kore_pgsql_sys_init(); #endif #if defined(KORE_USE_TASKS)