kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit e3ae1b4e2d59283fab54349858fd518cca65099a
parent 966ed3d20db357ccf1b712492efafdb6282d8c7d
Author: Joris Vink <joris@coders.se>
Date:   Thu,  7 Aug 2014 10:22:54 +0200

Implement some great SPDY improvements.

* Bump spdy announcement to the correct draft version
* When receiving a RST, clean out the netbuf chain of that stream

Diffstat:
includes/kore.h | 6++++--
includes/spdy.h | 2+-
src/net.c | 33+++++++++++++++++++++------------
src/spdy.c | 26+++++++++++++++++++++-----
4 files changed, 47 insertions(+), 20 deletions(-)

diff --git a/includes/kore.h b/includes/kore.h @@ -74,6 +74,7 @@ extern int daemon(int, int); #define NETBUF_CALL_CB_ALWAYS 0x01 #define NETBUF_FORCE_REMOVE 0x02 +#define NETBUF_MUST_RESEND 0x04 #define X509_GET_CN(c, o, l) \ X509_NAME_get_text_by_NID(X509_get_subject_name(c), \ @@ -171,8 +172,8 @@ struct connection { z_stream z_deflate; u_int32_t wsize_initial; - TAILQ_HEAD(, netbuf) send_queue; - TAILQ_HEAD(, netbuf) recv_queue; + struct netbuf_head send_queue; + struct netbuf_head recv_queue; u_int32_t client_stream_id; TAILQ_HEAD(, spdy_stream) spdy_streams; @@ -445,6 +446,7 @@ int net_recv(struct connection *); int net_send(struct connection *); int net_send_flush(struct connection *); int net_recv_flush(struct connection *); +void net_remove_netbuf(struct netbuf_head *, struct netbuf *); void net_recv_queue(struct connection *, size_t, int, struct netbuf **, int (*cb)(struct netbuf *)); int net_recv_expand(struct connection *c, struct netbuf *, size_t, diff --git a/includes/spdy.h b/includes/spdy.h @@ -66,7 +66,7 @@ struct spdy_stream { extern const unsigned char SPDY_dictionary_txt[]; -#define KORE_SSL_PROTO_STRING "\x06spdy/3\x08http/1.1" +#define KORE_SSL_PROTO_STRING "\x08spdy/3.2\x08http/1.1" #define SPDY_CONTROL_FRAME(x) ((x & (1 << 31))) #define SPDY_FRAME_SIZE 8 diff --git a/src/net.c b/src/net.c @@ -136,6 +136,7 @@ net_send(struct connection *c) switch (r) { case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: + nb->flags |= NETBUF_MUST_RESEND; c->flags &= ~CONN_WRITE_POSSIBLE; return (KORE_RESULT_OK); default: @@ -158,20 +159,17 @@ net_send(struct connection *c) } } #endif - kore_debug("net_send(%d/%d bytes), progress with %d", - nb->s_off, nb->b_len, r); + kore_debug("net_send(%p/%d/%d bytes), progress with %d", + nb, nb->s_off, nb->b_len, r); nb->s_off += (size_t)r; + nb->flags &= ~NETBUF_MUST_RESEND; if (nb->stream != NULL) spdy_update_wsize(c, nb->stream, r); } - if (nb->s_off == nb->b_len) { - TAILQ_REMOVE(&(c->send_queue), nb, list); - - kore_mem_free(nb->buf); - kore_pool_put(&nb_pool, nb); - } + if (nb->s_off == nb->b_len) + net_remove_netbuf(&(c->send_queue), nb); } return (KORE_RESULT_OK); @@ -245,10 +243,7 @@ net_recv(struct connection *c) r = nb->cb(nb); if (nb->s_off == nb->b_len || (nb->flags & NETBUF_FORCE_REMOVE)) { - TAILQ_REMOVE(&(c->recv_queue), nb, list); - - kore_mem_free(nb->buf); - kore_pool_put(&nb_pool, nb); + net_remove_netbuf(&(c->recv_queue), nb); } if (r != KORE_RESULT_OK) @@ -273,6 +268,20 @@ net_recv_flush(struct connection *c) return (KORE_RESULT_OK); } +void +net_remove_netbuf(struct netbuf_head *list, struct netbuf *nb) +{ + nb->stream = NULL; + if (nb->flags & NETBUF_MUST_RESEND) { + kore_debug("retaining %p (MUST_RESEND)", nb); + return; + } + + TAILQ_REMOVE(list, nb, list); + kore_mem_free(nb->buf); + kore_pool_put(&nb_pool, nb); +} + u_int16_t net_read16(u_int8_t *b) { diff --git a/src/spdy.c b/src/spdy.c @@ -20,6 +20,9 @@ #include "kore.h" #include "http.h" +#define SPDY_KEEP_NETBUFS 0 +#define SPDY_REMOVE_NETBUFS 1 + static int spdy_ctrl_frame_syn_stream(struct netbuf *); static int spdy_ctrl_frame_rst_stream(struct netbuf *); static int spdy_ctrl_frame_settings(struct netbuf *); @@ -28,7 +31,7 @@ static int spdy_ctrl_frame_window(struct netbuf *); static int spdy_data_frame_recv(struct netbuf *); static void spdy_stream_close(struct connection *, - struct spdy_stream *); + struct spdy_stream *, int); static int spdy_zlib_inflate(struct connection *, u_int8_t *, size_t, u_int8_t **, u_int32_t *); static int spdy_zlib_deflate(struct connection *, u_int8_t *, @@ -365,7 +368,7 @@ spdy_update_wsize(struct connection *c, struct spdy_stream *s, u_int32_t len) if (s->send_size == 0) { if (s->flags & (SPDY_KORE_FIN | FLAG_FIN)) { - spdy_stream_close(c, s); + spdy_stream_close(c, s, SPDY_KEEP_NETBUFS); return; } @@ -522,10 +525,13 @@ static int spdy_ctrl_frame_rst_stream(struct netbuf *nb) { struct spdy_stream *s; - u_int32_t stream_id; + u_int32_t stream_id, status; struct connection *c = (struct connection *)nb->owner; stream_id = net_read32(nb->buf + SPDY_FRAME_SIZE); + status = net_read32(nb->buf + SPDY_FRAME_SIZE + sizeof(u_int32_t)); + printf("RST STATUS IS %d\n", status); + if ((stream_id % 2) == 0) { kore_debug("received RST for non-client stream %u", stream_id); return (KORE_RESULT_ERROR); @@ -536,7 +542,7 @@ spdy_ctrl_frame_rst_stream(struct netbuf *nb) return (KORE_RESULT_ERROR); } - spdy_stream_close(c, s); + spdy_stream_close(c, s, SPDY_REMOVE_NETBUFS); return (KORE_RESULT_OK); } @@ -735,12 +741,21 @@ spdy_data_frame_recv(struct netbuf *nb) } static void -spdy_stream_close(struct connection *c, struct spdy_stream *s) +spdy_stream_close(struct connection *c, struct spdy_stream *s, int rb) { struct http_request *req; + struct netbuf *nb, *nt; kore_debug("spdy_stream_close(%p, %p) <%d>", c, s, s->stream_id); + if (rb) { + for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; nb = nt) { + nt = TAILQ_NEXT(nb, list); + if (nb->stream == s) + net_remove_netbuf(&(c->send_queue), nb); + } + } + TAILQ_REMOVE(&(c->spdy_streams), s, list); if (s->hblock != NULL) { if (s->hblock->header_block != NULL) @@ -750,6 +765,7 @@ spdy_stream_close(struct connection *c, struct spdy_stream *s) if (s->httpreq != NULL) { req = s->httpreq; + req->stream = NULL; req->flags |= HTTP_REQUEST_DELETE; }