commit a64808c6b01eff40981f5e1b043329304ecb4704
parent 0e3271cf9debbf577460cb44f2eecf0391b12ac0
Author: Joris Vink <joris@coders.se>
Date:   Tue, 23 Jun 2015 18:17:14 +0200
Improvements to our message framework.
Change the callback prototypes to:
	void callback(struct kore_msg *msg, const void *data);
This allows the callbacks to receive the full kore_msg data structure
as sent over the wire (including length and id). Useful for future
additions to the kore_msg structure (such as worker origin).
Several other improvements:
	* Accesslog now uses the msg framework as well.
	* Websocket WEBSOCKET_BROADCAST_GLOBAL now works.
Small websocket improvement in this commit:
	* Build the frame to be sent only once when broadcasting
	 instead of per connection we are broadcasting towards.
Diffstat:
10 files changed, 122 insertions(+), 127 deletions(-)
diff --git a/examples/messaging/src/messaging.c b/examples/messaging/src/messaging.c
@@ -28,7 +28,7 @@
 
 int	init(int);
 int	page(struct http_request *);
-void	received_message(const void *, u_int32_t);
+void	received_message(struct kore_msg *, const void *);
 
 /* Initialization callback. */
 int
@@ -51,10 +51,10 @@ init(int state)
  * Callback for receiving a message MY_MESSAGE_ID.
  */
 void
-received_message(const void *data, u_int32_t len)
+received_message(struct kore_msg *msg, const void *data)
 {
-	kore_log(LOG_INFO, "got message (%d bytes): %.*s", len,
-	    len, (const char *)data);
+	kore_log(LOG_INFO, "got message (%d bytes): %.*s", msg->length,
+	    msg->length, (const char *)data);
 }
 
 /*
diff --git a/examples/websocket/conf/websocket.conf b/examples/websocket/conf/websocket.conf
@@ -5,6 +5,10 @@ load		./websocket.so
 
 tls_dhparam	dh2048.pem
 
+# Increase workers so connections are spread
+# across them to demonstrate WEBSOCKET_BROADCAST_GLOBAL.
+workers		4
+
 websocket_maxframe	65536
 websocket_timeout	20
 
diff --git a/examples/websocket/src/websocket.c b/examples/websocket/src/websocket.c
@@ -44,7 +44,7 @@ websocket_connect(struct connection *c)
 void
 websocket_message(struct connection *c, u_int8_t op, void *data, size_t len)
 {
-	kore_websocket_broadcast(c, op, data, len, WEBSOCKET_BROADCAST_LOCAL);
+	kore_websocket_broadcast(c, op, data, len, WEBSOCKET_BROADCAST_GLOBAL);
 }
 
 void
diff --git a/includes/kore.h b/includes/kore.h
@@ -365,6 +365,10 @@ struct kore_timer {
 	TAILQ_ENTRY(kore_timer)	list;
 };
 
+/* Reserved message ids, registered on workers. */
+#define KORE_MSG_ACCESSLOG	1
+#define KORE_MSG_WEBSOCKET	2
+
 struct kore_msg {
 	u_int8_t	id;
 	u_int32_t	length;
@@ -428,8 +432,8 @@ void		kore_platform_event_schedule(int, int, int, void *);
 void		kore_platform_worker_setcpu(struct kore_worker *);
 
 void		kore_accesslog_init(void);
-int		kore_accesslog_wait(void);
 void		kore_accesslog_worker_init(void);
+int		kore_accesslog_write(const void *, u_int32_t);
 
 int		kore_auth_run(struct http_request *, struct kore_auth *);
 void		kore_auth_init(void);
@@ -499,9 +503,9 @@ void		*kore_mem_find(void *, size_t, void *, u_int32_t);
 void		kore_websocket_handshake(struct http_request *,
 		    struct kore_wscbs *);
 void		kore_websocket_send(struct connection *,
-		    u_int8_t, void *, size_t);
+		    u_int8_t, const void *, size_t);
 void		kore_websocket_broadcast(struct connection *,
-		    u_int8_t, void *, size_t, int);
+		    u_int8_t, const void *, size_t, int);
 
 void		kore_msg_init(void);
 void		kore_msg_worker_init(void);
