websocket.c (9361B)
1 /*
2 * Copyright (c) 2014-2022 Joris Vink <joris@coders.se>
3 *
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
7 *
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15 */
16
17 #include <sys/param.h>
18 #include <sys/types.h>
19
20 #include <limits.h>
21 #include <string.h>
22
23 #include "kore.h"
24 #include "http.h"
25 #include "sha1.h"
26
/*
 * Wire-format constants and helpers for websocket frames.
 *
 * The first two bytes of a frame are referred to below as byte 0 (flags
 * and opcode) and byte 1 (mask bit and 7-bit length field).
 */

/* Number of bytes in the fixed part of the frame header (byte 0 + byte 1). */
#define WEBSOCKET_FRAME_HDR		2
/* Number of bytes in the masking key that precedes the payload. */
#define WEBSOCKET_MASK_LEN		4
/* Largest payload length carried directly in the 7-bit length field. */
#define WEBSOCKET_PAYLOAD_SINGLE	125
/* Length field marker: a 16-bit length follows the fixed header. */
#define WEBSOCKET_PAYLOAD_EXTEND_1	126
/* Length field marker: a 64-bit length follows the fixed header. */
#define WEBSOCKET_PAYLOAD_EXTEND_2	127
/* Low nibble of byte 0 holds the opcode. */
#define WEBSOCKET_OPCODE_MASK		0x0f
/* Byte 1 with its top bit cleared, leaving the 7-bit length field. */
#define WEBSOCKET_FRAME_LENGTH(x)	((x) & ~(1 << 7))
/* Non-zero when the top bit of byte 1 (the mask bit) is set. */
#define WEBSOCKET_HAS_MASK(x)		((x) & (1 << 7))
/* Non-zero when the top bit of byte 0 (the FIN flag) is set. */
#define WEBSOCKET_HAS_FINFLAG(x)	((x) & (1 << 7))
/*
 * Non-zero when bit (7 - i) of x is set; used with i = 1..3 on byte 0 to
 * test the reserved bits. The index is parenthesized so that expression
 * arguments such as (n + 1) select the intended bit.
 */
#define WEBSOCKET_RSV(x, i)		((x) & (1 << (7 - (i))))

/* Fixed string appended to the client key before hashing in the handshake. */
#define WEBSOCKET_SERVER_RESPONSE	"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
39
40
/*
 * Idle timer length installed on a connection once it has been upgraded.
 * The timer start comes from kore_time_ms(), so this is presumably in
 * milliseconds.
 */
u_int64_t	kore_websocket_timeout = 120 * 1000;

/* Frames whose payload length exceeds this many bytes are rejected. */
u_int64_t	kore_websocket_maxframe = 16 * 1024;
43
/* Helpers private to this file. */
static int	websocket_recv_frame(struct netbuf *nb);
static int	websocket_recv_opcode(struct netbuf *nb);
static void	websocket_disconnect(struct connection *c);
static void	websocket_frame_build(struct kore_buf *frame, u_int8_t op,
		    const void *data, size_t len);
