kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit c463ecb3cbf29e5d4dd7e0fad4a4a16d4da9d208
parent 2449a86085a2020bfba55fa0e858668d2f0318f3
Author: Joris Vink <joris@coders.se>
Date:   Tue,  9 Oct 2018 19:34:40 +0200

Changes to the event loop inside of Kore.

Now anyone can schedule events and get a callback to work as long
as the user data structure that is added for the event begins
with a kore_event data structure.

All event state is now kept in that kore_event structure and renamed
CONN_[READ|WRITE]_POSSIBLE to KORE_EVENT_[READ|WRITE].

Diffstat:
examples/nohttp/src/nohttp.c | 7++++---
examples/tls-proxy/src/proxy.c | 8++++----
include/kore/kore.h | 29+++++++++++++++++++----------
include/kore/pgsql.h | 2+-
include/kore/tasks.h | 4++--
src/bsd.c | 86+++++++++++--------------------------------------------------------------------
src/connection.c | 22+++++++++++++++++++---
src/domain.c | 2+-
src/kore.c | 35++++++++++++++++++++++++++++++++++-
src/linux.c | 85++++++++++++-------------------------------------------------------------------
src/msg.c | 2+-
src/net.c | 22+++++++++++-----------
src/pgsql.c | 9+++++++--
src/tasks.c | 8++++++--
src/websocket.c | 4++--
15 files changed, 135 insertions(+), 190 deletions(-)

