commit ace8c4e80c3ffce6ce2db9622e5cb6781632ac30
parent 80698ee40aa56648e9ac7edf390e8b518e2afa66
Author: Joris Vink <joris@coders.se>
Date: Mon, 6 Feb 2017 11:42:53 +0100
Add asynchronous pgsql query support to python.
This commit adds the ability to use python "await" to suspend
execution of your page handler until the query sent to postgresql
has returned a result.
This is built upon the existing asynchrous query framework Kore had.
With this you can now write stuff like:
async def page(req):
result = await req.pgsql("db", "SELECT name FROM table");
req.response(200, json.dumps(result).encode("utf-8"))
The above code will fire off a query and suspend itself so Kore can
take care of business as usual until the query is successful at which
point Kore will jump back into the handler and resume.
This does not use threading, it's purely based on Python's excellent
coroutines and generators and Kore its built-in pgsql support.
Diffstat:
8 files changed, 342 insertions(+), 63 deletions(-)
diff --git a/includes/http.h b/includes/http.h
@@ -19,6 +19,10 @@
#ifndef __H_HTTP_H
#define __H_HTTP_H
+#if defined(KORE_USE_PYTHON)
+#include "python_api.h"
+#endif
+
#include <sys/types.h>
#include <sys/queue.h>
@@ -189,7 +193,7 @@ struct http_request {
struct kore_module_handle *hdlr;
#if defined(KORE_USE_PYTHON)
- void *py_object;
+ PyObject *py_coro;
#endif
LIST_HEAD(, kore_task) tasks;
@@ -210,7 +214,7 @@ struct http_state {
extern int http_request_count;
extern u_int16_t http_header_max;
-extern u_int64_t http_body_max;
+extern size_t http_body_max;
extern u_int64_t http_hsts_enable;
extern u_int16_t http_keepalive_time;
extern u_int32_t http_request_limit;
diff --git a/includes/python_methods.h b/includes/python_methods.h
@@ -17,6 +17,11 @@
static PyObject *python_kore_log(PyObject *, PyObject *);
static PyObject *python_kore_fatal(PyObject *, PyObject *);
static PyObject *python_kore_listen(PyObject *, PyObject *);
+
+#if defined(KORE_USE_PGSQL)
+static PyObject *python_kore_pgsql_register(PyObject *, PyObject *);
+#endif
+
static PyObject *python_websocket_send(PyObject *, PyObject *);
static PyObject *python_websocket_broadcast(PyObject *, PyObject *);
@@ -31,6 +36,9 @@ static struct PyMethodDef pykore_methods[] = {
METHOD("listen", python_kore_listen, METH_VARARGS),
METHOD("websocket_send", python_websocket_send, METH_VARARGS),
METHOD("websocket_broadcast", python_websocket_broadcast, METH_VARARGS),
+#if defined(KORE_USE_PGSQL)
+ METHOD("register_database", python_kore_pgsql_register, METH_VARARGS),
+#endif
{ NULL, NULL, 0, NULL }
};
@@ -64,10 +72,10 @@ static PyTypeObject pyconnection_type = {
.tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
};
-#if !defined(KORE_NO_HTTP)
struct pyhttp_request {
PyObject_HEAD
struct http_request *req;
+ PyObject *data;
};
struct pyhttp_file {
@@ -78,6 +86,9 @@ struct pyhttp_file {
static void pyhttp_dealloc(struct pyhttp_request *);
static void pyhttp_file_dealloc(struct pyhttp_file *);
+#if defined(KORE_USE_PGSQL)
+static PyObject *pyhttp_pgsql(struct pyhttp_request *, PyObject *);
+#endif
static PyObject *pyhttp_response(struct pyhttp_request *, PyObject *);
static PyObject *pyhttp_argument(struct pyhttp_request *, PyObject *);
static PyObject *pyhttp_body_read(struct pyhttp_request *, PyObject *);
@@ -91,6 +102,9 @@ static PyObject *pyhttp_websocket_handshake(struct pyhttp_request *,
PyObject *);
static PyMethodDef pyhttp_request_methods[] = {
+#if defined(KORE_USE_PGSQL)
+ METHOD("pgsql", pyhttp_pgsql, METH_VARARGS),
+#endif
METHOD("response", pyhttp_response, METH_VARARGS),
METHOD("argument", pyhttp_argument, METH_VARARGS),
METHOD("body_read", pyhttp_body_read, METH_VARARGS),
@@ -104,13 +118,10 @@ static PyMethodDef pyhttp_request_methods[] = {
METHOD(NULL, NULL, -1)
};
-static int pyhttp_set_state(struct pyhttp_request *, PyObject *, void *);
-
static PyObject *pyhttp_get_host(struct pyhttp_request *, void *);
static PyObject *pyhttp_get_path(struct pyhttp_request *, void *);
static PyObject *pyhttp_get_body(struct pyhttp_request *, void *);
static PyObject *pyhttp_get_agent(struct pyhttp_request *, void *);
-static PyObject *pyhttp_get_state(struct pyhttp_request *, void *);
static PyObject *pyhttp_get_method(struct pyhttp_request *, void *);
static PyObject *pyhttp_get_connection(struct pyhttp_request *, void *);
@@ -121,7 +132,6 @@ static PyGetSetDef pyhttp_request_getset[] = {
GETTER("agent", pyhttp_get_agent),
GETTER("method", pyhttp_get_method),
GETTER("connection", pyhttp_get_connection),
- GETSET("state", pyhttp_get_state, pyhttp_set_state),
GETTER(NULL, NULL)
};
@@ -162,4 +172,43 @@ static PyTypeObject pyhttp_file_type = {
.tp_basicsize = sizeof(struct pyhttp_file),
.tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
};
+
+#if defined(KORE_USE_PGSQL)
+
+#define PYKORE_PGSQL_INITIALIZE 1
+#define PYKORE_PGSQL_QUERY 2
+#define PYKORE_PGSQL_WAIT 3
+
+struct pykore_pgsql {
+ PyObject_HEAD
+ int state;
+ char *db;
+ char *query;
+ struct http_request *req;
+ PyObject *result;
+ struct kore_pgsql sql;
+};
+
+static void pykore_pgsql_dealloc(struct pykore_pgsql *);
+int pykore_pgsql_result(struct pykore_pgsql *);
+
+static PyObject *pykore_pgsql_await(PyObject *);
+static PyObject *pykore_pgsql_iternext(struct pykore_pgsql *);
+
+static PyAsyncMethods pykore_pgsql_async = {
+ (unaryfunc)pykore_pgsql_await,
+ NULL,
+ NULL
+};
+
+static PyTypeObject pykore_pgsql_type = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ .tp_name = "kore.pgsql",
+ .tp_doc = "struct kore_pgsql",
+ .tp_as_async = &pykore_pgsql_async,
+ .tp_iternext = (iternextfunc)pykore_pgsql_iternext,
+ .tp_basicsize = sizeof(struct pykore_pgsql),
+ .tp_dealloc = (destructor)pykore_pgsql_dealloc,
+ .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
+};
#endif
diff --git a/src/config.c b/src/config.c
@@ -603,7 +603,7 @@ configure_http_body_max(char *option)
{
int err;
- http_body_max = kore_strtonum(option, 10, 0, LONG_MAX, &err);
+ http_body_max = kore_strtonum(option, 10, 0, SIZE_MAX, &err);
if (err != KORE_RESULT_OK) {
printf("bad http_body_max value: %s\n", option);
return (KORE_RESULT_ERROR);
diff --git a/src/http.c b/src/http.c
@@ -69,7 +69,7 @@ u_int32_t http_request_limit = HTTP_REQUEST_LIMIT;
u_int64_t http_hsts_enable = HTTP_HSTS_ENABLE;
u_int16_t http_header_max = HTTP_HEADER_MAX_LEN;
u_int16_t http_keepalive_time = HTTP_KEEPALIVE_TIME;
-u_int64_t http_body_max = HTTP_BODY_MAX_LEN;
+size_t http_body_max = HTTP_BODY_MAX_LEN;
u_int64_t http_body_disk_offload = HTTP_BODY_DISK_OFFLOAD;
char *http_body_disk_path = HTTP_BODY_DISK_PATH;
@@ -235,7 +235,7 @@ http_request_new(struct connection *c, const char *host,
req->http_body_path = NULL;
#if defined(KORE_USE_PYTHON)
- req->py_object = NULL;
+ req->py_coro = NULL;
#endif
req->host = kore_pool_get(&http_host_pool);
@@ -431,6 +431,9 @@ http_request_free(struct http_request *req)
}
#endif
+#if defined(KORE_USE_PYTHON)
+ Py_XDECREF(req->py_coro);
+#endif
#if defined(KORE_USE_PGSQL)
while (!LIST_EMPTY(&(req->pgsqls))) {
pgsql = LIST_FIRST(&(req->pgsqls));
@@ -541,6 +544,9 @@ http_response(struct http_request *req, int status, const void *d, size_t l)
{
kore_debug("http_response(%p, %d, %p, %zu)", req, status, d, l);
+ if (req->owner == NULL)
+ return;
+
req->status = status;
switch (req->owner->proto) {
@@ -560,6 +566,9 @@ http_response_stream(struct http_request *req, int status, void *base,
{
struct netbuf *nb;
+ if (req->owner == NULL)
+ return;
+
req->status = status;
switch (req->owner->proto) {
@@ -721,7 +730,7 @@ http_header_recv(struct netbuf *nb)
}
if (req->content_length > http_body_max) {
- kore_log(LOG_NOTICE, "body too large (%ld > %ld)",
+ kore_log(LOG_NOTICE, "body too large (%zu > %zu)",
req->content_length, http_body_max);
req->flags |= HTTP_REQUEST_DELETE;
http_error_response(req->owner, 413);
diff --git a/src/kore.c b/src/kore.c
@@ -236,6 +236,12 @@ main(int argc, char *argv[])
kore_listener_cleanup();
kore_log(LOG_NOTICE, "goodbye");
+#if defined(KORE_USE_PYTHON)
+ kore_python_cleanup();
+#endif
+
+ kore_mem_cleanup();
+
return (0);
}
diff --git a/src/pool.c b/src/pool.c
@@ -81,7 +81,7 @@ kore_pool_get(struct kore_pool *pool)
#endif
if (LIST_EMPTY(&(pool->freelist))) {
- kore_log(LOG_NOTICE, "pool %s is exhausted (%d/%d)",
+ kore_log(LOG_NOTICE, "pool %s is exhausted (%zu/%zu)",
pool->name, pool->inuse, pool->elms);
pool_region_create(pool, pool->elms);
}
@@ -136,7 +136,7 @@ pool_region_create(struct kore_pool *pool, size_t elms)
struct kore_pool_region *reg;
struct kore_pool_entry *entry;
- kore_debug("pool_region_create(%p, %d)", pool, elms);
+ kore_debug("pool_region_create(%p, %zu)", pool, elms);
if ((reg = calloc(1, sizeof(struct kore_pool_region))) == NULL)
fatal("pool_region_create: calloc: %s", errno_s);
diff --git a/src/python.c b/src/python.c
@@ -20,9 +20,10 @@
#include <libgen.h>
#include "kore.h"
-
-#if !defined(KORE_NO_HTTP)
#include "http.h"
+
+#if defined(KORE_USE_PGSQL)
+#include "pgsql.h"
#endif
#include "python_api.h"
@@ -34,21 +35,23 @@ static void python_log_error(const char *);
static PyObject *pyconnection_alloc(struct connection *);
static PyObject *python_callable(PyObject *, const char *);
-#if !defined(KORE_NO_HTTP)
static PyObject *pyhttp_file_alloc(struct http_file *);
static PyObject *pyhttp_request_alloc(struct http_request *);
+
+#if defined(KORE_USE_PGSQL)
+static PyObject *pykore_pgsql_alloc(struct http_request *,
+ const char *, const char *);
#endif
static void python_append_path(const char *);
+static int python_coroutine_run(struct http_request *);
static void python_push_integer(PyObject *, const char *, long);
static void python_push_type(const char *, PyObject *, PyTypeObject *);
-#if !defined(KORE_NO_HTTP)
static int python_runtime_http_request(void *, struct http_request *);
static int python_runtime_validator(void *, struct http_request *, void *);
static void python_runtime_wsmessage(void *, struct connection *,
u_int8_t, const void *, size_t);
-#endif
static void python_runtime_execute(void *);
static int python_runtime_onload(void *, int);
static void python_runtime_connect(void *, struct connection *);
@@ -72,13 +75,11 @@ struct kore_module_functions kore_python_module = {
struct kore_runtime kore_python_runtime = {
KORE_RUNTIME_PYTHON,
-#if !defined(KORE_NO_HTTP)
.http_request = python_runtime_http_request,
.validator = python_runtime_validator,
.wsconnect = python_runtime_connect,
.wsmessage = python_runtime_wsmessage,
.wsdisconnect = python_runtime_connect,
-#endif
.onload = python_runtime_onload,
.connect = python_runtime_connect,
.execute = python_runtime_execute
@@ -100,7 +101,6 @@ static struct {
{ "CONN_PROTO_UNKNOWN", CONN_PROTO_UNKNOWN },
{ "CONN_PROTO_WEBSOCKET", CONN_PROTO_WEBSOCKET },
{ "CONN_STATE_ESTABLISHED", CONN_STATE_ESTABLISHED },
-#if !defined(KORE_NO_HTTP)
{ "METHOD_GET", HTTP_METHOD_GET },
{ "METHOD_PUT", HTTP_METHOD_PUT },
{ "METHOD_HEAD", HTTP_METHOD_HEAD },
@@ -110,7 +110,6 @@ static struct {
{ "WEBSOCKET_OP_BINARY", WEBSOCKET_OP_BINARY },
{ "WEBSOCKET_BROADCAST_LOCAL", WEBSOCKET_BROADCAST_LOCAL },
{ "WEBSOCKET_BROADCAST_GLOBAL", WEBSOCKET_BROADCAST_GLOBAL },
-#endif
{ NULL, -1 }
};
@@ -177,7 +176,7 @@ python_log_error(const char *function)
{
PyObject *type, *value, *traceback;
- if (!PyErr_Occurred())
+ if (!PyErr_Occurred() || PyErr_ExceptionMatches(PyExc_StopIteration))
return;
PyErr_Fetch(&type, &value, &traceback);
@@ -241,10 +240,10 @@ pyconnection_dealloc(struct pyconnection *pyc)
PyObject_Del((PyObject *)pyc);
}
-#if !defined(KORE_NO_HTTP)
static void
pyhttp_dealloc(struct pyhttp_request *pyreq)
{
+ Py_XDECREF(pyreq->data);
PyObject_Del((PyObject *)pyreq);
}
@@ -255,11 +254,39 @@ pyhttp_file_dealloc(struct pyhttp_file *pyfile)
}
static int
+python_coroutine_run(struct http_request *req)
+{
+ PyObject *item;
+
+ for (;;) {
+ PyErr_Clear();
+ item = _PyGen_Send((PyGenObject *)req->py_coro, NULL);
+ if (item == NULL) {
+ python_log_error("coroutine");
+ Py_DECREF(req->py_coro);
+ req->py_coro = NULL;
+ return (KORE_RESULT_OK);
+ }
+
+ if (item == Py_None) {
+ Py_DECREF(item);
+ break;
+ }
+
+ Py_DECREF(item);
+ }
+
+ return (KORE_RESULT_RETRY);
+}
+
+static int
python_runtime_http_request(void *addr, struct http_request *req)
{
- int ret;
PyObject *pyret, *pyreq, *args, *callable;
+ if (req->py_coro != NULL)
+ return (python_coroutine_run(req));
+
callable = (PyObject *)addr;
if ((pyreq = pyhttp_request_alloc(req)) == NULL)
@@ -280,16 +307,17 @@ python_runtime_http_request(void *addr, struct http_request *req)
fatal("failed to execute python call");
}
- if (!PyLong_Check(pyret))
- fatal("python_runtime_http_request: unexpected return type");
+ if (PyCoro_CheckExact(pyret)) {
+ req->py_coro = pyret;
+ return (python_coroutine_run(req));
+ }
- ret = (int)PyLong_AsLong(pyret);
- if (ret != KORE_RESULT_RETRY)
- Py_XDECREF(req->py_object);
+ if (pyret != Py_None)
+ fatal("python_runtime_http_request: unexpected return type");
Py_DECREF(pyret);
- return (ret);
+ return (KORE_RESULT_OK);
}
static int
@@ -372,7 +400,6 @@ python_runtime_wsmessage(void *addr, struct connection *c, u_int8_t op,
Py_DECREF(pyret);
}
-#endif
static void
python_runtime_execute(void *addr)
@@ -475,10 +502,8 @@ python_module_init(void)
python_integers[i].value);
}
-#if !defined(KORE_NO_HTTP)
python_push_type("pyhttp_file", pykore, &pyhttp_file_type);
python_push_type("pyhttp_request", pykore, &pyhttp_request_type);
-#endif
return (pykore);
}
@@ -519,6 +544,23 @@ python_push_integer(PyObject *module, const char *name, long value)
fatal("python_push_integer: failed to add %s", name);
}
+#if defined(KORE_USE_PGSQL)
+static PyObject *
+python_kore_pgsql_register(PyObject *self, PyObject *args)
+{
+ const char *db, *conninfo;
+
+ if (!PyArg_ParseTuple(args, "ss", &db, &conninfo)) {
+ PyErr_SetString(PyExc_TypeError, "invalid parameters");
+ return (NULL);
+ }
+
+ (void)kore_pgsql_register(db, conninfo);
+
+ Py_RETURN_TRUE;
+}
+#endif
+
static PyObject *
python_kore_log(PyObject *self, PyObject *args)
{
@@ -626,7 +668,6 @@ pyconnection_alloc(struct connection *c)
return ((PyObject *)pyc);
}
-#if !defined(KORE_NO_HTTP)
static PyObject *
pyhttp_request_alloc(struct http_request *req)
{
@@ -637,6 +678,7 @@ pyhttp_request_alloc(struct http_request *req)
return (NULL);
pyreq->req = req;
+ pyreq->data = NULL;
return ((PyObject *)pyreq);
}
@@ -910,7 +952,6 @@ python_websocket_send(PyObject *self, PyObject *args)
return (NULL);
}
-
kore_websocket_send(pyc->c, op, data.buf, data.len);
PyBuffer_Release(&data);
@@ -988,33 +1029,6 @@ pyhttp_get_agent(struct pyhttp_request *pyreq, void *closure)
return (agent);
}
-static int
-pyhttp_set_state(struct pyhttp_request *pyreq, PyObject *value, void *closure)
-{
- if (value == NULL) {
- PyErr_SetString(PyExc_TypeError,
- "pyhttp_set_state: value is NULL");
- return (-1);
- }
-
- Py_XDECREF(pyreq->req->py_object);
- pyreq->req->py_object = value;
- Py_INCREF(pyreq->req->py_object);
-
- return (0);
-}
-
-static PyObject *
-pyhttp_get_state(struct pyhttp_request *pyreq, void *closure)
-{
- if (pyreq->req->py_object == NULL)
- Py_RETURN_NONE;
-
- Py_INCREF(pyreq->req->py_object);
-
- return (pyreq->req->py_object);
-}
-
static PyObject *
pyhttp_get_method(struct pyhttp_request *pyreq, void *closure)
{
@@ -1031,6 +1045,10 @@ pyhttp_get_connection(struct pyhttp_request *pyreq, void *closure)
{
PyObject *pyc;
+ if (pyreq->req->owner == NULL) {
+ Py_RETURN_NONE;
+ }
+
if ((pyc = pyconnection_alloc(pyreq->req->owner)) == NULL)
return (PyErr_NoMemory());
@@ -1058,4 +1076,191 @@ pyhttp_file_get_filename(struct pyhttp_file *pyfile, void *closure)
return (name);
}
+
+#if defined(KORE_USE_PGSQL)
+static void
+pykore_pgsql_dealloc(struct pykore_pgsql *pysql)
+{
+ kore_free(pysql->db);
+ kore_free(pysql->query);
+ kore_pgsql_cleanup(&pysql->sql);
+
+ if (pysql->result != NULL)
+ Py_DECREF(pysql->result);
+
+ PyObject_Del((PyObject *)pysql);
+}
+
+static PyObject *
+pykore_pgsql_alloc(struct http_request *req, const char *db, const char *query)
+{
+ struct pykore_pgsql *pysql;
+
+ pysql = PyObject_New(struct pykore_pgsql, &pykore_pgsql_type);
+ if (pysql == NULL)
+ return (NULL);
+
+ pysql->req = req;
+ pysql->result = NULL;
+ pysql->db = kore_strdup(db);
+ pysql->query = kore_strdup(query);
+ pysql->state = PYKORE_PGSQL_INITIALIZE;
+
+ memset(&pysql->sql, 0, sizeof(pysql->sql));
+
+ return ((PyObject *)pysql);
+}
+
+static PyObject *
+pykore_pgsql_iternext(struct pykore_pgsql *pysql)
+{
+ switch (pysql->state) {
+ case PYKORE_PGSQL_INITIALIZE:
+ if (!kore_pgsql_query_init(&pysql->sql, pysql->req,
+ pysql->db, KORE_PGSQL_ASYNC)) {
+ if (pysql->sql.state == KORE_PGSQL_STATE_INIT)
+ break;
+ kore_pgsql_logerror(&pysql->sql);
+ PyErr_SetString(PyExc_RuntimeError, "pgsql error");
+ return (NULL);
+ }
+ /* fallthrough */
+ case PYKORE_PGSQL_QUERY:
+ if (!kore_pgsql_query(&pysql->sql, pysql->query)) {
+ kore_pgsql_logerror(&pysql->sql);
+ PyErr_SetString(PyExc_RuntimeError, "pgsql error");
+ return (NULL);
+ }
+ pysql->state = PYKORE_PGSQL_WAIT;
+ break;
+wait_again:
+ case PYKORE_PGSQL_WAIT:
+ switch (pysql->sql.state) {
+ case KORE_PGSQL_STATE_WAIT:
+ break;
+ case KORE_PGSQL_STATE_COMPLETE:
+ PyErr_SetNone(PyExc_StopIteration);
+ if (pysql->result != NULL) {
+ PyErr_SetObject(PyExc_StopIteration,
+ pysql->result);
+ Py_DECREF(pysql->result);
+ } else {
+ PyErr_SetObject(PyExc_StopIteration, Py_None);
+ }
+ return (NULL);
+ case KORE_PGSQL_STATE_ERROR:
+ kore_pgsql_logerror(&pysql->sql);
+ PyErr_SetString(PyExc_RuntimeError,
+ "failed to perform query");
+ return (NULL);
+ case KORE_PGSQL_STATE_RESULT:
+ if (!pykore_pgsql_result(pysql))
+ return (NULL);
+ goto wait_again;
+ default:
+ kore_pgsql_continue(pysql->req, &pysql->sql);
+ goto wait_again;
+ }
+ break;
+ default:
+ PyErr_SetString(PyExc_RuntimeError, "bad pykore_pgsql state");
+ return (NULL);
+ }
+
+ /* tell caller to wait. */
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+pykore_pgsql_await(PyObject *obj)
+{
+ Py_INCREF(obj);
+ return (obj);
+}
+
+int
+pykore_pgsql_result(struct pykore_pgsql *pysql)
+{
+ const char *val;
+ char key[64];
+ PyObject *list, *pyrow, *pyval;
+ int rows, row, field, fields;
+
+ if ((list = PyList_New(0)) == NULL) {
+ PyErr_SetNone(PyExc_MemoryError);
+ return (KORE_RESULT_ERROR);
+ }
+
+ rows = kore_pgsql_ntuples(&pysql->sql);
+ fields = kore_pgsql_nfields(&pysql->sql);
+
+ for (row = 0; row < rows; row++) {
+ if ((pyrow = PyDict_New()) == NULL) {
+ Py_DECREF(list);
+ PyErr_SetNone(PyExc_MemoryError);
+ return (KORE_RESULT_ERROR);
+ }
+
+ for (field = 0; field < fields; field++) {
+ val = kore_pgsql_getvalue(&pysql->sql, row, field);
+
+ pyval = PyUnicode_FromString(val);
+ if (pyval == NULL) {
+ Py_DECREF(pyrow);
+ Py_DECREF(list);
+ PyErr_SetNone(PyExc_MemoryError);
+ return (KORE_RESULT_ERROR);
+ }
+
+ (void)snprintf(key, sizeof(key), "%s",
+ kore_pgsql_fieldname(&pysql->sql, field));
+
+ if (PyDict_SetItemString(pyrow, key, pyval) == -1) {
+ Py_DECREF(pyval);
+ Py_DECREF(pyrow);
+ Py_DECREF(list);
+ PyErr_SetString(PyExc_RuntimeError,
+ "failed to add new value to row");
+ return (KORE_RESULT_ERROR);
+ }
+
+ Py_DECREF(pyval);
+ }
+
+ if (PyList_Insert(list, row, pyrow) == -1) {
+ Py_DECREF(pyrow);
+ Py_DECREF(list);
+ PyErr_SetString(PyExc_RuntimeError,
+ "failed to add new row to list");
+ return (KORE_RESULT_ERROR);
+ }
+
+ Py_DECREF(pyrow);
+ }
+
+ pysql->result = list;
+ kore_pgsql_continue(pysql->req, &pysql->sql);
+
+ return (KORE_RESULT_OK);
+}
+
+static PyObject *
+pyhttp_pgsql(struct pyhttp_request *pyreq, PyObject *args)
+{
+ PyObject *obj;
+ const char *db, *query;
+
+ if (!PyArg_ParseTuple(args, "ss", &db, &query)) {
+ PyErr_SetString(PyExc_TypeError, "invalid parameters");
+ return (NULL);
+ }
+
+ if ((obj = pykore_pgsql_alloc(pyreq->req, db, query)) == NULL)
+ return (PyErr_NoMemory());
+
+ Py_INCREF(obj);
+ pyreq->data = obj;
+
+ return ((PyObject *)obj);
+}
#endif
diff --git a/src/worker.c b/src/worker.c
@@ -410,7 +410,13 @@ kore_worker_entry(struct kore_worker *kw)
kore_python_cleanup();
#endif
+#if defined(KORE_USE_PGSQL)
+ kore_pgsql_sys_cleanup();
+#endif
+
kore_debug("worker %d shutting down", kw->id);
+
+ kore_mem_cleanup();
exit(0);
}