@@ -510,7 +514,7 @@ void		kore_msg_parent_add(struct kore_worker *);
 void		kore_msg_parent_remove(struct kore_worker *);
 void		kore_msg_send(u_int8_t, void *, u_int32_t);
 int		kore_msg_register(u_int8_t,
-		    void (*cb)(const void *, u_int32_t));
+		    void (*cb)(struct kore_msg *, const void *));
 
 void		kore_domain_init(void);
 int		kore_domain_new(char *);
@@ -563,7 +567,7 @@ void		net_recv_queue(struct connection *, u_int32_t, int,
 		    int (*cb)(struct netbuf *));
 void		net_recv_expand(struct connection *c, u_int32_t,
 		    int (*cb)(struct netbuf *));
-void		net_send_queue(struct connection *, void *,
+void		net_send_queue(struct connection *, const void *,
 		    u_int32_t, struct spdy_stream *, int);
 void		net_send_stream(struct connection *, void *,
 		    u_int32_t, struct spdy_stream *,
@@ -571,8 +575,9 @@ void		net_send_stream(struct connection *, void *,
 
 void		kore_buf_free(struct kore_buf *);
 struct kore_buf	*kore_buf_create(u_int32_t);
-void		kore_buf_append(struct kore_buf *, void *, u_int32_t);
+void		kore_buf_append(struct kore_buf *, const void *, u_int32_t);
 u_int8_t	*kore_buf_release(struct kore_buf *, u_int32_t *);
+
 void	kore_buf_appendf(struct kore_buf *, const char *, ...);
 void	kore_buf_appendv(struct kore_buf *, const char *, va_list);
 void	kore_buf_appendb(struct kore_buf *, struct kore_buf *);
diff --git a/src/accesslog.c b/src/accesslog.c
@@ -21,8 +21,6 @@
 #include "kore.h"
 #include "http.h"
 
-static int		accesslog_fd[2];
-
 struct kore_log_packet {
 	u_int8_t	method;
 	int		status;
@@ -40,52 +38,29 @@ struct kore_log_packet {
 void
 kore_accesslog_init(void)
 {
-	if (socketpair(AF_UNIX, SOCK_STREAM, 0, accesslog_fd) == -1)
-		fatal("kore_accesslog_init(): socketpair() %s", errno_s);
 }
 
 void
 kore_accesslog_worker_init(void)
 {
-	close(accesslog_fd[0]);
 	kore_domain_closelogs();
 }
 
 int
-kore_accesslog_wait(void)
+kore_accesslog_write(const void *data, u_int32_t len)
 {
-	ssize_t			len;
+	int			l;
 	time_t			now;
+	ssize_t			sent;
 	struct kore_domain	*dom;
-	struct pollfd		pfd[1];
-	int			nfds, l;
 	struct kore_log_packet	logpacket;
 	char			addr[INET6_ADDRSTRLEN];
 	char			*method, *buf, *tbuf, *cn;
 
-	pfd[0].fd = accesslog_fd[0];
-	pfd[0].events = POLLIN;
-	pfd[0].revents = 0;
-
-	nfds = poll(pfd, 1, 100);
-	if (nfds == -1 || (pfd[0].revents & (POLLERR | POLLHUP | POLLNVAL))) {
-		if (nfds == -1 && errno == EINTR)
-			return (KORE_RESULT_OK);
-		kore_log(LOG_WARNING, "poll(): %s", errno_s);
-		return (KORE_RESULT_ERROR);
-	}
-
-	if (nfds == 0)
-		return (KORE_RESULT_OK);
-
-	len = recv(accesslog_fd[0], &logpacket, sizeof(logpacket), 0);
-	if (len == -1) {
-		kore_log(LOG_WARNING, "recv(): %s", errno_s);
+	if (len != sizeof(struct kore_log_packet))
 		return (KORE_RESULT_ERROR);
-	}
 
-	if (len != sizeof(logpacket))
-		return (KORE_RESULT_ERROR);
+	(void)memcpy(&logpacket, data, sizeof(logpacket));
 
 	if ((dom = kore_domain_lookup(logpacket.host)) == NULL) {
 		kore_log(LOG_WARNING,
@@ -131,19 +106,19 @@ kore_accesslog_wait(void)
 	    logpacket.worker_id, logpacket.time_req, cn, logpacket.agent);
 	if (l == -1) {
 		kore_log(LOG_WARNING,
-		    "kore_accesslog_wait(): asprintf() == -1");
+		    "kore_accesslog_write(): asprintf() == -1");
 		return (KORE_RESULT_ERROR);
 	}
 
-	len = write(dom->accesslog, buf, l);
-	if (len == -1) {
+	sent = write(dom->accesslog, buf, l);
+	if (sent == -1) {
 		free(buf);
 		kore_log(LOG_WARNING,
-		    "kore_accesslog_wait(): write(): %s", errno_s);
+		    "kore_accesslog_write(): write(): %s", errno_s);
 		return (KORE_RESULT_ERROR);
 	}
 
-	if (len != l)
+	if (sent != l)
 		kore_log(LOG_NOTICE, "accesslog: %s", buf);
 
 	free(buf);
@@ -153,7 +128,6 @@ kore_accesslog_wait(void)
 void
 kore_accesslog(struct http_request *req)
 {
-	ssize_t			len;
 	struct kore_log_packet	logpacket;
 
 	logpacket.addrtype = req->owner->addrtype;
@@ -193,10 +167,5 @@ kore_accesslog(struct http_request *req)
 	}
 #endif
 
-	len = send(accesslog_fd[1], &logpacket, sizeof(logpacket), 0);
-	if (len == -1) {
-		kore_log(LOG_WARNING, "kore_accesslog(): send(): %s", errno_s);
-	} else if (len != sizeof(logpacket)) {
-		kore_log(LOG_WARNING, "short accesslog packet sent");
-	}
+	kore_msg_send(KORE_MSG_ACCESSLOG, &logpacket, sizeof(logpacket));
 }
diff --git a/src/buf.c b/src/buf.c
@@ -30,7 +30,7 @@ kore_buf_create(u_int32_t initial)
 }
 
 void
-kore_buf_append(struct kore_buf *buf, void *d, u_int32_t len)
+kore_buf_append(struct kore_buf *buf, const void *d, u_int32_t len)
 {
 	if ((buf->offset + len) >= buf->length) {
 		buf->length += len + KORE_BUF_INCREMENT;
diff --git a/src/kore.c b/src/kore.c
@@ -383,12 +383,6 @@ kore_server_start(void)
 			sig_recv = 0;
 		}
 
-		/* XXX - The accesslog should move to our msg framework. */
-		if (!kore_accesslog_wait()) {
-			kore_worker_dispatch_signal(SIGQUIT);
-			break;
-		}
-
 		kore_worker_wait(0);
 		kore_platform_event_wait(100);
 		kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT);
diff --git a/src/msg.c b/src/msg.c
@@ -23,17 +23,18 @@
 
 struct msg_type {
 	u_int8_t		id;
-	void			(*cb)(const void *, u_int32_t);
+	void			(*cb)(struct kore_msg *, const void *);
 	TAILQ_ENTRY(msg_type)	list;
 };
 
 TAILQ_HEAD(, msg_type)		msg_types;
 
 static struct msg_type	*msg_type_lookup(u_int8_t);
-static int		msg_recv_worker(struct netbuf *);
-static int		msg_recv_parent(struct netbuf *);
-static int		msg_recv_worker_data(struct netbuf *);
+static int		msg_recv_packet(struct netbuf *);
+static int		msg_recv_data(struct netbuf *);
 static void		msg_disconnected_parent(struct connection *);
+static void		msg_type_accesslog(struct kore_msg *, const void *);
+static void		msg_type_websocket(struct kore_msg *, const void *);
 
 void
 kore_msg_init(void)
@@ -51,6 +52,8 @@ kore_msg_parent_init(void)
 		kw = kore_worker_data(i);
 		kore_msg_parent_add(kw);
 	}
+
+	kore_msg_register(KORE_MSG_ACCESSLOG, msg_type_accesslog);
 }
 
 void
@@ -66,8 +69,7 @@ kore_msg_parent_add(struct kore_worker *kw)
 	TAILQ_INSERT_TAIL(&connections, kw->msg[0], list);
 	kore_platform_event_all(kw->msg[0]->fd, kw->msg[0]);
 
-	net_recv_queue(kw->msg[0], NETBUF_SEND_PAYLOAD_MAX,
-	    NETBUF_CALL_CB_ALWAYS, msg_recv_parent);
+	net_recv_queue(kw->msg[0], sizeof(struct kore_msg), 0, msg_recv_packet);
 }
 
 void
@@ -81,6 +83,8 @@ kore_msg_parent_remove(struct kore_worker *kw)
 void
 kore_msg_worker_init(void)
 {
+	kore_msg_register(KORE_MSG_WEBSOCKET, msg_type_websocket);
+
 	worker->msg[1] = kore_connection_new(NULL);
 	worker->msg[1]->fd = worker->pipe[1];
 	worker->msg[1]->read = net_read;
@@ -93,11 +97,11 @@ kore_msg_worker_init(void)
 	kore_platform_event_all(worker->msg[1]->fd, worker->msg[1]);
 
 	net_recv_queue(worker->msg[1],
-	    sizeof(struct kore_msg), 0, msg_recv_worker);
+	    sizeof(struct kore_msg), 0, msg_recv_packet);
 }
 
 int
