kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 2dd66586ffff57177278c5478993f886e4b89c0d
parent 5456f2e1d5656398347377e7a830fc47b7cfefbf
Author: Joris Vink <joris@coders.se>
Date:   Tue, 30 Oct 2018 20:28:27 +0100

several python improvements.

- add kore.time() as equivalent for kore_time_ms().
- call waitpid() until no more children are available for reaping otherwise
  we risk missing a process if several die at the same time and only one
  SIGCHLD is delivered to us.
- drain a RECV socket operation if eof is set but no exception was given.

Diffstat:
include/kore/python_methods.h | 2++
src/http.c | 4++++
src/python.c | 75++++++++++++++++++++++++++++++++++++++++++++++-----------------------------
3 files changed, 52 insertions(+), 29 deletions(-)

diff --git a/include/kore/python_methods.h b/include/kore/python_methods.h @@ -32,6 +32,7 @@ struct python_coro { TAILQ_HEAD(coro_list, python_coro); static PyObject *python_kore_log(PyObject *, PyObject *); +static PyObject *python_kore_time(PyObject *, PyObject *); static PyObject *python_kore_lock(PyObject *, PyObject *); static PyObject *python_kore_proc(PyObject *, PyObject *); static PyObject *python_kore_bind(PyObject *, PyObject *); @@ -59,6 +60,7 @@ static PyObject *python_websocket_broadcast(PyObject *, PyObject *); static struct PyMethodDef pykore_methods[] = { METHOD("log", python_kore_log, METH_VARARGS), + METHOD("time", python_kore_time, METH_NOARGS), METHOD("lock", python_kore_lock, METH_NOARGS), METHOD("proc", python_kore_proc, METH_VARARGS), METHOD("bind", python_kore_bind, METH_VARARGS), diff --git a/src/http.c b/src/http.c @@ -1501,6 +1501,10 @@ http_request_new(struct connection *c, const char *host, req->host = host; req->path = path; +#if defined(KORE_USE_PYTHON) + req->py_coro = NULL; +#endif + if (qsoff > 0) { req->query_string = path + qsoff; *(req->query_string)++ = '\0'; diff --git a/src/python.c b/src/python.c @@ -324,33 +324,43 @@ kore_python_proc_reap(void) pid_t child; int status; - if ((child = waitpid(-1, &status, 0)) == -1) { - if (errno == ECHILD) + for (;;) { + if ((child = waitpid(-1, &status, WNOHANG)) == -1) { + if (errno == ECHILD) + return; + if (errno == EINTR) + continue; + kore_log(LOG_NOTICE, "waitpid: %s", errno_s); return; - kore_log(LOG_NOTICE, "waitpid: %s", errno_s); - return; - } + } - proc = NULL; + if (child == 0) + return; - TAILQ_FOREACH(proc, &procs, list) { - if (proc->pid == child) - break; - } + proc = NULL; - if (proc == NULL) { - kore_log(LOG_NOTICE, "SIGCHLD for unknown proc (%d)", child); - return; - } + TAILQ_FOREACH(proc, &procs, list) { + if (proc->pid == child) + break; + } - proc->pid = -1; - proc->reaped = 1; - proc->status = status; + if (proc == NULL) + continue; - if (proc->coro->request != NULL) - http_request_wakeup(proc->coro->request); - else - python_coro_wakeup(proc->coro); + proc->pid = -1; + proc->reaped = 1; + proc->status = status; + + if (proc->timer != NULL) { + kore_timer_remove(proc->timer); + proc->timer = NULL; + } + + if (proc->coro->request != NULL) + http_request_wakeup(proc->coro->request); + else + python_coro_wakeup(proc->coro); + } } static void * @@ -881,6 +891,16 @@ python_kore_log(PyObject *self, PyObject *args) } static PyObject * +python_kore_time(PyObject *self, PyObject *args) +{ + u_int64_t now; + + now = kore_time_ms(); + + return (PyLong_FromUnsignedLongLong(now)); +} + +static PyObject * python_kore_bind(PyObject *self, PyObject *args) { const char *ip, *port; @@ -1238,11 +1258,8 @@ python_kore_proc(PyObject *self, PyObject *args) close(out_pipe[1]); if (!kore_connection_nonblock(in_pipe[1], 0) || - !kore_connection_nonblock(out_pipe[0], 0)) { - Py_DECREF((PyObject *)proc); - PyErr_SetString(PyExc_RuntimeError, errno_s); - return (NULL); - } + !kore_connection_nonblock(out_pipe[0], 0)) + fatal("failed to mark kore.proc pipes are non-blocking"); proc->in->fd = in_pipe[1]; proc->out->fd = out_pipe[0]; @@ -1705,9 +1722,9 @@ pysocket_op_iternext(struct pysocket_op *op) return (NULL); } - PyErr_SetNone(PyExc_StopIteration); - - return (NULL); + /* Drain the recv socket. */ + op->data.evt.flags |= KORE_EVENT_READ; + return (pysocket_async_recv(op)); } switch (op->data.type) {