kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 51a9e4db9d86682b08a18b60e5b13fe073844032
parent 6fa881e224db6cbbd51659c2939bdae8b7e30591
Author: Joris Vink <joris@coders.se>
Date:   Mon,  1 Jul 2013 11:30:18 +0200

Implement SPDY WINDOW_UPDATE and SETTINGS.

Diffstat:
includes/kore.h | 3+++
includes/spdy.h | 13+++++++++++++
src/connection.c | 1+
src/http.c | 5++++-
src/linux.c | 3++-
src/spdy.c | 113+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
6 files changed, 128 insertions(+), 10 deletions(-)

diff --git a/includes/kore.h b/includes/kore.h @@ -74,6 +74,7 @@ struct listener { #define CONN_READ_POSSIBLE 0x01 #define CONN_WRITE_POSSIBLE 0x02 +#define CONN_WRITE_BLOCK 0x04 struct connection { int fd; @@ -88,6 +89,7 @@ struct connection { z_stream z_inflate; u_int8_t deflate_started; z_stream z_deflate; + u_int32_t wsize_initial; TAILQ_HEAD(, netbuf) send_queue; TAILQ_HEAD(, netbuf) recv_queue; @@ -260,6 +262,7 @@ void kore_buf_appendb(struct kore_buf *, struct kore_buf *); struct spdy_header_block *spdy_header_block_create(int); struct spdy_stream *spdy_stream_lookup(struct connection *, u_int32_t); +int spdy_frame_data_done(struct netbuf *); int spdy_stream_get_header(struct spdy_header_block *, char *, char **); diff --git a/includes/spdy.h b/includes/spdy.h @@ -49,6 +49,7 @@ struct spdy_stream { u_int32_t stream_id; u_int8_t flags; u_int8_t prio; + int32_t wsize; void *httpreq; struct spdy_header_block *hblock; @@ -65,6 +66,7 @@ extern const unsigned char SPDY_dictionary_txt[]; #define SPDY_SYNFRAME_SIZE 10 #define SPDY_ZLIB_DICT_SIZE 1423 #define SPDY_ZLIB_CHUNK 16348 +#define SPDY_INIT_WSIZE 65536 /* control frames */ #define SPDY_CTRL_FRAME_SYN_STREAM 1 @@ -77,6 +79,17 @@ extern const unsigned char SPDY_dictionary_txt[]; /* flags */ #define FLAG_FIN 0x01 #define FLAG_UNIDIRECTIONAL 0x02 +#define SPDY_STREAM_WILLCLOSE 0x04 + +/* settings */ +#define SETTINGS_UPLOAD_BANDWIDTH 1 +#define SETTINGS_DOWNLOAD_BANDWIDTH 2 +#define SETTINGS_ROUND_TRIP_TIME 3 +#define SETTINGS_MAX_CONCURRENT_STREAMS 4 +#define SETTINGS_CURRENT_CWND 5 +#define SETTINGS_DOWNLOAD_RETRANS_RATE 6 +#define SETTINGS_INITIAL_WINDOW_SIZE 7 +#define SETTINGS_CLIENT_CERTIFICATE_VECTOR_SIZE 8 #define SPDY_HBLOCK_NORMAL 0 #define SPDY_HBLOCK_DELAYED_ALLOC 1 diff --git a/src/connection.c b/src/connection.c @@ -76,6 +76,7 @@ kore_connection_accept(struct listener *l, struct connection **out) c->client_stream_id = 0; c->proto = CONN_PROTO_UNKNOWN; c->state = CONN_STATE_SSL_SHAKE; + c->wsize_initial = SPDY_INIT_WSIZE; TAILQ_INIT(&(c->send_queue)); TAILQ_INIT(&(c->recv_queue)); diff --git a/src/http.c b/src/http.c @@ -213,6 +213,7 @@ http_request_free(struct http_request *req) int http_response(struct http_request *req, int status, u_int8_t *d, u_int32_t len) { + struct netbuf *nb; u_int32_t hlen; struct http_header *hdr; struct kore_buf *buf; @@ -245,7 +246,9 @@ http_response(struct http_request *req, int status, u_int8_t *d, u_int32_t len) if (len > 0) { spdy_frame_send(req->owner, SPDY_DATA_FRAME, 0, len, req->stream, 0); - net_send_queue(req->owner, d, len, 0, NULL, NULL); + net_send_queue(req->owner, d, len, 0, &nb, + spdy_frame_data_done); + nb->extra = req->stream; } spdy_frame_send(req->owner, SPDY_DATA_FRAME, diff --git a/src/linux.c b/src/linux.c @@ -126,7 +126,8 @@ kore_platform_event_wait(void) c = (struct connection *)events[i].data.ptr; if (events[i].events & EPOLLIN) c->flags |= CONN_READ_POSSIBLE; - if (events[i].events & EPOLLOUT) + if (events[i].events & EPOLLOUT && + !(c->flags & CONN_WRITE_BLOCK)) c->flags |= CONN_WRITE_POSSIBLE; if (!kore_connection_handle(c)) diff --git a/src/spdy.c b/src/spdy.c @@ -43,6 +43,9 @@ static int spdy_ctrl_frame_settings(struct netbuf *); static int spdy_ctrl_frame_ping(struct netbuf *); static int spdy_ctrl_frame_window(struct netbuf *); static int spdy_data_frame_recv(struct netbuf *); +static int spdy_frame_send_done(struct netbuf *); +static void spdy_update_wsize(struct connection *, + struct spdy_stream *, u_int32_t); static void spdy_stream_close(struct connection *, struct spdy_stream *); @@ -134,8 +137,9 @@ void spdy_frame_send(struct connection *c, u_int16_t type, u_int8_t flags, u_int32_t len, struct spdy_stream *s, u_int32_t misc) { - u_int8_t nb[12]; - u_int32_t length; + struct netbuf *nnb; + u_int8_t nb[12]; + u_int32_t length; kore_debug("spdy_frame_send(%p, %d, %d, %d, %p, %d)", c, type, flags, len, s, misc); @@ -170,10 +174,17 @@ spdy_frame_send(struct connection *c, u_int16_t type, u_int8_t flags, break; } - net_send_queue(c, nb, length, 0, NULL, NULL); + if (s != NULL && type == SPDY_DATA_FRAME) { + net_send_queue(c, nb, length, 0, &nnb, spdy_frame_send_done); + nnb->extra = s; + } else { + net_send_queue(c, nb, length, 0, NULL, NULL); + } - if ((flags & FLAG_FIN) && (s->flags & FLAG_FIN)) - spdy_stream_close(c, s); + if (s != NULL) { + if ((flags & FLAG_FIN) && (s->flags & FLAG_FIN)) + s->flags |= SPDY_STREAM_WILLCLOSE; + } } struct spdy_stream * @@ -313,6 +324,16 @@ spdy_stream_get_header(struct spdy_header_block *s, char *header, char **out) return (KORE_RESULT_ERROR); } +int +spdy_frame_data_done(struct netbuf *nb) +{ + struct connection *c = (struct connection *)nb->owner; + struct spdy_stream *s = (struct spdy_stream *)nb->extra; + + spdy_update_wsize(c, s, nb->len); + return (KORE_RESULT_OK); +} + static int spdy_ctrl_frame_syn_stream(struct netbuf *nb) { @@ -359,6 +380,7 @@ spdy_ctrl_frame_syn_stream(struct netbuf *nb) s = (struct spdy_stream *)kore_malloc(sizeof(*s)); s->prio = syn.prio; s->flags = ctrl.flags; + s->wsize = c->wsize_initial; s->stream_id = syn.stream_id; s->hblock = spdy_header_block_create(SPDY_HBLOCK_DELAYED_ALLOC); @@ -415,7 +437,8 @@ spdy_ctrl_frame_syn_stream(struct netbuf *nb) c->client_stream_id = s->stream_id; TAILQ_INSERT_TAIL(&(c->spdy_streams), s, list); - kore_debug("SPDY_SYN_STREAM: %d:%d:%d", s->stream_id, s->flags, s->prio); + kore_debug("SPDY_SYN_STREAM: %d:%d:%d", s->stream_id, + s->flags, s->prio); return (KORE_RESULT_OK); } @@ -423,7 +446,31 @@ spdy_ctrl_frame_syn_stream(struct netbuf *nb) static int spdy_ctrl_frame_settings(struct netbuf *nb) { - kore_debug("SPDY_SETTINGS (to be implemented)"); + u_int8_t *buf, flags; + u_int32_t ecount, i, id, val; + struct connection *c = (struct connection *)nb->owner; + + ecount = net_read32(nb->buf + SPDY_FRAME_SIZE); + kore_debug("SPDY_SETTINGS: %d settings present", ecount); + + buf = nb->buf + SPDY_FRAME_SIZE + 4; + for (i = 0; i < ecount; i++) { + flags = *(u_int8_t *)buf; + id = net_read32(buf) & 0xffffff; + val = net_read32(buf + 4); + + switch (id) { + case SETTINGS_INITIAL_WINDOW_SIZE: + c->wsize_initial = val; + break; + default: + kore_debug("no handling for setting %d:%d (%d)", + id, val, flags); + break; + } + + buf += 8; + } return (KORE_RESULT_OK); } @@ -450,12 +497,34 @@ spdy_ctrl_frame_ping(struct netbuf *nb) static int spdy_ctrl_frame_window(struct netbuf *nb) { - u_int32_t stream_id, window_size; + struct spdy_stream *s; + u_int32_t stream_id, window_size; + struct connection *c = (struct connection *)nb->owner; stream_id = net_read32(nb->buf + SPDY_FRAME_SIZE); window_size = net_read32(nb->buf + SPDY_FRAME_SIZE + 4); + if ((s = spdy_stream_lookup(c, stream_id)) == NULL) { + kore_debug("received WINDOW_UPDATE for nonexistant stream"); + kore_debug("stream_id: %d", stream_id); + return (KORE_RESULT_ERROR); + } + + if (s->flags & SPDY_STREAM_WILLCLOSE) { + kore_debug("received WINDOW_UPDATE for FIN stream"); + return (KORE_RESULT_ERROR); + } + kore_debug("SPDY_WINDOW_UPDATE: %d:%d", stream_id, window_size); + s->wsize += window_size; + if (s->wsize > 0) { + c->flags &= ~CONN_WRITE_BLOCK; + c->flags |= CONN_WRITE_POSSIBLE; + + kore_debug("can now send again (%d wsize)", s->wsize); + return (net_send_flush(c)); + } + return (KORE_RESULT_OK); } @@ -513,6 +582,34 @@ spdy_stream_close(struct connection *c, struct spdy_stream *s) } static int +spdy_frame_send_done(struct netbuf *nb) +{ + u_int8_t flags; + struct connection *c = (struct connection *)nb->owner; + struct spdy_stream *s = (struct spdy_stream *)nb->extra; + + flags = *(u_int8_t *)(nb->buf + 4); + if ((flags & FLAG_FIN) && (s->flags & FLAG_FIN)) + spdy_stream_close(c, s); + + return (KORE_RESULT_OK); +} + +static void +spdy_update_wsize(struct connection *c, struct spdy_stream *s, u_int32_t len) +{ + s->wsize -= len; + kore_debug("spdy_update_wsize(): stream %d, window size %d", + s->stream_id, s->wsize); + + if (s->wsize <= 0) { + kore_debug("window size <= 0 for stream %d", s->stream_id); + c->flags &= ~CONN_WRITE_POSSIBLE; + c->flags |= CONN_WRITE_BLOCK; + } +} + +static int spdy_zlib_inflate(struct connection *c, u_int8_t *src, size_t len, u_int8_t **dst, u_int32_t *olen) {