commit cf700b34f74aa853b71dba0add1566ae755265b5
parent 437f2e9ed2c56173412c27658ac9619d91aebaba
Author: Joris Vink <joris@coders.se>
Date: Sat, 28 Jun 2014 16:17:18 +0200
Add initial stab at asynchronous background tasks.
More to follow.
Diffstat:
8 files changed, 250 insertions(+), 0 deletions(-)
diff --git a/Makefile b/Makefile
@@ -28,6 +28,12 @@ ifneq ("$(PGSQL)", "")
CFLAGS+=-I$(shell pg_config --includedir) -DKORE_USE_PGSQL
endif
+ifneq ("$(TASKS)", "")
+ S_SRC+=contrib/tasks/kore_tasks.c
+ LDFLAGS+=-lpthread
+ CFLAGS+=-DKORE_USE_TASKS
+endif
+
OSNAME=$(shell uname -s | sed -e 's/[-_].*//g' | tr A-Z a-z)
ifeq ("$(OSNAME)", "darwin")
CFLAGS+=-I/opt/local/include/
diff --git a/contrib/tasks/kore_tasks.c b/contrib/tasks/kore_tasks.c
@@ -0,0 +1,159 @@
+/*
+ * Copyright (c) 2014 Joris Vink <joris@coders.se>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/param.h>
+#include <sys/queue.h>
+#include <sys/socket.h>
+
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "kore.h"
+#include "http.h"
+#include "kore_tasks.h"
+
+static u_int8_t threads;
+static pthread_mutex_t task_lock;
+static pthread_cond_t task_broadcast;
+
+static TAILQ_HEAD(, kore_task) task_list;
+static TAILQ_HEAD(, kore_task_thread) task_threads;
+
+static void *task_thread(void *);
+static void task_thread_spawn(void);
+
+void
+kore_task_init(void)
+{
+ threads = 0;
+
+ TAILQ_INIT(&task_list);
+ TAILQ_INIT(&task_threads);
+
+ pthread_mutex_init(&task_lock, NULL);
+ pthread_cond_init(&task_broadcast, NULL);
+
+ task_thread_spawn();
+}
+
+void
+kore_task_setup(struct http_request *req)
+{
+ int i;
+
+ for (i = 0; i < HTTP_TASK_MAX; i++)
+ req->tasks[i] = NULL;
+}
+
+void
+kore_task_create(struct http_request *req, int idx,
+ void (*entry)(struct kore_task *))
+{
+ if (idx >= HTTP_TASK_MAX)
+ fatal("kore_task_create: idx > HTTP_TASK_MAX");
+ if (req->tasks[idx] != NULL)
+ return;
+
+ req->flags |= HTTP_REQUEST_SLEEPING;
+
+ req->tasks[idx] = kore_malloc(sizeof(struct kore_task));
+ req->tasks[idx]->owner = req;
+ req->tasks[idx]->entry = entry;
+ req->tasks[idx]->type = KORE_TYPE_TASK;
+
+ if (socketpair(AF_UNIX, SOCK_STREAM, 0, req->tasks[idx]->fds) == -1)
+ fatal("kore_task_create: socketpair() %s", errno_s);
+
+ kore_platform_schedule_read(req->tasks[idx]->fds[0], req->tasks[idx]);
+
+ pthread_mutex_lock(&task_lock);
+ TAILQ_INSERT_TAIL(&task_list, req->tasks[idx], list);
+ pthread_mutex_unlock(&task_lock);
+
+ pthread_cond_broadcast(&task_broadcast);
+}
+
+void
+kore_task_finish(struct kore_task *t)
+{
+ kore_debug("kore_task_finished: %p", t);
+
+ close(t->fds[1]);
+}
+
+void
+kore_task_channel_write(struct kore_task *t, void *data, u_int32_t len)
+{
+ kore_debug("kore_task_channel_write: %p <- %p (%ld)", t, data, len);
+}
+
+void
+kore_task_handle(struct kore_task *t, int finished)
+{
+ struct http_request *req = t->owner;
+
+ kore_debug("kore_task_handle: %p, %d", t, finished);
+
+ if (finished) {
+ close(t->fds[0]);
+ req->flags &= ~HTTP_REQUEST_SLEEPING;
+ kore_mem_free(t);
+ }
+}
+
+static void
+task_thread_spawn(void)
+{
+ struct kore_task_thread *tt;
+
+ tt = kore_malloc(sizeof(*tt));
+ tt->idx = threads++;
+ TAILQ_INSERT_TAIL(&task_threads, tt, list);
+
+ if (pthread_create(&(tt->tid), NULL, task_thread, tt) != 0)
+ fatal("pthread_create: %s", errno_s);
+}
+
+static void *
+task_thread(void *arg)
+{
+ struct kore_task *t;
+ struct kore_task_thread *tt = arg;
+
+ kore_debug("task_thread: #%d starting", tt->idx);
+
+ for (;;) {
+ pthread_mutex_lock(&task_lock);
+ if (TAILQ_EMPTY(&task_list))
+ pthread_cond_wait(&task_broadcast, &task_lock);
+
+ kore_debug("task_thread#%d: woke up", tt->idx);
+
+ t = TAILQ_FIRST(&task_list);
+ TAILQ_REMOVE(&task_list, t, list);
+ pthread_mutex_unlock(&task_lock);
+
+ kore_debug("task_thread#%d: executing %p", tt->idx, t);
+ t->thread = tt;
+ t->entry(t);
+ }
+
+ pthread_exit(NULL);
+
+ /* NOTREACHED */
+ return (NULL);
+}
diff --git a/includes/http.h b/includes/http.h
@@ -121,7 +121,9 @@ struct http_file {
#define HTTP_REQUEST_SLEEPING 0x04
#define HTTP_PGSQL_MAX 20
+#define HTTP_TASK_MAX 10
struct kore_pgsql;
+struct kore_task;
struct http_request {
u_int8_t method;
@@ -142,6 +144,7 @@ struct http_request {
struct kore_module_handle *hdlr;
struct kore_pgsql *pgsql[HTTP_PGSQL_MAX];
+ struct kore_task *tasks[HTTP_TASK_MAX];
TAILQ_HEAD(, http_header) req_headers;
TAILQ_HEAD(, http_header) resp_headers;
diff --git a/includes/kore.h b/includes/kore.h
@@ -107,6 +107,7 @@ TAILQ_HEAD(netbuf_head, netbuf);
#define KORE_TYPE_LISTENER 1
#define KORE_TYPE_CONNECTION 2
#define KORE_TYPE_PGSQL_CONN 3
+#define KORE_TYPE_TASK 4
struct listener {
u_int8_t type;
diff --git a/includes/kore_tasks.h b/includes/kore_tasks.h
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2014 Joris Vink <joris@coders.se>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#ifndef __H_KORE_TASKS
+#define __H_KORE_TASKS
+
+struct kore_task {
+ u_int8_t type;
+
+ int fds[2];
+ void *owner;
+ void *thread;
+ void (*entry)(struct kore_task *);
+
+ TAILQ_ENTRY(kore_task) list;
+};
+
+struct kore_task_thread {
+ u_int8_t idx;
+ pthread_t tid;
+
+ TAILQ_ENTRY(kore_task_thread) list;
+};
+
+void kore_task_init(void);
+void kore_task_finish(struct kore_task *);
+void kore_task_destroy(struct kore_task *);
+void kore_task_setup(struct http_request *);
+void kore_task_handle(struct kore_task *, int);
+void kore_task_create(struct http_request *, int,
+ void (*entry)(struct kore_task *));
+
+void kore_task_channel_write(struct kore_task *, void *, u_int32_t);
+void kore_task_channel_read(struct kore_task *,
+ u_int8_t **, u_int32_t *);
+
+#endif
diff --git a/src/http.c b/src/http.c
@@ -27,6 +27,10 @@
#include "contrib/postgres/kore_pgsql.h"
#endif
+#if defined(KORE_USE_TASKS)
+#include "kore_tasks.h"
+#endif
+
static char *http_status_text(int);
static int http_post_data_recv(struct netbuf *);
static char *http_post_data_text(struct http_request *);
@@ -137,6 +141,10 @@ http_request_new(struct connection *c, struct spdy_stream *s, char *host,
req->agent = kore_strdup("unknown");
}
+#if defined(KORE_USE_TASKS)
+ kore_task_setup(req);
+#endif
+
http_request_count++;
TAILQ_INSERT_HEAD(&http_requests, req, list);
TAILQ_INSERT_TAIL(&(c->http_requests), req, olist);
diff --git a/src/linux.c b/src/linux.c
@@ -25,6 +25,10 @@
#include "contrib/postgres/kore_pgsql.h"
#endif
+#if defined(KORE_USE_TASKS)
+#include "kore_tasks.h"
+#endif
+
static int efd = -1;
static u_int32_t event_count = 0;
static struct epoll_event *events = NULL;
@@ -103,6 +107,12 @@ kore_platform_event_wait(void)
}
#endif
+#if defined(KORE_USE_TASKS)
+ if (type == KORE_TYPE_TASK) {
+ kore_task_handle(events[i].data.ptr, 1);
+ continue;
+ }
+#endif
c = (struct connection *)events[i].data.ptr;
kore_connection_disconnect(c);
continue;
@@ -141,6 +151,11 @@ kore_platform_event_wait(void)
kore_pgsql_handle(events[i].data.ptr, 0);
break;
#endif
+#if defined(KORE_USE_TASK)
+ case KORE_TYPE_TASK:
+ kore_task_handle(events[i].data.ptr, 0);
+ break;
+#endif
default:
fatal("wrong type in event %d", type);
}
diff --git a/src/worker.c b/src/worker.c
@@ -29,6 +29,10 @@
#include "contrib/postgres/kore_pgsql.h"
#endif
+#if defined(KORE_USE_TASKS)
+#include "kore_tasks.h"
+#endif
+
#if defined(WORKER_DEBUG)
#define worker_debug(fmt, ...) printf(fmt, ##__VA_ARGS__)
#else
@@ -229,6 +233,10 @@ kore_worker_entry(struct kore_worker *kw)
kore_pgsql_init();
#endif
+#if defined(KORE_USE_TASKS)
+ kore_task_init();
+#endif
+
worker->accept_treshold = worker_max_connections / 10;
kore_log(LOG_NOTICE, "worker %d started (cpu#%d)", kw->id, kw->cpu);