kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 8b0279879ae3ca5e6e358c44cd4404e89d7e8ee4
parent ce012e7bd5dac16ca44921f2ad89bc51b1ff7d19
Author: Joris Vink <joris@coders.se>
Date:   Thu, 21 Mar 2019 10:17:08 +0100

rework timers so they fire more predictably.

this change also stops python coroutines from waking up very
late after their timeout has expired.

in filerefs, don't prime the timer until we actually have something
to expire, and kill the timer when the last ref drops.

Diffstat:
include/kore/kore.h | 4+++-
src/bsd.c | 14++++++++++----
src/fileref.c | 21++++++++++++++++++++-
src/kore.c | 16+++++-----------
src/linux.c | 9+++++++--
src/timer.c | 32+++++++++++++++++++++-----------
src/worker.c | 37+++++++++++--------------------------
7 files changed, 77 insertions(+), 56 deletions(-)

diff --git a/include/kore/kore.h b/include/kore/kore.h @@ -75,6 +75,7 @@ extern int daemon(int, int); #define KORE_TLS_VERSION_1_2 1 #define KORE_TLS_VERSION_BOTH 2 +#define KORE_WAIT_INFINITE (u_int64_t)-1 #define KORE_RESEED_TIME (1800 * 1000) #define errno_s strerror(errno) @@ -624,7 +625,8 @@ struct kore_auth *kore_auth_lookup(const char *); #endif void kore_timer_init(void); -u_int64_t kore_timer_run(u_int64_t); +void kore_timer_run(u_int64_t); +u_int64_t kore_timer_next_run(u_int64_t); void kore_timer_remove(struct kore_timer *); struct kore_timer *kore_timer_add(void (*cb)(void *, u_int64_t), u_int64_t, void *, int); diff --git a/src/bsd.c b/src/bsd.c @@ -119,12 +119,18 @@ kore_platform_event_wait(u_int64_t timer) { u_int32_t r; struct kore_event *evt; - struct timespec timeo; int n, i; + struct timespec timeo, *ts; - timeo.tv_sec = timer / 1000; - timeo.tv_nsec = (timer % 1000) * 1000000; - n = kevent(kfd, NULL, 0, events, event_count, &timeo); + if (timer == KORE_WAIT_INFINITE) { + ts = NULL; + } else { + timeo.tv_sec = timer / 1000; + timeo.tv_nsec = (timer % 1000) * 1000000; + ts = &timeo; + } + + n = kevent(kfd, NULL, 0, events, event_count, ts); if (n == -1) { if (errno == EINTR) return; diff --git a/src/fileref.c b/src/fileref.c @@ -28,19 +28,20 @@ /* cached filerefs expire after 30 seconds of inactivity. */ #define FILEREF_EXPIRATION (1000 * 30) +static void fileref_timer_prime(void); static void fileref_drop(struct kore_fileref *); static void fileref_soft_remove(struct kore_fileref *); static void fileref_expiration_check(void *, u_int64_t); static TAILQ_HEAD(, kore_fileref) refs; static struct kore_pool ref_pool; +static struct kore_timer *ref_timer = NULL; void kore_fileref_init(void) { TAILQ_INIT(&refs); kore_pool_init(&ref_pool, "ref_pool", sizeof(struct kore_fileref), 100); - kore_timer_add(fileref_expiration_check, 10000, NULL, 0); } struct kore_fileref * @@ -48,6 +49,8 @@ kore_fileref_create(const char *path, int fd, off_t size, struct timespec *ts) { struct kore_fileref *ref; + fileref_timer_prime(); + if ((ref = kore_fileref_get(path)) != NULL) return (ref); @@ -152,6 +155,15 @@ kore_fileref_release(struct kore_fileref *ref) } static void +fileref_timer_prime(void) +{ + if (ref_timer != NULL) + return; + + ref_timer = kore_timer_add(fileref_expiration_check, 10000, NULL, 0); +} + +static void fileref_soft_remove(struct kore_fileref *ref) { if (ref->flags & KORE_FILEREF_SOFT_REMOVED) @@ -173,6 +185,7 @@ fileref_expiration_check(void *arg, u_int64_t now) { struct kore_fileref *ref, *next; + printf("ref timer run\n"); for (ref = TAILQ_FIRST(&refs); ref != NULL; ref = next) { next = TAILQ_NEXT(ref, list); @@ -188,6 +201,12 @@ fileref_expiration_check(void *arg, u_int64_t now) fileref_drop(ref); } + + if (TAILQ_EMPTY(&refs)) { + /* remove the timer. */ + ref_timer->flags |= KORE_TIMER_ONESHOT; + ref_timer = NULL; + } } static void diff --git a/src/kore.c b/src/kore.c @@ -598,7 +598,7 @@ kore_server_start(int argc, char *argv[]) u_int32_t tmp; int quit; struct kore_runtime_call *rcall; - u_int64_t now, netwait, timerwait; + u_int64_t netwait; if (foreground == 0) { if (daemon(1, 0) == -1) @@ -657,7 +657,7 @@ kore_server_start(int argc, char *argv[]) kore_timer_init(); #if !defined(KORE_NO_HTTP) - kore_timer_add(kore_accesslog_run, 10, NULL, 0); + kore_timer_add(kore_accesslog_run, 100, NULL, 0); #endif while (quit != 1) { @@ -686,20 +686,14 @@ kore_server_start(int argc, char *argv[]) sig_recv = 0; } - netwait = 100; - now = kore_time_ms(); - - timerwait = kore_timer_run(now); - if (timerwait < netwait) - netwait = timerwait; - + netwait = kore_timer_next_run(kore_time_ms()); kore_platform_event_wait(netwait); kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT); + kore_timer_run(kore_time_ms()); } - now = kore_time_ms(); #if !defined(KORE_NO_HTTP) - kore_accesslog_gather(NULL, now, 1); + kore_accesslog_gather(NULL, kore_time_ms(), 1); #endif kore_platform_event_cleanup(); diff --git a/src/linux.c b/src/linux.c @@ -97,9 +97,14 @@ kore_platform_event_wait(u_int64_t timer) { u_int32_t r; struct kore_event *evt; - int n, i; + int n, i, timeo; - n = epoll_wait(efd, events, event_count, timer); + if (timer == KORE_WAIT_INFINITE) + timeo = -1; + else + timeo = timer; + + n = epoll_wait(efd, events, event_count, timeo); if (n == -1) { if (errno == EINTR) return; diff --git a/src/timer.c b/src/timer.c @@ -61,18 +61,32 @@ kore_timer_remove(struct kore_timer *timer) } u_int64_t +kore_timer_next_run(u_int64_t now) +{ + struct kore_timer *timer; + + if ((timer = TAILQ_FIRST(&kore_timers)) != NULL) { + if (timer->nextrun > now) + return (timer->nextrun - now); + return (0); + } + + return (KORE_WAIT_INFINITE); +} + +void kore_timer_run(u_int64_t now) { - struct kore_timer *timer, *t; - u_int64_t next_timer; + struct kore_timer *timer, *t, *prev; - next_timer = 1000; + prev = NULL; while ((timer = TAILQ_FIRST(&kore_timers)) != NULL) { - if (timer->nextrun > now) { - next_timer = timer->nextrun - now; + if (timer == prev) + break; + + if (timer->nextrun > now) break; - } TAILQ_REMOVE(&kore_timers, timer, list); timer->cb(timer->arg, now); @@ -80,6 +94,7 @@ kore_timer_run(u_int64_t now) if (timer->flags & KORE_TIMER_ONESHOT) { kore_free(timer); } else { + prev = timer; timer->nextrun = now + timer->interval; TAILQ_FOREACH(t, &kore_timers, list) { if (t->nextrun > timer->nextrun) { @@ -92,9 +107,4 @@ kore_timer_run(u_int64_t now) TAILQ_INSERT_TAIL(&kore_timers, timer, list); } } - - if (next_timer > 1) - next_timer -= 1; - - return (next_timer); } diff --git a/src/worker.c b/src/worker.c @@ -73,8 +73,8 @@ struct wlock { static int worker_trylock(void); static void worker_unlock(void); -static inline int worker_acceptlock_obtain(u_int64_t); -static inline int worker_acceptlock_release(u_int64_t); +static inline int worker_acceptlock_obtain(void); +static inline int worker_acceptlock_release(void); #if !defined(KORE_NO_TLS) static void worker_entropy_recv(struct kore_msg *, const void *); @@ -308,8 +308,7 @@ kore_worker_entry(struct kore_worker *kw) struct kore_runtime_call *rcall; char buf[16]; int quit, had_lock; - u_int64_t now, next_prune; - u_int64_t timerwait, netwait; + u_int64_t netwait, now, next_prune; #if !defined(KORE_NO_TLS) u_int64_t last_seed; #endif @@ -396,7 +395,6 @@ kore_worker_entry(struct kore_worker *kw) worker->restarted = 0; for (;;) { - netwait = 100; now = kore_time_ms(); #if !defined(KORE_NO_TLS) @@ -408,7 +406,7 @@ kore_worker_entry(struct kore_worker *kw) #endif if (!worker->has_lock && next_lock <= now) { - if (worker_acceptlock_obtain(now)) { + if (worker_acceptlock_obtain()) { if (had_lock == 0) { kore_platform_enable_accept(); had_lock = 1; @@ -418,25 +416,12 @@ kore_worker_entry(struct kore_worker *kw) } } - if (!worker->has_lock) { - if (worker_active_connections > 0) { - if (next_lock > now) - netwait = next_lock - now; - } else { - netwait = 10; - } - } - - timerwait = kore_timer_run(now); - if (timerwait < netwait) - netwait = timerwait; - + netwait = kore_timer_next_run(now); kore_platform_event_wait(netwait); + now = kore_time_ms(); if (worker->has_lock) { - if (netwait > 10) - now = kore_time_ms(); - if (worker_acceptlock_release(now)) + if (worker_acceptlock_release()) next_lock = now + WORKER_LOCK_TIMEOUT; } @@ -472,6 +457,8 @@ kore_worker_entry(struct kore_worker *kw) if (quit) break; + kore_timer_run(now); + #if !defined(KORE_NO_HTTP) http_process(); #endif @@ -618,7 +605,7 @@ kore_worker_make_busy(void) } static inline int -worker_acceptlock_release(u_int64_t now) +worker_acceptlock_release(void) { if (worker_count == WORKER_SOLO_COUNT || worker_no_lock == 1) return (0); @@ -637,7 +624,6 @@ worker_acceptlock_release(u_int64_t now) #if defined(WORKER_DEBUG) kore_log(LOG_DEBUG, "worker busy, releasing lock"); - kore_log(LOG_DEBUG, "had lock for %lu ms", now - worker->time_locked); #endif worker_unlock(); @@ -647,7 +633,7 @@ worker_acceptlock_release(u_int64_t now) } static inline int -worker_acceptlock_obtain(u_int64_t now) +worker_acceptlock_obtain(void) { int r; @@ -671,7 +657,6 @@ worker_acceptlock_obtain(u_int64_t now) if (worker_trylock()) { r = 1; worker->has_lock = 1; - worker->time_locked = now; #if defined(WORKER_DEBUG) kore_log(LOG_DEBUG, "got lock"); #endif