-kore_msg_register(u_int8_t id, void (*cb)(const void *, u_int32_t))
+kore_msg_register(u_int8_t id, void (*cb)(struct kore_msg *, const void *))
 {
 	struct msg_type		*type;
 
@@ -126,41 +130,35 @@ kore_msg_send(u_int8_t id, void *data, u_int32_t len)
 }
 
 static int
-msg_recv_worker(struct netbuf *nb)
+msg_recv_packet(struct netbuf *nb)
 {
 	struct kore_msg		*msg = (struct kore_msg *)nb->buf;
 
-	net_recv_expand(nb->owner, msg->length, msg_recv_worker_data);
+	net_recv_expand(nb->owner, msg->length, msg_recv_data);
 	return (KORE_RESULT_OK);
 }
 
 static int
-msg_recv_worker_data(struct netbuf *nb)
+msg_recv_data(struct netbuf *nb)
 {
+	struct connection	*c;
 	struct msg_type		*type;
 	struct kore_msg		*msg = (struct kore_msg *)nb->buf;
 
 	if ((type = msg_type_lookup(msg->id)) != NULL)
-		type->cb(nb->buf + sizeof(*msg), nb->s_off - sizeof(*msg));
-
-	net_recv_reset(nb->owner, sizeof(struct kore_msg), msg_recv_worker);
-	return (KORE_RESULT_OK);
-}
-
-static int
-msg_recv_parent(struct netbuf *nb)
-{
-	struct connection	*c;
-
-	TAILQ_FOREACH(c, &connections, list) {
-		if (c == nb->owner)
-			continue;
-		net_send_queue(c, nb->buf, nb->s_off, NULL, NETBUF_LAST_CHAIN);
-		net_send_flush(c);
+		type->cb(msg, nb->buf + sizeof(*msg));
+
+	if (worker == NULL && type == NULL) {
+		TAILQ_FOREACH(c, &connections, list) {
+			if (c == nb->owner)
+				continue;
+			net_send_queue(c, nb->buf, nb->s_off,
+			    NULL, NETBUF_LAST_CHAIN);
+			net_send_flush(c);
+		}
 	}
 
-	net_recv_reset(nb->owner, NETBUF_SEND_PAYLOAD_MAX, msg_recv_parent);
-
+	net_recv_reset(nb->owner, sizeof(struct kore_msg), msg_recv_packet);
 	return (KORE_RESULT_OK);
 }
 
@@ -172,6 +170,27 @@ msg_disconnected_parent(struct connection *c)
 		kore_log(LOG_ERR, "failed to send SIGQUIT: %s", errno_s);
 }
 
