kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit c8b422d29d15c97696be3b1544da9019baa9e0fc
parent 23c0ec67c6df33e5b9c5197c664ed495d5a8a455
Author: Joris Vink <joris@coders.se>
Date:   Sun, 28 Apr 2013 23:42:13 +0200

allow us to expand receive buffers automatically so we can keep chaining data into the same netbuf. This gives us the possibility to retain the ctrl_frame by the time we reach the proper cb for the actual frame message.

Diffstat:
includes/kore.h | 14+++++++++-----
includes/spdy.h | 14++++++++++++++
src/kore.c | 4+++-
src/net.c | 41++++++++++++++++++++++++++++++++++-------
src/spdy.c | 32++++++++++++++++++++++++++------
src/utils.c | 2+-
6 files changed, 87 insertions(+), 20 deletions(-)

diff --git a/includes/kore.h b/includes/kore.h @@ -25,12 +25,16 @@ #define kore_log(fmt, ...) \ kore_log_internal(__FILE__, __LINE__, fmt, ##__VA_ARGS__) -#define KORE_SSL_PROTO_STRING "\x06spdy/3\x08http/1.1" +#define NETBUF_RECV 0 +#define NETBUF_SEND 1 struct netbuf { u_int8_t *buf; u_int32_t offset; u_int32_t len; + u_int8_t type; + u_int8_t retain; + void *owner; int (*cb)(struct netbuf *); @@ -58,8 +62,6 @@ struct connection { void *owner; SSL *ssl; - struct spdy_frame spdy_cur_frame; - TAILQ_HEAD(, netbuf) send_queue; TAILQ_HEAD(, netbuf) recv_queue; }; @@ -74,9 +76,11 @@ void kore_log_internal(char *, int, const char *, ...); int net_recv(struct connection *); int net_send(struct connection *); -void net_recv_queue(struct connection *, size_t, +int net_recv_queue(struct connection *, size_t, + int (*cb)(struct netbuf *)); +int net_recv_expand(struct connection *c, struct netbuf *, size_t, int (*cb)(struct netbuf *)); -void net_send_queue(struct connection *, u_int8_t *, size_t, +int net_send_queue(struct connection *, u_int8_t *, size_t, int (*cb)(struct netbuf *)); int spdy_frame_recv(struct netbuf *); diff --git a/includes/spdy.h b/includes/spdy.h @@ -17,6 +17,8 @@ #ifndef __H_SPDY_H #define __H_SPDY_H +#define KORE_SSL_PROTO_STRING "\x06spdy/3\x08http/1.1" + struct spdy_frame { u_int32_t frame_1; u_int32_t frame_2; @@ -37,6 +39,14 @@ struct spdy_data_frame { int flags:8; }; +struct spdy_syn_stream { + u_int32_t stream_id; + u_int32_t assoc_stream_id; + u_int8_t slot; + int reserved:5; + int prio:3; +}; + #define SPDY_CONTROL_FRAME(x) ((x->frame_1 & (1 << 31))) #define SPDY_FRAME_SIZE 8 @@ -44,4 +54,8 @@ struct spdy_data_frame { #define SPDY_CTRL_FRAME_SYN_STREAM 1 #define SPDY_CTRL_FRAME_SETTINGS 4 +/* flags. */ +#define FLAG_FIN 0x01 +#define FLAG_UNIDIRECTIONAL 0x02 + #endif /* !__H_SPDY_H */ diff --git a/src/kore.c b/src/kore.c @@ -286,7 +286,9 @@ kore_connection_handle(struct connection *c, int flags) if (!memcmp(data, "spdy/3", 6)) kore_log("using SPDY/3"); c->proto = CONN_PROTO_SPDY; - net_recv_queue(c, SPDY_FRAME_SIZE, spdy_frame_recv); + if (!net_recv_queue(c, + SPDY_FRAME_SIZE, spdy_frame_recv)) + return (KORE_RESULT_ERROR); } else { kore_log("using HTTP/1.1"); c->proto = CONN_PROTO_HTTP; diff --git a/src/net.c b/src/net.c @@ -36,7 +36,7 @@ #include "spdy.h" #include "kore.h" -void +int net_send_queue(struct connection *c, u_int8_t *data, size_t len, int (*cb)(struct netbuf *)) { @@ -49,14 +49,16 @@ net_send_queue(struct connection *c, u_int8_t *data, size_t len, nb->len = len; nb->owner = c; nb->offset = 0; + nb->retain = 0; + nb->type = NETBUF_SEND; nb->buf = (u_int8_t *)kore_malloc(nb->len); memcpy(nb->buf, data, nb->len); TAILQ_INSERT_TAIL(&(c->send_queue), nb, list); - net_send(c); + return (net_send(c)); } -void +int net_recv_queue(struct connection *c, size_t len, int (*cb)(struct netbuf *)) { struct netbuf *nb; @@ -68,10 +70,31 @@ net_recv_queue(struct connection *c, size_t len, int (*cb)(struct netbuf *)) nb->len = len; nb->owner = c; nb->offset = 0; + nb->retain = 0; + nb->type = NETBUF_RECV; nb->buf = (u_int8_t *)kore_malloc(nb->len); TAILQ_INSERT_TAIL(&(c->recv_queue), nb, list); - net_recv(c); + return (net_recv(c)); +} + +int +net_recv_expand(struct connection *c, struct netbuf *nb, size_t len, + int (*cb)(struct netbuf *)) +{ + kore_log("net_recv_expand(%p, %p, %d, %p)", c, nb, len, cb); + + if (nb->type != NETBUF_RECV) { + kore_log("net_recv_expand(): wrong netbuf type"); + return (KORE_RESULT_ERROR); + } + + nb->cb = cb; + nb->len += len; + nb->buf = (u_int8_t *)kore_realloc(nb->buf, nb->len); + TAILQ_INSERT_HEAD(&(c->recv_queue), nb, list); + + return (net_recv(c)); } int @@ -87,7 +110,6 @@ net_send(struct connection *c) nb = TAILQ_FIRST(&(c->send_queue)); r = SSL_write(c->ssl, (nb->buf + nb->offset), (nb->len - nb->offset)); - kore_log("SSL_write(): %d bytes", r); if (r <= 0) { r = SSL_get_error(c->ssl, r); switch (r) { @@ -159,10 +181,15 @@ net_recv(struct connection *c) return (KORE_RESULT_ERROR); } + nb->retain++; TAILQ_REMOVE(&(c->recv_queue), nb, list); r = nb->cb(nb); - free(nb->buf); - free(nb); + nb->retain--; + + if (nb->retain == 0) { + free(nb->buf); + free(nb); + } } else { r = KORE_RESULT_OK; } diff --git a/src/spdy.c b/src/spdy.c @@ -34,6 +34,7 @@ #include "spdy.h" #include "kore.h" + static int spdy_ctrl_frame_syn_stream(struct netbuf *); static int spdy_ctrl_frame_settings(struct netbuf *); @@ -41,14 +42,13 @@ int spdy_frame_recv(struct netbuf *nb) { struct spdy_ctrl_frame *ctrl; - int (*cb)(struct netbuf *); + int (*cb)(struct netbuf *), r; struct connection *c = (struct connection *)nb->owner; struct spdy_frame *frame = (struct spdy_frame *)nb->buf; frame->frame_1 = ntohl(frame->frame_1); frame->frame_2 = ntohl(frame->frame_2); - c->spdy_cur_frame = *frame; if (SPDY_CONTROL_FRAME(frame)) { kore_log("received control frame"); @@ -70,20 +70,39 @@ spdy_frame_recv(struct netbuf *nb) } if (cb != NULL) { - net_recv_queue(c, ctrl->length, cb); + r = net_recv_expand(c, nb, ctrl->length, cb); } else { kore_log("no callback for type %d", ctrl->type); + r = KORE_RESULT_ERROR; } } else { + r = KORE_RESULT_OK; kore_log("received data frame"); } - return (KORE_RESULT_OK); + return (r); } static int spdy_ctrl_frame_syn_stream(struct netbuf *nb) { + u_int16_t *b; + struct spdy_ctrl_frame *ctrl; + struct spdy_syn_stream *syn; + + ctrl = (struct spdy_ctrl_frame *)nb->buf; + syn = (struct spdy_syn_stream *)(nb->buf + SPDY_FRAME_SIZE); + + syn->stream_id = ntohl(syn->stream_id); + syn->assoc_stream_id = ntohl(syn->assoc_stream_id); + b = (u_int16_t *)&(syn->slot); + *b = ntohl(*b); + + kore_log("stream id is %d", syn->stream_id); + kore_log("assoc stream id is %d", syn->assoc_stream_id); + kore_log("slot is %d", syn->slot); + kore_log("priority is %d", syn->prio); + kore_log("-- SPDY_SYN_STREAM"); return (KORE_RESULT_OK); } @@ -91,10 +110,11 @@ spdy_ctrl_frame_syn_stream(struct netbuf *nb) static int spdy_ctrl_frame_settings(struct netbuf *nb) { + int r; struct connection *c = (struct connection *)nb->owner; kore_log("-- SPDY_SETTINGS"); - net_recv_queue(c, SPDY_FRAME_SIZE, spdy_frame_recv); + r = net_recv_queue(c, SPDY_FRAME_SIZE, spdy_frame_recv); - return (KORE_RESULT_OK); + return (r); } diff --git a/src/utils.c b/src/utils.c @@ -54,7 +54,7 @@ kore_realloc(void *ptr, size_t len) if ((nptr = realloc(ptr, len)) == NULL) fatal("kore_realloc(%p, %d): %d", ptr, len, errno); - return (ptr); + return (nptr); } void *