kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 3cc7d6e2380fe9b83a7212e58d1f31750ad5891e
parent 9cc58d45c18b2acb1e572a8018b45c6aca081b5a
Author: Joris Vink <joris@coders.se>
Date:   Fri,  7 Jun 2019 21:06:54 +0200

Allow kore.prerequests to be async.

Diffstat:
include/kore/http.h | 3+++
include/kore/python_methods.h | 4++++
src/http.c | 3+++
src/python.c | 120+++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------
4 files changed, 97 insertions(+), 33 deletions(-)

diff --git a/include/kore/http.h b/include/kore/http.h @@ -230,6 +230,7 @@ struct http_file { #define HTTP_BODY_DIGEST_LEN 32 #define HTTP_BODY_DIGEST_STRLEN ((HTTP_BODY_DIGEST_LEN * 2) + 1) +struct reqcall; struct kore_task; struct http_client; @@ -263,7 +264,9 @@ struct http_request { void (*onfree)(struct http_request *); #if defined(KORE_USE_PYTHON) + void *py_req; void *py_coro; + struct reqcall *py_rqnext; #endif u_int8_t http_body_digest[HTTP_BODY_DIGEST_LEN]; diff --git a/include/kore/python_methods.h b/include/kore/python_methods.h @@ -543,6 +543,10 @@ static PyTypeObject pyconnection_type = { .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, }; +#define PYHTTP_STATE_INIT 0 +#define PYHTTP_STATE_PREPROCESS 1 +#define PYHTTP_STATE_RUN 2 + struct pyhttp_request { PyObject_HEAD struct http_request *req; diff --git a/src/http.c b/src/http.c @@ -442,6 +442,7 @@ http_request_free(struct http_request *req) kore_python_coro_delete(req->py_coro); req->py_coro = NULL; } + Py_XDECREF(req->py_req); #endif #if defined(KORE_USE_PGSQL) while (!LIST_EMPTY(&(req->pgsqls))) { @@ -1621,7 +1622,9 @@ http_request_new(struct connection *c, const char *host, req->path = path; #if defined(KORE_USE_PYTHON) + req->py_req = NULL; req->py_coro = NULL; + req->py_rqnext = NULL; #endif if (qsoff > 0) { diff --git a/src/python.c b/src/python.c @@ -83,6 +83,7 @@ static void pysuspend_wakeup(void *, u_int64_t); static void pygather_reap_coro(struct pygather_op *, struct python_coro *); +static int pyhttp_preprocess(struct http_request *); static int pyhttp_iterobj_chunk_sent(struct netbuf *); static int pyhttp_iterobj_next(struct pyhttp_iterobj *); static void pyhttp_iterobj_disconnect(struct connection *); @@ -630,6 +631,7 @@ pyconnection_dealloc(struct pyconnection *pyc) static void pyhttp_dealloc(struct pyhttp_request *pyreq) { + printf("http request deallocated\n"); Py_XDECREF(pyreq->dict); Py_XDECREF(pyreq->data); PyObject_Del((PyObject *)pyreq); @@ -644,48 +646,55 @@ pyhttp_file_dealloc(struct pyhttp_file *pyfile) static int python_runtime_http_request(void *addr, struct http_request *req) { - struct reqcall *rq; - PyObject *pyret, *pyreq, *args, *callable; + int ret; + PyObject *pyret, *args, *callable; if (req->py_coro != NULL) { python_coro_wakeup(req->py_coro); if (python_coro_run(req->py_coro) == KORE_RESULT_OK) { kore_python_coro_delete(req->py_coro); req->py_coro = NULL; - return (KORE_RESULT_OK); + + if (req->fsm_state != PYHTTP_STATE_PREPROCESS) + return (KORE_RESULT_OK); } return (KORE_RESULT_RETRY); } - callable = (PyObject *)addr; - - if ((pyreq = pyhttp_request_alloc(req)) == NULL) - fatal("python_runtime_http_request: pyreq alloc failed"); - - LIST_FOREACH(rq, &prereq, list) { - PyErr_Clear(); - pyret = PyObject_CallFunctionObjArgs(rq->f, pyreq, NULL); - - if (pyret == NULL) { - Py_DECREF(pyreq); - kore_python_log_error("prerequest"); - http_response(req, HTTP_STATUS_INTERNAL_ERROR, NULL, 0); - return (KORE_RESULT_OK); + switch (req->fsm_state) { + case PYHTTP_STATE_INIT: + req->py_rqnext = LIST_FIRST(&prereq); + req->fsm_state = PYHTTP_STATE_PREPROCESS; + if (req->py_req == NULL) { + if ((req->py_req = pyhttp_request_alloc(req)) == NULL) + fatal("%s: pyreq alloc failed", __func__); } - - if (pyret == Py_False) { - Py_DECREF(pyreq); - Py_DECREF(pyret); + /* fallthrough */ + case PYHTTP_STATE_PREPROCESS: + ret = pyhttp_preprocess(req); + switch (ret) { + case KORE_RESULT_OK: + req->fsm_state = PYHTTP_STATE_RUN; + break; + case KORE_RESULT_RETRY: + return (KORE_RESULT_RETRY); + case KORE_RESULT_ERROR: return (KORE_RESULT_OK); + default: + fatal("invalid state pyhttp state %d", req->fsm_state); } - - Py_DECREF(pyret); + /* fallthrough */ + case PYHTTP_STATE_RUN: + break; } + callable = (PyObject *)addr; + if ((args = PyTuple_New(1)) == NULL) fatal("python_runtime_http_request: PyTuple_New failed"); - if (PyTuple_SetItem(args, 0, pyreq) != 0) + Py_INCREF(req->py_req); + if (PyTuple_SetItem(args, 0, req->py_req) != 0) fatal("python_runtime_http_request: PyTuple_SetItem failed"); PyErr_Clear(); @@ -722,7 +731,12 @@ python_runtime_validator(void *addr, struct http_request *req, const void *data) { int ret; struct python_coro *coro; - PyObject *pyret, *pyreq, *args, *callable, *arg; + PyObject *pyret, *args, *callable, *arg; + + if (req->py_req == NULL) { + if ((req->py_req = pyhttp_request_alloc(req)) == NULL) + fatal("%s: pyreq alloc failed", __func__); + } if (req->py_coro != NULL) { coro = req->py_coro; @@ -740,25 +754,21 @@ python_runtime_validator(void *addr, struct http_request *req, const void *data) callable = (PyObject *)addr; if (req->flags & HTTP_VALIDATOR_IS_REQUEST) { - if ((arg = pyhttp_request_alloc(data)) == NULL) - fatal("%s: pyreq failed", __func__); - if ((args = PyTuple_New(1)) == NULL) fatal("%s: PyTuple_New failed", __func__); - if (PyTuple_SetItem(args, 0, arg) != 0) + Py_INCREF(req->py_req); + if (PyTuple_SetItem(args, 0, req->py_req) != 0) fatal("%s: PyTuple_SetItem failed", __func__); } else { - if ((pyreq = pyhttp_request_alloc(req)) == NULL) - fatal("%s: pyreq alloc failed", __func__); - if ((arg = PyUnicode_FromString(data)) == NULL) fatal("python_runtime_validator: PyUnicode failed"); if ((args = PyTuple_New(2)) == NULL) fatal("%s: PyTuple_New failed", __func__); - if (PyTuple_SetItem(args, 0, pyreq) != 0 || + Py_INCREF(req->py_req); + if (PyTuple_SetItem(args, 0, req->py_req) != 0 || PyTuple_SetItem(args, 1, arg) != 0) fatal("%s: PyTuple_SetItem failed", __func__); } @@ -3179,6 +3189,50 @@ pyhttp_file_alloc(struct http_file *file) return ((PyObject *)pyfile); } +static int +pyhttp_preprocess(struct http_request *req) +{ + struct reqcall *rq; + PyObject *ret; + + rq = req->py_rqnext; + + while (rq) { + req->py_rqnext = LIST_NEXT(rq, list); + + PyErr_Clear(); + ret = PyObject_CallFunctionObjArgs(rq->f, req->py_req, NULL); + + if (ret == NULL) { + kore_python_log_error("preprocess"); + http_response(req, HTTP_STATUS_INTERNAL_ERROR, NULL, 0); + return (KORE_RESULT_ERROR); + } + + if (ret == Py_False) { + Py_DECREF(ret); + return (KORE_RESULT_ERROR); + } + + if (PyCoro_CheckExact(ret)) { + req->py_coro = python_coro_create(ret, req); + if (python_coro_run(req->py_coro) == KORE_RESULT_OK) { + http_request_wakeup(req); + kore_python_coro_delete(req->py_coro); + req->py_coro = NULL; + rq = req->py_rqnext; + continue; + } + return (KORE_RESULT_RETRY); + } + + Py_DECREF(ret); + rq = req->py_rqnext; + } + + return (KORE_RESULT_OK); +} + static PyObject * pyhttp_response(struct pyhttp_request *pyreq, PyObject *args) {