kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 5f983d575b4e7fa8766f7acdd977173bfd6ae252
parent 3918a7a0a22a818dcf41bebb7b4516bf25b5a487
Author: Joris Vink <joris@coders.se>
Date:   Thu,  2 May 2013 00:28:49 +0200

add flags to net_recv_queue() and net_send_queue(),.

NETBUF_CALL_CB_ALWAYS flag, calls the registered cb everytime data has
been received or sent.

Diffstat:
includes/kore.h | 7+++++--
src/http.c | 4++--
src/kore.c | 2+-
src/net.c | 22++++++++++++++--------
src/spdy.c | 14++++++--------
5 files changed, 28 insertions(+), 21 deletions(-)

diff --git a/includes/kore.h b/includes/kore.h @@ -34,6 +34,7 @@ struct netbuf { u_int32_t len; u_int8_t type; u_int8_t retain; + u_int8_t flags; void *owner; int (*cb)(struct netbuf *); @@ -57,6 +58,8 @@ struct listener { #define CONN_READ_POSSIBLE 0x01 #define CONN_WRITE_POSSIBLE 0x02 +#define NETBUF_CALL_CB_ALWAYS 0x01 + struct connection { int fd; int state; @@ -120,11 +123,11 @@ int net_recv(struct connection *); int net_send(struct connection *); int net_send_flush(struct connection *); int net_recv_flush(struct connection *); -int net_recv_queue(struct connection *, size_t, +int net_recv_queue(struct connection *, size_t, int, struct netbuf **, int (*cb)(struct netbuf *)); int net_recv_expand(struct connection *c, struct netbuf *, size_t, int (*cb)(struct netbuf *)); -int net_send_queue(struct connection *, u_int8_t *, size_t, +int net_send_queue(struct connection *, u_int8_t *, size_t, int, struct netbuf **, int (*cb)(struct netbuf *)); struct spdy_header_block *spdy_header_block_create(int); diff --git a/src/http.c b/src/http.c @@ -130,14 +130,14 @@ http_response(struct http_request *req, int status, u_int8_t *d, u_int32_t len) 0, hlen, req->stream->stream_id)) return (KORE_RESULT_ERROR); - if (!net_send_queue(req->owner, htext, hlen, NULL, NULL)) + if (!net_send_queue(req->owner, htext, hlen, 0, NULL, NULL)) return (KORE_RESULT_ERROR); if (len > 0) { if (!spdy_frame_send(req->owner, SPDY_DATA_FRAME, 0, len, req->stream->stream_id)) return (KORE_RESULT_ERROR); - if (!net_send_queue(req->owner, d, len, NULL, NULL)) + if (!net_send_queue(req->owner, d, len, 0, NULL, NULL)) return (KORE_RESULT_ERROR); } diff --git a/src/kore.c b/src/kore.c @@ -323,7 +323,7 @@ kore_connection_handle(struct connection *c, int flags) if (!memcmp(data, "spdy/3", 6)) kore_log("using SPDY/3"); c->proto = CONN_PROTO_SPDY; - if (!net_recv_queue(c, SPDY_FRAME_SIZE, + if (!net_recv_queue(c, SPDY_FRAME_SIZE, 0, NULL, spdy_frame_recv)) return (KORE_RESULT_ERROR); } else { diff --git a/src/net.c b/src/net.c @@ -38,7 +38,7 @@ #include "kore.h" int -net_send_queue(struct connection *c, u_int8_t *data, size_t len, +net_send_queue(struct connection *c, u_int8_t *data, size_t len, int flags, struct netbuf **out, int (*cb)(struct netbuf *)) { struct netbuf *nb; @@ -51,6 +51,7 @@ net_send_queue(struct connection *c, u_int8_t *data, size_t len, nb->owner = c; nb->offset = 0; nb->retain = 0; + nb->flags = flags; nb->type = NETBUF_SEND; if (len > 0) { @@ -68,7 +69,7 @@ net_send_queue(struct connection *c, u_int8_t *data, size_t len, } int -net_recv_queue(struct connection *c, size_t len, +net_recv_queue(struct connection *c, size_t len, int flags, struct netbuf **out, int (*cb)(struct netbuf *)) { struct netbuf *nb; @@ -81,6 +82,7 @@ net_recv_queue(struct connection *c, size_t len, nb->owner = c; nb->offset = 0; nb->retain = 0; + nb->flags = flags; nb->type = NETBUF_RECV; nb->buf = (u_int8_t *)kore_malloc(nb->len); @@ -144,16 +146,19 @@ net_send(struct connection *c) } nb->offset += (size_t)r; - if (nb->offset == nb->len) { - TAILQ_REMOVE(&(c->send_queue), nb, list); + if (nb->offset == nb->len || (nb->flags & NETBUF_CALL_CB_ALWAYS)) { + if (nb->offset == nb->len) + TAILQ_REMOVE(&(c->send_queue), nb, list); if (nb->cb != NULL) r = nb->cb(nb); else r = KORE_RESULT_OK; - free(nb->buf); - free(nb); + if (nb->offset == nb->len) { + free(nb->buf); + free(nb); + } } else { r = KORE_RESULT_OK; } @@ -202,14 +207,15 @@ net_recv(struct connection *c) } nb->offset += (size_t)r; - if (nb->offset == nb->len) { + if (nb->offset == nb->len || (nb->flags & NETBUF_CALL_CB_ALWAYS)) { if (nb->cb == NULL) { kore_log("kore_read_client(): nb->cb == NULL"); return (KORE_RESULT_ERROR); } nb->retain++; - TAILQ_REMOVE(&(c->recv_queue), nb, list); + if (nb->offset == nb->len) + TAILQ_REMOVE(&(c->recv_queue), nb, list); r = nb->cb(nb); nb->retain--; diff --git a/src/spdy.c b/src/spdy.c @@ -100,8 +100,10 @@ spdy_frame_recv(struct netbuf *nb) kore_log("received data frame, can't handle that yet."); } - if (r == KORE_RESULT_OK) - r = net_recv_queue(c, SPDY_FRAME_SIZE, NULL, spdy_frame_recv); + if (r == KORE_RESULT_OK) { + r = net_recv_queue(c, SPDY_FRAME_SIZE, + 0, NULL, spdy_frame_recv); + } return (r); } @@ -144,7 +146,7 @@ spdy_frame_send(struct connection *c, u_int16_t type, u_int8_t flags, break; } - return (net_send_queue(c, nb, length, NULL, NULL)); + return (net_send_queue(c, nb, length, 0, NULL, NULL)); } struct spdy_stream * @@ -238,7 +240,7 @@ spdy_header_block_release(struct connection *c, int spdy_stream_get_header(struct spdy_header_block *s, char *header, char **out) { - char *cmp, t[128]; + char *cmp; u_int8_t *p, *end; u_int32_t i, nlen, vlen; @@ -267,10 +269,6 @@ spdy_stream_get_header(struct spdy_header_block *s, char *header, char **out) } cmp = (char *)(p + 4); - memcpy(t, cmp, nlen); - t[nlen] = '\0'; - kore_log("header %s", t); - if (!strncasecmp(cmp, header, nlen)) { kore_log("found %s header", header);