49
50 void
51 kore_websocket_handshake(struct http_request *req, const char *onconnect,
52 const char *onmessage, const char *ondisconnect)
53 {
54 SHA1_CTX sctx;
55 struct kore_buf *buf;
56 char *base64;
57 const char *key, *version;
58 u_int8_t digest[SHA1_DIGEST_LENGTH];
59
60 if (!http_request_header(req, "sec-websocket-key", &key)) {
61 http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
62 return;
63 }
64
65 if (!http_request_header(req, "sec-websocket-version", &version)) {
66 http_response_header(req, "sec-websocket-version", "13");
67 http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
68 return;
69 }
70
71 if (strcmp(version, "13")) {
72 http_response_header(req, "sec-websocket-version", "13");
73 http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
74 return;
75 }
76
77 buf = kore_buf_alloc(128);
78 kore_buf_appendf(buf, "%s%s", key, WEBSOCKET_SERVER_RESPONSE);
79
80 SHA1Init(&sctx);
81 SHA1Update(&sctx, buf->data, buf->offset);
82 SHA1Final(digest, &sctx);
83
84 kore_buf_free(buf);
85
86 if (!kore_base64_encode(digest, sizeof(digest), &base64)) {
87 http_response(req, HTTP_STATUS_INTERNAL_ERROR, NULL, 0);
88 return;
89 }
90
91 http_response_header(req, "upgrade", "websocket");
92 http_response_header(req, "connection", "upgrade");
93 http_response_header(req, "sec-websocket-accept", base64);
94 kore_free(base64);
95
96 req->owner->proto = CONN_PROTO_WEBSOCKET;
97 http_response(req, HTTP_STATUS_SWITCHING_PROTOCOLS, NULL, 0);
98 net_recv_reset(req->owner, WEBSOCKET_FRAME_HDR, websocket_recv_opcode);
99
100 req->owner->disconnect = websocket_disconnect;
101 req->owner->rnb->flags &= ~NETBUF_CALL_CB_ALWAYS;
102
103 req->owner->http_timeout = 0;
104 req->owner->idle_timer.start = kore_time_ms();
105 req->owner->idle_timer.length = kore_websocket_timeout;
106
107 if (onconnect != NULL) {
108 req->owner->ws_connect = kore_runtime_getcall(onconnect);
109 if (req->owner->ws_connect == NULL)
110 fatal("no symbol '%s' for ws_connect", onconnect);
111 } else {
112 req->owner->ws_connect = NULL;
113 }
114
115 if (onmessage != NULL) {
116 req->owner->ws_message = kore_runtime_getcall(onmessage);
117 if (req->owner->ws_message == NULL)
118 fatal("no symbol '%s' for ws_message", onmessage);
119 } else {
120 req->owner->ws_message = NULL;
121 }
122
123 if (ondisconnect != NULL) {
124 req->owner->ws_disconnect = kore_runtime_getcall(ondisconnect);
125 if (req->owner->ws_disconnect == NULL)
126 fatal("no symbol '%s' for ws_disconnect", ondisconnect);
127 } else {
128 req->owner->ws_disconnect = NULL;
129 }
130
131 if (req->owner->ws_connect != NULL)
132 kore_runtime_wsconnect(req->owner->ws_connect, req->owner);
133 }
134
135 int
136 kore_websocket_send_clean(struct netbuf *nb)
137 {
138 kore_free(nb->buf);
139 return (0);
140 }
141
142 void
143 kore_websocket_send(struct connection *c, u_int8_t op, const void *data,
144 size_t len)
145 {
146 struct kore_buf frame;
147
148 kore_buf_init(&frame, len);
149 websocket_frame_build(&frame, op, data, len);
150 net_send_stream(c, frame.data, frame.offset,
151 kore_websocket_send_clean, NULL);
152
153 /* net_send_stream() takes over the buffer data pointer. */
154 frame.data = NULL;
155 kore_buf_cleanup(&frame);
156
157 net_send_flush(c);
158 }
159
160 void
161 kore_websocket_broadcast(struct connection *src, u_int8_t op, const void *data,
162 size_t len, int scope)
163 {
164 struct connection *c;
165 struct kore_buf *frame;
166
167 frame = kore_buf_alloc(len);
168 websocket_frame_build(frame, op, data, len);
169
170 TAILQ_FOREACH(c, &connections, list) {
171 if (c != src && c->proto == CONN_PROTO_WEBSOCKET) {
172 net_send_queue(c, frame->data, frame->offset);
173 net_send_flush(c);
174 }
175 }
176
177 if (scope == WEBSOCKET_BROADCAST_GLOBAL) {
178 kore_msg_send(KORE_MSG_WORKER_ALL,
179 KORE_MSG_WEBSOCKET, frame->data, frame->offset);
180 }
181
182 kore_buf_free(frame);
183 }
184
185 static void
186 websocket_frame_build(struct kore_buf *frame, u_int8_t op, const void *data,
187 size_t len)
188 {
189 u_int8_t len_1;
190 u_int16_t len16;
191 u_int64_t len64;
192
193 if (len > WEBSOCKET_PAYLOAD_SINGLE) {
194 if (len <= USHRT_MAX)
195 len_1 = WEBSOCKET_PAYLOAD_EXTEND_1;
196 else
197 len_1 = WEBSOCKET_PAYLOAD_EXTEND_2;
198 } else {
199 len_1 = len;
200 }
201
202 op |= (1 << 7);
203 kore_buf_append(frame, &op, sizeof(op));
204
205 len_1 &= ~(1 << 7);
206 kore_buf_append(frame, &len_1, sizeof(len_1));
207
208 if (len_1 > WEBSOCKET_PAYLOAD_SINGLE) {
209 switch (len_1) {
210 case WEBSOCKET_PAYLOAD_EXTEND_1:
211 net_write16((u_int8_t *)&len16, len);
212 kore_buf_append(frame, &len16, sizeof(len16));
213 break;
214 case WEBSOCKET_PAYLOAD_EXTEND_2:
215 net_write64((u_int8_t *)&len64, len);
216 kore_buf_append(frame, &len64, sizeof(len64));
217 break;
218 }
219 }
220
221 if (data != NULL && len > 0)
222 kore_buf_append(frame, data, len);
223 }
224
225 static int
226 websocket_recv_opcode(struct netbuf *nb)
227 {
228 u_int8_t op, len;
229 struct connection *c = nb->owner;
230
231 if (!WEBSOCKET_HAS_MASK(nb->buf[1]))
232 return (KORE_RESULT_ERROR);
233
234 if (WEBSOCKET_RSV(nb->buf[0], 1) || WEBSOCKET_RSV(nb->buf[0], 2) ||
235 WEBSOCKET_RSV(nb->buf[0], 3))
236 return (KORE_RESULT_ERROR);
237
238 len = WEBSOCKET_FRAME_LENGTH(nb->buf[1]);
239
240 op = nb->buf[0] & WEBSOCKET_OPCODE_MASK;
241 switch (op) {
242 case WEBSOCKET_OP_CONT:
243 case WEBSOCKET_OP_TEXT:
244 case WEBSOCKET_OP_BINARY:
245 break;
246 case WEBSOCKET_OP_CLOSE:
247 case WEBSOCKET_OP_PING:
248 case WEBSOCKET_OP_PONG:
249 if (len > WEBSOCKET_PAYLOAD_SINGLE ||
250 !WEBSOCKET_HAS_FINFLAG(nb->buf[0]))
251 return (KORE_RESULT_ERROR);
252 break;
253 default:
254 return (KORE_RESULT_ERROR);
255 }
256
257 switch (len) {
258 case WEBSOCKET_PAYLOAD_EXTEND_1:
259 len += sizeof(u_int16_t);
260 break;
261 case WEBSOCKET_PAYLOAD_EXTEND_2:
262 len += sizeof(u_int64_t);
263 break;
264 }
265
266 len += WEBSOCKET_MASK_LEN;
267 net_recv_expand(c, len, websocket_recv_frame);
268
269 return (KORE_RESULT_OK);
270 }
271
272 static int
273 websocket_recv_frame(struct netbuf *nb)
274 {
275 struct connection *c;
276 int ret;
277 u_int64_t len, i, total;
278 u_int8_t op, moff, extra;
279
280 c = nb->owner;
281 op = nb->buf[0] & WEBSOCKET_OPCODE_MASK;
282 len = WEBSOCKET_FRAME_LENGTH(nb->buf[1]);
283
284 switch (len) {
285 case WEBSOCKET_PAYLOAD_EXTEND_1:
286 moff = 4;
287 extra = sizeof(u_int16_t);
288 len = net_read16(&nb->buf[2]);
289 break;
290 case WEBSOCKET_PAYLOAD_EXTEND_2:
291 moff = 10;
292 extra = sizeof(u_int64_t);
293 len = net_read64(&nb->buf[2]);
294 break;
295 default:
296 extra = 0;
297 moff = 2;
298 break;
299 }
300
301 if (len > kore_websocket_maxframe)
302 return (KORE_RESULT_ERROR);
303
304 extra += WEBSOCKET_FRAME_HDR;
305 total = len + extra + WEBSOCKET_MASK_LEN;
306 if (total > nb->b_len) {
307 total -= nb->b_len;
308 net_recv_expand(c, total, websocket_recv_frame);
309 return (KORE_RESULT_OK);
310 }
311
312 if (total != nb->b_len)
313 return (KORE_RESULT_ERROR);
314
315 for (i = 0; i < len; i++)
316 nb->buf[moff + 4 + i] ^= nb->buf[moff + (i % 4)];
317
318 ret = KORE_RESULT_OK;
319 switch (op) {
320 case WEBSOCKET_OP_PONG:
321 break;
322 case WEBSOCKET_OP_CONT:
323 ret = KORE_RESULT_ERROR;
324 kore_log(LOG_ERR,
325 "%p: we do not support op 0x%02x yet", (void *)c, op);
326 break;
327 case WEBSOCKET_OP_TEXT:
328 case WEBSOCKET_OP_BINARY:
329 if (c->ws_message != NULL) {
330 kore_runtime_wsmessage(c->ws_message,
331 c, op, &nb->buf[moff + 4], len);
332 }
333 break;
334 case WEBSOCKET_OP_CLOSE:
335 c->evt.flags &= ~KORE_EVENT_READ;
336 if (!(c->flags & CONN_WS_CLOSE_SENT)) {
337 c->flags |= CONN_WS_CLOSE_SENT;
338 kore_websocket_send(c, WEBSOCKET_OP_CLOSE, NULL, 0);
339 }
340 kore_connection_disconnect(c);
341 break;
342 case WEBSOCKET_OP_PING:
343 kore_websocket_send(c, WEBSOCKET_OP_PONG,
344 &nb->buf[moff + 4], len);
345 break;
346 default:
347 return (KORE_RESULT_ERROR);
348 }
349
350 net_recv_reset(c, WEBSOCKET_FRAME_HDR, websocket_recv_opcode);
351
352 return (ret);
353 }
354
355 static void
356 websocket_disconnect(struct connection *c)
357 {
358 if (c->ws_disconnect != NULL)
359 kore_runtime_wsdisconnect(c->ws_disconnect, c);
360
361 if (!(c->flags & CONN_WS_CLOSE_SENT)) {
362 c->flags |= CONN_WS_CLOSE_SENT;
363 c->evt.flags &= ~KORE_EVENT_READ;
364 kore_websocket_send(c, WEBSOCKET_OP_CLOSE, NULL, 0);
365 }
366 }