kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit b64f674db22fa27d061c82046064a9c2b0c2b8f7
parent 517de467906dfb4ff412fa3a0dbda62aa84677a4
Author: Joris Vink <joris@coders.se>
Date:   Tue, 14 Jan 2014 21:43:45 +0100

Handle SPDY streams better when sending data.

Diffstat:
includes/kore.h | 12++++++++----
includes/spdy.h | 5+++++
src/http.c | 10+++++-----
src/net.c | 10++++++++--
src/spdy.c | 25+++++++++++++++++--------
5 files changed, 43 insertions(+), 19 deletions(-)

diff --git a/includes/kore.h b/includes/kore.h @@ -66,6 +66,10 @@ #define NETBUF_CALL_CB_ALWAYS 0x01 #define NETBUF_FORCE_REMOVE 0x02 +/* XXX hackish. */ +struct http_request; +struct spdy_stream; + struct netbuf { u_int8_t *buf; u_int32_t s_off; @@ -75,6 +79,8 @@ struct netbuf { u_int8_t flags; void *owner; + struct spdy_stream *stream; + void *extra; int (*cb)(struct netbuf *); @@ -119,9 +125,6 @@ LIST_HEAD(listener_head, listener); #define KORE_IDLE_TIMER_MAX 20000 -/* XXX hackish. */ -struct http_request; - struct connection { u_int8_t type; int fd; @@ -404,7 +407,8 @@ void net_recv_queue(struct connection *, size_t, int, struct netbuf **, int (*cb)(struct netbuf *)); int net_recv_expand(struct connection *c, struct netbuf *, size_t, int (*cb)(struct netbuf *)); -void net_send_queue(struct connection *, u_int8_t *, u_int32_t); +void net_send_queue(struct connection *, u_int8_t *, + u_int32_t, struct spdy_stream *); void kore_buf_free(struct kore_buf *); struct kore_buf *kore_buf_create(u_int32_t); diff --git a/includes/spdy.h b/includes/spdy.h @@ -53,6 +53,8 @@ struct spdy_stream { u_int8_t flags; u_int8_t prio; u_int64_t post_size; + u_int64_t send_size; + int32_t recv_wsize; int32_t send_wsize; @@ -105,4 +107,7 @@ extern const unsigned char SPDY_dictionary_txt[]; #define SPDY_HBLOCK_NORMAL 0 #define SPDY_HBLOCK_DELAYED_ALLOC 1 +/* internal flags (make sure they don't clash with SPDY stream flags) */ +#define SPDY_KORE_FIN 0x10 + #endif /* !__H_SPDY_H */ diff --git a/src/http.c b/src/http.c @@ -302,14 +302,14 @@ http_response(struct http_request *req, int status, u_int8_t *d, u_int32_t len) spdy_frame_send(req->owner, SPDY_CTRL_FRAME_SYN_REPLY, 0, hlen, req->stream, 0); - net_send_queue(req->owner, htext, hlen); + net_send_queue(req->owner, htext, hlen, NULL); kore_mem_free(htext); if (len > 0) { + req->stream->send_size = len; spdy_frame_send(req->owner, SPDY_DATA_FRAME, 0, len, req->stream, 0); - net_send_queue(req->owner, d, len); - spdy_update_wsize(req->owner, req->stream, len); + net_send_queue(req->owner, d, len, req->stream); } spdy_frame_send(req->owner, SPDY_DATA_FRAME, @@ -345,10 +345,10 @@ http_response(struct http_request *req, int status, u_int8_t *d, u_int32_t len) kore_buf_append(buf, "\r\n", 2); htext = kore_buf_release(buf, &hlen); - net_send_queue(req->owner, htext, hlen); + net_send_queue(req->owner, htext, hlen, NULL); kore_mem_free(htext); - net_send_queue(req->owner, d, len); + net_send_queue(req->owner, d, len, NULL); net_recv_queue(req->owner, http_header_max, NETBUF_CALL_CB_ALWAYS, NULL, http_header_recv); } diff --git a/src/net.c b/src/net.c @@ -27,13 +27,14 @@ net_init(void) } void -net_send_queue(struct connection *c, u_int8_t *data, u_int32_t len) +net_send_queue(struct connection *c, u_int8_t *data, u_int32_t len, + struct spdy_stream *s) { struct netbuf *nb; u_int32_t avail; nb = TAILQ_LAST(&(c->send_queue), netbuf_head); - if (nb != NULL && nb->b_len < nb->m_len) { + if (nb != NULL && nb->b_len < nb->m_len && nb->stream == s) { avail = nb->m_len - nb->b_len; if (len < avail) { memcpy(nb->buf + nb->b_len, data, len); @@ -55,6 +56,7 @@ net_send_queue(struct connection *c, u_int8_t *data, u_int32_t len) nb->cb = NULL; nb->owner = c; nb->s_off = 0; + nb->stream = s; nb->b_len = len; nb->type = NETBUF_SEND; @@ -82,6 +84,7 @@ net_recv_queue(struct connection *c, size_t len, int flags, nb->m_len = len; nb->owner = c; nb->s_off = 0; + nb->stream = NULL; nb->flags = flags; nb->type = NETBUF_RECV; nb->buf = kore_malloc(nb->b_len); @@ -143,6 +146,9 @@ net_send(struct connection *c) } nb->s_off += (size_t)r; + + if (nb->stream != NULL) + spdy_update_wsize(c, nb->stream, r); } if (nb->s_off == nb->b_len) { diff --git a/src/spdy.c b/src/spdy.c @@ -181,13 +181,10 @@ spdy_frame_send(struct connection *c, u_int16_t type, u_int8_t flags, break; } - if (s != NULL && type == SPDY_DATA_FRAME) { - net_send_queue(c, nb, length); - if ((flags & FLAG_FIN) && (s->flags & FLAG_FIN)) - spdy_stream_close(c, s); - } else { - net_send_queue(c, nb, length); - } + if (s != NULL && type == SPDY_DATA_FRAME && (flags & FLAG_FIN)) + s->flags |= SPDY_KORE_FIN; + + net_send_queue(c, nb, length, NULL); } struct spdy_stream * @@ -339,7 +336,7 @@ spdy_session_teardown(struct connection *c, u_int8_t err) net_write32((u_int8_t *)&d[4], err); spdy_frame_send(c, SPDY_CTRL_FRAME_GOAWAY, 0, 8, NULL, 0); - net_send_queue(c, d, sizeof(d)); + net_send_queue(c, d, sizeof(d), NULL); c->flags &= ~CONN_READ_POSSIBLE; c->flags |= CONN_READ_BLOCK; @@ -352,6 +349,17 @@ void spdy_update_wsize(struct connection *c, struct spdy_stream *s, u_int32_t len) { s->send_wsize -= len; + s->send_size -= (u_int64_t)len; + + if (s->send_size == 0) { + if (s->flags & (SPDY_KORE_FIN | FLAG_FIN)) { + spdy_stream_close(c, s); + return; + } + + kore_log(LOG_NOTICE, "A spdy stream was empty but not closed"); + } + kore_debug("spdy_update_wsize(): stream %d, window size %d", s->stream_id, s->send_wsize); @@ -412,6 +420,7 @@ spdy_ctrl_frame_syn_stream(struct netbuf *nb) } s = kore_malloc(sizeof(*s)); + s->send_size = 0; s->httpreq = NULL; s->prio = syn.prio; s->flags = ctrl.flags;