kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 29fa49ba83143acf448fd36d905c3f2a704d13aa
parent f54e7ace83e7b3a51012bd74285245f0a624b65d
Author: Joris Vink <joris@coders.se>
Date:   Mon, 15 Jul 2013 10:13:36 +0200

Add fixed size memory pools and use them throughout Kore.

Diffstat:
Makefile | 4++--
includes/kore.h | 35+++++++++++++++++++++++++++++++++++
src/connection.c | 21+++++++++++++++------
src/http.c | 21+++++++++++++++------
src/kore.c | 6+++++-
src/linux.c | 4+++-
src/mem.c | 2+-
src/net.c | 22++++++++++++----------
src/pool.c | 110+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/spdy.c | 6++----
src/worker.c | 8++++++--
11 files changed, 206 insertions(+), 33 deletions(-)

diff --git a/Makefile b/Makefile @@ -4,8 +4,8 @@ CC=gcc BIN=kore S_SRC+= src/kore.c src/accesslog.c src/buf.c src/config.c src/connection.c \ - src/domain.c src/http.c src/mem.c src/module.c src/net.c src/spdy.c \ - src/utils.c src/worker.c src/zlib_dict.c + src/domain.c src/http.c src/mem.c src/module.c src/net.c src/pool.c \ + src/spdy.c src/utils.c src/worker.c src/zlib_dict.c S_OBJS= $(S_SRC:.c=.o) CFLAGS+=-Wall -Wstrict-prototypes -Wmissing-prototypes diff --git a/includes/kore.h b/includes/kore.h @@ -50,9 +50,13 @@ #define KORE_PIDFILE_DEFAULT "/var/run/kore.pid" #define KORE_DEFAULT_CIPHER_LIST "HIGH:!aNULL:!MD5;" +#if defined(KORE_DEBUG) #define kore_debug(fmt, ...) \ if (kore_debug) \ kore_debug_internal(__FILE__, __LINE__, fmt, ##__VA_ARGS__) +#else +#define kore_debug(fmt, ...) +#endif #define NETBUF_RECV 0 #define NETBUF_SEND 1 @@ -176,6 +180,29 @@ struct buf_vec { u_int32_t length; }; +struct kore_pool_region { + void *start; + + LIST_ENTRY(kore_pool_region) list; +} __attribute__((__packed__)); + +struct kore_pool_entry { + u_int8_t state; + struct kore_pool_region *region; + LIST_ENTRY(kore_pool_entry) list; +} __attribute__((__packed__)); + +struct kore_pool { + u_int32_t elen; + u_int32_t slen; + u_int32_t elms; + u_int32_t inuse; + char *name; + + LIST_HEAD(, kore_pool_region) regions; + LIST_HEAD(, kore_pool_entry) freelist; +} __attribute__((__packed__)); + extern pid_t kore_pid; extern int kore_debug; extern int server_port; @@ -200,6 +227,7 @@ extern struct kore_worker *worker; extern struct kore_domain_h domains; extern struct kore_domain *primary_dom; extern struct passwd *pw; +extern struct kore_pool nb_pool; void kore_signal(int); void kore_worker_wait(int); @@ -227,6 +255,7 @@ void kore_worker_entry(struct kore_worker *); int kore_ssl_sni_cb(SSL *, int *, void *); int kore_ssl_npn_cb(SSL *, const u_char **, unsigned int *, void *); +void kore_connection_init(void); int kore_connection_nonblock(int); int kore_connection_handle(struct connection *); void kore_connection_remove(struct connection *); @@ -246,6 +275,11 @@ void *kore_realloc(void *, size_t); void kore_mem_free(void *); void kore_mem_init(void); +void *kore_pool_get(struct kore_pool *); +void kore_pool_put(struct kore_pool *, void *); +void kore_pool_init(struct kore_pool *, char *, + u_int32_t, u_int32_t); + time_t kore_date_to_time(char *); char *kore_time_to_date(time_t); char *kore_strdup(const char *); @@ -275,6 +309,7 @@ u_int16_t net_read16(u_int8_t *); u_int32_t net_read32(u_int8_t *); void net_write16(u_int8_t *, u_int16_t); void net_write32(u_int8_t *, u_int32_t); +void net_init(void); int net_recv(struct connection *); int net_send(struct connection *); int net_send_flush(struct connection *); diff --git a/src/connection.c b/src/connection.c @@ -22,6 +22,15 @@ #include "kore.h" #include "http.h" +struct kore_pool connection_pool; + +void +kore_connection_init(void) +{ + kore_pool_init(&connection_pool, "connection_pool", + sizeof(struct connection), worker_max_connections); +} + int kore_connection_accept(struct listener *l, struct connection **out) { @@ -33,16 +42,16 @@ kore_connection_accept(struct listener *l, struct connection **out) *out = NULL; len = sizeof(struct sockaddr_in); - c = kore_malloc(sizeof(*c)); + c = kore_pool_get(&connection_pool); if ((c->fd = accept(l->fd, (struct sockaddr *)&(c->sin), &len)) == -1) { - kore_mem_free(c); + kore_pool_put(&connection_pool, c); kore_debug("accept(): %s", errno_s); return (KORE_RESULT_ERROR); } if (!kore_connection_nonblock(c->fd)) { close(c->fd); - kore_mem_free(c); + kore_pool_put(&connection_pool, c); return (KORE_RESULT_ERROR); } @@ -191,14 +200,14 @@ kore_connection_remove(struct connection *c) TAILQ_REMOVE(&(c->send_queue), nb, list); if (nb->buf != NULL) kore_mem_free(nb->buf); - kore_mem_free(nb); + kore_pool_put(&nb_pool, nb); } for (nb = TAILQ_FIRST(&(c->recv_queue)); nb != NULL; nb = next) { next = TAILQ_NEXT(nb, list); TAILQ_REMOVE(&(c->recv_queue), nb, list); kore_mem_free(nb->buf); - kore_mem_free(nb); + kore_pool_put(&nb_pool, nb); } for (s = TAILQ_FIRST(&(c->spdy_streams)); s != NULL; s = snext) { @@ -215,7 +224,7 @@ kore_connection_remove(struct connection *c) } kore_worker_connection_remove(c); - kore_mem_free(c); + kore_pool_put(&connection_pool, c); } void diff --git a/src/http.c b/src/http.c @@ -24,6 +24,8 @@ static int http_post_data_recv(struct netbuf *); static int http_send_done(struct netbuf *); static TAILQ_HEAD(, http_request) http_requests; +static struct kore_pool http_request_pool; +static struct kore_pool http_header_pool; int http_request_count; @@ -32,6 +34,12 @@ http_init(void) { http_request_count = 0; TAILQ_INIT(&http_requests); + + kore_pool_init(&http_request_pool, "http_request_pool", + sizeof(struct http_request), worker_max_connections); + kore_pool_init(&http_header_pool, "http_header_pool", + sizeof(struct http_header), + worker_max_connections * HTTP_REQ_HEADER_MAX); } int @@ -48,7 +56,7 @@ http_request_new(struct connection *c, struct spdy_stream *s, char *host, if (strlen(path) >= HTTP_URI_LEN - 1) return (KORE_RESULT_ERROR); - req = kore_malloc(sizeof(*req)); + req = kore_pool_get(&http_request_pool); req->end = 0; req->start = 0; req->flags = 0; @@ -152,7 +160,7 @@ http_response_header_add(struct http_request *req, char *header, char *value) kore_debug("http_response_header_add(%p, %s, %s)", req, header, value); - hdr = kore_malloc(sizeof(*hdr)); + hdr = kore_pool_get(&http_header_pool); hdr->header = kore_strdup(header); hdr->value = kore_strdup(value); TAILQ_INSERT_TAIL(&(req->resp_headers), hdr, list); @@ -170,7 +178,7 @@ http_request_free(struct http_request *req) TAILQ_REMOVE(&(req->resp_headers), hdr, list); kore_mem_free(hdr->header); kore_mem_free(hdr->value); - kore_mem_free(hdr); + kore_pool_put(&http_header_pool, hdr); } for (hdr = TAILQ_FIRST(&(req->req_headers)); hdr != NULL; hdr = next) { @@ -179,7 +187,7 @@ http_request_free(struct http_request *req) TAILQ_REMOVE(&(req->req_headers), hdr, list); kore_mem_free(hdr->header); kore_mem_free(hdr->value); - kore_mem_free(hdr); + kore_pool_put(&http_header_pool, hdr); } for (q = TAILQ_FIRST(&(req->arguments)); q != NULL; q = qnext) { @@ -194,7 +202,8 @@ http_request_free(struct http_request *req) if (req->agent != NULL) kore_mem_free(req->agent); - kore_mem_free(req); + + kore_pool_put(&http_request_pool, req); } int @@ -387,7 +396,7 @@ http_header_recv(struct netbuf *nb) *(p++) = '\0'; if (*p == ' ') p++; - hdr = kore_malloc(sizeof(*hdr)); + hdr = kore_pool_get(&http_header_pool); hdr->header = kore_strdup(headers[i]); hdr->value = kore_strdup(p); TAILQ_INSERT_TAIL(&(req->req_headers), hdr, list); diff --git a/src/kore.c b/src/kore.c @@ -63,7 +63,11 @@ main(int argc, char *argv[]) config_file = optarg; break; case 'd': +#if defined(KORE_DEBUG) kore_debug = 1; +#else + printf("kore not compiled with debug support\n"); +#endif break; default: usage(); @@ -75,12 +79,12 @@ main(int argc, char *argv[]) kore_pid = getpid(); + kore_log_init(); kore_mem_init(); kore_domain_init(); kore_server_sslstart(); kore_parse_config(); - kore_log_init(); kore_platform_init(); kore_accesslog_init(); diff --git a/src/linux.c b/src/linux.c @@ -93,7 +93,9 @@ kore_platform_event_wait(void) } if (*fd == server.fd) { - while (worker->accepted < worker->accept_treshold) { + while ((worker->accepted < worker->accept_treshold) && + (worker_active_connections < + worker_max_connections)) { kore_connection_accept(&server, &c); if (c == NULL) break; diff --git a/src/mem.c b/src/mem.c @@ -27,7 +27,7 @@ struct meminfo { u_int16_t magic; TAILQ_ENTRY(meminfo) list; -}; +} __attribute__((__packed__)); u_int32_t meminuse; TAILQ_HEAD(, meminfo) memused; diff --git a/src/net.c b/src/net.c @@ -16,15 +16,21 @@ #include "kore.h" +struct kore_pool nb_pool; + +void +net_init(void) +{ + kore_pool_init(&nb_pool, "nb_pool", sizeof(struct netbuf), 1000); +} + void net_send_queue(struct connection *c, u_int8_t *data, size_t len, int flags, struct netbuf **out, int (*cb)(struct netbuf *)) { struct netbuf *nb; - //kore_debug("net_send_queue(%p, %p, %d, %p)", c, data, len, cb); - - nb = kore_malloc(sizeof(*nb)); + nb = kore_pool_get(&nb_pool); nb->cb = cb; nb->len = len; nb->owner = c; @@ -50,9 +56,7 @@ net_recv_queue(struct connection *c, size_t len, int flags, { struct netbuf *nb; - //kore_debug("net_recv_queue(%p, %d, %p)", c, len, cb); - - nb = kore_malloc(sizeof(*nb)); + nb = kore_pool_get(&nb_pool); nb->cb = cb; nb->len = len; nb->owner = c; @@ -70,8 +74,6 @@ int net_recv_expand(struct connection *c, struct netbuf *nb, size_t len, int (*cb)(struct netbuf *)) { - //kore_debug("net_recv_expand(%p, %p, %d, %p)", c, nb, len, cb); - if (nb->type != NETBUF_RECV) { kore_debug("net_recv_expand(): wrong netbuf type"); return (KORE_RESULT_ERROR); @@ -131,7 +133,7 @@ net_send(struct connection *c) if (nb->offset == nb->len) { if (nb->buf != NULL) kore_mem_free(nb->buf); - kore_mem_free(nb); + kore_pool_put(&nb_pool, nb); } if (r != KORE_RESULT_OK) @@ -202,7 +204,7 @@ handle: TAILQ_REMOVE(&(c->recv_queue), nb, list); kore_mem_free(nb->buf); - kore_mem_free(nb); + kore_pool_put(&nb_pool, nb); } if (r != KORE_RESULT_OK) diff --git a/src/pool.c b/src/pool.c @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2013 Joris Vink <joris@coders.se> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/queue.h> + +#include "kore.h" + +#define POOL_ELEMENT_BUSY 0 +#define POOL_ELEMENT_FREE 1 + +static void pool_region_create(struct kore_pool *, u_int32_t); + +void +kore_pool_init(struct kore_pool *pool, char *name, u_int32_t len, u_int32_t elm) +{ + kore_debug("kore_pool_init(%p, %s, %d, %d)", pool, name, len, elm); + + pool->elms = 0; + pool->inuse = 0; + pool->elen = len; + pool->name = kore_strdup(name); + pool->slen = pool->elen + sizeof(struct kore_pool_entry); + + LIST_INIT(&(pool->regions)); + LIST_INIT(&(pool->freelist)); + + pool_region_create(pool, elm); +} + +void * +kore_pool_get(struct kore_pool *pool) +{ + u_int8_t *ptr; + struct kore_pool_entry *entry; + + if (LIST_EMPTY(&(pool->freelist))) { + kore_log(LOG_NOTICE, "pool %s is exhausted (%d/%d)", + pool->name, pool->inuse, pool->elms); + + pool_region_create(pool, pool->elms); + } + + entry = LIST_FIRST(&(pool->freelist)); + LIST_REMOVE(entry, list); + + entry->state = POOL_ELEMENT_BUSY; + ptr = (u_int8_t *)entry + sizeof(struct kore_pool_entry); + + pool->inuse++; + + return (ptr); +} + +void +kore_pool_put(struct kore_pool *pool, void *ptr) +{ + struct kore_pool_entry *entry; + + entry = (struct kore_pool_entry *) + ((u_int8_t *)ptr - sizeof(struct kore_pool_entry)); + + if (entry->state != POOL_ELEMENT_BUSY) + fatal("%s: element %p was not busy", pool->name, ptr); + + entry->state = POOL_ELEMENT_FREE; + LIST_INSERT_HEAD(&(pool->freelist), entry, list); + + pool->inuse--; +} + +static void +pool_region_create(struct kore_pool *pool, u_int32_t elms) +{ + u_int32_t i; + u_int8_t *p; + struct kore_pool_region *reg; + struct kore_pool_entry *entry; + + kore_debug("pool_region_create(%p, %d)", pool, elms); + + reg = kore_malloc(sizeof(struct kore_pool_region)); + LIST_INSERT_HEAD(&(pool->regions), reg, list); + + reg->start = kore_malloc(elms * pool->slen); + p = (u_int8_t *)reg->start; + + for (i = 0; i < elms; i++) { + entry = (struct kore_pool_entry *)p; + entry->region = reg; + entry->state = POOL_ELEMENT_FREE; + LIST_INSERT_HEAD(&(pool->freelist), entry, list); + + p = p + pool->slen; + } + + pool->elms += elms; +} diff --git a/src/spdy.c b/src/spdy.c @@ -461,7 +461,7 @@ spdy_ctrl_frame_syn_stream(struct netbuf *nb) static int spdy_ctrl_frame_settings(struct netbuf *nb) { - u_int8_t *buf, flags; + u_int8_t *buf; u_int32_t ecount, i, id, val, length; struct connection *c = (struct connection *)nb->owner; @@ -478,7 +478,6 @@ spdy_ctrl_frame_settings(struct netbuf *nb) buf = nb->buf + SPDY_FRAME_SIZE + 4; for (i = 0; i < ecount; i++) { - flags = *(u_int8_t *)buf; id = net_read32(buf) & 0xffffff; val = net_read32(buf + 4); @@ -487,8 +486,7 @@ spdy_ctrl_frame_settings(struct netbuf *nb) c->wsize_initial = val; break; default: - kore_debug("no handling for setting %d:%d (%d)", - id, val, flags); + kore_debug("no handling for setting %d:%d", id, val); break; } diff --git a/src/worker.c b/src/worker.c @@ -192,7 +192,9 @@ kore_worker_entry(struct kore_worker *kw) signal(SIGQUIT, kore_signal); signal(SIGPIPE, SIG_IGN); + net_init(); http_init(); + kore_connection_init(); TAILQ_INIT(&disconnected); TAILQ_INIT(&worker_clients); @@ -213,12 +215,14 @@ kore_worker_entry(struct kore_worker *kw) sig_recv = 0; } - if (!worker->has_lock) + if (!worker->has_lock && + (worker_active_connections < worker_max_connections)) kore_worker_acceptlock_obtain(); kore_platform_event_wait(); - if (worker->accepted >= worker->accept_treshold && + if (((worker->accepted >= worker->accept_treshold) || + (worker_active_connections < worker_max_connections)) && worker->has_lock) { worker->accepted = 0; kore_worker_acceptlock_release();