commit f867882f4393e2b509445b6fa77e9644ca236f26
parent ac4222e92986e772632fed2565bd4e7f00394373
Author: Joris Vink <joris@coders.se>
Date: Mon, 24 Nov 2014 11:01:12 +0100
Add websocket support to Kore.
Introduces a few new api functions:
- kore_websocket_handshake(struct http_request *):
Performs the handshake on an HTTP request (coming from page handler)
- kore_websocket_send(struct connection *, u_int8_t, void *, size_t):
Sends data to a websocket connection.
- kore_websocket_broadcast(struct connection *, u_int8_t, void *, size_t, int):
Broadcast the given websocket op and data to all connected
websocket clients on the worker. Note that as of right now
the WEBSOCKET_BROADCAST_GLOBAL scope option does not work
yet and messages broadcasted will be restricted to workers
only.
- kore_worker_websocket_broadcast(struct connection *, void *, void *):
Backend function used by kore_websocket_broadcast().
Could prove useful for developers to have access to.
A simple example is given under examples/websocket.
Known issues:
Kore does not support PING or CONT frames just yet.
Diffstat:
15 files changed, 673 insertions(+), 1 deletion(-)
diff --git a/Makefile b/Makefile
@@ -8,7 +8,7 @@ INCLUDE_DIR=/usr/local/include/kore
S_SRC= src/kore.c src/accesslog.c src/auth.c src/buf.c src/cli.c src/config.c \
src/connection.c src/domain.c src/http.c src/mem.c src/module.c \
src/net.c src/pool.c src/spdy.c src/validator.c src/utils.c \
- src/worker.c src/zlib_dict.c
+ src/websocket.c src/worker.c src/zlib_dict.c
S_OBJS= $(S_SRC:.c=.o)
CFLAGS+=-Wall -Wstrict-prototypes -Wmissing-prototypes
diff --git a/conf/kore.conf.example b/conf/kore.conf.example
@@ -40,6 +40,13 @@ workers 4
#http_keepalive_time 0
#http_hsts_enable 31536000
+# Websocket specific settings.
+# websocket_maxframe Specifies the maximum frame size we can receive
+# websocket_timeout Specifies the time in seconds before a websocket
+# connection would be closed due to inactivity.
+#websocket_maxframe 16384
+#websocket_timeout 120
+
# Load modules (you can load multiple at the same time).
# An additional parameter can be specified as the "onload" function
# which Kore will call when the module is loaded/reloaded.
diff --git a/examples/websocket/.gitignore b/examples/websocket/.gitignore
@@ -0,0 +1,5 @@
+*.o
+.objs
+websocket.so
+assets.h
+cert
diff --git a/examples/websocket/README.md b/examples/websocket/README.md
@@ -0,0 +1,17 @@
+Kore example websocket server
+
+Run:
+```
+ # kore run
+```
+
+Test:
+```
+ Open a browser that does websockets, surf to https://127.0.0.1:8888
+ or whatever configured IP you have in the config.
+
+ Hit the connect button to open a websocket session, open a second
+ tab and surf to the same address and hit the connection button there
+ as well. This should cause the number of messages sent/recv to keep
+ incremementing as each message is broadcast to the other connection.
+```
diff --git a/examples/websocket/assets/frontend.html b/examples/websocket/assets/frontend.html
@@ -0,0 +1,60 @@
+<!DOCTYPE>
+<html>
+<head>
+<script>
+var socket = null;
+var sent = 0;
+var recv = 0;
+var length = 65536;
+
+function open(evt) {
+ var msg = "";
+ var alphabet = "abcdefghijklmnopqrstuvwxyz";
+
+ for (i = 0; i < length; i++)
+ msg += alphabet.charAt(Math.floor(Math.random() * alphabet.length));
+
+ message(msg);
+}
+
+function message(msg) {
+ socket.send(msg);
+ sent = sent + 1;
+ update();
+}
+
+function update() {
+ var cnt = document.getElementById("counter");
+
+ cnt.innerHTML = "Recv: " + recv + " Sent: " + sent;
+}
+
+function onmessage(evt) {
+ recv = recv + 1;
+ update();
+
+ message(evt.data);
+}
+
+function connect() {
+ socket = new WebSocket("wss://127.0.0.1:8888/connect");
+
+ socket.onopen = function(evt) { open(evt) };
+ socket.onclose = function(evt) { alert("closed"); };
+ socket.onmessage = function(evt) { onmessage(evt) };
+ socket.onerror = function(evt) { alert("onerror"); };
+}
+</script>
+</head>
+
+<body>
+
+<form action="/" onsubmit="connect(); return false;">
+<input type="submit" value="connect">
+</form>
+
+<div id="counter">
+</div>
+
+</body>
+</html>
diff --git a/examples/websocket/conf/websocket.conf b/examples/websocket/conf/websocket.conf
@@ -0,0 +1,15 @@
+# Kore websocket example
+
+bind 127.0.0.1 8888
+load ./websocket.so
+
+websocket_maxframe 65536
+websocket_timeout 20
+
+domain 127.0.0.1 {
+ certfile cert/server.crt
+ certkey cert/server.key
+
+ static / page
+ static /connect page_ws_connect
+}
diff --git a/examples/websocket/out b/examples/websocket/out
@@ -0,0 +1,37 @@
+diff --git a/websocket/src/websocket.c b/websocket/src/websocket.c
+index d5848fd..630a355 100755
+--- a/websocket/src/websocket.c
++++ b/websocket/src/websocket.c
+@@ -226,7 +226,7 @@ websocket_recv_frame(struct netbuf *nb)
+ break;
+ case WEBSOCKET_OP_TEXT:
+ case WEBSOCKET_OP_BINARY:
+- websocket_send(c, WEBSOCKET_OP_BINARY, &nb->buf[mask_off + 4], len);
++ websocket_send(c, op, &nb->buf[mask_off + 4], len);
+ break;
+ case WEBSOCKET_OP_TERMINATE:
+ kore_connection_disconnect(c);
+@@ -248,8 +248,11 @@ websocket_recv_frame(struct netbuf *nb)
+ void
+ websocket_send(struct connection *c, u_int8_t op, void *data, size_t len)
+ {
+- u_int8_t len_1;
++ size_t i;
++ u_int8_t *p;
++ u_int32_t mask;
+ struct kore_buf *frame;
++ u_int8_t len_1, byte;
+
+ kore_log(LOG_NOTICE, "%p: sending %ld bytes", c, len);
+
+@@ -259,8 +262,10 @@ websocket_send(struct connection *c, u_int8_t op, void *data, size_t len)
+ frame = kore_buf_create(132);
+ (void)memset(frame->data, '\0', frame->length);
+
++ mask = 0;
+ kore_buf_append(frame, &op, sizeof(op));
+ kore_buf_append(frame, &len_1, sizeof(len_1));
++ kore_buf_append(frame, &mask, sizeof(mask));
+ kore_buf_append(frame, data, len);
+
+ net_send_queue(c, frame->data, frame->offset, NULL, NETBUF_LAST_CHAIN);
diff --git a/examples/websocket/src/websocket.c b/examples/websocket/src/websocket.c
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2014 Joris Vink <joris@coders.se>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <kore/kore.h>
+#include <kore/http.h>
+
+#include "assets.h"
+
+int page(struct http_request *);
+int page_ws_connect(struct http_request *);
+
+void websocket_connect(struct connection *);
+void websocket_disconnect(struct connection *);
+void websocket_message(struct connection *,
+ u_int8_t, void *, size_t);
+
+/* Websocket callbacks. */
+struct kore_wscbs wscbs = {
+ websocket_connect,
+ websocket_message,
+ websocket_disconnect
+};
+
+/* Called whenever we get a new websocket connection. */
+void
+websocket_connect(struct connection *c)
+{
+ kore_log(LOG_NOTICE, "%p: connected", c);
+}
+
+void
+websocket_message(struct connection *c, u_int8_t op, void *data, size_t len)
+{
+ kore_websocket_broadcast(c, op, data, len, WEBSOCKET_BROADCAST_LOCAL);
+}
+
+void
+websocket_disconnect(struct connection *c)
+{
+ kore_log(LOG_NOTICE, "%p: disconnecting", c);
+}
+
+int
+page(struct http_request *req)
+{
+ http_response_header(req, "content-type", "text/html");
+ http_response(req, 200, asset_frontend_html, asset_len_frontend_html);
+
+ return (KORE_RESULT_OK);
+}
+
+int
+page_ws_connect(struct http_request *req)
+{
+ /* Perform the websocket handshake, passing our callbacks. */
+ kore_websocket_handshake(req, &wscbs);
+
+ return (KORE_RESULT_OK);
+}
diff --git a/includes/kore.h b/includes/kore.h
@@ -137,6 +137,7 @@ LIST_HEAD(listener_head, listener);
#define CONN_PROTO_UNKNOWN 0
#define CONN_PROTO_SPDY 1
#define CONN_PROTO_HTTP 2
+#define CONN_PROTO_WEBSOCKET 3
#define CONN_READ_POSSIBLE 0x01
#define CONN_WRITE_POSSIBLE 0x02
@@ -148,6 +149,16 @@ LIST_HEAD(listener_head, listener);
#define KORE_IDLE_TIMER_MAX 20000
+#define WEBSOCKET_OP_CONT 0x00
+#define WEBSOCKET_OP_TEXT 0x01
+#define WEBSOCKET_OP_BINARY 0x02
+#define WEBSOCKET_OP_CLOSE 0x08
+#define WEBSOCKET_OP_PING 0x09
+#define WEBSOCKET_OP_PONG 0x10
+
+#define WEBSOCKET_BROADCAST_LOCAL 1
+#define WEBSOCKET_BROADCAST_GLOBAL 2
+
struct connection {
u_int8_t type;
int fd;
@@ -158,6 +169,7 @@ struct connection {
u_int8_t flags;
void *hdlr_extra;
X509 *cert;
+ void *wscbs;
void (*disconnect)(struct connection *);
int (*read)(struct connection *, int *);
@@ -315,6 +327,13 @@ struct kore_pool {
LIST_HEAD(, kore_pool_entry) freelist;
};
+struct kore_wscbs {
+ void (*connect)(struct connection *);
+ void (*message)(struct connection *, u_int8_t,
+ void *, size_t);
+ void (*disconnect)(struct connection *);
+};
+
extern pid_t kore_pid;
extern int foreground;
extern int kore_debug;
@@ -333,6 +352,8 @@ extern u_int8_t worker_count;
extern u_int32_t worker_rlimit_nofiles;
extern u_int32_t worker_max_connections;
extern u_int32_t worker_active_connections;
+extern u_int64_t kore_websocket_maxframe;
+extern u_int64_t kore_websocket_timeout;
extern struct listener_head listeners;
extern struct kore_worker *worker;
@@ -354,6 +375,8 @@ void kore_worker_entry(struct kore_worker *);
void kore_worker_connection_add(struct connection *);
void kore_worker_connection_move(struct connection *);
void kore_worker_connection_remove(struct connection *);
+void kore_worker_websocket_broadcast(struct connection *,
+ void (*cb)(struct connection *, void *), void *);
void kore_platform_init(void);
void kore_platform_event_init(void);
@@ -427,6 +450,13 @@ int kore_base64_encode(u_int8_t *, u_int32_t, char **);
int kore_base64_decode(char *, u_int8_t **, u_int32_t *);
void *kore_mem_find(void *, size_t, void *, u_int32_t);
+void kore_websocket_handshake(struct http_request *,
+ struct kore_wscbs *);
+void kore_websocket_send(struct connection *,
+ u_int8_t, void *, size_t);
+void kore_websocket_broadcast(struct connection *,
+ u_int8_t, void *, size_t, int);
+
void kore_domain_init(void);
int kore_domain_new(char *);
void kore_module_init(void);
@@ -458,8 +488,11 @@ void kore_debug_internal(char *, int, const char *, ...);
u_int16_t net_read16(u_int8_t *);
u_int32_t net_read32(u_int8_t *);
+u_int64_t net_read64(u_int8_t *);
void net_write16(u_int8_t *, u_int16_t);
void net_write32(u_int8_t *, u_int32_t);
+void net_write64(u_int8_t *, u_int64_t);
+
void net_init(void);
int net_send(struct connection *);
int net_send_flush(struct connection *);
diff --git a/src/config.c b/src/config.c
@@ -61,6 +61,8 @@ static int configure_authentication_uri(char **);
static int configure_authentication_type(char **);
static int configure_authentication_value(char **);
static int configure_authentication_validator(char **);
+static int configure_websocket_maxframe(char **);
+static int configure_websocket_timeout(char **);
#if defined(KORE_USE_PGSQL)
static int configure_pgsql_conn_max(char **);
@@ -105,6 +107,8 @@ static struct {
{ "authentication_type", configure_authentication_type },
{ "authentication_value", configure_authentication_value },
{ "authentication_validator", configure_authentication_validator },
+ { "websocket_maxframe", configure_websocket_maxframe },
+ { "websocket_timeout", configure_websocket_timeout },
#if defined(KORE_USE_PGSQL)
{ "pgsql_conn_max", configure_pgsql_conn_max },
#endif
@@ -875,6 +879,46 @@ configure_authentication_uri(char **argv)
return (KORE_RESULT_OK);
}
+static int
+configure_websocket_maxframe(char **argv)
+{
+ int err;
+
+ if (argv[1] == NULL) {
+ printf("missing parameter for kore_websocket_maxframe\n");
+ return (KORE_RESULT_ERROR);
+ }
+
+ kore_websocket_maxframe = kore_strtonum64(argv[1], 1, &err);
+ if (err != KORE_RESULT_OK) {
+ printf("bad kore_websocket_maxframe value\n");
+ return (KORE_RESULT_ERROR);
+ }
+
+ return (KORE_RESULT_OK);
+}
+
+static int
+configure_websocket_timeout(char **argv)
+{
+ int err;
+
+ if (argv[1] == NULL) {
+ printf("missing parameter for kore_websocket_timeout\n");
+ return (KORE_RESULT_ERROR);
+ }
+
+ kore_websocket_timeout = kore_strtonum64(argv[1], 1, &err);
+ if (err != KORE_RESULT_OK) {
+ printf("bad kore_websocket_timeout value\n");
+ return (KORE_RESULT_ERROR);
+ }
+
+ kore_websocket_timeout = kore_websocket_timeout * 1000;
+
+ return (KORE_RESULT_OK);
+}
+
static void
domain_sslstart(void)
{
diff --git a/src/connection.c b/src/connection.c
@@ -45,6 +45,7 @@ kore_connection_new(void *owner)
c->rnb = NULL;
c->snb = NULL;
c->cert = NULL;
+ c->wscbs = NULL;
c->owner = owner;
c->disconnect = NULL;
c->hdlr_extra = NULL;
diff --git a/src/http.c b/src/http.c
@@ -449,6 +449,9 @@ http_response(struct http_request *req, int status, void *d, u_int32_t l)
case CONN_PROTO_HTTP:
http_response_normal(req, req->owner, status, d, l);
break;
+ default:
+ fatal("http_response() bad proto %d", req->owner->proto);
+ /* NOTREACHED. */
}
}
@@ -468,6 +471,9 @@ http_response_stream(struct http_request *req, int status, void *base,
case CONN_PROTO_HTTP:
http_response_normal(req, req->owner, status, NULL, len);
break;
+ default:
+ fatal("http_response_stream() bad proto %d", req->owner->proto);
+ /* NOTREACHED. */
}
if (req->method != HTTP_METHOD_HEAD) {
@@ -1138,6 +1144,9 @@ http_error_response(struct connection *c, struct spdy_stream *s, int status)
kore_log(LOG_NOTICE, "http_error_response: s != NULL");
http_response_normal(NULL, c, status, NULL, 0);
break;
+ default:
+ fatal("http_error_response() bad proto %d", c->proto);
+ /* NOTREACHED. */
}
}
diff --git a/src/net.c b/src/net.c
@@ -16,6 +16,16 @@
#include <sys/param.h>
+#if defined(__linux__)
+#include <endian.h>
+#elif defined(__MACH__)
+#include <libkern/OSByteOrder.h>
+#define htobe64(x) OSSwapHostToBigInt64(x)
+#define be64toh(x) OSSwapBigToHostInt64(x)
+#else
+#include <sys/endian.h>
+#endif
+
#include "kore.h"
struct kore_pool nb_pool;
@@ -419,3 +429,21 @@ net_write32(u_int8_t *p, u_int32_t n)
r = htonl(n);
memcpy(p, &r, sizeof(r));
}
+
+u_int64_t
+net_read64(u_int8_t *b)
+{
+ u_int64_t r;
+
+ r = *(u_int64_t *)b;
+ return (be64toh(r));
+}
+
+void
+net_write64(u_int8_t *p, u_int64_t n)
+{
+ u_int64_t r;
+
+ r = htobe64(n);
+ memcpy(p, &r, sizeof(r));
+}
diff --git a/src/websocket.c b/src/websocket.c
@@ -0,0 +1,332 @@
+/*
+ * Copyright (c) 2014 Joris Vink <joris@coders.se>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/param.h>
+
+#include <limits.h>
+
+#include "kore.h"
+#include "http.h"
+
+#define WEBSOCKET_FRAME_HDR 2
+#define WEBSOCKET_MASK_LEN 4
+#define WEBSOCKET_FRAME_MAXLEN 16384
+#define WEBSOCKET_PAYLOAD_SINGLE 125
+#define WEBSOCKET_PAYLOAD_EXTEND_1 126
+#define WEBSOCKET_PAYLOAD_EXTEND_2 127
+#define WEBSOCKET_OPCODE_MASK 0x0f
+#define WEBSOCKET_FRAME_LENGTH(x) ((x) & ~(1 << 7))
+#define WEBSOCKET_HAS_MASK(x) ((x) & (1 << 7))
+#define WEBSOCKET_HAS_FINFLAG(x) ((x) & (1 << 7))
+#define WEBSOCKET_RSV(x, i) ((x) & (1 << (7 - i)))
+
+#define WEBSOCKET_SERVER_RESPONSE "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
+
+struct websocket_data {
+ u_int8_t op;
+ void *data;
+ size_t len;
+};
+
+u_int64_t kore_websocket_timeout = 120000;
+u_int64_t kore_websocket_maxframe = 16384;
+
+static int websocket_recv_frame(struct netbuf *);
+static int websocket_recv_opcode(struct netbuf *);
+static void websocket_disconnect(struct connection *);
+static void websocket_send_single(struct connection *, void *);
+void websocket_send(struct connection *, u_int8_t, void *, size_t);
+
+void
+kore_websocket_handshake(struct http_request *req, struct kore_wscbs *wscbs)
+{
+ SHA_CTX sctx;
+ struct kore_buf *buf;
+ char *key, *base64, *version;
+ u_int8_t digest[SHA_DIGEST_LENGTH];
+
+ if (!http_request_header(req, "sec-websocket-key", &key)) {
+ http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
+ return;
+ }
+
+ if (!http_request_header(req, "sec-websocket-version", &version)) {
+ http_response_header(req, "sec-websocket-version", "13");
+ http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
+ return;
+ }
+
+ if (strcmp(version, "13")) {
+ kore_mem_free(version);
+ http_response_header(req, "sec-websocket-version", "13");
+ http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
+ return;
+ }
+
+ kore_mem_free(version);
+
+ buf = kore_buf_create(128);
+ kore_buf_appendf(buf, "%s%s", key, WEBSOCKET_SERVER_RESPONSE);
+ kore_mem_free(key);
+
+ (void)SHA1_Init(&sctx);
+ (void)SHA1_Update(&sctx, buf->data, buf->offset);
+ (void)SHA1_Final(digest, &sctx);
+
+ kore_buf_free(buf);
+
+ if (!kore_base64_encode(digest, sizeof(digest), &base64)) {
+ kore_debug("failed to base64 encode digest");
+ http_response(req, HTTP_STATUS_INTERNAL_ERROR, NULL, 0);
+ return;
+ }
+
+ http_response_header(req, "upgrade", "websocket");
+ http_response_header(req, "connection", "upgrade");
+ http_response_header(req, "sec-websocket-accept", base64);
+ kore_mem_free(base64);
+
+ kore_debug("%p: new websocket connection", req->owner);
+
+ http_response(req, HTTP_STATUS_SWITCHING_PROTOCOLS, NULL, 0);
+ net_recv_reset(req->owner, WEBSOCKET_FRAME_HDR, websocket_recv_opcode);
+
+ req->owner->disconnect = websocket_disconnect;
+ req->owner->rnb->flags &= ~NETBUF_CALL_CB_ALWAYS;
+ req->owner->proto = CONN_PROTO_WEBSOCKET;
+
+ req->owner->wscbs = wscbs;
+ req->owner->idle_timer.start = 0;
+ req->owner->idle_timer.length = kore_websocket_timeout;
+
+ if (wscbs->connect != NULL)
+ wscbs->connect(req->owner);
+}
+
+void
+kore_websocket_send(struct connection *c, u_int8_t op, void *data, size_t len)
+{
+ u_int8_t len_1;
+ u_int16_t len16;
+ u_int64_t len64;
+ struct kore_buf *frame;
+
+ if (c->proto != CONN_PROTO_WEBSOCKET)
+ fatal("kore_websocket_send(): to non websocket connection");
+
+ kore_debug("%p: sending %ld bytes", c, len);
+
+ if (len > WEBSOCKET_PAYLOAD_SINGLE) {
+ if (len < USHRT_MAX)
+ len_1 = WEBSOCKET_PAYLOAD_EXTEND_1;
+ else
+ len_1 = WEBSOCKET_PAYLOAD_EXTEND_2;
+ } else {
+ len_1 = len;
+ }
+
+ frame = kore_buf_create(len);
+
+ op |= (1 << 7);
+ kore_buf_append(frame, &op, sizeof(op));
+
+ len_1 &= ~(1 << 7);
+ kore_buf_append(frame, &len_1, sizeof(len_1));
+
+ if (len_1 != len) {
+ switch (len_1) {
+ case WEBSOCKET_PAYLOAD_EXTEND_1:
+ net_write16((u_int8_t *)&len16, len);
+ kore_buf_append(frame, &len16, sizeof(len16));
+ break;
+ case WEBSOCKET_PAYLOAD_EXTEND_2:
+ net_write64((u_int8_t *)&len64, len);
+ kore_buf_append(frame, &len64, sizeof(len64));
+ break;
+ }
+ }
+
+ kore_buf_append(frame, data, len);
+ net_send_queue(c, frame->data, frame->offset, NULL, NETBUF_LAST_CHAIN);
+ kore_buf_free(frame);
+}
+
+void
+kore_websocket_broadcast(struct connection *c, u_int8_t op, void *data,
+ size_t len, int scope)
+{
+ struct websocket_data arg;
+
+ arg.op = op;
+ arg.len = len;
+ arg.data = data;
+ kore_worker_websocket_broadcast(c, websocket_send_single, &arg);
+
+ if (scope == WEBSOCKET_BROADCAST_GLOBAL)
+ fatal("kore_websocket_broadcast: no global scope yet");
+}
+
+static void
+websocket_send_single(struct connection *c, void *args)
+{
+ struct websocket_data *arg = args;
+
+ kore_websocket_send(c, arg->op, arg->data, arg->len);
+ net_send_flush(c);
+}
+
+static int
+websocket_recv_opcode(struct netbuf *nb)
+{
+ u_int8_t op, len;
+ struct connection *c = nb->owner;
+
+ if (!WEBSOCKET_HAS_MASK(nb->buf[1])) {
+ kore_debug("%p: frame did not have a mask set", c);
+ return (KORE_RESULT_ERROR);
+ }
+
+ if (WEBSOCKET_RSV(nb->buf[0], 1) || WEBSOCKET_RSV(nb->buf[0], 2) ||
+ WEBSOCKET_RSV(nb->buf[0], 2)) {
+ kore_debug("%p: RSV bits are not zero", c);
+ return (KORE_RESULT_ERROR);
+ }
+
+ len = WEBSOCKET_FRAME_LENGTH(nb->buf[1]);
+
+ op = nb->buf[0] & WEBSOCKET_OPCODE_MASK;
+ switch (op) {
+ case WEBSOCKET_OP_CONT:
+ case WEBSOCKET_OP_TEXT:
+ case WEBSOCKET_OP_BINARY:
+ break;
+ case WEBSOCKET_OP_CLOSE:
+ case WEBSOCKET_OP_PING:
+ case WEBSOCKET_OP_PONG:
+ if (len > WEBSOCKET_PAYLOAD_SINGLE ||
+ !WEBSOCKET_HAS_FINFLAG(nb->buf[0])) {
+ kore_debug("%p: large or fragmented control frame", c);
+ return (KORE_RESULT_ERROR);
+ }
+ break;
+ default:
+ kore_debug("%p: bad websocket op %d", c, op);
+ return (KORE_RESULT_ERROR);
+ }
+
+ switch (len) {
+ case WEBSOCKET_PAYLOAD_EXTEND_1:
+ len += sizeof(u_int16_t);
+ break;
+ case WEBSOCKET_PAYLOAD_EXTEND_2:
+ len += sizeof(u_int64_t);
+ break;
+ }
+
+ len += WEBSOCKET_MASK_LEN;
+ net_recv_expand(c, len, NULL, websocket_recv_frame);
+
+ return (KORE_RESULT_OK);
+}
+
+static int
+websocket_recv_frame(struct netbuf *nb)
+{
+ struct connection *c;
+ int ret;
+ struct kore_wscbs *wscbs;
+ u_int64_t len, i, total;
+ u_int8_t op, moff, extra;
+
+ c = nb->owner;
+ wscbs = c->wscbs;
+
+ op = nb->buf[0] & WEBSOCKET_OPCODE_MASK;
+ len = WEBSOCKET_FRAME_LENGTH(nb->buf[1]);
+
+ switch (len) {
+ case WEBSOCKET_PAYLOAD_EXTEND_1:
+ moff = 4;
+ extra = sizeof(u_int16_t);
+ len = net_read16(&nb->buf[2]);
+ break;
+ case WEBSOCKET_PAYLOAD_EXTEND_2:
+ moff = 10;
+ extra = sizeof(u_int64_t);
+ len = net_read64(&nb->buf[2]);
+ break;
+ default:
+ extra = 0;
+ moff = 2;
+ break;
+ }
+
+ if (len > kore_websocket_maxframe) {
+ kore_debug("%p: frame too big", c);
+ return (KORE_RESULT_ERROR);
+ }
+
+ extra += WEBSOCKET_FRAME_HDR;
+ total = len + extra + WEBSOCKET_MASK_LEN;
+ if (total > nb->b_len) {
+ total -= nb->b_len;
+ net_recv_expand(c, total, NULL, websocket_recv_frame);
+ return (KORE_RESULT_OK);
+ }
+
+ if (total != nb->b_len)
+ return (KORE_RESULT_ERROR);
+
+ for (i = 0; i < len; i++)
+ nb->buf[moff + 4 + i] ^= nb->buf[moff + (i % 4)];
+
+ ret = KORE_RESULT_OK;
+ switch (op) {
+ case WEBSOCKET_OP_CONT:
+ case WEBSOCKET_OP_PONG:
+ ret = KORE_RESULT_ERROR;
+ kore_log(LOG_ERR, "%p: we do not support op 0x%02x yet", c, op);
+ break;
+ case WEBSOCKET_OP_TEXT:
+ case WEBSOCKET_OP_BINARY:
+ if (wscbs->message != NULL)
+ wscbs->message(c, op, &nb->buf[moff + 4], len);
+ break;
+ case WEBSOCKET_OP_CLOSE:
+ kore_connection_disconnect(c);
+ break;
+ case WEBSOCKET_OP_PING:
+ kore_websocket_send(c, WEBSOCKET_OP_PONG,
+ &nb->buf[moff + 4], len);
+ break;
+ default:
+ ret = KORE_RESULT_ERROR;
+ kore_debug("%p: bad websocket op %d", c, op);
+ return (KORE_RESULT_ERROR);
+ }
+
+ net_recv_reset(c, WEBSOCKET_FRAME_HDR, websocket_recv_opcode);
+ return (ret);
+}
+
+static void
+websocket_disconnect(struct connection *c)
+{
+ struct kore_wscbs *wscbs = c->wscbs;
+
+ if (wscbs->disconnect != NULL)
+ wscbs->disconnect(c);
+}
diff --git a/src/worker.c b/src/worker.c
@@ -358,6 +358,18 @@ kore_worker_connection_remove(struct connection *c)
}
void
+kore_worker_websocket_broadcast(struct connection *src,
+ void (*cb)(struct connection *, void *), void *args)
+{
+ struct connection *c;
+
+ TAILQ_FOREACH(c, &worker_clients, list) {
+ if (c != src && c->proto == CONN_PROTO_WEBSOCKET)
+ cb(c, args);
+ }
+}
+
+void
kore_worker_wait(int final)
{
u_int16_t id;