kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit f348feef820afd60ca5e6c5284f5f0c1fca3d3e9
parent b3a48f3c15ef4516b3dac641f413938bfa8146e7
Author: Joris Vink <joris@coders.se>
Date:   Tue, 13 Feb 2018 13:21:27 +0100

Add pgsql_queue_limit configuration option.

Limits the number of queued asynchronous queries kore will allow
before starting to return errors for KORE_PGSQL_ASYNC setups.

By default set to 1000.

Avoids uncontrolled growth of the pgsql_queue_wait pool.

Diffstat:
includes/pgsql.h | 1+
src/config.c | 16++++++++++++++++
src/pgsql.c | 13++++++++++---
3 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/includes/pgsql.h b/includes/pgsql.h @@ -64,6 +64,7 @@ struct kore_pgsql { }; extern u_int16_t pgsql_conn_max; +extern u_int32_t pgsql_queue_limit; void kore_pgsql_sys_init(void); void kore_pgsql_sys_cleanup(void); diff --git a/src/config.c b/src/config.c @@ -99,6 +99,7 @@ static int configure_websocket_timeout(char *); #if defined(KORE_USE_PGSQL) static int configure_pgsql_conn_max(char *); +static int configure_pgsql_queue_limit(char *); #endif #if defined(KORE_USE_TASKS) @@ -165,6 +166,7 @@ static struct { #endif #if defined(KORE_USE_PGSQL) { "pgsql_conn_max", configure_pgsql_conn_max }, + { "pgsql_queue_limit", configure_pgsql_queue_limit }, #endif #if defined(KORE_USE_TASKS) { "task_threads", configure_task_threads }, @@ -1100,6 +1102,20 @@ configure_pgsql_conn_max(char *option) return (KORE_RESULT_OK); } + +static int +configure_pgsql_queue_limit(char *option) +{ + int err; + + pgsql_queue_limit = kore_strtonum(option, 10, 0, UINT_MAX, &err); + if (err != KORE_RESULT_OK) { + printf("bad value for pgsql_queue_limit: %s\n", option); + return (KORE_RESULT_ERROR); + } + + return (KORE_RESULT_OK); +} #endif #if defined(KORE_USE_TASKS) diff --git a/src/pgsql.c b/src/pgsql.c @@ -41,6 +41,7 @@ struct pgsql_job { #define PGSQL_CONN_MAX 2 #define PGSQL_CONN_FREE 0x01 #define PGSQL_LIST_INSERTED 0x0100 +#define PGSQL_QUEUE_LIMIT 1000 static void pgsql_queue_wakeup(void); static void pgsql_cancel(struct kore_pgsql *); @@ -64,7 +65,9 @@ static TAILQ_HEAD(, pgsql_conn) pgsql_conn_free; static TAILQ_HEAD(, pgsql_wait) pgsql_wait_queue; static LIST_HEAD(, pgsql_db) pgsql_db_conn_strings; -u_int16_t pgsql_conn_max = PGSQL_CONN_MAX; +u_int32_t pgsql_queue_count = 0; +u_int16_t pgsql_conn_max = PGSQL_CONN_MAX; +u_int32_t pgsql_queue_limit = PGSQL_QUEUE_LIMIT; void kore_pgsql_sys_init(void) @@ -76,7 +79,7 @@ kore_pgsql_sys_init(void) kore_pool_init(&pgsql_job_pool, "pgsql_job_pool", sizeof(struct pgsql_job), 100); kore_pool_init(&pgsql_wait_pool, "pgsql_wait_pool", - sizeof(struct pgsql_wait), 100); + sizeof(struct pgsql_wait), pgsql_queue_limit); } void @@ -477,7 +480,8 @@ rescan: if (conn == NULL) { if (db->conn_max != 0 && db->conn_count >= db->conn_max) { - if (pgsql->flags & KORE_PGSQL_ASYNC) { + if ((pgsql->flags & KORE_PGSQL_ASYNC) && + pgsql_queue_count < pgsql_queue_limit) { pgsql_queue_add(pgsql); } else { pgsql_set_error(pgsql, @@ -540,6 +544,8 @@ pgsql_queue_add(struct kore_pgsql *pgsql) pgw = kore_pool_get(&pgsql_wait_pool); pgw->pgsql = pgsql; + + pgsql_queue_count++; TAILQ_INSERT_TAIL(&pgsql_wait_queue, pgw, list); } @@ -553,6 +559,7 @@ pgsql_queue_remove(struct kore_pgsql *pgsql) if (pgw->pgsql != pgsql) continue; + pgsql_queue_count--; TAILQ_REMOVE(&pgsql_wait_queue, pgw, list); kore_pool_put(&pgsql_wait_pool, pgw); return;