kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

tasks.c (7583B)



      1 /*
      2  * Copyright (c) 2014 Joris Vink <joris@coders.se>
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 #include <sys/param.h>
     18 #include <sys/queue.h>
     19 #include <sys/socket.h>
     20 
     21 #include <pthread.h>
     22 #include <stdio.h>
     23 #include <stdlib.h>
     24 
     25 #include "kore.h"
     26 #include "http.h"
     27 #include "tasks.h"
     28 
     29 #if defined(__linux__)
     30 #include "seccomp.h"
     31 
     32 static struct sock_filter filter_task[] = {
     33 	KORE_SYSCALL_ALLOW(clone),
     34 	KORE_SYSCALL_ALLOW(socketpair),
     35 	KORE_SYSCALL_ALLOW(set_robust_list),
     36 #if defined(SYS_clone3)
     37 	KORE_SYSCALL_ALLOW(clone3),
     38 #endif
     39 #if defined(SYS_rseq)
     40 	KORE_SYSCALL_ALLOW(rseq),
     41 #endif
     42 };
     43 #endif
     44 
     45 static u_int8_t				threads;
     46 static TAILQ_HEAD(, kore_task_thread)	task_threads;
     47 
     48 u_int16_t	kore_task_threads = KORE_TASK_THREADS;
     49 
     50 static void	*task_thread(void *);
     51 static void	task_channel_read(int, void *, u_int32_t);
     52 static void	task_channel_write(int, void *, u_int32_t);
     53 static void	task_thread_spawn(struct kore_task_thread **);
     54 
     55 #define THREAD_FD_ASSIGN(t, f, i, o)				\
     56 	do {							\
     57 		if (pthread_self() == t) {			\
     58 			f = i;					\
     59 		} else {					\
     60 			f = o;					\
     61 		}						\
     62 	} while (0);
     63 
     64 void
     65 kore_task_init(void)
     66 {
     67 	threads = 0;
     68 	TAILQ_INIT(&task_threads);
     69 
     70 #if defined(__linux__)
     71 	kore_seccomp_filter("task", filter_task, KORE_FILTER_LEN(filter_task));
     72 #endif
     73 }
     74 
     75 void
     76 kore_task_create(struct kore_task *t, int (*entry)(struct kore_task *))
     77 {
     78 	t->cb = NULL;
     79 #if !defined(KORE_NO_HTTP)
     80 	t->req = NULL;
     81 #endif
     82 	t->evt.type = KORE_TYPE_TASK;
     83 	t->evt.handle = kore_task_handle;
     84 
     85 	t->entry = entry;
     86 	t->state = KORE_TASK_STATE_CREATED;
     87 	pthread_rwlock_init(&(t->lock), NULL);
     88 
     89 	if (socketpair(AF_UNIX, SOCK_STREAM, 0, t->fds) == -1)
     90 		fatal("kore_task_create: socketpair() %s", errno_s);
     91 }
     92 
     93 void
     94 kore_task_run(struct kore_task *t)
     95 {
     96 	struct kore_task_thread		*tt;
     97 
     98 	kore_platform_schedule_read(t->fds[0], t);
     99 	if (threads < kore_task_threads) {
    100 		/* task_thread_spawn() will lock tt->lock for us. */
    101 		task_thread_spawn(&tt);
    102 	} else {
    103 		/* Cycle task around. */
    104 		if ((tt = TAILQ_FIRST(&task_threads)) == NULL)
    105 			fatal("no available tasks threads?");
    106 		pthread_mutex_lock(&(tt->lock));
    107 		TAILQ_REMOVE(&task_threads, tt, list);
    108 		TAILQ_INSERT_TAIL(&task_threads, tt, list);
    109 	}
    110 
    111 	t->thread = tt;
    112 	TAILQ_INSERT_TAIL(&(tt->tasks), t, list);
    113 
    114 	pthread_mutex_unlock(&(tt->lock));
    115 	pthread_cond_signal(&(tt->cond));
    116 }
    117 
    118 #if !defined(KORE_NO_HTTP)
    119 void
    120 kore_task_bind_request(struct kore_task *t, struct http_request *req)
    121 {
    122 	if (t->cb != NULL)
    123 		fatal("cannot bind cbs and requests at the same time");
    124 
    125 	t->req = req;
    126 	LIST_INSERT_HEAD(&(req->tasks), t, rlist);
    127 
    128 	http_request_sleep(req);
    129 }
    130 #endif
    131 
    132 void
    133 kore_task_bind_callback(struct kore_task *t, void (*cb)(struct kore_task *))
    134 {
    135 #if !defined(KORE_NO_HTTP)
    136 	if (t->req != NULL)
    137 		fatal("cannot bind requests and cbs at the same time");
    138 #endif
    139 	t->cb = cb;
    140 }
    141 
    142 void
    143 kore_task_destroy(struct kore_task *t)
    144 {
    145 #if !defined(KORE_NO_HTTP)
    146 	if (t->req != NULL) {
    147 		t->req = NULL;
    148 		LIST_REMOVE(t, rlist);
    149 	}
    150 #endif
    151 
    152 	pthread_rwlock_wrlock(&(t->lock));
    153 
    154 	if (t->fds[0] != -1) {
    155 		(void)close(t->fds[0]);
    156 		t->fds[0] = -1;
    157 	}
    158 
    159 	if (t->fds[1] != -1) {
    160 		(void)close(t->fds[1]);
    161 		t->fds[1] = -1;
    162 	}
    163 
    164 	pthread_rwlock_unlock(&(t->lock));
    165 	pthread_rwlock_destroy(&(t->lock));
    166 }
    167 
    168 int
    169 kore_task_finished(struct kore_task *t)
    170 {
    171 	return ((kore_task_state(t) == KORE_TASK_STATE_FINISHED));
    172 }
    173 
    174 void
    175 kore_task_finish(struct kore_task *t)
    176 {
    177 	pthread_rwlock_wrlock(&(t->lock));
    178 
    179 	if (t->fds[1] != -1) {
    180 		(void)close(t->fds[1]);
    181 		t->fds[1] = -1;
    182 	}
    183 
    184 	pthread_rwlock_unlock(&(t->lock));
    185 }
    186 
    187 void
    188 kore_task_channel_write(struct kore_task *t, void *data, u_int32_t len)
    189 {
    190 	int		fd;
    191 
    192 	THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]);
    193 	task_channel_write(fd, &len, sizeof(len));
    194 	task_channel_write(fd, data, len);
    195 }
    196 
    197 u_int32_t
    198 kore_task_channel_read(struct kore_task *t, void *out, u_int32_t len)
    199 {
    200 	int		fd;
    201 	u_int32_t	dlen, bytes;
    202 
    203 	THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]);
    204 	task_channel_read(fd, &dlen, sizeof(dlen));
    205 
    206 	if (dlen > len)
    207 		bytes = len;
    208 	else
    209 		bytes = dlen;
    210 
    211 	task_channel_read(fd, out, bytes);
    212 
    213 	return (dlen);
    214 }
    215 
    216 void
    217 kore_task_handle(void *arg, int finished)
    218 {
    219 	struct kore_task	*t = arg;
    220 
    221 #if !defined(KORE_NO_HTTP)
    222 	if (t->req != NULL)
    223 		http_request_wakeup(t->req);
    224 #endif
    225 
    226 	if (finished) {
    227 		kore_platform_disable_read(t->fds[0]);
    228 		kore_task_set_state(t, KORE_TASK_STATE_FINISHED);
    229 #if !defined(KORE_NO_HTTP)
    230 		if (t->req != NULL) {
    231 			if (t->req->flags & HTTP_REQUEST_DELETE)
    232 				kore_task_destroy(t);
    233 		}
    234 #endif
    235 	}
    236 
    237 	if (t->cb != NULL)
    238 		t->cb(t);
    239 }
    240 
    241 int
    242 kore_task_state(struct kore_task *t)
    243 {
    244 	int	s;
    245 
    246 	pthread_rwlock_rdlock(&(t->lock));
    247 	s = t->state;
    248 	pthread_rwlock_unlock(&(t->lock));
    249 
    250 	return (s);
    251 }
    252 
    253 void
    254 kore_task_set_state(struct kore_task *t, int state)
    255 {
    256 	pthread_rwlock_wrlock(&(t->lock));
    257 	t->state = state;
    258 	pthread_rwlock_unlock(&(t->lock));
    259 }
    260 
    261 int
    262 kore_task_result(struct kore_task *t)
    263 {
    264 	int	r;
    265 
    266 	pthread_rwlock_rdlock(&(t->lock));
    267 	r = t->result;
    268 	pthread_rwlock_unlock(&(t->lock));
    269 
    270 	return (r);
    271 }
    272 
    273 void
    274 kore_task_set_result(struct kore_task *t, int result)
    275 {
    276 	pthread_rwlock_wrlock(&(t->lock));
    277 	t->result = result;
    278 	pthread_rwlock_unlock(&(t->lock));
    279 }
    280 
    281 static void
    282 task_channel_write(int fd, void *data, u_int32_t len)
    283 {
    284 	ssize_t		r;
    285 	u_int8_t	*d;
    286 	u_int32_t	offset;
    287 
    288 	d = data;
    289 	offset = 0;
    290 	while (offset != len) {
    291 		r = send(fd, d + offset, len - offset, 0);
    292 		if (r == -1 && errno == EINTR)
    293 			continue;
    294 		if (r == -1)
    295 			fatal("task_channel_write: %s", errno_s);
    296 		offset += r;
    297 	}
    298 }
    299 
    300 static void
    301 task_channel_read(int fd, void *out, u_int32_t len)
    302 {
    303 	ssize_t		r;
    304 	u_int8_t	*d;
    305 	u_int32_t	offset;
    306 
    307 	d = out;
    308 	offset = 0;
    309 	while (offset != len) {
    310 		r = read(fd, d + offset, len - offset);
    311 		if (r == -1 && errno == EINTR)
    312 			continue;
    313 		if (r == -1)
    314 			fatal("task_channel_read: %s", errno_s);
    315 		if (r == 0)
    316 			fatal("task_channel_read: unexpected eof");
    317 
    318 		offset += r;
    319 	}
    320 }
    321 
    322 static void
    323 task_thread_spawn(struct kore_task_thread **out)
    324 {
    325 	struct kore_task_thread		*tt;
    326 
    327 	tt = kore_malloc(sizeof(*tt));
    328 	tt->idx = threads++;
    329 
    330 	TAILQ_INIT(&(tt->tasks));
    331 	pthread_cond_init(&(tt->cond), NULL);
    332 	pthread_mutex_init(&(tt->lock), NULL);
    333 	pthread_mutex_lock(&(tt->lock));
    334 	TAILQ_INSERT_TAIL(&task_threads, tt, list);
    335 
    336 	if (pthread_create(&(tt->tid), NULL, task_thread, tt) != 0)
    337 		fatal("pthread_create: %s", errno_s);
    338 
    339 	*out = tt;
    340 }
    341 
    342 static void *
    343 task_thread(void *arg)
    344 {
    345 	struct kore_task		*t;
    346 	struct kore_task_thread		*tt = arg;
    347 
    348 	pthread_mutex_lock(&(tt->lock));
    349 
    350 	for (;;) {
    351 		if (TAILQ_EMPTY(&(tt->tasks)))
    352 			pthread_cond_wait(&(tt->cond), &(tt->lock));
    353 
    354 		t = TAILQ_FIRST(&(tt->tasks));
    355 		TAILQ_REMOVE(&(tt->tasks), t, list);
    356 		pthread_mutex_unlock(&(tt->lock));
    357 
    358 		kore_task_set_state(t, KORE_TASK_STATE_RUNNING);
    359 		kore_task_set_result(t, t->entry(t));
    360 		kore_task_finish(t);
    361 
    362 		pthread_mutex_lock(&(tt->lock));
    363 	}
    364 
    365 	pthread_exit(NULL);
    366 
    367 	/* NOTREACHED */
    368 	return (NULL);
    369 }