kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 351eec7eb44998093ffd90f2e0aa0ecdbacf2330
parent f5a58368b7ea9bf99e95ae24fcb235babc80dafc
Author: Joris Vink <joris@coders.se>
Date:   Fri, 17 Sep 2021 19:28:06 +0200

Add the on_body_chunk handler for routes.

If set, will call a given handler with the prototype of

`void body_chunk(struct http_request *req, const void *data, size_t len);`

for each chunk of the received HTTP body, allowing a developer to handle
it in their own way.

The incoming body is still being handled and retained in the same way
as before (in a kore_buf or temporary file).

While here, allow HTTP_STATUS_CONTINUE to work via http_response() and
make the handling of incoming HTTP header data a bit better.

Diffstat:
include/kore/kore.h | 4++++
src/http.c | 91+++++++++++++++++++++++++++++++++++++------------------------------------------
src/net.c | 3+++
src/runtime.c | 21+++++++++++++++++++++
4 files changed, 71 insertions(+), 48 deletions(-)

diff --git a/include/kore/kore.h b/include/kore/kore.h @@ -287,6 +287,8 @@ struct kore_runtime { int type; #if !defined(KORE_NO_HTTP) int (*http_request)(void *, struct http_request *); + void (*http_body_chunk)(void *, + struct http_request *, const void *, size_t); int (*validator)(void *, struct http_request *, const void *); void (*wsconnect)(void *, struct connection *); void (*wsdisconnect)(void *, struct connection *); @@ -986,6 +988,8 @@ void kore_runtime_connect(struct kore_runtime_call *, struct connection *); #if !defined(KORE_NO_HTTP) int kore_runtime_http_request(struct kore_runtime_call *, struct http_request *); +void kore_runtime_http_body_chunk(struct kore_runtime_call *, + struct http_request *, const void *, size_t); int kore_runtime_validator(struct kore_runtime_call *, struct http_request *, const void *); void kore_runtime_wsconnect(struct kore_runtime_call *, struct connection *); diff --git a/src/http.c b/src/http.c @@ -132,6 +132,7 @@ static int http_release_buffer(struct netbuf *); static void http_error_response(struct connection *, int); static int http_data_convert(void *, void **, void *, int); static void http_write_response_cookie(struct http_cookie *); +static int http_body_update(struct http_request *, const void *, size_t); static void http_argument_add(struct http_request *, char *, char *, int, int); static int http_check_redirect(struct http_request *, @@ -806,10 +807,8 @@ http_header_recv(struct netbuf *nb) { struct connection *c; size_t len; - ssize_t ret; struct http_header *hdr; struct http_request *req; - u_int64_t bytes_left; u_int8_t *end_headers; int h, i, v, skip, l; char *headers[HTTP_REQ_HEADER_MAX]; @@ -821,6 +820,11 @@ http_header_recv(struct netbuf *nb) if (nb->b_len < 4) return (KORE_RESULT_OK); + if (!isalpha(nb->buf[0])) { + http_error_response(c, HTTP_STATUS_BAD_REQUEST); + return (KORE_RESULT_ERROR); + } + skip = 4; end_headers = kore_mem_find(nb->buf, nb->s_off, "\r\n\r\n", 4); if (end_headers == NULL) { @@ -962,47 +966,19 @@ http_header_recv(struct netbuf *nb) HTTP_STATUS_INTERNAL_ERROR); return (KORE_RESULT_OK); } - - ret = write(req->http_body_fd, - end_headers, (nb->s_off - len)); - if (ret == -1 || (size_t)ret != (nb->s_off - len)) { - req->flags |= HTTP_REQUEST_DELETE; - http_error_response(req->owner, - HTTP_STATUS_INTERNAL_ERROR); - return (KORE_RESULT_OK); - } } else { req->http_body_fd = -1; req->http_body = kore_buf_alloc(req->content_length); - kore_buf_append(req->http_body, end_headers, - (nb->s_off - len)); } SHA256_Init(&req->hashctx); - SHA256_Update(&req->hashctx, end_headers, (nb->s_off - len)); - - bytes_left = req->content_length - (nb->s_off - len); - if (bytes_left > 0) { - kore_debug("%ld/%ld (%ld - %ld) more bytes for body", - bytes_left, req->content_length, nb->s_off, len); - net_recv_reset(c, - MIN(bytes_left, NETBUF_SEND_PAYLOAD_MAX), - http_body_recv); - c->rnb->extra = req; - http_request_sleep(req); - req->content_length = bytes_left; - c->http_timeout = http_body_timeout * 1000; - } else { - c->http_timeout = 0; - req->flags |= HTTP_REQUEST_COMPLETE; - req->flags &= ~HTTP_REQUEST_EXPECT_BODY; - SHA256_Final(req->http_body_digest, &req->hashctx); - if (!http_body_rewind(req)) { - req->flags |= HTTP_REQUEST_DELETE; - http_error_response(req->owner, - HTTP_STATUS_INTERNAL_ERROR); - return (KORE_RESULT_OK); - } + c->http_timeout = http_body_timeout * 1000; + + if (!http_body_update(req, end_headers, nb->s_off - len)) { + req->flags |= HTTP_REQUEST_DELETE; + http_error_response(req->owner, + HTTP_STATUS_INTERNAL_ERROR); + return (KORE_RESULT_OK); } } else { c->http_timeout = 0; @@ -1505,7 +1481,6 @@ http_start_recv(struct connection *c) c->http_start = kore_time_ms(); c->http_timeout = http_header_timeout * 1000; net_recv_reset(c, http_header_max, http_header_recv); - (void)net_recv_flush(c); } void @@ -2360,22 +2335,29 @@ http_argument_add(struct http_request *req, char *name, char *value, int qs, static int http_body_recv(struct netbuf *nb) { + struct http_request *req = (struct http_request *)nb->extra; + + return (http_body_update(req, nb->buf, nb->s_off)); +} + +static int +http_body_update(struct http_request *req, const void *data, size_t len) +{ ssize_t ret; u_int64_t bytes_left; - struct http_request *req = (struct http_request *)nb->extra; - SHA256_Update(&req->hashctx, nb->buf, nb->s_off); + SHA256_Update(&req->hashctx, data, len); if (req->http_body_fd != -1) { - ret = write(req->http_body_fd, nb->buf, nb->s_off); - if (ret == -1 || (size_t)ret != nb->s_off) { + ret = write(req->http_body_fd, data, len); + if (ret == -1 || (size_t)ret != len) { req->flags |= HTTP_REQUEST_DELETE; http_error_response(req->owner, HTTP_STATUS_INTERNAL_ERROR); return (KORE_RESULT_ERROR); } } else if (req->http_body != NULL) { - kore_buf_append(req->http_body, nb->buf, nb->s_off); + kore_buf_append(req->http_body, data, len); } else { req->flags |= HTTP_REQUEST_DELETE; http_error_response(req->owner, @@ -2383,10 +2365,10 @@ http_body_recv(struct netbuf *nb) return (KORE_RESULT_ERROR); } - req->content_length -= nb->s_off; + req->content_length -= len; if (req->content_length == 0) { - nb->extra = NULL; + req->owner->rnb->extra = NULL; http_request_wakeup(req); req->flags |= HTTP_REQUEST_COMPLETE; req->flags &= ~HTTP_REQUEST_EXPECT_BODY; @@ -2398,12 +2380,17 @@ http_body_recv(struct netbuf *nb) return (KORE_RESULT_ERROR); } SHA256_Final(req->http_body_digest, &req->hashctx); - net_recv_reset(nb->owner, http_header_max, http_header_recv); } else { bytes_left = req->content_length; - net_recv_reset(nb->owner, + net_recv_reset(req->owner, MIN(bytes_left, NETBUF_SEND_PAYLOAD_MAX), http_body_recv); + req->owner->rnb->extra = req; + } + + if (req->rt->on_body_chunk != NULL && len > 0) { + kore_runtime_http_body_chunk(req->rt->on_body_chunk, + req, data, len); } return (KORE_RESULT_OK); @@ -2442,7 +2429,6 @@ http_response_normal(struct http_request *req, struct connection *c, send_body = 1; text = http_status_text(status); - kore_buf_init(&buf, 1024); kore_buf_reset(header_buf); if (req != NULL) { @@ -2456,6 +2442,13 @@ http_response_normal(struct http_request *req, struct connection *c, kore_buf_appendf(header_buf, "HTTP/1.%c %d %s\r\n", version, status, text); + + if (status == 100) { + kore_buf_append(header_buf, "\r\n", 2); + net_send_queue(c, header_buf->data, header_buf->offset); + return; + } + kore_buf_append(header_buf, http_version, http_version_len); if ((c->flags & CONN_CLOSE_EMPTY) || @@ -2474,6 +2467,8 @@ http_response_normal(struct http_request *req, struct connection *c, } } + kore_buf_init(&buf, 1024); + /* Note that req CAN be NULL. */ if (req == NULL || req->owner->proto != CONN_PROTO_WEBSOCKET) { if (http_keepalive_time && connection_close == 0) { diff --git a/src/net.c b/src/net.c @@ -292,6 +292,9 @@ net_recv_flush(struct connection *c) if (c->rnb->buf == NULL) return (KORE_RESULT_OK); + if ((c->rnb->b_len - c->rnb->s_off) == 0) + return (KORE_RESULT_OK); + if (!c->read(c, &r)) return (KORE_RESULT_ERROR); if (!(c->evt.flags & KORE_EVENT_READ)) diff --git a/src/runtime.c b/src/runtime.c @@ -33,6 +33,8 @@ static void native_runtime_connect(void *, struct connection *); static void native_runtime_configure(void *, int, char **); #if !defined(KORE_NO_HTTP) static int native_runtime_http_request(void *, struct http_request *); +static void native_runtime_http_body_chunk(void *, struct http_request *, + const void *, size_t); static int native_runtime_validator(void *, struct http_request *, const void *); @@ -44,6 +46,7 @@ struct kore_runtime kore_native_runtime = { KORE_RUNTIME_NATIVE, #if !defined(KORE_NO_HTTP) .http_request = native_runtime_http_request, + .http_body_chunk= native_runtime_http_body_chunk, .validator = native_runtime_validator, .wsconnect = native_runtime_connect, .wsmessage = native_runtime_wsmessage, @@ -105,6 +108,13 @@ kore_runtime_http_request(struct kore_runtime_call *rcall, return (rcall->runtime->http_request(rcall->addr, req)); } +void +kore_runtime_http_body_chunk(struct kore_runtime_call *rcall, + struct http_request *req, const void *data, size_t len) +{ + rcall->runtime->http_body_chunk(rcall->addr, req, data, len); +} + int kore_runtime_validator(struct kore_runtime_call *rcall, struct http_request *req, const void *data) @@ -178,6 +188,17 @@ native_runtime_http_request(void *addr, struct http_request *req) return (cb(req)); } +static void +native_runtime_http_body_chunk(void *addr, struct http_request *req, + const void *data, size_t len) +{ + void (*cb)(struct http_request *, const void *, size_t); + + *(void **)&(cb) = addr; + + cb(req, data, len); +} + static int native_runtime_validator(void *addr, struct http_request *req, const void *data) {