kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 3c9a141cd0d51e68485f954096abe853217f893d
parent c8878ebcde926e10f2388d477513f2a3f25ed7ef
Author: Joris Vink <joris@coders.se>
Date:   Fri,  3 May 2019 13:42:34 +0200

allow an iterator to be passed to req.response().

if an iterator is passed kore will send the response with
transfer-encoding: chunked and call the iterator for every
chunk that was sent.

The iterator must return a utf-8 string.

Works wonderful with TemplateStream from jinja2.

Diffstat:
include/kore/http.h | 1+
include/kore/kore.h | 1+
include/kore/python_methods.h | 7+++++++
src/http.c | 17+++++++++++------
src/python.c | 143+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------
5 files changed, 152 insertions(+), 17 deletions(-)

diff --git a/include/kore/http.h b/include/kore/http.h @@ -316,6 +316,7 @@ const char *http_status_text(int); const char *http_method_text(int); time_t http_date_to_time(char *); char *http_validate_header(char *); +void http_start_recv(struct connection *); void http_request_free(struct http_request *); void http_request_sleep(struct http_request *); void http_request_wakeup(struct http_request *); diff --git a/include/kore/kore.h b/include/kore/kore.h @@ -183,6 +183,7 @@ TAILQ_HEAD(netbuf_head, netbuf); #define CONN_IDLE_TIMER_ACT 0x01 #define CONN_CLOSE_EMPTY 0x02 #define CONN_WS_CLOSE_SENT 0x04 +#define CONN_IS_BUSY 0x08 #define KORE_IDLE_TIMER_MAX 5000 diff --git a/include/kore/python_methods.h b/include/kore/python_methods.h @@ -547,6 +547,13 @@ struct pyhttp_request { PyObject *data; }; +struct pyhttp_iterobj { + int remove; + PyObject *iterator; + struct connection *connection; + struct kore_buf buf; +}; + struct pyhttp_file { PyObject_HEAD struct http_file *file; diff --git a/src/http.c b/src/http.c @@ -1408,6 +1408,15 @@ http_state_cleanup(struct http_request *req) req->hdlr_extra = NULL; } +void +http_start_recv(struct connection *c) +{ + c->http_start = kore_time_ms(); + c->http_timeout = http_header_timeout * 1000; + net_recv_reset(c, http_header_max, http_header_recv); + (void)net_recv_flush(c); +} + static struct http_request * http_request_new(struct connection *c, const char *host, const char *method, char *path, const char *version) @@ -1965,12 +1974,8 @@ http_response_normal(struct http_request *req, struct connection *c, if (d != NULL && req != NULL && req->method != HTTP_METHOD_HEAD) net_send_queue(c, d, len); - if (!(c->flags & CONN_CLOSE_EMPTY)) { - c->http_start = kore_time_ms(); - c->http_timeout = http_header_timeout * 1000; - net_recv_reset(c, http_header_max, http_header_recv); - (void)net_recv_flush(c); - } + if (!(c->flags & CONN_CLOSE_EMPTY) && !(c->flags & CONN_IS_BUSY)) + http_start_recv(c); if (req != NULL) req->content_length = len; diff --git a/src/python.c b/src/python.c @@ -76,6 +76,10 @@ static void pysuspend_wakeup(void *, u_int64_t); static void pygather_reap_coro(struct pygather_op *, struct python_coro *); +static int pyhttp_iterobj_chunk_sent(struct netbuf *); +static int pyhttp_iterobj_next(struct pyhttp_iterobj *); +static void pyhttp_iterobj_disconnect(struct connection *); + #if defined(KORE_USE_PGSQL) static PyObject *pykore_pgsql_alloc(struct http_request *, const char *, const char *, PyObject *); @@ -175,6 +179,7 @@ static PyMemAllocatorEx allocator = { static TAILQ_HEAD(, pyproc) procs; static struct kore_pool coro_pool; +static struct kore_pool iterobj_pool; static struct kore_pool queue_wait_pool; static struct kore_pool gather_coro_pool; static struct kore_pool queue_object_pool; @@ -205,6 +210,8 @@ kore_python_init(void) kore_pool_init(&coro_pool, "coropool", sizeof(struct python_coro), 100); + kore_pool_init(&iterobj_pool, "iterobj_pool", + sizeof(struct pyhttp_iterobj), 100); kore_pool_init(&queue_wait_pool, "queue_wait_pool", sizeof(struct pyqueue_waiting), 100); kore_pool_init(&gather_coro_pool, "gather_coro_pool", @@ -3109,28 +3116,55 @@ pyhttp_file_alloc(struct http_file *file) static PyObject * pyhttp_response(struct pyhttp_request *pyreq, PyObject *args) { + struct connection *c; char *ptr; - PyObject *data; Py_ssize_t length; int status; + struct pyhttp_iterobj *iterobj; + PyObject *obj, *iterator; length = -1; - if (!PyArg_ParseTuple(args, "iS", &status, &data)) + if (!PyArg_ParseTuple(args, "iO", &status, &obj)) return (NULL); - if (PyBytes_AsStringAndSize(data, &ptr, &length) == -1) - return (NULL); + if (PyBytes_CheckExact(obj)) { + if (PyBytes_AsStringAndSize(obj, &ptr, &length) == -1) + return (NULL); - if (length < 0) { - PyErr_SetString(PyExc_TypeError, "invalid length"); - return (NULL); - } + if (length < 0) { + PyErr_SetString(PyExc_TypeError, "invalid length"); + return (NULL); + } + + Py_INCREF(obj); + + http_response_stream(pyreq->req, status, ptr, length, + pyhttp_response_sent, obj); + } else { + if ((iterator = PyObject_GetIter(obj)) == NULL) + return (NULL); + + c = pyreq->req->owner; + + iterobj = kore_pool_get(&iterobj_pool); + iterobj->iterator = iterator; + iterobj->connection = c; + iterobj->remove = 0; - Py_INCREF(data); + kore_buf_init(&iterobj->buf, 4096); - http_response_stream(pyreq->req, status, ptr, length, - pyhttp_response_sent, data); + c->hdlr_extra = iterobj; + c->flags |= CONN_IS_BUSY; + c->disconnect = pyhttp_iterobj_disconnect; + + pyreq->req->flags |= HTTP_REQUEST_NO_CONTENT_LENGTH; + http_response_header(pyreq->req, "transfer-encoding", + "chunked"); + + http_response(pyreq->req, status, NULL, 0); + pyhttp_iterobj_next(iterobj); + } Py_RETURN_TRUE; } @@ -3146,6 +3180,93 @@ pyhttp_response_sent(struct netbuf *nb) return (KORE_RESULT_OK); } +static int +pyhttp_iterobj_next(struct pyhttp_iterobj *iterobj) +{ + struct netbuf *nb; + PyObject *obj; + const char *ptr; + Py_ssize_t length; + + PyErr_Clear(); + + if ((obj = PyIter_Next(iterobj->iterator)) == NULL) { + if (PyErr_Occurred()) { + kore_python_log_error("pyhttp_iterobj_next"); + return (KORE_RESULT_ERROR); + } + + return (KORE_RESULT_OK); + } + + if ((ptr = PyUnicode_AsUTF8AndSize(obj, &length)) == NULL) { + kore_python_log_error("pyhttp_iterobj_next"); + return (KORE_RESULT_ERROR); + } + + kore_buf_reset(&iterobj->buf); + kore_buf_appendf(&iterobj->buf, "%x\r\n", length); + kore_buf_append(&iterobj->buf, ptr, length); + kore_buf_appendf(&iterobj->buf, "\r\n"); + + Py_DECREF(obj); + + net_send_stream(iterobj->connection, iterobj->buf.data, + iterobj->buf.offset, pyhttp_iterobj_chunk_sent, &nb); + + nb->extra = iterobj; + + return (KORE_RESULT_RETRY); +} + +static int +pyhttp_iterobj_chunk_sent(struct netbuf *nb) +{ + int ret; + struct pyhttp_iterobj *iterobj; + + iterobj = nb->extra; + + if (iterobj->remove) { + ret = KORE_RESULT_ERROR; + } else { + ret = pyhttp_iterobj_next(iterobj); + } + + if (ret != KORE_RESULT_RETRY) { + iterobj->connection->hdlr_extra = NULL; + iterobj->connection->disconnect = NULL; + iterobj->connection->flags &= ~CONN_IS_BUSY; + + if (iterobj->remove == 0) + http_start_recv(iterobj->connection); + + kore_buf_reset(&iterobj->buf); + kore_buf_appendf(&iterobj->buf, "0\r\n\r\n"); + net_send_queue(iterobj->connection, + iterobj->buf.data, iterobj->buf.offset); + + Py_DECREF(iterobj->iterator); + + kore_buf_cleanup(&iterobj->buf); + kore_pool_put(&iterobj_pool, iterobj); + } else { + ret = KORE_RESULT_OK; + } + + return (ret); +} + +static void +pyhttp_iterobj_disconnect(struct connection *c) +{ + struct pyhttp_iterobj *iterobj; + + iterobj = c->hdlr_extra; + + iterobj->remove = 1; +} + static PyObject * pyhttp_response_header(struct pyhttp_request *pyreq, PyObject *args) {