kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 4247d3139c0ec5b557e62c81b21d231f81d43f74
parent 4922171d9693c02a2b38d8a65f910a12f21b8539
Author: Joris Vink <joris@coders.se>
Date:   Fri, 25 Oct 2013 14:22:29 +0200

Chain our sending netbufs together. Gives us a lot less SSL_write() calls.

Diffstat:
includes/kore.h | 9+++++++--
src/http.c | 36++++++++++++++++++++++++++----------
src/net.c | 112++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------
src/spdy.c | 3+--
4 files changed, 108 insertions(+), 52 deletions(-)

diff --git a/includes/kore.h b/includes/kore.h @@ -65,8 +65,9 @@ struct netbuf { u_int8_t *buf; - u_int32_t offset; - u_int32_t len; + u_int32_t s_off; + u_int32_t b_len; + u_int32_t m_len; u_int8_t type; u_int8_t flags; @@ -77,6 +78,8 @@ struct netbuf { TAILQ_ENTRY(netbuf) list; }; +TAILQ_HEAD(netbuf_head, netbuf); + #define KORE_TYPE_LISTENER 1 #define KORE_TYPE_CONNECTION 2 @@ -110,6 +113,7 @@ LIST_HEAD(listener_head, listener); #define CONN_WRITE_BLOCK 0x04 #define CONN_IDLE_TIMER_ACT 0x10 #define CONN_READ_BLOCK 0x20 +#define CONN_WILL_FLUSH 0x40 #define KORE_IDLE_TIMER_MAX 20000 @@ -151,6 +155,7 @@ struct connection { TAILQ_HEAD(, http_request) http_requests; TAILQ_ENTRY(connection) list; + TAILQ_ENTRY(connection) flush_list; }; #define HANDLER_TYPE_STATIC 1 diff --git a/src/http.c b/src/http.c @@ -112,10 +112,13 @@ http_request_new(struct connection *c, struct spdy_stream *s, char *host, void http_process(void) { - struct http_request *req, *next; + struct connection *c, *cnext; struct kore_module_handle *hdlr; + TAILQ_HEAD(, connection) flush_list; + struct http_request *req, *next; int r, (*cb)(struct http_request *); + TAILQ_INIT(&flush_list); for (req = TAILQ_FIRST(&http_requests); req != NULL; req = next) { next = TAILQ_NEXT(req, list); @@ -144,9 +147,11 @@ http_process(void) switch (r) { case KORE_RESULT_OK: - r = net_send_flush(req->owner); - if (r == KORE_RESULT_ERROR) - kore_connection_disconnect(req->owner); + if (!(req->owner->flags & CONN_WILL_FLUSH)) { + req->owner->flags |= CONN_WILL_FLUSH; + TAILQ_INSERT_TAIL(&flush_list, + req->owner, flush_list); + } break; case KORE_RESULT_ERROR: kore_connection_disconnect(req->owner); @@ -163,6 +168,17 @@ http_process(void) http_request_count--; } } + + for (c = TAILQ_FIRST(&flush_list); c != NULL; c = cnext) { + cnext = TAILQ_NEXT(c, flush_list); + TAILQ_REMOVE(&flush_list, c, flush_list); + + r = net_send_flush(c); + if (r == KORE_RESULT_ERROR) + kore_connection_disconnect(c); + else + c->flags &= ~CONN_WILL_FLUSH; + } } void @@ -371,10 +387,10 @@ http_header_recv(struct netbuf *nb) kore_debug("http_header_recv(%p)", nb); - if (nb->len < 4) + if (nb->b_len < 4) return (KORE_RESULT_OK); - end_headers = kore_mem_find(nb->buf, nb->offset, "\r\n\r\n", 4); + end_headers = kore_mem_find(nb->buf, nb->s_off, "\r\n\r\n", 4); if (end_headers == NULL) return (KORE_RESULT_OK); @@ -470,12 +486,12 @@ http_header_recv(struct netbuf *nb) req->post_data = kore_buf_create(clen); kore_buf_append(req->post_data, end_headers, - (nb->offset - len)); + (nb->s_off - len)); - bytes_left = clen - (nb->offset - len); + bytes_left = clen - (nb->s_off - len); if (bytes_left > 0) { kore_debug("%ld/%ld (%ld - %ld) more bytes for POST", - bytes_left, clen, nb->offset, len); + bytes_left, clen, nb->s_off, len); net_recv_queue(c, bytes_left, 0, &nnb, http_post_data_recv); nnb->extra = req; @@ -860,7 +876,7 @@ http_post_data_recv(struct netbuf *nb) { struct http_request *req = (struct http_request *)nb->extra; - kore_buf_append(req->post_data, nb->buf, nb->offset); + kore_buf_append(req->post_data, nb->buf, nb->s_off); req->flags |= HTTP_REQUEST_COMPLETE; kore_debug("post complete for request %p", req); diff --git a/src/net.c b/src/net.c @@ -17,32 +17,77 @@ #include "kore.h" struct kore_pool nb_pool; +struct kore_pool nsb_pool; void net_init(void) { kore_pool_init(&nb_pool, "nb_pool", sizeof(struct netbuf), 1000); + kore_pool_init(&nsb_pool, "nsb_pool", NETBUF_SEND_PAYLOAD_MAX, 25); } void net_send_queue(struct connection *c, u_int8_t *data, u_int32_t len) { + u_int8_t *p; struct netbuf *nb; + u_int32_t avail, i, payload; + + if (len > NETBUF_SEND_PAYLOAD_MAX) { + p = data; + avail = (len / NETBUF_SEND_PAYLOAD_MAX) + 1; + for (i = 0; i < avail && len != 0; i++) { + if (len > NETBUF_SEND_PAYLOAD_MAX) + payload = NETBUF_SEND_PAYLOAD_MAX; + else + payload = len; + + net_send_queue(c, p, payload); + + p += payload; + len -= payload; + } + + return; + } + + if (len > NETBUF_SEND_PAYLOAD_MAX) { + kore_log(LOG_NOTICE, + "net_send_queue(): %d > NETBUF_SEND_PAYLOAD_MAX", len); + kore_connection_disconnect(c); + return; + } + + nb = TAILQ_LAST(&(c->send_queue), netbuf_head); + if (nb != NULL && nb->b_len < nb->m_len) { + avail = nb->m_len - nb->b_len; + if (len < avail) { + memcpy(nb->buf + nb->b_len, data, len); + nb->b_len += len; + return; + } else if (len > avail) { + memcpy(nb->buf + nb->b_len, data, avail); + nb->b_len += avail; + + len -= avail; + data += avail; + if (len == 0) + return; + } + } nb = kore_pool_get(&nb_pool); nb->flags = 0; nb->cb = NULL; - nb->len = len; nb->owner = c; - nb->offset = 0; + nb->s_off = 0; + nb->b_len = len; nb->type = NETBUF_SEND; + nb->m_len = NETBUF_SEND_PAYLOAD_MAX; + nb->buf = kore_pool_get(&nsb_pool); - if (len > 0) { - nb->buf = kore_malloc(nb->len); - memcpy(nb->buf, data, nb->len); - } else { - nb->buf = NULL; - } + if (len > 0) + memcpy(nb->buf, data, nb->b_len); TAILQ_INSERT_TAIL(&(c->send_queue), nb, list); } @@ -55,12 +100,13 @@ net_recv_queue(struct connection *c, size_t len, int flags, nb = kore_pool_get(&nb_pool); nb->cb = cb; - nb->len = len; + nb->b_len = len; + nb->m_len = len; nb->owner = c; - nb->offset = 0; + nb->s_off = 0; nb->flags = flags; nb->type = NETBUF_RECV; - nb->buf = kore_malloc(nb->len); + nb->buf = kore_malloc(nb->b_len); TAILQ_INSERT_TAIL(&(c->recv_queue), nb, list); if (out != NULL) @@ -77,8 +123,9 @@ net_recv_expand(struct connection *c, struct netbuf *nb, size_t len, } nb->cb = cb; - nb->len += len; - nb->buf = kore_realloc(nb->buf, nb->len); + nb->b_len += len; + nb->m_len = nb->b_len; + nb->buf = kore_realloc(nb->buf, nb->b_len); TAILQ_REMOVE(&(c->recv_queue), nb, list); TAILQ_INSERT_HEAD(&(c->recv_queue), nb, list); @@ -94,12 +141,12 @@ net_send(struct connection *c) while (!TAILQ_EMPTY(&(c->send_queue))) { nb = TAILQ_FIRST(&(c->send_queue)); - if (nb->len != 0) { + if (nb->b_len != 0) { r = SSL_write(c->ssl, - (nb->buf + nb->offset), (nb->len - nb->offset)); + (nb->buf + nb->s_off), (nb->b_len - nb->s_off)); - kore_debug("net_send(%ld/%ld bytes), progress with %d", - nb->offset, nb->len, r); + kore_debug("net_send(%d/%d bytes), progress with %d", + nb->s_off, nb->b_len, r); if (r <= 0) { r = SSL_get_error(c->ssl, r); @@ -115,25 +162,14 @@ net_send(struct connection *c) } } - nb->offset += (size_t)r; + nb->s_off += (size_t)r; } - if (nb->offset == nb->len) { + if (nb->s_off == nb->b_len) { TAILQ_REMOVE(&(c->send_queue), nb, list); - if (nb->cb != NULL) - r = nb->cb(nb); - else - r = KORE_RESULT_OK; - - if (nb->offset == nb->len) { - if (nb->buf != NULL) - kore_mem_free(nb->buf); - kore_pool_put(&nb_pool, nb); - } - - if (r != KORE_RESULT_OK) - return (r); + kore_pool_put(&nsb_pool, nb->buf); + kore_pool_put(&nb_pool, nb); } } @@ -168,10 +204,10 @@ net_recv(struct connection *c) } r = SSL_read(c->ssl, - (nb->buf + nb->offset), (nb->len - nb->offset)); + (nb->buf + nb->s_off), (nb->b_len - nb->s_off)); kore_debug("net_recv(%ld/%ld bytes), progress with %d", - nb->offset, nb->len, r); + nb->s_off, nb->b_len, r); if (r <= 0) { r = SSL_get_error(c->ssl, r); @@ -179,7 +215,7 @@ net_recv(struct connection *c) case SSL_ERROR_WANT_READ: c->flags &= ~CONN_READ_POSSIBLE; if (nb->flags & NETBUF_CALL_CB_ALWAYS && - nb-> offset > 0) + nb->s_off > 0) goto handle; return (KORE_RESULT_OK); case SSL_ERROR_WANT_WRITE: @@ -191,11 +227,11 @@ net_recv(struct connection *c) } } - nb->offset += (size_t)r; - if (nb->offset == nb->len) { + nb->s_off += (size_t)r; + if (nb->s_off == nb->b_len) { handle: r = nb->cb(nb); - if (nb->offset == nb->len || + if (nb->s_off == nb->b_len || (nb->flags & NETBUF_FORCE_REMOVE)) { TAILQ_REMOVE(&(c->recv_queue), nb, list); diff --git a/src/spdy.c b/src/spdy.c @@ -114,8 +114,7 @@ spdy_frame_recv(struct netbuf *nb) } if (r == KORE_RESULT_OK) { - net_recv_queue(c, SPDY_FRAME_SIZE, - 0, NULL, spdy_frame_recv); + net_recv_queue(c, SPDY_FRAME_SIZE, 0, NULL, spdy_frame_recv); } else { r = KORE_RESULT_OK; spdy_session_teardown(c, SPDY_SESSION_ERROR_PROTOCOL);