kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 7771adbec21f0fd2063b95f989972a814a1dfc2a
parent b2f870a36f9b7d5f53e7cb8ccf1bfb0090d39784
Author: Joris Vink <joris@coders.se>
Date:   Wed, 17 Sep 2014 08:25:45 +0200

Allow applications to create new connections in our event loop.

Diffstat:
includes/kore.h | 30+++++++++++++++++++++---------
src/bsd.c | 12++++++++----
src/connection.c | 63+++++++++++++++++++++++++++++++++++++++++----------------------
src/linux.c | 11++++++++---
src/net.c | 167++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------------
5 files changed, 183 insertions(+), 100 deletions(-)

diff --git a/includes/kore.h b/includes/kore.h @@ -160,6 +160,10 @@ struct connection { void *hdlr_extra; X509 *cert; + void (*disconnect)(struct connection *); + int (*read)(struct connection *, int *); + int (*write)(struct connection *, int, int *); + u_int8_t addrtype; union { struct sockaddr_in ipv4; @@ -359,6 +363,7 @@ void kore_platform_disable_read(int); void kore_platform_enable_accept(void); void kore_platform_disable_accept(void); void kore_platform_event_wait(u_int64_t); +void kore_platform_event_all(int, void *); void kore_platform_schedule_read(int, void *); void kore_platform_event_schedule(int, int, int, void *); void kore_platform_worker_setcpu(struct kore_worker *); @@ -376,15 +381,18 @@ int kore_ssl_sni_cb(SSL *, int *, void *); int kore_server_bind(const char *, const char *); int kore_ssl_npn_cb(SSL *, const u_char **, unsigned int *, void *); -void kore_connection_init(void); -int kore_connection_nonblock(int); -int kore_connection_handle(struct connection *); -void kore_connection_remove(struct connection *); -void kore_connection_disconnect(struct connection *); -void kore_connection_start_idletimer(struct connection *); -void kore_connection_stop_idletimer(struct connection *); -void kore_connection_check_idletimer(u_int64_t, struct connection *); -int kore_connection_accept(struct listener *, struct connection **); +void kore_connection_init(void); +struct connection *kore_connection_new(void *); +int kore_connection_nonblock(int); +int kore_connection_handle(struct connection *); +void kore_connection_remove(struct connection *); +void kore_connection_disconnect(struct connection *); +void kore_connection_start_idletimer(struct connection *); +void kore_connection_stop_idletimer(struct connection *); +void kore_connection_check_idletimer(u_int64_t, + struct connection *); +int kore_connection_accept(struct listener *, + struct connection **); u_int64_t kore_time_ms(void); void kore_log_init(void); @@ -457,6 +465,10 @@ int net_recv(struct connection *); int net_send(struct connection *); int net_send_flush(struct connection *); int net_recv_flush(struct connection *); +int net_read(struct connection *, int *); +int net_read_ssl(struct connection *, int *); +int net_write(struct connection *, int, int *); +int net_write_ssl(struct connection *, int, int *); void net_remove_netbuf(struct netbuf_head *, struct netbuf *); void net_recv_queue(struct connection *, u_int32_t, int, struct netbuf **, int (*cb)(struct netbuf *)); diff --git a/src/bsd.c b/src/bsd.c @@ -145,10 +145,7 @@ kore_platform_event_wait(u_int64_t timer) if (c == NULL) break; - kore_platform_event_schedule(c->fd, - EVFILT_READ, EV_ADD, c); - kore_platform_event_schedule(c->fd, - EVFILT_WRITE, EV_ADD | EV_ONESHOT, c); + kore_platform_event_all(c->fd, c); } break; case KORE_TYPE_CONNECTION: @@ -187,6 +184,13 @@ kore_platform_event_wait(u_int64_t timer) } void +kore_platform_event_all(int fd, void *c) +{ + kore_platform_event_schedule(fd, EVFILT_READ, EV_ADD, c); + kore_platform_event_schedule(fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, c); +} + +void kore_platform_event_schedule(int fd, int type, int flags, void *data) { if (nchanges >= event_count) { diff --git a/src/connection.c b/src/connection.c @@ -33,6 +33,38 @@ kore_connection_init(void) sizeof(struct connection), worker_max_connections); } +struct connection * +kore_connection_new(void *owner) +{ + struct connection *c; + + c = kore_pool_get(&connection_pool); + + c->ssl = NULL; + c->flags = 0; + c->cert = NULL; + c->owner = owner; + c->disconnect = NULL; + c->hdlr_extra = NULL; + c->inflate_started = 0; + c->deflate_started = 0; + c->client_stream_id = 0; + c->proto = CONN_PROTO_UNKNOWN; + c->type = KORE_TYPE_CONNECTION; + c->wsize_initial = SPDY_INIT_WSIZE; + c->spdy_send_wsize = SPDY_INIT_WSIZE; + c->spdy_recv_wsize = SPDY_INIT_WSIZE; + c->idle_timer.start = 0; + c->idle_timer.length = KORE_IDLE_TIMER_MAX; + + TAILQ_INIT(&(c->send_queue)); + TAILQ_INIT(&(c->recv_queue)); + TAILQ_INIT(&(c->spdy_streams)); + TAILQ_INIT(&(c->http_requests)); + + return (c); +} + int kore_connection_accept(struct listener *l, struct connection **out) { @@ -43,8 +75,7 @@ kore_connection_accept(struct listener *l, struct connection **out) kore_debug("kore_connection_accept(%p)", l); *out = NULL; - c = kore_pool_get(&connection_pool); - c->type = KORE_TYPE_CONNECTION; + c = kore_connection_new(l); c->addrtype = l->addrtype; if (c->addrtype == AF_INET) { @@ -67,31 +98,16 @@ kore_connection_accept(struct listener *l, struct connection **out) return (KORE_RESULT_ERROR); } - c->owner = l; - c->ssl = NULL; - c->flags = 0; - c->cert = NULL; - c->hdlr_extra = NULL; - c->inflate_started = 0; - c->deflate_started = 0; - c->client_stream_id = 0; - c->proto = CONN_PROTO_UNKNOWN; - c->wsize_initial = SPDY_INIT_WSIZE; - c->spdy_send_wsize = SPDY_INIT_WSIZE; - c->spdy_recv_wsize = SPDY_INIT_WSIZE; - c->idle_timer.start = 0; - c->idle_timer.length = KORE_IDLE_TIMER_MAX; - - TAILQ_INIT(&(c->send_queue)); - TAILQ_INIT(&(c->recv_queue)); - TAILQ_INIT(&(c->spdy_streams)); - TAILQ_INIT(&(c->http_requests)); - #if !defined(KORE_BENCHMARK) c->state = CONN_STATE_SSL_SHAKE; + c->write = net_write_ssl; + c->read = net_read_ssl; #else c->state = CONN_STATE_ESTABLISHED; c->proto = CONN_PROTO_HTTP; + c->write = net_write; + c->read = net_read; + if (http_keepalive_time != 0) c->idle_timer.length = http_keepalive_time * 1000; @@ -112,6 +128,9 @@ kore_connection_disconnect(struct connection *c) if (c->state != CONN_STATE_DISCONNECTING) { kore_debug("preparing %p for disconnection", c); c->state = CONN_STATE_DISCONNECTING; + if (c->disconnect) + c->disconnect(c); + kore_worker_connection_move(c); } } diff --git a/src/linux.c b/src/linux.c @@ -131,9 +131,7 @@ kore_platform_event_wait(u_int64_t timer) if (c == NULL) break; - kore_platform_event_schedule(c->fd, - EPOLLIN | EPOLLOUT | - EPOLLRDHUP | EPOLLET, 0, c); + kore_platform_event_all(c->fd, c); } break; case KORE_TYPE_CONNECTION: @@ -165,6 +163,13 @@ kore_platform_event_wait(u_int64_t timer) } void +kore_platform_event_all(int fd, void *c) +{ + kore_platform_event_schedule(fd, + EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET, 0, c); +} + +void kore_platform_event_schedule(int fd, int type, int flags, void *udata) { struct epoll_event evt; diff --git a/src/net.c b/src/net.c @@ -176,37 +176,11 @@ net_send(struct connection *c) len = MIN(NETBUF_SEND_PAYLOAD_MAX, smin); -#if !defined(KORE_BENCHMARK) - r = SSL_write(c->ssl, - (c->snb->buf + c->snb->s_off), len); - if (r <= 0) { - r = SSL_get_error(c->ssl, r); - switch (r) { - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - c->snb->flags |= NETBUF_MUST_RESEND; - c->flags &= ~CONN_WRITE_POSSIBLE; - return (KORE_RESULT_OK); - default: - kore_debug("SSL_write(): %s", - ssl_errno_s); - return (KORE_RESULT_ERROR); - } - } -#else - r = write(c->fd, (c->snb->buf + c->snb->s_off), len); - if (r <= -1) { - switch (errno) { - case EINTR: - case EAGAIN: - c->flags &= ~CONN_WRITE_POSSIBLE; - return (KORE_RESULT_OK); - default: - kore_debug("write: %s", errno_s); - return (KORE_RESULT_ERROR); - } - } -#endif + if (!c->write(c, len, &r)) + return (KORE_RESULT_ERROR); + if (!(c->flags & CONN_WRITE_POSSIBLE)) + return (KORE_RESULT_OK); + kore_debug("net_send(%p/%d/%d bytes), progress with %d", c->snb, c->snb->s_off, c->snb->b_len, r); @@ -253,37 +227,11 @@ net_recv(struct connection *c) return (KORE_RESULT_ERROR); } -#if !defined(KORE_BENCHMARK) - r = SSL_read(c->ssl, - (c->rnb->buf + c->rnb->s_off), - (c->rnb->b_len - c->rnb->s_off)); - if (r <= 0) { - r = SSL_get_error(c->ssl, r); - switch (r) { - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - c->flags &= ~CONN_READ_POSSIBLE; - return (KORE_RESULT_OK); - default: - kore_debug("SSL_read(): %s", ssl_errno_s); - return (KORE_RESULT_ERROR); - } - } -#else - r = read(c->fd, (c->rnb->buf + c->rnb->s_off), - (c->rnb->b_len - c->rnb->s_off)); - if (r <= 0) { - switch (errno) { - case EINTR: - case EAGAIN: - c->flags &= ~CONN_READ_POSSIBLE; - return (KORE_RESULT_OK); - default: - kore_debug("read(): %s", errno_s); - return (KORE_RESULT_ERROR); - } - } -#endif + if (!c->read(c, &r)) + return (KORE_RESULT_ERROR); + if (!(c->flags & CONN_READ_POSSIBLE)) + return (KORE_RESULT_OK); + kore_debug("net_recv(%ld/%ld bytes), progress with %d", c->rnb->s_off, c->rnb->b_len, r); @@ -340,6 +288,101 @@ net_remove_netbuf(struct netbuf_head *list, struct netbuf *nb) kore_pool_put(&nb_pool, nb); } +#if !defined(KORE_BENCHMARK) +int +net_write_ssl(struct connection *c, int len, int *written) +{ + int r; + + r = SSL_write(c->ssl, (c->snb->buf + c->snb->s_off), len); + if (r <= 0) { + r = SSL_get_error(c->ssl, r); + switch (r) { + case SSL_ERROR_WANT_READ: + case SSL_ERROR_WANT_WRITE: + c->snb->flags |= NETBUF_MUST_RESEND; + c->flags &= ~CONN_WRITE_POSSIBLE; + return (KORE_RESULT_OK); + default: + kore_debug("SSL_write(): %s", ssl_errno_s); + return (KORE_RESULT_ERROR); + } + } + + *written = r; + return (KORE_RESULT_OK); +} + +int +net_read_ssl(struct connection *c, int *bytes) +{ + int r; + + r = SSL_read(c->ssl, (c->rnb->buf + c->rnb->s_off), + (c->rnb->b_len - c->rnb->s_off)); + if (r <= 0) { + r = SSL_get_error(c->ssl, r); + switch (r) { + case SSL_ERROR_WANT_READ: + case SSL_ERROR_WANT_WRITE: + c->flags &= ~CONN_READ_POSSIBLE; + return (KORE_RESULT_OK); + default: + kore_debug("SSL_read(): %s", ssl_errno_s); + return (KORE_RESULT_ERROR); + } + } + + *bytes = r; + return (KORE_RESULT_OK); +} +#endif + +int +net_write(struct connection *c, int len, int *written) +{ + int r; + + r = write(c->fd, (c->snb->buf + c->snb->s_off), len); + if (r <= -1) { + switch (errno) { + case EINTR: + case EAGAIN: + c->flags &= ~CONN_WRITE_POSSIBLE; + return (KORE_RESULT_OK); + default: + kore_debug("write: %s", errno_s); + return (KORE_RESULT_ERROR); + } + } + + *written = r; + return (KORE_RESULT_OK); +} + +int +net_read(struct connection *c, int *bytes) +{ + int r; + + r = read(c->fd, (c->rnb->buf + c->rnb->s_off), + (c->rnb->b_len - c->rnb->s_off)); + if (r <= 0) { + switch (errno) { + case EINTR: + case EAGAIN: + c->flags &= ~CONN_READ_POSSIBLE; + return (KORE_RESULT_OK); + default: + kore_debug("read(): %s", errno_s); + return (KORE_RESULT_ERROR); + } + } + + *bytes = r; + return (KORE_RESULT_OK); +} + u_int16_t net_read16(u_int8_t *b) {