kore

An easy to use, scalable and secure web application framework for writing web APIs in C.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

tasks.c (7467B)



      1 /*
      2  * Copyright (c) 2014 Joris Vink <joris@coders.se>
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 #include <sys/param.h>
     18 #include <sys/queue.h>
     19 #include <sys/socket.h>
     20 
     21 #include <pthread.h>
     22 #include <stdio.h>
     23 #include <stdlib.h>
     24 
     25 #include "kore.h"
     26 #include "http.h"
     27 #include "tasks.h"
     28 
     29 #if defined(__linux__)
     30 #include "seccomp.h"
     31 
/*
 * Syscall entries handed to kore_seccomp_filter() under the name "task"
 * by kore_task_init(). Only compiled on Linux.
 * NOTE(review): presumably these are the extra syscalls needed to spawn
 * worker threads and create socketpairs -- confirm against seccomp.h.
 */
static struct sock_filter filter_task[] = {
	KORE_SYSCALL_ALLOW(clone),
	KORE_SYSCALL_ALLOW(socketpair),
	KORE_SYSCALL_ALLOW(set_robust_list),
};
     37 #endif
     38 
/* Number of worker threads created so far; bumped in task_thread_spawn(). */
static u_int8_t				threads;
/* Every worker thread; kore_task_run() rotates it once the limit is hit. */
static TAILQ_HEAD(, kore_task_thread)	task_threads;

/* Worker thread limit that kore_task_run() compares `threads` against. */
u_int16_t	kore_task_threads = KORE_TASK_THREADS;

