kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit a074258dcc0f7693c5a0abe75e0ff4035d5ba300
parent d59847c4481c91c4ae04b32d8b2daed7458277f0
Author: Joris Vink <joris@coders.se>
Date:   Wed,  1 May 2013 13:43:47 +0200

rework the way events trigger and the way we handle them. follow the manual for once, as edge trigger does not refire until we got an EAGAIN.

introduce net_send_flush() and net_recv_flush() for this purpose, we attempt to make as much headway as possible until we reach EAGAIN or until we can simply pickup again later.

should merge all the stuff in http_response() into a single send buffer, **out is in place in net_send_queue() for that purpose.

Diffstat:
includes/kore.h | 10++++++++--
src/http.c | 7++++---
src/kore.c | 45++++++++++++++++++++++++++++++++++-----------
src/net.c | 61+++++++++++++++++++++++++++++++++++++++++++++++++------------
src/spdy.c | 4++--
5 files changed, 97 insertions(+), 30 deletions(-)

diff --git a/includes/kore.h b/includes/kore.h @@ -54,6 +54,9 @@ struct listener { #define CONN_PROTO_SPDY 1 #define CONN_PROTO_HTTP 2 +#define CONN_READ_POSSIBLE 0x01 +#define CONN_WRITE_POSSIBLE 0x02 + struct connection { int fd; int state; @@ -61,6 +64,7 @@ struct connection { struct sockaddr_in sin; void *owner; SSL *ssl; + int flags; u_int8_t inflate_started; z_stream z_inflate; @@ -90,12 +94,14 @@ void net_write16(u_int8_t *, u_int16_t); void net_write32(u_int8_t *, u_int32_t); int net_recv(struct connection *); int net_send(struct connection *); +int net_send_flush(struct connection *); +int net_recv_flush(struct connection *); int net_recv_queue(struct connection *, size_t, - int (*cb)(struct netbuf *)); + struct netbuf **, int (*cb)(struct netbuf *)); int net_recv_expand(struct connection *c, struct netbuf *, size_t, int (*cb)(struct netbuf *)); int net_send_queue(struct connection *, u_int8_t *, size_t, - int (*cb)(struct netbuf *)); + struct netbuf **, int (*cb)(struct netbuf *)); struct spdy_stream *spdy_stream_lookup(struct connection *, u_int32_t); struct spdy_header_block *spdy_header_block_create(int); diff --git a/src/http.c b/src/http.c @@ -87,7 +87,7 @@ http_response(struct http_request *req, int status, u_int8_t *d, u_int32_t len) kore_log("http_response(%p, %d, %p, %d)", req, status, d, len); - if (req->stream != NULL) { + if (req->owner->proto == CONN_PROTO_SPDY) { snprintf(sbuf, sizeof(sbuf), "%d", status); hblock = spdy_header_block_create(SPDY_HBLOCK_NORMAL); spdy_header_block_add(hblock, ":status", sbuf); @@ -101,14 +101,14 @@ http_response(struct http_request *req, int status, u_int8_t *d, u_int32_t len) 0, hlen, req->stream->stream_id)) return (KORE_RESULT_ERROR); - if (!net_send_queue(req->owner, htext, hlen, NULL)) + if (!net_send_queue(req->owner, htext, hlen, NULL, NULL)) return (KORE_RESULT_ERROR); if (len > 0) { if (!spdy_frame_send(req->owner, SPDY_DATA_FRAME, 0, len, req->stream->stream_id)) return (KORE_RESULT_ERROR); - if (!net_send_queue(req->owner, d, len, NULL)) + if (!net_send_queue(req->owner, d, len, NULL, NULL)) return (KORE_RESULT_ERROR); } @@ -143,6 +143,7 @@ http_process(void) kore_server_disconnect(req->owner); } + net_send_flush(req->owner); TAILQ_REMOVE(&http_requests, req, list); http_request_free(req); } diff --git a/src/kore.c b/src/kore.c @@ -205,13 +205,17 @@ kore_server_accept(struct listener *l) c->owner = l; c->ssl = NULL; + c->flags = 0; + c->inflate_started = 0; + c->deflate_started = 0; + c->client_stream_id = 0; c->proto = CONN_PROTO_UNKNOWN; c->state = CONN_STATE_SSL_SHAKE; TAILQ_INIT(&(c->send_queue)); TAILQ_INIT(&(c->recv_queue)); TAILQ_INIT(&(c->spdy_streams));; - kore_event(c->fd, EPOLLIN | EPOLLET, c); + kore_event(c->fd, EPOLLIN | EPOLLOUT | EPOLLET, c); kore_log("new connection from %s", inet_ntoa(c->sin.sin_addr)); return (KORE_RESULT_OK); @@ -221,6 +225,7 @@ void kore_server_disconnect(struct connection *c) { struct netbuf *nb, *next; + struct spdy_stream *s, *snext; kore_log("kore_server_disconnect(%p)", c); @@ -242,6 +247,19 @@ kore_server_disconnect(struct connection *c) free(nb); } + for (s = TAILQ_FIRST(&(c->spdy_streams)); s != NULL; s = snext) { + snext = TAILQ_NEXT(s, list); + TAILQ_REMOVE(&(c->spdy_streams), s, list); + + if (s->hblock != NULL) { + if (s->hblock->header_block != NULL) + free(s->hblock->header_block); + free(s->hblock); + } + + free(s); + } + kore_log("disconnect connection from %s", inet_ntoa(c->sin.sin_addr)); free(c); } @@ -255,6 +273,11 @@ kore_connection_handle(struct connection *c, int flags) kore_log("kore_connection_handle(%p, %d)", c, flags); + if (flags & EPOLLIN) + c->flags |= CONN_READ_POSSIBLE; + if (flags & EPOLLOUT) + c->flags |= CONN_WRITE_POSSIBLE; + switch (c->state) { case CONN_STATE_SSL_SHAKE: if (c->ssl == NULL) { @@ -272,10 +295,7 @@ kore_connection_handle(struct connection *c, int flags) r = SSL_get_error(c->ssl, r); switch (r) { case SSL_ERROR_WANT_READ: - kore_log("ssl_want_read on handshake"); - return (KORE_RESULT_OK); case SSL_ERROR_WANT_WRITE: - kore_log("ssl_want_write on handshake"); return (KORE_RESULT_OK); default: kore_log("SSL_accept(): %s", ssl_errno_s); @@ -294,8 +314,8 @@ kore_connection_handle(struct connection *c, int flags) if (!memcmp(data, "spdy/3", 6)) kore_log("using SPDY/3"); c->proto = CONN_PROTO_SPDY; - if (!net_recv_queue(c, - SPDY_FRAME_SIZE, spdy_frame_recv)) + if (!net_recv_queue(c, SPDY_FRAME_SIZE, + NULL, spdy_frame_recv)) return (KORE_RESULT_ERROR); } else { kore_log("using HTTP/1.1"); @@ -303,13 +323,16 @@ kore_connection_handle(struct connection *c, int flags) } c->state = CONN_STATE_ESTABLISHED; - break; + /* FALLTHROUGH */ case CONN_STATE_ESTABLISHED: - if (flags & EPOLLIN) { - if (!net_recv(c)) + if (c->flags & CONN_READ_POSSIBLE) { + if (!net_recv_flush(c)) + return (KORE_RESULT_ERROR); + } + + if (c->flags & CONN_WRITE_POSSIBLE) { + if (!net_send_flush(c)) return (KORE_RESULT_ERROR); - } else { - kore_log("got unhandled client event"); } break; default: diff --git a/src/net.c b/src/net.c @@ -39,7 +39,7 @@ int net_send_queue(struct connection *c, u_int8_t *data, size_t len, - int (*cb)(struct netbuf *)) + struct netbuf **out, int (*cb)(struct netbuf *)) { struct netbuf *nb; @@ -52,15 +52,24 @@ net_send_queue(struct connection *c, u_int8_t *data, size_t len, nb->offset = 0; nb->retain = 0; nb->type = NETBUF_SEND; - nb->buf = (u_int8_t *)kore_malloc(nb->len); - memcpy(nb->buf, data, nb->len); + + if (len > 0) { + nb->buf = (u_int8_t *)kore_malloc(nb->len); + memcpy(nb->buf, data, nb->len); + } else { + nb->buf = NULL; + } TAILQ_INSERT_TAIL(&(c->send_queue), nb, list); - return (net_send(c)); + if (out != NULL) + *out = nb; + + return (KORE_RESULT_OK); } int -net_recv_queue(struct connection *c, size_t len, int (*cb)(struct netbuf *)) +net_recv_queue(struct connection *c, size_t len, + struct netbuf **out, int (*cb)(struct netbuf *)) { struct netbuf *nb; @@ -76,7 +85,10 @@ net_recv_queue(struct connection *c, size_t len, int (*cb)(struct netbuf *)) nb->buf = (u_int8_t *)kore_malloc(nb->len); TAILQ_INSERT_TAIL(&(c->recv_queue), nb, list); - return (net_recv(c)); + if (out != NULL) + *out = nb; + + return (KORE_RESULT_OK); } int @@ -108,6 +120,11 @@ net_send(struct connection *c) return (KORE_RESULT_OK); nb = TAILQ_FIRST(&(c->send_queue)); + if (nb->len == 0) { + kore_log("net_send(): len is 0"); + return (KORE_RESULT_ERROR); + } + r = SSL_write(c->ssl, (nb->buf + nb->offset), (nb->len - nb->offset)); //kore_log("net_send(%ld/%ld bytes), progress with %d", @@ -117,10 +134,8 @@ net_send(struct connection *c) r = SSL_get_error(c->ssl, r); switch (r) { case SSL_ERROR_WANT_READ: - kore_log("ssl_want_read on net_send()"); - return (KORE_RESULT_OK); case SSL_ERROR_WANT_WRITE: - kore_log("ssl_want_write on net_send()"); + c->flags &= ~CONN_WRITE_POSSIBLE; return (KORE_RESULT_OK); default: kore_log("SSL_write(): %s", ssl_errno_s); @@ -147,6 +162,18 @@ net_send(struct connection *c) } int +net_send_flush(struct connection *c) +{ + while (!TAILQ_EMPTY(&(c->send_queue)) && + (c->flags & CONN_WRITE_POSSIBLE)) { + if (!net_send(c)) + return (KORE_RESULT_ERROR); + } + + return (KORE_RESULT_OK); +} + +int net_recv(struct connection *c) { int r; @@ -165,10 +192,8 @@ net_recv(struct connection *c) r = SSL_get_error(c->ssl, r); switch (r) { case SSL_ERROR_WANT_READ: - kore_log("ssl_want_read on net_recv()"); - return (KORE_RESULT_OK); case SSL_ERROR_WANT_WRITE: - kore_log("ssl_want_write on net_recv()"); + c->flags &= ~CONN_READ_POSSIBLE; return (KORE_RESULT_OK); default: kore_log("SSL_read(): %s", ssl_errno_s); @@ -199,6 +224,18 @@ net_recv(struct connection *c) return (r); } +int +net_recv_flush(struct connection *c) +{ + while (!TAILQ_EMPTY(&(c->recv_queue)) && + (c->flags & CONN_READ_POSSIBLE)) { + if (!net_recv(c)) + return (KORE_RESULT_ERROR); + } + + return (KORE_RESULT_OK); +} + u_int16_t net_read16(u_int8_t *b) { diff --git a/src/spdy.c b/src/spdy.c @@ -103,7 +103,7 @@ spdy_frame_recv(struct netbuf *nb) } if (r == KORE_RESULT_OK) - r = net_recv_queue(c, SPDY_FRAME_SIZE, spdy_frame_recv); + r = net_recv_queue(c, SPDY_FRAME_SIZE, NULL, spdy_frame_recv); return (r); } @@ -146,7 +146,7 @@ spdy_frame_send(struct connection *c, u_int16_t type, u_int8_t flags, break; } - return (net_send_queue(c, nb, length, NULL)); + return (net_send_queue(c, nb, length, NULL, NULL)); } struct spdy_stream *