commit 0cda9ecfb0593d54a5de13427fcb6635ced788c1
parent b5958f7d7d7568cd59c8e1d60d644e657b3cc1ce
Author: Joris Vink <joris@coders.se>
Date: Thu, 18 Oct 2018 22:15:21 +0200
Add an asynchronous queue mechanism.
This allows coroutines to submit messages to and pop
messages from a queue in an asynchronous way.
Diffstat:
include/kore/python_methods.h | | | 83 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- |
src/python.c | | | 151 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------ |
2 files changed, 220 insertions(+), 14 deletions(-)
diff --git a/include/kore/python_methods.h b/include/kore/python_methods.h
@@ -14,10 +14,22 @@
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
+#define CORO_STATE_RUNNABLE 1
+#define CORO_STATE_SUSPENDED 2
+
+struct python_coro {
+ int state;
+ int error;
+ PyObject *obj;
+ struct http_request *request;
+ TAILQ_ENTRY(python_coro) list;
+};
+
static PyObject *python_kore_log(PyObject *, PyObject *);
+static PyObject *python_kore_bind(PyObject *, PyObject *);
static PyObject *python_kore_fatal(PyObject *, PyObject *);
+static PyObject *python_kore_queue(PyObject *, PyObject *);
static PyObject *python_kore_fatalx(PyObject *, PyObject *);
-static PyObject *python_kore_bind(PyObject *, PyObject *);
static PyObject *python_kore_bind_unix(PyObject *, PyObject *);
static PyObject *python_kore_task_create(PyObject *, PyObject *);
static PyObject *python_kore_socket_wrap(PyObject *, PyObject *);
@@ -35,9 +47,10 @@ static PyObject *python_websocket_broadcast(PyObject *, PyObject *);
static struct PyMethodDef pykore_methods[] = {
METHOD("log", python_kore_log, METH_VARARGS),
+ METHOD("bind", python_kore_bind, METH_VARARGS),
+ METHOD("queue", python_kore_queue, METH_VARARGS),
METHOD("fatal", python_kore_fatal, METH_VARARGS),
METHOD("fatalx", python_kore_fatalx, METH_VARARGS),
- METHOD("bind", python_kore_bind, METH_VARARGS),
METHOD("bind_unix", python_kore_bind_unix, METH_VARARGS),
METHOD("task_create", python_kore_task_create, METH_VARARGS),
METHOD("socket_wrap", python_kore_socket_wrap, METH_VARARGS),
@@ -102,7 +115,7 @@ struct pysocket_data {
int fd;
int type;
void *self;
- void *coro;
+ struct python_coro *coro;
int state;
size_t length;
struct kore_buf buffer;
@@ -136,6 +149,70 @@ static PyTypeObject pysocket_op_type = {
.tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
};
+struct pyqueue_waiting {
+ struct python_coro *coro;
+ TAILQ_ENTRY(pyqueue_waiting) list;
+};
+
+struct pyqueue_object {
+ PyObject *obj;
+ TAILQ_ENTRY(pyqueue_object) list;
+};
+
+struct pyqueue {
+ PyObject_HEAD
+ TAILQ_HEAD(, pyqueue_object) objects;
+ TAILQ_HEAD(, pyqueue_waiting) waiting;
+};
+
+static PyObject *pyqueue_pop(struct pyqueue *, PyObject *);
+static PyObject *pyqueue_push(struct pyqueue *, PyObject *);
+
+static PyMethodDef pyqueue_methods[] = {
+ METHOD("pop", pyqueue_pop, METH_NOARGS),
+ METHOD("push", pyqueue_push, METH_VARARGS),
+ METHOD(NULL, NULL, -1)
+};
+
+static void pyqueue_dealloc(struct pyqueue *);
+
+static PyTypeObject pyqueue_type = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ .tp_name = "kore.queue",
+ .tp_doc = "queue",
+ .tp_methods = pyqueue_methods,
+ .tp_basicsize = sizeof(struct pyqueue),
+ .tp_dealloc = (destructor)pyqueue_dealloc,
+ .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
+};
+
+struct pyqueue_op {
+ PyObject_HEAD
+ struct pyqueue *queue;
+};
+
+static void pyqueue_op_dealloc(struct pyqueue_op *);
+
+static PyObject *pyqueue_op_await(PyObject *);
+static PyObject *pyqueue_op_iternext(struct pyqueue_op *);
+
+static PyAsyncMethods pyqueue_op_async = {
+ (unaryfunc)pyqueue_op_await,
+ NULL,
+ NULL
+};
+
+static PyTypeObject pyqueue_op_type = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ .tp_name = "kore.queueop",
+ .tp_doc = "queue waitable",
+ .tp_as_async = &pyqueue_op_async,
+ .tp_iternext = (iternextfunc)pyqueue_op_iternext,
+ .tp_basicsize = sizeof(struct pyqueue_op),
+ .tp_dealloc = (destructor)pyqueue_op_dealloc,
+ .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
+};
+
struct pyconnection {
PyObject_HEAD
struct connection *c;
diff --git a/src/python.c b/src/python.c
@@ -30,17 +30,6 @@
#include "python_api.h"
#include "python_methods.h"
-#define CORO_STATE_RUNNABLE 1
-#define CORO_STATE_SUSPENDED 2
-
-struct python_coro {
- int state;
- int error;
- PyObject *obj;
- struct http_request *request;
- TAILQ_ENTRY(python_coro) list;
-};
-
static PyMODINIT_FUNC python_module_init(void);
static PyObject *python_import(const char *);
static PyObject *pyconnection_alloc(struct connection *);
@@ -150,6 +139,9 @@ static PyMemAllocatorEx allocator = {
};
static struct kore_pool coro_pool;
+static struct kore_pool queue_wait_pool;
+static struct kore_pool queue_object_pool;
+
static int coro_count;
static TAILQ_HEAD(, python_coro) coro_runnable;
static TAILQ_HEAD(, python_coro) coro_suspended;
@@ -169,6 +161,11 @@ kore_python_init(void)
kore_pool_init(&coro_pool, "coropool", sizeof(struct python_coro), 100);
+ kore_pool_init(&queue_wait_pool, "queue_wait_pool",
+ sizeof(struct pyqueue_waiting), 100);
+ kore_pool_init(&queue_object_pool, "queue_object_pool",
+ sizeof(struct pyqueue_object), 100);
+
PyMem_SetAllocator(PYMEM_DOMAIN_OBJ, &allocator);
PyMem_SetAllocator(PYMEM_DOMAIN_MEM, &allocator);
PyMem_SetAllocator(PYMEM_DOMAIN_RAW, &allocator);
@@ -207,6 +204,13 @@ kore_python_coro_run(void)
if (python_coro_run(coro) == KORE_RESULT_OK)
kore_python_coro_delete(coro);
}
+
+ /*
+ * If something was woken up, let Kore do HTTP processing
+ * so they run ASAP without having to wait for a tick from
+ * the event loop.
+ */
+ http_process();
}
void
@@ -704,6 +708,7 @@ python_module_init(void)
if ((pykore = PyModule_Create(&pykore_module)) == NULL)
fatal("python_module_init: failed to setup pykore module");
+ python_push_type("pyqueue", pykore, &pyqueue_type);
python_push_type("pysocket", pykore, &pysocket_type);
python_push_type("pysocket_op", pykore, &pysocket_op_type);
python_push_type("pyconnection", pykore, &pyconnection_type);
@@ -888,6 +893,20 @@ out:
}
static PyObject *
+python_kore_queue(PyObject *self, PyObject *args)
+{
+ struct pyqueue *queue;
+
+ if ((queue = PyObject_New(struct pyqueue, &pyqueue_type)) == NULL)
+ return (NULL);
+
+ TAILQ_INIT(&queue->objects);
+ TAILQ_INIT(&queue->waiting);
+
+ return ((PyObject *)queue);
+}
+
+static PyObject *
python_kore_fatal(PyObject *self, PyObject *args)
{
const char *reason;
@@ -1143,6 +1162,8 @@ pysocket_op_dealloc(struct pysocket_op *op)
kore_buf_cleanup(&op->data.buffer);
Py_DECREF(op->data.socket);
+ Py_INCREF(op->data.coro->obj);
+
PyObject_Del((PyObject *)op);
}
@@ -1177,6 +1198,7 @@ pysocket_op_create(struct pysocket *sock, int type, const void *ptr, size_t len)
op->data.evt.handle = pysocket_evt_handle;
Py_INCREF(op->data.socket);
+ Py_INCREF(op->data.coro->obj);
switch (type) {
case PYSOCKET_TYPE_RECV:
@@ -1383,6 +1405,113 @@ pysocket_evt_handle(void *arg, int error)
coro->error = error;
}
+static void
+pyqueue_dealloc(struct pyqueue *queue)
+{
+ PyObject_Del((PyObject *)queue);
+}
+
+static PyObject *
+pyqueue_pop(struct pyqueue *queue, PyObject *args)
+{
+ struct pyqueue_op *op;
+ struct pyqueue_waiting *waiting;
+
+ if ((op = PyObject_New(struct pyqueue_op, &pyqueue_op_type)) == NULL)
+ return (NULL);
+
+ op->queue = queue;
+
+ waiting = kore_pool_get(&queue_wait_pool);
+
+ waiting->coro = coro_running;
+ TAILQ_INSERT_TAIL(&queue->waiting, waiting, list);
+
+ Py_INCREF((PyObject *)queue);
+ Py_INCREF(waiting->coro->obj);
+
+ return ((PyObject *)op);
+}
+
+static PyObject *
+pyqueue_push(struct pyqueue *queue, PyObject *args)
+{
+ PyObject *obj;
+ struct pyqueue_object *object;
+ struct pyqueue_waiting *waiting;
+
+ if (!PyArg_ParseTuple(args, "O", &obj))
+ return (NULL);
+
+ Py_INCREF(obj);
+
+ object = kore_pool_get(&queue_object_pool);
+ object->obj = obj;
+
+ TAILQ_INSERT_TAIL(&queue->objects, object, list);
+
+ /* Wakeup first in line if any. */
+ if ((waiting = TAILQ_FIRST(&queue->waiting)) != NULL) {
+ TAILQ_REMOVE(&queue->waiting, waiting, list);
+
+ /* wakeup HTTP request if one is tied. */
+ if (waiting->coro->request != NULL)
+ http_request_wakeup(waiting->coro->request);
+ else
+ python_coro_wakeup(waiting->coro);
+
+ Py_DECREF(waiting->coro->obj);
+ kore_pool_put(&queue_wait_pool, waiting);
+ }
+
+ Py_RETURN_TRUE;
+}
+
+static void
+pyqueue_op_dealloc(struct pyqueue_op *op)
+{
+ Py_DECREF((PyObject *)op->queue);
+ PyObject_Del((PyObject *)op);
+}
+
+static PyObject *
+pyqueue_op_await(PyObject *obj)
+{
+ Py_INCREF(obj);
+ return (obj);
+}
+
+static PyObject *
+pyqueue_op_iternext(struct pyqueue_op *op)
+{
+ PyObject *obj;
+ struct pyqueue_object *object;
+ struct pyqueue_waiting *waiting;
+
+ if ((object = TAILQ_FIRST(&op->queue->objects)) == NULL) {
+ Py_RETURN_NONE;
+ }
+
+ TAILQ_REMOVE(&op->queue->objects, object, list);
+
+ obj = object->obj;
+ kore_pool_put(&queue_object_pool, object);
+
+ TAILQ_FOREACH(waiting, &op->queue->waiting, list) {
+ if (waiting->coro == coro_running) {
+ TAILQ_REMOVE(&op->queue->waiting, waiting, list);
+ Py_DECREF(waiting->coro->obj);
+ kore_pool_put(&queue_wait_pool, waiting);
+ break;
+ }
+ }
+
+ PyErr_SetObject(PyExc_StopIteration, obj);
+ Py_DECREF(obj);
+
+ return (NULL);
+}
+
static PyObject *
pyhttp_request_alloc(const struct http_request *req)
{