kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 7f1a9b8092454eba6ab91467a8f88bbdf307dd65
parent 64156707536cb11e2fca8abece1e0173af3214ee
Author: Joris Vink <joris@coders.se>
Date:   Tue, 11 Jul 2017 15:11:13 +0200

Several postgresql improvements.

- Make pgsql_conn_count count per database rather then globally.
  This means you now define the number of clients *per* database registered
  rather then the number of clients in total of all databases.

- In case a connection is in failed transaction state Kore will now
  automatically rollback the transaction before placing that connection
  back in the connection pool.

Diffstat:
includes/pgsql.h | 2++
src/pgsql.c | 74+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------
2 files changed, 67 insertions(+), 9 deletions(-)

diff --git a/includes/pgsql.h b/includes/pgsql.h @@ -43,6 +43,8 @@ struct pgsql_conn { struct pgsql_db { char *name; char *conn_string; + u_int16_t pgsql_conn_max; + u_int16_t pgsql_conn_count; LIST_ENTRY(pgsql_db) rlist; }; diff --git a/src/pgsql.c b/src/pgsql.c @@ -51,6 +51,7 @@ static void pgsql_conn_release(struct kore_pgsql *); static void pgsql_conn_cleanup(struct pgsql_conn *); static void pgsql_read_result(struct kore_pgsql *); static void pgsql_schedule(struct kore_pgsql *); +static void pgsql_rollback_state(struct kore_pgsql *, void *); static struct pgsql_conn *pgsql_conn_create(struct kore_pgsql *, struct pgsql_db *); @@ -62,13 +63,12 @@ static struct kore_pool pgsql_wait_pool; static TAILQ_HEAD(, pgsql_conn) pgsql_conn_free; static TAILQ_HEAD(, pgsql_wait) pgsql_wait_queue; static LIST_HEAD(, pgsql_db) pgsql_db_conn_strings; -static u_int16_t pgsql_conn_count; + u_int16_t pgsql_conn_max = PGSQL_CONN_MAX; void kore_pgsql_sys_init(void) { - pgsql_conn_count = 0; TAILQ_INIT(&pgsql_conn_free); TAILQ_INIT(&pgsql_wait_queue); LIST_INIT(&pgsql_db_conn_strings); @@ -179,7 +179,6 @@ kore_pgsql_query(struct kore_pgsql *pgsql, const char *query) if (pgsql->flags & KORE_PGSQL_SYNC) { pgsql->result = PQexec(pgsql->conn->db, query); - if ((PQresultStatus(pgsql->result) != PGRES_TUPLES_OK) && (PQresultStatus(pgsql->result) != PGRES_COMMAND_OK)) { pgsql_set_error(pgsql, PQerrorMessage(pgsql->conn->db)); @@ -290,6 +289,7 @@ kore_pgsql_register(const char *dbname, const char *connstring) pgsqldb = kore_malloc(sizeof(*pgsqldb)); pgsqldb->name = kore_strdup(dbname); + pgsqldb->pgsql_conn_max = pgsql_conn_max; pgsqldb->conn_string = kore_strdup(connstring); LIST_INSERT_HEAD(&pgsql_db_conn_strings, pgsqldb, rlist); @@ -432,8 +432,11 @@ kore_pgsql_getvalue(struct kore_pgsql *pgsql, int row, int col) static struct pgsql_conn * pgsql_conn_next(struct kore_pgsql *pgsql, struct pgsql_db *db) { - struct pgsql_conn *conn; + PGTransactionStatusType state; + struct pgsql_conn *conn; + struct kore_pgsql *rollback; +rescan: conn = NULL; TAILQ_FOREACH(conn, &pgsql_conn_free, list) { @@ -443,8 +446,36 @@ pgsql_conn_next(struct kore_pgsql *pgsql, struct pgsql_db *db) break; } + if (conn != NULL) { + state = PQtransactionStatus(conn->db); + if (state == PQTRANS_INERROR) { + conn->flags &= ~PGSQL_CONN_FREE; + TAILQ_REMOVE(&pgsql_conn_free, conn, list); + + rollback = kore_malloc(sizeof(*rollback)); + kore_pgsql_init(rollback); + kore_pgsql_bind_callback(rollback, + pgsql_rollback_state, NULL); + rollback->flags |= KORE_PGSQL_ASYNC; + + rollback->conn = conn; + rollback->conn->job = kore_pool_get(&pgsql_job_pool); + rollback->conn->job->pgsql = rollback; + + if (!kore_pgsql_query(rollback, "ROLLBACK")) { + kore_pgsql_logerror(rollback); + kore_pgsql_cleanup(rollback); + kore_free(rollback); + pgsql_conn_cleanup(conn); + return (NULL); + } + + goto rescan; + } + } + if (conn == NULL) { - if (pgsql_conn_count >= pgsql_conn_max) { + if (db->pgsql_conn_count >= db->pgsql_conn_max) { if (pgsql->flags & KORE_PGSQL_ASYNC) { pgsql_queue_add(pgsql); } else { @@ -563,7 +594,7 @@ pgsql_conn_create(struct kore_pgsql *pgsql, struct pgsql_db *db) if (db == NULL || db->conn_string == NULL) fatal("pgsql_conn_create: no connection string"); - pgsql_conn_count++; + db->pgsql_conn_count++; conn = kore_malloc(sizeof(*conn)); conn->job = NULL; @@ -588,6 +619,7 @@ static void pgsql_conn_release(struct kore_pgsql *pgsql) { int fd; + PGresult *result; if (pgsql->conn == NULL) return; @@ -605,8 +637,8 @@ pgsql_conn_release(struct kore_pgsql *pgsql) } /* Drain just in case. */ - while (PQgetResult(pgsql->conn->db) != NULL) - ; + while ((result = PQgetResult(pgsql->conn->db)) != NULL) + PQclear(result); pgsql->conn->job = NULL; pgsql->conn->flags |= PGSQL_CONN_FREE; @@ -625,6 +657,7 @@ static void pgsql_conn_cleanup(struct pgsql_conn *conn) { struct kore_pgsql *pgsql; + struct pgsql_db *pgsqldb; kore_debug("pgsql_conn_cleanup(): %p", conn); @@ -647,7 +680,13 @@ pgsql_conn_cleanup(struct pgsql_conn *conn) if (conn->db != NULL) PQfinish(conn->db); - pgsql_conn_count--; + LIST_FOREACH(pgsqldb, &pgsql_db_conn_strings, rlist) { + if (strcmp(pgsqldb->name, conn->name)) { + pgsqldb->pgsql_conn_count--; + break; + } + } + kore_free(conn->name); kore_free(conn); } @@ -701,3 +740,20 @@ pgsql_cancel(struct kore_pgsql *pgsql) PQfreeCancel(cancel); } } + +static void +pgsql_rollback_state(struct kore_pgsql *pgsql, void *arg) +{ + switch (pgsql->state) { + case KORE_PGSQL_STATE_ERROR: + kore_pgsql_logerror(pgsql); + kore_pgsql_cleanup(pgsql); + break; + case KORE_PGSQL_STATE_COMPLETE: + kore_pgsql_cleanup(pgsql); + break; + default: + kore_pgsql_continue(pgsql); + break; + } +}