diff --git a/examples/nohttp/src/nohttp.c b/examples/nohttp/src/nohttp.c @@ -56,9 +56,10 @@ connection_setup(struct connection *c) * * In this callback you would generally look at the state of the connection * in c->state and perform the required actions like writing / reading using - * net_send_flush() or net_recv_flush() if CONN_SEND_POSSIBLE or - * CONN_READ_POSSIBLE are set respectively. Returning KORE_RESULT_ERROR from - * this callback will disconnect the connection alltogether. + * net_send_flush() or net_recv_flush() if KORE_EVENT_WRITE or + * KORE_EVENT_READ are set respectively in c->evt.flags. + * Returning KORE_RESULT_ERROR from this callback will disconnect the + * connection alltogether. */ int connection_handle(struct connection *c) diff --git a/examples/tls-proxy/src/proxy.c b/examples/tls-proxy/src/proxy.c @@ -100,7 +100,7 @@ client_setup(struct connection *c) backend = kore_connection_new(NULL); /* Prepare our connection. */ - backend->addrtype = AF_INET; + backend->family = AF_INET; backend->addr.ipv4.sin_family = AF_INET; backend->addr.ipv4.sin_port = htons(backends[i].port); backend->addr.ipv4.sin_addr.s_addr = inet_addr(backends[i].ip); @@ -150,7 +150,7 @@ client_setup(struct connection *c) TAILQ_INSERT_TAIL(&connections, backend, list); /* Kick off connecting. */ - backend->flags |= CONN_WRITE_POSSIBLE; + backend->evt.flags |= KORE_EVENT_WRITE; backend->handle(backend); } @@ -170,7 +170,7 @@ backend_handle_connect(struct connection *c) struct connection *src; /* We will get a write notification when we can progress. */ - if (!(c->flags & CONN_WRITE_POSSIBLE)) + if (!(c->evt.flags & KORE_EVENT_WRITE)) return (KORE_RESULT_OK); kore_connection_stop_idletimer(c); @@ -190,7 +190,7 @@ backend_handle_connect(struct connection *c) /* Clean the write flag, we'll be called later. */ if (errno != EISCONN) { - c->flags &= ~CONN_WRITE_POSSIBLE; + c->evt.flags &= ~KORE_EVENT_WRITE; kore_connection_start_idletimer(c); return (KORE_RESULT_OK); } diff --git a/include/kore/kore.h b/include/kore/kore.h @@ -171,13 +171,13 @@ TAILQ_HEAD(netbuf_head, netbuf); #define CONN_PROTO_WEBSOCKET 2 #define CONN_PROTO_MSG 3 -#define CONN_READ_POSSIBLE 0x01 -#define CONN_WRITE_POSSIBLE 0x02 -#define CONN_WRITE_BLOCK 0x04 -#define CONN_IDLE_TIMER_ACT 0x10 -#define CONN_READ_BLOCK 0x20 -#define CONN_CLOSE_EMPTY 0x40 -#define CONN_WS_CLOSE_SENT 0x80 +#define KORE_EVENT_READ 0x01 +#define KORE_EVENT_WRITE 0x02 +#define KORE_EVENT_ERROR 0x04 + +#define CONN_IDLE_TIMER_ACT 0x01 +#define CONN_CLOSE_EMPTY 0x02 +#define CONN_WS_CLOSE_SENT 0x04 #define KORE_IDLE_TIMER_MAX 5000 @@ -196,8 +196,14 @@ TAILQ_HEAD(netbuf_head, netbuf); #define KORE_CONNECTION_PRUNE_DISCONNECT 0 #define KORE_CONNECTION_PRUNE_ALL 1 +struct kore_event { + int type; + int flags; + void (*handle)(void *, int); +} __attribute__((packed)); + struct connection { - u_int8_t type; + struct kore_event evt; int fd; u_int8_t state; u_int8_t proto; @@ -272,7 +278,7 @@ struct kore_runtime_call { extern struct kore_runtime kore_native_runtime; struct listener { - u_int8_t type; + struct kore_event evt; int fd; int family; struct kore_runtime_call *connect; @@ -588,10 +594,12 @@ void kore_timer_remove(struct kore_timer *); struct kore_timer *kore_timer_add(void (*cb)(void *, u_int64_t), u_int64_t, void *, int); -int kore_sockopt(int, int, int); void kore_listener_cleanup(void); +void kore_listener_accept(void *, int); void kore_listener_free(struct listener *); struct listener *kore_listener_alloc(int, const char *); + +int kore_sockopt(int, int, int); int kore_server_bind_unix(const char *, const char *); int kore_server_bind(const char *, const char *, const char *); #if !defined(KORE_NO_TLS) @@ -603,6 +611,7 @@ void kore_connection_init(void); void kore_connection_cleanup(void); void kore_connection_prune(int); struct connection *kore_connection_new(void *); +void kore_connection_event(void *, int); int kore_connection_nonblock(int, int); void kore_connection_check_timeout(u_int64_t); int kore_connection_handle(struct connection *); diff --git a/include/kore/pgsql.h b/include/kore/pgsql.h @@ -31,7 +31,7 @@ extern "C" { #endif struct pgsql_conn { - u_int8_t type; + struct kore_event evt; u_int8_t flags; char *name; diff --git a/include/kore/tasks.h b/include/kore/tasks.h @@ -35,7 +35,7 @@ struct http_request; #endif struct kore_task { - u_int8_t type; + struct kore_event evt; int state; int result; pthread_rwlock_t lock; @@ -65,11 +65,11 @@ struct kore_task_thread { }; void kore_task_init(void); +void kore_task_handle(void *, int); void kore_task_run(struct kore_task *); void kore_task_finish(struct kore_task *); void kore_task_destroy(struct kore_task *); int kore_task_finished(struct kore_task *); -void kore_task_handle(struct kore_task *, int); #if !defined(KORE_NO_HTTP) void kore_task_bind_request(struct kore_task *, diff --git a/src/bsd.c b/src/bsd.c @@ -122,9 +122,7 @@ int kore_platform_event_wait(u_int64_t timer) { u_int32_t r; - struct listener *l; - struct connection *c; - u_int8_t type; + struct kore_event *evt; struct timespec timeo; int n, i; @@ -140,83 +138,23 @@ kore_platform_event_wait(u_int64_t timer) if (n > 0) kore_debug("main(): %d sockets available", n); - r = 0; for (i = 0; i < n; i++) { if (events[i].udata == NULL) fatal("events[%d].udata == NULL", i); - type = *(u_int8_t *)events[i].udata; + r = 0; + evt = (struct kore_event *)events[i].udata; - if (events[i].flags & EV_EOF || - events[i].flags & EV_ERROR) { - switch (type) { - case KORE_TYPE_LISTENER: - fatal("error on server socket"); - /* NOTREACHED */ -#if defined(KORE_USE_PGSQL) - case KORE_TYPE_PGSQL_CONN: - kore_pgsql_handle(events[i].udata, 1); - break; -#endif -#if defined(KORE_USE_TASKS) - case KORE_TYPE_TASK: - kore_task_handle(events[i].udata, 1); - break; -#endif - default: - c = (struct connection *)events[i].udata; - kore_connection_disconnect(c); - break; - } + if (events[i].filter == EVFILT_READ) + evt->flags |= KORE_EVENT_READ; - continue; - } + if (events[i].filter == EVFILT_WRITE) + evt->flags |= KORE_EVENT_WRITE; - switch (type) { - case KORE_TYPE_LISTENER: - l = (struct listener *)events[i].udata; - - while (worker_active_connections < - worker_max_connections) { - if (worker_accept_threshold != 0 && - r >= worker_accept_threshold) - break; - - if (!kore_connection_accept(l, &c)) - break; - - if (c == NULL) - break; - - r++; - kore_platform_event_all(c->fd, c); - } - break; - case KORE_TYPE_CONNECTION: - c = (struct connection *)events[i].udata; - if (events[i].filter == EVFILT_READ && - !(c->flags & CONN_READ_BLOCK)) - c->flags |= CONN_READ_POSSIBLE; - if (events[i].filter == EVFILT_WRITE && - !(c->flags & CONN_WRITE_BLOCK)) - c->flags |= CONN_WRITE_POSSIBLE; - - if (c->handle != NULL && !c->handle(c)) - kore_connection_disconnect(c); - break; -#if defined(KORE_USE_PGSQL) - case KORE_TYPE_PGSQL_CONN: - kore_pgsql_handle(events[i].udata, 0); - break; -#endif -#if defined(KORE_USE_TASKS) - case KORE_TYPE_TASK: - kore_task_handle(events[i].udata, 0); - break; -#endif - default: - fatal("wrong type in event %d", type); - } + if (events[i].flags & EV_EOF || events[i].flags & EV_ERROR) + r = 1; + + evt->handle(events[i].udata, r); } return (r); @@ -302,7 +240,7 @@ kore_platform_sendfile(struct connection *c, struct netbuf *nb) if (ret == -1) { if (errno == EAGAIN) { nb->fd_off += len; - c->flags &= ~CONN_WRITE_POSSIBLE; + c->evt.flags &= ~KORE_EVENT_WRITE; return (KORE_RESULT_OK); } diff --git a/src/connection.c b/src/connection.c @@ -74,10 +74,12 @@ kore_connection_new(void *owner) c->disconnect = NULL; c->hdlr_extra = NULL; c->proto = CONN_PROTO_UNKNOWN; - c->type = KORE_TYPE_CONNECTION; c->idle_timer.start = 0; c->idle_timer.length = KORE_IDLE_TIMER_MAX; + c->evt.type = KORE_TYPE_CONNECTION; + c->evt.handle = kore_connection_event; + #if !defined(KORE_NO_HTTP) c->ws_connect = NULL; c->ws_message = NULL; @@ -218,6 +220,20 @@ kore_connection_disconnect(struct connection *c) } } +void +kore_connection_event(void *arg, int error) +{ + struct connection *c = arg; + + if (error) { + kore_connection_disconnect(c); + return; + } + + if (!c->handle(c)) + kore_connection_disconnect(c); +} + int kore_connection_handle(struct connection *c) { @@ -313,12 +329,12 @@ kore_connection_handle(struct connection *c) /* FALLTHROUGH */ #endif /* !KORE_NO_TLS */ case CONN_STATE_ESTABLISHED: - if (c->flags & CONN_READ_POSSIBLE) { + if (c->evt.flags & KORE_EVENT_READ) { if (!net_recv_flush(c)) return (KORE_RESULT_ERROR); } - if (c->flags & CONN_WRITE_POSSIBLE) { + if (c->evt.flags & KORE_EVENT_WRITE) { if (!net_send_flush(c)) return (KORE_RESULT_ERROR); } diff --git a/src/domain.c b/src/domain.c @@ -698,7 +698,7 @@ keymgr_await_data(void) if (!(pfd[0].revents & POLLIN)) break; - worker->msg[1]->flags |= CONN_READ_POSSIBLE; + worker->msg[1]->evt.flags |= KORE_EVENT_READ; if (!net_recv_flush(worker->msg[1])) break; diff --git a/src/kore.c b/src/kore.c @@ -388,7 +388,9 @@ kore_listener_alloc(int family, const char *ccb) l->fd = -1; l->family = family; - l->type = KORE_TYPE_LISTENER; + + l->evt.type = KORE_TYPE_LISTENER; + l->evt.handle = kore_listener_accept; if ((l->fd = socket(family, SOCK_STREAM, 0)) == -1) { kore_listener_free(l); @@ -431,6 +433,37 @@ kore_listener_free(struct listener *l) kore_free(l); } +void +kore_listener_accept(void *arg, int error) +{ + struct connection *c; + struct listener *l = arg; + u_int32_t accepted; + + if (error) + fatal("error on listening socket"); + + if (!(l->evt.flags & KORE_EVENT_READ)) + return; + + accepted = 0; + + while (worker_active_connections < worker_max_connections) { + if (worker_accept_threshold != 0 && + accepted >= worker_accept_threshold) + break; + + if (!kore_connection_accept(l, &c)) + break; + + if (c == NULL) + break; + + accepted++; + kore_platform_event_all(c->fd, c); + } +} + int kore_sockopt(int fd, int what, int opt) { diff --git a/src/linux.c b/src/linux.c @@ -96,9 +96,7 @@ int kore_platform_event_wait(u_int64_t timer) { u_int32_t r; - struct connection *c; - struct listener *l; - u_int8_t type; + struct kore_event *evt; int n, i; n = epoll_wait(efd, events, event_count, timer); @@ -117,78 +115,19 @@ kore_platform_event_wait(u_int64_t timer) if (events[i].data.ptr == NULL) fatal("events[%d].data.ptr == NULL", i); - type = *(u_int8_t *)events[i].data.ptr; + r = 0; + evt = (struct kore_event *)events[i].data.ptr; - if (events[i].events & EPOLLERR || - events[i].events & EPOLLHUP) { - switch (type) { - case KORE_TYPE_LISTENER: - fatal("failed on listener socket"); - /* NOTREACHED */ -#if defined(KORE_USE_PGSQL) - case KORE_TYPE_PGSQL_CONN: - kore_pgsql_handle(events[i].data.ptr, 1); - break; -#endif -#if defined(KORE_USE_TASKS) - case KORE_TYPE_TASK: - kore_task_handle(events[i].data.ptr, 1); - break; -#endif - default: - c = (struct connection *)events[i].data.ptr; - kore_connection_disconnect(c); - break; - } + if (events[i].events & EPOLLIN) + evt->flags |= KORE_EVENT_READ; - continue; - } + if (events[i].events & EPOLLOUT) + evt->flags |= KORE_EVENT_WRITE; - switch (type) { - case KORE_TYPE_LISTENER: - l = (struct listener *)events[i].data.ptr; - - while (worker_active_connections < - worker_max_connections) { - if (worker_accept_threshold != 0 && - r >= worker_accept_threshold) - break; - - if (!kore_connection_accept(l, &c)) - break; - - if (c == NULL) - break; - - r++; - kore_platform_event_all(c->fd, c); - } - break; - case KORE_TYPE_CONNECTION: - c = (struct connection *)events[i].data.ptr; - if (events[i].events & EPOLLIN && - !(c->flags & CONN_READ_BLOCK)) - c->flags |= CONN_READ_POSSIBLE; - if (events[i].events & EPOLLOUT && - !(c->flags & CONN_WRITE_BLOCK)) - c->flags |= CONN_WRITE_POSSIBLE; - - if (c->handle != NULL && !c->handle(c)) - kore_connection_disconnect(c); - break; -#if defined(KORE_USE_PGSQL) - case KORE_TYPE_PGSQL_CONN: - kore_pgsql_handle(events[i].data.ptr, 0); - break; -#endif -#if defined(KORE_USE_TASKS) - case KORE_TYPE_TASK: - kore_task_handle(events[i].data.ptr, 0); - break; -#endif - default: - fatal("wrong type in event %d", type); - } + if (events[i].events & EPOLLERR|| events[i].events & EPOLLHUP) + r = 1; + + evt->handle(events[i].data.ptr, r); } return (r); @@ -288,7 +227,7 @@ resend: sent = sendfile(c->fd, nb->file_ref->fd, &nb->fd_off, len); if (sent == -1) { if (errno == EAGAIN) { - c->flags &= ~CONN_WRITE_POSSIBLE; + c->evt.flags &= ~KORE_EVENT_WRITE; return (KORE_RESULT_OK); } diff --git a/src/msg.c b/src/msg.c @@ -107,7 +107,7 @@ kore_msg_worker_init(void) worker->msg[1]->state = CONN_STATE_ESTABLISHED; worker->msg[1]->disconnect = msg_disconnected_parent; worker->msg[1]->handle = kore_connection_handle; - worker->msg[1]->flags = CONN_WRITE_POSSIBLE; + worker->msg[1]->evt.flags = KORE_EVENT_WRITE; TAILQ_INSERT_TAIL(&connections, worker->msg[1], list); kore_platform_event_all(worker->msg[1]->fd, worker->msg[1]); diff --git a/src/net.c b/src/net.c @@ -234,7 +234,7 @@ net_send(struct connection *c) if (!c->write(c, len, &r)) return (KORE_RESULT_ERROR); - if (!(c->flags & CONN_WRITE_POSSIBLE)) + if (!(c->evt.flags & KORE_EVENT_WRITE)) return (KORE_RESULT_OK); c->snb->s_off += r; @@ -256,7 +256,7 @@ net_send_flush(struct connection *c) kore_debug("net_send_flush(%p)", c); while (!TAILQ_EMPTY(&(c->send_queue)) && - (c->flags & CONN_WRITE_POSSIBLE)) { + (c->evt.flags & KORE_EVENT_WRITE)) { if (!net_send(c)) return (KORE_RESULT_ERROR); } @@ -278,13 +278,13 @@ net_recv_flush(struct connection *c) if (c->rnb == NULL) return (KORE_RESULT_OK); - while (c->flags & CONN_READ_POSSIBLE) { + while (c->evt.flags & KORE_EVENT_READ) { if (c->rnb->buf == NULL) return (KORE_RESULT_OK); if (!c->read(c, &r)) return (KORE_RESULT_ERROR); - if (!(c->flags & CONN_READ_POSSIBLE)) + if (!(c->evt.flags & KORE_EVENT_READ)) break; c->rnb->s_off += r; @@ -345,8 +345,8 @@ net_write_tls(struct connection *c, size_t len, size_t *written) switch (r) { case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: + c->evt.flags &= ~KORE_EVENT_WRITE; c->snb->flags |= NETBUF_MUST_RESEND; - c->flags &= ~CONN_WRITE_POSSIBLE; return (KORE_RESULT_OK); case SSL_ERROR_SYSCALL: switch (errno) { @@ -354,8 +354,8 @@ net_write_tls(struct connection *c, size_t len, size_t *written) *written = 0; return (KORE_RESULT_OK); case EAGAIN: + c->evt.flags &= ~KORE_EVENT_WRITE; c->snb->flags |= NETBUF_MUST_RESEND; - c->flags &= ~CONN_WRITE_POSSIBLE; return (KORE_RESULT_OK); default: break; @@ -389,7 +389,7 @@ net_read_tls(struct connection *c, size_t *bytes) switch (r) { case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: - c->flags &= ~CONN_READ_POSSIBLE; + c->evt.flags &= ~KORE_EVENT_READ; return (KORE_RESULT_OK); case SSL_ERROR_SYSCALL: switch (errno) { @@ -397,8 +397,8 @@ net_read_tls(struct connection *c, size_t *bytes) *bytes = 0; return (KORE_RESULT_OK); case EAGAIN: + c->evt.flags &= ~KORE_EVENT_READ; c->snb->flags |= NETBUF_MUST_RESEND; - c->flags &= ~CONN_WRITE_POSSIBLE; return (KORE_RESULT_OK); default: break; @@ -428,7 +428,7 @@ net_write(struct connection *c, size_t len, size_t *written) *written = 0; return (KORE_RESULT_OK); case EAGAIN: - c->flags &= ~CONN_WRITE_POSSIBLE; + c->evt.flags &= ~KORE_EVENT_WRITE; return (KORE_RESULT_OK); default: kore_debug("write: %s", errno_s); @@ -454,7 +454,7 @@ net_read(struct connection *c, size_t *bytes) *bytes = 0; return (KORE_RESULT_OK); case EAGAIN: - c->flags &= ~CONN_READ_POSSIBLE; + c->evt.flags &= ~KORE_EVENT_READ; return (KORE_RESULT_OK); default: kore_debug("read(): %s", errno_s); @@ -464,7 +464,7 @@ net_read(struct connection *c, size_t *bytes) if (r == 0) { kore_connection_disconnect(c); - c->flags &= ~CONN_READ_POSSIBLE; + c->evt.flags &= ~KORE_EVENT_READ; return (KORE_RESULT_OK); } diff --git a/src/pgsql.c b/src/pgsql.c @@ -308,6 +308,9 @@ kore_pgsql_handle(void *c, int err) return; } + if (!(conn->evt.flags & KORE_EVENT_READ)) + fatal("%s: read event not set", __func__); + pgsql = conn->job->pgsql; if (!PQconsumeInput(conn->db)) { @@ -597,13 +600,15 @@ pgsql_conn_create(struct kore_pgsql *pgsql, struct pgsql_db *db) db->conn_count++; - conn = kore_malloc(sizeof(*conn)); + conn = kore_calloc(1, sizeof(*conn)); conn->job = NULL; conn->flags = PGSQL_CONN_FREE; - conn->type = KORE_TYPE_PGSQL_CONN; conn->name = kore_strdup(db->name); TAILQ_INSERT_TAIL(&pgsql_conn_free, conn, list); + conn->evt.type = KORE_TYPE_PGSQL_CONN; + conn->evt.handle = kore_pgsql_handle; + conn->db = PQconnectdb(db->conn_string); if (conn->db == NULL || (PQstatus(conn->db) != CONNECTION_OK)) { pgsql_set_error(pgsql, PQerrorMessage(conn->db)); diff --git a/src/tasks.c b/src/tasks.c @@ -59,8 +59,10 @@ kore_task_create(struct kore_task *t, int (*entry)(struct kore_task *)) #if !defined(KORE_NO_HTTP) t->req = NULL; #endif + t->evt.type = KORE_TYPE_TASK; + t->evt.handle = kore_task_handle; + t->entry = entry; - t->type = KORE_TYPE_TASK; t->state = KORE_TASK_STATE_CREATED; pthread_rwlock_init(&(t->lock), NULL); @@ -201,8 +203,10 @@ kore_task_channel_read(struct kore_task *t, void *out, u_int32_t len) } void -kore_task_handle(struct kore_task *t, int finished) +kore_task_handle(void *arg, int finished) { + struct kore_task *t = arg; + kore_debug("kore_task_handle: %p, %d", t, finished); #if !defined(KORE_NO_HTTP) diff --git a/src/websocket.c b/src/websocket.c @@ -344,7 +344,7 @@ websocket_recv_frame(struct netbuf *nb) } break; case WEBSOCKET_OP_CLOSE: - c->flags &= ~CONN_READ_POSSIBLE; + c->evt.flags &= ~KORE_EVENT_READ; if (!(c->flags & CONN_WS_CLOSE_SENT)) { c->flags |= CONN_WS_CLOSE_SENT; kore_websocket_send(c, WEBSOCKET_OP_CLOSE, NULL, 0); @@ -372,8 +372,8 @@ websocket_disconnect(struct connection *c) kore_runtime_wsdisconnect(c->ws_disconnect, c); if (!(c->flags & CONN_WS_CLOSE_SENT)) { - c->flags &= ~CONN_READ_POSSIBLE; c->flags |= CONN_WS_CLOSE_SENT; + c->evt.flags &= ~KORE_EVENT_READ; kore_websocket_send(c, WEBSOCKET_OP_CLOSE, NULL, 0); } }