kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 064f2095b0f13c1fa67144a11483c131c8761aff
parent d8508f4a7b4af57dd0e4747a3ae1ab97cdd7bc84
Author: Joris Vink <joris@coders.se>
Date:   Mon,  6 Jul 2015 21:08:36 +0200

Update message framework with src/dst for workers.

One can now send messages to specific workers and
receiving workers can see the origin of the messages.

Diffstat:
examples/messaging/src/messaging.c | 12++++++++----
includes/kore.h | 8+++++++-
src/accesslog.c | 3++-
src/msg.c | 31+++++++++++++++++++++++++++++--
src/websocket.c | 6++++--
5 files changed, 50 insertions(+), 10 deletions(-)

diff --git a/examples/messaging/src/messaging.c b/examples/messaging/src/messaging.c @@ -53,8 +53,8 @@ init(int state) void received_message(struct kore_msg *msg, const void *data) { - kore_log(LOG_INFO, "got message (%d bytes): %.*s", msg->length, - msg->length, (const char *)data); + kore_log(LOG_INFO, "got message from %u (%d bytes): %.*s", msg->src, + msg->length, msg->length, (const char *)data); } /* @@ -64,8 +64,12 @@ received_message(struct kore_msg *msg, const void *data) int page(struct http_request *req) { - kore_msg_send(MY_MESSAGE_ID, "hello", 5); - http_response(req, 200, NULL, 0); + /* Send to all workers first. */ + kore_msg_send(KORE_MSG_WORKER_ALL, MY_MESSAGE_ID, "hello", 5); + + /* Now send something to worker number #2 only. */ + kore_msg_send(2, MY_MESSAGE_ID, "hello number 2", 14); + http_response(req, 200, NULL, 0); return (KORE_RESULT_OK); } diff --git a/includes/kore.h b/includes/kore.h @@ -369,8 +369,14 @@ struct kore_timer { #define KORE_MSG_ACCESSLOG 1 #define KORE_MSG_WEBSOCKET 2 +/* Predefined message targets. */ +#define KORE_MSG_PARENT 1000 +#define KORE_MSG_WORKER_ALL 1001 + struct kore_msg { u_int8_t id; + u_int16_t src; + u_int16_t dst; u_int32_t length; }; @@ -512,7 +518,7 @@ void kore_msg_worker_init(void); void kore_msg_parent_init(void); void kore_msg_parent_add(struct kore_worker *); void kore_msg_parent_remove(struct kore_worker *); -void kore_msg_send(u_int8_t, void *, u_int32_t); +void kore_msg_send(u_int16_t, u_int8_t, void *, u_int32_t); int kore_msg_register(u_int8_t, void (*cb)(struct kore_msg *, const void *)); diff --git a/src/accesslog.c b/src/accesslog.c @@ -167,5 +167,6 @@ kore_accesslog(struct http_request *req) } #endif - kore_msg_send(KORE_MSG_ACCESSLOG, &logpacket, sizeof(logpacket)); + kore_msg_send(KORE_MSG_PARENT, + KORE_MSG_ACCESSLOG, &logpacket, sizeof(logpacket)); } diff --git a/src/msg.c b/src/msg.c @@ -33,6 +33,7 @@ static struct msg_type *msg_type_lookup(u_int8_t); static int msg_recv_packet(struct netbuf *); static int msg_recv_data(struct netbuf *); static void msg_disconnected_parent(struct connection *); +static void msg_disconnected_worker(struct connection *); static void msg_type_accesslog(struct kore_msg *, const void *); static void msg_type_websocket(struct kore_msg *, const void *); @@ -59,12 +60,19 @@ kore_msg_parent_init(void) void kore_msg_parent_add(struct kore_worker *kw) { + u_int8_t *worker_id; + + worker_id = kore_malloc(sizeof(*worker_id)); + *worker_id = kw->id; + kw->msg[0] = kore_connection_new(NULL); kw->msg[0]->fd = kw->pipe[0]; kw->msg[0]->read = net_read; kw->msg[0]->write = net_write; kw->msg[0]->proto = CONN_PROTO_MSG; kw->msg[0]->state = CONN_STATE_ESTABLISHED; + kw->msg[0]->hdlr_extra = worker_id; + kw->msg[0]->disconnect = msg_disconnected_worker; TAILQ_INSERT_TAIL(&connections, kw->msg[0], list); kore_platform_event_all(kw->msg[0]->fd, kw->msg[0]); @@ -117,12 +125,14 @@ kore_msg_register(u_int8_t id, void (*cb)(struct kore_msg *, const void *)) } void -kore_msg_send(u_int8_t id, void *data, u_int32_t len) +kore_msg_send(u_int16_t dst, u_int8_t id, void *data, u_int32_t len) { struct kore_msg m; m.id = id; + m.dst = dst; m.length = len; + m.src = worker->id; net_send_queue(worker->msg[1], &m, sizeof(m), NULL, NETBUF_LAST_CHAIN); net_send_queue(worker->msg[1], data, len, NULL, NETBUF_LAST_CHAIN); @@ -145,13 +155,23 @@ msg_recv_data(struct netbuf *nb) struct msg_type *type; struct kore_msg *msg = (struct kore_msg *)nb->buf; - if ((type = msg_type_lookup(msg->id)) != NULL) + if ((type = msg_type_lookup(msg->id)) != NULL) { + if (worker == NULL && msg->dst != KORE_MSG_PARENT) + fatal("received parent msg for non parent dst"); type->cb(msg, nb->buf + sizeof(*msg)); + } if (worker == NULL && type == NULL) { TAILQ_FOREACH(c, &connections, list) { if (c == nb->owner) continue; + if (c->proto != CONN_PROTO_MSG || c->hdlr_extra == NULL) + continue; + + if (msg->dst != KORE_MSG_WORKER_ALL && + *(u_int8_t *)c->hdlr_extra != msg->dst) + continue; + net_send_queue(c, nb->buf, nb->s_off, NULL, NETBUF_LAST_CHAIN); net_send_flush(c); @@ -171,6 +191,13 @@ msg_disconnected_parent(struct connection *c) } static void +msg_disconnected_worker(struct connection *c) +{ + kore_mem_free(c->hdlr_extra); + c->hdlr_extra = NULL; +} + +static void msg_type_accesslog(struct kore_msg *msg, const void *data) { if (kore_accesslog_write(data, msg->length) == -1) diff --git a/src/websocket.c b/src/websocket.c @@ -143,8 +143,10 @@ kore_websocket_broadcast(struct connection *src, u_int8_t op, const void *data, } } - if (scope == WEBSOCKET_BROADCAST_GLOBAL) - kore_msg_send(KORE_MSG_WEBSOCKET, frame->data, frame->offset); + if (scope == WEBSOCKET_BROADCAST_GLOBAL) { + kore_msg_send(KORE_MSG_WORKER_ALL, + KORE_MSG_WEBSOCKET, frame->data, frame->offset); + } kore_buf_free(frame); }