kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 23c0ec67c6df33e5b9c5197c664ed495d5a8a455
parent 7a6be8ff2e1eaae16e0e2ba55857a3e2a212a715
Author: Joris Vink <joris@coders.se>
Date:   Sun, 28 Apr 2013 19:11:44 +0200

begin with the ability to read control frames. something feels fishy with epoll() and its triggering of events. I probably got it wrong.

Diffstat:
Makefile | 2+-
includes/kore.h | 15+++++++++++++--
includes/spdy.h | 47+++++++++++++++++++++++++++++++++++++++++++++++
src/kore.c | 89++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------
src/net.c | 171+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/spdy.c | 100+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/utils.c | 1+
7 files changed, 408 insertions(+), 17 deletions(-)

diff --git a/Makefile b/Makefile @@ -3,7 +3,7 @@ CC=gcc BIN=kore -S_SRC= src/kore.c src/utils.c +S_SRC= src/kore.c src/net.c src/spdy.c src/utils.c S_OBJS= $(S_SRC:.c=.o) CFLAGS+=-I/usr/local/ssl/include diff --git a/includes/kore.h b/includes/kore.h @@ -28,9 +28,9 @@ #define KORE_SSL_PROTO_STRING "\x06spdy/3\x08http/1.1" struct netbuf { - u_int8_t *data; + u_int8_t *buf; u_int32_t offset; - u_int32_t length; + u_int32_t len; void *owner; int (*cb)(struct netbuf *); @@ -58,6 +58,8 @@ struct connection { void *owner; SSL *ssl; + struct spdy_frame spdy_cur_frame; + TAILQ_HEAD(, netbuf) send_queue; TAILQ_HEAD(, netbuf) recv_queue; }; @@ -70,4 +72,13 @@ char *kore_strdup(const char *); void fatal(const char *, ...); void kore_log_internal(char *, int, const char *, ...); +int net_recv(struct connection *); +int net_send(struct connection *); +void net_recv_queue(struct connection *, size_t, + int (*cb)(struct netbuf *)); +void net_send_queue(struct connection *, u_int8_t *, size_t, + int (*cb)(struct netbuf *)); + +int spdy_frame_recv(struct netbuf *); + #endif /* !__H_KORE_H */ diff --git a/includes/spdy.h b/includes/spdy.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2013 Joris Vink <joris@coders.se> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#ifndef __H_SPDY_H +#define __H_SPDY_H + +struct spdy_frame { + u_int32_t frame_1; + u_int32_t frame_2; +}; + +struct spdy_ctrl_frame { + int type:16; + int version:15; + int control_bit:1; + int length:24; + int flags:8; +}; + +struct spdy_data_frame { + int stream_id:31; + int control_bit:1; + int length:24; + int flags:8; +}; + +#define SPDY_CONTROL_FRAME(x) ((x->frame_1 & (1 << 31))) +#define SPDY_FRAME_SIZE 8 + +/* control frames. */ +#define SPDY_CTRL_FRAME_SYN_STREAM 1 +#define SPDY_CTRL_FRAME_SETTINGS 4 + +#endif /* !__H_SPDY_H */ diff --git a/src/kore.c b/src/kore.c @@ -33,6 +33,7 @@ #include <string.h> #include <unistd.h> +#include "spdy.h" #include "kore.h" #define EPOLL_EVENTS 500 @@ -44,6 +45,7 @@ static int kore_socket_nonblock(int); static int kore_server_sslstart(void); static void kore_event(int, int, void *); static int kore_server_accept(struct listener *); +static void kore_server_disconnect(struct connection *); static int kore_connection_handle(struct connection *, int); static int kore_server_bind(struct listener *, const char *, int); static int kore_ssl_npn_cb(SSL *, const u_char **, unsigned int *, void *); @@ -70,10 +72,12 @@ main(int argc, char *argv[]) kore_event(server.fd, EPOLLIN, &server); events = kore_calloc(EPOLL_EVENTS, sizeof(struct epoll_event)); for (;;) { + kore_log("main(): epoll_wait()"); n = epoll_wait(efd, events, EPOLL_EVENTS, -1); if (n == -1) fatal("epoll_wait(): %s", errno_s); + kore_log("main(): %d sockets available", n); for (i = 0; i < n; i++) { fd = (int *)events[i].data.ptr; @@ -83,6 +87,7 @@ main(int argc, char *argv[]) fatal("error on server socket"); c = (struct connection *)events[i].data.ptr; + kore_server_disconnect(c); continue; } @@ -92,7 +97,7 @@ main(int argc, char *argv[]) c = (struct connection *)events[i].data.ptr; if (!kore_connection_handle(c, events[i].events)) - /* Disconnect. */; + kore_server_disconnect(c); } } } @@ -104,6 +109,8 @@ main(int argc, char *argv[]) static int kore_server_sslstart(void) { + kore_log("kore_server_sslstart()"); + SSL_library_init(); SSL_load_error_strings(); ssl_ctx = SSL_CTX_new(SSLv23_server_method()); @@ -135,6 +142,8 @@ kore_server_sslstart(void) static int kore_server_bind(struct listener *l, const char *ip, int port) { + kore_log("kore_server_bind(%p, %s, %d)", l, ip, port); + if ((l->fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { kore_log("socket(): %s", errno_s); return (KORE_RESULT_ERROR); @@ -171,6 +180,8 @@ kore_server_accept(struct listener *l) socklen_t len; struct connection *c; + kore_log("kore_server_accept(%p)", l); + len = sizeof(struct sockaddr_in); c = (struct connection *)kore_malloc(sizeof(*c)); if ((c->fd = accept(l->fd, (struct sockaddr *)&(c->sin), &len)) == -1) { @@ -198,6 +209,35 @@ kore_server_accept(struct listener *l) return (KORE_RESULT_OK); } +static void +kore_server_disconnect(struct connection *c) +{ + struct netbuf *nb, *next; + + kore_log("kore_server_disconnect(%p)", c); + + close(c->fd); + if (c->ssl != NULL) + SSL_free(c->ssl); + + for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; nb = next) { + next = TAILQ_NEXT(nb, list); + TAILQ_REMOVE(&(c->send_queue), nb, list); + free(nb->buf); + free(nb); + } + + for (nb = TAILQ_FIRST(&(c->recv_queue)); nb != NULL; nb = next) { + next = TAILQ_NEXT(nb, list); + TAILQ_REMOVE(&(c->recv_queue), nb, list); + free(nb->buf); + free(nb); + } + + kore_log("disconnect connection from %s", inet_ntoa(c->sin.sin_addr)); + free(c); +} + static int kore_connection_handle(struct connection *c, int flags) { @@ -205,6 +245,8 @@ kore_connection_handle(struct connection *c, int flags) u_int32_t len; const u_char *data; + kore_log("kore_connection_handle(%p, %d)", c, flags); + switch (c->state) { case CONN_STATE_SSL_SHAKE: if (c->ssl == NULL) { @@ -222,7 +264,10 @@ kore_connection_handle(struct connection *c, int flags) r = SSL_get_error(c->ssl, r); switch (r) { case SSL_ERROR_WANT_READ: + kore_log("ssl_want_read on handshake"); + return (KORE_RESULT_OK); case SSL_ERROR_WANT_WRITE: + kore_log("ssl_want_write on handshake"); return (KORE_RESULT_OK); default: kore_log("SSL_accept(): %s", ssl_errno_s); @@ -241,6 +286,7 @@ kore_connection_handle(struct connection *c, int flags) if (!memcmp(data, "spdy/3", 6)) kore_log("using SPDY/3"); c->proto = CONN_PROTO_SPDY; + net_recv_queue(c, SPDY_FRAME_SIZE, spdy_frame_recv); } else { kore_log("using HTTP/1.1"); c->proto = CONN_PROTO_HTTP; @@ -249,7 +295,12 @@ kore_connection_handle(struct connection *c, int flags) c->state = CONN_STATE_ESTABLISHED; break; case CONN_STATE_ESTABLISHED: - kore_log("got bytes on established"); + if (flags & EPOLLIN) { + if (!net_recv(c)) + return (KORE_RESULT_ERROR); + } else { + kore_log("got unhandled client event"); + } break; default: kore_log("unknown state on %d (%d)", c->fd, c->state); @@ -264,6 +315,8 @@ kore_socket_nonblock(int fd) { int flags; + kore_log("kore_socket_nonblock(%d)", fd); + if ((flags = fcntl(fd, F_GETFL, 0)) == -1) { kore_log("fcntl(): F_GETFL %s", errno_s); return (KORE_RESULT_ERROR); @@ -278,24 +331,32 @@ kore_socket_nonblock(int fd) return (KORE_RESULT_OK); } -static void -kore_event(int fd, int flags, void *udata) -{ - struct epoll_event evt; - - evt.events = flags; - evt.data.ptr = udata; - if (epoll_ctl(efd, EPOLL_CTL_ADD, fd, &evt) == -1) - fatal("epoll_ctl(): %s", errno_s); -} - static int kore_ssl_npn_cb(SSL *ssl, const u_char **data, unsigned int *len, void *arg) { - kore_log("npn callback: sending protocols"); + kore_log("kore_ssl_npn_cb(): sending protocols"); *data = (const unsigned char *)KORE_SSL_PROTO_STRING; *len = strlen(KORE_SSL_PROTO_STRING); return (SSL_TLSEXT_ERR_OK); } + +static void +kore_event(int fd, int flags, void *udata) +{ + struct epoll_event evt; + + kore_log("kore_event(%d, %d, %p)", fd, flags, udata); + + evt.events = flags; + evt.data.ptr = udata; + if (epoll_ctl(efd, EPOLL_CTL_ADD, fd, &evt) == -1) { + if (errno == EEXIST) { + if (epoll_ctl(efd, EPOLL_CTL_MOD, fd, &evt) == -1) + fatal("epoll_ctl() MOD: %s", errno_s); + } else { + fatal("epoll_ctl() ADD: %s", errno_s); + } + } +} diff --git a/src/net.c b/src/net.c @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2013 Joris Vink <joris@coders.se> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/param.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/queue.h> +#include <sys/epoll.h> + +#include <netinet/in.h> +#include <arpa/inet.h> + +#include <openssl/err.h> +#include <openssl/ssl.h> + +#include <errno.h> +#include <fcntl.h> +#include <stdarg.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "spdy.h" +#include "kore.h" + +void +net_send_queue(struct connection *c, u_int8_t *data, size_t len, + int (*cb)(struct netbuf *)) +{ + struct netbuf *nb; + + kore_log("net_send_queue(%p, %p, %d, %p)", c, data, len, cb); + + nb = (struct netbuf *)kore_malloc(sizeof(*nb)); + nb->cb = cb; + nb->len = len; + nb->owner = c; + nb->offset = 0; + nb->buf = (u_int8_t *)kore_malloc(nb->len); + memcpy(nb->buf, data, nb->len); + + TAILQ_INSERT_TAIL(&(c->send_queue), nb, list); + net_send(c); +} + +void +net_recv_queue(struct connection *c, size_t len, int (*cb)(struct netbuf *)) +{ + struct netbuf *nb; + + kore_log("net_recv_queue(%p, %d, %p)", c, len, cb); + + nb = (struct netbuf *)kore_malloc(sizeof(*nb)); + nb->cb = cb; + nb->len = len; + nb->owner = c; + nb->offset = 0; + nb->buf = (u_int8_t *)kore_malloc(nb->len); + + TAILQ_INSERT_TAIL(&(c->recv_queue), nb, list); + net_recv(c); +} + +int +net_send(struct connection *c) +{ + int r; + struct netbuf *nb; + + kore_log("net_send(%p)", c); + + if (TAILQ_EMPTY(&(c->send_queue))) + return (KORE_RESULT_OK); + + nb = TAILQ_FIRST(&(c->send_queue)); + r = SSL_write(c->ssl, (nb->buf + nb->offset), (nb->len - nb->offset)); + kore_log("SSL_write(): %d bytes", r); + if (r <= 0) { + r = SSL_get_error(c->ssl, r); + switch (r) { + case SSL_ERROR_WANT_READ: + kore_log("ssl_want_read on net_send()"); + return (KORE_RESULT_OK); + case SSL_ERROR_WANT_WRITE: + kore_log("ssl_want_write on net_send()"); + return (KORE_RESULT_OK); + default: + kore_log("SSL_write(): %s", ssl_errno_s); + return (KORE_RESULT_ERROR); + } + } + + nb->offset += (size_t)r; + if (nb->offset == nb->len) { + TAILQ_REMOVE(&(c->send_queue), nb, list); + + if (nb->cb != NULL) + r = nb->cb(nb); + else + r = KORE_RESULT_OK; + + free(nb->buf); + free(nb); + } else { + r = KORE_RESULT_OK; + } + + return (r); +} + +int +net_recv(struct connection *c) +{ + int r; + struct netbuf *nb; + + kore_log("net_recv(%p)", c); + + if (TAILQ_EMPTY(&(c->recv_queue))) + return (KORE_RESULT_ERROR); + + nb = TAILQ_FIRST(&(c->recv_queue)); + kore_log("nb is %p (%d/%d bytes)", nb, nb->offset, nb->len); + r = SSL_read(c->ssl, (nb->buf + nb->offset), (nb->len - nb->offset)); + kore_log("SSL_read(): %d bytes", r); + if (r <= 0) { + r = SSL_get_error(c->ssl, r); + switch (r) { + case SSL_ERROR_WANT_READ: + kore_log("ssl_want_read on net_recv()"); + return (KORE_RESULT_OK); + case SSL_ERROR_WANT_WRITE: + kore_log("ssl_want_write on net_recv()"); + return (KORE_RESULT_OK); + default: + kore_log("SSL_read(): %s", ssl_errno_s); + return (KORE_RESULT_ERROR); + } + } + + nb->offset += (size_t)r; + kore_log("read %d out of %d bytes", nb->offset, nb->len); + if (nb->offset == nb->len) { + if (nb->cb == NULL) { + kore_log("kore_read_client(): nb->cb == NULL"); + return (KORE_RESULT_ERROR); + } + + TAILQ_REMOVE(&(c->recv_queue), nb, list); + r = nb->cb(nb); + free(nb->buf); + free(nb); + } else { + r = KORE_RESULT_OK; + } + + return (r); +} diff --git a/src/spdy.c b/src/spdy.c @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2013 Joris Vink <joris@coders.se> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/param.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/queue.h> + +#include <netinet/in.h> +#include <arpa/inet.h> + +#include <openssl/err.h> +#include <openssl/ssl.h> + +#include <errno.h> +#include <fcntl.h> +#include <stdarg.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "spdy.h" +#include "kore.h" +static int spdy_ctrl_frame_syn_stream(struct netbuf *); +static int spdy_ctrl_frame_settings(struct netbuf *); + +int +spdy_frame_recv(struct netbuf *nb) +{ + struct spdy_ctrl_frame *ctrl; + int (*cb)(struct netbuf *); + struct connection *c = (struct connection *)nb->owner; + struct spdy_frame *frame = (struct spdy_frame *)nb->buf; + + frame->frame_1 = ntohl(frame->frame_1); + frame->frame_2 = ntohl(frame->frame_2); + + c->spdy_cur_frame = *frame; + if (SPDY_CONTROL_FRAME(frame)) { + kore_log("received control frame"); + + ctrl = (struct spdy_ctrl_frame *)frame; + kore_log("type is %d", ctrl->type); + kore_log("version is %d", ctrl->version); + kore_log("length is %d", ctrl->length); + + switch (ctrl->type) { + case SPDY_CTRL_FRAME_SYN_STREAM: + cb = spdy_ctrl_frame_syn_stream; + break; + case SPDY_CTRL_FRAME_SETTINGS: + cb = spdy_ctrl_frame_settings; + break; + default: + cb = NULL; + break; + } + + if (cb != NULL) { + net_recv_queue(c, ctrl->length, cb); + } else { + kore_log("no callback for type %d", ctrl->type); + } + } else { + kore_log("received data frame"); + } + + return (KORE_RESULT_OK); +} + +static int +spdy_ctrl_frame_syn_stream(struct netbuf *nb) +{ + kore_log("-- SPDY_SYN_STREAM"); + return (KORE_RESULT_OK); +} + +static int +spdy_ctrl_frame_settings(struct netbuf *nb) +{ + struct connection *c = (struct connection *)nb->owner; + + kore_log("-- SPDY_SETTINGS"); + net_recv_queue(c, SPDY_FRAME_SIZE, spdy_frame_recv); + + return (KORE_RESULT_OK); +} diff --git a/src/utils.c b/src/utils.c @@ -32,6 +32,7 @@ #include <stdlib.h> #include <string.h> +#include "spdy.h" #include "kore.h" void *