commit 1c30da855c60f12451b67fb88354dbb1d71686b9
parent 740acb4760f11409055459d0a7d3624a9b86706c
Author: Joris Vink <joris@coders.se>
Date: Mon, 29 Oct 2018 21:16:08 +0100
Add kore.gather() to the python api.
Allows one to run coroutines concurrently and gather all their
results in a single returned list.
If any of the coroutines throw an exception the exception is
returned as the value of that coroutine in the returned list.
Diffstat:
2 files changed, 248 insertions(+), 20 deletions(-)
diff --git a/include/kore/python_methods.h b/include/kore/python_methods.h
@@ -22,12 +22,15 @@ struct python_coro {
int state;
PyObject *obj;
struct pysocket_op *sockop;
+ struct pygather_op *gatherop;
struct http_request *request;
PyObject *exception;
char *exception_msg;
TAILQ_ENTRY(python_coro) list;
};
+TAILQ_HEAD(coro_list, python_coro);
+
static PyObject *python_kore_log(PyObject *, PyObject *);
static PyObject *python_kore_lock(PyObject *, PyObject *);
static PyObject *python_kore_proc(PyObject *, PyObject *);
@@ -35,6 +38,7 @@ static PyObject *python_kore_bind(PyObject *, PyObject *);
static PyObject *python_kore_timer(PyObject *, PyObject *);
static PyObject *python_kore_fatal(PyObject *, PyObject *);
static PyObject *python_kore_queue(PyObject *, PyObject *);
+static PyObject *python_kore_gather(PyObject *, PyObject *);
static PyObject *python_kore_fatalx(PyObject *, PyObject *);
static PyObject *python_kore_suspend(PyObject *, PyObject *);
static PyObject *python_kore_shutdown(PyObject *, PyObject *);
@@ -60,6 +64,7 @@ static struct PyMethodDef pykore_methods[] = {
METHOD("bind", python_kore_bind, METH_VARARGS),
METHOD("timer", python_kore_timer, METH_VARARGS),
METHOD("queue", python_kore_queue, METH_VARARGS),
+ METHOD("gather", python_kore_gather, METH_VARARGS),
METHOD("fatal", python_kore_fatal, METH_VARARGS),
METHOD("fatalx", python_kore_fatalx, METH_VARARGS),
METHOD("suspend", python_kore_suspend, METH_VARARGS),
@@ -415,6 +420,47 @@ static PyTypeObject pyproc_op_type = {
.tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
};
+struct pygather_coro {
+ struct python_coro *coro;
+ PyObject *result;
+ TAILQ_ENTRY(pygather_coro) list;
+};
+
+struct pygather_result {
+ PyObject *obj;
+ TAILQ_ENTRY(pygather_result) list;
+};
+
+struct pygather_op {
+ PyObject_HEAD
+ int count;
+ struct python_coro *coro;
+ TAILQ_HEAD(, pygather_result) results;
+ TAILQ_HEAD(, pygather_coro) coroutines;
+};
+
+static void pygather_op_dealloc(struct pygather_op *);
+
+static PyObject *pygather_op_await(PyObject *);
+static PyObject *pygather_op_iternext(struct pygather_op *);
+
+static PyAsyncMethods pygather_op_async = {
+ (unaryfunc)pygather_op_await,
+ NULL,
+ NULL
+};
+
+static PyTypeObject pygather_op_type = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ .tp_name = "kore.pygather_op",
+ .tp_doc = "coroutine gathering",
+ .tp_as_async = &pygather_op_async,
+ .tp_iternext = (iternextfunc)pygather_op_iternext,
+ .tp_basicsize = sizeof(struct pygather_op),
+ .tp_dealloc = (destructor)pygather_op_dealloc,
+ .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
+};
+
struct pyconnection {
PyObject_HEAD
struct connection *c;
diff --git a/src/python.c b/src/python.c
@@ -61,6 +61,9 @@ static void pytimer_run(void *, u_int64_t);
static void pyproc_timeout(void *, u_int64_t);
static void pysuspend_wakeup(void *, u_int64_t);
+static void pygather_reap_coro(struct pygather_op *,
+ struct python_coro *);
+
#if defined(KORE_USE_PGSQL)
static PyObject *pykore_pgsql_alloc(struct http_request *,
const char *, const char *);
@@ -153,17 +156,18 @@ static TAILQ_HEAD(, pyproc) procs;
static struct kore_pool coro_pool;
static struct kore_pool queue_wait_pool;
+static struct kore_pool gather_coro_pool;
static struct kore_pool queue_object_pool;
+static struct kore_pool gather_result_pool;
static u_int64_t coro_id;
static int coro_count;
-static TAILQ_HEAD(, python_coro) coro_runnable;
-static TAILQ_HEAD(, python_coro) coro_suspended;
+static struct coro_list coro_runnable;
+static struct coro_list coro_suspended;
extern const char *__progname;
/* XXX */
-static struct http_request *req_running = NULL;
static struct python_coro *coro_running = NULL;
void
@@ -180,8 +184,12 @@ kore_python_init(void)
kore_pool_init(&queue_wait_pool, "queue_wait_pool",
sizeof(struct pyqueue_waiting), 100);
+ kore_pool_init(&gather_coro_pool, "gather_coro_pool",
+ sizeof(struct pygather_coro), 100);
kore_pool_init(&queue_object_pool, "queue_object_pool",
sizeof(struct pyqueue_object), 100);
+ kore_pool_init(&gather_result_pool, "gather_result_pool",
+ sizeof(struct pygather_result), 100);
PyMem_SetAllocator(PYMEM_DOMAIN_OBJ, &allocator);
PyMem_SetAllocator(PYMEM_DOMAIN_MEM, &allocator);
@@ -212,14 +220,27 @@ kore_python_path(const char *path)
void
kore_python_coro_run(void)
{
+ struct pygather_op *op;
struct python_coro *coro, *next;
for (coro = TAILQ_FIRST(&coro_runnable); coro != NULL; coro = next) {
next = TAILQ_NEXT(coro, list);
+
if (coro->state != CORO_STATE_RUNNABLE)
fatal("non-runnable coro on coro_runnable");
- if (python_coro_run(coro) == KORE_RESULT_OK)
- kore_python_coro_delete(coro);
+
+ if (python_coro_run(coro) == KORE_RESULT_OK) {
+ if (coro->gatherop != NULL) {
+ op = coro->gatherop;
+ if (op->coro->request != NULL)
+ http_request_wakeup(op->coro->request);
+ else
+ python_coro_wakeup(op->coro);
+ pygather_reap_coro(op, coro);
+ } else {
+ kore_python_coro_delete(coro);
+ }
+ }
}
/*
@@ -268,14 +289,28 @@ kore_python_log_error(const char *function)
return;
}
- if ((repr = PyObject_Repr(value)) == NULL)
- sval = "unknown";
- else
- sval = PyUnicode_AsUTF8(repr);
+ if (value == NULL || !PyObject_IsInstance(value, type))
+ PyErr_NormalizeException(&type, &value, &traceback);
- kore_log(LOG_ERR, "uncaught exception %s in '%s'", sval, function);
+ /*
+ * If we're in an active coroutine and it was tied to a gather
+ * operation we have to make sure we can use the Exception that
+ * was thrown as the result value so we can propagate it via the
+ * return list of kore.gather().
+ */
+ if (coro_running != NULL && coro_running->gatherop != NULL) {
+ PyErr_SetObject(PyExc_StopIteration, value);
+ } else {
+ if ((repr = PyObject_Repr(value)) == NULL)
+ sval = "unknown";
+ else
+ sval = PyUnicode_AsUTF8(repr);
+
+ kore_log(LOG_ERR,
+ "uncaught exception %s in '%s'", sval, function);
- Py_XDECREF(repr);
+ Py_XDECREF(repr);
+ }
Py_DECREF(type);
Py_DECREF(value);
@@ -391,6 +426,7 @@ python_coro_create(PyObject *obj, struct http_request *req)
coro_count++;
coro->sockop = NULL;
+ coro->gatherop = NULL;
coro->exception = NULL;
coro->exception_msg = NULL;
@@ -482,17 +518,13 @@ python_runtime_http_request(void *addr, struct http_request *req)
{
PyObject *pyret, *pyreq, *args, *callable;
- req_running = req;
-
if (req->py_coro != NULL) {
python_coro_wakeup(req->py_coro);
if (python_coro_run(req->py_coro) == KORE_RESULT_OK) {
kore_python_coro_delete(req->py_coro);
req->py_coro = NULL;
- req_running = NULL;
return (KORE_RESULT_OK);
}
- req_running = NULL;
return (KORE_RESULT_RETRY);
}
@@ -518,16 +550,13 @@ python_runtime_http_request(void *addr, struct http_request *req)
}
if (PyCoro_CheckExact(pyret)) {
- http_request_sleep(req);
req->py_coro = python_coro_create(pyret, req);
- req_running = NULL;
- /* XXX merge with the above python_coro_run() block. */
if (python_coro_run(req->py_coro) == KORE_RESULT_OK) {
kore_python_coro_delete(req->py_coro);
req->py_coro = NULL;
- req_running = NULL;
return (KORE_RESULT_OK);
}
+ http_request_sleep(req);
return (KORE_RESULT_RETRY);
}
@@ -535,7 +564,6 @@ python_runtime_http_request(void *addr, struct http_request *req)
fatal("python_runtime_http_request: unexpected return type");
Py_DECREF(pyret);
- req_running = NULL;
return (KORE_RESULT_OK);
}
@@ -970,6 +998,59 @@ python_kore_queue(PyObject *self, PyObject *args)
}
static PyObject *
+python_kore_gather(PyObject *self, PyObject *args)
+{
+ struct pygather_op *op;
+ PyObject *obj;
+ struct pygather_coro *coro;
+ Py_ssize_t sz, idx;
+
+ if (coro_running == NULL) {
+ PyErr_SetString(PyExc_RuntimeError,
+ "kore.gather only available in coroutines");
+ return (NULL);
+ }
+
+ sz = PyTuple_Size(args);
+
+ if (sz > INT_MAX) {
+ PyErr_SetString(PyExc_TypeError, "too many arguments");
+ return (NULL);
+ }
+
+ op = PyObject_New(struct pygather_op, &pygather_op_type);
+ if (op == NULL)
+ return (NULL);
+
+ op->count = (int)sz;
+ op->coro = coro_running;
+
+ TAILQ_INIT(&op->results);
+ TAILQ_INIT(&op->coroutines);
+
+ for (idx = 0; idx < sz; idx++) {
+ if ((obj = PyTuple_GetItem(args, idx)) == NULL)
+ return (NULL);
+
+ if (!PyCoro_CheckExact(obj)) {
+ PyErr_SetString(PyExc_TypeError, "not a coroutine");
+ return (NULL);
+ }
+
+ Py_INCREF(obj);
+
+ coro = kore_pool_get(&gather_coro_pool);
+
+ coro->coro = python_coro_create(obj, NULL);
+ coro->coro->gatherop = op;
+
+ TAILQ_INSERT_TAIL(&op->coroutines, coro, list);
+ }
+
+ return ((PyObject *)op);
+}
+
+static PyObject *
python_kore_lock(PyObject *self, PyObject *args)
{
struct pylock *lock;
@@ -2285,6 +2366,107 @@ pyproc_op_iternext(struct pyproc_op *op)
return (NULL);
}
+static void
+pygather_reap_coro(struct pygather_op *op, struct python_coro *reap)
+{
+ struct pygather_coro *coro;
+ struct pygather_result *result;
+
+ TAILQ_FOREACH(coro, &op->coroutines, list) {
+ if (coro->coro->id == reap->id)
+ break;
+ }
+
+ if (coro == NULL)
+ fatal("coroutine %" PRIu64 " not found in gather", reap->id);
+
+ result = kore_pool_get(&gather_result_pool);
+ result->obj = NULL;
+
+ if (_PyGen_FetchStopIterationValue(&result->obj) == -1) {
+ result->obj = Py_None;
+ Py_INCREF(Py_None);
+ }
+
+ TAILQ_INSERT_TAIL(&op->results, result, list);
+
+ TAILQ_REMOVE(&op->coroutines, coro, list);
+ kore_pool_put(&gather_coro_pool, coro);
+
+ kore_python_coro_delete(reap);
+}
+
+static void
+pygather_op_dealloc(struct pygather_op *op)
+{
+ struct pygather_coro *coro, *next;
+ struct pygather_result *res, *rnext;
+
+ for (coro = TAILQ_FIRST(&op->coroutines); coro != NULL; coro = next) {
+ next = TAILQ_NEXT(coro, list);
+ TAILQ_REMOVE(&op->coroutines, coro, list);
+
+ /* Make sure we don't end up in pygather_reap_coro(). */
+ coro->coro->gatherop = NULL;
+
+ kore_python_coro_delete(coro->coro);
+ kore_pool_put(&gather_coro_pool, coro);
+ }
+
+ for (res = TAILQ_FIRST(&op->results); res != NULL; res = rnext) {
+ rnext = TAILQ_NEXT(res, list);
+ TAILQ_REMOVE(&op->results, res, list);
+
+ Py_DECREF(res->obj);
+ kore_pool_put(&gather_result_pool, res);
+ }
+
+ PyObject_Del((PyObject *)op);
+}
+
+static PyObject *
+pygather_op_await(PyObject *obj)
+{
+ Py_INCREF(obj);
+ return (obj);
+}
+
+static PyObject *
+pygather_op_iternext(struct pygather_op *op)
+{
+ int idx;
+ struct pygather_result *res, *next;
+ PyObject *list, *obj;
+
+ if (!TAILQ_EMPTY(&op->coroutines)) {
+ Py_RETURN_NONE;
+ }
+
+ if ((list = PyList_New(op->count)) == NULL)
+ return (NULL);
+
+ idx = 0;
+
+ for (res = TAILQ_FIRST(&op->results); res != NULL; res = next) {
+ next = TAILQ_NEXT(res, list);
+ TAILQ_REMOVE(&op->results, res, list);
+
+ obj = res->obj;
+ res->obj = NULL;
+ kore_pool_put(&gather_result_pool, res);
+
+ if (PyList_SetItem(list, idx++, obj) != 0) {
+ Py_DECREF(list);
+ return (NULL);
+ }
+ }
+
+ PyErr_SetObject(PyExc_StopIteration, list);
+ Py_DECREF(list);
+
+ return (NULL);
+}
+
static PyObject *
pyhttp_request_alloc(const struct http_request *req)
{