+static void
+msg_type_accesslog(struct kore_msg *msg, const void *data)
+{
+	if (kore_accesslog_write(data, msg->length) == -1)
+		kore_log(LOG_WARNING, "failed to write to accesslog");
+}
+
+static void
+msg_type_websocket(struct kore_msg *msg, const void *data)
+{
+	struct connection	*c;
+
+	TAILQ_FOREACH(c, &connections, list) {
+		if (c->proto == CONN_PROTO_WEBSOCKET) {
+			net_send_queue(c, data, msg->length,
+			    NULL, NETBUF_LAST_CHAIN);
+			net_send_flush(c);
+		}
+	}
+}
+
 static struct msg_type *
 msg_type_lookup(u_int8_t id)
 {
diff --git a/src/net.c b/src/net.c
@@ -37,10 +37,10 @@ net_init(void)
 }
 
 void
-net_send_queue(struct connection *c, void *data, u_int32_t len,
+net_send_queue(struct connection *c, const void *data, u_int32_t len,
     struct spdy_stream *s, int before)
 {
-	u_int8_t		*d;
+	const u_int8_t		*d;
 	struct netbuf		*nb;
 	u_int32_t		avail;
 
diff --git a/src/websocket.c b/src/websocket.c
@@ -42,9 +42,8 @@ u_int64_t	kore_websocket_maxframe = 16384;
 static int	websocket_recv_frame(struct netbuf *);
 static int	websocket_recv_opcode(struct netbuf *);
 static void	websocket_disconnect(struct connection *);
-static void	websocket_send_single(struct connection *,
-		    u_int8_t, void *, size_t);
-void		websocket_send(struct connection *, u_int8_t, void *, size_t);
+static void	websocket_frame_build(struct kore_buf *, u_int8_t,
+		    const void *, size_t);
 
 void
 kore_websocket_handshake(struct http_request *req, struct kore_wscbs *wscbs)
@@ -115,17 +114,48 @@ kore_websocket_handshake(struct http_request *req, struct kore_wscbs *wscbs)
 }
 
 void
-kore_websocket_send(struct connection *c, u_int8_t op, void *data, size_t len)
+kore_websocket_send(struct connection *c, u_int8_t op, const void *data,
+    size_t len)
 {
-	u_int8_t		len_1;
-	u_int16_t		len16;
-	u_int64_t		len64;
 	struct kore_buf		*frame;
 
-	if (c->proto != CONN_PROTO_WEBSOCKET)
-		fatal("kore_websocket_send(): to non websocket connection");
+	frame = kore_buf_create(len);
+	websocket_frame_build(frame, op, data, len);
+	net_send_queue(c, frame->data, frame->offset, NULL, NETBUF_LAST_CHAIN);
+	kore_buf_free(frame);
+}
+
+void
+kore_websocket_broadcast(struct connection *src, u_int8_t op, const void *data,
+    size_t len, int scope)
+{
+	struct connection	*c;
+	struct kore_buf		*frame;
+
+	frame = kore_buf_create(len);
+	websocket_frame_build(frame, op, data, len);
+
+	TAILQ_FOREACH(c, &connections, list) {
+		if (c != src && c->proto == CONN_PROTO_WEBSOCKET) {
+			net_send_queue(c, frame->data, frame->offset,
+			    NULL, NETBUF_LAST_CHAIN);
+			net_send_flush(c);
+		}
+	}
+
+	if (scope == WEBSOCKET_BROADCAST_GLOBAL)
+		kore_msg_send(KORE_MSG_WEBSOCKET, frame->data, frame->offset);
+
+	kore_buf_free(frame);
+}
 
-	kore_debug("%p: sending %ld bytes", c, len);
+static void
+websocket_frame_build(struct kore_buf *frame, u_int8_t op, const void *data,
+    size_t len)
+{
+	u_int8_t		len_1;
+	u_int16_t		len16;
+	u_int64_t		len64;
 
 	if (len > WEBSOCKET_PAYLOAD_SINGLE) {
 		if (len < USHRT_MAX)
@@ -136,8 +166,6 @@ kore_websocket_send(struct connection *c, u_int8_t op, void *data, size_t len)
 		len_1 = len;
 	}
 
-	frame = kore_buf_create(len);
-
 	op |= (1 << 7);
 	kore_buf_append(frame, &op, sizeof(op));
 
@@ -158,30 +186,6 @@ kore_websocket_send(struct connection *c, u_int8_t op, void *data, size_t len)
 	}
 
 	kore_buf_append(frame, data, len);
-	net_send_queue(c, frame->data, frame->offset, NULL, NETBUF_LAST_CHAIN);
-	kore_buf_free(frame);
-}
-
-void
-kore_websocket_broadcast(struct connection *src, u_int8_t op, void *data,
-    size_t len, int scope)
-{
-	struct connection	*c;
-
-	TAILQ_FOREACH(c, &connections, list) {
-		if (c != src && c->proto == CONN_PROTO_WEBSOCKET)
-			websocket_send_single(c, op, data, len);
-	}
-
-	if (scope == WEBSOCKET_BROADCAST_GLOBAL)
-		fatal("kore_websocket_broadcast: no global scope yet");
-}
-
-static void
-websocket_send_single(struct connection *c, u_int8_t op, void *data, size_t len)
-{
-	kore_websocket_send(c, op, data, len);
-	net_send_flush(c);
 }
 
 static int