kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit f93747828c499fa53c5db126990d1b936f6d84cf
parent 856d7b0cb2c1aa326346505e7db6615630e7230d
Author: Joris Vink <joris@coders.se>
Date:   Thu, 14 Aug 2014 14:34:23 +0200

Major pgsql rework.

Gone is the ugly KORE_PGSQL macro that hid an overly complex
state machine for the pgsql api.
Gone is the pgsql array that was attached to http_requests.
Gone are the callback hacks inside the pgsql api.

Instead, I strongly encourage people to use the new state machine
api Kore offers to properly deal with asynchronous queries.

The pgsql example in examples/pgsql has been updated to reflect
these changes.

Diffstat:
examples/pgsql/README.md | 9+++++++--
examples/pgsql/conf/pgsql.conf | 12+++++-------
examples/pgsql/src/pgsql.c | 206++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------
includes/http.h | 3---
includes/pgsql.h | 48+++++++++++++++---------------------------------
src/http.c | 8--------
src/pgsql.c | 177++++++++++++++++++++++++++++++++++---------------------------------------------
7 files changed, 271 insertions(+), 192 deletions(-)

diff --git a/examples/pgsql/README.md b/examples/pgsql/README.md @@ -1,7 +1,12 @@ Kore pgsql example. -* Performs 2 async pgsql queries. -* You must set the connection string before running +This example demonstrates how one can use Kore state machines and the +pgsql api to make fully asynchronous SQL queries. + +Asynchronous in this case meaning, without interrupting a Kore worker its +other clients their I/O or http requests. + +Tons of comments inside on how everything works. Run: ``` diff --git a/examples/pgsql/conf/pgsql.conf b/examples/pgsql/conf/pgsql.conf @@ -1,14 +1,12 @@ -# Kore pgsql_test configuration +# Placeholder configuration bind 127.0.0.1 8888 -load ./pgsql.so pgsql_load +load ./pgsql.so init -pgsql_conn_max 5 +http_keepalive_time 0 -domain localhost { +domain 127.0.0.1 { certfile cert/server.crt certkey cert/server.key - - accesslog kore_access.log - static / serve_pgsql_test + static / page } diff --git a/examples/pgsql/src/pgsql.c b/examples/pgsql/src/pgsql.c @@ -14,60 +14,192 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ +/* + * This example demonstrates on how to use state machines and + * asynchronous pgsql queries. + * + * While this example might seem overly complex for a simple pgsql + * query, there is a reason behind its complexity: + * Asynchronous pgsql queries mean that Kore will not block while + * executing the queries, giving a worker time to continue handling + * other events such as I/O or other http requests. + * + * The state machine framework present in Kore makes it trivial + * to get going into dropping from your page handler into the right + * state that you are currently in. + * + * The example connects to a local pgsql database (test) using a table + * called "coders" (which has 2 columns): name, surname. + */ + #include <kore/kore.h> #include <kore/http.h> #include <kore/pgsql.h> -void pgsql_load(int); -int serve_pgsql_test(struct http_request *); +#define REQ_STATE_QUERY 0 +#define REQ_STATE_DB_WAIT 1 +#define REQ_STATE_DB_READ 2 +#define REQ_STATE_ERROR 3 +#define REQ_STATE_DONE 4 + +void init(int); +int page(struct http_request *); +static int request_perform_query(struct http_request *); +static int request_db_wait(struct http_request *); +static int request_db_read(struct http_request *); +static int request_error(struct http_request *); +static int request_done(struct http_request *); + +struct http_state mystates[] = { + { "REQ_STATE_QUERY", request_perform_query }, + { "REQ_STATE_DB_WAIT", request_db_wait }, + { "REQ_STATE_DB_READ", request_db_read }, + { "REQ_STATE_ERROR", request_error }, + { "REQ_STATE_DONE", request_done }, +}; + +struct rstate { + struct kore_pgsql sql; +}; + +/* Called when our module is loaded (see config) */ void -pgsql_load(int state) +init(int state) +{ + /* Set our connection string. */ + pgsql_conn_string = "host=/var/run/postgresql/ dbname=test"; +} + +/* Page handler entry point (see config) */ +int +page(struct http_request *req) +{ + /* Drop into our state machine. */ + kore_log(LOG_NOTICE, "page start"); + return (http_state_run(mystates, sizeof(mystates), req)); +} + +/* The initial state, we setup our context and fire off the pgsql query. */ +int +request_perform_query(struct http_request *req) +{ + struct rstate *state; + + /* Setup our state context. */ + state = kore_malloc(sizeof(*state)); + + /* Attach the state to our request. */ + req->hdlr_extra = state; + + /* We want to move to read result after this. */ + req->fsm_state = REQ_STATE_DB_WAIT; + + /* Fire off the query. */ + if (!kore_pgsql_async(&state->sql, req, "SELECT * FROM coders")) { + /* + * If the sql was NULL we need to retry as there was no + * available connection. + */ + if (state->sql.conn == NULL) { + req->fsm_state = REQ_STATE_QUERY; + return (HTTP_STATE_RETRY); + } + + /* + * Let the state machine continue immediately since we + * have an error anyway. + */ + return (HTTP_STATE_CONTINUE); + } + + /* Resume state machine later when the query results start coming in. */ + return (HTTP_STATE_RETRY); +} + +/* + * After firing off the query, we returned HTTP_STATE_RETRY (see above). + * When request_db_wait() finally is called by Kore we will have results + * from pgsql so we'll process them. + */ +int +request_db_wait(struct http_request *req) { - switch (state) { - case KORE_MODULE_LOAD: - pgsql_conn_string = "Your connection string"; + struct rstate *state = req->hdlr_extra; + + kore_log(LOG_NOTICE, "request_db_wait: %d", state->sql.state); + + /* + * When we get here, our asynchronous pgsql query has + * given us something, check the state to figure out what. + */ + switch (state->sql.state) { + case KORE_PGSQL_STATE_COMPLETE: + req->fsm_state = REQ_STATE_DONE; + break; + case KORE_PGSQL_STATE_ERROR: + req->fsm_state = REQ_STATE_ERROR; + kore_pgsql_logerror(&state->sql); + break; + case KORE_PGSQL_STATE_RESULT: + req->fsm_state = REQ_STATE_DB_READ; break; default: + /* This MUST be present in order to advance the pgsql state */ + kore_pgsql_continue(req, &state->sql); break; } + + return (HTTP_STATE_CONTINUE); } +/* + * Called when there's an actual result to be gotten. After we handle the + * entire result, we'll drop back into REQ_STATE_DB_WAIT (above) in order + * to continue until the pgsql API returns KORE_PGSQL_STATE_COMPLETE. + */ int -serve_pgsql_test(struct http_request *req) +request_db_read(struct http_request *req) { - int r, i; - char *col1, *col2; - - KORE_PGSQL(req, "SELECT * FROM test", 0, { - if (req->pgsql[0]->state == KORE_PGSQL_STATE_ERROR) { - kore_pgsql_logerror(req->pgsql[0]); - http_response(req, 500, "fail\n", 5); - return (KORE_RESULT_OK); - } + char *name; + int i, rows; + struct rstate *state = req->hdlr_extra; - r = kore_pgsql_ntuples(req->pgsql[0]); - for (i = 0; i < r; i++) { - col1 = kore_pgsql_getvalue(req->pgsql[0], i, 0); - col2 = kore_pgsql_getvalue(req->pgsql[0], i, 1); + /* We have sql data to read! */ + rows = kore_pgsql_ntuples(&state->sql); + for (i = 0; i < rows; i++) { + name = kore_pgsql_getvalue(&state->sql, i, 0); + kore_log(LOG_NOTICE, "name: '%s'", name); + } - kore_log(LOG_NOTICE, "%s and %s", col1, col2); - } - }); - - KORE_PGSQL(req, "SELECT * FROM foobar", 1, { - if (req->pgsql[1]->state != KORE_PGSQL_STATE_ERROR) { - kore_log(LOG_NOTICE, "expected error, got %d", - req->pgsql[1]->state); - http_response(req, 500, "fail2\n", 6); - return (KORE_RESULT_OK); - } else { - kore_pgsql_logerror(req->pgsql[1]); - } - }); + /* Continue processing our query results. */ + kore_pgsql_continue(req, &state->sql); + + /* Back to our DB waiting state. */ + req->fsm_state = REQ_STATE_DB_WAIT; + return (HTTP_STATE_CONTINUE); +} + +/* An error occured. */ +int +request_error(struct http_request *req) +{ + struct rstate *state = req->hdlr_extra; + + kore_pgsql_cleanup(&state->sql); + http_response(req, 500, NULL, 0); + + return (HTTP_STATE_COMPLETE); +} + +/* Request was completed succesfully. */ +int +request_done(struct http_request *req) +{ + struct rstate *state = req->hdlr_extra; - /* Query successfully completed */ - http_response(req, 200, "ok\n", 3); + kore_pgsql_cleanup(&state->sql); + http_response(req, 200, NULL, 0); - return (KORE_RESULT_OK); + return (HTTP_STATE_COMPLETE); } diff --git a/includes/http.h b/includes/http.h @@ -157,8 +157,6 @@ struct http_file { #define HTTP_REQUEST_DELETE 0x02 #define HTTP_REQUEST_SLEEPING 0x04 -#define HTTP_PGSQL_MAX 20 -struct kore_pgsql; struct kore_task; struct http_request { @@ -181,7 +179,6 @@ struct http_request { struct kore_module_handle *hdlr; struct kore_task *task; - struct kore_pgsql *pgsql[HTTP_PGSQL_MAX]; TAILQ_HEAD(, http_header) req_headers; TAILQ_HEAD(, http_header) resp_headers; diff --git a/includes/pgsql.h b/includes/pgsql.h @@ -19,11 +19,20 @@ #include <libpq-fe.h> +struct pgsql_conn { + u_int8_t type; + u_int8_t flags; + + PGconn *db; + struct pgsql_job *job; + TAILQ_ENTRY(pgsql_conn) list; +}; + struct kore_pgsql { u_int8_t state; char *error; PGresult *result; - void *conn; + struct pgsql_conn *conn; }; extern u_int16_t pgsql_conn_max; @@ -31,9 +40,11 @@ extern char *pgsql_conn_string; void kore_pgsql_init(void); void kore_pgsql_handle(void *, int); -void kore_pgsql_cleanup(struct http_request *); -void kore_pgsql_continue(struct http_request *, int); -int kore_pgsql_query(struct http_request *, char *, int); +void kore_pgsql_cleanup(struct kore_pgsql *); +void kore_pgsql_continue(struct http_request *, + struct kore_pgsql *); +int kore_pgsql_async(struct kore_pgsql *, + struct http_request *, const char *); int kore_pgsql_ntuples(struct kore_pgsql *); void kore_pgsql_logerror(struct kore_pgsql *); @@ -46,33 +57,4 @@ char *kore_pgsql_getvalue(struct kore_pgsql *, int, int); #define KORE_PGSQL_STATE_DONE 5 #define KORE_PGSQL_STATE_COMPLETE 6 -#define KORE_PGSQL(r, q, i, s) \ - do { \ - if (r->pgsql[i] == NULL) \ - if (!kore_pgsql_query(r, q, i)) { \ - if (r->pgsql[i] == NULL) \ - return (KORE_RESULT_RETRY); \ - s; \ - r->pgsql[i]->state = \ - KORE_PGSQL_STATE_COMPLETE; \ - } \ - if (r->pgsql[i] == NULL) \ - return (KORE_RESULT_RETRY); \ - switch (r->pgsql[i]->state) { \ - case KORE_PGSQL_STATE_ERROR: \ - case KORE_PGSQL_STATE_RESULT: \ - s; \ - case KORE_PGSQL_STATE_COMPLETE: \ - break; \ - default: \ - kore_pgsql_continue(r, i); \ - return (KORE_RESULT_RETRY); \ - } \ - if (r->pgsql[i]->state == KORE_PGSQL_STATE_ERROR || \ - r->pgsql[i]->state == KORE_PGSQL_STATE_RESULT) { \ - kore_pgsql_continue(r, i); \ - return (KORE_RESULT_RETRY); \ - } \ - } while (0); - #endif diff --git a/src/http.c b/src/http.c @@ -23,10 +23,6 @@ #include "kore.h" #include "http.h" -#if defined(KORE_USE_PGSQL) -#include "pgsql.h" -#endif - #if defined(KORE_USE_TASKS) #include "tasks.h" #endif @@ -358,10 +354,6 @@ http_request_free(struct http_request *req) kore_mem_free(f); } -#if defined(KORE_USE_PGSQL) - kore_pgsql_cleanup(req); -#endif - if (req->method == HTTP_METHOD_POST && req->post_data != NULL) kore_buf_free(req->post_data); if (req->method == HTTP_METHOD_POST && req->multipart_body != NULL) diff --git a/src/pgsql.c b/src/pgsql.c @@ -25,30 +25,24 @@ #include "pgsql.h" struct pgsql_job { - u_int8_t idx; - struct http_request *req; u_int64_t start; char *query; + struct http_request *req; + struct kore_pgsql *pgsql; + TAILQ_ENTRY(pgsql_job) list; }; +#define PGSQL_IS_BLOCKING 0 +#define PGSQL_IS_ASYNC 1 + #define PGSQL_CONN_MAX 2 #define PGSQL_CONN_FREE 0x01 -struct pgsql_conn { - u_int8_t type; - u_int8_t flags; - - PGconn *db; - struct pgsql_job *job; - TAILQ_ENTRY(pgsql_conn) list; -}; - static void pgsql_conn_cleanup(struct pgsql_conn *); -static int pgsql_conn_create(struct http_request *, int); -static void pgsql_read_result(struct http_request *, int, - struct pgsql_conn *); +static int pgsql_conn_create(struct kore_pgsql *); +static void pgsql_read_result(struct kore_pgsql *, int); static TAILQ_HEAD(, pgsql_conn) pgsql_conn_free; static u_int16_t pgsql_conn_count; @@ -63,29 +57,20 @@ kore_pgsql_init(void) } int -kore_pgsql_query(struct http_request *req, char *query, int idx) +kore_pgsql_async(struct kore_pgsql *pgsql, struct http_request *req, + const char *query) { int fd; struct pgsql_conn *conn; - if (idx >= HTTP_PGSQL_MAX) - fatal("kore_pgsql_query: %d > %d", idx, HTTP_PGSQL_MAX); - if (req->pgsql[idx] != NULL) - fatal("kore_pgsql_query: %d already exists", idx); - - if (TAILQ_EMPTY(&pgsql_conn_free)) { - if (pgsql_conn_count >= pgsql_conn_max) - return (KORE_RESULT_ERROR); - } - - req->pgsql[idx] = kore_malloc(sizeof(struct kore_pgsql)); - req->pgsql[idx]->state = KORE_PGSQL_STATE_INIT; - req->pgsql[idx]->result = NULL; - req->pgsql[idx]->error = NULL; - req->pgsql[idx]->conn = NULL; + pgsql->state = KORE_PGSQL_STATE_INIT; + pgsql->result = NULL; + pgsql->error = NULL; + pgsql->conn = NULL; if (TAILQ_EMPTY(&pgsql_conn_free)) { - if (pgsql_conn_create(req, idx) == KORE_RESULT_ERROR) + if ((pgsql_conn_count >= pgsql_conn_max) || + !pgsql_conn_create(pgsql)) return (KORE_RESULT_ERROR); } @@ -97,12 +82,12 @@ kore_pgsql_query(struct http_request *req, char *query, int idx) conn->flags &= ~PGSQL_CONN_FREE; TAILQ_REMOVE(&pgsql_conn_free, conn, list); - req->pgsql[idx]->conn = conn; + pgsql->conn = conn; conn->job = kore_malloc(sizeof(struct pgsql_job)); conn->job->query = kore_strdup(query); conn->job->start = kore_time_ms(); + conn->job->pgsql = pgsql; conn->job->req = req; - conn->job->idx = idx; if (!PQsendQuery(conn->db, query)) { pgsql_conn_cleanup(conn); @@ -114,7 +99,7 @@ kore_pgsql_query(struct http_request *req, char *query, int idx) fatal("PQsocket returned < 0 fd on open connection"); kore_platform_schedule_read(fd, conn); - req->pgsql[idx]->state = KORE_PGSQL_STATE_WAIT; + pgsql->state = KORE_PGSQL_STATE_WAIT; kore_debug("query '%s' for %p sent on %p", query, req, conn); return (KORE_RESULT_OK); @@ -123,8 +108,8 @@ kore_pgsql_query(struct http_request *req, char *query, int idx) void kore_pgsql_handle(void *c, int err) { - int i; struct http_request *req; + struct kore_pgsql *pgsql; struct pgsql_conn *conn = (struct pgsql_conn *)c; if (err) { @@ -132,52 +117,50 @@ kore_pgsql_handle(void *c, int err) return; } - i = conn->job->idx; req = conn->job->req; - kore_debug("kore_pgsql_handle: %p (%d) (%d)", - req, i, req->pgsql[i]->state); + pgsql = conn->job->pgsql; + kore_debug("kore_pgsql_handle: %p (%d)", req, pgsql->state); if (!PQconsumeInput(conn->db)) { - req->pgsql[i]->state = KORE_PGSQL_STATE_ERROR; - req->pgsql[i]->error = kore_strdup(PQerrorMessage(conn->db)); + pgsql->state = KORE_PGSQL_STATE_ERROR; + pgsql->error = kore_strdup(PQerrorMessage(conn->db)); } else { - pgsql_read_result(req, i, conn); + pgsql_read_result(pgsql, PGSQL_IS_ASYNC); } - if (req->pgsql[i]->state == KORE_PGSQL_STATE_WAIT) { + if (pgsql->state == KORE_PGSQL_STATE_WAIT) { http_request_sleep(req); } else { http_request_wakeup(req); - http_process_request(req, 1); } } void -kore_pgsql_continue(struct http_request *req, int i) +kore_pgsql_continue(struct http_request *req, struct kore_pgsql *pgsql) { int fd; struct pgsql_conn *conn; - kore_debug("kore_pgsql_continue: %p->%p (%d) (%d)", - req->owner, req, i, req->pgsql[i]->state); + kore_debug("kore_pgsql_continue: %p->%p (%d)", + req->owner, req, pgsql->state); - if (req->pgsql[i]->error) { - kore_mem_free(req->pgsql[i]->error); - req->pgsql[i]->error = NULL; + if (pgsql->error) { + kore_mem_free(pgsql->error); + pgsql->error = NULL; } - if (req->pgsql[i]->result) - PQclear(req->pgsql[i]->result); + if (pgsql->result) + PQclear(pgsql->result); - conn = req->pgsql[i]->conn; - switch (req->pgsql[i]->state) { + conn = pgsql->conn; + switch (pgsql->state) { case KORE_PGSQL_STATE_INIT: case KORE_PGSQL_STATE_WAIT: break; case KORE_PGSQL_STATE_DONE: http_request_wakeup(req); - req->pgsql[i]->conn = NULL; - req->pgsql[i]->state = KORE_PGSQL_STATE_COMPLETE; + pgsql->conn = NULL; + pgsql->state = KORE_PGSQL_STATE_COMPLETE; kore_mem_free(conn->job->query); kore_mem_free(conn->job); @@ -188,46 +171,35 @@ kore_pgsql_continue(struct http_request *req, int i) fd = PQsocket(conn->db); kore_platform_disable_read(fd); - - http_process_request(req, 0); break; case KORE_PGSQL_STATE_ERROR: case KORE_PGSQL_STATE_RESULT: kore_pgsql_handle(conn, 0); break; default: - fatal("unknown pgsql state"); + fatal("unknown pgsql state %d", pgsql->state); } } void -kore_pgsql_cleanup(struct http_request *req) +kore_pgsql_cleanup(struct kore_pgsql *pgsql) { - int i; - struct pgsql_conn *conn; - - for (i = 0; i < HTTP_PGSQL_MAX; i++) { - if (req->pgsql[i] == NULL) - continue; - - if (req->pgsql[i]->result != NULL) { - kore_log(LOG_NOTICE, "cleaning up leaked pgsql result"); - PQclear(req->pgsql[i]->result); - } - - if (req->pgsql[i]->error != NULL) - kore_mem_free(req->pgsql[i]->error); + if (pgsql->result != NULL) { + kore_log(LOG_NOTICE, "cleaning up leaked pgsql result"); + PQclear(pgsql->result); + } - if (req->pgsql[i]->conn != NULL) { - conn = req->pgsql[i]->conn; - while (PQgetResult(conn->db) != NULL) - ; - } + if (pgsql->error != NULL) + kore_mem_free(pgsql->error); - req->pgsql[i]->conn = NULL; - kore_mem_free(req->pgsql[i]); - req->pgsql[i] = NULL; + if (pgsql->conn != NULL) { + while (PQgetResult(pgsql->conn->db) != NULL) + ; } + + pgsql->result = NULL; + pgsql->error = NULL; + pgsql->conn = NULL; } void @@ -250,7 +222,7 @@ kore_pgsql_getvalue(struct kore_pgsql *pgsql, int row, int col) } static int -pgsql_conn_create(struct http_request *req, int idx) +pgsql_conn_create(struct kore_pgsql *pgsql) { struct pgsql_conn *conn; @@ -264,8 +236,8 @@ pgsql_conn_create(struct http_request *req, int idx) conn->db = PQconnectdb(pgsql_conn_string); if (conn->db == NULL || (PQstatus(conn->db) != CONNECTION_OK)) { - req->pgsql[idx]->state = KORE_PGSQL_STATE_ERROR; - req->pgsql[idx]->error = kore_strdup(PQerrorMessage(conn->db)); + pgsql->state = KORE_PGSQL_STATE_ERROR; + pgsql->error = kore_strdup(PQerrorMessage(conn->db)); pgsql_conn_cleanup(conn); return (KORE_RESULT_ERROR); } @@ -281,8 +253,8 @@ pgsql_conn_create(struct http_request *req, int idx) static void pgsql_conn_cleanup(struct pgsql_conn *conn) { - int i; struct http_request *req; + struct kore_pgsql *pgsql; kore_debug("pgsql_conn_cleanup(): %p", conn); @@ -290,13 +262,13 @@ pgsql_conn_cleanup(struct pgsql_conn *conn) TAILQ_REMOVE(&pgsql_conn_free, conn, list); if (conn->job) { - i = conn->job->idx; req = conn->job->req; + pgsql = conn->job->pgsql; http_request_wakeup(req); - req->pgsql[i]->conn = NULL; - req->pgsql[i]->state = KORE_PGSQL_STATE_ERROR; - req->pgsql[i]->error = kore_strdup(PQerrorMessage(conn->db)); + pgsql->conn = NULL; + pgsql->state = KORE_PGSQL_STATE_ERROR; + pgsql->error = kore_strdup(PQerrorMessage(conn->db)); kore_mem_free(conn->job->query); kore_mem_free(conn->job); @@ -311,40 +283,41 @@ pgsql_conn_cleanup(struct pgsql_conn *conn) } static void -pgsql_read_result(struct http_request *req, int i, struct pgsql_conn *conn) +pgsql_read_result(struct kore_pgsql *pgsql, int async) { - if (PQisBusy(conn->db)) { - req->pgsql[i]->state = KORE_PGSQL_STATE_WAIT; - return; + if (async) { + if (PQisBusy(pgsql->conn->db)) { + pgsql->state = KORE_PGSQL_STATE_WAIT; + return; + } } - req->pgsql[i]->result = PQgetResult(conn->db); - if (req->pgsql[i]->result == NULL) { - req->pgsql[i]->state = KORE_PGSQL_STATE_DONE; + pgsql->result = PQgetResult(pgsql->conn->db); + if (pgsql->result == NULL) { + pgsql->state = KORE_PGSQL_STATE_DONE; return; } - switch (PQresultStatus(req->pgsql[i]->result)) { + switch (PQresultStatus(pgsql->result)) { case PGRES_COPY_OUT: case PGRES_COPY_IN: case PGRES_NONFATAL_ERROR: case PGRES_COPY_BOTH: break; case PGRES_COMMAND_OK: - req->pgsql[i]->state = KORE_PGSQL_STATE_DONE; + pgsql->state = KORE_PGSQL_STATE_DONE; break; case PGRES_TUPLES_OK: #if PG_VERSION_NUM >= 90200 case PGRES_SINGLE_TUPLE: #endif - req->pgsql[i]->state = KORE_PGSQL_STATE_RESULT; + pgsql->state = KORE_PGSQL_STATE_RESULT; break; case PGRES_EMPTY_QUERY: case PGRES_BAD_RESPONSE: case PGRES_FATAL_ERROR: - req->pgsql[i]->state = KORE_PGSQL_STATE_ERROR; - req->pgsql[i]->error = - kore_strdup(PQresultErrorMessage(req->pgsql[i]->result)); + pgsql->state = KORE_PGSQL_STATE_ERROR; + pgsql->error = kore_strdup(PQresultErrorMessage(pgsql->result)); break; } }