kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 8b9f7a6c125792573af01d6487713ab296a14f4f
parent eaef4b654a78fa5368a3b4a0d3ee4e690d6ee73d
Author: Joris Vink <joris@coders.se>
Date:   Mon, 17 Aug 2020 15:15:04 +0200

improve our asynchronous curl support.

- Remove the edge trigger io hacks we had in place.
- Use level triggered io for the libcurl fds instead.
- Batch all curl events together and process them at the end
  of our worker event loop.

Diffstat:
include/kore/curl.h | 1+
include/kore/kore.h | 2++
src/bsd.c | 15++++++++++++++-
src/curl.c | 101++++++++++++++++++++++++++++++++++++++++++++++---------------------------------
src/linux.c | 12++++++++++++
src/worker.c | 1+
6 files changed, 89 insertions(+), 43 deletions(-)

diff --git a/include/kore/curl.h b/include/kore/curl.h @@ -71,6 +71,7 @@ extern u_int64_t kore_curl_recv_max; void kore_curl_sysinit(void); void kore_curl_do_timeout(void); +void kore_curl_run_scheduled(void); void kore_curl_run(struct kore_curl *); void kore_curl_cleanup(struct kore_curl *); int kore_curl_success(struct kore_curl *); diff --git a/include/kore/kore.h b/include/kore/kore.h @@ -743,6 +743,8 @@ void kore_platform_enable_accept(void); void kore_platform_disable_accept(void); void kore_platform_event_wait(u_int64_t); void kore_platform_event_all(int, void *); +void kore_platform_event_level_all(int, void *); +void kore_platform_event_level_read(int, void *); void kore_platform_proctitle(const char *); void kore_platform_schedule_read(int, void *); void kore_platform_schedule_write(int, void *); diff --git a/src/bsd.c b/src/bsd.c @@ -160,12 +160,25 @@ kore_platform_event_all(int fd, void *c) } void +kore_platform_event_level_all(int fd, void *c) +{ + kore_platform_event_schedule(fd, EVFILT_READ, EV_ADD, c); + kore_platform_event_schedule(fd, EVFILT_WRITE, EV_ADD, c); +} + +void +kore_platform_event_level_read(int fd, void *c) +{ + kore_platform_event_schedule(fd, EVFILT_READ, EV_ADD, c); +} + +void kore_platform_event_schedule(int fd, int type, int flags, void *data) { struct kevent event[1]; EV_SET(&event[0], fd, type, flags, 0, 0, data); - if (kevent(kfd, event, 1, NULL, 0, NULL) == -1) + if (kevent(kfd, event, 1, NULL, 0, NULL) == -1 && errno != ENOENT) fatal("kevent: %s", errno_s); } diff --git a/src/curl.c b/src/curl.c @@ -62,14 +62,24 @@ struct fd_cache { LIST_ENTRY(fd_cache) list; }; +struct curl_run { + int eof; + struct fd_cache *fdc; + TAILQ_ENTRY(curl_run) list; +}; + static void curl_process(void); static void curl_event_handle(void *, int); static void curl_timeout(void *, u_int64_t); static int curl_timer(CURLM *, long, void *); +static void curl_run_handle(struct curl_run *); +static void curl_run_schedule(struct fd_cache *, int); static int curl_socket(CURL *, curl_socket_t, int, void *, void *); static struct fd_cache *fd_cache_get(int); +static TAILQ_HEAD(, curl_run) runlist; +static struct kore_pool run_pool; static int running = 0; static CURLM *multi = NULL; static struct kore_timer *timer = NULL; @@ -107,8 +117,11 @@ kore_curl_sysinit(void) for (i = 0; i < FD_CACHE_BUCKETS; i++) LIST_INIT(&cache[i]); + TAILQ_INIT(&runlist); + kore_pool_init(&fd_cache_pool, "fd_cache_pool", 100, sizeof(struct fd_cache)); + kore_pool_init(&run_pool, "run_pool", 100, sizeof(struct curl_run)); len = snprintf(user_agent, sizeof(user_agent), "kore/%s", kore_version); if (len == -1 || (size_t)len >= sizeof(user_agent)) @@ -206,6 +219,20 @@ kore_curl_do_timeout(void) } } +void +kore_curl_run_scheduled(void) +{ + struct curl_run *run; + + while ((run = TAILQ_FIRST(&runlist))) { + TAILQ_REMOVE(&runlist, run, list); + curl_run_handle(run); + kore_pool_put(&run_pool, run); + } + + curl_process(); +} + size_t kore_curl_tobuf(char *ptr, size_t size, size_t nmemb, void *udata) { @@ -529,10 +556,19 @@ curl_socket(CURL *easy, curl_socket_t fd, int action, void *arg, void *sock) case CURL_POLL_NONE: break; case CURL_POLL_IN: + if (fdc->scheduled) { + kore_platform_disable_read(fd); +#if !defined(__linux__) + kore_platform_disable_write(fd); +#endif + } + fdc->scheduled = 1; + kore_platform_event_level_read(fd, fdc); + break; case CURL_POLL_OUT: case CURL_POLL_INOUT: if (fdc->scheduled == 0) { - kore_platform_event_all(fd, fdc); + kore_platform_event_level_all(fd, fdc); fdc->scheduled = 1; } break; @@ -550,22 +586,8 @@ curl_socket(CURL *easy, curl_socket_t fd, int action, void *arg, void *sock) fatal("unknown action value: %d", action); } - /* - * XXX - libcurl hates edge triggered io. - */ - if (action == CURL_POLL_OUT || action == CURL_POLL_INOUT) { - if (fdc->evt.flags & KORE_EVENT_WRITE) { - if (fdc->scheduled) { - kore_platform_disable_read(fdc->fd); -#if !defined(__linux__) - kore_platform_disable_write(fdc->fd); -#endif - } - - fdc->evt.flags = 0; - kore_platform_event_all(fdc->fd, fdc); - } - } + if (action != CURL_POLL_NONE && action != CURL_POLL_REMOVE) + curl_run_schedule(fdc, 0); return (CURLM_OK); } @@ -657,13 +679,29 @@ curl_timer(CURLM *mctx, long timeout, void *arg) } static void +curl_run_schedule(struct fd_cache *fdc, int eof) +{ + struct curl_run *run; + + run = kore_pool_get(&run_pool); + run->fdc = fdc; + run->eof = eof; + + TAILQ_INSERT_TAIL(&runlist, run, list); +} + +static void curl_event_handle(void *arg, int eof) { + curl_run_schedule(arg, eof); +} + +static void +curl_run_handle(struct curl_run *run) +{ CURLMcode res; int flags; - ssize_t bytes; - char buf[32]; - struct fd_cache *fdc = arg; + struct fd_cache *fdc = run->fdc; flags = 0; @@ -673,33 +711,12 @@ curl_event_handle(void *arg, int eof) if (fdc->evt.flags & KORE_EVENT_WRITE) flags |= CURL_CSELECT_OUT; - if (eof) + if (run->eof) flags |= CURL_CSELECT_ERR; res = curl_multi_socket_action(multi, fdc->fd, flags, &running); if (res != CURLM_OK) fatal("curl_multi_socket_action: %s", curl_multi_strerror(res)); - - /* - * XXX - libcurl doesn't work with edge triggered i/o so check - * if we need to reprime the event. Not optimal. - */ - if (fdc->evt.flags & KORE_EVENT_READ) { - bytes = recv(fdc->fd, buf, sizeof(buf), MSG_PEEK); - if (bytes > 0) { - if (fdc->scheduled) { - kore_platform_disable_read(fdc->fd); -#if !defined(__linux__) - kore_platform_disable_write(fdc->fd); -#endif - } - - fdc->evt.flags = 0; - kore_platform_event_all(fdc->fd, fdc); - } - } - - curl_process(); } static struct fd_cache * diff --git a/src/linux.c b/src/linux.c @@ -142,6 +142,18 @@ kore_platform_event_wait(u_int64_t timer) } void +kore_platform_event_level_all(int fd, void *c) +{ + kore_platform_event_schedule(fd, EPOLLIN | EPOLLOUT | EPOLLRDHUP, 0, c); +} + +void +kore_platform_event_level_read(int fd, void *c) +{ + kore_platform_event_schedule(fd, EPOLLIN | EPOLLRDHUP, 0, c); +} + +void kore_platform_event_all(int fd, void *c) { kore_platform_event_schedule(fd, diff --git a/src/worker.c b/src/worker.c @@ -517,6 +517,7 @@ kore_worker_entry(struct kore_worker *kw) kore_timer_run(now); #if defined(KORE_USE_CURL) + kore_curl_run_scheduled(); kore_curl_do_timeout(); #endif #if !defined(KORE_NO_HTTP)