tasks.c (7583B)
1 /*
2 * Copyright (c) 2014 Joris Vink <joris@coders.se>
3 *
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
7 *
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15 */
16
17 #include <sys/param.h>
18 #include <sys/queue.h>
19 #include <sys/socket.h>
20
21 #include <pthread.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24
25 #include "kore.h"
26 #include "http.h"
27 #include "tasks.h"
28
29 #if defined(__linux__)
30 #include "seccomp.h"
31
32 static struct sock_filter filter_task[] = {
33 KORE_SYSCALL_ALLOW(clone),
34 KORE_SYSCALL_ALLOW(socketpair),
35 KORE_SYSCALL_ALLOW(set_robust_list),
36 #if defined(SYS_clone3)
37 KORE_SYSCALL_ALLOW(clone3),
38 #endif
39 #if defined(SYS_rseq)
40 KORE_SYSCALL_ALLOW(rseq),
41 #endif
42 };
43 #endif
44
45 static u_int8_t threads;
46 static TAILQ_HEAD(, kore_task_thread) task_threads;
47
48 u_int16_t kore_task_threads = KORE_TASK_THREADS;
49
50 static void *task_thread(void *);
51 static void task_channel_read(int, void *, u_int32_t);
52 static void task_channel_write(int, void *, u_int32_t);
53 static void task_thread_spawn(struct kore_task_thread **);
54
55 #define THREAD_FD_ASSIGN(t, f, i, o) \
56 do { \
57 if (pthread_self() == t) { \
58 f = i; \
59 } else { \
60 f = o; \
61 } \
62 } while (0);
63
64 void
65 kore_task_init(void)
66 {
67 threads = 0;
68 TAILQ_INIT(&task_threads);
69
70 #if defined(__linux__)
71 kore_seccomp_filter("task", filter_task, KORE_FILTER_LEN(filter_task));
72 #endif
73 }
74
75 void
76 kore_task_create(struct kore_task *t, int (*entry)(struct kore_task *))
77 {
78 t->cb = NULL;
79 #if !defined(KORE_NO_HTTP)
80 t->req = NULL;
81 #endif
82 t->evt.type = KORE_TYPE_TASK;
83 t->evt.handle = kore_task_handle;
84
85 t->entry = entry;
86 t->state = KORE_TASK_STATE_CREATED;
87 pthread_rwlock_init(&(t->lock), NULL);
88
89 if (socketpair(AF_UNIX, SOCK_STREAM, 0, t->fds) == -1)
90 fatal("kore_task_create: socketpair() %s", errno_s);
91 }
92
93 void
94 kore_task_run(struct kore_task *t)
95 {
96 struct kore_task_thread *tt;
97
98 kore_platform_schedule_read(t->fds[0], t);
99 if (threads < kore_task_threads) {
100 /* task_thread_spawn() will lock tt->lock for us. */
101 task_thread_spawn(&tt);
102 } else {
103 /* Cycle task around. */
104 if ((tt = TAILQ_FIRST(&task_threads)) == NULL)
105 fatal("no available tasks threads?");
106 pthread_mutex_lock(&(tt->lock));
107 TAILQ_REMOVE(&task_threads, tt, list);
108 TAILQ_INSERT_TAIL(&task_threads, tt, list);
109 }
110
111 t->thread = tt;
112 TAILQ_INSERT_TAIL(&(tt->tasks), t, list);
113
114 pthread_mutex_unlock(&(tt->lock));
115 pthread_cond_signal(&(tt->cond));
116 }
117
118 #if !defined(KORE_NO_HTTP)
119 void
120 kore_task_bind_request(struct kore_task *t, struct http_request *req)
121 {
122 if (t->cb != NULL)
123 fatal("cannot bind cbs and requests at the same time");
124
125 t->req = req;
126 LIST_INSERT_HEAD(&(req->tasks), t, rlist);
127
128 http_request_sleep(req);
129 }
130 #endif
131
132 void
133 kore_task_bind_callback(struct kore_task *t, void (*cb)(struct kore_task *))
134 {
135 #if !defined(KORE_NO_HTTP)
136 if (t->req != NULL)
137 fatal("cannot bind requests and cbs at the same time");
138 #endif
139 t->cb = cb;
140 }
141
142 void
143 kore_task_destroy(struct kore_task *t)
144 {
145 #if !defined(KORE_NO_HTTP)
146 if (t->req != NULL) {
147 t->req = NULL;
148 LIST_REMOVE(t, rlist);
149 }
150 #endif
151
152 pthread_rwlock_wrlock(&(t->lock));
153
154 if (t->fds[0] != -1) {
155 (void)close(t->fds[0]);
156 t->fds[0] = -1;
157 }
158
159 if (t->fds[1] != -1) {
160 (void)close(t->fds[1]);
161 t->fds[1] = -1;
162 }
163
164 pthread_rwlock_unlock(&(t->lock));
165 pthread_rwlock_destroy(&(t->lock));
166 }
167
168 int
169 kore_task_finished(struct kore_task *t)
170 {
171 return ((kore_task_state(t) == KORE_TASK_STATE_FINISHED));
172 }
173
174 void
175 kore_task_finish(struct kore_task *t)
176 {
177 pthread_rwlock_wrlock(&(t->lock));
178
179 if (t->fds[1] != -1) {
180 (void)close(t->fds[1]);
181 t->fds[1] = -1;
182 }
183
184 pthread_rwlock_unlock(&(t->lock));
185 }
186
187 void
188 kore_task_channel_write(struct kore_task *t, void *data, u_int32_t len)
189 {
190 int fd;
191
192 THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]);
193 task_channel_write(fd, &len, sizeof(len));
194 task_channel_write(fd, data, len);
195 }
196
197 u_int32_t
198 kore_task_channel_read(struct kore_task *t, void *out, u_int32_t len)
199 {
200 int fd;
201 u_int32_t dlen, bytes;
202
203 THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]);
204 task_channel_read(fd, &dlen, sizeof(dlen));
205
206 if (dlen > len)
207 bytes = len;
208 else
209 bytes = dlen;
210
211 task_channel_read(fd, out, bytes);
212
213 return (dlen);
214 }
215
216 void
217 kore_task_handle(void *arg, int finished)
218 {
219 struct kore_task *t = arg;
220
221 #if !defined(KORE_NO_HTTP)
222 if (t->req != NULL)
223 http_request_wakeup(t->req);
224 #endif
225
226 if (finished) {
227 kore_platform_disable_read(t->fds[0]);
228 kore_task_set_state(t, KORE_TASK_STATE_FINISHED);
229 #if !defined(KORE_NO_HTTP)
230 if (t->req != NULL) {
231 if (t->req->flags & HTTP_REQUEST_DELETE)
232 kore_task_destroy(t);
233 }
234 #endif
235 }
236
237 if (t->cb != NULL)
238 t->cb(t);
239 }
240
241 int
242 kore_task_state(struct kore_task *t)
243 {
244 int s;
245
246 pthread_rwlock_rdlock(&(t->lock));
247 s = t->state;
248 pthread_rwlock_unlock(&(t->lock));
249
250 return (s);
251 }
252
253 void
254 kore_task_set_state(struct kore_task *t, int state)
255 {
256 pthread_rwlock_wrlock(&(t->lock));
257 t->state = state;
258 pthread_rwlock_unlock(&(t->lock));
259 }
260
261 int
262 kore_task_result(struct kore_task *t)
263 {
264 int r;
265
266 pthread_rwlock_rdlock(&(t->lock));
267 r = t->result;
268 pthread_rwlock_unlock(&(t->lock));
269
270 return (r);
271 }
272
273 void
274 kore_task_set_result(struct kore_task *t, int result)
275 {
276 pthread_rwlock_wrlock(&(t->lock));
277 t->result = result;
278 pthread_rwlock_unlock(&(t->lock));
279 }
280
281 static void
282 task_channel_write(int fd, void *data, u_int32_t len)
283 {
284 ssize_t r;
285 u_int8_t *d;
286 u_int32_t offset;
287
288 d = data;
289 offset = 0;
290 while (offset != len) {
291 r = send(fd, d + offset, len - offset, 0);
292 if (r == -1 && errno == EINTR)
293 continue;
294 if (r == -1)
295 fatal("task_channel_write: %s", errno_s);
296 offset += r;
297 }
298 }
299
300 static void
301 task_channel_read(int fd, void *out, u_int32_t len)
302 {
303 ssize_t r;
304 u_int8_t *d;
305 u_int32_t offset;
306
307 d = out;
308 offset = 0;
309 while (offset != len) {
310 r = read(fd, d + offset, len - offset);
311 if (r == -1 && errno == EINTR)
312 continue;
313 if (r == -1)
314 fatal("task_channel_read: %s", errno_s);
315 if (r == 0)
316 fatal("task_channel_read: unexpected eof");
317
318 offset += r;
319 }
320 }
321
322 static void
323 task_thread_spawn(struct kore_task_thread **out)
324 {
325 struct kore_task_thread *tt;
326
327 tt = kore_malloc(sizeof(*tt));
328 tt->idx = threads++;
329
330 TAILQ_INIT(&(tt->tasks));
331 pthread_cond_init(&(tt->cond), NULL);
332 pthread_mutex_init(&(tt->lock), NULL);
333 pthread_mutex_lock(&(tt->lock));
334 TAILQ_INSERT_TAIL(&task_threads, tt, list);
335
336 if (pthread_create(&(tt->tid), NULL, task_thread, tt) != 0)
337 fatal("pthread_create: %s", errno_s);
338
339 *out = tt;
340 }
341
342 static void *
343 task_thread(void *arg)
344 {
345 struct kore_task *t;
346 struct kore_task_thread *tt = arg;
347
348 pthread_mutex_lock(&(tt->lock));
349
350 for (;;) {
351 if (TAILQ_EMPTY(&(tt->tasks)))
352 pthread_cond_wait(&(tt->cond), &(tt->lock));
353
354 t = TAILQ_FIRST(&(tt->tasks));
355 TAILQ_REMOVE(&(tt->tasks), t, list);
356 pthread_mutex_unlock(&(tt->lock));
357
358 kore_task_set_state(t, KORE_TASK_STATE_RUNNING);
359 kore_task_set_result(t, t->entry(t));
360 kore_task_finish(t);
361
362 pthread_mutex_lock(&(tt->lock));
363 }
364
365 pthread_exit(NULL);
366
367 /* NOTREACHED */
368 return (NULL);
369 }