commit 4ca7f29649bbd7dbfae8e93a66a7696f95fdb51d
parent e1766e74baa8eea4b2c5155a7e0874aa4c1d85c8
Author: Joris Vink <joris@coders.se>
Date: Mon, 25 Mar 2019 10:13:52 +0100
Add a concurrency parameter to kore.gather()
Diffstat:
2 files changed, 62 insertions(+), 13 deletions(-)
diff --git a/include/kore/python_methods.h b/include/kore/python_methods.h
@@ -41,13 +41,13 @@ static PyObject *python_kore_timer(PyObject *, PyObject *);
static PyObject *python_kore_fatal(PyObject *, PyObject *);
static PyObject *python_kore_queue(PyObject *, PyObject *);
static PyObject *python_kore_tracer(PyObject *, PyObject *);
-static PyObject *python_kore_gather(PyObject *, PyObject *);
static PyObject *python_kore_fatalx(PyObject *, PyObject *);
static PyObject *python_kore_suspend(PyObject *, PyObject *);
static PyObject *python_kore_shutdown(PyObject *, PyObject *);
static PyObject *python_kore_bind_unix(PyObject *, PyObject *);
static PyObject *python_kore_task_create(PyObject *, PyObject *);
static PyObject *python_kore_socket_wrap(PyObject *, PyObject *);
+static PyObject *python_kore_gather(PyObject *, PyObject *, PyObject *);
#if defined(KORE_USE_PGSQL)
static PyObject *python_kore_pgsql_register(PyObject *, PyObject *);
@@ -69,7 +69,7 @@ static struct PyMethodDef pykore_methods[] = {
METHOD("timer", python_kore_timer, METH_VARARGS),
METHOD("queue", python_kore_queue, METH_VARARGS),
METHOD("tracer", python_kore_tracer, METH_VARARGS),
- METHOD("gather", python_kore_gather, METH_VARARGS),
+ METHOD("gather", python_kore_gather, METH_VARARGS | METH_KEYWORDS),
METHOD("fatal", python_kore_fatal, METH_VARARGS),
METHOD("fatalx", python_kore_fatalx, METH_VARARGS),
METHOD("suspend", python_kore_suspend, METH_VARARGS),
@@ -460,6 +460,8 @@ struct pygather_result {
struct pygather_op {
PyObject_HEAD
int count;
+ int running;
+ int concurrency;
struct python_coro *coro;
TAILQ_HEAD(, pygather_result) results;
TAILQ_HEAD(, pygather_coro) coroutines;
diff --git a/src/python.c b/src/python.c
@@ -50,6 +50,7 @@ static struct python_coro *python_coro_create(PyObject *,
struct http_request *);
static int python_coro_run(struct python_coro *);
static void python_coro_wakeup(struct python_coro *);
+static void python_coro_suspend(struct python_coro *);
static void pysocket_evt_handle(void *, int);
static void pysocket_op_timeout(void *, u_int64_t);
@@ -238,11 +239,9 @@ void
kore_python_coro_run(void)
{
struct pygather_op *op;
- struct python_coro *coro, *next;
-
- for (coro = TAILQ_FIRST(&coro_runnable); coro != NULL; coro = next) {
- next = TAILQ_NEXT(coro, list);
+ struct python_coro *coro;
+ while ((coro = TAILQ_FIRST(&coro_runnable)) != NULL) {
if (coro->state != CORO_STATE_RUNNABLE)
fatal("non-runnable coro on coro_runnable");
@@ -563,10 +562,7 @@ python_coro_run(struct python_coro *coro)
Py_DECREF(item);
}
- coro->state = CORO_STATE_SUSPENDED;
- TAILQ_REMOVE(&coro_runnable, coro, list);
- TAILQ_INSERT_HEAD(&coro_suspended, coro, list);
-
+ python_coro_suspend(coro);
coro_running = NULL;
if (coro->request != NULL)
@@ -587,6 +583,17 @@ python_coro_wakeup(struct python_coro *coro)
}
static void
+python_coro_suspend(struct python_coro *coro)
+{
+ if (coro->state != CORO_STATE_RUNNABLE)
+ return;
+
+ coro->state = CORO_STATE_SUSPENDED;
+ TAILQ_REMOVE(&coro_runnable, coro, list);
+ TAILQ_INSERT_HEAD(&coro_suspended, coro, list);
+}
+
+static void
pyconnection_dealloc(struct pyconnection *pyc)
{
PyObject_Del((PyObject *)pyc);
@@ -1185,12 +1192,13 @@ python_kore_tracer(PyObject *self, PyObject *args)
}
static PyObject *
-python_kore_gather(PyObject *self, PyObject *args)
+python_kore_gather(PyObject *self, PyObject *args, PyObject *kwargs)
{
struct pygather_op *op;
PyObject *obj;
struct pygather_coro *coro;
Py_ssize_t sz, idx;
+ int concurrency;
if (coro_running == NULL) {
PyErr_SetString(PyExc_RuntimeError,
@@ -1205,12 +1213,33 @@ python_kore_gather(PyObject *self, PyObject *args)
return (NULL);
}
+ if (kwargs != NULL &&
+ (obj = PyDict_GetItemString(kwargs, "concurrency")) != NULL) {
+ if (!PyLong_Check(obj)) {
+ PyErr_SetString(PyExc_TypeError,
+ "concurrency level must be an integer");
+ return (NULL);
+ }
+
+ PyErr_Clear();
+ concurrency = (int)PyLong_AsLong(obj);
+ if (concurrency == -1 && PyErr_Occurred())
+ return (NULL);
+
+ if (concurrency == 0)
+ concurrency = sz;
+ } else {
+ concurrency = sz;
+ }
+
op = PyObject_New(struct pygather_op, &pygather_op_type);
if (op == NULL)
return (NULL);
+ op->running = 0;
op->count = (int)sz;
op->coro = coro_running;
+ op->concurrency = concurrency;
TAILQ_INIT(&op->results);
TAILQ_INIT(&op->coroutines);
@@ -1230,11 +1259,14 @@ python_kore_gather(PyObject *self, PyObject *args)
Py_INCREF(obj);
coro = kore_pool_get(&gather_coro_pool);
-
coro->coro = python_coro_create(obj, NULL);
coro->coro->gatherop = op;
-
TAILQ_INSERT_TAIL(&op->coroutines, coro, list);
+
+ if (idx > concurrency - 1)
+ python_coro_suspend(coro->coro);
+ else
+ op->running++;
}
return ((PyObject *)op);
@@ -2908,6 +2940,10 @@ pygather_reap_coro(struct pygather_op *op, struct python_coro *reap)
if (coro == NULL)
fatal("coroutine %" PRIu64 " not found in gather", reap->id);
+ op->running--;
+ if (op->running < 0)
+ fatal("gatherop: running miscount (%d)", op->running);
+
result = kore_pool_get(&gather_result_pool);
result->obj = NULL;
@@ -2974,10 +3010,21 @@ static PyObject *
pygather_op_iternext(struct pygather_op *op)
{
int idx;
+ struct pygather_coro *coro;
struct pygather_result *res, *next;
PyObject *list, *obj;
if (!TAILQ_EMPTY(&op->coroutines)) {
+ if (op->running > 0)
+ Py_RETURN_NONE;
+
+ TAILQ_FOREACH(coro, &op->coroutines, list) {
+ if (op->running >= op->concurrency)
+ break;
+ python_coro_wakeup(coro->coro);
+ op->running++;
+ }
+
Py_RETURN_NONE;
}