kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 8ea32983aea01706ca01ce1996e69be9b32b4570
parent 47776a9fbbb49a64525a626c15569acab41dcc44
Author: Joris Vink <joris@coders.se>
Date:   Tue, 23 Oct 2018 21:32:08 +0200

Add kore.suspend(delay) to python.

Will suspend the coroutine for a number of milliseconds.

Example:

async def page(req):
	await kore.suspend(1000)
	req.response(200, b'')

Diffstat:
include/kore/python_methods.h | 37++++++++++++++++++++++++++++++++++++-
src/python.c | 88+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------
2 files changed, 115 insertions(+), 10 deletions(-)

diff --git a/include/kore/python_methods.h b/include/kore/python_methods.h @@ -22,7 +22,6 @@ struct python_coro { int state; int error; PyObject *obj; - struct pylock *lock; struct http_request *request; TAILQ_ENTRY(python_coro) list; }; @@ -34,6 +33,7 @@ static PyObject *python_kore_timer(PyObject *, PyObject *); static PyObject *python_kore_fatal(PyObject *, PyObject *); static PyObject *python_kore_queue(PyObject *, PyObject *); static PyObject *python_kore_fatalx(PyObject *, PyObject *); +static PyObject *python_kore_suspend(PyObject *, PyObject *); static PyObject *python_kore_shutdown(PyObject *, PyObject *); static PyObject *python_kore_bind_unix(PyObject *, PyObject *); static PyObject *python_kore_task_create(PyObject *, PyObject *); @@ -58,6 +58,7 @@ static struct PyMethodDef pykore_methods[] = { METHOD("queue", python_kore_queue, METH_VARARGS), METHOD("fatal", python_kore_fatal, METH_VARARGS), METHOD("fatalx", python_kore_fatalx, METH_VARARGS), + METHOD("suspend", python_kore_suspend, METH_VARARGS), METHOD("shutdown", python_kore_shutdown, METH_NOARGS), METHOD("bind_unix", python_kore_bind_unix, METH_VARARGS), METHOD("task_create", python_kore_task_create, METH_VARARGS), @@ -73,6 +74,40 @@ static struct PyModuleDef pykore_module = { PyModuleDef_HEAD_INIT, "kore", NULL, -1, pykore_methods }; +#define PYSUSPEND_OP_INIT 1 +#define PYSUSPEND_OP_WAIT 2 +#define PYSUSPEND_OP_CONTINUE 3 + +struct pysuspend_op { + PyObject_HEAD + int state; + int delay; + struct python_coro *coro; + struct kore_timer *timer; +}; + +static void pysuspend_op_dealloc(struct pysuspend_op *); + +static PyObject *pysuspend_op_await(PyObject *); +static PyObject *pysuspend_op_iternext(struct pysuspend_op *); + +static PyAsyncMethods pysuspend_op_async = { + (unaryfunc)pysuspend_op_await, + NULL, + NULL +}; + +static PyTypeObject pysuspend_op_type = { + PyVarObject_HEAD_INIT(NULL, 0) + .tp_name = "kore.suspend", + .tp_doc = "suspension operation", + .tp_as_async = &pysuspend_op_async, + .tp_iternext = (iternextfunc)pysuspend_op_iternext, + .tp_basicsize = sizeof(struct pysuspend_op), + .tp_dealloc = (destructor)pysuspend_op_dealloc, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, +}; + struct pytimer { PyObject_HEAD int flags; diff --git a/src/python.c b/src/python.c @@ -53,7 +53,9 @@ static PyObject *pysocket_async_accept(struct pysocket_op *); static PyObject *pysocket_async_connect(struct pysocket_op *); static void pylock_do_release(struct pylock *); + static void pytimer_run(void *, u_int64_t); +static void pysuspend_wakeup(void *, u_int64_t); #if defined(KORE_USE_PGSQL) static PyObject *pykore_pgsql_alloc(struct http_request *, @@ -723,17 +725,16 @@ python_module_init(void) python_push_type("pytimer", pykore, &pytimer_type); python_push_type("pyqueue", pykore, &pyqueue_type); python_push_type("pysocket", pykore, &pysocket_type); - python_push_type("pysocket_op", pykore, &pysocket_op_type); python_push_type("pyconnection", pykore, &pyconnection_type); + python_push_type("pyhttp_file", pykore, &pyhttp_file_type); + python_push_type("pyhttp_request", pykore, &pyhttp_request_type); + for (i = 0; python_integers[i].symbol != NULL; i++) { python_push_integer(pykore, python_integers[i].symbol, python_integers[i].value); } - python_push_type("pyhttp_file", pykore, &pyhttp_file_type); - python_push_type("pyhttp_request", pykore, &pyhttp_request_type); - return (pykore); } @@ -962,6 +963,27 @@ python_kore_fatalx(PyObject *self, PyObject *args) } static PyObject * +python_kore_suspend(PyObject *self, PyObject *args) +{ + struct pysuspend_op *sop; + int delay; + + if (!PyArg_ParseTuple(args, "i", &delay)) + return (NULL); + + sop = PyObject_New(struct pysuspend_op, &pysuspend_op_type); + if (sop == NULL) + return (NULL); + + sop->timer = NULL; + sop->delay = delay; + sop->coro = coro_running; + sop->state = PYSUSPEND_OP_INIT; + + return ((PyObject *)sop); +} + +static PyObject * python_kore_shutdown(PyObject *self, PyObject *args) { kore_shutdown(); @@ -992,9 +1014,6 @@ python_kore_timer(PyObject *self, PyObject *args) timer->callable = obj; timer->run = kore_timer_add(pytimer_run, ms, timer, flags); - printf("timer of %llu ms started (0x%04x)\n", ms, flags); - - Py_INCREF((PyObject *)timer); Py_INCREF(timer->callable); return ((PyObject *)timer); @@ -1150,12 +1169,63 @@ pytimer_close(struct pytimer *timer, PyObject *args) timer->callable = NULL; } - Py_DECREF((PyObject *)timer); - Py_RETURN_TRUE; } static void +pysuspend_op_dealloc(struct pysuspend_op *sop) +{ + if (sop->timer != NULL) { + kore_timer_remove(sop->timer); + sop->timer = NULL; + } + + PyObject_Del((PyObject *)sop); +} + +static PyObject * +pysuspend_op_await(PyObject *sop) +{ + Py_INCREF(sop); + return (sop); +} + +static PyObject * +pysuspend_op_iternext(struct pysuspend_op *sop) +{ + switch (sop->state) { + case PYSUSPEND_OP_INIT: + sop->timer = kore_timer_add(pysuspend_wakeup, sop->delay, + sop, KORE_TIMER_ONESHOT); + sop->state = PYSUSPEND_OP_WAIT; + break; + case PYSUSPEND_OP_WAIT: + break; + case PYSUSPEND_OP_CONTINUE: + PyErr_SetNone(PyExc_StopIteration); + return (NULL); + default: + fatal("unknown state %d for pysuspend_op", sop->state); + } + + Py_RETURN_NONE; +} + +static void +pysuspend_wakeup(void *arg, u_int64_t now) +{ + struct pysuspend_op *sop = arg; + + sop->timer = NULL; + sop->state = PYSUSPEND_OP_CONTINUE; + + if (sop->coro->request != NULL) + http_request_wakeup(sop->coro->request); + else + python_coro_wakeup(sop->coro); +} + +static void pysocket_dealloc(struct pysocket *sock) { PyObject_Del((PyObject *)sock);