kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit b70d1ee80f8d00a1f8120c546a2d0507d3679b54
parent 1ac131c48fcfdacd266f382dbd915a7a1dfcd661
Author: Joris Vink <joris@coders.se>
Date:   Mon, 22 Oct 2018 20:09:23 +0200

Add a locking mechanism in pykore.

Support the async with syntax:

	lock = kore.lock()

	async with lock:
		# your block

Fix some small issues with other parts of the python system.

Diffstat:
include/kore/python_methods.h | 62++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/python.c | 194+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
2 files changed, 243 insertions(+), 13 deletions(-)

diff --git a/include/kore/python_methods.h b/include/kore/python_methods.h @@ -18,14 +18,17 @@ #define CORO_STATE_SUSPENDED 2 struct python_coro { + u_int64_t id; int state; int error; PyObject *obj; + struct pylock *lock; struct http_request *request; TAILQ_ENTRY(python_coro) list; }; static PyObject *python_kore_log(PyObject *, PyObject *); +static PyObject *python_kore_lock(PyObject *, PyObject *); static PyObject *python_kore_bind(PyObject *, PyObject *); static PyObject *python_kore_fatal(PyObject *, PyObject *); static PyObject *python_kore_queue(PyObject *, PyObject *); @@ -47,6 +50,7 @@ static PyObject *python_websocket_broadcast(PyObject *, PyObject *); static struct PyMethodDef pykore_methods[] = { METHOD("log", python_kore_log, METH_VARARGS), + METHOD("lock", python_kore_lock, METH_NOARGS), METHOD("bind", python_kore_bind, METH_VARARGS), METHOD("queue", python_kore_queue, METH_VARARGS), METHOD("fatal", python_kore_fatal, METH_VARARGS), @@ -217,6 +221,64 @@ static PyTypeObject pyqueue_op_type = { .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, }; +struct pylock { + PyObject_HEAD + struct python_coro *owner; + TAILQ_HEAD(, pylock_op) ops; +}; + +static PyObject *pylock_aexit(struct pylock *, PyObject *); +static PyObject *pylock_aenter(struct pylock *, PyObject *); + +static PyMethodDef pylock_methods[] = { + METHOD("__aexit__", pylock_aexit, METH_VARARGS), + METHOD("__aenter__", pylock_aenter, METH_NOARGS), + METHOD(NULL, NULL, -1) +}; + +static void pylock_dealloc(struct pylock *); + +static PyTypeObject pylock_type = { + PyVarObject_HEAD_INIT(NULL, 0) + .tp_name = "kore.lock", + .tp_doc = "locking mechanism", + .tp_methods = pylock_methods, + .tp_basicsize = sizeof(struct pylock), + .tp_dealloc = (destructor)pylock_dealloc, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, +}; + +struct pylock_op { + PyObject_HEAD + int locking; + int active; + struct pylock *lock; + struct python_coro *coro; + TAILQ_ENTRY(pylock_op) list; +}; + +static void pylock_op_dealloc(struct pylock_op *); + +static PyObject *pylock_op_await(PyObject *); +static PyObject *pylock_op_iternext(struct pylock_op *); + +static PyAsyncMethods pylock_op_async = { + (unaryfunc)pylock_op_await, + NULL, + NULL +}; + +static PyTypeObject pylock_op_type = { + PyVarObject_HEAD_INIT(NULL, 0) + .tp_name = "kore.lockop", + .tp_doc = "lock awaitable", + .tp_as_async = &pylock_op_async, + .tp_iternext = (iternextfunc)pylock_op_iternext, + .tp_basicsize = sizeof(struct pylock_op), + .tp_dealloc = (destructor)pylock_op_dealloc, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, +}; + struct pyconnection { PyObject_HEAD struct connection *c; diff --git a/src/python.c b/src/python.c @@ -38,7 +38,8 @@ static PyObject *python_callable(PyObject *, const char *); static PyObject *pyhttp_file_alloc(struct http_file *); static PyObject *pyhttp_request_alloc(const struct http_request *); -static struct python_coro *python_coro_create(PyObject *); +static struct python_coro *python_coro_create(PyObject *, + struct http_request *); static int python_coro_run(struct python_coro *); static void python_coro_wakeup(struct python_coro *); @@ -51,6 +52,8 @@ static PyObject *pysocket_async_send(struct pysocket_op *); static PyObject *pysocket_async_accept(struct pysocket_op *); static PyObject *pysocket_async_connect(struct pysocket_op *); +static void pylock_do_release(struct pylock *); + #if defined(KORE_USE_PGSQL) static PyObject *pykore_pgsql_alloc(struct http_request *, const char *, const char *); @@ -142,6 +145,7 @@ static struct kore_pool coro_pool; static struct kore_pool queue_wait_pool; static struct kore_pool queue_object_pool; +static u_int64_t coro_id; static int coro_count; static TAILQ_HEAD(, python_coro) coro_runnable; static TAILQ_HEAD(, python_coro) coro_suspended; @@ -155,7 +159,9 @@ static struct python_coro *coro_running = NULL; void kore_python_init(void) { + coro_id = 0; coro_count = 0; + TAILQ_INIT(&coro_runnable); TAILQ_INIT(&coro_suspended); @@ -220,7 +226,10 @@ kore_python_coro_delete(void *obj) coro = obj; coro_count--; + + coro_running = coro; Py_DECREF(coro->obj); + coro_running = NULL; if (coro->state == CORO_STATE_RUNNABLE) TAILQ_REMOVE(&coro_runnable, coro, list); @@ -241,21 +250,20 @@ kore_python_log_error(const char *function) PyErr_Fetch(&type, &value, &traceback); - if (type == NULL || value == NULL || traceback == NULL) { + if (type == NULL || value == NULL) { kore_log(LOG_ERR, "unknown python exception in '%s'", function); return; } kore_log(LOG_ERR, - "python exception in '%s' - type:%s - value:%s - trace:%s", + "python exception in '%s' - type:%s - value:%s", function, PyUnicode_AsUTF8AndSize(type, NULL), - PyUnicode_AsUTF8AndSize(value, NULL), - PyUnicode_AsUTF8AndSize(traceback, NULL)); + PyUnicode_AsUTF8AndSize(value, NULL)); Py_DECREF(type); Py_DECREF(value); - Py_DECREF(traceback); + Py_XDECREF(traceback); } static void * @@ -320,7 +328,7 @@ python_module_getsym(struct kore_module *module, const char *symbol) } static struct python_coro * -python_coro_create(PyObject *obj) +python_coro_create(PyObject *obj, struct http_request *req) { struct python_coro *coro; @@ -332,7 +340,8 @@ python_coro_create(PyObject *obj) coro->obj = obj; coro->error = 0; - coro->request = req_running; + coro->request = req; + coro->id = coro_id++; coro->state = CORO_STATE_RUNNABLE; TAILQ_INSERT_HEAD(&coro_runnable, coro, list); @@ -458,7 +467,7 @@ python_runtime_http_request(void *addr, struct http_request *req) if (PyCoro_CheckExact(pyret)) { http_request_sleep(req); - req->py_coro = python_coro_create(pyret); + req->py_coro = python_coro_create(pyret, req); req_running = NULL; /* XXX merge with the above python_coro_run() block. */ if (python_coro_run(req->py_coro) == KORE_RESULT_OK) { @@ -708,6 +717,7 @@ python_module_init(void) if ((pykore = PyModule_Create(&pykore_module)) == NULL) fatal("python_module_init: failed to setup pykore module"); + python_push_type("pylock", pykore, &pylock_type); python_push_type("pyqueue", pykore, &pyqueue_type); python_push_type("pysocket", pykore, &pysocket_type); python_push_type("pysocket_op", pykore, &pysocket_op_type); @@ -824,7 +834,7 @@ python_kore_bind_unix(PyObject *self, PyObject *args) static PyObject * python_kore_task_create(PyObject *self, PyObject *args) { - PyObject *obj; + PyObject *obj; if (!PyArg_ParseTuple(args, "O", &obj)) return (NULL); @@ -832,7 +842,7 @@ python_kore_task_create(PyObject *self, PyObject *args) if (!PyCoro_CheckExact(obj)) fatal("%s: object is not a coroutine", __func__); - python_coro_create(obj); + python_coro_create(obj, NULL); Py_INCREF(obj); Py_RETURN_NONE; @@ -907,6 +917,20 @@ python_kore_queue(PyObject *self, PyObject *args) } static PyObject * +python_kore_lock(PyObject *self, PyObject *args) +{ + struct pylock *lock; + + if ((lock = PyObject_New(struct pylock, &pylock_type)) == NULL) + return (NULL); + + lock->owner = NULL; + TAILQ_INIT(&lock->ops); + + return ((PyObject *)lock); +} + +static PyObject * python_kore_fatal(PyObject *self, PyObject *args) { const char *reason; @@ -1162,7 +1186,6 @@ pysocket_op_dealloc(struct pysocket_op *op) kore_buf_cleanup(&op->data.buffer); Py_DECREF(op->data.socket); - Py_DECREF(op->data.coro->obj); PyObject_Del((PyObject *)op); } @@ -1347,8 +1370,10 @@ pysocket_async_recv(struct pysocket_op *op) ptr = (const char *)op->data.buffer.data; bytes = PyBytes_FromStringAndSize(ptr, ret); - if (bytes != NULL) + if (bytes != NULL) { PyErr_SetObject(PyExc_StopIteration, bytes); + Py_DECREF(bytes); + } return (NULL); } @@ -1548,6 +1573,149 @@ pyqueue_op_iternext(struct pyqueue_op *op) return (NULL); } +static void +pylock_dealloc(struct pylock *lock) +{ + struct pylock_op *op; + + while ((op = TAILQ_FIRST(&lock->ops)) != NULL) { + TAILQ_REMOVE(&lock->ops, op, list); + op->active = 0; + Py_DECREF((PyObject *)op); + } + + PyObject_Del((PyObject *)op); +} + +static PyObject * +pylock_aenter(struct pylock *lock, PyObject *args) +{ + struct pylock_op *op; + + if (lock->owner != NULL && lock->owner->id == coro_running->id) { + PyErr_SetString(PyExc_RuntimeError, "recursive lock detected"); + return (NULL); + } + + if ((op = PyObject_New(struct pylock_op, &pylock_op_type)) == NULL) + return (NULL); + + op->active = 1; + op->lock = lock; + op->locking = 1; + op->coro = coro_running; + + Py_INCREF((PyObject *)op); + Py_INCREF((PyObject *)lock); + + TAILQ_INSERT_TAIL(&lock->ops, op, list); + + return ((PyObject *)op); +} + +static PyObject * +pylock_aexit(struct pylock *lock, PyObject *args) +{ + struct pylock_op *op; + + if (lock->owner == NULL || lock->owner->id != coro_running->id) { + PyErr_SetString(PyExc_RuntimeError, "invalid lock owner"); + return (NULL); + } + + if ((op = PyObject_New(struct pylock_op, &pylock_op_type)) == NULL) + return (NULL); + + op->active = 1; + op->lock = lock; + op->locking = 0; + op->coro = coro_running; + + Py_INCREF((PyObject *)op); + Py_INCREF((PyObject *)lock); + + TAILQ_INSERT_TAIL(&lock->ops, op, list); + + return ((PyObject *)op); +} + +static void +pylock_do_release(struct pylock *lock) +{ + struct pylock_op *op; + + lock->owner = NULL; + + TAILQ_FOREACH(op, &lock->ops, list) { + if (op->locking == 0) + continue; + + TAILQ_REMOVE(&op->lock->ops, op, list); + + if (op->coro->request != NULL) + http_request_wakeup(op->coro->request); + else + python_coro_wakeup(op->coro); + + op->active = 0; + Py_DECREF((PyObject *)op); + break; + } +} + +static void +pylock_op_dealloc(struct pylock_op *op) +{ + if (op->active) { + TAILQ_REMOVE(&op->lock->ops, op, list); + op->active = 0; + } + + Py_DECREF((PyObject *)op->lock); + PyObject_Del((PyObject *)op); +} + +static PyObject * +pylock_op_await(PyObject *obj) +{ + Py_INCREF(obj); + return (obj); +} + +static PyObject * +pylock_op_iternext(struct pylock_op *op) +{ + if (op->locking == 0) { + if (op->lock->owner == NULL) { + PyErr_SetString(PyExc_RuntimeError, + "no lock owner set"); + return (NULL); + } + + if (op->lock->owner->id != coro_running->id) { + PyErr_SetString(PyExc_RuntimeError, + "lock not owned by caller"); + return (NULL); + } + + pylock_do_release(op->lock); + } else { + if (op->lock->owner != NULL) { + Py_RETURN_NONE; + } + + op->lock->owner = coro_running; + } + + op->active = 0; + TAILQ_REMOVE(&op->lock->ops, op, list); + PyErr_SetNone(PyExc_StopIteration); + + Py_DECREF((PyObject *)op); + + return (NULL); +} + static PyObject * pyhttp_request_alloc(const struct http_request *req) {