kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit b4a0330a96ec1790590e1d2a50c786d7da652509
parent 1582528ba5d8e077c53b3cc5b43b7c56c9a51abe
Author: Joris Vink <joris@coders.se>
Date:   Thu, 27 Jun 2013 08:43:07 +0200

- Better spread load between all worker processes.
- Introduce own memory management system on top of malloc to keep track
  of all our allocations and free's. Later we should introduce a pooling
  mechanism for fixed size allocations (http_request comes to mind).
- Introduce ssl_cipher in configuration.

Memory usage is kind of high right now, but it seems its OpenSSL
doing it rather then Kore.

Diffstat:
Makefile | 2+-
includes/http.h | 4++--
includes/kore.h | 28+++++++++++++++++++---------
src/buf.c | 2+-
src/config.c | 17+++++++++++++++++
src/connection.c | 21+++++++++++----------
src/domain.c | 12+++++++-----
src/http.c | 57++++++++++++++++++++++++++++-----------------------------
src/kore.c | 12+++++++++---
src/linux.c | 22+++++++++++-----------
src/mem.c | 161+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/module.c | 6+++---
src/net.c | 11++++++-----
src/spdy.c | 50+++++++++++++++++++++++++-------------------------
src/utils.c | 48+-----------------------------------------------
src/worker.c | 79+++++++++++++++++++++++++++++++++++++++----------------------------------------
16 files changed, 341 insertions(+), 191 deletions(-)