/* File-local helpers, defined further down. */
static void	*task_thread(void *);
static void	task_channel_read(int, void *, u_int32_t);
static void	task_channel_write(int, void *, u_int32_t);
static void	task_thread_spawn(struct kore_task_thread **);
     48 
     49 #define THREAD_FD_ASSIGN(t, f, i, o)				\
     50 	do {							\
     51 		if (pthread_self() == t) {			\
     52 			f = i;					\
     53 		} else {					\
     54 			f = o;					\
     55 		}						\
     56 	} while (0);
     57 
     58 void
     59 kore_task_init(void)
     60 {
     61 	threads = 0;
     62 	TAILQ_INIT(&task_threads);
     63 
     64 #if defined(__linux__)
     65 	kore_seccomp_filter("task", filter_task, KORE_FILTER_LEN(filter_task));
     66 #endif
     67 }
     68 
     69 void
     70 kore_task_create(struct kore_task *t, int (*entry)(struct kore_task *))
     71 {
     72 	t->cb = NULL;
     73 #if !defined(KORE_NO_HTTP)
     74 	t->req = NULL;
     75 #endif
     76 	t->evt.type = KORE_TYPE_TASK;
     77 	t->evt.handle = kore_task_handle;
     78 
     79 	t->entry = entry;
     80 	t->state = KORE_TASK_STATE_CREATED;
     81 	pthread_rwlock_init(&(t->lock), NULL);
     82 
     83 	if (socketpair(AF_UNIX, SOCK_STREAM, 0, t->fds) == -1)
     84 		fatal("kore_task_create: socketpair() %s", errno_s);
     85 }
     86 
     87 void
     88 kore_task_run(struct kore_task *t)
     89 {
     90 	struct kore_task_thread		*tt;
     91 
     92 	kore_platform_schedule_read(t->fds[0], t);
     93 	if (threads < kore_task_threads) {
     94 		/* task_thread_spawn() will lock tt->lock for us. */
     95 		task_thread_spawn(&tt);
     96 	} else {
     97 		/* Cycle task around. */
     98 		if ((tt = TAILQ_FIRST(&task_threads)) == NULL)
     99 			fatal("no available tasks threads?");
    100 		pthread_mutex_lock(&(tt->lock));
    101 		TAILQ_REMOVE(&task_threads, tt, list);
    102 		TAILQ_INSERT_TAIL(&task_threads, tt, list);
    103 	}
    104 
    105 	t->thread = tt;
    106 	TAILQ_INSERT_TAIL(&(tt->tasks), t, list);
    107 
    108 	pthread_mutex_unlock(&(tt->lock));
    109 	pthread_cond_signal(&(tt->cond));
    110 }
    111 
    112 #if !defined(KORE_NO_HTTP)
    113 void
    114 kore_task_bind_request(struct kore_task *t, struct http_request *req)
    115 {
    116 	if (t->cb != NULL)
    117 		fatal("cannot bind cbs and requests at the same time");
    118 
    119 	t->req = req;
    120 	LIST_INSERT_HEAD(&(req->tasks), t, rlist);
    121 
    122 	http_request_sleep(req);
    123 }
    124 #endif
    125 
    126 void
    127 kore_task_bind_callback(struct kore_task *t, void (*cb)(struct kore_task *))
    128 {
    129 #if !defined(KORE_NO_HTTP)
    130 	if (t->req != NULL)
    131 		fatal("cannot bind requests and cbs at the same time");
    132 #endif
    133 	t->cb = cb;
    134 }
    135 
    136 void
    137 kore_task_destroy(struct kore_task *t)
    138 {
    139 #if !defined(KORE_NO_HTTP)
    140 	if (t->req != NULL) {
    141 		t->req = NULL;
    142 		LIST_REMOVE(t, rlist);
    143 	}
    144 #endif
    145 
    146 	pthread_rwlock_wrlock(&(t->lock));
    147 
    148 	if (t->fds[0] != -1) {
    149 		(void)close(t->fds[0]);
    150 		t->fds[0] = -1;
    151 	}
    152 
    153 	if (t->fds[1] != -1) {
    154 		(void)close(t->fds[1]);
    155 		t->fds[1] = -1;
    156 	}
    157 
    158 	pthread_rwlock_unlock(&(t->lock));
    159 	pthread_rwlock_destroy(&(t->lock));
    160 }
    161 
    162 int
    163 kore_task_finished(struct kore_task *t)
    164 {
    165 	return ((kore_task_state(t) == KORE_TASK_STATE_FINISHED));
    166 }
    167 
    168 void
    169 kore_task_finish(struct kore_task *t)
    170 {
    171 	pthread_rwlock_wrlock(&(t->lock));
    172 
    173 	if (t->fds[1] != -1) {
    174 		(void)close(t->fds[1]);
    175 		t->fds[1] = -1;
    176 	}
    177 
    178 	pthread_rwlock_unlock(&(t->lock));
    179 }
    180 
    181 void
    182 kore_task_channel_write(struct kore_task *t, void *data, u_int32_t len)
    183 {
    184 	int		fd;
    185 
    186 	THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]);
    187 	task_channel_write(fd, &len, sizeof(len));
    188 	task_channel_write(fd, data, len);
    189 }
    190 
    191 u_int32_t
    192 kore_task_channel_read(struct kore_task *t, void *out, u_int32_t len)
    193 {
    194 	int		fd;
    195 	u_int32_t	dlen, bytes;
    196 
    197 	THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]);
    198 	task_channel_read(fd, &dlen, sizeof(dlen));
    199 
    200 	if (dlen > len)
    201 		bytes = len;
    202 	else
    203 		bytes = dlen;
    204 
    205 	task_channel_read(fd, out, bytes);
    206 
    207 	return (dlen);
    208 }
    209 
    210 void
    211 kore_task_handle(void *arg, int finished)
    212 {
    213 	struct kore_task	*t = arg;
    214 
    215 #if !defined(KORE_NO_HTTP)
    216 	if (t->req != NULL)
    217 		http_request_wakeup(t->req);
    218 #endif
    219 
    220 	if (finished) {
    221 		kore_platform_disable_read(t->fds[0]);
    222 		kore_task_set_state(t, KORE_TASK_STATE_FINISHED);
    223 #if !defined(KORE_NO_HTTP)
    224 		if (t->req != NULL) {
    225 			if (t->req->flags & HTTP_REQUEST_DELETE)
    226 				kore_task_destroy(t);
    227 		}
    228 #endif
    229 	}
    230 
    231 	if (t->cb != NULL)
    232 		t->cb(t);
    233 }
    234 
    235 int
    236 kore_task_state(struct kore_task *t)
    237 {
    238 	int	s;
    239 
    240 	pthread_rwlock_rdlock(&(t->lock));
    241 	s = t->state;
    242 	pthread_rwlock_unlock(&(t->lock));
    243 
    244 	return (s);
    245 }
    246 
    247 void
    248 kore_task_set_state(struct kore_task *t, int state)
    249 {
    250 	pthread_rwlock_wrlock(&(t->lock));
    251 	t->state = state;
    252 	pthread_rwlock_unlock(&(t->lock));
    253 }
    254 
    255 int
    256 kore_task_result(struct kore_task *t)
    257 {
    258 	int	r;
    259 
    260 	pthread_rwlock_rdlock(&(t->lock));
    261 	r = t->result;
    262 	pthread_rwlock_unlock(&(t->lock));
    263 
    264 	return (r);
    265 }
    266 
    267 void
    268 kore_task_set_result(struct kore_task *t, int result)
    269 {
    270 	pthread_rwlock_wrlock(&(t->lock));
    271 	t->result = result;
    272 	pthread_rwlock_unlock(&(t->lock));
    273 }
    274 
    275 static void
    276 task_channel_write(int fd, void *data, u_int32_t len)
    277 {
    278 	ssize_t		r;
    279 	u_int8_t	*d;
    280 	u_int32_t	offset;
    281 
    282 	d = data;
    283 	offset = 0;
    284 	while (offset != len) {
    285 		r = send(fd, d + offset, len - offset, 0);
    286 		if (r == -1 && errno == EINTR)
    287 			continue;
    288 		if (r == -1)
    289 			fatal("task_channel_write: %s", errno_s);
    290 		offset += r;
    291 	}
    292 }
    293 
    294 static void
    295 task_channel_read(int fd, void *out, u_int32_t len)
    296 {
    297 	ssize_t		r;
    298 	u_int8_t	*d;
    299 	u_int32_t	offset;
    300 
    301 	d = out;
    302 	offset = 0;
    303 	while (offset != len) {
    304 		r = read(fd, d + offset, len - offset);
    305 		if (r == -1 && errno == EINTR)
    306 			continue;
    307 		if (r == -1)
    308 			fatal("task_channel_read: %s", errno_s);
    309 		if (r == 0)
    310 			fatal("task_channel_read: unexpected eof");
    311 
    312 		offset += r;
    313 	}
    314 }
    315 
    316 static void
    317 task_thread_spawn(struct kore_task_thread **out)
    318 {
    319 	struct kore_task_thread		*tt;
    320 
    321 	tt = kore_malloc(sizeof(*tt));
    322 	tt->idx = threads++;
    323 
    324 	TAILQ_INIT(&(tt->tasks));
    325 	pthread_cond_init(&(tt->cond), NULL);
    326 	pthread_mutex_init(&(tt->lock), NULL);
    327 	pthread_mutex_lock(&(tt->lock));
    328 	TAILQ_INSERT_TAIL(&task_threads, tt, list);
    329 
    330 	if (pthread_create(&(tt->tid), NULL, task_thread, tt) != 0)
    331 		fatal("pthread_create: %s", errno_s);
    332 
    333 	*out = tt;
    334 }
    335 
    336 static void *
    337 task_thread(void *arg)
    338 {
    339 	struct kore_task		*t;
    340 	struct kore_task_thread		*tt = arg;
    341 
    342 	pthread_mutex_lock(&(tt->lock));
    343 
    344 	for (;;) {
    345 		if (TAILQ_EMPTY(&(tt->tasks)))
    346 			pthread_cond_wait(&(tt->cond), &(tt->lock));
    347 
    348 		t = TAILQ_FIRST(&(tt->tasks));
    349 		TAILQ_REMOVE(&(tt->tasks), t, list);
    350 		pthread_mutex_unlock(&(tt->lock));
    351 
    352 		kore_task_set_state(t, KORE_TASK_STATE_RUNNING);
    353 		kore_task_set_result(t, t->entry(t));
    354 		kore_task_finish(t);
    355 
    356 		pthread_mutex_lock(&(tt->lock));
    357 	}
    358 
    359 	pthread_exit(NULL);
    360 
    361 	/* NOTREACHED */
    362 	return (NULL);
    363 }