kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 848704f74bff8b5ea9c20a45be85e0c7453434d2
parent 4fc434e9096bcf8afdf0d5b66a095afd7ee0937b
Author: Joris Vink <joris@coders.se>
Date:   Wed,  1 May 2013 08:09:04 +0200

lots of new stuff, including processing of http requests and an attempt
to build an initial spdy response (SYN frame + header block content).

Diffstat:
Makefile | 2+-
includes/http.h | 8++++++++
includes/kore.h | 11+++++++----
includes/spdy.h | 22++++++++++++++++++----
src/http.c | 100+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/kore.c | 15++++++++++-----
src/net.c | 14++++++++++++++
src/spdy.c | 156++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------
8 files changed, 301 insertions(+), 27 deletions(-)

diff --git a/Makefile b/Makefile @@ -10,7 +10,7 @@ CFLAGS+=-I/usr/local/ssl/include CFLAGS+=-Wall -Wstrict-prototypes -Wmissing-prototypes CFLAGS+=-Wmissing-declarations -Wshadow -Wpointer-arith -Wcast-qual CFLAGS+=-Wsign-compare -Iincludes -g -LDFLAGS=-static -Llibs -lssl -lcrypto -ldl -lz +LDFLAGS=-Llibs -lssl -lcrypto -ldl -lz light: $(S_OBJS) $(CC) $(CFLAGS) $(S_OBJS) $(LDFLAGS) -o $(BIN) diff --git a/includes/http.h b/includes/http.h @@ -28,4 +28,12 @@ struct http_request { TAILQ_ENTRY(http_request) list; }; +void http_init(void); +void http_process(void); +void http_request_free(struct http_request *); +int http_response(struct http_request *, int, + u_int8_t *, u_int32_t); +int http_new_request(struct connection *, struct spdy_stream *, + char *, char *, char *); + #endif /* !__H_HTTP_H */ diff --git a/includes/kore.h b/includes/kore.h @@ -65,6 +65,7 @@ struct connection { TAILQ_HEAD(, netbuf) send_queue; TAILQ_HEAD(, netbuf) recv_queue; + u_int32_t client_stream_id; TAILQ_HEAD(, spdy_stream) spdy_streams; }; @@ -73,12 +74,15 @@ void *kore_calloc(size_t, size_t); void *kore_realloc(void *, size_t); char *kore_strdup(const char *); void kore_strlcpy(char *, const char *, size_t); +void kore_server_disconnect(struct connection *); void fatal(const char *, ...); void kore_log_internal(char *, int, const char *, ...); u_int16_t net_read16(u_int8_t *); u_int32_t net_read32(u_int8_t *); +void net_write16(u_int8_t *, u_int16_t); +void net_write32(u_int8_t *, u_int32_t); int net_recv(struct connection *); int net_send(struct connection *); int net_recv_queue(struct connection *, size_t, @@ -88,10 +92,9 @@ int net_recv_expand(struct connection *c, struct netbuf *, size_t, int net_send_queue(struct connection *, u_int8_t *, size_t, int (*cb)(struct netbuf *)); -int http_new_request(struct connection *, struct spdy_stream *, - char *, char *, char *); - -int spdy_frame_recv(struct netbuf *); +int spdy_frame_recv(struct netbuf *); +int spdy_frame_send(struct connection *, u_int16_t, + u_int8_t, u_int32_t, u_int32_t, u_int8_t *); struct spdy_stream *spdy_stream_lookup(struct connection *, u_int32_t); #endif /* !__H_KORE_H */ diff --git a/includes/spdy.h b/includes/spdy.h @@ -38,15 +38,19 @@ struct spdy_syn_stream { u_int8_t prio; }; +struct spdy_header_block { + u_int8_t *header_block; + u_int32_t header_block_len; + u_int32_t header_offset; + u_int32_t header_pairs; +}; + struct spdy_stream { u_int32_t stream_id; u_int8_t flags; u_int8_t prio; - u_int8_t *header_block; - u_int32_t header_block_len; - u_int32_t header_pairs; - + struct spdy_header_block *hblock; TAILQ_ENTRY(spdy_stream) list; }; @@ -62,10 +66,20 @@ extern const unsigned char SPDY_dictionary_txt[]; /* control frames */ #define SPDY_CTRL_FRAME_SYN_STREAM 1 +#define SPDY_CTRL_FRAME_SYN_REPLY 2 #define SPDY_CTRL_FRAME_SETTINGS 4 +#define SPDY_DATA_FRAME 99 + /* flags */ #define FLAG_FIN 0x01 #define FLAG_UNIDIRECTIONAL 0x02 +#define SPDY_HBLOCK_NORMAL 0 +#define SPDY_HBLOCK_DELAYED_ALLOC 1 + +struct spdy_header_block *spdy_header_block_create(int); +void spdy_header_block_add(struct spdy_header_block *, char *, char *); +u_int8_t *spdy_header_block_release(struct spdy_header_block *, u_int32_t *); + #endif /* !__H_SPDY_H */ diff --git a/src/http.c b/src/http.c @@ -36,12 +36,112 @@ #include "kore.h" #include "http.h" +TAILQ_HEAD(, http_request) http_requests; + +static int http_generic_cb(struct http_request *); + +void +http_init(void) +{ + TAILQ_INIT(&http_requests); +} + int http_new_request(struct connection *c, struct spdy_stream *s, char *host, char *method, char *path) { + struct http_request *req; + kore_log("http_new_request(%p, %p, %s, %s, %s)", c, s, host, method, path); + req = (struct http_request *)kore_malloc(sizeof(*req)); + req->owner = c; + req->stream = s; + req->host = kore_strdup(host); + req->path = kore_strdup(path); + req->method = kore_strdup(method); + TAILQ_INSERT_TAIL(&http_requests, req, list); + + return (KORE_RESULT_OK); +} + +void +http_request_free(struct http_request *req) +{ + free(req->method); + free(req->path); + free(req->host); + free(req); +} + +int +http_response(struct http_request *req, int status, u_int8_t *d, u_int32_t len) +{ + u_int32_t hlen; + u_int8_t *htext; + struct spdy_header_block *hblock; + + kore_log("http_response(%p, %d, %p, %d)", req, status, d, len); + + if (req->stream != NULL) { + hblock = spdy_header_block_create(SPDY_HBLOCK_NORMAL); + spdy_header_block_add(hblock, ":status", "200"); + spdy_header_block_add(hblock, ":version", "HTTP/1.1"); + spdy_header_block_add(hblock, "content-type", "text/plain"); + if ((htext = spdy_header_block_release(hblock, &hlen)) == NULL) + return (KORE_RESULT_ERROR); + + kore_log("deflated is %d bytes", hlen); + + if (!spdy_frame_send(req->owner, SPDY_CTRL_FRAME_SYN_REPLY, + 0, hlen, req->stream->stream_id, NULL)) + return (KORE_RESULT_ERROR); + + if (!net_send_queue(req->owner, htext, hlen, NULL)) + return (KORE_RESULT_ERROR); + +#if 0 + if (!spdy_frame_send(req->owner, SPDY_DATA_FRAME, 0, len, + req->stream->stream_id, d)) + return (KORE_RESULT_ERROR); +#endif + } else { + kore_log("normal http not functional yet"); + } + return (KORE_RESULT_OK); } + +void +http_process(void) +{ + struct http_request *req, *next; + + if (TAILQ_EMPTY(&http_requests)) + return; + + kore_log("http_process()"); + for (req = TAILQ_FIRST(&http_requests); req != NULL; req = next) { + next = TAILQ_NEXT(req, list); + + /* XXX - add module hooks here */ + if (!http_generic_cb(req)) + kore_server_disconnect(req->owner); + + TAILQ_REMOVE(&http_requests, req, list); + http_request_free(req); + } +} + +static int +http_generic_cb(struct http_request *req) +{ + u_int32_t len; + + kore_log("http_generic_cb(%s, %s, %s)", + req->host, req->method, req->path); + + len = strlen("<p>Hello world</p>"); + return (http_response(req, 200, (u_int8_t *)"<p>Hello world</p>", len)); +} diff --git a/src/kore.c b/src/kore.c @@ -35,6 +35,7 @@ #include "spdy.h" #include "kore.h" +#include "http.h" #define EPOLL_EVENTS 500 @@ -45,7 +46,6 @@ static int kore_socket_nonblock(int); static int kore_server_sslstart(void); static void kore_event(int, int, void *); static int kore_server_accept(struct listener *); -static void kore_server_disconnect(struct connection *); static int kore_connection_handle(struct connection *, int); static int kore_server_bind(struct listener *, const char *, int); static int kore_ssl_npn_cb(SSL *, const u_char **, unsigned int *, void *); @@ -69,15 +69,18 @@ main(int argc, char *argv[]) if ((efd = epoll_create(1000)) == -1) fatal("epoll_create(): %s", errno_s); + http_init(); + kore_event(server.fd, EPOLLIN, &server); events = kore_calloc(EPOLL_EVENTS, sizeof(struct epoll_event)); for (;;) { - kore_log("main(): epoll_wait()"); - n = epoll_wait(efd, events, EPOLL_EVENTS, -1); + n = epoll_wait(efd, events, EPOLL_EVENTS, 10); if (n == -1) fatal("epoll_wait(): %s", errno_s); - kore_log("main(): %d sockets available", n); + if (n > 0) + kore_log("main(): %d sockets available", n); + for (i = 0; i < n; i++) { fd = (int *)events[i].data.ptr; @@ -100,6 +103,8 @@ main(int argc, char *argv[]) kore_server_disconnect(c); } } + + http_process(); } close(server.fd); @@ -210,7 +215,7 @@ kore_server_accept(struct listener *l) return (KORE_RESULT_OK); } -static void +void kore_server_disconnect(struct connection *c) { struct netbuf *nb, *next; diff --git a/src/net.c b/src/net.c @@ -109,7 +109,9 @@ net_send(struct connection *c) return (KORE_RESULT_OK); nb = TAILQ_FIRST(&(c->send_queue)); + kore_log("nb is %p (%d/%d bytes)", nb, nb->offset, nb->len); r = SSL_write(c->ssl, (nb->buf + nb->offset), (nb->len - nb->offset)); + kore_log("SSL_write(): %d bytes", r); if (r <= 0) { r = SSL_get_error(c->ssl, r); switch (r) { @@ -214,3 +216,15 @@ net_read32(u_int8_t *b) r = *(u_int32_t *)b; return (ntohl(r)); } + +void +net_write16(u_int8_t *p, u_int16_t n) +{ + *p = htons(n); +} + +void +net_write32(u_int8_t *p, u_int32_t n) +{ + *p = htonl(n); +} diff --git a/src/spdy.c b/src/spdy.c @@ -39,7 +39,7 @@ static int spdy_ctrl_frame_syn_stream(struct netbuf *); static int spdy_ctrl_frame_settings(struct netbuf *); -static int spdy_stream_get_header(struct spdy_stream *, +static int spdy_stream_get_header(struct spdy_header_block *, char *, char **); static int spdy_zlib_inflate(u_int8_t *, size_t, @@ -93,6 +93,35 @@ spdy_frame_recv(struct netbuf *nb) return (r); } +int +spdy_frame_send(struct connection *c, u_int16_t type, u_int8_t flags, + u_int32_t len, u_int32_t stream_id, u_int8_t *data) +{ + u_int8_t nb[12]; + u_int32_t length; + + kore_log("spdy_frame_send(%p, %d, %d, %d, %d, %p)", c, type, flags, + len, stream_id, data); + + length = 0; + memset(nb, 0, sizeof(nb)); + switch (type) { + case SPDY_CTRL_FRAME_SYN_REPLY: + net_write16(&nb[0], 3); + nb[0] |= (1 << 7); + net_write16(&nb[2], type); + net_write32(&nb[4], len + 4); + nb[4] = flags; + net_write32(&nb[8], stream_id); + length = 12; + break; + case SPDY_DATA_FRAME: + break; + } + + return (net_send_queue(c, nb, length, NULL)); +} + struct spdy_stream * spdy_stream_lookup(struct connection *c, u_int32_t id) { @@ -106,6 +135,86 @@ spdy_stream_lookup(struct connection *c, u_int32_t id) return (NULL); } +struct spdy_header_block * +spdy_header_block_create(int delayed_alloc) +{ + struct spdy_header_block *hblock; + + kore_log("spdy_header_block_create()"); + + hblock = (struct spdy_header_block *)kore_malloc(sizeof(*hblock)); + if (delayed_alloc == SPDY_HBLOCK_NORMAL) { + hblock->header_block = (u_int8_t *)kore_malloc(128); + hblock->header_block_len = 128; + } else { + hblock->header_block = NULL; + hblock->header_block_len = 0; + } + + hblock->header_pairs = 0; + hblock->header_offset = 0; + + return (hblock); +} + +void +spdy_header_block_add(struct spdy_header_block *hblock, char *name, char *value) +{ + u_int8_t *p; + char *out; + u_int32_t nlen, vlen; + + kore_log("spdy_header_block_add(%p, %s, %s)", hblock, name, value); + + nlen = strlen(name); + vlen = strlen(value); + if ((nlen + vlen + hblock->header_offset) > hblock->header_block_len) { + hblock->header_block_len += nlen + vlen + 128; + hblock->header_block = + (u_int8_t *)kore_realloc(hblock->header_block, + hblock->header_block_len); + } + + p = hblock->header_block + hblock->header_offset; + net_write32(p, nlen); + memcpy((p + 4), (u_int8_t *)name, nlen); + hblock->header_offset += 4 + nlen; + + p = hblock->header_block + hblock->header_offset; + net_write32(p, vlen); + memcpy((p + 4), (u_int8_t *)value, vlen); + hblock->header_offset += 4 + vlen; + + hblock->header_pairs++; + + if (!spdy_stream_get_header(hblock, name, &out)) { + kore_log("cannot find newly inserted header %s", name); + } else { + kore_log("found header (%s, %s) as %s", name, value, out); + free(out); + } +} + +u_int8_t * +spdy_header_block_release(struct spdy_header_block *hblock, u_int32_t *len) +{ + u_int8_t *deflated; + + kore_log("spdy_header_block_release(%p, %p)", hblock, len); + + if (!spdy_zlib_deflate(hblock->header_block, hblock->header_offset, + &deflated, len)) { + free(hblock->header_block); + free(hblock); + return (NULL); + } + + free(hblock->header_block); + free(hblock); + + return (deflated); +} + static int spdy_ctrl_frame_syn_stream(struct netbuf *nb) { @@ -126,13 +235,20 @@ spdy_ctrl_frame_syn_stream(struct netbuf *nb) syn.prio = net_read16(nb->buf + 16) & 0xe000; syn.slot = net_read16(nb->buf + 16) & 0x7; - /* XXX need to send protocol errors here? */ + /* XXX need to send protocol error. */ if ((syn.stream_id % 2) == 0 || syn.stream_id == 0) { kore_log("client sent incorrect id for SPDY_SYN_STREAM (%d)", syn.stream_id); return (KORE_RESULT_ERROR); } + /* XXX need to send protocol error. */ + if (syn.stream_id < c->client_stream_id) { + kore_log("client sent incorrect id SPDY_SYN_STREAM (%d < %d)", + syn.stream_id, c->client_stream_id); + return (KORE_RESULT_ERROR); + } + if ((s = spdy_stream_lookup(c, syn.stream_id)) != NULL) { kore_log("duplicate SPDY_SYN_STREAM (%d)", syn.stream_id); return (KORE_RESULT_ERROR); @@ -142,28 +258,29 @@ spdy_ctrl_frame_syn_stream(struct netbuf *nb) s->prio = syn.prio; s->flags = ctrl.flags; s->stream_id = syn.stream_id; - s->header_block_len = 0; - s->header_block = NULL; + s->hblock = spdy_header_block_create(SPDY_HBLOCK_DELAYED_ALLOC); src = (nb->buf + SPDY_FRAME_SIZE + SPDY_SYNFRAME_SIZE); kore_log("compressed headers are %d bytes long", ctrl.length - 10); if (!spdy_zlib_inflate(src, (ctrl.length - SPDY_SYNFRAME_SIZE), - &(s->header_block), &(s->header_block_len))) { - free(s->header_block); + &(s->hblock->header_block), &(s->hblock->header_block_len))) { + free(s->hblock->header_block); + free(s->hblock); free(s); return (KORE_RESULT_ERROR); } - s->header_pairs = net_read32(s->header_block); - kore_log("got %d headers", s->header_pairs); + s->hblock->header_pairs = net_read32(s->hblock->header_block); + kore_log("got %d headers", s->hblock->header_pairs); path = NULL; host = NULL; method = NULL; #define GET_HEADER(n, r) \ - if (!spdy_stream_get_header(s, n, r)) { \ - free(s->header_block); \ + if (!spdy_stream_get_header(s->hblock, n, r)) { \ + free(s->hblock->header_block); \ + free(s->hblock); \ free(s); \ kore_log("no such header: %s", n); \ if (path != NULL) \ @@ -179,7 +296,8 @@ spdy_ctrl_frame_syn_stream(struct netbuf *nb) GET_HEADER(":method", &method); GET_HEADER(":host", &host); if (!http_new_request(c, s, host, method, path)) { - free(s->header_block); + free(s->hblock->header_block); + free(s->hblock); free(s); return (KORE_RESULT_ERROR); } @@ -188,6 +306,7 @@ spdy_ctrl_frame_syn_stream(struct netbuf *nb) free(method); free(host); + c->client_stream_id = s->stream_id; TAILQ_INSERT_TAIL(&(c->spdy_streams), s, list); kore_log("SPDY_SYN_STREAM: %d:%d:%d", s->stream_id, s->flags, s->prio); @@ -207,16 +326,27 @@ spdy_ctrl_frame_settings(struct netbuf *nb) } static int -spdy_stream_get_header(struct spdy_stream *s, char *header, char **out) +spdy_stream_get_header(struct spdy_header_block *s, char *header, char **out) { - u_int8_t *p; char *cmp; + u_int8_t *p, *end; u_int32_t i, nlen, vlen; + end = s->header_block + s->header_block_len; + p = s->header_block + 4; for (i = 0; i < s->header_pairs; i++) { nlen = net_read32(p); + if ((p + nlen + 4) >= end) { + kore_log("nlen out of bounds (%d)", nlen); + return (KORE_RESULT_ERROR); + } + vlen = net_read32(p + nlen + 4); + if ((p + nlen + vlen + 8) >= end) { + kore_log("vlen out of bounds (%d)", vlen); + return (KORE_RESULT_ERROR); + } cmp = (char *)(p + 4); if (!strncasecmp(cmp, header, nlen)) {