kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit de7a6d48552f96bdc12512148fc013238838330e
parent c3d298493988fbecb6b7adde8bf6eac145ac92e4
Author: Joris Vink <joris@coders.se>
Date:   Mon,  6 Feb 2017 11:40:33 +0100

pgsql improvements.

- adds new cleanup function that workers will call.
- adds kore_pgsql_nfields() to return number of fields in result.
- add kore_pgsql_fieldname() to return name of a given field.

This commit also changes the behaviour of pgsql_conn_release() in
that it will now cancel the active query before releasing the connection.

This makes sure that if long running queries are active they are hopefully
cancelled if an http request is removed while such queries are still running.

Diffstat:
includes/pgsql.h | 3+++
src/pgsql.c | 44+++++++++++++++++++++++++++++++++++++++-----
2 files changed, 42 insertions(+), 5 deletions(-)

diff --git a/includes/pgsql.h b/includes/pgsql.h @@ -60,6 +60,7 @@ struct kore_pgsql { extern u_int16_t pgsql_conn_max; void kore_pgsql_init(void); +void kore_pgsql_sys_cleanup(void); int kore_pgsql_query_init(struct kore_pgsql *, struct http_request *, const char *, int); void kore_pgsql_handle(void *, int); @@ -72,8 +73,10 @@ int kore_pgsql_v_query_params(struct kore_pgsql *, const char *, int, int, va_list); int kore_pgsql_register(const char *, const char *); int kore_pgsql_ntuples(struct kore_pgsql *); +int kore_pgsql_nfields(struct kore_pgsql *); void kore_pgsql_logerror(struct kore_pgsql *); void kore_pgsql_queue_remove(struct http_request *); +char *kore_pgsql_fieldname(struct kore_pgsql *, int); char *kore_pgsql_getvalue(struct kore_pgsql *, int, int); int kore_pgsql_getlength(struct kore_pgsql *, int, int); diff --git a/src/pgsql.c b/src/pgsql.c @@ -76,6 +76,20 @@ kore_pgsql_init(void) sizeof(struct pgsql_wait), 100); } +void +kore_pgsql_sys_cleanup(void) +{ + struct pgsql_conn *conn, *next; + + kore_pool_cleanup(&pgsql_job_pool); + kore_pool_cleanup(&pgsql_wait_pool); + + for (conn = TAILQ_FIRST(&pgsql_conn_free); conn != NULL; conn = next) { + next = TAILQ_NEXT(conn, list); + pgsql_conn_cleanup(conn); + } +} + int kore_pgsql_query_init(struct kore_pgsql *pgsql, struct http_request *req, const char *dbname, int flags) @@ -347,12 +361,24 @@ kore_pgsql_ntuples(struct kore_pgsql *pgsql) } int +kore_pgsql_nfields(struct kore_pgsql *pgsql) +{ + return (PQnfields(pgsql->result)); +} + +int kore_pgsql_getlength(struct kore_pgsql *pgsql, int row, int col) { return (PQgetlength(pgsql->result, row, col)); } char * +kore_pgsql_fieldname(struct kore_pgsql *pgsql, int field) +{ + return (PQfname(pgsql->result, field)); +} + +char * kore_pgsql_getvalue(struct kore_pgsql *pgsql, int row, int col) { return (PQgetvalue(pgsql->result, row, col)); @@ -501,19 +527,27 @@ static void pgsql_conn_release(struct kore_pgsql *pgsql) { int fd; + PGcancel *cancel; + char buf[256]; if (pgsql->conn == NULL) return; /* Async query cleanup */ if (pgsql->flags & KORE_PGSQL_ASYNC) { - if (pgsql->conn != NULL) { - if (pgsql->flags & KORE_PGSQL_SCHEDULED) { - fd = PQsocket(pgsql->conn->db); - kore_platform_disable_read(fd); + if (pgsql->flags & KORE_PGSQL_SCHEDULED) { + fd = PQsocket(pgsql->conn->db); + kore_platform_disable_read(fd); + + if ((cancel = PQgetCancel(pgsql->conn->db)) != NULL) { + if (!PQcancel(cancel, buf, sizeof(buf))) { + kore_log(LOG_ERR, + "failed to cancel: %s", buf); + } + PQfreeCancel(cancel); } - kore_pool_put(&pgsql_job_pool, pgsql->conn->job); } + kore_pool_put(&pgsql_job_pool, pgsql->conn->job); } /* Drain just in case. */