kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 9aa0e95643303805113970b6fc75fffda06c96c0
parent 0d6a188b0146dcb491eba9121eb76f0c84634bac
Author: Joris Vink <joris@coders.se>
Date:   Sat, 22 Dec 2018 09:25:00 +0100

Rework accesslog handling.

Move away from the parent constantly hitting the disk for every
accesslog the workers are sending.

The workers will now write their own accesslogs to shared
memory before the parent will pick those up. The parent
will flush them to disk once every second or if they grow
larger then 1MB.

This removes the heavy penalty for having access logs
turned on when you are dealing with a large volume
of requests.

Diffstat:
include/kore/kore.h | 53++++++++++++++++++++++++++++++++++++++++-------------
src/accesslog.c | 368+++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------
src/bsd.c | 17++++++++---------
src/connection.c | 2+-
src/domain.c | 43+++++++++++++++++++++++++++++++++++++++++++
src/http.c | 2+-
src/kore.c | 17+++++++++++++++--
src/linux.c | 8+++-----
src/msg.c | 12------------
src/net.c | 10++++++----
src/worker.c | 20++++++++++++++------
11 files changed, 377 insertions(+), 175 deletions(-)

diff --git a/include/kore/kore.h b/include/kore/kore.h @@ -368,6 +368,19 @@ struct kore_module_handle { }; #endif +/* + * The workers get a 128KB log buffer per worker, and parent will fetch their + * logs when it reached at least 75% of that or if its been > 1 second since + * it was last synced. + */ +#define KORE_ACCESSLOG_BUFLEN 131072 +#define KORE_ACCESSLOG_SYNC 98304 + +struct kore_alog_header { + u_int16_t domain; + u_int16_t loglen; +} __attribute__((packed)); + struct kore_worker { u_int8_t id; u_int8_t cpu; @@ -378,11 +391,22 @@ struct kore_worker { int restarted; u_int64_t time_locked; struct kore_module_handle *active_hdlr; + + /* Used by the workers to store accesslogs. */ + struct { + int lock; + size_t offset; + char buf[KORE_ACCESSLOG_BUFLEN]; + } lb; }; struct kore_domain { + u_int16_t id; char *domain; int accesslog; + struct kore_buf *logbuf; + int logerr; + u_int64_t logwarn; #if !defined(KORE_NO_TLS) char *cafile; char *crlfile; @@ -462,15 +486,14 @@ struct kore_timer { #define KORE_WORKER_KEYMGR 0 /* Reserved message ids, registered on workers. */ -#define KORE_MSG_ACCESSLOG 1 -#define KORE_MSG_WEBSOCKET 2 -#define KORE_MSG_KEYMGR_REQ 3 -#define KORE_MSG_KEYMGR_RESP 4 -#define KORE_MSG_SHUTDOWN 5 -#define KORE_MSG_ENTROPY_REQ 6 -#define KORE_MSG_ENTROPY_RESP 7 -#define KORE_MSG_CERTIFICATE 8 -#define KORE_MSG_CERTIFICATE_REQ 9 +#define KORE_MSG_WEBSOCKET 1 +#define KORE_MSG_KEYMGR_REQ 2 +#define KORE_MSG_KEYMGR_RESP 3 +#define KORE_MSG_SHUTDOWN 4 +#define KORE_MSG_ENTROPY_REQ 4 +#define KORE_MSG_ENTROPY_RESP 6 +#define KORE_MSG_CERTIFICATE 7 +#define KORE_MSG_CERTIFICATE_REQ 8 /* Predefined message targets. */ #define KORE_MSG_PARENT 1000 @@ -515,6 +538,8 @@ extern char *kore_root_path; extern char *kore_runas_user; extern char *kore_tls_cipher_list; +extern volatile sig_atomic_t sig_recv; + #if !defined(KORE_NO_TLS) extern int tls_version; extern DH *tls_dhparam; @@ -564,7 +589,7 @@ void kore_platform_disable_read(int); void kore_platform_disable_write(int); void kore_platform_enable_accept(void); void kore_platform_disable_accept(void); -int kore_platform_event_wait(u_int64_t); +void kore_platform_event_wait(u_int64_t); void kore_platform_event_all(int, void *); void kore_platform_schedule_read(int, void *); void kore_platform_schedule_write(int, void *); @@ -580,9 +605,10 @@ void kore_platform_pledge(void); void kore_platform_add_pledge(const char *); #endif -void kore_accesslog_init(void); +void kore_accesslog_init(u_int16_t); void kore_accesslog_worker_init(void); -int kore_accesslog_write(const void *, u_int32_t); +void kore_accesslog_run(void *, u_int64_t); +void kore_accesslog_gather(void *, u_int64_t, int); #if !defined(KORE_NO_HTTP) int kore_auth_run(struct http_request *, struct kore_auth *); @@ -748,6 +774,7 @@ void kore_runtime_wsmessage(struct kore_runtime_call *, struct connection *, u_int8_t, const void *, size_t); #endif +struct kore_domain *kore_domain_byid(u_int16_t); struct kore_domain *kore_domain_lookup(const char *); #if !defined(KORE_NO_HTTP) @@ -783,7 +810,7 @@ int net_write(struct connection *, size_t, size_t *); int net_write_tls(struct connection *, size_t, size_t *); void net_recv_reset(struct connection *, size_t, int (*cb)(struct netbuf *)); -void net_remove_netbuf(struct netbuf_head *, struct netbuf *); +void net_remove_netbuf(struct connection *, struct netbuf *); void net_recv_queue(struct connection *, size_t, int, int (*cb)(struct netbuf *)); void net_recv_expand(struct connection *c, size_t, diff --git a/src/accesslog.c b/src/accesslog.c @@ -16,31 +16,42 @@ #include <sys/socket.h> -#include <poll.h> #include <time.h> +#include <signal.h> #include "kore.h" #include "http.h" -struct kore_log_packet { - u_int8_t method; - int status; - size_t length; - int family; - u_int8_t addr[sizeof(struct in6_addr)]; - char host[KORE_DOMAINNAME_LEN]; - char path[HTTP_URI_LEN]; - char agent[HTTP_USERAGENT_LEN]; - char referer[HTTP_REFERER_LEN]; +/* + * The worker will write accesslogs to its worker data structure which is + * held in shared memory. + * + * Each accesslog is prefixed with the internal domain ID (2 bytes) and + * the length of the log entry (2 bytes) (packed in kore_alog_header). + * + * The parent will every 10ms fetch the produced accesslogs from the workers + * and copy them to its own log buffer. Once this log buffer becomes full + * or 1 second has passed the parent will parse the logs and append them + * to the correct domain logbuffer which is eventually flushed to disk. + */ + +#define LOGBUF_SIZE (KORE_ACCESSLOG_BUFLEN * worker_count) +#define DOMAIN_LOGBUF_LEN (1024 * 1024) +#define LOG_ENTRY_MINSIZE_GUESS 90 + +static void accesslog_lock(struct kore_worker *); +static void accesslog_unlock(struct kore_worker *); +static void accesslog_flush_cb(struct kore_domain *); +static void accesslog_flush(struct kore_domain *, u_int64_t, int); + +static u_int64_t time_cache = 0; +static char tbuf[128] = { '\0' }; + #if !defined(KORE_NO_TLS) - char cn[X509_CN_LENGTH]; +char cnbuf[1024] = { '\0' }; #endif -}; -void -kore_accesslog_init(void) -{ -} +static struct kore_buf *logbuf = NULL; void kore_accesslog_worker_init(void) @@ -48,31 +59,20 @@ kore_accesslog_worker_init(void) kore_domain_closelogs(); } -int -kore_accesslog_write(const void *data, u_int32_t len) +void +kore_accesslog(struct http_request *req) { - int l; - time_t now; + struct timespec ts; struct tm *tm; - ssize_t sent; - struct kore_domain *dom; - struct kore_log_packet logpacket; - char *method, *buf, *cn; - char addr[INET6_ADDRSTRLEN], tbuf[128]; - - if (len != sizeof(struct kore_log_packet)) - return (KORE_RESULT_ERROR); - - (void)memcpy(&logpacket, data, sizeof(logpacket)); - - if ((dom = kore_domain_lookup(logpacket.host)) == NULL) { - kore_log(LOG_WARNING, - "got accesslog packet for unknown domain: %s", - logpacket.host); - return (KORE_RESULT_OK); - } + u_int64_t now; + struct kore_alog_header *hdr; + size_t avail; + time_t curtime; + int len, attempts; + char addr[INET6_ADDRSTRLEN]; + const char *ptr, *method, *cn, *referer; - switch (logpacket.method) { + switch (req->method) { case HTTP_METHOD_GET: method = "GET"; break; @@ -96,113 +96,237 @@ kore_accesslog_write(const void *data, u_int32_t len) break; } + if (req->referer != NULL) + referer = req->referer; + else + referer = "-"; + cn = "-"; #if !defined(KORE_NO_TLS) - if (logpacket.cn[0] != '\0') - cn = logpacket.cn; + if (req->owner->cert != NULL) { + if (X509_GET_CN(req->owner->cert, cnbuf, sizeof(cnbuf)) != -1) + cn = cnbuf; + } #endif - if (logpacket.family != AF_UNIX) { - if (inet_ntop(logpacket.family, &(logpacket.addr), - addr, sizeof(addr)) == NULL) - (void)kore_strlcpy(addr, "-", sizeof(addr)); - } else { - (void)kore_strlcpy(addr, "unix-socket", sizeof(addr)); + switch (req->owner->family) { + case AF_INET: + ptr = inet_ntop(req->owner->family, + &(req->owner->addr.ipv4.sin_addr), addr, sizeof(addr)); + break; + case AF_INET6: + ptr = inet_ntop(req->owner->family, + &(req->owner->addr.ipv6.sin6_addr), addr, sizeof(addr)); + break; + case AF_UNIX: + ptr = NULL; + break; + default: + fatal("unknown family %d", req->owner->family); } - time(&now); - tm = localtime(&now); - (void)strftime(tbuf, sizeof(tbuf), "%d/%b/%Y:%H:%M:%S %z", tm); - - l = asprintf(&buf, - "%s - %s [%s] \"%s %s HTTP/1.1\" %d %zu \"%s\" \"%s\"\n", - addr, cn, tbuf, method, logpacket.path, logpacket.status, - logpacket.length, logpacket.referer, logpacket.agent); - if (l == -1) { - kore_log(LOG_WARNING, - "kore_accesslog_write(): asprintf(): %s", errno_s); - return (KORE_RESULT_ERROR); + if (ptr == NULL) { + addr[0] = '-'; + addr[1] = '\0'; } - sent = write(dom->accesslog, buf, l); - if (sent == -1) { - free(buf); - kore_log(LOG_WARNING, - "kore_accesslog_write(): write(): %s", errno_s); - return (KORE_RESULT_ERROR); + now = kore_time_ms(); + if ((now - time_cache) >= 1000) { + time(&curtime); + tm = localtime(&curtime); + (void)strftime(tbuf, sizeof(tbuf), "%d/%b/%Y:%H:%M:%S %z", tm); + time_cache = now; } - if (sent != l) - kore_log(LOG_WARNING, "kore_accesslog_write(): short write"); + attempts = 0; + ts.tv_sec = 0; + ts.tv_nsec = 1000000; + + for (;;) { + if (attempts++ > 1000) { + if (getppid() == 1) { + if (kill(worker->pid, SIGQUIT) == -1) + fatal("failed to shutdown"); + return; + } + + attempts = 0; + } + + accesslog_lock(worker); - free(buf); - return (KORE_RESULT_OK); + avail = KORE_ACCESSLOG_BUFLEN - worker->lb.offset; + if (avail < sizeof(*hdr) + LOG_ENTRY_MINSIZE_GUESS) { + accesslog_unlock(worker); + nanosleep(&ts, NULL); + continue; + } + + hdr = (struct kore_alog_header *) + (worker->lb.buf + worker->lb.offset); + worker->lb.offset += sizeof(*hdr); + + len = snprintf(worker->lb.buf + worker->lb.offset, avail, + "%s - %s [%s] \"%s %s HTTP/1.1\" %d %zu \"%s\" \"%s\"\n", + addr, cn, tbuf, method, req->path, req->status, + req->content_length, referer, req->agent); + if (len == -1) + fatal("failed to create log entry"); + + if ((size_t)len >= avail) { + worker->lb.offset -= sizeof(*hdr); + accesslog_unlock(worker); + nanosleep(&ts, NULL); + continue; + } + + if ((size_t)len > USHRT_MAX) { + kore_log(LOG_WARNING, + "log entry length exceeds limit (%d)", len); + worker->lb.offset -= sizeof(*hdr); + break; + } + + hdr->loglen = len; + hdr->domain = req->hdlr->dom->id; + + worker->lb.offset += (size_t)len; + break; + } + + accesslog_unlock(worker); } void -kore_accesslog(struct http_request *req) +kore_accesslog_gather(void *arg, u_int64_t now, int force) { - struct kore_log_packet logpacket; + int id; + struct kore_worker *kw; + struct kore_alog_header *hdr; + struct kore_domain *dom; + size_t off, remain; - logpacket.family = req->owner->family; + if (logbuf == NULL) + logbuf = kore_buf_alloc(LOGBUF_SIZE); - switch (logpacket.family) { - case AF_INET: - memcpy(logpacket.addr, - &(req->owner->addr.ipv4.sin_addr), - sizeof(req->owner->addr.ipv4.sin_addr)); - break; - case AF_INET6: - memcpy(logpacket.addr, - &(req->owner->addr.ipv6.sin6_addr), - sizeof(req->owner->addr.ipv6.sin6_addr)); - break; - default: - break; - } + for (id = 0; id < worker_count; id++) { + kw = kore_worker_data(id); + + accesslog_lock(kw); - logpacket.status = req->status; - logpacket.method = req->method; - logpacket.length = req->content_length; - - if (kore_strlcpy(logpacket.host, - req->host, sizeof(logpacket.host)) >= sizeof(logpacket.host)) - kore_log(LOG_NOTICE, "kore_accesslog: host truncated"); - - if (kore_strlcpy(logpacket.path, - req->path, sizeof(logpacket.path)) >= sizeof(logpacket.path)) - kore_log(LOG_NOTICE, "kore_accesslog: path truncated"); - - if (req->agent != NULL) { - if (kore_strlcpy(logpacket.agent, req->agent, - sizeof(logpacket.agent)) >= sizeof(logpacket.agent)) - kore_log(LOG_NOTICE, "kore_accesslog: agent truncated"); - } else { - (void)kore_strlcpy(logpacket.agent, "-", - sizeof(logpacket.agent)); + if (force || kw->lb.offset >= KORE_ACCESSLOG_SYNC) { + kore_buf_append(logbuf, kw->lb.buf, kw->lb.offset); + kw->lb.offset = 0; + } + + accesslog_unlock(kw); } - if (req->referer != NULL) { - if (kore_strlcpy(logpacket.referer, req->referer, - sizeof(logpacket.referer)) >= sizeof(logpacket.referer)) { - kore_log(LOG_NOTICE, - "kore_accesslog: referer truncated"); + if (force || logbuf->offset >= LOGBUF_SIZE) { + off = 0; + remain = logbuf->offset; + + while (remain > 0) { + if (remain < sizeof(*hdr)) { + kore_log(LOG_ERR, + "invalid log buffer: (%zu remain)", remain); + break; + } + + hdr = (struct kore_alog_header *)(logbuf->data + off); + off += sizeof(*hdr); + remain -= sizeof(*hdr); + + if (hdr->loglen > remain) { + kore_log(LOG_ERR, + "invalid log header: %u (%zu remain)", + hdr->loglen, remain); + break; + } + + if ((dom = kore_domain_byid(hdr->domain)) == NULL) + fatal("unknown domain id %u", hdr->domain); + + if (dom->logbuf == NULL) + dom->logbuf = kore_buf_alloc(DOMAIN_LOGBUF_LEN); + + kore_buf_append(dom->logbuf, &logbuf->data[off], + hdr->loglen); + + off += hdr->loglen; + remain -= hdr->loglen; + + accesslog_flush(dom, now, force); } - } else { - (void)kore_strlcpy(logpacket.referer, "-", - sizeof(logpacket.referer)); + + kore_buf_reset(logbuf); } -#if !defined(KORE_NO_TLS) - memset(logpacket.cn, '\0', sizeof(logpacket.cn)); - if (req->owner->cert != NULL) { - if (X509_GET_CN(req->owner->cert, - logpacket.cn, sizeof(logpacket.cn)) == -1) { - kore_log(LOG_WARNING, "client cert without a CN?"); + if (force) + kore_domain_callback(accesslog_flush_cb); +} + +void +kore_accesslog_run(void *arg, u_int64_t now) +{ + static int ticks = 0; + + kore_accesslog_gather(arg, now, ticks++ % 100 ? 0 : 1); +} + +static void +accesslog_flush_cb(struct kore_domain *dom) +{ + accesslog_flush(dom, 0, 1); +} + +static void +accesslog_flush(struct kore_domain *dom, u_int64_t now, int force) +{ + ssize_t written; + + if (force && dom->logbuf == NULL) + return; + + if (force || dom->logbuf->offset >= DOMAIN_LOGBUF_LEN) { + written = write(dom->accesslog, dom->logbuf->data, + dom->logbuf->offset); + if (written == -1) { + if (errno == EINTR) + return; + if (dom->logwarn == 0 || + errno != dom->logerr) { + kore_log(LOG_NOTICE, + "error writing log for %s (%s)", + dom->domain, errno_s); + dom->logwarn = now; + dom->logerr = errno; + } + kore_buf_reset(dom->logbuf); + return; } + + if ((size_t)written != dom->logbuf->offset) { + kore_log(LOG_ERR, "partial accesslog write for %s", + dom->domain); + } + + kore_buf_reset(dom->logbuf); } -#endif +} - kore_msg_send(KORE_MSG_PARENT, - KORE_MSG_ACCESSLOG, &logpacket, sizeof(logpacket)); +static void +accesslog_lock(struct kore_worker *kw) +{ + for (;;) { + if (__sync_bool_compare_and_swap(&kw->lb.lock, 0, 1)) + break; + } +} + +static void +accesslog_unlock(struct kore_worker *kw) +{ + if (!__sync_bool_compare_and_swap(&kw->lb.lock, 1, 0)) + fatal("accesslog_unlock: failed to release"); } diff --git a/src/bsd.c b/src/bsd.c @@ -114,7 +114,7 @@ kore_platform_event_cleanup(void) } } -int +void kore_platform_event_wait(u_int64_t timer) { u_int32_t r; @@ -127,7 +127,7 @@ kore_platform_event_wait(u_int64_t timer) n = kevent(kfd, NULL, 0, events, event_count, &timeo); if (n == -1) { if (errno == EINTR) - return (0); + return; fatal("kevent(): %s", errno_s); } @@ -135,11 +135,12 @@ kore_platform_event_wait(u_int64_t timer) kore_debug("main(): %d sockets available", n); for (i = 0; i < n; i++) { - if (events[i].udata == NULL) - fatal("events[%d].udata == NULL", i); + evt = (struct kore_event *)events[i].udata; + + if (evt == NULL) + fatal("evt == NULL"); r = 0; - evt = (struct kore_event *)events[i].udata; if (events[i].filter == EVFILT_READ) evt->flags |= KORE_EVENT_READ; @@ -150,10 +151,8 @@ kore_platform_event_wait(u_int64_t timer) if (events[i].flags & EV_EOF || events[i].flags & EV_ERROR) r = 1; - evt->handle(events[i].udata, r); + evt->handle(evt, r); } - - return (r); } void @@ -257,7 +256,7 @@ kore_platform_sendfile(struct connection *c, struct netbuf *nb) nb->fd_off += len; if (len == 0 || nb->fd_off == nb->fd_len) { - net_remove_netbuf(&(c->send_queue), nb); + net_remove_netbuf(c, nb); c->snb = NULL; } diff --git a/src/connection.c b/src/connection.c @@ -400,7 +400,7 @@ kore_connection_remove(struct connection *c) for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; nb = next) { next = TAILQ_NEXT(nb, list); nb->flags &= ~NETBUF_MUST_RESEND; - net_remove_netbuf(&(c->send_queue), nb); + net_remove_netbuf(c, nb); } if (c->rnb != NULL) { diff --git a/src/domain.c b/src/domain.c @@ -39,6 +39,7 @@ #include "http.h" #endif +#define KORE_DOMAIN_CACHE 16 #define SSL_SESSION_ID "kore_ssl_sessionid" struct kore_domain_h domains; @@ -121,11 +122,19 @@ static RSA_METHOD keymgr_rsa = { #endif /* OPENSSL_VERSION_NUMBER */ #endif /* KORE_NO_TLS */ +static u_int16_t domain_id = 0; +static struct kore_domain *cached[KORE_DOMAIN_CACHE]; + void kore_domain_init(void) { + int i; + TAILQ_INIT(&domains); + for (i = 0; i < KORE_DOMAIN_CACHE; i++) + cached[i] = NULL; + #if !defined(KORE_NO_TLS) #if !defined(LIBRESSL_VERSION_TEXT) && OPENSSL_VERSION_NUMBER >= 0x10100000L if (keymgr_rsa_meth == NULL) { @@ -191,7 +200,11 @@ kore_domain_new(char *domain) kore_debug("kore_domain_new(%s)", domain); dom = kore_malloc(sizeof(*dom)); + dom->id = domain_id++; + dom->logbuf = NULL; dom->accesslog = -1; + dom->logwarn = 0; + dom->logerr = 0; #if !defined(KORE_NO_TLS) dom->cafile = NULL; dom->certkey = NULL; @@ -206,6 +219,12 @@ kore_domain_new(char *domain) TAILQ_INIT(&(dom->handlers)); #endif + if (dom->id < KORE_DOMAIN_CACHE) { + if (cached[dom->id] != NULL) + fatal("non free domain cache slot"); + cached[dom->id] = dom; + } + TAILQ_INSERT_TAIL(&domains, dom, list); if (primary_dom == NULL) @@ -444,6 +463,26 @@ kore_domain_lookup(const char *domain) return (NULL); } +struct kore_domain * +kore_domain_byid(u_int16_t id) +{ + struct kore_domain *dom; + + if (id < KORE_DOMAIN_CACHE) + return (cached[id]); + + TAILQ_FOREACH(dom, &domains, list) { + if (dom->id == id) + return (dom); + } + + return (NULL); +} + +/* + * Called by the worker processes to close the file descriptor towards + * the accesslog as they do not need it locally. + */ void kore_domain_closelogs(void) { @@ -452,6 +491,10 @@ kore_domain_closelogs(void) TAILQ_FOREACH(dom, &domains, list) { if (dom->accesslog != -1) { (void)close(dom->accesslog); + /* turn into flag to indicate accesslogs are active. */ + dom->accesslog = 1; + } else { + dom->accesslog = 0; } } } diff --git a/src/http.c b/src/http.c @@ -338,7 +338,7 @@ http_process_request(struct http_request *req) fatal("A page handler returned an unknown result: %d", r); } - if (req->hdlr->dom->accesslog != -1) + if (req->hdlr->dom->accesslog) kore_accesslog(req); req->flags |= HTTP_REQUEST_DELETE; diff --git a/src/kore.c b/src/kore.c @@ -215,7 +215,6 @@ main(int argc, char *argv[]) kore_platform_init(); #if !defined(KORE_NO_HTTP) - kore_accesslog_init(); if (http_body_disk_offload > 0) { if (mkdir(http_body_disk_path, 0700) == -1 && errno != EEXIST) { printf("can't create http_body_disk_path '%s': %s\n", @@ -599,6 +598,7 @@ kore_server_start(int argc, char *argv[]) u_int32_t tmp; int quit; struct kore_runtime_call *rcall; + u_int64_t now, netwait, timerwait; if (foreground == 0) { if (daemon(1, 0) == -1) @@ -655,6 +655,9 @@ kore_server_start(int argc, char *argv[]) quit = 0; worker_max_connections = tmp; + kore_timer_init(); + kore_timer_add(kore_accesslog_run, 10, NULL, 0); + while (quit != 1) { if (sig_recv != 0) { switch (sig_recv) { @@ -681,10 +684,20 @@ kore_server_start(int argc, char *argv[]) sig_recv = 0; } - kore_platform_event_wait(100); + netwait = 100; + now = kore_time_ms(); + + timerwait = kore_timer_run(now); + if (timerwait < netwait) + netwait = timerwait; + + kore_platform_event_wait(netwait); kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT); } + now = kore_time_ms(); + kore_accesslog_gather(NULL, now, 1); + kore_platform_event_cleanup(); kore_connection_cleanup(); kore_domain_cleanup(); diff --git a/src/linux.c b/src/linux.c @@ -92,7 +92,7 @@ kore_platform_event_cleanup(void) } } -int +void kore_platform_event_wait(u_int64_t timer) { u_int32_t r; @@ -102,7 +102,7 @@ kore_platform_event_wait(u_int64_t timer) n = epoll_wait(efd, events, event_count, timer); if (n == -1) { if (errno == EINTR) - return (0); + return; fatal("epoll_wait(): %s", errno_s); } @@ -131,8 +131,6 @@ kore_platform_event_wait(u_int64_t timer) evt->handle(events[i].data.ptr, r); } - - return (r); } void @@ -240,7 +238,7 @@ resend: goto resend; if (sent == 0 || nb->fd_off == nb->fd_len) { - net_remove_netbuf(&(c->send_queue), nb); + net_remove_netbuf(c, nb); c->snb = NULL; } diff --git a/src/msg.c b/src/msg.c @@ -37,7 +37,6 @@ static void msg_disconnected_worker(struct connection *); static void msg_type_shutdown(struct kore_msg *, const void *); #if !defined(KORE_NO_HTTP) -static void msg_type_accesslog(struct kore_msg *, const void *); static void msg_type_websocket(struct kore_msg *, const void *); #endif @@ -59,10 +58,6 @@ kore_msg_parent_init(void) } kore_msg_register(KORE_MSG_SHUTDOWN, msg_type_shutdown); - -#if !defined(KORE_NO_HTTP) - kore_msg_register(KORE_MSG_ACCESSLOG, msg_type_accesslog); -#endif } void @@ -231,13 +226,6 @@ msg_type_shutdown(struct kore_msg *msg, const void *data) #if !defined(KORE_NO_HTTP) static void -msg_type_accesslog(struct kore_msg *msg, const void *data) -{ - if (kore_accesslog_write(data, msg->length) == -1) - kore_log(LOG_WARNING, "failed to write to accesslog"); -} - -static void msg_type_websocket(struct kore_msg *msg, const void *data) { struct connection *c; diff --git a/src/net.c b/src/net.c @@ -138,6 +138,7 @@ net_send_stream(struct connection *c, void *data, size_t len, nb->flags = NETBUF_IS_STREAM; TAILQ_INSERT_TAIL(&(c->send_queue), nb, list); + if (out != NULL) *out = nb; } @@ -243,7 +244,7 @@ net_send(struct connection *c) if (c->snb->s_off == c->snb->b_len || (c->snb->flags & NETBUF_FORCE_REMOVE)) { - net_remove_netbuf(&(c->send_queue), c->snb); + net_remove_netbuf(c, c->snb); c->snb = NULL; } @@ -300,9 +301,9 @@ net_recv_flush(struct connection *c) } void -net_remove_netbuf(struct netbuf_head *list, struct netbuf *nb) +net_remove_netbuf(struct connection *c, struct netbuf *nb) { - kore_debug("net_remove_netbuf(%p, %p)", list, nb); + kore_debug("net_remove_netbuf(%p, %p)", c, nb); if (nb->type == NETBUF_RECV) fatal("net_remove_netbuf(): cannot remove recv netbuf"); @@ -322,7 +323,8 @@ net_remove_netbuf(struct netbuf_head *list, struct netbuf *nb) if (nb->flags & NETBUF_IS_FILEREF) kore_fileref_release(nb->file_ref); - TAILQ_REMOVE(list, nb, list); + TAILQ_REMOVE(&(c->send_queue), nb, list); + kore_pool_put(&nb_pool, nb); } diff --git a/src/worker.c b/src/worker.c @@ -98,6 +98,7 @@ void kore_worker_init(void) { size_t len; + struct kore_worker *kw; u_int16_t i, cpu; worker_no_lock = 0; @@ -133,6 +134,12 @@ kore_worker_init(void) kore_debug("kore_worker_init(): more workers than cpu's"); } + /* Setup log buffers. */ + for (i = 0; i < worker_count; i++) { + kw = WORKER(i); + kw->lb.offset = 0; + } + cpu = 0; for (i = 0; i < worker_count; i++) { kore_worker_spawn(i, cpu++); @@ -298,8 +305,8 @@ kore_worker_entry(struct kore_worker *kw) { struct kore_runtime_call *rcall; char buf[16]; + int quit, had_lock; u_int64_t now, next_prune; - int quit, had_lock, r; u_int64_t timerwait, netwait; #if !defined(KORE_NO_TLS) u_int64_t last_seed; @@ -421,8 +428,9 @@ kore_worker_entry(struct kore_worker *kw) if (timerwait < netwait) netwait = timerwait; - r = kore_platform_event_wait(netwait); - if (worker->has_lock && r > 0) { + kore_platform_event_wait(netwait); + + if (worker->has_lock) { if (netwait > 10) now = kore_time_ms(); if (worker_acceptlock_release(now)) @@ -458,6 +466,9 @@ kore_worker_entry(struct kore_worker *kw) sig_recv = 0; } + if (quit) + break; + #if !defined(KORE_NO_HTTP) http_process(); #endif @@ -470,9 +481,6 @@ kore_worker_entry(struct kore_worker *kw) kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT); next_prune = now + 500; } - - if (quit) - break; } rcall = kore_runtime_getcall("kore_worker_teardown");