kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit cda09b60650973cf7d1bf152598d5c0ad241ea63
parent e3ae1b4e2d59283fab54349858fd518cca65099a
Author: Joris Vink <joris@coders.se>
Date:   Thu,  7 Aug 2014 14:23:26 +0200

Add http_response_stream() which can stream data from a buffer to the client.

Diffstat:
includes/http.h | 2++
includes/kore.h | 3+++
src/connection.c | 3++-
src/http.c | 34++++++++++++++++++++++++++++++----
src/net.c | 34++++++++++++++++++++++++++++++++--
5 files changed, 69 insertions(+), 7 deletions(-)

diff --git a/includes/http.h b/includes/http.h @@ -201,6 +201,8 @@ char *http_post_data_text(struct http_request *); void http_process_request(struct http_request *, int); u_int8_t *http_post_data_bytes(struct http_request *, u_int32_t *); void http_response(struct http_request *, int, void *, u_int32_t); +void http_response_stream(struct http_request *, int, + void *, u_int64_t, u_int64_t); int http_request_header(struct http_request *, const char *, char **); void http_response_header(struct http_request *, diff --git a/includes/kore.h b/includes/kore.h @@ -75,6 +75,7 @@ extern int daemon(int, int); #define NETBUF_CALL_CB_ALWAYS 0x01 #define NETBUF_FORCE_REMOVE 0x02 #define NETBUF_MUST_RESEND 0x04 +#define NETBUF_IS_STREAM 0x10 #define X509_GET_CN(c, o, l) \ X509_NAME_get_text_by_NID(X509_get_subject_name(c), \ @@ -453,6 +454,8 @@ int net_recv_expand(struct connection *c, struct netbuf *, size_t, int (*cb)(struct netbuf *)); void net_send_queue(struct connection *, void *, u_int32_t, struct spdy_stream *); +void net_send_stream(struct connection *, void *, + u_int32_t, struct spdy_stream *); void kore_buf_free(struct kore_buf *); struct kore_buf *kore_buf_create(u_int32_t); diff --git a/src/connection.c b/src/connection.c @@ -275,7 +275,8 @@ kore_connection_remove(struct connection *c) for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; nb = next) { next = TAILQ_NEXT(nb, list); TAILQ_REMOVE(&(c->send_queue), nb, list); - kore_mem_free(nb->buf); + if (!(nb->flags & NETBUF_IS_STREAM)) + kore_mem_free(nb->buf); kore_pool_put(&nb_pool, nb); } diff --git a/src/http.c b/src/http.c @@ -385,6 +385,8 @@ http_response(struct http_request *req, int status, void *d, u_int32_t l) switch (req->owner->proto) { case CONN_PROTO_SPDY: http_response_spdy(req, req->owner, req->stream, status, d, l); + spdy_frame_send(req->owner, SPDY_DATA_FRAME, + FLAG_FIN, 0, req->stream, 0); break; case CONN_PROTO_HTTP: http_response_normal(req, req->owner, status, d, l); @@ -392,6 +394,29 @@ http_response(struct http_request *req, int status, void *d, u_int32_t l) } } +void +http_response_stream(struct http_request *req, int status, void *base, + u_int64_t start, u_int64_t end) +{ + u_int8_t *d; + u_int64_t len; + + len = end - start; + req->status = status; + + switch (req->owner->proto) { + case CONN_PROTO_SPDY: + http_response_spdy(req, req->owner, req->stream, status, NULL, len); + break; + case CONN_PROTO_HTTP: + http_response_normal(req, req->owner, status, NULL, len); + break; + } + + d = base; + net_send_stream(req->owner, d + start, end - start, req->stream); +} + int http_request_header(struct http_request *req, const char *header, char **out) { @@ -1008,6 +1033,7 @@ http_error_response(struct connection *c, struct spdy_stream *s, int status) switch (c->proto) { case CONN_PROTO_SPDY: http_response_spdy(NULL, c, s, status, NULL, 0); + spdy_frame_send(c, SPDY_DATA_FRAME, FLAG_FIN, 0, s, 0); break; case CONN_PROTO_HTTP: if (s != NULL) @@ -1067,10 +1093,10 @@ http_response_spdy(struct http_request *req, struct connection *c, if (len > 0) { req->stream->send_size += len; spdy_frame_send(c, SPDY_DATA_FRAME, 0, len, s, 0); - net_send_queue(c, d, len, s); - } - spdy_frame_send(c, SPDY_DATA_FRAME, FLAG_FIN, 0, s, 0); + if (d != NULL) + net_send_queue(c, d, len, s); + } } static void @@ -1139,7 +1165,7 @@ http_response_normal(struct http_request *req, struct connection *c, net_send_queue(c, htext, hlen, NULL); kore_mem_free(htext); - if (len > 0) + if (d != NULL) net_send_queue(c, d, len, NULL); if (!(c->flags & CONN_CLOSE_EMPTY)) { diff --git a/src/net.c b/src/net.c @@ -75,6 +75,26 @@ net_send_queue(struct connection *c, void *data, u_int32_t len, } void +net_send_stream(struct connection *c, void *data, u_int32_t len, + struct spdy_stream *s) +{ + struct netbuf *nb; + + nb = kore_pool_get(&nb_pool); + nb->cb = NULL; + nb->owner = c; + nb->s_off = 0; + nb->buf = data; + nb->stream = s; + nb->b_len = len; + nb->m_len = nb->b_len; + nb->type = NETBUF_SEND; + nb->flags = NETBUF_IS_STREAM; + + TAILQ_INSERT_TAIL(&(c->send_queue), nb, list); +} + +void net_recv_queue(struct connection *c, size_t len, int flags, struct netbuf **out, int (*cb)(struct netbuf *)) { @@ -168,8 +188,15 @@ net_send(struct connection *c) spdy_update_wsize(c, nb->stream, r); } - if (nb->s_off == nb->b_len) + if (nb->s_off == nb->b_len) { + if (nb->stream != NULL && + (nb->flags & NETBUF_IS_STREAM)) { + spdy_frame_send(c, SPDY_DATA_FRAME, + FLAG_FIN, 0, nb->stream, 0); + } + net_remove_netbuf(&(c->send_queue), nb); + } } return (KORE_RESULT_OK); @@ -272,13 +299,16 @@ void net_remove_netbuf(struct netbuf_head *list, struct netbuf *nb) { nb->stream = NULL; + if (nb->flags & NETBUF_MUST_RESEND) { kore_debug("retaining %p (MUST_RESEND)", nb); return; } + if (!(nb->flags & NETBUF_IS_STREAM)) + kore_mem_free(nb->buf); + TAILQ_REMOVE(list, nb, list); - kore_mem_free(nb->buf); kore_pool_put(&nb_pool, nb); }