kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

websocket.c (9361B)



      1 /*
      2  * Copyright (c) 2014-2022 Joris Vink <joris@coders.se>
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 #include <sys/param.h>
     18 #include <sys/types.h>
     19 
     20 #include <limits.h>
     21 #include <string.h>
     22 
     23 #include "kore.h"
     24 #include "http.h"
     25 #include "sha1.h"
     26 
     27 #define WEBSOCKET_FRAME_HDR		2
     28 #define WEBSOCKET_MASK_LEN		4
     29 #define WEBSOCKET_PAYLOAD_SINGLE	125
     30 #define WEBSOCKET_PAYLOAD_EXTEND_1	126
     31 #define WEBSOCKET_PAYLOAD_EXTEND_2	127
     32 #define WEBSOCKET_OPCODE_MASK		0x0f
     33 #define WEBSOCKET_FRAME_LENGTH(x)	((x) & ~(1 << 7))
     34 #define WEBSOCKET_HAS_MASK(x)		((x) & (1 << 7))
     35 #define WEBSOCKET_HAS_FINFLAG(x)	((x) & (1 << 7))
     36 #define WEBSOCKET_RSV(x, i)		((x) & (1 << (7 - i)))
     37 
     38 #define WEBSOCKET_SERVER_RESPONSE	"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
     39 
     40 
     41 u_int64_t	kore_websocket_timeout = 120000;
     42 u_int64_t	kore_websocket_maxframe = 16384;
     43 
     44 static int	websocket_recv_frame(struct netbuf *);
     45 static int	websocket_recv_opcode(struct netbuf *);
     46 static void	websocket_disconnect(struct connection *);
     47 static void	websocket_frame_build(struct kore_buf *, u_int8_t,
     48 		    const void *, size_t);
     49 
     50 void
     51 kore_websocket_handshake(struct http_request *req, const char *onconnect,
     52     const char *onmessage, const char *ondisconnect)
     53 {
     54 	SHA1_CTX		sctx;
     55 	struct kore_buf		*buf;
     56 	char			*base64;
     57 	const char		*key, *version;
     58 	u_int8_t		digest[SHA1_DIGEST_LENGTH];
     59 
     60 	if (!http_request_header(req, "sec-websocket-key", &key)) {
     61 		http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
     62 		return;
     63 	}
     64 
     65 	if (!http_request_header(req, "sec-websocket-version", &version)) {
     66 		http_response_header(req, "sec-websocket-version", "13");
     67 		http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
     68 		return;
     69 	}
     70 
     71 	if (strcmp(version, "13")) {
     72 		http_response_header(req, "sec-websocket-version", "13");
     73 		http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
     74 		return;
     75 	}
     76 
     77 	buf = kore_buf_alloc(128);
     78 	kore_buf_appendf(buf, "%s%s", key, WEBSOCKET_SERVER_RESPONSE);
     79 
     80 	SHA1Init(&sctx);
     81 	SHA1Update(&sctx, buf->data, buf->offset);
     82 	SHA1Final(digest, &sctx);
     83 
     84 	kore_buf_free(buf);
     85 
     86 	if (!kore_base64_encode(digest, sizeof(digest), &base64)) {
     87 		http_response(req, HTTP_STATUS_INTERNAL_ERROR, NULL, 0);
     88 		return;
     89 	}
     90 
     91 	http_response_header(req, "upgrade", "websocket");
     92 	http_response_header(req, "connection", "upgrade");
     93 	http_response_header(req, "sec-websocket-accept", base64);
     94 	kore_free(base64);
     95 
     96 	req->owner->proto = CONN_PROTO_WEBSOCKET;
     97 	http_response(req, HTTP_STATUS_SWITCHING_PROTOCOLS, NULL, 0);
     98 	net_recv_reset(req->owner, WEBSOCKET_FRAME_HDR, websocket_recv_opcode);
     99 
    100 	req->owner->disconnect = websocket_disconnect;
    101 	req->owner->rnb->flags &= ~NETBUF_CALL_CB_ALWAYS;
    102 
    103 	req->owner->http_timeout = 0;
    104 	req->owner->idle_timer.start = kore_time_ms();
    105 	req->owner->idle_timer.length = kore_websocket_timeout;
    106 
    107 	if (onconnect != NULL) {
    108 		req->owner->ws_connect = kore_runtime_getcall(onconnect);
    109 		if (req->owner->ws_connect == NULL)
    110 			fatal("no symbol '%s' for ws_connect", onconnect);
    111 	} else {
    112 		req->owner->ws_connect = NULL;
    113 	}
    114 
    115 	if (onmessage != NULL) {
    116 		req->owner->ws_message = kore_runtime_getcall(onmessage);
    117 		if (req->owner->ws_message == NULL)
    118 			fatal("no symbol '%s' for ws_message", onmessage);
    119 	} else {
    120 		req->owner->ws_message = NULL;
    121 	}
    122 
    123 	if (ondisconnect != NULL) {
    124 		req->owner->ws_disconnect = kore_runtime_getcall(ondisconnect);
    125 		if (req->owner->ws_disconnect == NULL)
    126 			fatal("no symbol '%s' for ws_disconnect", ondisconnect);
    127 	} else {
    128 		req->owner->ws_disconnect = NULL;
    129 	}
    130 
    131 	if (req->owner->ws_connect != NULL)
    132 		kore_runtime_wsconnect(req->owner->ws_connect, req->owner);
    133 }
    134 
    135 int
    136 kore_websocket_send_clean(struct netbuf *nb)
    137 {
    138 	kore_free(nb->buf);
    139 	return (0);
    140 }
    141 
    142 void
    143 kore_websocket_send(struct connection *c, u_int8_t op, const void *data,
    144     size_t len)
    145 {
    146 	struct kore_buf		frame;
    147 
    148 	kore_buf_init(&frame, len);
    149 	websocket_frame_build(&frame, op, data, len);
    150 	net_send_stream(c, frame.data, frame.offset,
    151 	    kore_websocket_send_clean, NULL);
    152 
    153 	/* net_send_stream() takes over the buffer data pointer. */
    154 	frame.data = NULL;
    155 	kore_buf_cleanup(&frame);
    156 
    157 	net_send_flush(c);
    158 }
    159 
    160 void
    161 kore_websocket_broadcast(struct connection *src, u_int8_t op, const void *data,
    162     size_t len, int scope)
    163 {
    164 	struct connection	*c;
    165 	struct kore_buf		*frame;
    166 
    167 	frame = kore_buf_alloc(len);
    168 	websocket_frame_build(frame, op, data, len);
    169 
    170 	TAILQ_FOREACH(c, &connections, list) {
    171 		if (c != src && c->proto == CONN_PROTO_WEBSOCKET) {
    172 			net_send_queue(c, frame->data, frame->offset);
    173 			net_send_flush(c);
    174 		}
    175 	}
    176 
    177 	if (scope == WEBSOCKET_BROADCAST_GLOBAL) {
    178 		kore_msg_send(KORE_MSG_WORKER_ALL,
    179 		    KORE_MSG_WEBSOCKET, frame->data, frame->offset);
    180 	}
    181 
    182 	kore_buf_free(frame);
    183 }
    184 
    185 static void
    186 websocket_frame_build(struct kore_buf *frame, u_int8_t op, const void *data,
    187     size_t len)
    188 {
    189 	u_int8_t		len_1;
    190 	u_int16_t		len16;
    191 	u_int64_t		len64;
    192 
    193 	if (len > WEBSOCKET_PAYLOAD_SINGLE) {
    194 		if (len <= USHRT_MAX)
    195 			len_1 = WEBSOCKET_PAYLOAD_EXTEND_1;
    196 		else
    197 			len_1 = WEBSOCKET_PAYLOAD_EXTEND_2;
    198 	} else {
    199 		len_1 = len;
    200 	}
    201 
    202 	op |= (1 << 7);
    203 	kore_buf_append(frame, &op, sizeof(op));
    204 
    205 	len_1 &= ~(1 << 7);
    206 	kore_buf_append(frame, &len_1, sizeof(len_1));
    207 
    208 	if (len_1 > WEBSOCKET_PAYLOAD_SINGLE) {
    209 		switch (len_1) {
    210 		case WEBSOCKET_PAYLOAD_EXTEND_1:
    211 			net_write16((u_int8_t *)&len16, len);
    212 			kore_buf_append(frame, &len16, sizeof(len16));
    213 			break;
    214 		case WEBSOCKET_PAYLOAD_EXTEND_2:
    215 			net_write64((u_int8_t *)&len64, len);
    216 			kore_buf_append(frame, &len64, sizeof(len64));
    217 			break;
    218 		}
    219 	}
    220 
    221 	if (data != NULL && len > 0)
    222 		kore_buf_append(frame, data, len);
    223 }
    224 
    225 static int
    226 websocket_recv_opcode(struct netbuf *nb)
    227 {
    228 	u_int8_t		op, len;
    229 	struct connection	*c = nb->owner;
    230 
    231 	if (!WEBSOCKET_HAS_MASK(nb->buf[1]))
    232 		return (KORE_RESULT_ERROR);
    233 
    234 	if (WEBSOCKET_RSV(nb->buf[0], 1) || WEBSOCKET_RSV(nb->buf[0], 2) ||
    235 	    WEBSOCKET_RSV(nb->buf[0], 3))
    236 		return (KORE_RESULT_ERROR);
    237 
    238 	len = WEBSOCKET_FRAME_LENGTH(nb->buf[1]);
    239 
    240 	op = nb->buf[0] & WEBSOCKET_OPCODE_MASK;
    241 	switch (op) {
    242 	case WEBSOCKET_OP_CONT:
    243 	case WEBSOCKET_OP_TEXT:
    244 	case WEBSOCKET_OP_BINARY:
    245 		break;
    246 	case WEBSOCKET_OP_CLOSE:
    247 	case WEBSOCKET_OP_PING:
    248 	case WEBSOCKET_OP_PONG:
    249 		if (len > WEBSOCKET_PAYLOAD_SINGLE ||
    250 		    !WEBSOCKET_HAS_FINFLAG(nb->buf[0]))
    251 			return (KORE_RESULT_ERROR);
    252 		break;
    253 	default:
    254 		return (KORE_RESULT_ERROR);
    255 	}
    256 
    257 	switch (len) {
    258 	case WEBSOCKET_PAYLOAD_EXTEND_1:
    259 		len += sizeof(u_int16_t);
    260 		break;
    261 	case WEBSOCKET_PAYLOAD_EXTEND_2:
    262 		len += sizeof(u_int64_t);
    263 		break;
    264 	}
    265 
    266 	len += WEBSOCKET_MASK_LEN;
    267 	net_recv_expand(c, len, websocket_recv_frame);
    268 
    269 	return (KORE_RESULT_OK);
    270 }
    271 
    272 static int
    273 websocket_recv_frame(struct netbuf *nb)
    274 {
    275 	struct connection	*c;
    276 	int			ret;
    277 	u_int64_t		len, i, total;
    278 	u_int8_t		op, moff, extra;
    279 
    280 	c = nb->owner;
    281 	op = nb->buf[0] & WEBSOCKET_OPCODE_MASK;
    282 	len = WEBSOCKET_FRAME_LENGTH(nb->buf[1]);
    283 
    284 	switch (len) {
    285 	case WEBSOCKET_PAYLOAD_EXTEND_1:
    286 		moff = 4;
    287 		extra = sizeof(u_int16_t);
    288 		len = net_read16(&nb->buf[2]);
    289 		break;
    290 	case WEBSOCKET_PAYLOAD_EXTEND_2:
    291 		moff = 10;
    292 		extra = sizeof(u_int64_t);
    293 		len = net_read64(&nb->buf[2]);
    294 		break;
    295 	default:
    296 		extra = 0;
    297 		moff = 2;
    298 		break;
    299 	}
    300 
    301 	if (len > kore_websocket_maxframe)
    302 		return (KORE_RESULT_ERROR);
    303 
    304 	extra += WEBSOCKET_FRAME_HDR;
    305 	total = len + extra + WEBSOCKET_MASK_LEN;
    306 	if (total > nb->b_len) {
    307 		total -= nb->b_len;
    308 		net_recv_expand(c, total, websocket_recv_frame);
    309 		return (KORE_RESULT_OK);
    310 	}
    311 
    312 	if (total != nb->b_len)
    313 		return (KORE_RESULT_ERROR);
    314 
    315 	for (i = 0; i < len; i++)
    316 		nb->buf[moff + 4 + i] ^= nb->buf[moff + (i % 4)];
    317 
    318 	ret = KORE_RESULT_OK;
    319 	switch (op) {
    320 	case WEBSOCKET_OP_PONG:
    321 		break;
    322 	case WEBSOCKET_OP_CONT:
    323 		ret = KORE_RESULT_ERROR;
    324 		kore_log(LOG_ERR,
    325 		    "%p: we do not support op 0x%02x yet", (void *)c, op);
    326 		break;
    327 	case WEBSOCKET_OP_TEXT:
    328 	case WEBSOCKET_OP_BINARY:
    329 		if (c->ws_message != NULL) {
    330 			kore_runtime_wsmessage(c->ws_message,
    331 			    c, op, &nb->buf[moff + 4], len);
    332 		}
    333 		break;
    334 	case WEBSOCKET_OP_CLOSE:
    335 		c->evt.flags &= ~KORE_EVENT_READ;
    336 		if (!(c->flags & CONN_WS_CLOSE_SENT)) {
    337 			c->flags |= CONN_WS_CLOSE_SENT;
    338 			kore_websocket_send(c, WEBSOCKET_OP_CLOSE, NULL, 0);
    339 		}
    340 		kore_connection_disconnect(c);
    341 		break;
    342 	case WEBSOCKET_OP_PING:
    343 		kore_websocket_send(c, WEBSOCKET_OP_PONG,
    344 		    &nb->buf[moff + 4], len);
    345 		break;
    346 	default:
    347 		return (KORE_RESULT_ERROR);
    348 	}
    349 
    350 	net_recv_reset(c, WEBSOCKET_FRAME_HDR, websocket_recv_opcode);
    351 
    352 	return (ret);
    353 }
    354 
    355 static void
    356 websocket_disconnect(struct connection *c)
    357 {
    358 	if (c->ws_disconnect != NULL)
    359 		kore_runtime_wsdisconnect(c->ws_disconnect, c);
    360 
    361 	if (!(c->flags & CONN_WS_CLOSE_SENT)) {
    362 		c->flags |= CONN_WS_CLOSE_SENT;
    363 		c->evt.flags &= ~KORE_EVENT_READ;
    364 		kore_websocket_send(c, WEBSOCKET_OP_CLOSE, NULL, 0);
    365 	}
    366 }