kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit c665b7d92648d5b839d40b198610aa05d11cceb8
parent 10284d59b6ee5f48e496135839677a854ded024a
Author: Joris Vink <joris@coders.se>
Date:   Sun, 10 Aug 2014 18:46:44 +0200

Add a callback to http_response_stream().

This way we can get our code called whenever a stream is
completed. This cb handler does stand alone from an http_request
and is passed a netbuf data structure.

Diffstat:
includes/http.h | 4++--
includes/kore.h | 9+++++----
src/connection.c | 5++++-
src/http.c | 9++++++---
src/net.c | 15++++++++++-----
5 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/includes/http.h b/includes/http.h @@ -201,8 +201,8 @@ char *http_post_data_text(struct http_request *); void http_process_request(struct http_request *, int); u_int8_t *http_post_data_bytes(struct http_request *, u_int32_t *); void http_response(struct http_request *, int, void *, u_int32_t); -void http_response_stream(struct http_request *, int, - void *, u_int64_t); +void http_response_stream(struct http_request *, int, void *, + u_int64_t, int (*cb)(struct netbuf *), void *); int http_request_header(struct http_request *, const char *, char **); void http_response_header(struct http_request *, diff --git a/includes/kore.h b/includes/kore.h @@ -456,14 +456,15 @@ int net_send(struct connection *); int net_send_flush(struct connection *); int net_recv_flush(struct connection *); void net_remove_netbuf(struct netbuf_head *, struct netbuf *); -void net_recv_queue(struct connection *, size_t, int, +void net_recv_queue(struct connection *, u_int32_t, int, struct netbuf **, int (*cb)(struct netbuf *)); -int net_recv_expand(struct connection *c, struct netbuf *, size_t, - int (*cb)(struct netbuf *)); +int net_recv_expand(struct connection *c, struct netbuf *, + u_int32_t, int (*cb)(struct netbuf *)); void net_send_queue(struct connection *, void *, u_int32_t, struct spdy_stream *, int); void net_send_stream(struct connection *, void *, - u_int32_t, struct spdy_stream *); + u_int32_t, struct spdy_stream *, + int (*cb)(struct netbuf *), struct netbuf **); void kore_buf_free(struct kore_buf *); struct kore_buf *kore_buf_create(u_int32_t); diff --git a/src/connection.c b/src/connection.c @@ -276,8 +276,11 @@ kore_connection_remove(struct connection *c) for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; nb = next) { next = TAILQ_NEXT(nb, list); TAILQ_REMOVE(&(c->send_queue), nb, list); - if (!(nb->flags & NETBUF_IS_STREAM)) + if (!(nb->flags & NETBUF_IS_STREAM)) { kore_mem_free(nb->buf); + } else if (nb->cb != NULL) { + (void)nb->cb(nb); + } kore_pool_put(&nb_pool, nb); } diff --git a/src/http.c b/src/http.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 Joris Vink <joris@coders.se> + * Copyright (c) 2013-2014 Joris Vink <joris@coders.se> * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -394,8 +394,10 @@ http_response(struct http_request *req, int status, void *d, u_int32_t l) void http_response_stream(struct http_request *req, int status, void *base, - u_int64_t len) + u_int64_t len, int (*cb)(struct netbuf *), void *arg) { + struct netbuf *nb; + req->status = status; switch (req->owner->proto) { @@ -408,7 +410,8 @@ http_response_stream(struct http_request *req, int status, void *base, break; } - net_send_stream(req->owner, base, len, req->stream); + net_send_stream(req->owner, base, len, req->stream, cb, &nb); + nb->extra = arg; } int diff --git a/src/net.c b/src/net.c @@ -86,14 +86,14 @@ net_send_queue(struct connection *c, void *data, u_int32_t len, void net_send_stream(struct connection *c, void *data, u_int32_t len, - struct spdy_stream *s) + struct spdy_stream *s, int (*cb)(struct netbuf *), struct netbuf **out) { struct netbuf *nb; kore_debug("net_send_stream(%p, %p, %d, %p)", c, data, len, s); nb = kore_pool_get(&nb_pool); - nb->cb = NULL; + nb->cb = cb; nb->owner = c; nb->s_off = 0; nb->buf = data; @@ -104,10 +104,12 @@ net_send_stream(struct connection *c, void *data, u_int32_t len, nb->flags = NETBUF_IS_STREAM; TAILQ_INSERT_TAIL(&(c->send_queue), nb, list); + if (out != NULL) + *out = nb; } void -net_recv_queue(struct connection *c, size_t len, int flags, +net_recv_queue(struct connection *c, u_int32_t len, int flags, struct netbuf **out, int (*cb)(struct netbuf *)) { struct netbuf *nb; @@ -129,7 +131,7 @@ net_recv_queue(struct connection *c, size_t len, int flags, } int -net_recv_expand(struct connection *c, struct netbuf *nb, size_t len, +net_recv_expand(struct connection *c, struct netbuf *nb, u_int32_t len, int (*cb)(struct netbuf *)) { if (nb->type != NETBUF_RECV) { @@ -328,8 +330,11 @@ net_remove_netbuf(struct netbuf_head *list, struct netbuf *nb) return; } - if (!(nb->flags & NETBUF_IS_STREAM)) + if (!(nb->flags & NETBUF_IS_STREAM)) { kore_mem_free(nb->buf); + } else if (nb->cb != NULL) { + (void)nb->cb(nb); + } TAILQ_REMOVE(list, nb, list); kore_pool_put(&nb_pool, nb);