kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 8f8ab9252172e86fabc3f713430e2eb594902aa7
parent 0de28488a69ba1170922b9d790abef5cbda19984
Author: Joris Vink <joris@coders.se>
Date:   Tue,  4 Jun 2013 13:43:11 +0200

upon quit time, workers will not accept new connections but will
handle all outstanding http requests.

Diffstat:
includes/http.h | 2++
src/http.c | 6++++++
src/kore.c | 41++++++++++++++++++++++++++++++-----------
3 files changed, 38 insertions(+), 11 deletions(-)

diff --git a/includes/http.h b/includes/http.h @@ -56,6 +56,8 @@ struct http_request { TAILQ_ENTRY(http_request) list; }; +extern int http_request_count; + void http_init(void); void http_process(void); time_t http_date_to_time(char *); diff --git a/src/http.c b/src/http.c @@ -44,9 +44,12 @@ static int http_send_done(struct netbuf *); static TAILQ_HEAD(, http_request) http_requests; +int http_request_count; + void http_init(void) { + http_request_count = 0; TAILQ_INIT(&http_requests); } @@ -84,6 +87,7 @@ http_request_new(struct connection *c, struct spdy_stream *s, char *host, if (out != NULL) *out = req; + http_request_count++; TAILQ_INSERT_TAIL(&http_requests, req, list); return (KORE_RESULT_OK); } @@ -100,6 +104,7 @@ http_process(void) if (req->flags & HTTP_REQUEST_DELETE) { TAILQ_REMOVE(&http_requests, req, list); http_request_free(req); + http_request_count--; continue; } @@ -128,6 +133,7 @@ http_process(void) if (r != KORE_RESULT_RETRY) { TAILQ_REMOVE(&http_requests, req, list); http_request_free(req); + http_request_count--; } } } diff --git a/src/kore.c b/src/kore.c @@ -57,6 +57,7 @@ static TAILQ_HEAD(, connection) worker_clients; static TAILQ_HEAD(, kore_worker) kore_workers; static struct passwd *pw = NULL; static u_int16_t workerid = 0; +static u_int16_t cpu_count = 1; int server_port = 0; char *server_ip = NULL; @@ -100,6 +101,10 @@ main(int argc, char *argv[]) fatal("missing a username to run as"); if ((pw = getpwnam(runas_user)) == NULL) fatal("user '%s' does not exist"); + if ((cpu_count = sysconf(_SC_NPROCESSORS_ONLN)) == -1) { + kore_log("could not get number of cpu's falling back to 1"); + cpu_count = 1; + } if (!kore_server_bind(&server, server_ip, server_port)) fatal("cannot bind to %s:%d", server_ip, server_port); @@ -111,14 +116,11 @@ main(int argc, char *argv[]) fatal("chroot(%s): %s", chroot_path, errno_s); if (chdir("/") == -1) fatal("chdir(/): %s", errno_s); - if (setgroups(1, &pw->pw_gid) || setresgid(pw->pw_gid, pw->pw_gid, - pw->pw_gid) || setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) - fatal("unable to drop privileges"); kore_worker_init(); sig_recv = 0; - signal(SIGINT, kore_signal); + signal(SIGQUIT, kore_signal); signal(SIGHUP, kore_signal); for (;;) { @@ -131,7 +133,7 @@ main(int argc, char *argv[]) kw->pid, errno_s); } } - } else if (sig_recv == SIGINT) { + } else if (sig_recv == SIGQUIT) { break; } sig_recv = 0; @@ -426,7 +428,10 @@ kore_worker_init(void) if (worker_count == 0) fatal("no workers specified"); + kore_log("kore_worker_init(): system has %d cpu's", cpu_count); kore_log("kore_worker_init(): starting %d workers", worker_count); + if (worker_count > cpu_count) + kore_log("kore_worker_init(): more workers then cpu's"); TAILQ_INIT(&kore_workers); for (i = 0; i < worker_count; i++) @@ -461,7 +466,10 @@ kore_worker_wait(int final) struct kore_worker *kw, *next; memset(&info, 0, sizeof(info)); - r = waitid(P_ALL, 0, &info, WEXITED | WNOHANG); + if (final) + r = waitid(P_ALL, 0, &info, WEXITED); + else + r = waitid(P_ALL, 0, &info, WEXITED | WNOHANG); if (r == -1) { kore_log("waitid(): %s", errno_s); return; @@ -496,9 +504,13 @@ static void kore_worker_entry(struct kore_worker *kw) { struct epoll_event *events; - int n, i, *fd; struct connection *c, *cnext; struct kore_worker *k, *next; + int n, i, *fd, quit; + + if (setgroups(1, &pw->pw_gid) || setresgid(pw->pw_gid, pw->pw_gid, + pw->pw_gid) || setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) + fatal("unable to drop privileges"); for (k = TAILQ_FIRST(&kore_workers); k != NULL; k = next) { next = TAILQ_NEXT(k, list); @@ -515,20 +527,21 @@ kore_worker_entry(struct kore_worker *kw) sig_recv = 0; signal(SIGHUP, kore_signal); - signal(SIGINT, kore_signal); + signal(SIGQUIT, kore_signal); http_init(); TAILQ_INIT(&disconnected); TAILQ_INIT(&worker_clients); + quit = 0; kore_event(server.fd, EPOLLIN, &server); events = kore_calloc(EPOLL_EVENTS, sizeof(struct epoll_event)); for (;;) { if (sig_recv != 0) { if (sig_recv == SIGHUP) kore_module_reload(); - else if (sig_recv == SIGINT) - break; + else if (sig_recv == SIGQUIT) + quit = 1; sig_recv = 0; } @@ -556,7 +569,8 @@ kore_worker_entry(struct kore_worker *kw) } if (*fd == server.fd) { - kore_server_accept(&server); + if (!quit) + kore_server_accept(&server); } else { c = (struct connection *)events[i].data.ptr; if (!kore_connection_handle(c, @@ -571,15 +585,20 @@ kore_worker_entry(struct kore_worker *kw) cnext = TAILQ_NEXT(c, list); kore_server_final_disconnect(c); } + + if (quit && http_request_count == 0) + break; } for (c = TAILQ_FIRST(&worker_clients); c != NULL; c = cnext) { cnext = TAILQ_NEXT(c, list); + net_send_flush(c); kore_server_final_disconnect(c); } for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) { cnext = TAILQ_NEXT(c, list); + net_send_flush(c); kore_server_final_disconnect(c); }