kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 10284d59b6ee5f48e496135839677a854ded024a
parent 1ef0f3423b77c501bd0db767402f8df7265648f7
Author: Joris Vink <joris@coders.se>
Date:   Sun, 10 Aug 2014 18:17:06 +0200

Another round of spdy/3.1 improvements.

* Always make sure we end the stream properly
* Check for SPDY_FLOW_WINDOW_MAX on window frame updates
* Kill SPDY_STREAM_BLOCKING, once flow control kicks in its per session

Diffstat:
includes/kore.h | 4+++-
includes/spdy.h | 8++++++--
src/http.c | 3+++
src/net.c | 25+------------------------
src/spdy.c | 113++++++++++++++++++++++++++++++++++++++++---------------------------------------
5 files changed, 70 insertions(+), 83 deletions(-)

diff --git a/includes/kore.h b/includes/kore.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 Joris Vink <joris@coders.se> + * Copyright (c) 2013-2014 Joris Vink <joris@coders.se> * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -489,6 +489,8 @@ void spdy_header_block_add(struct spdy_header_block *, char *, char *); u_int8_t *spdy_header_block_release(struct connection *, struct spdy_header_block *, u_int32_t *); +void spdy_stream_close(struct connection *, + struct spdy_stream *, int); struct spdy_header_block *spdy_header_block_create(int); diff --git a/includes/spdy.h b/includes/spdy.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 Joris Vink <joris@coders.se> + * Copyright (c) 2013-2014 Joris Vink <joris@coders.se> * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -107,9 +107,13 @@ extern const unsigned char SPDY_dictionary_txt[]; #define SPDY_HBLOCK_NORMAL 0 #define SPDY_HBLOCK_DELAYED_ALLOC 1 +#define SPDY_FLOW_WINDOW_MAX 2147483647 + /* internal flags (make sure they don't clash with SPDY stream flags) */ #define SPDY_KORE_FIN 0x10 #define SPDY_DATAFRAME_PRELUDE 0x20 -#define SPDY_STREAM_BLOCKING 0x40 + +#define SPDY_KEEP_NETBUFS 0 +#define SPDY_REMOVE_NETBUFS 1 #endif /* !__H_SPDY_H */ diff --git a/src/http.c b/src/http.c @@ -1089,6 +1089,9 @@ http_response_spdy(struct http_request *req, struct connection *c, if (d != NULL) net_send_queue(c, d, len, s, NETBUF_LAST_CHAIN); + } else { + spdy_frame_send(c, SPDY_DATA_FRAME, FLAG_FIN, 0, s, 0); + spdy_stream_close(c, req->stream, SPDY_KEEP_NETBUFS); } } diff --git a/src/net.c b/src/net.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 Joris Vink <joris@coders.se> + * Copyright (c) 2013-2014 Joris Vink <joris@coders.se> * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -78,7 +78,6 @@ net_send_queue(struct connection *c, void *data, u_int32_t len, memcpy(nb->buf, d, nb->b_len); if (before == NETBUF_BEFORE_CHAIN) { - kore_debug("net_send_queue(): %p->new", c->snb); TAILQ_INSERT_BEFORE(c->snb, nb, list); } else { TAILQ_INSERT_TAIL(&(c->send_queue), nb, list); @@ -154,10 +153,8 @@ net_send(struct connection *c) { int r; u_int32_t len, smin; - struct netbuf *nb, *next; c->snb = TAILQ_FIRST(&(c->send_queue)); - kore_debug("net_send(%p, %d, %d)", c->snb, c->snb->s_off, c->snb->b_len); if (c->snb->b_len != 0) { if (c->snb->stream != NULL && (c->snb->stream->flags & SPDY_DATAFRAME_PRELUDE)) { @@ -167,25 +164,6 @@ net_send(struct connection *c) } c->snb = TAILQ_FIRST(&(c->send_queue)); - kore_debug("after (%p, %d, %d)", - c->snb, c->snb->s_off, c->snb->b_len); - } - - if (c->snb->stream != NULL && - (c->snb->stream->flags & SPDY_STREAM_BLOCKING)) { - kore_debug("stream block, resorting", c->snb); - for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; - nb = next) { - next = TAILQ_NEXT(nb, list); - if (nb->stream != c->snb->stream) - continue; - - TAILQ_REMOVE(&(c->send_queue), nb, list); - TAILQ_INSERT_TAIL(&(c->send_queue), nb, list); - } - - c->snb = NULL; - return (KORE_RESULT_OK); } smin = c->snb->b_len - c->snb->s_off; @@ -195,7 +173,6 @@ net_send(struct connection *c) } len = MIN(NETBUF_SEND_PAYLOAD_MAX, smin); - kore_debug("chosen len: %d", len); #if !defined(KORE_BENCHMARK) r = SSL_write(c->ssl, diff --git a/src/spdy.c b/src/spdy.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 Joris Vink <joris@coders.se> + * Copyright (c) 2013-2014 Joris Vink <joris@coders.se> * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -22,9 +22,6 @@ #include "kore.h" #include "http.h" -#define SPDY_KEEP_NETBUFS 0 -#define SPDY_REMOVE_NETBUFS 1 - static int spdy_ctrl_frame_syn_stream(struct netbuf *); static int spdy_ctrl_frame_rst_stream(struct netbuf *); static int spdy_ctrl_frame_settings(struct netbuf *); @@ -36,8 +33,6 @@ static int spdy_data_frame_recv(struct netbuf *); static void spdy_block_write(struct connection *); static void spdy_enable_write(struct connection *); -static void spdy_stream_close(struct connection *, - struct spdy_stream *, int); static int spdy_zlib_inflate(struct connection *, u_int8_t *, size_t, u_int8_t **, u_int32_t *); static int spdy_zlib_deflate(struct connection *, u_int8_t *, @@ -148,6 +143,10 @@ spdy_dataframe_begin(struct connection *c) { struct spdy_stream *s = c->snb->stream; + kore_debug("spdy_dataframe_begin(%p): s:%u fz:%d sz:%d wz:%d cwz:%d", + c, s->stream_id, s->frame_size, s->send_size, s->send_wsize, + c->spdy_send_wsize); + if (s->frame_size != 0 || s->send_size == 0) { fatal("spdy_dataframe_begin(): s:%u fz:%d - sz:%d", s->stream_id, s->frame_size, s->send_size); @@ -421,7 +420,6 @@ spdy_update_wsize(struct connection *c, struct spdy_stream *s, u_int32_t len) s->flags |= SPDY_KORE_FIN; kore_debug("sending final frame %u", s->stream_id); spdy_frame_send(c, SPDY_DATA_FRAME, FLAG_FIN, 0, s, 0); - return; } if (s->flags & (SPDY_KORE_FIN | FLAG_FIN)) { @@ -432,15 +430,44 @@ spdy_update_wsize(struct connection *c, struct spdy_stream *s, u_int32_t len) kore_debug("%u remains half open\n", s->stream_id); } - if ((int)s->send_wsize <= 0) { - kore_debug("flow control kicked in for STREAM %p:%p", s, c); - s->flags |= SPDY_STREAM_BLOCKING; + if ((int)s->send_wsize <= 0 || (int)c->spdy_send_wsize <= 0) { + kore_debug("flow control kicked in for %p:%p", c, s); + spdy_block_write(c); } +} - if ((int)c->spdy_send_wsize <= 0) { - kore_debug("flow control kicked in for CONNECTION %p", c); - spdy_block_write(c); +void +spdy_stream_close(struct connection *c, struct spdy_stream *s, int rb) +{ + struct http_request *req; + struct netbuf *nb, *nt; + + kore_debug("spdy_stream_close(%p, %p) <%d>", c, s, s->stream_id); + + if (rb) { + for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; nb = nt) { + nt = TAILQ_NEXT(nb, list); + if (nb->stream == s) { + kore_debug("spdy_stream_close: killing %p", nb); + net_remove_netbuf(&(c->send_queue), nb); + } + } } + + TAILQ_REMOVE(&(c->spdy_streams), s, list); + if (s->hblock != NULL) { + if (s->hblock->header_block != NULL) + kore_mem_free(s->hblock->header_block); + kore_mem_free(s->hblock); + } + + if (s->httpreq != NULL) { + req = s->httpreq; + req->stream = NULL; + req->flags |= HTTP_REQUEST_DELETE; + } + + kore_mem_free(s); } static int @@ -562,9 +589,6 @@ spdy_ctrl_frame_syn_stream(struct netbuf *nb) /* * We don't care so much for what http_request_new() tells us here, * we just have to clean up after passing our stuff to it. - * - * In case of early errors (414, 500, ...) a net_send_flush() will - * clear out this stream properly via spdy_stream_close(). */ (void)http_request_new(c, s, host, method, path, version, (struct http_request **)&(s->httpreq)); @@ -686,25 +710,36 @@ spdy_ctrl_frame_window(struct netbuf *nb) stream_id = net_read32(nb->buf + SPDY_FRAME_SIZE); window_size = net_read32(nb->buf + SPDY_FRAME_SIZE + 4); + kore_debug("window_update: %u for %u", window_size, stream_id); r = KORE_RESULT_OK; if ((s = spdy_stream_lookup(c, stream_id)) != NULL) { s->send_wsize += window_size; - if (c->flags & CONN_WRITE_BLOCK && s->send_wsize > 0) { - s->flags &= ~SPDY_STREAM_BLOCKING; - kore_debug("stream %u no longer blocket", s->stream_id); + if ((u_int64_t)s->send_wsize > SPDY_FLOW_WINDOW_MAX) { + kore_debug("window_update: size too large"); + return (KORE_RESULT_ERROR); + } + + if (c->flags & CONN_WRITE_BLOCK && + s->send_wsize > 0 && c->spdy_send_wsize > 0) { + kore_debug("stream %u no longer blocked", s->stream_id); + spdy_enable_write(c); + r = net_send_flush(c); } } else { c->spdy_send_wsize += window_size; + if ((u_int64_t)c->spdy_send_wsize > SPDY_FLOW_WINDOW_MAX) { + kore_debug("window_update: size too large"); + return (KORE_RESULT_ERROR); + } + if (c->flags & CONN_WRITE_BLOCK && c->spdy_send_wsize > 0) { + kore_debug("session %p no longer blocked", c); spdy_enable_write(c); r = net_send_flush(c); } } - kore_debug("window_update: %u for %u", window_size, stream_id); - kore_debug("c->spdy_send_wsize = %u", c->spdy_send_wsize); - return (r); } @@ -814,40 +849,6 @@ spdy_data_frame_recv(struct netbuf *nb) } static void -spdy_stream_close(struct connection *c, struct spdy_stream *s, int rb) -{ - struct http_request *req; - struct netbuf *nb, *nt; - - kore_debug("spdy_stream_close(%p, %p) <%d>", c, s, s->stream_id); - - if (rb) { - for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; nb = nt) { - nt = TAILQ_NEXT(nb, list); - if (nb->stream == s) { - kore_debug("spdy_stream_close: killing %p", nb); - net_remove_netbuf(&(c->send_queue), nb); - } - } - } - - TAILQ_REMOVE(&(c->spdy_streams), s, list); - if (s->hblock != NULL) { - if (s->hblock->header_block != NULL) - kore_mem_free(s->hblock->header_block); - kore_mem_free(s->hblock); - } - - if (s->httpreq != NULL) { - req = s->httpreq; - req->stream = NULL; - req->flags |= HTTP_REQUEST_DELETE; - } - - kore_mem_free(s); -} - -static void spdy_block_write(struct connection *c) { kore_debug("spdy_block_write(%p)", c);