kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit e2651889e089315e88e468cbb92321e65a346da9
parent ea7ea488401a0eadc3ba3d3c69fc46da1b990410
Author: Joris Vink <joris@coders.se>
Date:   Fri, 26 Oct 2018 19:19:47 +0200

Add asynchronous subprocess support.

This adds kore.proc to the python runtime allowing async processing
handling:

The kore.proc method takes the command to run and an optional timeout
parameter in milliseconds. If the process did not exit normally after
that amount of time a TimeoutError exception is raised.

For instance:

async def run(cmd):
	proc = kore.proc(cmd, 1000)

	try:
		await proc.send("hello")
		proc.close_stdin()
	except TimeoutError:
		proc.kill()

	retcode = await proc.reap()

	return retcode

Diffstat:
include/kore/python_api.h | 1+
include/kore/python_methods.h | 73++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
src/connection.c | 6++++++
src/kore.c | 13++++++++++++-
src/python.c | 521++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------
src/worker.c | 43++++++++++++++++++++++++-------------------
6 files changed, 578 insertions(+), 79 deletions(-)

diff --git a/include/kore/python_api.h b/include/kore/python_api.h @@ -26,6 +26,7 @@ void kore_python_init(void); void kore_python_cleanup(void); void kore_python_coro_run(void); +void kore_python_proc_reap(void); void kore_python_path(const char *); void kore_python_coro_delete(void *); void kore_python_log_error(const char *); diff --git a/include/kore/python_methods.h b/include/kore/python_methods.h @@ -20,14 +20,17 @@ struct python_coro { u_int64_t id; int state; - int error; PyObject *obj; + struct pysocket_op *sockop; struct http_request *request; + PyObject *exception; + char *exception_msg; TAILQ_ENTRY(python_coro) list; }; static PyObject *python_kore_log(PyObject *, PyObject *); static PyObject *python_kore_lock(PyObject *, PyObject *); +static PyObject *python_kore_proc(PyObject *, PyObject *); static PyObject *python_kore_bind(PyObject *, PyObject *); static PyObject *python_kore_timer(PyObject *, PyObject *); static PyObject *python_kore_fatal(PyObject *, PyObject *); @@ -53,6 +56,7 @@ static PyObject *python_websocket_broadcast(PyObject *, PyObject *); static struct PyMethodDef pykore_methods[] = { METHOD("log", python_kore_log, METH_VARARGS), METHOD("lock", python_kore_lock, METH_NOARGS), + METHOD("proc", python_kore_proc, METH_VARARGS), METHOD("bind", python_kore_bind, METH_VARARGS), METHOD("timer", python_kore_timer, METH_VARARGS), METHOD("queue", python_kore_queue, METH_VARARGS), @@ -182,6 +186,7 @@ static PyTypeObject pysocket_type = { struct pysocket_data { struct kore_event evt; int fd; + int eof; int type; void *self; struct python_coro *coro; @@ -344,6 +349,72 @@ static PyTypeObject pylock_op_type = { .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, }; +struct pyproc { + PyObject_HEAD + pid_t pid; + int reaped; + int status; + struct pysocket *in; + struct pysocket *out; + struct python_coro *coro; + struct kore_timer *timer; + TAILQ_ENTRY(pyproc) list; +}; + +static void pyproc_dealloc(struct pyproc *); + +static PyObject *pyproc_kill(struct pyproc *, PyObject *); +static PyObject *pyproc_reap(struct pyproc *, PyObject *); +static PyObject *pyproc_recv(struct pyproc *, PyObject *); +static PyObject *pyproc_send(struct pyproc *, PyObject *); +static PyObject *pyproc_close_stdin(struct pyproc *, PyObject *); + +static PyMethodDef pyproc_methods[] = { + METHOD("kill", pyproc_kill, METH_NOARGS), + METHOD("reap", pyproc_reap, METH_NOARGS), + METHOD("recv", pyproc_recv, METH_VARARGS), + METHOD("send", pyproc_send, METH_VARARGS), + METHOD("close_stdin", pyproc_close_stdin, METH_NOARGS), + METHOD(NULL, NULL, -1), +}; + +static PyTypeObject pyproc_type = { + PyVarObject_HEAD_INIT(NULL, 0) + .tp_name = "kore.proc", + .tp_doc = "async process", + .tp_methods = pyproc_methods, + .tp_basicsize = sizeof(struct pyproc), + .tp_dealloc = (destructor)pyproc_dealloc, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, +}; + +struct pyproc_op { + PyObject_HEAD + struct pyproc *proc; +}; + +static void pyproc_op_dealloc(struct pyproc_op *); + +static PyObject *pyproc_op_await(PyObject *); +static PyObject *pyproc_op_iternext(struct pyproc_op *); + +static PyAsyncMethods pyproc_op_async = { + (unaryfunc)pyproc_op_await, + NULL, + NULL +}; + +static PyTypeObject pyproc_op_type = { + PyVarObject_HEAD_INIT(NULL, 0) + .tp_name = "kore.proc_op", + .tp_doc = "proc reaper awaitable", + .tp_as_async = &pyproc_op_async, + .tp_iternext = (iternextfunc)pyproc_op_iternext, + .tp_basicsize = sizeof(struct pyproc_op), + .tp_dealloc = (destructor)pyproc_op_dealloc, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, +}; + struct pyconnection { PyObject_HEAD struct connection *c; diff --git a/src/connection.c b/src/connection.c @@ -135,6 +135,12 @@ kore_connection_accept(struct listener *listener, struct connection **out) return (KORE_RESULT_ERROR); } + if (fcntl(c->fd, F_SETFD, FD_CLOEXEC) == -1) { + close(c->fd); + kore_pool_put(&connection_pool, c); + return (KORE_RESULT_ERROR); + } + c->handle = kore_connection_handle; TAILQ_INSERT_TAIL(&connections, c, list); diff --git a/src/kore.c b/src/kore.c @@ -21,6 +21,7 @@ #include <sys/socket.h> #include <sys/resource.h> +#include <fcntl.h> #include <stdio.h> #include <netdb.h> #include <signal.h> @@ -417,6 +418,12 @@ kore_listener_alloc(int family, const char *ccb) return (NULL); } + if (fcntl(l->fd, F_SETFD, FD_CLOEXEC) == -1) { + kore_listener_free(l); + kore_log(LOG_ERR, "fcntl(): %s", errno_s); + return (NULL); + } + if (!kore_connection_nonblock(l->fd, family != AF_UNIX)) { kore_listener_free(l); kore_log(LOG_ERR, "kore_connection_nonblock(): %s", errno_s); @@ -519,6 +526,8 @@ kore_signal_setup(void) fatal("sigaction: %s", errno_s); if (sigaction(SIGUSR1, &sa, NULL) == -1) fatal("sigaction: %s", errno_s); + if (sigaction(SIGCHLD, &sa, NULL) == -1) + fatal("sigaction: %s", errno_s); if (foreground) { if (sigaction(SIGINT, &sa, NULL) == -1) @@ -644,6 +653,9 @@ kore_server_start(int argc, char *argv[]) case SIGUSR1: kore_worker_dispatch_signal(sig_recv); break; + case SIGCHLD: + kore_worker_wait(0); + break; default: break; } @@ -651,7 +663,6 @@ kore_server_start(int argc, char *argv[]) sig_recv = 0; } - kore_worker_wait(0); kore_platform_event_wait(100); kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT); } diff --git a/src/python.c b/src/python.c @@ -17,8 +17,10 @@ #include <sys/types.h> #include <sys/socket.h> +#include <sys/wait.h> #include <libgen.h> +#include <signal.h> #include "kore.h" #include "http.h" @@ -47,6 +49,7 @@ static void pysocket_evt_handle(void *, int); static PyObject *pysocket_op_create(struct pysocket *, int, const void *, size_t); +static struct pysocket *pysocket_alloc(void); static PyObject *pysocket_async_recv(struct pysocket_op *); static PyObject *pysocket_async_send(struct pysocket_op *); static PyObject *pysocket_async_accept(struct pysocket_op *); @@ -55,6 +58,7 @@ static PyObject *pysocket_async_connect(struct pysocket_op *); static void pylock_do_release(struct pylock *); static void pytimer_run(void *, u_int64_t); +static void pyproc_timeout(void *, u_int64_t); static void pysuspend_wakeup(void *, u_int64_t); #if defined(KORE_USE_PGSQL) @@ -145,6 +149,8 @@ static PyMemAllocatorEx allocator = { .free = python_free }; +static TAILQ_HEAD(, pyproc) procs; + static struct kore_pool coro_pool; static struct kore_pool queue_wait_pool; static struct kore_pool queue_object_pool; @@ -166,6 +172,7 @@ kore_python_init(void) coro_id = 0; coro_count = 0; + TAILQ_INIT(&procs); TAILQ_INIT(&coro_runnable); TAILQ_INIT(&coro_suspended); @@ -216,9 +223,11 @@ kore_python_coro_run(void) } /* - * If something was woken up, let Kore do HTTP processing - * so they run ASAP without having to wait for a tick from - * the event loop. + * Let Kore do HTTP processing so awoken coroutines run asap without + * having to wait for a tick from the event loop. + * + * Maybe it is more beneficial that we track if something related + * to HTTP requests was awoken and only run if true? */ http_process(); } @@ -243,11 +252,11 @@ kore_python_coro_delete(void *obj) kore_pool_put(&coro_pool, coro); } -/* XXX - Fix this (show error type + traceback). */ void kore_python_log_error(const char *function) { - PyObject *type, *value, *traceback; + const char *sval; + PyObject *repr, *type, *value, *traceback; if (!PyErr_Occurred() || PyErr_ExceptionMatches(PyExc_StopIteration)) return; @@ -259,17 +268,56 @@ kore_python_log_error(const char *function) return; } - kore_log(LOG_ERR, - "python exception in '%s' - type:%s - value:%s", - function, - PyUnicode_AsUTF8AndSize(type, NULL), - PyUnicode_AsUTF8AndSize(value, NULL)); + if ((repr = PyObject_Repr(value)) == NULL) + sval = "unknown"; + else + sval = PyUnicode_AsUTF8(repr); + + kore_log(LOG_ERR, "uncaught exception %s in '%s'", sval, function); + + Py_XDECREF(repr); Py_DECREF(type); Py_DECREF(value); Py_XDECREF(traceback); } +void +kore_python_proc_reap(void) +{ + struct pyproc *proc; + pid_t child; + int status; + + if ((child = waitpid(-1, &status, 0)) == -1) { + if (errno == ECHILD) + return; + kore_log(LOG_NOTICE, "waitpid: %s", errno_s); + return; + } + + proc = NULL; + + TAILQ_FOREACH(proc, &procs, list) { + if (proc->pid == child) + break; + } + + if (proc == NULL) { + kore_log(LOG_NOTICE, "SIGCHLD for unknown proc (%d)", child); + return; + } + + proc->pid = -1; + proc->reaped = 1; + proc->status = status; + + if (proc->coro->request != NULL) + http_request_wakeup(proc->coro->request); + else + python_coro_wakeup(proc->coro); +} + static void * python_malloc(void *ctx, size_t len) { @@ -342,8 +390,11 @@ python_coro_create(PyObject *obj, struct http_request *req) coro = kore_pool_get(&coro_pool); coro_count++; + coro->sockop = NULL; + coro->exception = NULL; + coro->exception_msg = NULL; + coro->obj = obj; - coro->error = 0; coro->request = req; coro->id = coro_id++; coro->state = CORO_STATE_RUNNABLE; @@ -369,9 +420,6 @@ python_coro_run(struct python_coro *coro) for (;;) { PyErr_Clear(); - if (coro->error) - PyErr_SetString(PyExc_RuntimeError, "i/o error"); - item = _PyGen_Send((PyGenObject *)coro->obj, NULL); if (item == NULL) { kore_python_log_error("coroutine"); @@ -721,6 +769,7 @@ python_module_init(void) if ((pykore = PyModule_Create(&pykore_module)) == NULL) fatal("python_module_init: failed to setup pykore module"); + python_push_type("pyproc", pykore, &pyproc_type); python_push_type("pylock", pykore, &pylock_type); python_push_type("pytimer", pykore, &pytimer_type); python_push_type("pyqueue", pykore, &pyqueue_type); @@ -875,7 +924,7 @@ python_kore_socket_wrap(PyObject *self, PyObject *args) if ((pyproto = PyObject_GetAttrString(pysock, "proto")) == NULL) goto out; - if ((sock = PyObject_New(struct pysocket, &pysocket_type)) == NULL) + if ((sock = pysocket_alloc()) == NULL) goto out; sock->socket = pysock; @@ -965,22 +1014,22 @@ python_kore_fatalx(PyObject *self, PyObject *args) static PyObject * python_kore_suspend(PyObject *self, PyObject *args) { - struct pysuspend_op *sop; + struct pysuspend_op *op; int delay; if (!PyArg_ParseTuple(args, "i", &delay)) return (NULL); - sop = PyObject_New(struct pysuspend_op, &pysuspend_op_type); - if (sop == NULL) + op = PyObject_New(struct pysuspend_op, &pysuspend_op_type); + if (op == NULL) return (NULL); - sop->timer = NULL; - sop->delay = delay; - sop->coro = coro_running; - sop->state = PYSUSPEND_OP_INIT; + op->timer = NULL; + op->delay = delay; + op->coro = coro_running; + op->state = PYSUSPEND_OP_INIT; - return ((PyObject *)sop); + return ((PyObject *)op); } static PyObject * @@ -1021,6 +1070,108 @@ python_kore_timer(PyObject *self, PyObject *args) } static PyObject * +python_kore_proc(PyObject *self, PyObject *args) +{ + const char *cmd; + struct pyproc *proc; + char *copy, *argv[30]; + int timeo, in_pipe[2], out_pipe[2]; + + timeo = -1; + + if (coro_running == NULL) { + PyErr_SetString(PyExc_RuntimeError, + "kore.proc only available in coroutines"); + return (NULL); + } + + if (!PyArg_ParseTuple(args, "s|i", &cmd, &timeo)) + return (NULL); + + if (pipe(in_pipe) == -1) { + PyErr_SetString(PyExc_RuntimeError, errno_s); + return (NULL); + } + + if (pipe(out_pipe) == -1) { + close(in_pipe[0]); + close(in_pipe[1]); + PyErr_SetString(PyExc_RuntimeError, errno_s); + return (NULL); + } + + if ((proc = PyObject_New(struct pyproc, &pyproc_type)) == NULL) { + close(in_pipe[0]); + close(in_pipe[1]); + close(out_pipe[0]); + close(out_pipe[1]); + return (NULL); + } + + proc->pid = -1; + proc->reaped = 0; + proc->status = 0; + proc->timer = NULL; + proc->coro = coro_running; + proc->in = pysocket_alloc(); + proc->out = pysocket_alloc(); + + if (proc->in == NULL || proc->out == NULL) { + Py_DECREF((PyObject *)proc); + return (NULL); + } + + TAILQ_INSERT_TAIL(&procs, proc, list); + + proc->pid = fork(); + if (proc->pid == -1) { + if (errno == ENOSYS) { + Py_DECREF((PyObject *)proc); + PyErr_SetString(PyExc_RuntimeError, errno_s); + return (NULL); + } + fatal("python_kore_proc: fork(): %s", errno_s); + } + + if (proc->pid == 0) { + close(in_pipe[1]); + close(out_pipe[0]); + + if (dup2(out_pipe[1], STDOUT_FILENO) == -1 || + dup2(out_pipe[1], STDERR_FILENO) == -1 || + dup2(in_pipe[0], STDIN_FILENO) == -1) + fatal("dup2: %s", errno_s); + + copy = kore_strdup(cmd); + kore_split_string(copy, " ", argv, 30); + (void)execve(argv[0], argv, NULL); + printf("kore.proc failed to execute %s (%s)\n", + argv[0], errno_s); + exit(1); + } + + close(in_pipe[0]); + close(out_pipe[1]); + + if (!kore_connection_nonblock(in_pipe[1], 0) || + !kore_connection_nonblock(out_pipe[0], 0)) { + Py_DECREF((PyObject *)proc); + PyErr_SetString(PyExc_RuntimeError, errno_s); + return (NULL); + } + + proc->in->fd = in_pipe[1]; + proc->out->fd = out_pipe[0]; + + if (timeo != -1) { + proc->timer = kore_timer_add(pyproc_timeout, + timeo, proc, KORE_TIMER_ONESHOT); + } + + return ((PyObject *)proc); +} + +static PyObject * python_import(const char *path) { PyObject *module; @@ -1175,14 +1326,14 @@ pytimer_close(struct pytimer *timer, PyObject *args) } static void -pysuspend_op_dealloc(struct pysuspend_op *sop) +pysuspend_op_dealloc(struct pysuspend_op *op) { - if (sop->timer != NULL) { - kore_timer_remove(sop->timer); - sop->timer = NULL; + if (op->timer != NULL) { + kore_timer_remove(op->timer); + op->timer = NULL; } - PyObject_Del((PyObject *)sop); + PyObject_Del((PyObject *)op); } static PyObject * @@ -1193,13 +1344,13 @@ pysuspend_op_await(PyObject *sop) } static PyObject * -pysuspend_op_iternext(struct pysuspend_op *sop) +pysuspend_op_iternext(struct pysuspend_op *op) { - switch (sop->state) { + switch (op->state) { case PYSUSPEND_OP_INIT: - sop->timer = kore_timer_add(pysuspend_wakeup, sop->delay, - sop, KORE_TIMER_ONESHOT); - sop->state = PYSUSPEND_OP_WAIT; + op->timer = kore_timer_add(pysuspend_wakeup, op->delay, + op, KORE_TIMER_ONESHOT); + op->state = PYSUSPEND_OP_WAIT; break; case PYSUSPEND_OP_WAIT: break; @@ -1207,7 +1358,7 @@ pysuspend_op_iternext(struct pysuspend_op *sop) PyErr_SetNone(PyExc_StopIteration); return (NULL); default: - fatal("unknown state %d for pysuspend_op", sop->state); + fatal("unknown state %d for pysuspend_op", op->state); } Py_RETURN_NONE; @@ -1216,20 +1367,43 @@ pysuspend_op_iternext(struct pysuspend_op *sop) static void pysuspend_wakeup(void *arg, u_int64_t now) { - struct pysuspend_op *sop = arg; + struct pysuspend_op *op = arg; - sop->timer = NULL; - sop->state = PYSUSPEND_OP_CONTINUE; + op->timer = NULL; + op->state = PYSUSPEND_OP_CONTINUE; - if (sop->coro->request != NULL) - http_request_wakeup(sop->coro->request); + if (op->coro->request != NULL) + http_request_wakeup(op->coro->request); else - python_coro_wakeup(sop->coro); + python_coro_wakeup(op->coro); +} + +static struct pysocket * +pysocket_alloc(void) +{ + struct pysocket *sock; + + if ((sock = PyObject_New(struct pysocket, &pysocket_type)) == NULL) + return (NULL); + + sock->fd = -1; + sock->family = -1; + sock->protocol = -1; + sock->socket = NULL; + + return (sock); } static void pysocket_dealloc(struct pysocket *sock) { + if (sock->socket != NULL) { + (void)PyObject_CallMethod(sock->socket, "close", NULL); + Py_DECREF(sock->socket); + } else if (sock->fd != -1) { + (void)close(sock->fd); + } + PyObject_Del((PyObject *)sock); } @@ -1317,8 +1491,10 @@ pysocket_close(struct pysocket *sock, PyObject *args) if (sock->socket != NULL) { (void)PyObject_CallMethod(sock->socket, "close", NULL); Py_DECREF(sock->socket); + sock->socket = NULL; } else if (sock->fd != -1) { (void)close(sock->fd); + sock->fd = -1; } Py_RETURN_TRUE; @@ -1349,6 +1525,7 @@ pysocket_op_dealloc(struct pysocket_op *op) op->data.type == PYSOCKET_TYPE_SEND) kore_buf_cleanup(&op->data.buffer); + op->data.coro->sockop = NULL; Py_DECREF(op->data.socket); PyObject_Del((PyObject *)op); @@ -1359,6 +1536,9 @@ pysocket_op_create(struct pysocket *sock, int type, const void *ptr, size_t len) { struct pysocket_op *op; + if (coro_running->sockop != NULL) + fatal("pysocket_op_create: coro has active socketop"); + op = PyObject_New(struct pysocket_op, &pysocket_op_type); if (op == NULL) return (NULL); @@ -1376,6 +1556,7 @@ pysocket_op_create(struct pysocket *sock, int type, const void *ptr, size_t len) op->data.fd = sock->fd; #endif + op->data.eof = 0; op->data.self = op; op->data.type = type; op->data.socket = sock; @@ -1384,6 +1565,7 @@ pysocket_op_create(struct pysocket *sock, int type, const void *ptr, size_t len) op->data.evt.type = KORE_TYPE_PYSOCKET; op->data.evt.handle = pysocket_evt_handle; + coro_running->sockop = op; Py_INCREF(op->data.socket); switch (type) { @@ -1426,6 +1608,24 @@ pysocket_op_iternext(struct pysocket_op *op) { PyObject *ret; + if (op->data.eof) { + if (op->data.coro->exception != NULL) { + PyErr_SetString(op->data.coro->exception, + op->data.coro->exception_msg); + op->data.coro->exception = NULL; + return (NULL); + } + + if (op->data.type != PYSOCKET_TYPE_RECV) { + PyErr_SetString(PyExc_RuntimeError, "socket EOF"); + return (NULL); + } + + PyErr_SetNone(PyExc_StopIteration); + + return (NULL); + } + switch (op->data.type) { case PYSOCKET_TYPE_CONNECT: ret = pysocket_async_connect(op); @@ -1473,7 +1673,7 @@ pysocket_async_accept(struct pysocket_op *op) int fd; struct pysocket *sock; - if ((sock = PyObject_New(struct pysocket, &pysocket_type)) == NULL) + if ((sock = pysocket_alloc() ) == NULL) return (NULL); sock->addr_len = sizeof(sock->addr); @@ -1516,14 +1716,21 @@ pysocket_async_recv(struct pysocket_op *op) Py_RETURN_NONE; } - ret = read(op->data.fd, op->data.buffer.data, op->data.buffer.length); - if (ret == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - op->data.evt.flags &= ~KORE_EVENT_READ; - Py_RETURN_NONE; + for (;;) { + ret = read(op->data.fd, op->data.buffer.data, + op->data.buffer.length); + if (ret == -1) { + if (errno == EINTR) + continue; + if (errno == EAGAIN || errno == EWOULDBLOCK) { + op->data.evt.flags &= ~KORE_EVENT_READ; + Py_RETURN_NONE; + } + PyErr_SetString(PyExc_RuntimeError, errno_s); + return (NULL); } - PyErr_SetString(PyExc_RuntimeError, errno_s); - return (NULL); + + break; } if (ret == 0) { @@ -1551,15 +1758,21 @@ pysocket_async_send(struct pysocket_op *op) Py_RETURN_NONE; } - ret = write(op->data.fd, op->data.buffer.data + op->data.buffer.offset, - op->data.buffer.length - op->data.buffer.offset); - if (ret == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - op->data.evt.flags &= ~KORE_EVENT_WRITE; - Py_RETURN_NONE; + for (;;) { + ret = write(op->data.fd, + op->data.buffer.data + op->data.buffer.offset, + op->data.buffer.length - op->data.buffer.offset); + if (ret == -1) { + if (errno == EINTR) + continue; + if (errno == EAGAIN || errno == EWOULDBLOCK) { + op->data.evt.flags &= ~KORE_EVENT_WRITE; + Py_RETURN_NONE; + } + PyErr_SetString(PyExc_RuntimeError, errno_s); + return (NULL); } - PyErr_SetString(PyExc_RuntimeError, errno_s); - return (NULL); + break; } op->data.buffer.offset += (size_t)ret; @@ -1573,11 +1786,14 @@ pysocket_async_send(struct pysocket_op *op) } static void -pysocket_evt_handle(void *arg, int error) +pysocket_evt_handle(void *arg, int eof) { struct pysocket_data *data = arg; struct python_coro *coro = data->coro; + if (coro->sockop == NULL) + fatal("pysocket_evt_handle: sockop == NULL"); + /* * If we are a coroutine tied to an HTTP request wake-up the * HTTP request instead. That in turn will wakeup the coro and @@ -1590,7 +1806,7 @@ pysocket_evt_handle(void *arg, int error) else python_coro_wakeup(coro); - coro->error = error; + coro->sockop->data.eof = eof; } static void @@ -1880,6 +2096,195 @@ pylock_op_iternext(struct pylock_op *op) return (NULL); } +static void +pyproc_timeout(void *arg, u_int64_t now) +{ + struct pyproc *proc = arg; + + proc->timer = NULL; + + if (proc->coro->sockop != NULL) + proc->coro->sockop->data.eof = 1; + + proc->coro->exception = PyExc_TimeoutError; + proc->coro->exception_msg = "timeout before process exited"; + + if (proc->coro->request != NULL) + http_request_wakeup(proc->coro->request); + else + python_coro_wakeup(proc->coro); +} + +static void +pyproc_dealloc(struct pyproc *proc) +{ + int status; + + TAILQ_REMOVE(&procs, proc, list); + + if (proc->timer != NULL) { + kore_timer_remove(proc->timer); + proc->timer = NULL; + } + + if (proc->pid != -1) { + if (kill(proc->pid, SIGKILL) == -1) { + kore_log(LOG_NOTICE, + "kore.proc failed to send SIGKILL %d (%s)", + proc->pid, errno_s); + } + + for (;;) { + if (waitpid(proc->pid, &status, 0) == -1) { + if (errno == EINTR) + continue; + kore_log(LOG_NOTICE, + "kore.proc failed to wait for %d (%s)", + proc->pid, errno_s); + } + break; + } + } + + if (proc->in != NULL) { + Py_DECREF((PyObject *)proc->in); + proc->in = NULL; + } + + if (proc->out != NULL) { + Py_DECREF((PyObject *)proc->out); + proc->out = NULL; + } + + PyObject_Del((PyObject *)proc); +} + +static PyObject * +pyproc_kill(struct pyproc *proc, PyObject *args) +{ + if (proc->pid != -1 && kill(proc->pid, SIGKILL) == -1) + kore_log(LOG_NOTICE, "kill(%d): %s", proc->pid, errno_s); + + Py_RETURN_TRUE; +} + +static PyObject * +pyproc_reap(struct pyproc *proc, PyObject *args) +{ + struct pyproc_op *op; + + if (proc->timer != NULL) { + kore_timer_remove(proc->timer); + proc->timer = NULL; + } + + if ((op = PyObject_New(struct pyproc_op, &pyproc_op_type)) == NULL) + return (NULL); + + op->proc = proc; + + Py_INCREF((PyObject *)proc); + + return ((PyObject *)op); +} + +static PyObject * +pyproc_recv(struct pyproc *proc, PyObject *args) +{ + Py_ssize_t len; + + if (proc->out == NULL) { + PyErr_SetString(PyExc_RuntimeError, "stdout closed"); + return (NULL); + } + + if (!PyArg_ParseTuple(args, "n", &len)) + return (NULL); + + return (pysocket_op_create(proc->out, PYSOCKET_TYPE_RECV, NULL, len)); +} + +static PyObject * +pyproc_send(struct pyproc *proc, PyObject *args) +{ + Py_buffer buf; + PyObject *ret; + + if (proc->in == NULL) { + PyErr_SetString(PyExc_RuntimeError, "stdin closed"); + return (NULL); + } + + if (!PyArg_ParseTuple(args, "y*", &buf)) + return (NULL); + + ret = pysocket_op_create(proc->in, + PYSOCKET_TYPE_SEND, buf.buf, buf.len); + + return (ret); +} + +static PyObject * +pyproc_close_stdin(struct pyproc *proc, PyObject *args) +{ + if (proc->in != NULL) { + Py_DECREF((PyObject *)proc->in); + proc->in = NULL; + } + + Py_RETURN_TRUE; +} + +static void +pyproc_op_dealloc(struct pyproc_op *op) +{ + Py_DECREF((PyObject *)op->proc); + PyObject_Del((PyObject *)op); +} + +static PyObject * +pyproc_op_await(PyObject *sop) +{ + Py_INCREF(sop); + return (sop); +} + +static PyObject * +pyproc_op_iternext(struct pyproc_op *op) +{ + int ret; + PyObject *res; + + if (op->proc->coro->exception != NULL) { + PyErr_SetString(op->proc->coro->exception, + op->proc->coro->exception_msg); + op->proc->coro->exception = NULL; + return (NULL); + } + + if (op->proc->reaped == 0) + Py_RETURN_NONE; + + if (WIFSTOPPED(op->proc->status)) { + op->proc->reaped = 0; + Py_RETURN_NONE; + } + + if (WIFEXITED(op->proc->status)) { + ret = WEXITSTATUS(op->proc->status); + } else { + ret = op->proc->status; + } + + if ((res = PyLong_FromLong(ret)) == NULL) + return (NULL); + + PyErr_SetObject(PyExc_StopIteration, res); + Py_DECREF(res); + + return (NULL); +} + static PyObject * pyhttp_request_alloc(const struct http_request *req) { diff --git a/src/worker.c b/src/worker.c @@ -90,8 +90,8 @@ extern volatile sig_atomic_t sig_recv; struct kore_worker *worker = NULL; u_int8_t worker_set_affinity = 1; u_int32_t worker_accept_threshold = 0; -u_int32_t worker_rlimit_nofiles = 1024; -u_int32_t worker_max_connections = 250; +u_int32_t worker_rlimit_nofiles = 768; +u_int32_t worker_max_connections = 512; u_int32_t worker_active_connections = 0; void @@ -379,23 +379,6 @@ kore_worker_entry(struct kore_worker *kw) worker->restarted = 0; for (;;) { - if (sig_recv != 0) { - switch (sig_recv) { - case SIGHUP: - kore_module_reload(1); - break; - case SIGQUIT: - case SIGINT: - case SIGTERM: - quit = 1; - break; - default: - break; - } - - sig_recv = 0; - } - netwait = 100; now = kore_time_ms(); @@ -446,6 +429,28 @@ kore_worker_entry(struct kore_worker *kw) } } + if (sig_recv != 0) { + switch (sig_recv) { + case SIGHUP: + kore_module_reload(1); + break; + case SIGQUIT: + case SIGINT: + case SIGTERM: + quit = 1; + break; + case SIGCHLD: +#if defined(KORE_USE_PYTHON) + kore_python_proc_reap(); +#endif + break; + default: + break; + } + + sig_recv = 0; + } + #if !defined(KORE_NO_HTTP) http_process(); #endif