diff --git a/Makefile b/Makefile @@ -5,7 +5,7 @@ BIN=kore S_SRC+= src/kore.c src/buf.c src/config.c src/net.c src/spdy.c src/http.c \ src/accesslog.c src/domain.c src/module.c src/utils.c \ - src/worker.c src/connection.c src/zlib_dict.c + src/worker.c src/connection.c src/mem.c src/zlib_dict.c S_OBJS= $(S_SRC:.c=.o) CFLAGS+=-Wall -Wstrict-prototypes -Wmissing-prototypes diff --git a/includes/http.h b/includes/http.h @@ -49,8 +49,8 @@ struct http_request { int status; u_int64_t start; u_int64_t end; - char *host; - char *path; + char host[KORE_DOMAINNAME_LEN]; + char path[HTTP_URI_LEN]; char *agent; struct connection *owner; struct spdy_stream *stream; diff --git a/includes/kore.h b/includes/kore.h @@ -29,8 +29,9 @@ #define errno_s strerror(errno) #define ssl_errno_s ERR_error_string(ERR_get_error(), NULL) -#define KORE_DOMAINNAME_LEN 254 -#define KORE_PIDFILE_DEFAULT "/var/run/kore.pid" +#define KORE_DOMAINNAME_LEN 254 +#define KORE_PIDFILE_DEFAULT "/var/run/kore.pid" +#define KORE_DEFAULT_CIPHER_LIST "HIGH:!aNULL:!MD5;" #define kore_debug(fmt, ...) \ if (kore_debug) \ @@ -76,12 +77,12 @@ struct listener { struct connection { int fd; - int state; - int proto; + u_int8_t state; + u_int8_t proto; struct sockaddr_in sin; void *owner; SSL *ssl; - int flags; + u_int8_t flags; u_int8_t inflate_started; z_stream z_inflate; @@ -111,11 +112,13 @@ struct kore_module_handle { }; struct kore_worker { - u_int16_t id; - u_int16_t cpu; + u_int8_t id; + u_int8_t cpu; u_int16_t load; pid_t pid; - TAILQ_ENTRY(kore_worker) list; + u_int8_t has_lock; + u_int16_t accepted; + u_int16_t accept_treshold; }; struct kore_domain { @@ -154,7 +157,9 @@ extern char *kore_module_onload; extern char *kore_pidfile; extern char *config_file; extern char kore_version_string[]; +extern char *kore_ssl_cipher_list; +extern u_int32_t meminuse; extern u_int16_t cpu_count; extern u_int8_t worker_count; extern u_int32_t worker_max_connections; @@ -176,7 +181,7 @@ void kore_worker_connection_move(struct connection *); void kore_worker_connection_remove(struct connection *); void kore_platform_event_init(void); -int kore_platform_event_wait(void); +void kore_platform_event_wait(void); void kore_platform_proctitle(char *); void kore_platform_enable_accept(void); void kore_platform_disable_accept(void); @@ -200,10 +205,15 @@ int kore_connection_accept(struct listener *, struct connection **); u_int64_t kore_time_ms(void); void kore_log_init(void); + void *kore_malloc(size_t); void kore_parse_config(void); void *kore_calloc(size_t, size_t); void *kore_realloc(void *, size_t); +void kore_mem_free(void *); +void kore_mem_init(void); +void kore_mem_dump(void); + time_t kore_date_to_time(char *); char *kore_time_to_date(time_t); char *kore_strdup(const char *); diff --git a/src/buf.c b/src/buf.c @@ -105,7 +105,7 @@ kore_buf_release(struct kore_buf *buf, u_int32_t *len) p = buf->data; *len = buf->offset; - free(buf); + kore_mem_free(buf); return (p); } diff --git a/src/config.c b/src/config.c @@ -52,6 +52,7 @@ static int configure_accesslog(char **); static int configure_certfile(char **); static int configure_certkey(char **); static int configure_max_connections(char **); +static int configure_ssl_cipher(char **); static void domain_sslstart(void); static struct { @@ -63,6 +64,7 @@ static struct { { "onload", configure_onload }, { "static", configure_handler }, { "dynamic", configure_handler }, + { "ssl_cipher", configure_ssl_cipher }, { "domain", configure_domain }, { "chroot", configure_chroot }, { "runas", configure_runas }, @@ -186,6 +188,21 @@ configure_onload(char **argv) } static int +configure_ssl_cipher(char **argv) +{ + if (argv[1] == NULL) + return (KORE_RESULT_ERROR); + + if (strcmp(kore_ssl_cipher_list, KORE_DEFAULT_CIPHER_LIST)) { + kore_debug("duplicate ssl_cipher directive specified"); + return (KORE_RESULT_ERROR); + } + + kore_ssl_cipher_list = kore_strdup(argv[1]); + return (KORE_RESULT_OK); +} + +static int configure_domain(char **argv) { if (argv[2] == NULL) diff --git a/src/connection.c b/src/connection.c @@ -57,14 +57,14 @@ kore_connection_accept(struct listener *l, struct connection **out) len = sizeof(struct sockaddr_in); c = (struct connection *)kore_malloc(sizeof(*c)); if ((c->fd = accept(l->fd, (struct sockaddr *)&(c->sin), &len)) == -1) { - free(c); + kore_mem_free(c); kore_debug("accept(): %s", errno_s); return (KORE_RESULT_ERROR); } if (!kore_connection_nonblock(c->fd)) { close(c->fd); - free(c); + kore_mem_free(c); return (KORE_RESULT_ERROR); } @@ -194,15 +194,16 @@ kore_connection_remove(struct connection *c) for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; nb = next) { next = TAILQ_NEXT(nb, list); TAILQ_REMOVE(&(c->send_queue), nb, list); - free(nb->buf); - free(nb); + if (nb->buf != NULL) + kore_mem_free(nb->buf); + kore_mem_free(nb); } for (nb = TAILQ_FIRST(&(c->recv_queue)); nb != NULL; nb = next) { next = TAILQ_NEXT(nb, list); TAILQ_REMOVE(&(c->recv_queue), nb, list); - free(nb->buf); - free(nb); + kore_mem_free(nb->buf); + kore_mem_free(nb); } for (s = TAILQ_FIRST(&(c->spdy_streams)); s != NULL; s = snext) { @@ -211,15 +212,15 @@ kore_connection_remove(struct connection *c) if (s->hblock != NULL) { if (s->hblock->header_block != NULL) - free(s->hblock->header_block); - free(s->hblock); + kore_mem_free(s->hblock->header_block); + kore_mem_free(s->hblock); } - free(s); + kore_mem_free(s); } kore_worker_connection_remove(c); - free(c); + kore_mem_free(c); } int diff --git a/src/domain.c b/src/domain.c @@ -58,12 +58,13 @@ kore_domain_new(char *domain) if (kore_domain_lookup(domain) != NULL) return (KORE_RESULT_ERROR); - kore_debug("kore_domain_new(%s, %s, %s)", domain); + kore_debug("kore_domain_new(%s)", domain); dom = (struct kore_domain *)kore_malloc(sizeof(*dom)); dom->accesslog = -1; dom->certfile = NULL; dom->certkey = NULL; + dom->ssl_ctx = NULL; dom->domain = kore_strdup(domain); TAILQ_INIT(&(dom->handlers)); TAILQ_INSERT_TAIL(&domains, dom, list); @@ -81,7 +82,7 @@ kore_domain_sslstart(struct kore_domain *dom) dom->ssl_ctx = SSL_CTX_new(SSLv23_server_method()); if (dom->ssl_ctx == NULL) - fatal("kore_domain_new(): SSL_ctx_new(): %s", ssl_errno_s); + fatal("kore_domain_sslstart(): SSL_ctx_new(): %s", ssl_errno_s); if (!SSL_CTX_use_certificate_chain_file(dom->ssl_ctx, dom->certfile)) { fatal("SSL_CTX_use_certificate_chain_file(%s): %s", dom->certfile, ssl_errno_s); @@ -96,15 +97,16 @@ kore_domain_sslstart(struct kore_domain *dom) if (!SSL_CTX_check_private_key(dom->ssl_ctx)) fatal("Public/Private key for %s do not match", dom->domain); - SSL_CTX_set_mode(dom->ssl_ctx, SSL_MODE_AUTO_RETRY); + SSL_CTX_set_mode(dom->ssl_ctx, SSL_MODE_RELEASE_BUFFERS); + SSL_CTX_set_cipher_list(dom->ssl_ctx, kore_ssl_cipher_list); SSL_CTX_set_mode(dom->ssl_ctx, SSL_MODE_ENABLE_PARTIAL_WRITE); SSL_CTX_set_options(dom->ssl_ctx, SSL_OP_NO_SSLv2); SSL_CTX_set_tlsext_servername_callback(dom->ssl_ctx, kore_ssl_sni_cb); SSL_CTX_set_next_protos_advertised_cb(dom->ssl_ctx, kore_ssl_npn_cb, NULL); - free(dom->certfile); - free(dom->certkey); + kore_mem_free(dom->certfile); + kore_mem_free(dom->certkey); } struct kore_domain * diff --git a/src/http.c b/src/http.c @@ -75,8 +75,9 @@ http_request_new(struct connection *c, struct spdy_stream *s, char *host, req->status = 0; req->stream = s; req->post_data = NULL; - req->host = kore_strdup(host); - req->path = kore_strdup(path); + kore_strlcpy(req->host, host, sizeof(req->host)); + kore_strlcpy(req->path, path, sizeof(req->path)); + TAILQ_INIT(&(req->resp_headers)); TAILQ_INIT(&(req->req_headers)); TAILQ_INIT(&(req->arguments)); @@ -180,35 +181,33 @@ http_request_free(struct http_request *req) next = TAILQ_NEXT(hdr, list); TAILQ_REMOVE(&(req->resp_headers), hdr, list); - free(hdr->header); - free(hdr->value); - free(hdr); + kore_mem_free(hdr->header); + kore_mem_free(hdr->value); + kore_mem_free(hdr); } for (hdr = TAILQ_FIRST(&(req->req_headers)); hdr != NULL; hdr = next) { next = TAILQ_NEXT(hdr, list); TAILQ_REMOVE(&(req->req_headers), hdr, list); - free(hdr->header); - free(hdr->value); - free(hdr); + kore_mem_free(hdr->header); + kore_mem_free(hdr->value); + kore_mem_free(hdr); } for (q = TAILQ_FIRST(&(req->arguments)); q != NULL; q = qnext) { qnext = TAILQ_NEXT(q, list); TAILQ_REMOVE(&(req->arguments), q, list); - free(q->name); + kore_mem_free(q->name); if (q->value != NULL) - free(q->value); - free(q); + kore_mem_free(q->value); + kore_mem_free(q); } - free(req->path); - free(req->host); if (req->agent != NULL) - free(req->agent); - free(req); + kore_mem_free(req->agent); + kore_mem_free(req); } int @@ -241,7 +240,7 @@ http_response(struct http_request *req, int status, u_int8_t *d, u_int32_t len) spdy_frame_send(req->owner, SPDY_CTRL_FRAME_SYN_REPLY, 0, hlen, req->stream, 0); net_send_queue(req->owner, htext, hlen, 0, NULL, NULL); - free(htext); + kore_mem_free(htext); if (len > 0) { spdy_frame_send(req->owner, SPDY_DATA_FRAME, @@ -267,7 +266,7 @@ http_response(struct http_request *req, int status, u_int8_t *d, u_int32_t len) kore_buf_append(buf, (u_int8_t *)"\r\n", 2); htext = kore_buf_release(buf, &hlen); net_send_queue(req->owner, htext, hlen, 0, NULL, NULL); - free(htext); + kore_mem_free(htext); net_send_queue(req->owner, d, len, 0, NULL, http_send_done); } @@ -332,19 +331,19 @@ http_header_recv(struct netbuf *nb) h = kore_split_string(hbuf, "\r\n", headers, HTTP_REQ_HEADER_MAX); if (h < 2) { - free(hbuf); + kore_mem_free(hbuf); return (KORE_RESULT_ERROR); } if ((strlen(headers[0]) > 3 && strncasecmp(headers[0], "get", 3)) && (strlen(headers[0]) > 4 && strncasecmp(headers[0], "post", 4))) { - free(hbuf); + kore_mem_free(hbuf); return (KORE_RESULT_ERROR); } v = kore_split_string(headers[0], " ", request, 4); if (v != 3) { - free(hbuf); + kore_mem_free(hbuf); return (KORE_RESULT_ERROR); } @@ -356,13 +355,13 @@ http_header_recv(struct netbuf *nb) v = kore_split_string(headers[i], ":", host, 3); if (v != 2) { - free(hbuf); + kore_mem_free(hbuf); return (KORE_RESULT_ERROR); } if (strlen(host[0]) != 4 || strncasecmp(host[0], "host", 4) || strlen(host[1]) < 4) { - free(hbuf); + kore_mem_free(hbuf); return (KORE_RESULT_ERROR); } @@ -372,12 +371,12 @@ http_header_recv(struct netbuf *nb) } if (host[0] == NULL) { - free(hbuf); + kore_mem_free(hbuf); return (KORE_RESULT_ERROR); } if (!http_request_new(c, NULL, host[1], request[0], request[1], &req)) { - free(hbuf); + kore_mem_free(hbuf); return (KORE_RESULT_ERROR); } @@ -404,7 +403,7 @@ http_header_recv(struct netbuf *nb) req->agent = kore_strdup(hdr->value); } - free(hbuf); + kore_mem_free(hbuf); if (req->method == HTTP_METHOD_POST) { if (!http_request_header_get(req, "content-length", &p)) { @@ -415,13 +414,13 @@ http_header_recv(struct netbuf *nb) clen = kore_strtonum(p, 0, UINT_MAX, &v); if (v == KORE_RESULT_ERROR) { - free(p); + kore_mem_free(p); kore_debug("content-length invalid: %s", p); req->flags |= HTTP_REQUEST_DELETE; return (KORE_RESULT_ERROR); } - free(p); + kore_mem_free(p); req->post_data = kore_buf_create(clen); kore_buf_append(req->post_data, end_headers, (nb->offset - len)); @@ -468,7 +467,7 @@ http_populate_arguments(struct http_request *req) count++; } - free(query); + kore_mem_free(query); return (count); } @@ -501,7 +500,7 @@ http_post_data_text(struct http_request *req) text = (char *)kore_malloc(len); kore_strlcpy(text, (char *)data, len); - free(data); + kore_mem_free(data); return (text); } diff --git a/src/kore.c b/src/kore.c @@ -58,6 +58,7 @@ char *runas_user = NULL; char *chroot_path = NULL; char kore_version_string[32]; char *kore_pidfile = KORE_PIDFILE_DEFAULT; +char *kore_ssl_cipher_list = KORE_DEFAULT_CIPHER_LIST; static void usage(void); static void kore_server_start(void); @@ -97,10 +98,12 @@ main(int argc, char *argv[]) argv += optind; kore_pid = getpid(); + + kore_mem_init(); kore_domain_init(); kore_server_sslstart(); - kore_parse_config(); + kore_log_init(); kore_platform_init(); kore_accesslog_init(); @@ -162,6 +165,9 @@ kore_server_sslstart(void) SSL_library_init(); SSL_load_error_strings(); + +CRYPTO_malloc_debug_init(); +CRYPTO_set_mem_debug_options(V_CRYPTO_MDEBUG_ALL); } static void @@ -170,8 +176,8 @@ kore_server_start(void) if (!kore_server_bind(&server, server_ip, server_port)) fatal("cannot bind to %s:%d", server_ip, server_port); - free(server_ip); - free(runas_user); + kore_mem_free(server_ip); + kore_mem_free(runas_user); if (daemon(1, 1) == -1) fatal("cannot daemon(): %s", errno_s); diff --git a/src/linux.c b/src/linux.c @@ -45,9 +45,8 @@ #include "kore.h" #include "http.h" -#define EPOLL_EVENTS 500 - static int efd = -1; +static u_int32_t event_count = 0; static struct epoll_event *events = NULL; void @@ -80,26 +79,26 @@ kore_platform_event_init(void) if ((efd = epoll_create(10000)) == -1) fatal("epoll_create(): %s", errno_s); - events = kore_calloc(EPOLL_EVENTS, sizeof(struct epoll_event)); + event_count = worker_max_connections + 1; + events = kore_calloc(event_count, sizeof(struct epoll_event)); } -int +void kore_platform_event_wait(void) { struct connection *c; - int n, i, *fd, count; + int n, i, *fd; - n = epoll_wait(efd, events, EPOLL_EVENTS, 100); + n = epoll_wait(efd, events, event_count, 100); if (n == -1) { if (errno == EINTR) - return (0); + return; fatal("epoll_wait(): %s", errno_s); } if (n > 0) kore_debug("main(): %d sockets available", n); - count = 0; for (i = 0; i < n; i++) { fd = (int *)events[i].data.ptr; @@ -114,13 +113,16 @@ kore_platform_event_wait(void) } if (*fd == server.fd) { +#if 0 while (worker_active_connections < worker_max_connections) { +#endif + while (worker->accepted < worker->accept_treshold) { kore_connection_accept(&server, &c); if (c == NULL) break; - count++; + worker->accepted++; kore_platform_event_schedule(c->fd, EPOLLIN | EPOLLOUT | EPOLLET, 0, c); } @@ -135,8 +137,6 @@ kore_platform_event_wait(void) kore_connection_disconnect(c); } } - - return (count); } void diff --git a/src/mem.c b/src/mem.c @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2013 Joris Vink <joris@coders.se> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/param.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/queue.h> +#include <sys/time.h> + +#include <netinet/in.h> +#include <arpa/inet.h> + +#include <openssl/err.h> +#include <openssl/ssl.h> + +#include <errno.h> +#include <fcntl.h> +#include <stdarg.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <syslog.h> +#include <regex.h> +#include <zlib.h> + +#include "spdy.h" +#include "kore.h" + +#define KORE_MEM_MAGIC 0xd0d0 +#define KORE_MEMINFO(x) \ + ((struct meminfo *)((u_int8_t *)x - sizeof(struct meminfo))) + +struct meminfo { + u_int32_t id; + u_int32_t len; + u_int32_t clen; + u_int64_t t; + TAILQ_ENTRY(meminfo) list; + u_int16_t magic; + u_int8_t *addr; +}; + +static u_int64_t memid; +u_int32_t meminuse; +TAILQ_HEAD(, meminfo) memused; + +void +kore_mem_init(void) +{ + memid = 0; + meminuse = 0; + TAILQ_INIT(&memused); +} + +void * +kore_malloc(size_t len) +{ + size_t mlen; + struct meminfo *mem; + + mlen = sizeof(struct meminfo) + len; + if ((mem = (struct meminfo *)malloc(mlen)) == NULL) + fatal("kore_malloc(%d): %d", len, errno); + + mem->clen = len; + mem->len = mlen; + mem->id = memid++; + mem->t = kore_time_ms(); + mem->addr = (u_int8_t *)mem + sizeof(struct meminfo); + mem->magic = KORE_MEM_MAGIC; + TAILQ_INSERT_TAIL(&memused, mem, list); + if ((u_int8_t *)mem != mem->addr - sizeof(struct meminfo)) + fatal("kore_malloc(): addr offset is wrong"); + + meminuse += len; + memset(mem->addr, '\0', mem->clen); + + return (mem->addr); +} + +void * +kore_realloc(void *ptr, size_t len) +{ + struct meminfo *mem; + void *nptr; + + mem = KORE_MEMINFO(ptr); + if (mem->magic != KORE_MEM_MAGIC) + fatal("kore_realloc(): magic boundary not found"); + + nptr = kore_malloc(len); + memcpy(nptr, mem->addr, mem->clen); + kore_mem_free(mem); + + mem = (struct meminfo *)nptr - sizeof(*mem); + return (mem->addr); +} + +void * +kore_calloc(size_t memb, size_t len) +{ + return (kore_malloc(memb * len)); +} + +void +kore_mem_free(void *ptr) +{ + struct meminfo *mem; + + mem = KORE_MEMINFO(ptr); + if (mem->magic != KORE_MEM_MAGIC) + fatal("kore_mem_free(): magic boundary not found"); + + //t = kore_time_ms(); + //printf("mem#%d released, %ld ms old\n", mem->id, t - mem->t); + + meminuse -= mem->clen; + TAILQ_REMOVE(&memused, mem, list); + free(mem); +} + +void +kore_mem_dump(void) +{ + printf("wrk#%d: %d bytes in use\n", worker->id, meminuse); + +#if 0 + now = kore_time_ms(); + TAILQ_FOREACH(mem, &memused, list) { + length = now - mem->t; + printf("wrk#%d: mem#%d %d bytes (%ld ms)\n", + worker->id, mem->id, mem->clen, length); + } +#endif +} + +char * +kore_strdup(const char *str) +{ + size_t len; + char *nstr; + + len = strlen(str) + 1; + nstr = (char *)kore_malloc(len); + kore_strlcpy(nstr, str, len); + + return (nstr); +} diff --git a/src/module.c b/src/module.c @@ -141,9 +141,9 @@ kore_module_handler_new(char *path, char *domain, char *func, int type) if (hdlr->type == HANDLER_TYPE_DYNAMIC) { if (regcomp(&(hdlr->rctx), hdlr->path, REG_NOSUB)) { - free(hdlr->func); - free(hdlr->path); - free(hdlr); + kore_mem_free(hdlr->func); + kore_mem_free(hdlr->path); + kore_mem_free(hdlr); kore_debug("regcomp() on %s failed", path); return (KORE_RESULT_ERROR); } diff --git a/src/net.c b/src/net.c @@ -149,8 +149,8 @@ net_send(struct connection *c) if (nb->offset == nb->len) { if (nb->buf != NULL) - free(nb->buf); - free(nb); + kore_mem_free(nb->buf); + kore_mem_free(nb); } if (r != KORE_RESULT_OK) @@ -199,7 +199,8 @@ net_recv(struct connection *c) switch (r) { case SSL_ERROR_WANT_READ: c->flags &= ~CONN_READ_POSSIBLE; - if (nb->flags & NETBUF_CALL_CB_ALWAYS) + if (nb->flags & NETBUF_CALL_CB_ALWAYS && + nb-> offset > 0) goto handle; return (KORE_RESULT_OK); case SSL_ERROR_WANT_WRITE: @@ -220,8 +221,8 @@ handle: TAILQ_REMOVE(&(c->recv_queue), nb, list); if (!(nb->flags & NETBUF_RETAIN)) { - free(nb->buf); - free(nb); + kore_mem_free(nb->buf); + kore_mem_free(nb); } } diff --git a/src/spdy.c b/src/spdy.c @@ -253,13 +253,13 @@ spdy_header_block_release(struct connection *c, net_write32(hblock->header_block, hblock->header_pairs); if (!spdy_zlib_deflate(c, hblock->header_block, hblock->header_offset, &deflated, len)) { - free(hblock->header_block); - free(hblock); + kore_mem_free(hblock->header_block); + kore_mem_free(hblock); return (NULL); } - free(hblock->header_block); - free(hblock); + kore_mem_free(hblock->header_block); + kore_mem_free(hblock); return (deflated); } @@ -366,9 +366,9 @@ spdy_ctrl_frame_syn_stream(struct netbuf *nb) kore_debug("compressed headers are %d bytes long", ctrl.length - 10); if (!spdy_zlib_inflate(c, src, (ctrl.length - SPDY_SYNFRAME_SIZE), &(s->hblock->header_block), &(s->hblock->header_block_len))) { - free(s->hblock->header_block); - free(s->hblock); - free(s); + kore_mem_free(s->hblock->header_block); + kore_mem_free(s->hblock); + kore_mem_free(s); return (KORE_RESULT_ERROR); } @@ -381,16 +381,16 @@ spdy_ctrl_frame_syn_stream(struct netbuf *nb) #define GET_HEADER(n, r) \ if (!spdy_stream_get_header(s->hblock, n, r)) { \ - free(s->hblock->header_block); \ - free(s->hblock); \ - free(s); \ + kore_mem_free(s->hblock->header_block); \ + kore_mem_free(s->hblock); \ + kore_mem_free(s); \ kore_debug("no such header: %s", n); \ if (path != NULL) \ - free(path); \ + kore_mem_free(path); \ if (host != NULL) \ - free(host); \ + kore_mem_free(host); \ if (method != NULL) \ - free(method); \ + kore_mem_free(method); \ return (KORE_RESULT_ERROR); \ } @@ -400,18 +400,18 @@ spdy_ctrl_frame_syn_stream(struct netbuf *nb) if (!http_request_new(c, s, host, method, path, (struct http_request **)&(s->httpreq))) { - free(path); - free(method); - free(host); - free(s->hblock->header_block); - free(s->hblock); - free(s); + kore_mem_free(path); + kore_mem_free(method); + kore_mem_free(host); + kore_mem_free(s->hblock->header_block); + kore_mem_free(s->hblock); + kore_mem_free(s); return (KORE_RESULT_ERROR); } - free(path); - free(method); - free(host); + kore_mem_free(path); + kore_mem_free(method); + kore_mem_free(host); c->client_stream_id = s->stream_id; TAILQ_INSERT_TAIL(&(c->spdy_streams), s, list); @@ -505,11 +505,11 @@ spdy_stream_close(struct connection *c, struct spdy_stream *s) TAILQ_REMOVE(&(c->spdy_streams), s, list); if (s->hblock != NULL) { if (s->hblock->header_block != NULL) - free(s->hblock->header_block); - free(s->hblock); + kore_mem_free(s->hblock->header_block); + kore_mem_free(s->hblock); } - free(s); + kore_mem_free(s); } static int diff --git a/src/utils.c b/src/utils.c @@ -58,52 +58,6 @@ static struct { { NULL, 0 }, }; -void * -kore_malloc(size_t len) -{ - void *ptr; - - if ((ptr = malloc(len)) == NULL) - fatal("kore_malloc(%d): %d", len, errno); - - memset(ptr, 0, len); - return (ptr); -} - -void * -kore_realloc(void *ptr, size_t len) -{ - void *nptr; - - if ((nptr = realloc(ptr, len)) == NULL) - fatal("kore_realloc(%p, %d): %d", ptr, len, errno); - - return (nptr); -} - -void * -kore_calloc(size_t memb, size_t len) -{ - void *ptr; - - if ((ptr = calloc(memb, len)) == NULL) - fatal("kore_calloc(%d, %d): %d", memb, len, errno); - - memset(ptr, 0, memb * len); - return (ptr); -} - -char * -kore_strdup(const char *str) -{ - char *nstr; - - if ((nstr = strdup(str)) == NULL) - fatal("kore_strdup(): %d", errno); - - return (nstr); -} - void kore_debug_internal(char *file, int line, const char *fmt, ...) { @@ -278,7 +232,7 @@ kore_date_to_time(char *http_date) } out: - free(sdup); + kore_mem_free(sdup); return (t); } diff --git a/src/worker.c b/src/worker.c @@ -47,6 +47,8 @@ #include "kore.h" #include "http.h" +//#define WORKER_DEBUG 1 + #if defined(WORKER_DEBUG) #define worker_debug(fmt, ...) printf(fmt, ##__VA_ARGS__) #else @@ -55,7 +57,7 @@ #define KORE_SHM_KEY 15000 #define WORKER(id) \ - (struct kore_worker *)kore_workers + (sizeof(struct kore_worker) * id) + (struct kore_worker *)(kore_workers + (sizeof(struct kore_worker) * id)) struct wlock { pid_t lock; @@ -75,9 +77,7 @@ static TAILQ_HEAD(, connection) disconnected; static TAILQ_HEAD(, connection) worker_clients; static struct kore_worker *kore_workers; static int shm_accept_key; - -static struct wlock *accept_lock; -static u_int8_t worker_has_acceptlock = 0; +static struct wlock *accept_lock; extern volatile sig_atomic_t sig_recv; struct kore_worker *worker = NULL; @@ -130,7 +130,9 @@ kore_worker_spawn(u_int16_t id, u_int16_t cpu) kw->id = id; kw->cpu = cpu; kw->load = 0; + kw->accepted = 0; kw->pid = fork(); + if (kw->pid == -1) fatal("could not spawn worker child: %s", errno_s); @@ -217,7 +219,9 @@ kore_worker_entry(struct kore_worker *kw) kore_platform_event_init(); kore_accesslog_worker_init(); + worker->accept_treshold = worker_max_connections / 10; kore_log(LOG_NOTICE, "worker %d started (cpu#%d)", kw->id, kw->cpu); + for (;;) { if (sig_recv != 0) { if (sig_recv == SIGHUP) @@ -227,11 +231,19 @@ kore_worker_entry(struct kore_worker *kw) sig_recv = 0; } - if (!worker_has_acceptlock && - worker_active_connections < worker_max_connections) + if (!worker->has_lock) kore_worker_acceptlock_obtain(); - if (kore_platform_event_wait() && worker_has_acceptlock) + + kore_platform_event_wait(); + + if (worker->accepted >= worker->accept_treshold && + worker->has_lock) { + worker->accepted = 0; kore_worker_acceptlock_release(); + } + + printf("%d: %d conn / %d mem\n", worker->id, + worker_active_connections, meminuse); http_process(); @@ -325,6 +337,10 @@ kore_worker_wait(int final) "worker %d (pid: %d) gone, respawning new one", kw->id, kw->pid); kore_worker_spawn(kw->id, kw->cpu); + } else { + kore_log(LOG_NOTICE, + "worker %d (pid: %d) signaled us", + kw->id, kw->pid); } break; @@ -334,17 +350,15 @@ kore_worker_wait(int final) static void kore_worker_acceptlock_obtain(void) { - if (worker_count == 1 && !worker_has_acceptlock) { - worker_has_acceptlock = 1; + if (worker_count == 1 && !worker->has_lock) { + worker->has_lock = 1; kore_platform_enable_accept(); return; } if (worker_trylock()) { - worker_has_acceptlock = 1; + worker->has_lock = 1; kore_platform_enable_accept(); - worker_debug("%d: obtained accept lock (%d/%d)\n", worker->id, - worker_active_connections, worker_max_connections); } } @@ -354,17 +368,15 @@ kore_worker_acceptlock_release(void) if (worker_count == 1) return; - if (worker_has_acceptlock != 1) { + if (worker->has_lock != 1) { kore_log(LOG_NOTICE, "kore_worker_acceptlock_release() != 1"); return; } if (worker_unlock()) { - worker_has_acceptlock = 0; + worker->has_lock = 0; kore_platform_disable_accept(); - worker_debug("%d: released %d/%d\n", worker->id, - worker_active_connections, worker_max_connections); } } @@ -375,6 +387,8 @@ worker_trylock(void) worker->id, worker->pid) != worker->id) return (0); + worker_debug("wrk#%d grabbed lock (%d/%d)\n", worker->id, + worker_active_connections, worker_max_connections); worker_decide_next(); return (1); @@ -383,12 +397,8 @@ worker_trylock(void) static int worker_unlock(void) { - if (accept_lock->next == worker->id) { - worker_debug("%d: retaining lock\n", worker->id); - worker_decide_next(); - return (0); - } - + worker_debug("%d: wrk#%d releasing (%d/%d)\n", worker->id, worker->id, + worker_active_connections, worker_max_connections); if (__sync_val_compare_and_swap(&(accept_lock->lock), accept_lock->current, accept_lock->next) != accept_lock->current) kore_log(LOG_NOTICE, "kore_internal_unlock(): wasnt locked"); @@ -399,25 +409,14 @@ worker_unlock(void) static void worker_decide_next(void) { - u_int16_t id, load; - struct kore_worker *kw, *low; - - low = NULL; - load = worker_max_connections; - for (id = 0; id < worker_count; id++) { - kw = WORKER(id); - if (kw->load < load) { - load = kw->load; - low = kw; - } - } + struct kore_worker *kw; - if (low == NULL) { - low = WORKER(accept_lock->workerid++); - if (accept_lock->workerid == worker_count) - accept_lock->workerid = 0; - } + kw = WORKER(accept_lock->workerid++); + worker_debug("%d: next wrk#%d (%d, %p)\n", + worker->id, kw->id, kw->pid, kw); + if (accept_lock->workerid == worker_count) + accept_lock->workerid = 0; - accept_lock->next = low->id; + accept_lock->next = kw->id; accept_lock->current = worker->pid; }