kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 5cca2f1f7893883b6d7369793b8cbe0d6d6193e2
parent d2c65b4f624879cfc20dafc08382d964304f4234
Author: Joris Vink <joris@coders.se>
Date:   Fri,  8 Aug 2014 14:18:15 +0200

properly implement spdy/3.1 flow control

Diffstat:
includes/kore.h | 16+++++++++++++---
includes/spdy.h | 10++++++----
src/connection.c | 1+
src/http.c | 13+++++--------
src/net.c | 266++++++++++++++++++++++++++++++++++++++++++++++---------------------------------
src/spdy.c | 195++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------
6 files changed, 325 insertions(+), 176 deletions(-)

diff --git a/includes/kore.h b/includes/kore.h @@ -70,7 +70,10 @@ extern int daemon(int, int); #define NETBUF_RECV 0 #define NETBUF_SEND 1 -#define NETBUF_SEND_PAYLOAD_MAX 4000 +#define NETBUF_SEND_PAYLOAD_MAX 8192 + +#define NETBUF_LAST_CHAIN 0 +#define NETBUF_BEFORE_CHAIN 1 #define NETBUF_CALL_CB_ALWAYS 0x01 #define NETBUF_FORCE_REMOVE 0x02 @@ -142,6 +145,7 @@ LIST_HEAD(listener_head, listener); #define CONN_IDLE_TIMER_ACT 0x10 #define CONN_READ_BLOCK 0x20 #define CONN_CLOSE_EMPTY 0x40 +#define SPDY_CONN_GOAWAY 0x80 #define KORE_IDLE_TIMER_MAX 20000 @@ -171,10 +175,14 @@ struct connection { z_stream z_inflate; u_int8_t deflate_started; z_stream z_deflate; + u_int32_t wsize_initial; + u_int32_t spdy_send_wsize; struct netbuf_head send_queue; + struct netbuf *snb; struct netbuf_head recv_queue; + struct netbuf *rnb; u_int32_t client_stream_id; TAILQ_HEAD(, spdy_stream) spdy_streams; @@ -453,7 +461,7 @@ void net_recv_queue(struct connection *, size_t, int, int net_recv_expand(struct connection *c, struct netbuf *, size_t, int (*cb)(struct netbuf *)); void net_send_queue(struct connection *, void *, - u_int32_t, struct spdy_stream *); + u_int32_t, struct spdy_stream *, int); void net_send_stream(struct connection *, void *, u_int32_t, struct spdy_stream *); @@ -466,7 +474,6 @@ void kore_buf_appendv(struct kore_buf *, const char *, va_list); void kore_buf_appendb(struct kore_buf *, struct kore_buf *); void kore_buf_replace_string(struct kore_buf *, char *, void *, size_t); -struct spdy_header_block *spdy_header_block_create(int); struct spdy_stream *spdy_stream_lookup(struct connection *, u_int32_t); int spdy_stream_get_header(struct spdy_header_block *, const char *, char **); @@ -474,6 +481,7 @@ void spdy_update_wsize(struct connection *, struct spdy_stream *, u_int32_t); int spdy_frame_recv(struct netbuf *); +int spdy_dataframe_begin(struct connection *); void spdy_session_teardown(struct connection *c, u_int8_t); void spdy_frame_send(struct connection *, u_int16_t, u_int8_t, u_int32_t, struct spdy_stream *, u_int32_t); @@ -482,4 +490,6 @@ void spdy_header_block_add(struct spdy_header_block *, u_int8_t *spdy_header_block_release(struct connection *, struct spdy_header_block *, u_int32_t *); +struct spdy_header_block *spdy_header_block_create(int); + #endif /* !__H_KORE_H */ diff --git a/includes/spdy.h b/includes/spdy.h @@ -54,9 +54,9 @@ struct spdy_stream { u_int8_t prio; u_int64_t post_size; u_int64_t send_size; - - int32_t recv_wsize; - int32_t send_wsize; + u_int32_t frame_size; + u_int32_t recv_wsize; + u_int32_t send_wsize; void *httpreq; struct spdy_header_block *hblock; @@ -66,7 +66,7 @@ struct spdy_stream { extern const unsigned char SPDY_dictionary_txt[]; -#define KORE_SSL_PROTO_STRING "\x08spdy/3.2\x08http/1.1" +#define KORE_SSL_PROTO_STRING "\x08spdy/3.1\x08http/1.1" #define SPDY_CONTROL_FRAME(x) ((x & (1 << 31))) #define SPDY_FRAME_SIZE 8 @@ -109,5 +109,7 @@ extern const unsigned char SPDY_dictionary_txt[]; /* internal flags (make sure they don't clash with SPDY stream flags) */ #define SPDY_KORE_FIN 0x10 +#define SPDY_DATAFRAME_PRELUDE 0x20 +#define SPDY_STREAM_BLOCKING 0x40 #endif /* !__H_SPDY_H */ diff --git a/src/connection.c b/src/connection.c @@ -77,6 +77,7 @@ kore_connection_accept(struct listener *l, struct connection **out) c->client_stream_id = 0; c->proto = CONN_PROTO_UNKNOWN; c->wsize_initial = SPDY_INIT_WSIZE; + c->spdy_send_wsize = SPDY_INIT_WSIZE; c->idle_timer.start = 0; c->idle_timer.length = KORE_IDLE_TIMER_MAX; diff --git a/src/http.c b/src/http.c @@ -385,8 +385,6 @@ http_response(struct http_request *req, int status, void *d, u_int32_t l) switch (req->owner->proto) { case CONN_PROTO_SPDY: http_response_spdy(req, req->owner, req->stream, status, d, l); - spdy_frame_send(req->owner, SPDY_DATA_FRAME, - FLAG_FIN, 0, req->stream, 0); break; case CONN_PROTO_HTTP: http_response_normal(req, req->owner, status, d, l); @@ -1029,7 +1027,6 @@ http_error_response(struct connection *c, struct spdy_stream *s, int status) switch (c->proto) { case CONN_PROTO_SPDY: http_response_spdy(NULL, c, s, status, NULL, 0); - spdy_frame_send(c, SPDY_DATA_FRAME, FLAG_FIN, 0, s, 0); break; case CONN_PROTO_HTTP: if (s != NULL) @@ -1083,15 +1080,15 @@ http_response_spdy(struct http_request *req, struct connection *c, } spdy_frame_send(c, SPDY_CTRL_FRAME_SYN_REPLY, 0, hlen, s, 0); - net_send_queue(c, htext, hlen, NULL); + net_send_queue(c, htext, hlen, NULL, NETBUF_LAST_CHAIN); kore_mem_free(htext); if (len > 0) { req->stream->send_size += len; - spdy_frame_send(c, SPDY_DATA_FRAME, 0, len, s, 0); + req->stream->flags |= SPDY_DATAFRAME_PRELUDE; if (d != NULL) - net_send_queue(c, d, len, s); + net_send_queue(c, d, len, s, NETBUF_LAST_CHAIN); } } @@ -1158,11 +1155,11 @@ http_response_normal(struct http_request *req, struct connection *c, } htext = kore_buf_release(buf, &hlen); - net_send_queue(c, htext, hlen, NULL); + net_send_queue(c, htext, hlen, NULL, NETBUF_LAST_CHAIN); kore_mem_free(htext); if (d != NULL) - net_send_queue(c, d, len, NULL); + net_send_queue(c, d, len, NULL, NETBUF_LAST_CHAIN); if (!(c->flags & CONN_CLOSE_EMPTY)) { net_recv_queue(c, http_header_max, diff --git a/src/net.c b/src/net.c @@ -28,28 +28,34 @@ net_init(void) void net_send_queue(struct connection *c, void *data, u_int32_t len, - struct spdy_stream *s) + struct spdy_stream *s, int before) { u_int8_t *d; struct netbuf *nb; u_int32_t avail; + kore_debug("net_send_queue(%p, %p, %d, %p, %d)", + c, data, len, s, before); + d = data; - nb = TAILQ_LAST(&(c->send_queue), netbuf_head); - if (nb != NULL && nb->b_len < nb->m_len && nb->stream == s) { - avail = nb->m_len - nb->b_len; - if (len < avail) { - memcpy(nb->buf + nb->b_len, d, len); - nb->b_len += len; - return; - } else if (len > avail) { - memcpy(nb->buf + nb->b_len, d, avail); - nb->b_len += avail; - - len -= avail; - d += avail; - if (len == 0) + if (before == NETBUF_LAST_CHAIN) { + nb = TAILQ_LAST(&(c->send_queue), netbuf_head); + if (nb != NULL && !(nb->flags & NETBUF_IS_STREAM) && + nb->stream == s && nb->b_len < nb->m_len) { + avail = nb->m_len - nb->b_len; + if (len < avail) { + memcpy(nb->buf + nb->b_len, d, len); + nb->b_len += len; return; + } else if (len > avail) { + memcpy(nb->buf + nb->b_len, d, avail); + nb->b_len += avail; + + len -= avail; + d += avail; + if (len == 0) + return; + } } } @@ -71,7 +77,12 @@ net_send_queue(struct connection *c, void *data, u_int32_t len, if (len > 0) memcpy(nb->buf, d, nb->b_len); - TAILQ_INSERT_TAIL(&(c->send_queue), nb, list); + if (before == NETBUF_BEFORE_CHAIN) { + kore_debug("net_send_queue(): %p->new", c->snb); + TAILQ_INSERT_BEFORE(c->snb, nb, list); + } else { + TAILQ_INSERT_TAIL(&(c->send_queue), nb, list); + } } void @@ -80,6 +91,8 @@ net_send_stream(struct connection *c, void *data, u_int32_t len, { struct netbuf *nb; + kore_debug("net_send_stream(%p, %p, %d, %p)", c, data, len, s); + nb = kore_pool_get(&nb_pool); nb->cb = NULL; nb->owner = c; @@ -140,63 +153,94 @@ int net_send(struct connection *c) { int r; - struct netbuf *nb; - u_int32_t len; + u_int32_t len, smin; + struct netbuf *nb, *next; + + c->snb = TAILQ_FIRST(&(c->send_queue)); + kore_debug("net_send(%p, %d, %d)", c->snb, c->snb->s_off, c->snb->b_len); + if (c->snb->b_len != 0) { + if (c->snb->stream != NULL && + (c->snb->stream->flags & SPDY_DATAFRAME_PRELUDE)) { + if (!spdy_dataframe_begin(c)) { + c->snb = NULL; + return (KORE_RESULT_OK); + } - while (!TAILQ_EMPTY(&(c->send_queue))) { - nb = TAILQ_FIRST(&(c->send_queue)); - if (nb->b_len != 0) { - len = MIN(NETBUF_SEND_PAYLOAD_MAX, - nb->b_len - nb->s_off); + c->snb = TAILQ_FIRST(&(c->send_queue)); + kore_debug("after (%p, %d, %d)", + c->snb, c->snb->s_off, c->snb->b_len); + } + + if (c->snb->stream != NULL && + (c->snb->stream->flags & SPDY_STREAM_BLOCKING)) { + kore_debug("stream block, resorting", c->snb); + for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; + nb = next) { + next = TAILQ_NEXT(nb, list); + if (nb->stream != c->snb->stream) + continue; + + TAILQ_REMOVE(&(c->send_queue), nb, list); + TAILQ_INSERT_TAIL(&(c->send_queue), nb, list); + } + + c->snb = NULL; + return (KORE_RESULT_OK); + } + + smin = c->snb->b_len - c->snb->s_off; + if (c->snb->stream != NULL && + c->snb->stream->frame_size > 0) { + smin = MIN(smin, c->snb->stream->frame_size); + } + + len = MIN(NETBUF_SEND_PAYLOAD_MAX, smin); + kore_debug("chosen len: %d", len); #if !defined(KORE_BENCHMARK) - r = SSL_write(c->ssl, (nb->buf + nb->s_off), len); - if (r <= 0) { - r = SSL_get_error(c->ssl, r); - switch (r) { - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - nb->flags |= NETBUF_MUST_RESEND; - c->flags &= ~CONN_WRITE_POSSIBLE; - return (KORE_RESULT_OK); - default: - kore_debug("SSL_write(): %s", - ssl_errno_s); - return (KORE_RESULT_ERROR); - } + r = SSL_write(c->ssl, + (c->snb->buf + c->snb->s_off), len); + if (r <= 0) { + r = SSL_get_error(c->ssl, r); + switch (r) { + case SSL_ERROR_WANT_READ: + case SSL_ERROR_WANT_WRITE: + c->snb->flags |= NETBUF_MUST_RESEND; + c->flags &= ~CONN_WRITE_POSSIBLE; + return (KORE_RESULT_OK); + default: + kore_debug("SSL_write(): %s", + ssl_errno_s); + return (KORE_RESULT_ERROR); } + } #else - r = write(c->fd, (nb->buf + nb->s_off), len); - if (r <= -1) { - switch (errno) { - case EINTR: - case EAGAIN: - c->flags &= ~CONN_WRITE_POSSIBLE; - return (KORE_RESULT_OK); - default: - kore_debug("write: %s", errno_s); - return (KORE_RESULT_ERROR); - } + r = write(c->fd, (c->snb->buf + c->snb->s_off), len); + if (r <= -1) { + switch (errno) { + case EINTR: + case EAGAIN: + c->flags &= ~CONN_WRITE_POSSIBLE; + return (KORE_RESULT_OK); + default: + kore_debug("write: %s", errno_s); + return (KORE_RESULT_ERROR); } -#endif - kore_debug("net_send(%p/%d/%d bytes), progress with %d", - nb, nb->s_off, nb->b_len, r); - - nb->s_off += (size_t)r; - nb->flags &= ~NETBUF_MUST_RESEND; - if (nb->stream != NULL) - spdy_update_wsize(c, nb->stream, r); } +#endif + kore_debug("net_send(%p/%d/%d bytes), progress with %d", + c->snb, c->snb->s_off, c->snb->b_len, r); - if (nb->s_off == nb->b_len) { - if (nb->stream != NULL && - (nb->flags & NETBUF_IS_STREAM)) { - spdy_frame_send(c, SPDY_DATA_FRAME, - FLAG_FIN, 0, nb->stream, 0); - } + c->snb->s_off += (size_t)r; + c->snb->flags &= ~NETBUF_MUST_RESEND; + if (c->snb->stream != NULL) + spdy_update_wsize(c, c->snb->stream, r); + } - net_remove_netbuf(&(c->send_queue), nb); - } + if (c->snb->s_off == c->snb->b_len || + (c->snb->flags & NETBUF_FORCE_REMOVE)) { + net_remove_netbuf(&(c->send_queue), c->snb); + c->snb = NULL; } return (KORE_RESULT_OK); @@ -223,59 +267,59 @@ int net_recv(struct connection *c) { int r; - struct netbuf *nb; - while (!TAILQ_EMPTY(&(c->recv_queue))) { - nb = TAILQ_FIRST(&(c->recv_queue)); - if (nb->cb == NULL) { - kore_debug("kore_read_client(): nb->cb == NULL"); - return (KORE_RESULT_ERROR); - } + c->rnb = TAILQ_FIRST(&(c->recv_queue)); + if (c->rnb == NULL) { + kore_debug("kore_read_client(): nb->cb == NULL"); + return (KORE_RESULT_ERROR); + } #if !defined(KORE_BENCHMARK) - r = SSL_read(c->ssl, - (nb->buf + nb->s_off), (nb->b_len - nb->s_off)); - if (r <= 0) { - r = SSL_get_error(c->ssl, r); - switch (r) { - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - c->flags &= ~CONN_READ_POSSIBLE; - return (KORE_RESULT_OK); - default: - kore_debug("SSL_read(): %s", ssl_errno_s); - return (KORE_RESULT_ERROR); - } + r = SSL_read(c->ssl, + (c->rnb->buf + c->rnb->s_off), + (c->rnb->b_len - c->rnb->s_off)); + if (r <= 0) { + r = SSL_get_error(c->ssl, r); + switch (r) { + case SSL_ERROR_WANT_READ: + case SSL_ERROR_WANT_WRITE: + c->flags &= ~CONN_READ_POSSIBLE; + return (KORE_RESULT_OK); + default: + kore_debug("SSL_read(): %s", ssl_errno_s); + return (KORE_RESULT_ERROR); } + } #else - r = read(c->fd, (nb->buf + nb->s_off), (nb->b_len - nb->s_off)); - if (r <= 0) { - switch (errno) { - case EINTR: - case EAGAIN: - c->flags &= ~CONN_READ_POSSIBLE; - return (KORE_RESULT_OK); - default: - kore_debug("read(): %s", errno_s); - return (KORE_RESULT_ERROR); - } + r = read(c->fd, (c->rnb->buf + c->rnb->s_off), + (c->rnb->b_len - c->rnb->s_off)); + if (r <= 0) { + switch (errno) { + case EINTR: + case EAGAIN: + c->flags &= ~CONN_READ_POSSIBLE; + return (KORE_RESULT_OK); + default: + kore_debug("read(): %s", errno_s); + return (KORE_RESULT_ERROR); } + } #endif - kore_debug("net_recv(%ld/%ld bytes), progress with %d", - nb->s_off, nb->b_len, r); - - nb->s_off += (size_t)r; - if (nb->s_off == nb->b_len || - (nb->flags & NETBUF_CALL_CB_ALWAYS)) { - r = nb->cb(nb); - if (nb->s_off == nb->b_len || - (nb->flags & NETBUF_FORCE_REMOVE)) { - net_remove_netbuf(&(c->recv_queue), nb); - } - - if (r != KORE_RESULT_OK) - return (r); + kore_debug("net_recv(%ld/%ld bytes), progress with %d", + c->rnb->s_off, c->rnb->b_len, r); + + c->rnb->s_off += (size_t)r; + if (c->rnb->s_off == c->rnb->b_len || + (c->rnb->flags & NETBUF_CALL_CB_ALWAYS)) { + r = c->rnb->cb(c->rnb); + if (c->rnb->s_off == c->rnb->b_len || + (c->rnb->flags & NETBUF_FORCE_REMOVE)) { + net_remove_netbuf(&(c->recv_queue), c->rnb); + c->rnb = NULL; } + + if (r != KORE_RESULT_OK) + return (r); } return (KORE_RESULT_OK); @@ -298,10 +342,12 @@ net_recv_flush(struct connection *c) void net_remove_netbuf(struct netbuf_head *list, struct netbuf *nb) { - nb->stream = NULL; + kore_debug("net_remove_netbuf(%p, %p, %p)", list, nb, nb->stream); + nb->stream = NULL; if (nb->flags & NETBUF_MUST_RESEND) { kore_debug("retaining %p (MUST_RESEND)", nb); + nb->flags |= NETBUF_FORCE_REMOVE; return; } diff --git a/src/spdy.c b/src/spdy.c @@ -14,6 +14,8 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ +#include <sys/param.h> + #include <limits.h> #include "spdy.h" @@ -28,8 +30,12 @@ static int spdy_ctrl_frame_rst_stream(struct netbuf *); static int spdy_ctrl_frame_settings(struct netbuf *); static int spdy_ctrl_frame_ping(struct netbuf *); static int spdy_ctrl_frame_window(struct netbuf *); +static int spdy_ctrl_frame_goaway(struct netbuf *); static int spdy_data_frame_recv(struct netbuf *); +static void spdy_block_write(struct connection *); +static void spdy_enable_write(struct connection *); + static void spdy_stream_close(struct connection *, struct spdy_stream *, int); static int spdy_zlib_inflate(struct connection *, u_int8_t *, @@ -38,7 +44,7 @@ static int spdy_zlib_deflate(struct connection *, u_int8_t *, size_t, u_int8_t **, u_int32_t *); u_int64_t spdy_idle_time = 120000; -int32_t spdy_recv_wsize = 65536; +u_int32_t spdy_recv_wsize = 65536; int spdy_frame_recv(struct netbuf *nb) @@ -52,17 +58,12 @@ spdy_frame_recv(struct netbuf *nb) kore_debug("spdy_frame_recv(%p)", nb); if (SPDY_CONTROL_FRAME(net_read32(nb->buf))) { - kore_debug("received control frame"); - ctrl.version = net_read16(nb->buf) & 0x7fff; ctrl.type = net_read16(nb->buf + 2); ctrl.flags = *(u_int8_t *)(nb->buf + 4); ctrl.length = net_read32(nb->buf + 4) & 0xffffff; - kore_debug("type is %u", ctrl.type); - kore_debug("version is %u", ctrl.version); - kore_debug("length is %u", ctrl.length); - kore_debug("flags are %u", ctrl.flags); + kore_debug("received control frame %d", ctrl.type); if ((int)ctrl.length < 0) { spdy_session_teardown(c, SPDY_SESSION_ERROR_PROTOCOL); @@ -93,6 +94,9 @@ spdy_frame_recv(struct netbuf *nb) case SPDY_CTRL_FRAME_WINDOW: cb = spdy_ctrl_frame_window; break; + case SPDY_CTRL_FRAME_GOAWAY: + cb = spdy_ctrl_frame_goaway; + break; default: cb = NULL; break; @@ -107,8 +111,13 @@ spdy_frame_recv(struct netbuf *nb) } else { data.stream_id = net_read32(nb->buf) & ~(1 << 31); if ((s = spdy_stream_lookup(c, data.stream_id)) == NULL) { - kore_debug("recv data frame for non existing stream"); - r = KORE_RESULT_ERROR; + if (!(c->flags & SPDY_CONN_GOAWAY)) { + kore_debug("recv dataframe for bad stream: %u", + data.stream_id); + r = KORE_RESULT_ERROR; + } else { + r = KORE_RESULT_OK; + } } else if (s->flags & FLAG_FIN) { kore_debug("received data frame but FLAG_FIN was set"); r = KORE_RESULT_ERROR; @@ -134,6 +143,33 @@ spdy_frame_recv(struct netbuf *nb) return (r); } +int +spdy_dataframe_begin(struct connection *c) +{ + struct spdy_stream *s = c->snb->stream; + + if (s->frame_size != 0 || s->send_size == 0) { + fatal("spdy_dataframe_begin(): s:%u fz:%d - sz:%d", + s->stream_id, s->frame_size, s->send_size); + } + + if ((int)s->send_wsize <= 0 || (int)c->spdy_send_wsize <= 0) { + kore_debug("no space for new dataframe right now"); + spdy_block_write(c); + return (KORE_RESULT_ERROR); + } + + s->frame_size = MIN(NETBUF_SEND_PAYLOAD_MAX, s->send_size); + + kore_debug("spdy_dataframe_begin(): %u: fz:%d wz:%d cwz:%d", + s->stream_id, s->frame_size, s->send_size, c->spdy_send_wsize); + + s->flags &= ~SPDY_DATAFRAME_PRELUDE; + spdy_frame_send(c, SPDY_DATA_FRAME, 0, s->frame_size, s, 0); + + return (KORE_RESULT_OK); +} + void spdy_frame_send(struct connection *c, u_int16_t type, u_int8_t flags, u_int32_t len, struct spdy_stream *s, u_int32_t misc) @@ -144,6 +180,15 @@ spdy_frame_send(struct connection *c, u_int16_t type, u_int8_t flags, kore_debug("spdy_frame_send(%p, %u, %u, %u, %p, %u)", c, type, flags, len, s, misc); + switch (type) { + case SPDY_CTRL_FRAME_SYN_REPLY: + case SPDY_CTRL_FRAME_WINDOW: + case SPDY_DATA_FRAME: + if (s == NULL) + fatal("spdy_frame_send(): stream is NULL for %d", type); + break; + } + length = 0; memset(nb, 0, sizeof(nb)); switch (type) { @@ -192,12 +237,10 @@ spdy_frame_send(struct connection *c, u_int16_t type, u_int8_t flags, break; } - if (s != NULL && type == SPDY_DATA_FRAME && (flags & FLAG_FIN)) { - s->send_size += length; - s->flags |= SPDY_KORE_FIN; - net_send_queue(c, nb, length, s); + if (type == SPDY_DATA_FRAME && !(flags & FLAG_FIN)) { + net_send_queue(c, nb, length, NULL, NETBUF_BEFORE_CHAIN); } else { - net_send_queue(c, nb, length, NULL); + net_send_queue(c, nb, length, NULL, NETBUF_LAST_CHAIN); } } @@ -324,16 +367,12 @@ spdy_stream_get_header(struct spdy_header_block *s, cmp = (char *)(p + 4); if (!strncasecmp(cmp, header, nlen)) { - kore_debug("found %s header", header); - cmp = (char *)(p + nlen + 8); *out = kore_malloc(vlen + 1); kore_strlcpy(*out, cmp, vlen + 1); return (KORE_RESULT_OK); } - kore_debug("pair name %u bytes, value %u bytes", nlen, vlen); - p += nlen + vlen + 8; } @@ -351,7 +390,7 @@ spdy_session_teardown(struct connection *c, u_int8_t err) net_write32((u_int8_t *)&d[4], err); spdy_frame_send(c, SPDY_CTRL_FRAME_GOAWAY, 0, 8, NULL, 0); - net_send_queue(c, d, sizeof(d), NULL); + net_send_queue(c, d, sizeof(d), NULL, NETBUF_LAST_CHAIN); c->flags &= ~CONN_READ_POSSIBLE; c->flags |= CONN_READ_BLOCK; @@ -363,28 +402,44 @@ spdy_session_teardown(struct connection *c, u_int8_t err) void spdy_update_wsize(struct connection *c, struct spdy_stream *s, u_int32_t len) { + s->send_size -= len; + s->frame_size -= len; s->send_wsize -= len; - s->send_size -= (u_int64_t)len; + c->spdy_send_wsize -= len; + + kore_debug("spdy_update_wsize(): s:%u fz:%d sz:%d wz:%d cwz:%d", + s->stream_id, s->frame_size, s->send_size, + s->send_wsize, c->spdy_send_wsize); + + if (s->frame_size == 0 && s->send_size > 0) { + kore_debug("spdy_update_wsize(): starting new data frame"); + s->flags |= SPDY_DATAFRAME_PRELUDE; + } if (s->send_size == 0) { + if (!(s->flags & SPDY_KORE_FIN)) { + s->flags |= SPDY_KORE_FIN; + kore_debug("sending final frame %u", s->stream_id); + spdy_frame_send(c, SPDY_DATA_FRAME, FLAG_FIN, 0, s, 0); + return; + } + if (s->flags & (SPDY_KORE_FIN | FLAG_FIN)) { spdy_stream_close(c, s, SPDY_KEEP_NETBUFS); return; } - kore_log(LOG_NOTICE, "A spdy stream was empty but not closed"); + kore_debug("%u remains half open\n", s->stream_id); } - kore_debug("spdy_update_wsize(): stream %u, window size %d", - s->stream_id, s->send_wsize); - - if (s->send_wsize <= 0) { - kore_debug("window size <= 0 for stream %u", s->stream_id); - c->flags &= ~CONN_WRITE_POSSIBLE; - c->flags |= CONN_WRITE_BLOCK; + if ((int)s->send_wsize <= 0) { + kore_debug("flow control kicked in for STREAM %p:%p", s, c); + s->flags |= SPDY_STREAM_BLOCKING; + } - c->idle_timer.length = 5000; - kore_connection_start_idletimer(c); + if ((int)c->spdy_send_wsize <= 0) { + kore_debug("flow control kicked in for CONNECTION %p", c); + spdy_block_write(c); } } @@ -412,6 +467,9 @@ spdy_ctrl_frame_syn_stream(struct netbuf *nb) kore_debug("stream_id: %u", syn.stream_id); kore_debug("length : %u", ctrl.length); + if (c->spdy_send_wsize > 0 && (c->flags & CONN_WRITE_BLOCK)) + spdy_enable_write(c); + if ((int)ctrl.length < 0) { spdy_session_teardown(c, SPDY_SESSION_ERROR_PROTOCOL); return (KORE_RESULT_OK); @@ -439,6 +497,7 @@ spdy_ctrl_frame_syn_stream(struct netbuf *nb) s = kore_malloc(sizeof(*s)); s->send_size = 0; + s->frame_size = 0; s->httpreq = NULL; s->prio = syn.prio; s->flags = ctrl.flags; @@ -525,13 +584,10 @@ static int spdy_ctrl_frame_rst_stream(struct netbuf *nb) { struct spdy_stream *s; - u_int32_t stream_id, status; + u_int32_t stream_id; struct connection *c = (struct connection *)nb->owner; stream_id = net_read32(nb->buf + SPDY_FRAME_SIZE); - status = net_read32(nb->buf + SPDY_FRAME_SIZE + sizeof(u_int32_t)); - printf("RST STATUS IS %d\n", status); - if ((stream_id % 2) == 0) { kore_debug("received RST for non-client stream %u", stream_id); return (KORE_RESULT_ERROR); @@ -550,8 +606,9 @@ spdy_ctrl_frame_rst_stream(struct netbuf *nb) static int spdy_ctrl_frame_settings(struct netbuf *nb) { + struct spdy_stream *s; u_int8_t *buf; - u_int32_t ecount, i, id, val, length; + u_int32_t ecount, i, id, val, length, diff; struct connection *c = (struct connection *)nb->owner; ecount = net_read32(nb->buf + SPDY_FRAME_SIZE); @@ -582,7 +639,11 @@ spdy_ctrl_frame_settings(struct netbuf *nb) switch (id) { case SETTINGS_INITIAL_WINDOW_SIZE: + diff = val - c->wsize_initial; c->wsize_initial = val; + TAILQ_FOREACH(s, &(c->spdy_streams), list) + s->send_wsize += diff; + kore_debug("updated wsize with %d", diff); break; default: kore_debug("no handling for setting %u:%u", id, val); @@ -618,6 +679,7 @@ spdy_ctrl_frame_ping(struct netbuf *nb) static int spdy_ctrl_frame_window(struct netbuf *nb) { + int r; struct spdy_stream *s; u_int32_t stream_id, window_size; struct connection *c = (struct connection *)nb->owner; @@ -625,25 +687,36 @@ spdy_ctrl_frame_window(struct netbuf *nb) stream_id = net_read32(nb->buf + SPDY_FRAME_SIZE); window_size = net_read32(nb->buf + SPDY_FRAME_SIZE + 4); - if ((s = spdy_stream_lookup(c, stream_id)) == NULL) { - kore_debug("received WINDOW_UPDATE for nonexistant stream"); - kore_debug("stream_id: %u", stream_id); - spdy_session_teardown(c, SPDY_SESSION_ERROR_PROTOCOL); - return (KORE_RESULT_OK); + r = KORE_RESULT_OK; + if ((s = spdy_stream_lookup(c, stream_id)) != NULL) { + s->send_wsize += window_size; + if (c->flags & CONN_WRITE_BLOCK && s->send_wsize > 0) { + s->flags &= ~SPDY_STREAM_BLOCKING; + kore_debug("stream %u no longer blocket", s->stream_id); + } + } else { + c->spdy_send_wsize += window_size; + if (c->flags & CONN_WRITE_BLOCK && c->spdy_send_wsize > 0) { + spdy_enable_write(c); + r = net_send_flush(c); + } } - kore_debug("SPDY_WINDOW_UPDATE: %u:%u", stream_id, window_size); - s->send_wsize += window_size; - if (s->send_wsize > 0 && c->flags & CONN_WRITE_BLOCK) { - c->flags &= ~CONN_WRITE_BLOCK; - c->flags |= CONN_WRITE_POSSIBLE; + kore_debug("window_update: %u for %u", window_size, stream_id); + kore_debug("c->spdy_send_wsize = %u", c->spdy_send_wsize); - kore_connection_stop_idletimer(c); - c->idle_timer.length = spdy_idle_time; + return (r); +} - kore_debug("can now send again (%u wsize)", s->send_wsize); - return (net_send_flush(c)); - } +static int +spdy_ctrl_frame_goaway(struct netbuf *nb) +{ + struct connection *c = (struct connection *)nb->owner; + + kore_debug("spdy_ctrl_frame_goaway(%p)", c); + + c->flags |= SPDY_CONN_GOAWAY; + kore_connection_disconnect(c); return (KORE_RESULT_OK); } @@ -751,8 +824,10 @@ spdy_stream_close(struct connection *c, struct spdy_stream *s, int rb) if (rb) { for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; nb = nt) { nt = TAILQ_NEXT(nb, list); - if (nb->stream == s) + if (nb->stream == s) { + kore_debug("spdy_stream_close: killing %p", nb); net_remove_netbuf(&(c->send_queue), nb); + } } } @@ -772,6 +847,24 @@ spdy_stream_close(struct connection *c, struct spdy_stream *s, int rb) kore_mem_free(s); } +static void +spdy_block_write(struct connection *c) +{ + kore_debug("spdy_block_write(%p)", c); + + c->flags |= CONN_WRITE_BLOCK; + c->flags &= ~CONN_WRITE_POSSIBLE; +} + +static void +spdy_enable_write(struct connection *c) +{ + kore_debug("spdy_enable_write(%p)", c); + + c->flags &= ~CONN_WRITE_BLOCK; + c->flags |= CONN_WRITE_POSSIBLE; +} + static int spdy_zlib_inflate(struct connection *c, u_int8_t *src, size_t len, u_int8_t **dst, u_int32_t *olen)