kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

websocket.c (9360B)



      1 /*
      2  * Copyright (c) 2014-2022 Joris Vink <joris@coders.se>
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 #include <sys/param.h>
     18 #include <sys/types.h>
     19 
     20 #include <limits.h>
     21 #include <string.h>
     22 
     23 #include "kore.h"
     24 #include "http.h"
     25 #include "sha1.h"
     26 
     27 #define WEBSOCKET_FRAME_HDR		2
     28 #define WEBSOCKET_MASK_LEN		4
     29 #define WEBSOCKET_PAYLOAD_SINGLE	125
     30 #define WEBSOCKET_PAYLOAD_EXTEND_1	126
     31 #define WEBSOCKET_PAYLOAD_EXTEND_2	127
     32 #define WEBSOCKET_OPCODE_MASK		0x0f
     33 #define WEBSOCKET_FRAME_LENGTH(x)	((x) & ~(1 << 7))
     34 #define WEBSOCKET_HAS_MASK(x)		((x) & (1 << 7))
     35 #define WEBSOCKET_HAS_FINFLAG(x)	((x) & (1 << 7))
     36 #define WEBSOCKET_RSV(x, i)		((x) & (1 << (7 - i)))
     37 
     38 #define WEBSOCKET_SERVER_RESPONSE	"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
     39 
     40 u_int64_t	kore_websocket_timeout = 120000;
     41 u_int64_t	kore_websocket_maxframe = 16384;
     42 
     43 static int	websocket_recv_frame(struct netbuf *);
     44 static int	websocket_recv_opcode(struct netbuf *);
     45 static void	websocket_disconnect(struct connection *);
     46 static void	websocket_frame_build(struct kore_buf *, u_int8_t,
     47 		    const void *, size_t);
     48 
     49 void
     50 kore_websocket_handshake(struct http_request *req, const char *onconnect,
     51     const char *onmessage, const char *ondisconnect)
     52 {
     53 	SHA1_CTX		sctx;
     54 	struct kore_buf		*buf;
     55 	char			*base64;
     56 	const char		*key, *version;
     57 	u_int8_t		digest[SHA1_DIGEST_LENGTH];
     58 
     59 	if (!http_request_header(req, "sec-websocket-key", &key)) {
     60 		http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
     61 		return;
     62 	}
     63 
     64 	if (!http_request_header(req, "sec-websocket-version", &version)) {
     65 		http_response_header(req, "sec-websocket-version", "13");
     66 		http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
     67 		return;
     68 	}
     69 
     70 	if (strcmp(version, "13")) {
     71 		http_response_header(req, "sec-websocket-version", "13");
     72 		http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
     73 		return;
     74 	}
     75 
     76 	buf = kore_buf_alloc(128);
     77 	kore_buf_appendf(buf, "%s%s", key, WEBSOCKET_SERVER_RESPONSE);
     78 
     79 	SHA1Init(&sctx);
     80 	SHA1Update(&sctx, buf->data, buf->offset);
     81 	SHA1Final(digest, &sctx);
     82 
     83 	kore_buf_free(buf);
     84 
     85 	if (!kore_base64_encode(digest, sizeof(digest), &base64)) {
     86 		http_response(req, HTTP_STATUS_INTERNAL_ERROR, NULL, 0);
     87 		return;
     88 	}
     89 
     90 	http_response_header(req, "upgrade", "websocket");
     91 	http_response_header(req, "connection", "upgrade");
     92 	http_response_header(req, "sec-websocket-accept", base64);
     93 	kore_free(base64);
     94 
     95 	req->owner->proto = CONN_PROTO_WEBSOCKET;
     96 	http_response(req, HTTP_STATUS_SWITCHING_PROTOCOLS, NULL, 0);
     97 	net_recv_reset(req->owner, WEBSOCKET_FRAME_HDR, websocket_recv_opcode);
     98 
     99 	req->owner->disconnect = websocket_disconnect;
    100 	req->owner->rnb->flags &= ~NETBUF_CALL_CB_ALWAYS;
    101 
    102 	req->owner->http_timeout = 0;
    103 	req->owner->idle_timer.start = kore_time_ms();
    104 	req->owner->idle_timer.length = kore_websocket_timeout;
    105 
    106 	if (onconnect != NULL) {
    107 		req->owner->ws_connect = kore_runtime_getcall(onconnect);
    108 		if (req->owner->ws_connect == NULL)
    109 			fatal("no symbol '%s' for ws_connect", onconnect);
    110 	} else {
    111 		req->owner->ws_connect = NULL;
    112 	}
    113 
    114 	if (onmessage != NULL) {
    115 		req->owner->ws_message = kore_runtime_getcall(onmessage);
    116 		if (req->owner->ws_message == NULL)
    117 			fatal("no symbol '%s' for ws_message", onmessage);
    118 	} else {
    119 		req->owner->ws_message = NULL;
    120 	}
    121 
    122 	if (ondisconnect != NULL) {
    123 		req->owner->ws_disconnect = kore_runtime_getcall(ondisconnect);
    124 		if (req->owner->ws_disconnect == NULL)
    125 			fatal("no symbol '%s' for ws_disconnect", ondisconnect);
    126 	} else {
    127 		req->owner->ws_disconnect = NULL;
    128 	}
    129 
    130 	if (req->owner->ws_connect != NULL)
    131 		kore_runtime_wsconnect(req->owner->ws_connect, req->owner);
    132 }
    133 
    134 int
    135 kore_websocket_send_clean(struct netbuf *nb)
    136 {
    137 	kore_free(nb->buf);
    138 	return (0);
    139 }
    140 
    141 void
    142 kore_websocket_send(struct connection *c, u_int8_t op, const void *data,
    143     size_t len)
    144 {
    145 	struct kore_buf		frame;
    146 
    147 	kore_buf_init(&frame, len);
    148 	websocket_frame_build(&frame, op, data, len);
    149 	net_send_stream(c, frame.data, frame.offset,
    150 	    kore_websocket_send_clean, NULL);
    151 
    152 	/* net_send_stream() takes over the buffer data pointer. */
    153 	frame.data = NULL;
    154 	kore_buf_cleanup(&frame);
    155 
    156 	net_send_flush(c);
    157 }
    158 
    159 void
    160 kore_websocket_broadcast(struct connection *src, u_int8_t op, const void *data,
    161     size_t len, int scope)
    162 {
    163 	struct connection	*c;
    164 	struct kore_buf		*frame;
    165 
    166 	frame = kore_buf_alloc(len);
    167 	websocket_frame_build(frame, op, data, len);
    168 
    169 	TAILQ_FOREACH(c, &connections, list) {
    170 		if (c != src && c->proto == CONN_PROTO_WEBSOCKET) {
    171 			net_send_queue(c, frame->data, frame->offset);
    172 			net_send_flush(c);
    173 		}
    174 	}
    175 
    176 	if (scope == WEBSOCKET_BROADCAST_GLOBAL) {
    177 		kore_msg_send(KORE_MSG_WORKER_ALL,
    178 		    KORE_MSG_WEBSOCKET, frame->data, frame->offset);
    179 	}
    180 
    181 	kore_buf_free(frame);
    182 }
    183 
    184 static void
    185 websocket_frame_build(struct kore_buf *frame, u_int8_t op, const void *data,
    186     size_t len)
    187 {
    188 	u_int8_t		len_1;
    189 	u_int16_t		len16;
    190 	u_int64_t		len64;
    191 
    192 	if (len > WEBSOCKET_PAYLOAD_SINGLE) {
    193 		if (len <= USHRT_MAX)
    194 			len_1 = WEBSOCKET_PAYLOAD_EXTEND_1;
    195 		else
    196 			len_1 = WEBSOCKET_PAYLOAD_EXTEND_2;
    197 	} else {
    198 		len_1 = len;
    199 	}
    200 
    201 	op |= (1 << 7);
    202 	kore_buf_append(frame, &op, sizeof(op));
    203 
    204 	len_1 &= ~(1 << 7);
    205 	kore_buf_append(frame, &len_1, sizeof(len_1));
    206 
    207 	if (len_1 > WEBSOCKET_PAYLOAD_SINGLE) {
    208 		switch (len_1) {
    209 		case WEBSOCKET_PAYLOAD_EXTEND_1:
    210 			net_write16((u_int8_t *)&len16, len);
    211 			kore_buf_append(frame, &len16, sizeof(len16));
    212 			break;
    213 		case WEBSOCKET_PAYLOAD_EXTEND_2:
    214 			net_write64((u_int8_t *)&len64, len);
    215 			kore_buf_append(frame, &len64, sizeof(len64));
    216 			break;
    217 		}
    218 	}
    219 
    220 	if (data != NULL && len > 0)
    221 		kore_buf_append(frame, data, len);
    222 }
    223 
    224 static int
    225 websocket_recv_opcode(struct netbuf *nb)
    226 {
    227 	u_int8_t		op, len;
    228 	struct connection	*c = nb->owner;
    229 
    230 	if (!WEBSOCKET_HAS_MASK(nb->buf[1]))
    231 		return (KORE_RESULT_ERROR);
    232 
    233 	if (WEBSOCKET_RSV(nb->buf[0], 1) || WEBSOCKET_RSV(nb->buf[0], 2) ||
    234 	    WEBSOCKET_RSV(nb->buf[0], 3))
    235 		return (KORE_RESULT_ERROR);
    236 
    237 	len = WEBSOCKET_FRAME_LENGTH(nb->buf[1]);
    238 
    239 	op = nb->buf[0] & WEBSOCKET_OPCODE_MASK;
    240 	switch (op) {
    241 	case WEBSOCKET_OP_CONT:
    242 	case WEBSOCKET_OP_TEXT:
    243 	case WEBSOCKET_OP_BINARY:
    244 		break;
    245 	case WEBSOCKET_OP_CLOSE:
    246 	case WEBSOCKET_OP_PING:
    247 	case WEBSOCKET_OP_PONG:
    248 		if (len > WEBSOCKET_PAYLOAD_SINGLE ||
    249 		    !WEBSOCKET_HAS_FINFLAG(nb->buf[0]))
    250 			return (KORE_RESULT_ERROR);
    251 		break;
    252 	default:
    253 		return (KORE_RESULT_ERROR);
    254 	}
    255 
    256 	switch (len) {
    257 	case WEBSOCKET_PAYLOAD_EXTEND_1:
    258 		len += sizeof(u_int16_t);
    259 		break;
    260 	case WEBSOCKET_PAYLOAD_EXTEND_2:
    261 		len += sizeof(u_int64_t);
    262 		break;
    263 	}
    264 
    265 	len += WEBSOCKET_MASK_LEN;
    266 	net_recv_expand(c, len, websocket_recv_frame);
    267 
    268 	return (KORE_RESULT_OK);
    269 }
    270 
    271 static int
    272 websocket_recv_frame(struct netbuf *nb)
    273 {
    274 	struct connection	*c;
    275 	int			ret;
    276 	u_int64_t		len, i, total;
    277 	u_int8_t		op, moff, extra;
    278 
    279 	c = nb->owner;
    280 	op = nb->buf[0] & WEBSOCKET_OPCODE_MASK;
    281 	len = WEBSOCKET_FRAME_LENGTH(nb->buf[1]);
    282 
    283 	switch (len) {
    284 	case WEBSOCKET_PAYLOAD_EXTEND_1:
    285 		moff = 4;
    286 		extra = sizeof(u_int16_t);
    287 		len = net_read16(&nb->buf[2]);
    288 		break;
    289 	case WEBSOCKET_PAYLOAD_EXTEND_2:
    290 		moff = 10;
    291 		extra = sizeof(u_int64_t);
    292 		len = net_read64(&nb->buf[2]);
    293 		break;
    294 	default:
    295 		extra = 0;
    296 		moff = 2;
    297 		break;
    298 	}
    299 
    300 	if (len > kore_websocket_maxframe)
    301 		return (KORE_RESULT_ERROR);
    302 
    303 	extra += WEBSOCKET_FRAME_HDR;
    304 	total = len + extra + WEBSOCKET_MASK_LEN;
    305 	if (total > nb->b_len) {
    306 		total -= nb->b_len;
    307 		net_recv_expand(c, total, websocket_recv_frame);
    308 		return (KORE_RESULT_OK);
    309 	}
    310 
    311 	if (total != nb->b_len)
    312 		return (KORE_RESULT_ERROR);
    313 
    314 	for (i = 0; i < len; i++)
    315 		nb->buf[moff + 4 + i] ^= nb->buf[moff + (i % 4)];
    316 
    317 	ret = KORE_RESULT_OK;
    318 	switch (op) {
    319 	case WEBSOCKET_OP_PONG:
    320 		break;
    321 	case WEBSOCKET_OP_CONT:
    322 		ret = KORE_RESULT_ERROR;
    323 		kore_log(LOG_ERR,
    324 		    "%p: we do not support op 0x%02x yet", (void *)c, op);
    325 		break;
    326 	case WEBSOCKET_OP_TEXT:
    327 	case WEBSOCKET_OP_BINARY:
    328 		if (c->ws_message != NULL) {
    329 			kore_runtime_wsmessage(c->ws_message,
    330 			    c, op, &nb->buf[moff + 4], len);
    331 		}
    332 		break;
    333 	case WEBSOCKET_OP_CLOSE:
    334 		c->evt.flags &= ~KORE_EVENT_READ;
    335 		if (!(c->flags & CONN_WS_CLOSE_SENT)) {
    336 			c->flags |= CONN_WS_CLOSE_SENT;
    337 			kore_websocket_send(c, WEBSOCKET_OP_CLOSE, NULL, 0);
    338 		}
    339 		kore_connection_disconnect(c);
    340 		break;
    341 	case WEBSOCKET_OP_PING:
    342 		kore_websocket_send(c, WEBSOCKET_OP_PONG,
    343 		    &nb->buf[moff + 4], len);
    344 		break;
    345 	default:
    346 		return (KORE_RESULT_ERROR);
    347 	}
    348 
    349 	net_recv_reset(c, WEBSOCKET_FRAME_HDR, websocket_recv_opcode);
    350 
    351 	return (ret);
    352 }
    353 
    354 static void
    355 websocket_disconnect(struct connection *c)
    356 {
    357 	if (c->ws_disconnect != NULL)
    358 		kore_runtime_wsdisconnect(c->ws_disconnect, c);
    359 
    360 	if (!(c->flags & CONN_WS_CLOSE_SENT)) {
    361 		c->flags |= CONN_WS_CLOSE_SENT;
    362 		c->evt.flags &= ~KORE_EVENT_READ;
    363 		kore_websocket_send(c, WEBSOCKET_OP_CLOSE, NULL, 0);
    364 	}
    365 }