kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

pgsql.c (18500B)



      1 /*
      2  * Copyright (c) 2014-2022 Joris Vink <joris@coders.se>
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 #include <sys/param.h>
     18 #include <sys/queue.h>
     19 
     20 #include <libpq-fe.h>
     21 #include <pg_config.h>
     22 
     23 #include "kore.h"
     24 
     25 #if !defined(KORE_NO_HTTP)
     26 #include "http.h"
     27 #endif
     28 
     29 #include "pgsql.h"
     30 
     31 #if defined(__linux__)
     32 #include "seccomp.h"
     33 
/*
 * Extra seccomp rules for this module. The table is registered under the
 * name "pgsql" by kore_pgsql_sys_init() on Linux builds.
 */
static struct sock_filter filter_pgsql[] = {
	/* Allow us to create sockets and call connect. */
	KORE_SYSCALL_ALLOW(connect),
	/* NOTE(review): presumably matches argument 0 (the domain) of socket(). */
	KORE_SYSCALL_ALLOW_ARG(socket, 0, AF_INET),
	KORE_SYSCALL_ALLOW_ARG(socket, 0, AF_INET6),
	KORE_SYSCALL_ALLOW_ARG(socket, 0, AF_UNIX),

	/* Requires these calls. */
	KORE_SYSCALL_ALLOW(getsockopt),
	KORE_SYSCALL_ALLOW(getsockname),
};
     45 #endif
     46 
/*
 * Entry on pgsql_wait_queue: a query context that could not get a
 * connection and is parked until one is released.
 */
struct pgsql_wait {
	struct kore_pgsql	*pgsql;		/* the waiting query context */
	TAILQ_ENTRY(pgsql_wait)	list;		/* linkage in pgsql_wait_queue */
};
     51 
/*
 * Attached to a connection (conn->job) for asynchronous queries so the
 * event handler can find the query context that owns the connection.
 */
struct pgsql_job {
	struct kore_pgsql	*pgsql;		/* owner of the connection */
	TAILQ_ENTRY(pgsql_job)	list;
};
     56 
/* Default for pgsql_conn_max. */
#define PGSQL_CONN_MAX		2
/* pgsql_conn flag: the connection is on pgsql_conn_free. */
#define PGSQL_CONN_FREE		0x01
/* kore_pgsql flag: the context is linked on its request's pgsqls list. */
#define PGSQL_LIST_INSERTED	0x0100
/* Default for pgsql_queue_limit. */
#define PGSQL_QUEUE_LIMIT	1000
     61 
/* Forward declarations for the file-local helpers defined further down. */
static void	pgsql_queue_wakeup(void);
static void	pgsql_cancel(struct kore_pgsql *);
static void	pgsql_set_error(struct kore_pgsql *, const char *);
static void	pgsql_queue_add(struct kore_pgsql *);
static void	pgsql_queue_remove(struct kore_pgsql *);
static void	pgsql_conn_release(struct kore_pgsql *);
static void	pgsql_conn_cleanup(struct pgsql_conn *);
static void	pgsql_read_result(struct kore_pgsql *);
static void	pgsql_schedule(struct kore_pgsql *);

static struct pgsql_conn	*pgsql_conn_create(struct kore_pgsql *,
				    struct pgsql_db *);
static struct pgsql_conn	*pgsql_conn_next(struct kore_pgsql *,
				    struct pgsql_db *);
     76 
/* Pools backing struct pgsql_job and struct pgsql_wait allocations. */
static struct kore_pool			pgsql_job_pool;
static struct kore_pool			pgsql_wait_pool;
/* Idle connections, contexts waiting for one, and registered databases. */
static TAILQ_HEAD(, pgsql_conn)		pgsql_conn_free;
static TAILQ_HEAD(, pgsql_wait)		pgsql_wait_queue;
static LIST_HEAD(, pgsql_db)		pgsql_db_conn_strings;

