websocket.c (9360B)
1 /*
2 * Copyright (c) 2014-2022 Joris Vink <joris@coders.se>
3 *
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
7 *
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15 */
16
17 #include <sys/param.h>
18 #include <sys/types.h>
19
20 #include <limits.h>
21 #include <string.h>
22
23 #include "kore.h"
24 #include "http.h"
25 #include "sha1.h"
26
/* Size in bytes of the fixed part of a frame header (buf[0] and buf[1]). */
#define WEBSOCKET_FRAME_HDR		2
/* Size in bytes of the masking key that precedes the payload. */
#define WEBSOCKET_MASK_LEN		4
/* Largest payload length that is carried directly in the length byte. */
#define WEBSOCKET_PAYLOAD_SINGLE	125
/* Length byte value announcing a 16-bit extended payload length. */
#define WEBSOCKET_PAYLOAD_EXTEND_1	126
/* Length byte value announcing a 64-bit extended payload length. */
#define WEBSOCKET_PAYLOAD_EXTEND_2	127
/* The opcode lives in the low nibble of the first header byte. */
#define WEBSOCKET_OPCODE_MASK		0x0f
/* Second header byte: bit 7 is the mask flag, the low 7 bits the length. */
#define WEBSOCKET_FRAME_LENGTH(x)	((x) & ~(1 << 7))
#define WEBSOCKET_HAS_MASK(x)		((x) & (1 << 7))
/* First header byte: bit 7 is the FIN flag. */
#define WEBSOCKET_HAS_FINFLAG(x)	((x) & (1 << 7))
/*
 * Test bit (7 - i) of the first header byte; i = 1, 2, 3 selects the
 * three reserved bits. The index is parenthesized so that expression
 * arguments (e.g. "n + 1") select the intended bit.
 */
#define WEBSOCKET_RSV(x, i)		((x) & (1 << (7 - (i))))

/*
 * Fixed string appended to the client's sec-websocket-key before it is
 * hashed into the sec-websocket-accept value (the RFC 6455 GUID).
 */
#define WEBSOCKET_SERVER_RESPONSE	"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
39
/*
 * Idle timer length installed on a connection once it has been upgraded.
 * Presumably milliseconds, as the timer start comes from kore_time_ms().
 */
u_int64_t	kore_websocket_timeout = 120 * 1000;

/* Largest payload length accepted for a single incoming frame. */
u_int64_t	kore_websocket_maxframe = 16 * 1024;
42
/* Receive callbacks: first the 2-byte header, then the rest of the frame. */
static int	websocket_recv_opcode(struct netbuf *);
static int	websocket_recv_frame(struct netbuf *);

/* Installed as the connection's disconnect hook during the handshake. */
static void	websocket_disconnect(struct connection *);

/* Serializes a frame header plus payload into a kore_buf. */
static void	websocket_frame_build(struct kore_buf *, u_int8_t,
		    const void *, size_t);
