kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 4922171d9693c02a2b38d8a65f910a12f21b8539
parent ac025adaa75c6ea1c0d1b17df60ba6fb8d014c0e
Author: Joris Vink <joris@coders.se>
Date:   Fri, 25 Oct 2013 11:10:03 +0200

Change net_send_queue() in preparation for improving send performance.

No longer takes callbacks, flags, or *out arguments.

Update rest of the code that called these callbacks whenever sending
was completed, instead call them right away now.

Diffstat:
includes/kore.h | 7++++---
src/connection.c | 3+++
src/http.c | 26+++++++-------------------
src/net.c | 9+++------
src/spdy.c | 82++++++++++++++++++++++---------------------------------------------------------
5 files changed, 40 insertions(+), 87 deletions(-)

diff --git a/includes/kore.h b/includes/kore.h @@ -58,6 +58,7 @@ #define NETBUF_RECV 0 #define NETBUF_SEND 1 +#define NETBUF_SEND_PAYLOAD_MAX 16384 #define NETBUF_CALL_CB_ALWAYS 0x01 #define NETBUF_FORCE_REMOVE 0x02 @@ -349,8 +350,7 @@ void net_recv_queue(struct connection *, size_t, int, struct netbuf **, int (*cb)(struct netbuf *)); int net_recv_expand(struct connection *c, struct netbuf *, size_t, int (*cb)(struct netbuf *)); -void net_send_queue(struct connection *, u_int8_t *, size_t, int, - struct netbuf **, int (*cb)(struct netbuf *)); +void net_send_queue(struct connection *, u_int8_t *, u_int32_t); void kore_buf_free(struct kore_buf *); struct kore_buf *kore_buf_create(u_int32_t); @@ -363,9 +363,10 @@ void kore_buf_replace_string(struct kore_buf *, char *, void *, size_t); struct spdy_header_block *spdy_header_block_create(int); struct spdy_stream *spdy_stream_lookup(struct connection *, u_int32_t); -int spdy_frame_data_done(struct netbuf *); int spdy_stream_get_header(struct spdy_header_block *, char *, char **); +void spdy_update_wsize(struct connection *, + struct spdy_stream *, u_int32_t); int spdy_frame_recv(struct netbuf *); void spdy_session_teardown(struct connection *c, u_int8_t); diff --git a/src/connection.c b/src/connection.c @@ -209,6 +209,9 @@ kore_connection_remove(struct connection *c) kore_debug("kore_connection_remove(%p)", c); + /* XXX */ + net_send_flush(c); + if (c->ssl != NULL) SSL_free(c->ssl); close(c->fd); diff --git a/src/http.c b/src/http.c @@ -25,7 +25,6 @@ static char *http_status_text(int); static int http_post_data_recv(struct netbuf *); -static int http_send_done(struct netbuf *); static TAILQ_HEAD(, http_request) http_requests; static struct kore_pool http_request_pool; @@ -242,7 +241,6 @@ http_request_free(struct http_request *req) int http_response(struct http_request *req, int status, u_int8_t *d, u_int32_t len) { - struct netbuf *nb; u_int32_t hlen; struct http_header *hdr; struct kore_buf *buf; @@ -281,15 +279,14 @@ http_response(struct http_request *req, int status, u_int8_t *d, u_int32_t len) spdy_frame_send(req->owner, SPDY_CTRL_FRAME_SYN_REPLY, 0, hlen, req->stream, 0); - net_send_queue(req->owner, htext, hlen, 0, NULL, NULL); + net_send_queue(req->owner, htext, hlen); kore_mem_free(htext); if (len > 0) { spdy_frame_send(req->owner, SPDY_DATA_FRAME, 0, len, req->stream, 0); - net_send_queue(req->owner, d, len, 0, &nb, - spdy_frame_data_done); - nb->extra = req->stream; + net_send_queue(req->owner, d, len); + spdy_update_wsize(req->owner, req->stream, len); } spdy_frame_send(req->owner, SPDY_DATA_FRAME, @@ -323,10 +320,12 @@ http_response(struct http_request *req, int status, u_int8_t *d, u_int32_t len) kore_buf_append(buf, "\r\n", 2); htext = kore_buf_release(buf, &hlen); - net_send_queue(req->owner, htext, hlen, 0, NULL, NULL); + net_send_queue(req->owner, htext, hlen); kore_mem_free(htext); - net_send_queue(req->owner, d, len, 0, NULL, http_send_done); + net_send_queue(req->owner, d, len); + net_recv_queue(req->owner, http_header_max, + NETBUF_CALL_CB_ALWAYS, NULL, http_header_recv); } return (KORE_RESULT_OK); @@ -869,17 +868,6 @@ http_post_data_recv(struct netbuf *nb) return (KORE_RESULT_OK); } -static int -http_send_done(struct netbuf *nb) -{ - struct connection *c = (struct connection *)nb->owner; - - net_recv_queue(c, http_header_max, - NETBUF_CALL_CB_ALWAYS, NULL, http_header_recv); - - return (KORE_RESULT_OK); -} - static char * http_status_text(int status) { diff --git a/src/net.c b/src/net.c @@ -25,17 +25,16 @@ net_init(void) } void -net_send_queue(struct connection *c, u_int8_t *data, size_t len, int flags, - struct netbuf **out, int (*cb)(struct netbuf *)) +net_send_queue(struct connection *c, u_int8_t *data, u_int32_t len) { struct netbuf *nb; nb = kore_pool_get(&nb_pool); - nb->cb = cb; + nb->flags = 0; + nb->cb = NULL; nb->len = len; nb->owner = c; nb->offset = 0; - nb->flags = flags; nb->type = NETBUF_SEND; if (len > 0) { @@ -46,8 +45,6 @@ net_send_queue(struct connection *c, u_int8_t *data, size_t len, int flags, } TAILQ_INSERT_TAIL(&(c->send_queue), nb, list); - if (out != NULL) - *out = nb; } void diff --git a/src/spdy.c b/src/spdy.c @@ -26,11 +26,7 @@ static int spdy_ctrl_frame_settings(struct netbuf *); static int spdy_ctrl_frame_ping(struct netbuf *); static int spdy_ctrl_frame_window(struct netbuf *); static int spdy_data_frame_recv(struct netbuf *); -static int spdy_frame_send_done(struct netbuf *); -static int spdy_goaway_send_done(struct netbuf *); -static void spdy_update_wsize(struct connection *, - struct spdy_stream *, u_int32_t); static void spdy_stream_close(struct connection *, struct spdy_stream *); static int spdy_zlib_inflate(struct connection *, u_int8_t *, @@ -132,7 +128,6 @@ void spdy_frame_send(struct connection *c, u_int16_t type, u_int8_t flags, u_int32_t len, struct spdy_stream *s, u_int32_t misc) { - struct netbuf *nnb; u_int8_t nb[16]; u_int32_t length; @@ -188,10 +183,11 @@ spdy_frame_send(struct connection *c, u_int16_t type, u_int8_t flags, } if (s != NULL && type == SPDY_DATA_FRAME) { - net_send_queue(c, nb, length, 0, &nnb, spdy_frame_send_done); - nnb->extra = s; + net_send_queue(c, nb, length); + if ((flags & FLAG_FIN) && (s->flags & FLAG_FIN)) + spdy_stream_close(c, s); } else { - net_send_queue(c, nb, length, 0, NULL, NULL); + net_send_queue(c, nb, length); } } @@ -333,16 +329,6 @@ spdy_stream_get_header(struct spdy_header_block *s, char *header, char **out) return (KORE_RESULT_ERROR); } -int -spdy_frame_data_done(struct netbuf *nb) -{ - struct connection *c = (struct connection *)nb->owner; - struct spdy_stream *s = (struct spdy_stream *)nb->extra; - - spdy_update_wsize(c, s, nb->len); - return (KORE_RESULT_OK); -} - void spdy_session_teardown(struct connection *c, u_int8_t err) { @@ -354,12 +340,30 @@ spdy_session_teardown(struct connection *c, u_int8_t err) net_write32((u_int8_t *)&d[4], err); spdy_frame_send(c, SPDY_CTRL_FRAME_GOAWAY, 0, 8, NULL, 0); - net_send_queue(c, d, sizeof(d), 0, NULL, spdy_goaway_send_done); + net_send_queue(c, d, sizeof(d)); c->flags &= ~CONN_READ_POSSIBLE; c->flags |= CONN_READ_BLOCK; net_send_flush(c); + kore_connection_disconnect(c); +} + +void +spdy_update_wsize(struct connection *c, struct spdy_stream *s, u_int32_t len) +{ + s->send_wsize -= len; + kore_debug("spdy_update_wsize(): stream %d, window size %d", + s->stream_id, s->send_wsize); + + if (s->send_wsize <= 0) { + kore_debug("window size <= 0 for stream %d", s->stream_id); + c->flags &= ~CONN_WRITE_POSSIBLE; + c->flags |= CONN_WRITE_BLOCK; + + c->idle_timer.length = 5000; + kore_connection_start_idletimer(c); + } } static int @@ -700,46 +704,6 @@ spdy_stream_close(struct connection *c, struct spdy_stream *s) } static int -spdy_frame_send_done(struct netbuf *nb) -{ - u_int8_t flags; - struct connection *c = (struct connection *)nb->owner; - struct spdy_stream *s = (struct spdy_stream *)nb->extra; - - flags = *(u_int8_t *)(nb->buf + 4); - if ((flags & FLAG_FIN) && (s->flags & FLAG_FIN)) - spdy_stream_close(c, s); - - return (KORE_RESULT_OK); -} - -static int -spdy_goaway_send_done(struct netbuf *nb) -{ - struct connection *c = (struct connection *)nb->owner; - - kore_connection_disconnect(c); - return (KORE_RESULT_OK); -} - -static void -spdy_update_wsize(struct connection *c, struct spdy_stream *s, u_int32_t len) -{ - s->send_wsize -= len; - kore_debug("spdy_update_wsize(): stream %d, window size %d", - s->stream_id, s->send_wsize); - - if (s->send_wsize <= 0) { - kore_debug("window size <= 0 for stream %d", s->stream_id); - c->flags &= ~CONN_WRITE_POSSIBLE; - c->flags |= CONN_WRITE_BLOCK; - - c->idle_timer.length = 5000; - kore_connection_start_idletimer(c); - } -} - -static int spdy_zlib_inflate(struct connection *c, u_int8_t *src, size_t len, u_int8_t **dst, u_int32_t *olen) {