/* Number of entries currently on pgsql_wait_queue. */
u_int32_t	pgsql_queue_count = 0;
/* Per-database connection cap copied into each newly registered database. */
u_int16_t	pgsql_conn_max = PGSQL_CONN_MAX;
/* Upper bound on pgsql_queue_count before new waiters are refused. */
u_int32_t	pgsql_queue_limit = PGSQL_QUEUE_LIMIT;
     86 
     87 void
     88 kore_pgsql_sys_init(void)
     89 {
     90 	TAILQ_INIT(&pgsql_conn_free);
     91 	TAILQ_INIT(&pgsql_wait_queue);
     92 	LIST_INIT(&pgsql_db_conn_strings);
     93 
     94 	kore_pool_init(&pgsql_job_pool, "pgsql_job_pool",
     95 	    sizeof(struct pgsql_job), 100);
     96 	kore_pool_init(&pgsql_wait_pool, "pgsql_wait_pool",
     97 	    sizeof(struct pgsql_wait), pgsql_queue_limit);
     98 
     99 #if defined(__linux__)
    100 	kore_seccomp_filter("pgsql", filter_pgsql,
    101 	    KORE_FILTER_LEN(filter_pgsql));
    102 #endif
    103 }
    104 
    105 void
    106 kore_pgsql_sys_cleanup(void)
    107 {
    108 	struct pgsql_conn	*conn, *next;
    109 
    110 	kore_pool_cleanup(&pgsql_job_pool);
    111 	kore_pool_cleanup(&pgsql_wait_pool);
    112 
    113 	for (conn = TAILQ_FIRST(&pgsql_conn_free); conn != NULL; conn = next) {
    114 		next = TAILQ_NEXT(conn, list);
    115 		pgsql_conn_cleanup(conn);
    116 	}
    117 }
    118 
    119 void
    120 kore_pgsql_init(struct kore_pgsql *pgsql)
    121 {
    122 	memset(pgsql, 0, sizeof(*pgsql));
    123 	pgsql->state = KORE_PGSQL_STATE_INIT;
    124 }
    125 
    126 int
    127 kore_pgsql_setup(struct kore_pgsql *pgsql, const char *dbname, int flags)
    128 {
    129 	struct pgsql_db		*db;
    130 
    131 	if ((flags & KORE_PGSQL_ASYNC) && (flags & KORE_PGSQL_SYNC)) {
    132 		pgsql_set_error(pgsql, "invalid query init parameters");
    133 		return (KORE_RESULT_ERROR);
    134 	}
    135 
    136 	if (flags & KORE_PGSQL_ASYNC) {
    137 		if (pgsql->req == NULL && pgsql->cb == NULL) {
    138 			pgsql_set_error(pgsql, "nothing was bound");
    139 			return (KORE_RESULT_ERROR);
    140 		}
    141 	}
    142 
    143 	db = NULL;
    144 	pgsql->flags |= flags;
    145 
    146 	LIST_FOREACH(db, &pgsql_db_conn_strings, rlist) {
    147 		if (!strcmp(db->name, dbname))
    148 			break;
    149 	}
    150 
    151 	if (db == NULL) {
    152 		pgsql_set_error(pgsql, "no database found");
    153 		return (KORE_RESULT_ERROR);
    154 	}
    155 
    156 	if ((pgsql->conn = pgsql_conn_next(pgsql, db)) == NULL)
    157 		return (KORE_RESULT_ERROR);
    158 
    159 	if (pgsql->flags & KORE_PGSQL_ASYNC) {
    160 		pgsql->conn->job = kore_pool_get(&pgsql_job_pool);
    161 		pgsql->conn->job->pgsql = pgsql;
    162 	}
    163 
    164 	return (KORE_RESULT_OK);
    165 }
    166 
    167 #if !defined(KORE_NO_HTTP)
    168 void
    169 kore_pgsql_bind_request(struct kore_pgsql *pgsql, struct http_request *req)
    170 {
    171 	if (pgsql->req != NULL || pgsql->cb != NULL)
    172 		fatal("kore_pgsql_bind_request: already bound");
    173 
    174 	pgsql->req = req;
    175 	pgsql->flags |= PGSQL_LIST_INSERTED;
    176 
    177 	LIST_INSERT_HEAD(&(req->pgsqls), pgsql, rlist);
    178 }
    179 #endif
    180 
    181 void
    182 kore_pgsql_bind_callback(struct kore_pgsql *pgsql,
    183     void (*cb)(struct kore_pgsql *, void *), void *arg)
    184 {
    185 	if (pgsql->req != NULL)
    186 		fatal("kore_pgsql_bind_callback: already bound");
    187 
    188 	if (pgsql->cb != NULL)
    189 		fatal("kore_pgsql_bind_callback: already bound");
    190 
    191 	pgsql->cb = cb;
    192 	pgsql->arg = arg;
    193 }
    194 
    195 int
    196 kore_pgsql_query(struct kore_pgsql *pgsql, const void *query)
    197 {
    198 	if (pgsql->conn == NULL) {
    199 		pgsql_set_error(pgsql, "no connection was set before query");
    200 		return (KORE_RESULT_ERROR);
    201 	}
    202 
    203 	if (pgsql->flags & KORE_PGSQL_SYNC) {
    204 		pgsql->result = PQexec(pgsql->conn->db, query);
    205 		if ((PQresultStatus(pgsql->result) != PGRES_TUPLES_OK) &&
    206 		    (PQresultStatus(pgsql->result) != PGRES_COMMAND_OK)) {
    207 			pgsql_set_error(pgsql, PQerrorMessage(pgsql->conn->db));
    208 			return (KORE_RESULT_ERROR);
    209 		}
    210 
    211 		pgsql->state = KORE_PGSQL_STATE_DONE;
    212 	} else {
    213 		if (!PQsendQuery(pgsql->conn->db, query)) {
    214 			pgsql_set_error(pgsql, PQerrorMessage(pgsql->conn->db));
    215 			return (KORE_RESULT_ERROR);
    216 		}
    217 
    218 		pgsql_schedule(pgsql);
    219 	}
    220 
    221 	return (KORE_RESULT_OK);
    222 }
    223 
    224 int
    225 kore_pgsql_v_query_params(struct kore_pgsql *pgsql,
    226     const void *query, int binary, int count, va_list args)
    227 {
    228 	int		i;
    229 	const char	**values;
    230 	int		*lengths, *formats, ret;
    231 
    232 	if (pgsql->conn == NULL) {
    233 		pgsql_set_error(pgsql, "no connection was set before query");
    234 		return (KORE_RESULT_ERROR);
    235 	}
    236 
    237 	if (count > 0) {
    238 		lengths = kore_calloc(count, sizeof(int));
    239 		formats = kore_calloc(count, sizeof(int));
    240 		values = kore_calloc(count, sizeof(char *));
    241 
    242 		for (i = 0; i < count; i++) {
    243 			values[i] = va_arg(args, void *);
    244 			lengths[i] = va_arg(args, int);
    245 			formats[i] = va_arg(args, int);
    246 		}
    247 	} else {
    248 		lengths = NULL;
    249 		formats = NULL;
    250 		values = NULL;
    251 	}
    252 
    253 	ret = kore_pgsql_query_param_fields(pgsql, query, binary, count,
    254 	    values, lengths, formats);
    255 
    256 	kore_free(values);
    257 	kore_free(lengths);
    258 	kore_free(formats);
    259 
    260 	return (ret);
    261 }
    262 
    263 int
    264 kore_pgsql_query_param_fields(struct kore_pgsql *pgsql, const void *query,
    265     int binary, int count, const char **values, int *lengths, int *formats)
    266 {
    267 	if (pgsql->conn == NULL) {
    268 		pgsql_set_error(pgsql, "no connection was set before query");
    269 		return (KORE_RESULT_ERROR);
    270 	}
    271 
    272 	if (pgsql->flags & KORE_PGSQL_SYNC) {
    273 		pgsql->result = PQexecParams(pgsql->conn->db, query, count,
    274 		    NULL, (const char * const *)values, lengths, formats,
    275 		    binary);
    276 
    277 		if ((PQresultStatus(pgsql->result) != PGRES_TUPLES_OK) &&
    278 		    (PQresultStatus(pgsql->result) != PGRES_COMMAND_OK)) {
    279 			pgsql_set_error(pgsql, PQerrorMessage(pgsql->conn->db));
    280 			return (KORE_RESULT_ERROR);
    281 		}
    282 
    283 		pgsql->state = KORE_PGSQL_STATE_DONE;
    284 	} else {
    285 		if (!PQsendQueryParams(pgsql->conn->db, query, count, NULL,
    286 		    (const char * const *)values, lengths, formats, binary)) {
    287 			pgsql_set_error(pgsql, PQerrorMessage(pgsql->conn->db));
    288 			return (KORE_RESULT_ERROR);
    289 		}
    290 
    291 		pgsql_schedule(pgsql);
    292 	}
    293 
    294 	return (KORE_RESULT_OK);
    295 }
    296 
    297 int
    298 kore_pgsql_query_params(struct kore_pgsql *pgsql,
    299     const void *query, int binary, int count, ...)
    300 {
    301 	int		ret;
    302 	va_list		args;
    303 
    304 	va_start(args, count);
    305 	ret = kore_pgsql_v_query_params(pgsql, query, binary, count, args);
    306 	va_end(args);
    307 
    308 	return (ret);
    309 }
    310 
    311 int
    312 kore_pgsql_register(const char *dbname, const char *connstring)
    313 {
    314 	struct pgsql_db		*pgsqldb;
    315 
    316 	LIST_FOREACH(pgsqldb, &pgsql_db_conn_strings, rlist) {
    317 		if (!strcmp(pgsqldb->name, dbname))
    318 			return (KORE_RESULT_ERROR);
    319 	}
    320 
    321 	pgsqldb = kore_malloc(sizeof(*pgsqldb));
    322 	pgsqldb->name = kore_strdup(dbname);
    323 	pgsqldb->conn_count = 0;
    324 	pgsqldb->conn_max = pgsql_conn_max;
    325 	pgsqldb->conn_string = kore_strdup(connstring);
    326 	LIST_INSERT_HEAD(&pgsql_db_conn_strings, pgsqldb, rlist);
    327 
    328 	return (KORE_RESULT_OK);
    329 }
    330 
    331 void
    332 kore_pgsql_handle(void *c, int err)
    333 {
    334 	struct kore_pgsql	*pgsql;
    335 	struct pgsql_conn	*conn = (struct pgsql_conn *)c;
    336 
    337 	if (err) {
    338 		pgsql_conn_cleanup(conn);
    339 		return;
    340 	}
    341 
    342 	if (!(conn->evt.flags & KORE_EVENT_READ))
    343 		fatal("%s: read event not set", __func__);
    344 
    345 	pgsql = conn->job->pgsql;
    346 
    347 	pgsql_read_result(pgsql);
    348 
    349 	if (pgsql->state == KORE_PGSQL_STATE_WAIT) {
    350 #if !defined(KORE_NO_HTTP)
    351 		if (pgsql->req != NULL)
    352 			http_request_sleep(pgsql->req);
    353 #endif
    354 		if (pgsql->cb != NULL)
    355 			pgsql->cb(pgsql, pgsql->arg);
    356 	} else {
    357 #if !defined(KORE_NO_HTTP)
    358 		if (pgsql->req != NULL)
    359 			http_request_wakeup(pgsql->req);
    360 #endif
    361 		if (pgsql->cb != NULL)
    362 			pgsql->cb(pgsql, pgsql->arg);
    363 	}
    364 }
    365 
    366 void
    367 kore_pgsql_continue(struct kore_pgsql *pgsql)
    368 {
    369 	if (pgsql->error) {
    370 		kore_free(pgsql->error);
    371 		pgsql->error = NULL;
    372 	}
    373 
    374 	if (pgsql->result) {
    375 		PQclear(pgsql->result);
    376 		pgsql->result = NULL;
    377 	}
    378 
    379 	switch (pgsql->state) {
    380 	case KORE_PGSQL_STATE_INIT:
    381 	case KORE_PGSQL_STATE_WAIT:
    382 		break;
    383 	case KORE_PGSQL_STATE_DONE:
    384 #if !defined(KORE_NO_HTTP)
    385 		if (pgsql->req != NULL)
    386 			http_request_wakeup(pgsql->req);
    387 #endif
    388 		pgsql_conn_release(pgsql);
    389 		break;
    390 	case KORE_PGSQL_STATE_ERROR:
    391 	case KORE_PGSQL_STATE_RESULT:
    392 	case KORE_PGSQL_STATE_NOTIFY:
    393 		kore_pgsql_handle(pgsql->conn, 0);
    394 		break;
    395 	default:
    396 		fatal("unknown pgsql state %d", pgsql->state);
    397 	}
    398 }
    399 
    400 void
    401 kore_pgsql_cleanup(struct kore_pgsql *pgsql)
    402 {
    403 	pgsql_queue_remove(pgsql);
    404 
    405 	if (pgsql->result != NULL)
    406 		PQclear(pgsql->result);
    407 
    408 	if (pgsql->error != NULL)
    409 		kore_free(pgsql->error);
    410 
    411 	if (pgsql->conn != NULL)
    412 		pgsql_conn_release(pgsql);
    413 
    414 	pgsql->result = NULL;
    415 	pgsql->error = NULL;
    416 	pgsql->conn = NULL;
    417 
    418 	if (pgsql->flags & PGSQL_LIST_INSERTED) {
    419 		LIST_REMOVE(pgsql, rlist);
    420 		pgsql->flags &= ~PGSQL_LIST_INSERTED;
    421 	}
    422 }
    423 
    424 void
    425 kore_pgsql_logerror(struct kore_pgsql *pgsql)
    426 {
    427 	kore_log(LOG_NOTICE, "pgsql error: %s",
    428 	    (pgsql->error) ? pgsql->error : "unknown");
    429 }
    430 
    431 int
    432 kore_pgsql_ntuples(struct kore_pgsql *pgsql)
    433 {
    434 	return (PQntuples(pgsql->result));
    435 }
    436 
    437 int
    438 kore_pgsql_nfields(struct kore_pgsql *pgsql)
    439 {
    440 	return (PQnfields(pgsql->result));
    441 }
    442 
    443 int
    444 kore_pgsql_getlength(struct kore_pgsql *pgsql, int row, int col)
    445 {
    446 	return (PQgetlength(pgsql->result, row, col));
    447 }
    448 
    449 char *
    450 kore_pgsql_fieldname(struct kore_pgsql *pgsql, int field)
    451 {
    452 	return (PQfname(pgsql->result, field));
    453 }
    454 
    455 char *
    456 kore_pgsql_getvalue(struct kore_pgsql *pgsql, int row, int col)
    457 {
    458 	return (PQgetvalue(pgsql->result, row, col));
    459 }
    460 
    461 int
    462 kore_pgsql_column_binary(struct kore_pgsql *pgsql, int col)
    463 {
    464 	return (PQfformat(pgsql->result, col));
    465 }
    466 
    467 static struct pgsql_conn *
    468 pgsql_conn_next(struct kore_pgsql *pgsql, struct pgsql_db *db)
    469 {
    470 	PGTransactionStatusType		state;
    471 	struct pgsql_conn		*conn;
    472 	struct kore_pgsql		rollback;
    473 
    474 rescan:
    475 	conn = NULL;
    476 
    477 	TAILQ_FOREACH(conn, &pgsql_conn_free, list) {
    478 		if (!(conn->flags & PGSQL_CONN_FREE))
    479 			fatal("got a pgsql connection that was not free?");
    480 		if (!strcmp(conn->name, db->name))
    481 			break;
    482 	}
    483 
    484 	if (conn != NULL) {
    485 		state = PQtransactionStatus(conn->db);
    486 		if (state == PQTRANS_INERROR) {
    487 			conn->flags &= ~PGSQL_CONN_FREE;
    488 			TAILQ_REMOVE(&pgsql_conn_free, conn, list);
    489 
    490 			kore_pgsql_init(&rollback);
    491 			rollback.conn = conn;
    492 			rollback.flags = KORE_PGSQL_SYNC;
    493 
    494 			if (!kore_pgsql_query(&rollback, "ROLLBACK")) {
    495 				kore_pgsql_logerror(&rollback);
    496 				kore_pgsql_cleanup(&rollback);
    497 				pgsql_conn_cleanup(conn);
    498 			} else {
    499 				kore_pgsql_cleanup(&rollback);
    500 			}
    501 
    502 			goto rescan;
    503 		}
    504 	}
    505 
    506 	if (conn == NULL) {
    507 		if (db->conn_max != 0 &&
    508 		    db->conn_count >= db->conn_max) {
    509 			if ((pgsql->flags & KORE_PGSQL_ASYNC) &&
    510 			    pgsql_queue_count < pgsql_queue_limit) {
    511 				pgsql_queue_add(pgsql);
    512 			} else {
    513 				pgsql_set_error(pgsql,
    514 				    "no available connection");
    515 			}
    516 
    517 			return (NULL);
    518 		}
    519 
    520 		if ((conn = pgsql_conn_create(pgsql, db)) == NULL)
    521 			return (NULL);
    522 	}
    523 
    524 	conn->flags &= ~PGSQL_CONN_FREE;
    525 	TAILQ_REMOVE(&pgsql_conn_free, conn, list);
    526 
    527 	return (conn);
    528 }
    529 
    530 static void
    531 pgsql_set_error(struct kore_pgsql *pgsql, const char *msg)
    532 {
    533 	if (pgsql->error != NULL)
    534 		kore_free(pgsql->error);
    535 
    536 	pgsql->error = kore_strdup(msg);
    537 	pgsql->state = KORE_PGSQL_STATE_ERROR;
    538 }
    539 
    540 static void
    541 pgsql_schedule(struct kore_pgsql *pgsql)
    542 {
    543 	int		fd;
    544 
    545 	fd = PQsocket(pgsql->conn->db);
    546 	if (fd < 0)
    547 		fatal("PQsocket returned < 0 fd on open connection");
    548 
    549 	kore_platform_schedule_read(fd, pgsql->conn);
    550 	pgsql->state = KORE_PGSQL_STATE_WAIT;
    551 	pgsql->flags |= KORE_PGSQL_SCHEDULED;
    552 
    553 #if !defined(KORE_NO_HTTP)
    554 	if (pgsql->req != NULL)
    555 		http_request_sleep(pgsql->req);
    556 #endif
    557 	if (pgsql->cb != NULL)
    558 		pgsql->cb(pgsql, pgsql->arg);
    559 }
    560 
    561 static void
    562 pgsql_queue_add(struct kore_pgsql *pgsql)
    563 {
    564 	struct pgsql_wait	*pgw;
    565 
    566 #if !defined(KORE_NO_HTTP)
    567 	if (pgsql->req != NULL)
    568 		http_request_sleep(pgsql->req);
    569 #endif
    570 
    571 	pgw = kore_pool_get(&pgsql_wait_pool);
    572 	pgw->pgsql = pgsql;
    573 
    574 	pgsql_queue_count++;
    575 	TAILQ_INSERT_TAIL(&pgsql_wait_queue, pgw, list);
    576 }
    577 
    578 static void
    579 pgsql_queue_remove(struct kore_pgsql *pgsql)
    580 {
    581 	struct pgsql_wait	*pgw, *next;
    582 
    583 	for (pgw = TAILQ_FIRST(&pgsql_wait_queue); pgw != NULL; pgw = next) {
    584 		next = TAILQ_NEXT(pgw, list);
    585 		if (pgw->pgsql != pgsql)
    586 			continue;
    587 
    588 		pgsql_queue_count--;
    589 		TAILQ_REMOVE(&pgsql_wait_queue, pgw, list);
    590 		kore_pool_put(&pgsql_wait_pool, pgw);
    591 		return;
    592 	}
    593 }
    594 
    595 static void
    596 pgsql_queue_wakeup(void)
    597 {
    598 	struct pgsql_wait	*pgw, *next;
    599 
    600 	for (pgw = TAILQ_FIRST(&pgsql_wait_queue); pgw != NULL; pgw = next) {
    601 		next = TAILQ_NEXT(pgw, list);
    602 
    603 #if !defined(KORE_NO_HTTP)
    604 		if (pgw->pgsql->req != NULL) {
    605 			if (pgw->pgsql->req->flags & HTTP_REQUEST_DELETE) {
    606 				pgsql_queue_count--;
    607 				TAILQ_REMOVE(&pgsql_wait_queue, pgw, list);
    608 				kore_pool_put(&pgsql_wait_pool, pgw);
    609 				continue;
    610 			}
    611 
    612 			http_request_wakeup(pgw->pgsql->req);
    613 		}
    614 #endif
    615 		if (pgw->pgsql->cb != NULL)
    616 			pgw->pgsql->cb(pgw->pgsql, pgw->pgsql->arg);
    617 
    618 		pgsql_queue_count--;
    619 		TAILQ_REMOVE(&pgsql_wait_queue, pgw, list);
    620 		kore_pool_put(&pgsql_wait_pool, pgw);
    621 		return;
    622 	}
    623 }
    624 
    625 static struct pgsql_conn *
    626 pgsql_conn_create(struct kore_pgsql *pgsql, struct pgsql_db *db)
    627 {
    628 	struct pgsql_conn	*conn;
    629 
    630 	if (db == NULL || db->conn_string == NULL)
    631 		fatal("pgsql_conn_create: no connection string");
    632 
    633 	db->conn_count++;
    634 
    635 	conn = kore_calloc(1, sizeof(*conn));
    636 	conn->job = NULL;
    637 	conn->flags = PGSQL_CONN_FREE;
    638 	conn->name = kore_strdup(db->name);
    639 	TAILQ_INSERT_TAIL(&pgsql_conn_free, conn, list);
    640 
    641 	conn->evt.type = KORE_TYPE_PGSQL_CONN;
    642 	conn->evt.handle = kore_pgsql_handle;
    643 
    644 	conn->db = PQconnectdb(db->conn_string);
    645 	if (conn->db == NULL || (PQstatus(conn->db) != CONNECTION_OK)) {
    646 		pgsql_set_error(pgsql, PQerrorMessage(conn->db));
    647 		pgsql_conn_cleanup(conn);
    648 		return (NULL);
    649 	}
    650 
    651 	return (conn);
    652 }
    653 
    654 static void
    655 pgsql_conn_release(struct kore_pgsql *pgsql)
    656 {
    657 	int		fd;
    658 	PGresult	*result;
    659 
    660 	if (pgsql->conn == NULL)
    661 		return;
    662 
    663 	/* Async query cleanup */
    664 	if (pgsql->flags & KORE_PGSQL_ASYNC) {
    665 		if (pgsql->flags & KORE_PGSQL_SCHEDULED) {
    666 			fd = PQsocket(pgsql->conn->db);
    667 			kore_platform_disable_read(fd);
    668 
    669 			if (pgsql->state != KORE_PGSQL_STATE_DONE)
    670 				pgsql_cancel(pgsql);
    671 		}
    672 		kore_pool_put(&pgsql_job_pool, pgsql->conn->job);
    673 	}
    674 
    675 	/* Drain just in case. */
    676 	while ((result = PQgetResult(pgsql->conn->db)) != NULL)
    677 		PQclear(result);
    678 
    679 	pgsql->conn->job = NULL;
    680 	pgsql->conn->flags |= PGSQL_CONN_FREE;
    681 	TAILQ_INSERT_TAIL(&pgsql_conn_free, pgsql->conn, list);
    682 
    683 	pgsql->conn = NULL;
    684 	pgsql->state = KORE_PGSQL_STATE_COMPLETE;
    685 
    686 	if (pgsql->cb != NULL)
    687 		pgsql->cb(pgsql, pgsql->arg);
    688 
    689 	pgsql_queue_wakeup();
    690 }
    691 
    692 static void
    693 pgsql_conn_cleanup(struct pgsql_conn *conn)
    694 {
    695 	struct kore_pgsql	*pgsql;
    696 	struct pgsql_db		*pgsqldb;
    697 
    698 	if (conn->flags & PGSQL_CONN_FREE)
    699 		TAILQ_REMOVE(&pgsql_conn_free, conn, list);
    700 
    701 	if (conn->job) {
    702 		pgsql = conn->job->pgsql;
    703 #if !defined(KORE_NO_HTTP)
    704 		if (pgsql->req != NULL)
    705 			http_request_wakeup(pgsql->req);
    706 #endif
    707 		pgsql->conn = NULL;
    708 		pgsql_set_error(pgsql, PQerrorMessage(conn->db));
    709 
    710 		kore_pool_put(&pgsql_job_pool, conn->job);
    711 		conn->job = NULL;
    712 	}
    713 
    714 	if (conn->db != NULL)
    715 		PQfinish(conn->db);
    716 
    717 	LIST_FOREACH(pgsqldb, &pgsql_db_conn_strings, rlist) {
    718 		if (!strcmp(pgsqldb->name, conn->name)) {
    719 			pgsqldb->conn_count--;
    720 			break;
    721 		}
    722 	}
    723 
    724 	kore_free(conn->name);
    725 	kore_free(conn);
    726 }
    727 
    728 static void
    729 pgsql_read_result(struct kore_pgsql *pgsql)
    730 {
    731 	struct pgsql_conn	*conn;
    732 	PGnotify		*notify;
    733 	int			saved_errno;
    734 
    735 	conn = pgsql->conn;
    736 
    737 	for (;;) {
    738 		if (!PQconsumeInput(conn->db)) {
    739 			pgsql->state = KORE_PGSQL_STATE_ERROR;
    740 			pgsql->error = kore_strdup(PQerrorMessage(conn->db));
    741 			return;
    742 		}
    743 
    744 		saved_errno = errno;
    745 
    746 		if (PQisBusy(conn->db)) {
    747 			if (saved_errno != EAGAIN && saved_errno != EWOULDBLOCK)
    748 				continue;
    749 			pgsql->state = KORE_PGSQL_STATE_WAIT;
    750 			conn->evt.flags &= ~KORE_EVENT_READ;
    751 			return;
    752 		}
    753 
    754 		break;
    755 	}
    756 
    757 	while ((notify = PQnotifies(conn->db)) != NULL) {
    758 		pgsql->state = KORE_PGSQL_STATE_NOTIFY;
    759 		pgsql->notify.extra = notify->extra;
    760 		pgsql->notify.channel = notify->relname;
    761 
    762 		if (pgsql->cb != NULL)
    763 			pgsql->cb(pgsql, pgsql->arg);
    764 
    765 		PQfreemem(notify);
    766 	}
    767 
    768 	pgsql->result = PQgetResult(conn->db);
    769 	if (pgsql->result == NULL) {
    770 		pgsql->state = KORE_PGSQL_STATE_DONE;
    771 		return;
    772 	}
    773 
    774 	switch (PQresultStatus(pgsql->result)) {
    775 #if PG_VERSION_NUM >= 140000
    776 	case PGRES_PIPELINE_SYNC:
    777 	case PGRES_PIPELINE_ABORTED:
    778 #endif
    779 	case PGRES_COPY_OUT:
    780 	case PGRES_COPY_IN:
    781 	case PGRES_NONFATAL_ERROR:
    782 	case PGRES_COPY_BOTH:
    783 		break;
    784 	case PGRES_COMMAND_OK:
    785 		pgsql->state = KORE_PGSQL_STATE_DONE;
    786 		break;
    787 	case PGRES_TUPLES_OK:
    788 #if PG_VERSION_NUM >= 90200
    789 	case PGRES_SINGLE_TUPLE:
    790 #endif
    791 		pgsql->state = KORE_PGSQL_STATE_RESULT;
    792 		break;
    793 	case PGRES_EMPTY_QUERY:
    794 	case PGRES_BAD_RESPONSE:
    795 	case PGRES_FATAL_ERROR:
    796 		pgsql_set_error(pgsql, PQresultErrorMessage(pgsql->result));
    797 		break;
    798 	}
    799 }
    800 
    801 static void
    802 pgsql_cancel(struct kore_pgsql *pgsql)
    803 {
    804 	PGcancel	*cancel;
    805 	char		buf[256];
    806 
    807 	if ((cancel = PQgetCancel(pgsql->conn->db)) != NULL) {
    808 		if (!PQcancel(cancel, buf, sizeof(buf)))
    809 			kore_log(LOG_ERR, "failed to cancel: %s", buf);
    810 		PQfreeCancel(cancel);
    811 	}
    812 }