48
49 void
50 kore_websocket_handshake(struct http_request *req, const char *onconnect,
51 const char *onmessage, const char *ondisconnect)
52 {
53 SHA1_CTX sctx;
54 struct kore_buf *buf;
55 char *base64;
56 const char *key, *version;
57 u_int8_t digest[SHA1_DIGEST_LENGTH];
58
59 if (!http_request_header(req, "sec-websocket-key", &key)) {
60 http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
61 return;
62 }
63
64 if (!http_request_header(req, "sec-websocket-version", &version)) {
65 http_response_header(req, "sec-websocket-version", "13");
66 http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
67 return;
68 }
69
70 if (strcmp(version, "13")) {
71 http_response_header(req, "sec-websocket-version", "13");
72 http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
73 return;
74 }
75
76 buf = kore_buf_alloc(128);
77 kore_buf_appendf(buf, "%s%s", key, WEBSOCKET_SERVER_RESPONSE);
78
79 SHA1Init(&sctx);
80 SHA1Update(&sctx, buf->data, buf->offset);
81 SHA1Final(digest, &sctx);
82
83 kore_buf_free(buf);
84
85 if (!kore_base64_encode(digest, sizeof(digest), &base64)) {
86 http_response(req, HTTP_STATUS_INTERNAL_ERROR, NULL, 0);
87 return;
88 }
89
90 http_response_header(req, "upgrade", "websocket");
91 http_response_header(req, "connection", "upgrade");
92 http_response_header(req, "sec-websocket-accept", base64);
93 kore_free(base64);
94
95 req->owner->proto = CONN_PROTO_WEBSOCKET;
96 http_response(req, HTTP_STATUS_SWITCHING_PROTOCOLS, NULL, 0);
97 net_recv_reset(req->owner, WEBSOCKET_FRAME_HDR, websocket_recv_opcode);
98
99 req->owner->disconnect = websocket_disconnect;
100 req->owner->rnb->flags &= ~NETBUF_CALL_CB_ALWAYS;
101
102 req->owner->http_timeout = 0;
103 req->owner->idle_timer.start = kore_time_ms();
104 req->owner->idle_timer.length = kore_websocket_timeout;
105
106 if (onconnect != NULL) {
107 req->owner->ws_connect = kore_runtime_getcall(onconnect);
108 if (req->owner->ws_connect == NULL)
109 fatal("no symbol '%s' for ws_connect", onconnect);
110 } else {
111 req->owner->ws_connect = NULL;
112 }
113
114 if (onmessage != NULL) {
115 req->owner->ws_message = kore_runtime_getcall(onmessage);
116 if (req->owner->ws_message == NULL)
117 fatal("no symbol '%s' for ws_message", onmessage);
118 } else {
119 req->owner->ws_message = NULL;
120 }
121
122 if (ondisconnect != NULL) {
123 req->owner->ws_disconnect = kore_runtime_getcall(ondisconnect);
124 if (req->owner->ws_disconnect == NULL)
125 fatal("no symbol '%s' for ws_disconnect", ondisconnect);
126 } else {
127 req->owner->ws_disconnect = NULL;
128 }
129
130 if (req->owner->ws_connect != NULL)
131 kore_runtime_wsconnect(req->owner->ws_connect, req->owner);
132 }
133
134 int
135 kore_websocket_send_clean(struct netbuf *nb)
136 {
137 kore_free(nb->buf);
138 return (0);
139 }
140
141 void
142 kore_websocket_send(struct connection *c, u_int8_t op, const void *data,
143 size_t len)
144 {
145 struct kore_buf frame;
146
147 kore_buf_init(&frame, len);
148 websocket_frame_build(&frame, op, data, len);
149 net_send_stream(c, frame.data, frame.offset,
150 kore_websocket_send_clean, NULL);
151
152 /* net_send_stream() takes over the buffer data pointer. */
153 frame.data = NULL;
154 kore_buf_cleanup(&frame);
155
156 net_send_flush(c);
157 }
158
159 void
160 kore_websocket_broadcast(struct connection *src, u_int8_t op, const void *data,
161 size_t len, int scope)
162 {
163 struct connection *c;
164 struct kore_buf *frame;
165
166 frame = kore_buf_alloc(len);
167 websocket_frame_build(frame, op, data, len);
168
169 TAILQ_FOREACH(c, &connections, list) {
170 if (c != src && c->proto == CONN_PROTO_WEBSOCKET) {
171 net_send_queue(c, frame->data, frame->offset);
172 net_send_flush(c);
173 }
174 }
175
176 if (scope == WEBSOCKET_BROADCAST_GLOBAL) {
177 kore_msg_send(KORE_MSG_WORKER_ALL,
178 KORE_MSG_WEBSOCKET, frame->data, frame->offset);
179 }
180
181 kore_buf_free(frame);
182 }
183
184 static void
185 websocket_frame_build(struct kore_buf *frame, u_int8_t op, const void *data,
186 size_t len)
187 {
188 u_int8_t len_1;
189 u_int16_t len16;
190 u_int64_t len64;
191
192 if (len > WEBSOCKET_PAYLOAD_SINGLE) {
193 if (len <= USHRT_MAX)
194 len_1 = WEBSOCKET_PAYLOAD_EXTEND_1;
195 else
196 len_1 = WEBSOCKET_PAYLOAD_EXTEND_2;
197 } else {
198 len_1 = len;
199 }
200
201 op |= (1 << 7);
202 kore_buf_append(frame, &op, sizeof(op));
203
204 len_1 &= ~(1 << 7);
205 kore_buf_append(frame, &len_1, sizeof(len_1));
206
207 if (len_1 > WEBSOCKET_PAYLOAD_SINGLE) {
208 switch (len_1) {
209 case WEBSOCKET_PAYLOAD_EXTEND_1:
210 net_write16((u_int8_t *)&len16, len);
211 kore_buf_append(frame, &len16, sizeof(len16));
212 break;
213 case WEBSOCKET_PAYLOAD_EXTEND_2:
214 net_write64((u_int8_t *)&len64, len);
215 kore_buf_append(frame, &len64, sizeof(len64));
216 break;
217 }
218 }
219
220 if (data != NULL && len > 0)
221 kore_buf_append(frame, data, len);
222 }
223
224 static int
225 websocket_recv_opcode(struct netbuf *nb)
226 {
227 u_int8_t op, len;
228 struct connection *c = nb->owner;
229
230 if (!WEBSOCKET_HAS_MASK(nb->buf[1]))
231 return (KORE_RESULT_ERROR);
232
233 if (WEBSOCKET_RSV(nb->buf[0], 1) || WEBSOCKET_RSV(nb->buf[0], 2) ||
234 WEBSOCKET_RSV(nb->buf[0], 3))
235 return (KORE_RESULT_ERROR);
236
237 len = WEBSOCKET_FRAME_LENGTH(nb->buf[1]);
238
239 op = nb->buf[0] & WEBSOCKET_OPCODE_MASK;
240 switch (op) {
241 case WEBSOCKET_OP_CONT:
242 case WEBSOCKET_OP_TEXT:
243 case WEBSOCKET_OP_BINARY:
244 break;
245 case WEBSOCKET_OP_CLOSE:
246 case WEBSOCKET_OP_PING:
247 case WEBSOCKET_OP_PONG:
248 if (len > WEBSOCKET_PAYLOAD_SINGLE ||
249 !WEBSOCKET_HAS_FINFLAG(nb->buf[0]))
250 return (KORE_RESULT_ERROR);
251 break;
252 default:
253 return (KORE_RESULT_ERROR);
254 }
255
256 switch (len) {
257 case WEBSOCKET_PAYLOAD_EXTEND_1:
258 len += sizeof(u_int16_t);
259 break;
260 case WEBSOCKET_PAYLOAD_EXTEND_2:
261 len += sizeof(u_int64_t);
262 break;
263 }
264
265 len += WEBSOCKET_MASK_LEN;
266 net_recv_expand(c, len, websocket_recv_frame);
267
268 return (KORE_RESULT_OK);
269 }
270
271 static int
272 websocket_recv_frame(struct netbuf *nb)
273 {
274 struct connection *c;
275 int ret;
276 u_int64_t len, i, total;
277 u_int8_t op, moff, extra;
278
279 c = nb->owner;
280 op = nb->buf[0] & WEBSOCKET_OPCODE_MASK;
281 len = WEBSOCKET_FRAME_LENGTH(nb->buf[1]);
282
283 switch (len) {
284 case WEBSOCKET_PAYLOAD_EXTEND_1:
285 moff = 4;
286 extra = sizeof(u_int16_t);
287 len = net_read16(&nb->buf[2]);
288 break;
289 case WEBSOCKET_PAYLOAD_EXTEND_2:
290 moff = 10;
291 extra = sizeof(u_int64_t);
292 len = net_read64(&nb->buf[2]);
293 break;
294 default:
295 extra = 0;
296 moff = 2;
297 break;
298 }
299
300 if (len > kore_websocket_maxframe)
301 return (KORE_RESULT_ERROR);
302
303 extra += WEBSOCKET_FRAME_HDR;
304 total = len + extra + WEBSOCKET_MASK_LEN;
305 if (total > nb->b_len) {
306 total -= nb->b_len;
307 net_recv_expand(c, total, websocket_recv_frame);
308 return (KORE_RESULT_OK);
309 }
310
311 if (total != nb->b_len)
312 return (KORE_RESULT_ERROR);
313
314 for (i = 0; i < len; i++)
315 nb->buf[moff + 4 + i] ^= nb->buf[moff + (i % 4)];
316
317 ret = KORE_RESULT_OK;
318 switch (op) {
319 case WEBSOCKET_OP_PONG:
320 break;
321 case WEBSOCKET_OP_CONT:
322 ret = KORE_RESULT_ERROR;
323 kore_log(LOG_ERR,
324 "%p: we do not support op 0x%02x yet", (void *)c, op);
325 break;
326 case WEBSOCKET_OP_TEXT:
327 case WEBSOCKET_OP_BINARY:
328 if (c->ws_message != NULL) {
329 kore_runtime_wsmessage(c->ws_message,
330 c, op, &nb->buf[moff + 4], len);
331 }
332 break;
333 case WEBSOCKET_OP_CLOSE:
334 c->evt.flags &= ~KORE_EVENT_READ;
335 if (!(c->flags & CONN_WS_CLOSE_SENT)) {
336 c->flags |= CONN_WS_CLOSE_SENT;
337 kore_websocket_send(c, WEBSOCKET_OP_CLOSE, NULL, 0);
338 }
339 kore_connection_disconnect(c);
340 break;
341 case WEBSOCKET_OP_PING:
342 kore_websocket_send(c, WEBSOCKET_OP_PONG,
343 &nb->buf[moff + 4], len);
344 break;
345 default:
346 return (KORE_RESULT_ERROR);
347 }
348
349 net_recv_reset(c, WEBSOCKET_FRAME_HDR, websocket_recv_opcode);
350
351 return (ret);
352 }
353
354 static void
355 websocket_disconnect(struct connection *c)
356 {
357 if (c->ws_disconnect != NULL)
358 kore_runtime_wsdisconnect(c->ws_disconnect, c);
359
360 if (!(c->flags & CONN_WS_CLOSE_SENT)) {
361 c->flags |= CONN_WS_CLOSE_SENT;
362 c->evt.flags &= ~KORE_EVENT_READ;
363 kore_websocket_send(c, WEBSOCKET_OP_CLOSE, NULL, 0);
364 }
365 }