commit 6de0f8568ad762ea599aa9f4eb7e3b6467633b41
parent 0e8bdf38c475e246ef8f1a3ed823bfa05fe64f89
Author: Joris Vink <joris@coders.se>
Date: Wed, 22 Oct 2014 21:16:49 +0200
Rework net, worker and some http internals.
- The net code no longer has a recv_queue, instead reuse same recv buffer.
- Introduce net_recv_reset() to reset the recv buffer when needed.
- Have the workers spread the load better between them by slightly
delaying their next accept lock and giving them an accept treshold
so they don't go ahead and keep accepting connections if they end
up winning the race constantly between the workers.
- The kore_worker_acceptlock_release() is no longer available.
- Prepopulate the HTTP server response header that is added to each
response in both normal HTTP and SPDY modes.
- The path and host members of http_request are now allocated on the heap.
These changes overall result better performance on a multicore machine,
especially the worker load changes shine through.
Diffstat:
12 files changed, 194 insertions(+), 166 deletions(-)
diff --git a/examples/generic/conf/generic.conf b/examples/generic/conf/generic.conf
@@ -26,7 +26,6 @@ domain 127.0.0.1 {
static /b64test serve_b64test
static /spdy-reset serve_spdyreset
static /upload serve_file_upload
- static /lock-test serve_lock_test
static /validator serve_validator
static /params-test serve_params_test
static /private serve_private
diff --git a/examples/generic/src/example.c b/examples/generic/src/example.c
@@ -27,7 +27,6 @@ int serve_intro(struct http_request *);
int serve_b64test(struct http_request *);
int serve_spdyreset(struct http_request *);
int serve_file_upload(struct http_request *);
-int serve_lock_test(struct http_request *);
int serve_validator(struct http_request *);
int serve_params_test(struct http_request *);
int serve_private(struct http_request *);
@@ -181,16 +180,6 @@ serve_file_upload(struct http_request *req)
return (KORE_RESULT_OK);
}
-int
-serve_lock_test(struct http_request *req)
-{
- kore_log(LOG_NOTICE, "lock-test called on worker %d", worker->id);
- kore_worker_acceptlock_release();
-
- http_response(req, 200, "OK", 2);
- return (KORE_RESULT_OK);
-}
-
void
test_base64(u_int8_t *src, u_int32_t slen, struct kore_buf *res)
{
diff --git a/examples/ktunnel/src/ktunnel.c b/examples/ktunnel/src/ktunnel.c
@@ -82,8 +82,6 @@ open_connection(struct http_request *req)
* Kore understands. We set the disconnect method so we get a callback
* whenever either of the connections will go away so we can cleanup the
* one it is attached to.
- *
- * We are storing the "piped" connection in hdlr_extra.
*/
static int
ktunnel_pipe_create(struct connection *c, const char *host, const char *port)
@@ -92,7 +90,6 @@ ktunnel_pipe_create(struct connection *c, const char *host, const char *port)
struct connection *cpipe;
u_int16_t nport;
int fd, err;
- struct netbuf *nb, *next;
nport = kore_strtonum(port, 10, 1, SHRT_MAX, &err);
if (err == KORE_RESULT_ERROR) {
@@ -143,20 +140,11 @@ ktunnel_pipe_create(struct connection *c, const char *host, const char *port)
kore_worker_connection_add(cpipe);
kore_connection_start_idletimer(cpipe);
-
- for (nb = TAILQ_FIRST(&(c->recv_queue)); nb != NULL; nb = next) {
- next = TAILQ_NEXT(nb, list);
- TAILQ_REMOVE(&(c->recv_queue), nb, list);
- kore_mem_free(nb->buf);
- kore_pool_put(&nb_pool, nb);
- }
-
kore_platform_event_all(cpipe->fd, cpipe);
- net_recv_queue(c, NETBUF_SEND_PAYLOAD_MAX,
- NETBUF_CALL_CB_ALWAYS, NULL, ktunnel_pipe_data);
- net_recv_queue(cpipe, NETBUF_SEND_PAYLOAD_MAX,
- NETBUF_CALL_CB_ALWAYS, NULL, ktunnel_pipe_data);
+ net_recv_reset(c, NETBUF_SEND_PAYLOAD_MAX, ktunnel_pipe_data);
+ net_recv_queue(cpipe, NETBUF_SEND_PAYLOAD_MAX, NETBUF_CALL_CB_ALWAYS,
+ ktunnel_pipe_data);
printf("connection started to %s (%p -> %p)\n", host, c, cpipe);
return (KORE_RESULT_OK);
@@ -176,9 +164,7 @@ ktunnel_pipe_data(struct netbuf *nb)
net_send_queue(dst, nb->buf, nb->s_off, NULL, NETBUF_LAST_CHAIN);
net_send_flush(dst);
-
- /* Reuse the netbuf so we don't have to recreate them all the time. */
- nb->s_off = 0;
+ net_recv_reset(src, NETBUF_SEND_PAYLOAD_MAX, ktunnel_pipe_data);
return (KORE_RESULT_OK);
}
diff --git a/includes/http.h b/includes/http.h
@@ -172,8 +172,8 @@ struct http_request {
u_int64_t start;
u_int64_t end;
u_int64_t total;
- char host[KORE_DOMAINNAME_LEN];
- char path[HTTP_URI_LEN];
+ char *host;
+ char *path;
char *agent;
struct connection *owner;
struct spdy_stream *stream;
diff --git a/includes/kore.h b/includes/kore.h
@@ -48,7 +48,6 @@ extern int daemon(int, int);
#define KORE_RESULT_OK 1
#define KORE_RESULT_RETRY 2
-#define KORE_NAME_STRING "kore"
#define KORE_VERSION_MAJOR 1
#define KORE_VERSION_MINOR 2
#define KORE_VERSION_STATE "current"
@@ -186,7 +185,6 @@ struct connection {
struct netbuf_head send_queue;
struct netbuf *snb;
- struct netbuf_head recv_queue;
struct netbuf *rnb;
u_int32_t client_stream_id;
@@ -255,6 +253,7 @@ struct kore_worker {
u_int8_t cpu;
pid_t pid;
u_int8_t has_lock;
+ u_int32_t accept_treshold;
struct kore_module_handle *active_hdlr;
};
@@ -350,7 +349,6 @@ void kore_worker_wait(int);
void kore_worker_init(void);
void kore_worker_shutdown(void);
void kore_worker_dispatch_signal(int);
-void kore_worker_acceptlock_release(void);
void kore_worker_spawn(u_int16_t, u_int16_t);
void kore_worker_entry(struct kore_worker *);
void kore_worker_connection_add(struct connection *);
@@ -363,7 +361,7 @@ void kore_platform_proctitle(char *);
void kore_platform_disable_read(int);
void kore_platform_enable_accept(void);
void kore_platform_disable_accept(void);
-void kore_platform_event_wait(u_int64_t);
+int kore_platform_event_wait(u_int64_t);
void kore_platform_event_all(int, void *);
void kore_platform_schedule_read(int, void *);
void kore_platform_event_schedule(int, int, int, void *);
@@ -463,7 +461,6 @@ u_int32_t net_read32(u_int8_t *);
void net_write16(u_int8_t *, u_int16_t);
void net_write32(u_int8_t *, u_int32_t);
void net_init(void);
-int net_recv(struct connection *);
int net_send(struct connection *);
int net_send_flush(struct connection *);
int net_recv_flush(struct connection *);
@@ -471,11 +468,13 @@ int net_read(struct connection *, int *);
int net_read_ssl(struct connection *, int *);
int net_write(struct connection *, int, int *);
int net_write_ssl(struct connection *, int, int *);
+void net_recv_reset(struct connection *, u_int32_t,
+ int (*cb)(struct netbuf *));
void net_remove_netbuf(struct netbuf_head *, struct netbuf *);
void net_recv_queue(struct connection *, u_int32_t, int,
- struct netbuf **, int (*cb)(struct netbuf *));
-int net_recv_expand(struct connection *c, struct netbuf *,
- u_int32_t, int (*cb)(struct netbuf *));
+ int (*cb)(struct netbuf *));
+void net_recv_expand(struct connection *c, u_int32_t, void *,
+ int (*cb)(struct netbuf *));
void net_send_queue(struct connection *, void *,
u_int32_t, struct spdy_stream *, int);
void net_send_stream(struct connection *, void *,
diff --git a/src/bsd.c b/src/bsd.c
@@ -82,9 +82,10 @@ kore_platform_event_init(void)
}
}
-void
+int
kore_platform_event_wait(u_int64_t timer)
{
+ u_int32_t r;
struct listener *l;
struct connection *c;
u_int8_t type;
@@ -96,7 +97,7 @@ kore_platform_event_wait(u_int64_t timer)
n = kevent(kfd, changelist, nchanges, events, event_count, &timeo);
if (n == -1) {
if (errno == EINTR)
- return;
+ return (0);
fatal("kevent(): %s", errno_s);
}
@@ -104,6 +105,7 @@ kore_platform_event_wait(u_int64_t timer)
if (n > 0)
kore_debug("main(): %d sockets available", n);
+ r = 0;
for (i = 0; i < n; i++) {
if (events[i].udata == NULL)
fatal("events[%d].udata == NULL", i);
@@ -139,12 +141,14 @@ kore_platform_event_wait(u_int64_t timer)
case KORE_TYPE_LISTENER:
l = (struct listener *)events[i].udata;
- while (worker_active_connections <
+ while (r < worker->accept_treshold &&
+ worker_active_connections <
worker_max_connections) {
kore_connection_accept(l, &c);
if (c == NULL)
break;
+ r++;
kore_platform_event_all(c->fd, c);
}
break;
@@ -181,6 +185,8 @@ kore_platform_event_wait(u_int64_t timer)
fatal("wrong type in event %d", type);
}
}
+
+ return (r);
}
void
diff --git a/src/connection.c b/src/connection.c
@@ -42,6 +42,8 @@ kore_connection_new(void *owner)
c->ssl = NULL;
c->flags = 0;
+ c->rnb = NULL;
+ c->snb = NULL;
c->cert = NULL;
c->owner = owner;
c->disconnect = NULL;
@@ -58,7 +60,6 @@ kore_connection_new(void *owner)
c->idle_timer.length = KORE_IDLE_TIMER_MAX;
TAILQ_INIT(&(c->send_queue));
- TAILQ_INIT(&(c->recv_queue));
TAILQ_INIT(&(c->spdy_streams));
TAILQ_INIT(&(c->http_requests));
@@ -111,7 +112,7 @@ kore_connection_accept(struct listener *l, struct connection **out)
if (http_keepalive_time != 0)
c->idle_timer.length = http_keepalive_time * 1000;
- net_recv_queue(c, http_header_max, NETBUF_CALL_CB_ALWAYS, NULL,
+ net_recv_queue(c, http_header_max, NETBUF_CALL_CB_ALWAYS,
http_header_recv);
#endif
@@ -203,7 +204,7 @@ kore_connection_handle(struct connection *c)
c->proto = CONN_PROTO_SPDY;
c->idle_timer.length = spdy_idle_time;
net_recv_queue(c, SPDY_FRAME_SIZE, 0,
- NULL, spdy_frame_recv);
+ spdy_frame_recv);
} else if (!memcmp(data, "http/1.1", MIN(8, len))) {
c->proto = CONN_PROTO_HTTP;
if (http_keepalive_time != 0) {
@@ -212,7 +213,7 @@ kore_connection_handle(struct connection *c)
}
net_recv_queue(c, http_header_max,
- NETBUF_CALL_CB_ALWAYS, NULL,
+ NETBUF_CALL_CB_ALWAYS,
http_header_recv);
} else {
kore_log(LOG_NOTICE,
@@ -227,7 +228,7 @@ kore_connection_handle(struct connection *c)
}
net_recv_queue(c, http_header_max,
- NETBUF_CALL_CB_ALWAYS, NULL,
+ NETBUF_CALL_CB_ALWAYS,
http_header_recv);
}
@@ -304,11 +305,9 @@ kore_connection_remove(struct connection *c)
kore_pool_put(&nb_pool, nb);
}
- for (nb = TAILQ_FIRST(&(c->recv_queue)); nb != NULL; nb = next) {
- next = TAILQ_NEXT(nb, list);
- TAILQ_REMOVE(&(c->recv_queue), nb, list);
- kore_mem_free(nb->buf);
- kore_pool_put(&nb_pool, nb);
+ if (c->rnb != NULL) {
+ kore_mem_free(c->rnb->buf);
+ kore_pool_put(&nb_pool, c->rnb);
}
for (s = TAILQ_FIRST(&(c->spdy_streams)); s != NULL; s = snext) {
diff --git a/src/http.c b/src/http.c
@@ -45,6 +45,9 @@ static void http_response_spdy(struct http_request *,
int, void *, u_int32_t);
static struct kore_buf *header_buf;
+static char http_version[32];
+static u_int16_t http_version_len;
+static char http_version_spdy[32];
static TAILQ_HEAD(, http_request) http_requests;
static TAILQ_HEAD(, http_request) http_requests_sleeping;
static struct kore_pool http_request_pool;
@@ -59,15 +62,29 @@ u_int64_t http_body_max = HTTP_BODY_MAX_LEN;
void
http_init(void)
{
- int prealloc;
+ int prealloc, l;
http_request_count = 0;
TAILQ_INIT(&http_requests);
TAILQ_INIT(&http_requests_sleeping);
+
header_buf = kore_buf_create(1024);
- prealloc = MIN((worker_max_connections / 10), 1000);
+ l = snprintf(http_version_spdy, sizeof(http_version_spdy),
+ "kore (%d.%d-%s)", KORE_VERSION_MAJOR,
+ KORE_VERSION_MINOR, KORE_VERSION_STATE);
+ if (l == -1 || (size_t)l >= sizeof(http_version_spdy))
+ fatal("http_init(): http_version_spdy buffer too small");
+
+ l = snprintf(http_version, sizeof(http_version),
+ "server: kore (%d.%d-%s)\r\n",
+ KORE_VERSION_MAJOR, KORE_VERSION_MINOR, KORE_VERSION_STATE);
+ if (l == -1 || (size_t)l >= sizeof(http_version))
+ fatal("http_init(): http_version buffer too small");
+ http_version_len = l;
+
+ prealloc = MIN((worker_max_connections / 10), 1000);
kore_pool_init(&http_request_pool, "http_request_pool",
sizeof(struct http_request), prealloc);
kore_pool_init(&http_header_pool, "http_header_pool",
@@ -141,8 +158,8 @@ http_request_new(struct connection *c, struct spdy_stream *s, const char *host,
if ((p = strrchr(host, ':')) != NULL)
*p = '\0';
- kore_strlcpy(req->host, host, sizeof(req->host));
- kore_strlcpy(req->path, path, sizeof(req->path));
+ req->host = kore_strdup(host);
+ req->path = kore_strdup(path);
if ((req->query_string = strchr(req->path, '?')) != NULL)
*(req->query_string)++ = '\0';
@@ -354,6 +371,12 @@ http_request_free(struct http_request *req)
kore_debug("http_request_free: %p->%p", req->owner, req);
+ kore_mem_free(req->host);
+ kore_mem_free(req->path);
+
+ req->host = NULL;
+ req->path = NULL;
+
TAILQ_REMOVE(&http_requests, req, list);
TAILQ_REMOVE(&(req->owner->http_requests), req, olist);
@@ -484,7 +507,6 @@ http_header_recv(struct netbuf *nb)
u_int64_t clen;
struct http_header *hdr;
struct http_request *req;
- struct netbuf *nnb;
u_int8_t *end_headers;
int h, i, v, skip, bytes_left;
char *request[4], *host[3], *hbuf;
@@ -620,8 +642,7 @@ http_header_recv(struct netbuf *nb)
if (bytes_left > 0) {
kore_debug("%ld/%ld (%ld - %ld) more bytes for body",
bytes_left, clen, nb->s_off, len);
- net_recv_queue(c, bytes_left, 0, &nnb, http_body_recv);
- nnb->extra = req;
+ net_recv_expand(c, bytes_left, req, http_body_recv);
} else if (bytes_left == 0) {
req->flags |= HTTP_REQUEST_COMPLETE;
req->flags &= ~HTTP_REQUEST_EXPECT_BODY;
@@ -1136,11 +1157,7 @@ http_response_spdy(struct http_request *req, struct connection *c,
hblock = spdy_header_block_create(SPDY_HBLOCK_NORMAL);
spdy_header_block_add(hblock, ":status", sbuf);
spdy_header_block_add(hblock, ":version", "HTTP/1.1");
-
- (void)snprintf(sbuf, sizeof(sbuf), "%s (%d.%d-%s)",
- KORE_NAME_STRING, KORE_VERSION_MAJOR, KORE_VERSION_MINOR,
- KORE_VERSION_STATE);
- spdy_header_block_add(hblock, ":server", sbuf);
+ spdy_header_block_add(hblock, ":server", http_version_spdy);
if (http_hsts_enable) {
(void)snprintf(sbuf, sizeof(sbuf),
@@ -1188,9 +1205,7 @@ http_response_normal(struct http_request *req, struct connection *c,
kore_buf_appendf(header_buf, "HTTP/1.1 %d %s\r\n",
status, http_status_text(status));
- kore_buf_appendf(header_buf, "server: %s (%d.%d-%s)\r\n",
- KORE_NAME_STRING, KORE_VERSION_MAJOR, KORE_VERSION_MINOR,
- KORE_VERSION_STATE);
+ kore_buf_append(header_buf, http_version, http_version_len);
if (c->flags & CONN_CLOSE_EMPTY)
connection_close = 1;
@@ -1238,10 +1253,8 @@ http_response_normal(struct http_request *req, struct connection *c,
if (d != NULL && req != NULL && req->method != HTTP_METHOD_HEAD)
net_send_queue(c, d, len, NULL, NETBUF_LAST_CHAIN);
- if (!(c->flags & CONN_CLOSE_EMPTY)) {
- net_recv_queue(c, http_header_max,
- NETBUF_CALL_CB_ALWAYS, NULL, http_header_recv);
- }
+ if (!(c->flags & CONN_CLOSE_EMPTY))
+ net_recv_reset(c, http_header_max, http_header_recv);
}
const char *
diff --git a/src/linux.c b/src/linux.c
@@ -71,9 +71,10 @@ kore_platform_event_init(void)
events = kore_calloc(event_count, sizeof(struct epoll_event));
}
-void
+int
kore_platform_event_wait(u_int64_t timer)
{
+ u_int32_t r;
struct connection *c;
struct listener *l;
u_int8_t type;
@@ -82,13 +83,14 @@ kore_platform_event_wait(u_int64_t timer)
n = epoll_wait(efd, events, event_count, timer);
if (n == -1) {
if (errno == EINTR)
- return;
+ return (0);
fatal("epoll_wait(): %s", errno_s);
}
if (n > 0)
kore_debug("main(): %d sockets available", n);
+ r = 0;
for (i = 0; i < n; i++) {
if (events[i].data.ptr == NULL)
fatal("events[%d].data.ptr == NULL", i);
@@ -125,12 +127,14 @@ kore_platform_event_wait(u_int64_t timer)
case KORE_TYPE_LISTENER:
l = (struct listener *)events[i].data.ptr;
- while (worker_active_connections <
+ while (r < worker->accept_treshold &&
+ worker_active_connections <
worker_max_connections) {
kore_connection_accept(l, &c);
if (c == NULL)
break;
+ r++;
kore_platform_event_all(c->fd, c);
}
break;
@@ -160,6 +164,8 @@ kore_platform_event_wait(u_int64_t timer)
fatal("wrong type in event %d", type);
}
}
+
+ return (r);
}
void
@@ -174,7 +180,7 @@ kore_platform_event_schedule(int fd, int type, int flags, void *udata)
{
struct epoll_event evt;
- kore_debug("kore_platform_event(%d, %d, %d, %p)",
+ kore_debug("kore_platform_event_schedule(%d, %d, %d, %p)",
fd, type, flags, udata);
evt.events = type;
@@ -207,6 +213,8 @@ kore_platform_enable_accept(void)
{
struct listener *l;
+ kore_debug("kore_platform_enable_accept()");
+
LIST_FOREACH(l, &listeners, list)
kore_platform_event_schedule(l->fd, EPOLLIN, 0, l);
}
@@ -216,6 +224,8 @@ kore_platform_disable_accept(void)
{
struct listener *l;
+ kore_debug("kore_platform_disable_accept()");
+
LIST_FOREACH(l, &listeners, list) {
if (epoll_ctl(efd, EPOLL_CTL_DEL, l->fd, NULL) == -1)
fatal("kore_platform_disable_accept: %s", errno_s);
diff --git a/src/net.c b/src/net.c
@@ -109,45 +109,62 @@ net_send_stream(struct connection *c, void *data, u_int32_t len,
}
void
-net_recv_queue(struct connection *c, u_int32_t len, int flags,
- struct netbuf **out, int (*cb)(struct netbuf *))
+net_recv_reset(struct connection *c, u_int32_t len, int (*cb)(struct netbuf *))
{
- struct netbuf *nb;
+ kore_debug("net_recv_reset(): %p %d", c, len);
- nb = kore_pool_get(&nb_pool);
- nb->cb = cb;
- nb->b_len = len;
- nb->m_len = len;
- nb->owner = c;
- nb->s_off = 0;
- nb->stream = NULL;
- nb->flags = flags;
- nb->type = NETBUF_RECV;
- nb->buf = kore_malloc(nb->b_len);
+ if (c->rnb->type != NETBUF_RECV)
+ fatal("net_recv_expand(): wrong netbuf type");
- TAILQ_INSERT_TAIL(&(c->recv_queue), nb, list);
- if (out != NULL)
- *out = nb;
+ c->rnb->cb = cb;
+ c->rnb->s_off = 0;
+ c->rnb->b_len = len;
+
+ if (c->rnb->b_len <= c->rnb->m_len &&
+ c->rnb->m_len < (NETBUF_SEND_PAYLOAD_MAX / 2))
+ return;
+
+ kore_mem_free(c->rnb->buf);
+ c->rnb->m_len = len;
+ c->rnb->buf = kore_malloc(c->rnb->m_len);
}
-int
-net_recv_expand(struct connection *c, struct netbuf *nb, u_int32_t len,
+void
+net_recv_queue(struct connection *c, u_int32_t len, int flags,
int (*cb)(struct netbuf *))
{
- if (nb->type != NETBUF_RECV) {
- kore_debug("net_recv_expand(): wrong netbuf type");
- return (KORE_RESULT_ERROR);
- }
+ kore_debug("net_recv_queue(): %p %d %d", c, len, flags);
+
+ if (c->rnb != NULL)
+ fatal("net_recv_queue(): called incorrectly for %p", c);
+
+ c->rnb = kore_pool_get(&nb_pool);
+ c->rnb->cb = cb;
+ c->rnb->owner = c;
+ c->rnb->s_off = 0;
+ c->rnb->b_len = len;
+ c->rnb->m_len = len;
+ c->rnb->extra = NULL;
+ c->rnb->stream = NULL;
+ c->rnb->flags = flags;
+ c->rnb->type = NETBUF_RECV;
+ c->rnb->buf = kore_malloc(c->rnb->b_len);
+}
- nb->cb = cb;
- nb->b_len += len;
- nb->m_len = nb->b_len;
- nb->buf = kore_realloc(nb->buf, nb->b_len);
+void
+net_recv_expand(struct connection *c, u_int32_t len, void *extra,
+ int (*cb)(struct netbuf *))
+{
+ kore_debug("net_recv_expand(): %p %d %p", c, len, extra);
- TAILQ_REMOVE(&(c->recv_queue), nb, list);
- TAILQ_INSERT_HEAD(&(c->recv_queue), nb, list);
+ if (c->rnb->type != NETBUF_RECV)
+ fatal("net_recv_expand(): wrong netbuf type");
- return (KORE_RESULT_OK);
+ c->rnb->cb = cb;
+ c->rnb->b_len += len;
+ c->rnb->extra = extra;
+ c->rnb->m_len = c->rnb->b_len;
+ c->rnb->buf = kore_realloc(c->rnb->buf, c->rnb->b_len);
}
int
@@ -217,50 +234,31 @@ net_send_flush(struct connection *c)
}
int
-net_recv(struct connection *c)
+net_recv_flush(struct connection *c)
{
int r;
- c->rnb = TAILQ_FIRST(&(c->recv_queue));
- if (c->rnb == NULL) {
- kore_debug("kore_read_client(): nb->cb == NULL");
- return (KORE_RESULT_ERROR);
- }
+ kore_debug("net_recv_flush(%p)", c);
- if (!c->read(c, &r))
- return (KORE_RESULT_ERROR);
- if (!(c->flags & CONN_READ_POSSIBLE))
- return (KORE_RESULT_OK);
+ if (c->rnb == NULL)
+ fatal("net_recv_flush(): nb->cb == NULL");
- kore_debug("net_recv(%ld/%ld bytes), progress with %d",
- c->rnb->s_off, c->rnb->b_len, r);
+ while (c->flags & CONN_READ_POSSIBLE) {
+ if (!c->read(c, &r))
+ return (KORE_RESULT_ERROR);
+ if (!(c->flags & CONN_READ_POSSIBLE))
+ break;
+
+ kore_debug("net_recv(%ld/%ld bytes), progress with %d",
+ c->rnb->s_off, c->rnb->b_len, r);
- c->rnb->s_off += (size_t)r;
- if (c->rnb->s_off == c->rnb->b_len ||
- (c->rnb->flags & NETBUF_CALL_CB_ALWAYS)) {
- r = c->rnb->cb(c->rnb);
+ c->rnb->s_off += (size_t)r;
if (c->rnb->s_off == c->rnb->b_len ||
- (c->rnb->flags & NETBUF_FORCE_REMOVE)) {
- net_remove_netbuf(&(c->recv_queue), c->rnb);
- c->rnb = NULL;
+ (c->rnb->flags & NETBUF_CALL_CB_ALWAYS)) {
+ r = c->rnb->cb(c->rnb);
+ if (r != KORE_RESULT_OK)
+ return (r);
}
-
- if (r != KORE_RESULT_OK)
- return (r);
- }
-
- return (KORE_RESULT_OK);
-}
-
-int
-net_recv_flush(struct connection *c)
-{
- kore_debug("net_recv_flush(%p)", c);
-
- while (!TAILQ_EMPTY(&(c->recv_queue)) &&
- (c->flags & CONN_READ_POSSIBLE)) {
- if (!net_recv(c))
- return (KORE_RESULT_ERROR);
}
return (KORE_RESULT_OK);
@@ -271,6 +269,9 @@ net_remove_netbuf(struct netbuf_head *list, struct netbuf *nb)
{
kore_debug("net_remove_netbuf(%p, %p, %p)", list, nb, nb->stream);
+ if (nb->type == NETBUF_RECV)
+ fatal("net_remove_netbuf(): cannot remove recv netbuf");
+
nb->stream = NULL;
if (nb->flags & NETBUF_MUST_RESEND) {
kore_debug("retaining %p (MUST_RESEND)", nb);
diff --git a/src/spdy.c b/src/spdy.c
@@ -97,11 +97,12 @@ spdy_frame_recv(struct netbuf *nb)
break;
}
+ r = KORE_RESULT_OK;
+
if (cb != NULL) {
- r = net_recv_expand(c, nb, ctrl.length, cb);
+ net_recv_expand(c, ctrl.length, NULL, cb);
} else {
kore_debug("no callback for type %u", ctrl.type);
- r = KORE_RESULT_OK;
}
} else {
data.stream_id = net_read32(nb->buf) & ~(1 << 31);
@@ -122,15 +123,14 @@ spdy_frame_recv(struct netbuf *nb)
if ((int)data.length < 0) {
r = KORE_RESULT_ERROR;
} else {
- r = net_recv_expand(c, nb, data.length,
+ r = KORE_RESULT_OK;
+ net_recv_expand(c, data.length, NULL,
spdy_data_frame_recv);
}
}
}
- if (r == KORE_RESULT_OK) {
- net_recv_queue(c, SPDY_FRAME_SIZE, 0, NULL, spdy_frame_recv);
- } else {
+ if (r != KORE_RESULT_OK) {
r = KORE_RESULT_OK;
spdy_session_teardown(c, SPDY_SESSION_ERROR_PROTOCOL);
}
@@ -596,6 +596,7 @@ spdy_ctrl_frame_syn_stream(struct netbuf *nb)
kore_mem_free(method);
kore_mem_free(host);
kore_mem_free(version);
+ net_recv_reset(c, SPDY_FRAME_SIZE, spdy_frame_recv);
kore_debug("SPDY_SYN_STREAM: %u:%u:%u", s->stream_id,
s->flags, s->prio);
@@ -622,6 +623,7 @@ spdy_ctrl_frame_rst_stream(struct netbuf *nb)
}
spdy_stream_close(c, s, SPDY_REMOVE_NETBUFS);
+ net_recv_reset(c, SPDY_FRAME_SIZE, spdy_frame_recv);
return (KORE_RESULT_OK);
}
@@ -676,6 +678,8 @@ spdy_ctrl_frame_settings(struct netbuf *nb)
buf += 8;
}
+ net_recv_reset(c, SPDY_FRAME_SIZE, spdy_frame_recv);
+
return (KORE_RESULT_OK);
}
@@ -696,6 +700,8 @@ spdy_ctrl_frame_ping(struct netbuf *nb)
}
spdy_frame_send(c, SPDY_CTRL_FRAME_PING, 0, 4, NULL, id);
+ net_recv_reset(c, SPDY_FRAME_SIZE, spdy_frame_recv);
+
return (KORE_RESULT_OK);
}
@@ -739,6 +745,8 @@ spdy_ctrl_frame_window(struct netbuf *nb)
}
}
+ net_recv_reset(c, SPDY_FRAME_SIZE, spdy_frame_recv);
+
return (r);
}
@@ -806,6 +814,7 @@ spdy_data_frame_recv(struct netbuf *nb)
if (s->post_size == 0) {
req->flags |= HTTP_REQUEST_COMPLETE;
req->flags &= ~HTTP_REQUEST_EXPECT_BODY;
+ net_recv_reset(c, SPDY_FRAME_SIZE, spdy_frame_recv);
return (KORE_RESULT_OK);
}
@@ -826,7 +835,7 @@ spdy_data_frame_recv(struct netbuf *nb)
kore_buf_append(req->http_body, (nb->buf + SPDY_FRAME_SIZE),
data.length);
- if (data.flags & FLAG_FIN) {
+ if (data.flags & FLAG_FIN || req->http_body->offset == s->post_size) {
if (req->http_body->offset != s->post_size) {
kore_debug("FLAG_FIN before all POST data received");
return (KORE_RESULT_ERROR);
@@ -838,6 +847,8 @@ spdy_data_frame_recv(struct netbuf *nb)
req->flags &= ~HTTP_REQUEST_EXPECT_BODY;
}
+ net_recv_reset(c, SPDY_FRAME_SIZE, spdy_frame_recv);
+
/*
* XXX - This can be implemented better so we can stagger
* window updates a bit and not constantly hit flow control.
diff --git a/src/worker.c b/src/worker.c
@@ -42,6 +42,8 @@
#endif
#define KORE_SHM_KEY 15000
+#define WORKER_LOCK_TIMEOUT 500
+
#define WORKER(id) \
(struct kore_worker *)((u_int8_t *)kore_workers + \
(sizeof(struct kore_worker) * id))
@@ -54,7 +56,8 @@ struct wlock {
static int worker_trylock(void);
static void worker_unlock(void);
-static int kore_worker_acceptlock_obtain(void);
+static inline int kore_worker_acceptlock_obtain(void);
+static inline void kore_worker_acceptlock_release(void);
static TAILQ_HEAD(, connection) disconnected;
static TAILQ_HEAD(, connection) worker_clients;
@@ -179,8 +182,8 @@ kore_worker_entry(struct kore_worker *kw)
struct rlimit rl;
char buf[16];
struct connection *c, *cnext;
- int quit, had_lock;
- u_int64_t now, idle_check, timer;
+ int quit, had_lock, r;
+ u_int64_t now, idle_check, next_lock;
worker = kw;
@@ -235,6 +238,7 @@ kore_worker_entry(struct kore_worker *kw)
quit = 0;
had_lock = 0;
+ next_lock = 0;
idle_check = 0;
kore_platform_event_init();
kore_accesslog_worker_init();
@@ -250,6 +254,11 @@ kore_worker_entry(struct kore_worker *kw)
kore_log(LOG_NOTICE, "worker %d started (cpu#%d)", kw->id, kw->cpu);
kore_module_onload();
+ if (worker_count > 1)
+ kw->accept_treshold = worker_max_connections / 16;
+ else
+ kw->accept_treshold = worker_max_connections;
+
for (;;) {
if (sig_recv != 0) {
if (sig_recv == SIGHUP)
@@ -260,30 +269,33 @@ kore_worker_entry(struct kore_worker *kw)
sig_recv = 0;
}
- if (kore_worker_acceptlock_obtain()) {
- if (had_lock == 0)
- kore_platform_enable_accept();
+ now = kore_time_ms();
+
+ if (now > next_lock) {
+ if (kore_worker_acceptlock_obtain()) {
+ if (had_lock == 0) {
+ kore_platform_enable_accept();
+ had_lock = 1;
+ }
+ }
+ }
- timer = 100;
- had_lock = 1;
- } else {
+ if (!worker->has_lock) {
if (had_lock == 1) {
had_lock = 0;
kore_platform_disable_accept();
}
-
- timer = 1;
}
- kore_platform_event_wait(timer);
-
- if (worker->has_lock)
+ r = kore_platform_event_wait(100);
+ if (worker->has_lock && r > 0) {
kore_worker_acceptlock_release();
+ next_lock = now + WORKER_LOCK_TIMEOUT;
+ }
http_process();
- now = kore_time_ms();
- if ((now - idle_check) >= 1000) {
+ if ((now - idle_check) >= 10000) {
idle_check = now;
TAILQ_FOREACH(c, &worker_clients, list) {
if (c->proto == CONN_PROTO_SPDY &&
@@ -409,7 +421,7 @@ kore_worker_wait(int final)
}
}
-void
+static inline void
kore_worker_acceptlock_release(void)
{
if (worker_count == 1)
@@ -422,11 +434,14 @@ kore_worker_acceptlock_release(void)
worker->has_lock = 0;
}
-static int
+static inline int
kore_worker_acceptlock_obtain(void)
{
int r;
+ if (worker->has_lock == 1)
+ return (1);
+
if (worker_count == 1) {
worker->has_lock = 1;
return (1);