kore

An easy to use, scalable and secure web application framework for writing web APIs in C.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

websocket.c (9819B)



      1 /*
      2  * Copyright (c) 2014-2022 Joris Vink <joris@coders.se>
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 #include <sys/param.h>
     18 #include <sys/types.h>
     19 
     20 #include <limits.h>
     21 #include <string.h>
     22 
     23 #include "kore.h"
     24 #include "http.h"
     25 #include "sha1.h"
     26 
     27 #define WEBSOCKET_FRAME_HDR		2
     28 #define WEBSOCKET_MASK_LEN		4
     29 #define WEBSOCKET_FRAME_MAXLEN		16384
     30 #define WEBSOCKET_PAYLOAD_SINGLE	125
     31 #define WEBSOCKET_PAYLOAD_EXTEND_1	126
     32 #define WEBSOCKET_PAYLOAD_EXTEND_2	127
     33 #define WEBSOCKET_OPCODE_MASK		0x0f
     34 #define WEBSOCKET_FRAME_LENGTH(x)	((x) & ~(1 << 7))
     35 #define WEBSOCKET_HAS_MASK(x)		((x) & (1 << 7))
     36 #define WEBSOCKET_HAS_FINFLAG(x)	((x) & (1 << 7))
     37 #define WEBSOCKET_RSV(x, i)		((x) & (1 << (7 - i)))
     38 
     39 #define WEBSOCKET_SERVER_RESPONSE	"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
     40 
     41 
     42 u_int64_t	kore_websocket_timeout = 120000;
     43 u_int64_t	kore_websocket_maxframe = 16384;
     44 
     45 static int	websocket_recv_frame(struct netbuf *);
     46 static int	websocket_recv_opcode(struct netbuf *);
     47 static void	websocket_disconnect(struct connection *);
     48 static void	websocket_frame_build(struct kore_buf *, u_int8_t,
     49 		    const void *, size_t);
     50 
     51 void
     52 kore_websocket_handshake(struct http_request *req, const char *onconnect,
     53     const char *onmessage, const char *ondisconnect)
     54 {
     55 	SHA1_CTX		sctx;
     56 	struct kore_buf		*buf;
     57 	char			*base64;
     58 	const char		*key, *version;
     59 	u_int8_t		digest[SHA1_DIGEST_LENGTH];
     60 
     61 	if (!http_request_header(req, "sec-websocket-key", &key)) {
     62 		http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
     63 		return;
     64 	}
     65 
     66 	if (!http_request_header(req, "sec-websocket-version", &version)) {
     67 		http_response_header(req, "sec-websocket-version", "13");
     68 		http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
     69 		return;
     70 	}
     71 
     72 	if (strcmp(version, "13")) {
     73 		http_response_header(req, "sec-websocket-version", "13");
     74 		http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
     75 		return;
     76 	}
     77 
     78 	buf = kore_buf_alloc(128);
     79 	kore_buf_appendf(buf, "%s%s", key, WEBSOCKET_SERVER_RESPONSE);
     80 
     81 	SHA1Init(&sctx);
     82 	SHA1Update(&sctx, buf->data, buf->offset);
     83 	SHA1Final(digest, &sctx);
     84 
     85 	kore_buf_free(buf);
     86 
     87 	if (!kore_base64_encode(digest, sizeof(digest), &base64)) {
     88 		kore_debug("failed to base64 encode digest");
     89 		http_response(req, HTTP_STATUS_INTERNAL_ERROR, NULL, 0);
     90 		return;
     91 	}
     92 
     93 	http_response_header(req, "upgrade", "websocket");
     94 	http_response_header(req, "connection", "upgrade");
     95 	http_response_header(req, "sec-websocket-accept", base64);
     96 	kore_free(base64);
     97 
     98 	kore_debug("%p: new websocket connection", req->owner);
     99 
    100 	req->owner->proto = CONN_PROTO_WEBSOCKET;
    101 	http_response(req, HTTP_STATUS_SWITCHING_PROTOCOLS, NULL, 0);
    102 	net_recv_reset(req->owner, WEBSOCKET_FRAME_HDR, websocket_recv_opcode);
    103 
    104 	req->owner->disconnect = websocket_disconnect;
    105 	req->owner->rnb->flags &= ~NETBUF_CALL_CB_ALWAYS;
    106 
    107 	req->owner->http_timeout = 0;
    108 	req->owner->idle_timer.start = kore_time_ms();
    109 	req->owner->idle_timer.length = kore_websocket_timeout;
    110 
    111 	if (onconnect != NULL) {
    112 		req->owner->ws_connect = kore_runtime_getcall(onconnect);
    113 		if (req->owner->ws_connect == NULL)
    114 			fatal("no symbol '%s' for ws_connect", onconnect);
    115 	} else {
    116 		req->owner->ws_connect = NULL;
    117 	}
    118 
    119 	if (onmessage != NULL) {
    120 		req->owner->ws_message = kore_runtime_getcall(onmessage);
    121 		if (req->owner->ws_message == NULL)
    122 			fatal("no symbol '%s' for ws_message", onmessage);
    123 	} else {
    124 		req->owner->ws_message = NULL;
    125 	}
    126 
    127 	if (ondisconnect != NULL) {
    128 		req->owner->ws_disconnect = kore_runtime_getcall(ondisconnect);
    129 		if (req->owner->ws_disconnect == NULL)
    130 			fatal("no symbol '%s' for ws_disconnect", ondisconnect);
    131 	} else {
    132 		req->owner->ws_disconnect = NULL;
    133 	}
    134 
    135 	if (req->owner->ws_connect != NULL)
    136 		kore_runtime_wsconnect(req->owner->ws_connect, req->owner);
    137 }
    138 
    139 int
    140 kore_websocket_send_clean(struct netbuf *nb)
    141 {
    142 	kore_free(nb->buf);
    143 	return (0);
    144 }
    145 
    146 void
    147 kore_websocket_send(struct connection *c, u_int8_t op, const void *data,
    148     size_t len)
    149 {
    150 	struct kore_buf		frame;
    151 
    152 	kore_buf_init(&frame, len);
    153 	websocket_frame_build(&frame, op, data, len);
    154 	net_send_stream(c, frame.data, frame.offset,
    155 	    kore_websocket_send_clean, NULL);
    156 
    157 	/* net_send_stream() takes over the buffer data pointer. */
    158 	frame.data = NULL;
    159 	kore_buf_cleanup(&frame);
    160 
    161 	net_send_flush(c);
    162 }
    163 
    164 void
    165 kore_websocket_broadcast(struct connection *src, u_int8_t op, const void *data,
    166     size_t len, int scope)
    167 {
    168 	struct connection	*c;
    169 	struct kore_buf		*frame;
    170 
    171 	frame = kore_buf_alloc(len);
    172 	websocket_frame_build(frame, op, data, len);
    173 
    174 	TAILQ_FOREACH(c, &connections, list) {
    175 		if (c != src && c->proto == CONN_PROTO_WEBSOCKET) {
    176 			net_send_queue(c, frame->data, frame->offset);
    177 			net_send_flush(c);
    178 		}
    179 	}
    180 
    181 	if (scope == WEBSOCKET_BROADCAST_GLOBAL) {
    182 		kore_msg_send(KORE_MSG_WORKER_ALL,
    183 		    KORE_MSG_WEBSOCKET, frame->data, frame->offset);
    184 	}
    185 
    186 	kore_buf_free(frame);
    187 }
    188 
    189 static void
    190 websocket_frame_build(struct kore_buf *frame, u_int8_t op, const void *data,
    191     size_t len)
    192 {
    193 	u_int8_t		len_1;
    194 	u_int16_t		len16;
    195 	u_int64_t		len64;
    196 
    197 	if (len > WEBSOCKET_PAYLOAD_SINGLE) {
    198 		if (len <= USHRT_MAX)
    199 			len_1 = WEBSOCKET_PAYLOAD_EXTEND_1;
    200 		else
    201 			len_1 = WEBSOCKET_PAYLOAD_EXTEND_2;
    202 	} else {
    203 		len_1 = len;
    204 	}
    205 
    206 	op |= (1 << 7);
    207 	kore_buf_append(frame, &op, sizeof(op));
    208 
    209 	len_1 &= ~(1 << 7);
    210 	kore_buf_append(frame, &len_1, sizeof(len_1));
    211 
    212 	if (len_1 > WEBSOCKET_PAYLOAD_SINGLE) {
    213 		switch (len_1) {
    214 		case WEBSOCKET_PAYLOAD_EXTEND_1:
    215 			net_write16((u_int8_t *)&len16, len);
    216 			kore_buf_append(frame, &len16, sizeof(len16));
    217 			break;
    218 		case WEBSOCKET_PAYLOAD_EXTEND_2:
    219 			net_write64((u_int8_t *)&len64, len);
    220 			kore_buf_append(frame, &len64, sizeof(len64));
    221 			break;
    222 		}
    223 	}
    224 
    225 	if (data != NULL && len > 0)
    226 		kore_buf_append(frame, data, len);
    227 }
    228 
    229 static int
    230 websocket_recv_opcode(struct netbuf *nb)
    231 {
    232 	u_int8_t		op, len;
    233 	struct connection	*c = nb->owner;
    234 
    235 	if (!WEBSOCKET_HAS_MASK(nb->buf[1])) {
    236 		kore_debug("%p: frame did not have a mask set", c);
    237 		return (KORE_RESULT_ERROR);
    238 	}
    239 
    240 	if (WEBSOCKET_RSV(nb->buf[0], 1) || WEBSOCKET_RSV(nb->buf[0], 2) ||
    241 	    WEBSOCKET_RSV(nb->buf[0], 3)) {
    242 		kore_debug("%p: RSV bits are not zero", c);
    243 		return (KORE_RESULT_ERROR);
    244 	}
    245 
    246 	len = WEBSOCKET_FRAME_LENGTH(nb->buf[1]);
    247 
    248 	op = nb->buf[0] & WEBSOCKET_OPCODE_MASK;
    249 	switch (op) {
    250 	case WEBSOCKET_OP_CONT:
    251 	case WEBSOCKET_OP_TEXT:
    252 	case WEBSOCKET_OP_BINARY:
    253 		break;
    254 	case WEBSOCKET_OP_CLOSE:
    255 	case WEBSOCKET_OP_PING:
    256 	case WEBSOCKET_OP_PONG:
    257 		if (len > WEBSOCKET_PAYLOAD_SINGLE ||
    258 		    !WEBSOCKET_HAS_FINFLAG(nb->buf[0])) {
    259 			kore_debug("%p: large or fragmented control frame", c);
    260 			return (KORE_RESULT_ERROR);
    261 		}
    262 		break;
    263 	default:
    264 		kore_debug("%p: bad websocket op %d", c, op);
    265 		return (KORE_RESULT_ERROR);
    266 	}
    267 
    268 	switch (len) {
    269 	case WEBSOCKET_PAYLOAD_EXTEND_1:
    270 		len += sizeof(u_int16_t);
    271 		break;
    272 	case WEBSOCKET_PAYLOAD_EXTEND_2:
    273 		len += sizeof(u_int64_t);
    274 		break;
    275 	}
    276 
    277 	len += WEBSOCKET_MASK_LEN;
    278 	net_recv_expand(c, len, websocket_recv_frame);
    279 
    280 	return (KORE_RESULT_OK);
    281 }
    282 
    283 static int
    284 websocket_recv_frame(struct netbuf *nb)
    285 {
    286 	struct connection	*c;
    287 	int			ret;
    288 	u_int64_t		len, i, total;
    289 	u_int8_t		op, moff, extra;
    290 
    291 	c = nb->owner;
    292 	op = nb->buf[0] & WEBSOCKET_OPCODE_MASK;
    293 	len = WEBSOCKET_FRAME_LENGTH(nb->buf[1]);
    294 
    295 	switch (len) {
    296 	case WEBSOCKET_PAYLOAD_EXTEND_1:
    297 		moff = 4;
    298 		extra = sizeof(u_int16_t);
    299 		len = net_read16(&nb->buf[2]);
    300 		break;
    301 	case WEBSOCKET_PAYLOAD_EXTEND_2:
    302 		moff = 10;
    303 		extra = sizeof(u_int64_t);
    304 		len = net_read64(&nb->buf[2]);
    305 		break;
    306 	default:
    307 		extra = 0;
    308 		moff = 2;
    309 		break;
    310 	}
    311 
    312 	if (len > kore_websocket_maxframe) {
    313 		kore_debug("%p: frame too big", c);
    314 		return (KORE_RESULT_ERROR);
    315 	}
    316 
    317 	extra += WEBSOCKET_FRAME_HDR;
    318 	total = len + extra + WEBSOCKET_MASK_LEN;
    319 	if (total > nb->b_len) {
    320 		total -= nb->b_len;
    321 		net_recv_expand(c, total, websocket_recv_frame);
    322 		return (KORE_RESULT_OK);
    323 	}
    324 
    325 	if (total != nb->b_len)
    326 		return (KORE_RESULT_ERROR);
    327 
    328 	for (i = 0; i < len; i++)
    329 		nb->buf[moff + 4 + i] ^= nb->buf[moff + (i % 4)];
    330 
    331 	ret = KORE_RESULT_OK;
    332 	switch (op) {
    333 	case WEBSOCKET_OP_PONG:
    334 		break;
    335 	case WEBSOCKET_OP_CONT:
    336 		ret = KORE_RESULT_ERROR;
    337 		kore_log(LOG_ERR,
    338 		    "%p: we do not support op 0x%02x yet", (void *)c, op);
    339 		break;
    340 	case WEBSOCKET_OP_TEXT:
    341 	case WEBSOCKET_OP_BINARY:
    342 		if (c->ws_message != NULL) {
    343 			kore_runtime_wsmessage(c->ws_message,
    344 			    c, op, &nb->buf[moff + 4], len);
    345 		}
    346 		break;
    347 	case WEBSOCKET_OP_CLOSE:
    348 		c->evt.flags &= ~KORE_EVENT_READ;
    349 		if (!(c->flags & CONN_WS_CLOSE_SENT)) {
    350 			c->flags |= CONN_WS_CLOSE_SENT;
    351 			kore_websocket_send(c, WEBSOCKET_OP_CLOSE, NULL, 0);
    352 		}
    353 		kore_connection_disconnect(c);
    354 		break;
    355 	case WEBSOCKET_OP_PING:
    356 		kore_websocket_send(c, WEBSOCKET_OP_PONG,
    357 		    &nb->buf[moff + 4], len);
    358 		break;
    359 	default:
    360 		kore_debug("%p: bad websocket op %d", c, op);
    361 		return (KORE_RESULT_ERROR);
    362 	}
    363 
    364 	net_recv_reset(c, WEBSOCKET_FRAME_HDR, websocket_recv_opcode);
    365 
    366 	return (ret);
    367 }
    368 
    369 static void
    370 websocket_disconnect(struct connection *c)
    371 {
    372 	if (c->ws_disconnect != NULL)
    373 		kore_runtime_wsdisconnect(c->ws_disconnect, c);
    374 
    375 	if (!(c->flags & CONN_WS_CLOSE_SENT)) {
    376 		c->flags |= CONN_WS_CLOSE_SENT;
    377 		c->evt.flags &= ~KORE_EVENT_READ;
    378 		kore_websocket_send(c, WEBSOCKET_OP_CLOSE, NULL, 0);
    379 	}
    380 }