kore

An easy-to-use, scalable, and secure web application framework for writing web APIs in C.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

tasks.c (7988B)



      1 /*
      2  * Copyright (c) 2014 Joris Vink <joris@coders.se>
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 #include <sys/param.h>
     18 #include <sys/queue.h>
     19 #include <sys/socket.h>
     20 
     21 #include <pthread.h>
     22 #include <stdio.h>
     23 #include <stdlib.h>
     24 
     25 #include "kore.h"
     26 #include "http.h"
     27 #include "tasks.h"
     28 
     29 #if defined(__linux__)
     30 #include "seccomp.h"
     31 
     32 static struct sock_filter filter_task[] = {
        	/*
        	 * Seccomp filter entries handed to kore_seccomp_filter() under the
        	 * name "task" by kore_task_init(). Each entry presumably permits
        	 * the named syscall; see seccomp.h for KORE_SYSCALL_ALLOW.
        	 */
     33 	KORE_SYSCALL_ALLOW(clone),
     34 	KORE_SYSCALL_ALLOW(socketpair),
     35 	KORE_SYSCALL_ALLOW(set_robust_list),
     36 };
     37 #endif
     38 
     39 static u_int8_t				threads;	/* worker threads spawned so far; bumped in task_thread_spawn() */
     40 static TAILQ_HEAD(, kore_task_thread)	task_threads;	/* all spawned workers; rotated by kore_task_run() when reusing one */
     41 
     42 u_int16_t	kore_task_threads = KORE_TASK_THREADS;	/* limit compared against `threads` in kore_task_run() before spawning */
     43 
     44 static void	*task_thread(void *);	/* worker thread entry point, started via pthread_create() */
     45 static void	task_channel_read(int, void *, u_int32_t);	/* read exactly N bytes from an fd, fatal() on error/EOF */
     46 static void	task_channel_write(int, void *, u_int32_t);	/* send exactly N bytes on an fd, fatal() on error */
     47 static void	task_thread_spawn(struct kore_task_thread **);	/* create a worker; returns it with its lock held */
     48 
     49 #define THREAD_FD_ASSIGN(t, f, i, o)				\
     50 	do {							\
     51 		if (pthread_self() == t) {			\
     52 			f = i;					\
     53 		} else {					\
     54 			f = o;					\
     55 		}						\
     56 	} while (0);
     57 
     58 void
     59 kore_task_init(void)
     60 {
     61 	threads = 0;
     62 	TAILQ_INIT(&task_threads);
     63 
     64 #if defined(__linux__)
     65 	kore_seccomp_filter("task", filter_task, KORE_FILTER_LEN(filter_task));
     66 #endif
     67 }
     68 
     69 void
     70 kore_task_create(struct kore_task *t, int (*entry)(struct kore_task *))
     71 {
     72 	t->cb = NULL;
     73 #if !defined(KORE_NO_HTTP)
     74 	t->req = NULL;
     75 #endif
     76 	t->evt.type = KORE_TYPE_TASK;
     77 	t->evt.handle = kore_task_handle;
     78 
     79 	t->entry = entry;
     80 	t->state = KORE_TASK_STATE_CREATED;
     81 	pthread_rwlock_init(&(t->lock), NULL);
     82 
     83 	if (socketpair(AF_UNIX, SOCK_STREAM, 0, t->fds) == -1)
     84 		fatal("kore_task_create: socketpair() %s", errno_s);
     85 }
     86 
     87 void
     88 kore_task_run(struct kore_task *t)
     89 {
     90 	struct kore_task_thread		*tt;
     91 
     92 	kore_platform_schedule_read(t->fds[0], t);
     93 	if (threads < kore_task_threads) {
     94 		/* task_thread_spawn() will lock tt->lock for us. */
     95 		task_thread_spawn(&tt);
     96 	} else {
     97 		/* Cycle task around. */
     98 		if ((tt = TAILQ_FIRST(&task_threads)) == NULL)
     99 			fatal("no available tasks threads?");
    100 		pthread_mutex_lock(&(tt->lock));
    101 		TAILQ_REMOVE(&task_threads, tt, list);
    102 		TAILQ_INSERT_TAIL(&task_threads, tt, list);
    103 	}
    104 
    105 	t->thread = tt;
    106 	TAILQ_INSERT_TAIL(&(tt->tasks), t, list);
    107 
    108 	pthread_mutex_unlock(&(tt->lock));
    109 	pthread_cond_signal(&(tt->cond));
    110 }
    111 
    112 #if !defined(KORE_NO_HTTP)
    113 void
    114 kore_task_bind_request(struct kore_task *t, struct http_request *req)
    115 {
    116 	kore_debug("kore_task_bind_request: %p bound to %p", req, t);
    117 
    118 	if (t->cb != NULL)
    119 		fatal("cannot bind cbs and requests at the same time");
    120 
    121 	t->req = req;
    122 	LIST_INSERT_HEAD(&(req->tasks), t, rlist);
    123 
    124 	http_request_sleep(req);
    125 }
    126 #endif
    127 
    128 void
    129 kore_task_bind_callback(struct kore_task *t, void (*cb)(struct kore_task *))
    130 {
    131 #if !defined(KORE_NO_HTTP)
    132 	if (t->req != NULL)
    133 		fatal("cannot bind requests and cbs at the same time");
    134 #endif
    135 	t->cb = cb;
    136 }
    137 
    138 void
    139 kore_task_destroy(struct kore_task *t)
    140 {
    141 	kore_debug("kore_task_destroy: %p", t);
    142 
    143 #if !defined(KORE_NO_HTTP)
    144 	if (t->req != NULL) {
    145 		t->req = NULL;
    146 		LIST_REMOVE(t, rlist);
    147 	}
    148 #endif
    149 
    150 	pthread_rwlock_wrlock(&(t->lock));
    151 
    152 	if (t->fds[0] != -1) {
    153 		(void)close(t->fds[0]);
    154 		t->fds[0] = -1;
    155 	}
    156 
    157 	if (t->fds[1] != -1) {
    158 		(void)close(t->fds[1]);
    159 		t->fds[1] = -1;
    160 	}
    161 
    162 	pthread_rwlock_unlock(&(t->lock));
    163 	pthread_rwlock_destroy(&(t->lock));
    164 }
    165 
    166 int
    167 kore_task_finished(struct kore_task *t)
    168 {
    169 	return ((kore_task_state(t) == KORE_TASK_STATE_FINISHED));
    170 }
    171 
    172 void
    173 kore_task_finish(struct kore_task *t)
    174 {
    175 	kore_debug("kore_task_finished: %p (%d)", t, t->result);
    176 	pthread_rwlock_wrlock(&(t->lock));
    177 
    178 	if (t->fds[1] != -1) {
    179 		(void)close(t->fds[1]);
    180 		t->fds[1] = -1;
    181 	}
    182 
    183 	pthread_rwlock_unlock(&(t->lock));
    184 }
    185 
    186 void
    187 kore_task_channel_write(struct kore_task *t, void *data, u_int32_t len)
    188 {
    189 	int		fd;
    190 
    191 	kore_debug("kore_task_channel_write: %p <- %p (%ld)", t, data, len);
    192 
    193 	THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]);
    194 	task_channel_write(fd, &len, sizeof(len));
    195 	task_channel_write(fd, data, len);
    196 }
    197 
    198 u_int32_t
    199 kore_task_channel_read(struct kore_task *t, void *out, u_int32_t len)
    200 {
    201 	int		fd;
    202 	u_int32_t	dlen, bytes;
    203 
    204 	kore_debug("kore_task_channel_read: %p -> %p (%ld)", t, out, len);
    205 
    206 	THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]);
    207 	task_channel_read(fd, &dlen, sizeof(dlen));
    208 
    209 	if (dlen > len)
    210 		bytes = len;
    211 	else
    212 		bytes = dlen;
    213 
    214 	task_channel_read(fd, out, bytes);
    215 
    216 	return (dlen);
    217 }
    218 
    219 void
    220 kore_task_handle(void *arg, int finished)
    221 {
    222 	struct kore_task	*t = arg;
    223 
    224 	kore_debug("kore_task_handle: %p, %d", t, finished);
    225 
    226 #if !defined(KORE_NO_HTTP)
    227 	if (t->req != NULL)
    228 		http_request_wakeup(t->req);
    229 #endif
    230 
    231 	if (finished) {
    232 		kore_platform_disable_read(t->fds[0]);
    233 		kore_task_set_state(t, KORE_TASK_STATE_FINISHED);
    234 #if !defined(KORE_NO_HTTP)
    235 		if (t->req != NULL) {
    236 			if (t->req->flags & HTTP_REQUEST_DELETE)
    237 				kore_task_destroy(t);
    238 		}
    239 #endif
    240 	}
    241 
    242 	if (t->cb != NULL)
    243 		t->cb(t);
    244 }
    245 
    246 int
    247 kore_task_state(struct kore_task *t)
    248 {
    249 	int	s;
    250 
    251 	pthread_rwlock_rdlock(&(t->lock));
    252 	s = t->state;
    253 	pthread_rwlock_unlock(&(t->lock));
    254 
    255 	return (s);
    256 }
    257 
    258 void
    259 kore_task_set_state(struct kore_task *t, int state)
    260 {
    261 	pthread_rwlock_wrlock(&(t->lock));
    262 	t->state = state;
    263 	pthread_rwlock_unlock(&(t->lock));
    264 }
    265 
    266 int
    267 kore_task_result(struct kore_task *t)
    268 {
    269 	int	r;
    270 
    271 	pthread_rwlock_rdlock(&(t->lock));
    272 	r = t->result;
    273 	pthread_rwlock_unlock(&(t->lock));
    274 
    275 	return (r);
    276 }
    277 
    278 void
    279 kore_task_set_result(struct kore_task *t, int result)
    280 {
    281 	pthread_rwlock_wrlock(&(t->lock));
    282 	t->result = result;
    283 	pthread_rwlock_unlock(&(t->lock));
    284 }
    285 
    286 static void
    287 task_channel_write(int fd, void *data, u_int32_t len)
    288 {
    289 	ssize_t		r;
    290 	u_int8_t	*d;
    291 	u_int32_t	offset;
    292 
    293 	d = data;
    294 	offset = 0;
    295 	while (offset != len) {
    296 		r = send(fd, d + offset, len - offset, 0);
    297 		if (r == -1 && errno == EINTR)
    298 			continue;
    299 		if (r == -1)
    300 			fatal("task_channel_write: %s", errno_s);
    301 		offset += r;
    302 	}
    303 }
    304 
    305 static void
    306 task_channel_read(int fd, void *out, u_int32_t len)
    307 {
    308 	ssize_t		r;
    309 	u_int8_t	*d;
    310 	u_int32_t	offset;
    311 
    312 	d = out;
    313 	offset = 0;
    314 	while (offset != len) {
    315 		r = read(fd, d + offset, len - offset);
    316 		if (r == -1 && errno == EINTR)
    317 			continue;
    318 		if (r == -1)
    319 			fatal("task_channel_read: %s", errno_s);
    320 		if (r == 0)
    321 			fatal("task_channel_read: unexpected eof");
    322 
    323 		offset += r;
    324 	}
    325 }
    326 
    327 static void
    328 task_thread_spawn(struct kore_task_thread **out)
    329 {
    330 	struct kore_task_thread		*tt;
    331 
    332 	tt = kore_malloc(sizeof(*tt));
    333 	tt->idx = threads++;
    334 
    335 	TAILQ_INIT(&(tt->tasks));
    336 	pthread_cond_init(&(tt->cond), NULL);
    337 	pthread_mutex_init(&(tt->lock), NULL);
    338 	pthread_mutex_lock(&(tt->lock));
    339 	TAILQ_INSERT_TAIL(&task_threads, tt, list);
    340 
    341 	if (pthread_create(&(tt->tid), NULL, task_thread, tt) != 0)
    342 		fatal("pthread_create: %s", errno_s);
    343 
    344 	*out = tt;
    345 }
    346 
    347 static void *
    348 task_thread(void *arg)
    349 {
    350 	struct kore_task		*t;
    351 	struct kore_task_thread		*tt = arg;
    352 
    353 	kore_debug("task_thread: #%d starting", tt->idx);
    354 
    355 	pthread_mutex_lock(&(tt->lock));
    356 
    357 	for (;;) {
    358 		if (TAILQ_EMPTY(&(tt->tasks)))
    359 			pthread_cond_wait(&(tt->cond), &(tt->lock));
    360 
    361 		kore_debug("task_thread#%d: woke up", tt->idx);
    362 
    363 		t = TAILQ_FIRST(&(tt->tasks));
    364 		TAILQ_REMOVE(&(tt->tasks), t, list);
    365 		pthread_mutex_unlock(&(tt->lock));
    366 
    367 		kore_debug("task_thread#%d: executing %p", tt->idx, t);
    368 
    369 		kore_task_set_state(t, KORE_TASK_STATE_RUNNING);
    370 		kore_task_set_result(t, t->entry(t));
    371 		kore_task_finish(t);
    372 
    373 		pthread_mutex_lock(&(tt->lock));
    374 	}
    375 
    376 	pthread_exit(NULL);
    377 
    378 	/* NOTREACHED */
    379 	return (NULL);
    380 }