kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit ee22ec99d6908133e41c1bfb90ffd153d6b0493f
parent 6be54040f35dfad1eade7443e0801ed28ca3fcc8
Author: Joris Vink <joris@coders.se>
Date:   Fri, 24 Apr 2015 14:20:02 +0200

Task improvements.

Do not blindly close the sockets created by socketpair() when
finishing up or destroying a task.

Under heavy load this could turn into a race condition where
the task thread closes its endpoint when at the same time
a new task is registered and socketpair() returns the recently
closed socket back to a new task.

When the task that finished then gets destroyed it closes
the endpoint registered to a new task instead causing Kore
to fatal() out when attempting to read from said socket.

Diffstat:
src/tasks.c | 27++++++++++++++++++++++-----
1 file changed, 22 insertions(+), 5 deletions(-)

diff --git a/src/tasks.c b/src/tasks.c @@ -66,7 +66,7 @@ kore_task_create(struct kore_task *t, int (*entry)(struct kore_task *)) t->state = KORE_TASK_STATE_CREATED; pthread_rwlock_init(&(t->lock), NULL); - if (socketpair(AF_UNIX, SOCK_STREAM, 0,t->fds) == -1) + if (socketpair(AF_UNIX, SOCK_STREAM, 0, t->fds) == -1) fatal("kore_task_create: socketpair() %s", errno_s); } @@ -114,9 +114,19 @@ kore_task_destroy(struct kore_task *t) LIST_REMOVE(t, rlist); } - close(t->fds[0]); - close(t->fds[1]); /* This might already be closed. */ + pthread_rwlock_wrlock(&(t->lock)); + + if (t->fds[0] != -1) { + (void)close(t->fds[0]); + t->fds[0] = -1; + } + if (t->fds[1] != -1) { + (void)close(t->fds[1]); + t->fds[1] = -1; + } + + pthread_rwlock_unlock(&(t->lock)); pthread_rwlock_destroy(&(t->lock)); } @@ -130,7 +140,15 @@ void kore_task_finish(struct kore_task *t) { kore_debug("kore_task_finished: %p (%d)", t, t->result); - close(t->fds[1]); + + pthread_rwlock_wrlock(&(t->lock)); + + if (t->fds[1] != -1) { + (void)close(t->fds[1]); + t->fds[1] = -1; + } + + pthread_rwlock_unlock(&(t->lock)); } void @@ -139,7 +157,6 @@ kore_task_channel_write(struct kore_task *t, void *data, u_int32_t len) int fd; kore_debug("kore_task_channel_write: %p <- %p (%ld)", t, data, len); - THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]); task_channel_write(fd, &len, sizeof(len)); task_channel_write(fd, data, len);