kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 364dc58219a7ac9608e5f7affc838e002e861939
parent fdb9004c6d4fcaea0bf8f8d09f62f31f9c0eea55
Author: Joris Vink <joris@coders.se>
Date:   Thu,  2 May 2013 09:10:35 +0200

rework disconnecting clients and fix bug where spdy ocnnections could segfault after disconnecting

Diffstat:
includes/kore.h | 11+++++++----
src/http.c | 20+++++++++++---------
src/kore.c | 29+++++++++++++++++++++++++----
src/net.c | 27+++++++++++++--------------
4 files changed, 56 insertions(+), 31 deletions(-)

diff --git a/includes/kore.h b/includes/kore.h @@ -33,7 +33,6 @@ struct netbuf { u_int32_t offset; u_int32_t len; u_int8_t type; - u_int8_t retain; u_int8_t flags; void *owner; @@ -47,9 +46,10 @@ struct listener { struct sockaddr_in sin; }; -#define CONN_STATE_UNKNOWN 0 -#define CONN_STATE_SSL_SHAKE 1 -#define CONN_STATE_ESTABLISHED 2 +#define CONN_STATE_UNKNOWN 0 +#define CONN_STATE_SSL_SHAKE 1 +#define CONN_STATE_ESTABLISHED 2 +#define CONN_STATE_DISCONNECTING 3 #define CONN_PROTO_UNKNOWN 0 #define CONN_PROTO_SPDY 1 @@ -59,6 +59,7 @@ struct listener { #define CONN_WRITE_POSSIBLE 0x02 #define NETBUF_CALL_CB_ALWAYS 0x01 +#define NETBUF_FORCE_REMOVE 0x02 struct connection { int fd; @@ -79,6 +80,8 @@ struct connection { u_int32_t client_stream_id; TAILQ_HEAD(, spdy_stream) spdy_streams; + + TAILQ_ENTRY(connection) list; }; #define HANDLER_TYPE_STATIC 1 diff --git a/src/http.c b/src/http.c @@ -222,7 +222,7 @@ void http_process(void) { struct http_request *req, *next; - int (*handler)(struct http_request *); + int r, (*handler)(struct http_request *); if (TAILQ_EMPTY(&http_requests)) return; @@ -232,15 +232,16 @@ http_process(void) next = TAILQ_NEXT(req, list); handler = kore_module_handler_find(req->path); - if (handler == NULL) { - if (!http_generic_404(req)) - kore_server_disconnect(req->owner); - } else { - if (!handler(req)) - kore_server_disconnect(req->owner); - } + if (handler == NULL) + r = http_generic_404(req); + else + r = handler(req); + + if (r != KORE_RESULT_ERROR) + net_send_flush(req->owner); + else + kore_server_disconnect(req->owner); - net_send_flush(req->owner); TAILQ_REMOVE(&http_requests, req, list); http_request_free(req); } @@ -265,6 +266,7 @@ http_header_recv(struct netbuf *nb) if (nb->len > 2 && strncmp((p - 2), "\r\n\r\n", 4)) return (KORE_RESULT_OK); + nb->flags |= NETBUF_FORCE_REMOVE; hbuf = kore_strdup((const char *)nb->buf); h = kore_split_string(hbuf, "\r\n", headers, HTTP_REQ_HEADER_MAX); diff --git a/src/kore.c b/src/kore.c @@ -43,6 +43,9 @@ static int efd = -1; static SSL_CTX *ssl_ctx = NULL; + +static TAILQ_HEAD(, connection) disconnected; + int server_port = 0; char *server_ip = NULL; @@ -51,16 +54,17 @@ static int kore_server_sslstart(void); static void kore_event(int, int, void *); static int kore_server_accept(struct listener *); static int kore_connection_handle(struct connection *, int); +static void kore_server_final_disconnect(struct connection *); static int kore_server_bind(struct listener *, const char *, int); static int kore_ssl_npn_cb(SSL *, const u_char **, unsigned int *, void *); int main(int argc, char *argv[]) { - struct connection *c; struct listener server; struct epoll_event *events; int n, i, *fd; + struct connection *c, *cnext; if (argc != 2) fatal("Usage: kore [config file]"); @@ -80,6 +84,7 @@ main(int argc, char *argv[]) fatal("epoll_create(): %s", errno_s); http_init(); + TAILQ_INIT(&disconnected); kore_event(server.fd, EPOLLIN, &server); events = kore_calloc(EPOLL_EVENTS, sizeof(struct epoll_event)); @@ -116,12 +121,28 @@ main(int argc, char *argv[]) } http_process(); + + for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) { + cnext = TAILQ_NEXT(c, list); + TAILQ_REMOVE(&disconnected, c, list); + kore_server_final_disconnect(c); + } } close(server.fd); return (0); } +void +kore_server_disconnect(struct connection *c) +{ + if (c->state != CONN_STATE_DISCONNECTING) { + kore_log("preparing %p for disconnection", c); + c->state = CONN_STATE_DISCONNECTING; + TAILQ_INSERT_TAIL(&disconnected, c, list); + } +} + static int kore_server_sslstart(void) { @@ -230,13 +251,13 @@ kore_server_accept(struct listener *l) return (KORE_RESULT_OK); } -void -kore_server_disconnect(struct connection *c) +static void +kore_server_final_disconnect(struct connection *c) { struct netbuf *nb, *next; struct spdy_stream *s, *snext; - kore_log("kore_server_disconnect(%p)", c); + kore_log("kore_server_final_disconnect(%p)", c); close(c->fd); if (c->ssl != NULL) diff --git a/src/net.c b/src/net.c @@ -50,7 +50,6 @@ net_send_queue(struct connection *c, u_int8_t *data, size_t len, int flags, nb->len = len; nb->owner = c; nb->offset = 0; - nb->retain = 0; nb->flags = flags; nb->type = NETBUF_SEND; @@ -81,7 +80,6 @@ net_recv_queue(struct connection *c, size_t len, int flags, nb->len = len; nb->owner = c; nb->offset = 0; - nb->retain = 0; nb->flags = flags; nb->type = NETBUF_RECV; nb->buf = (u_int8_t *)kore_malloc(nb->len); @@ -109,7 +107,7 @@ net_recv_expand(struct connection *c, struct netbuf *nb, size_t len, nb->buf = (u_int8_t *)kore_realloc(nb->buf, nb->len); TAILQ_INSERT_HEAD(&(c->recv_queue), nb, list); - return (net_recv(c)); + return (KORE_RESULT_OK); } int @@ -129,8 +127,8 @@ net_send(struct connection *c) r = SSL_write(c->ssl, (nb->buf + nb->offset), (nb->len - nb->offset)); - //kore_log("net_send(%ld/%ld bytes), progress with %d", - // nb->offset, nb->len, r); + kore_log("net_send(%ld/%ld bytes), progress with %d", + nb->offset, nb->len, r); if (r <= 0) { r = SSL_get_error(c->ssl, r); @@ -146,7 +144,7 @@ net_send(struct connection *c) } nb->offset += (size_t)r; - if (nb->offset == nb->len || (nb->flags & NETBUF_CALL_CB_ALWAYS)) { + if (nb->offset == nb->len) { if (nb->offset == nb->len) TAILQ_REMOVE(&(c->send_queue), nb, list); @@ -169,6 +167,8 @@ net_send(struct connection *c) int net_send_flush(struct connection *c) { + kore_log("net_send_flush(%p)", c); + while (!TAILQ_EMPTY(&(c->send_queue)) && (c->flags & CONN_WRITE_POSSIBLE)) { if (!net_send(c)) @@ -190,8 +190,8 @@ net_recv(struct connection *c) nb = TAILQ_FIRST(&(c->recv_queue)); r = SSL_read(c->ssl, (nb->buf + nb->offset), (nb->len - nb->offset)); - //kore_log("net_recv(%ld/%ld bytes), progress with %d", - // nb->offset, nb->len, r); + kore_log("net_recv(%ld/%ld bytes), progress with %d", + nb->offset, nb->len, r); if (r <= 0) { r = SSL_get_error(c->ssl, r); @@ -213,13 +213,10 @@ net_recv(struct connection *c) return (KORE_RESULT_ERROR); } - nb->retain++; - if (nb->offset == nb->len) - TAILQ_REMOVE(&(c->recv_queue), nb, list); r = nb->cb(nb); - nb->retain--; - - if (nb->retain == 0 && nb->offset == nb->len) { + if (nb->offset == nb->len || + (nb->flags & NETBUF_FORCE_REMOVE)) { + TAILQ_REMOVE(&(c->recv_queue), nb, list); free(nb->buf); free(nb); } @@ -233,6 +230,8 @@ net_recv(struct connection *c) int net_recv_flush(struct connection *c) { + kore_log("net_recv_flush(%p)", c); + while (!TAILQ_EMPTY(&(c->recv_queue)) && (c->flags & CONN_READ_POSSIBLE)) { if (!net_recv(c))