msg.c (6966B)
1 /*
2 * Copyright (c) 2015-2022 Joris Vink <joris@coders.se>
3 *
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
7 *
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15 */
16
17 #include <sys/types.h>
18 #include <sys/socket.h>
19
20 #include <signal.h>
21
22 #include "kore.h"
23 #include "http.h"
24
25 #if defined(KORE_USE_ACME)
26 #include "acme.h"
27 #endif
28
/*
 * One registered message handler: ties a message id to the callback that
 * is invoked when a message carrying that id is received.
 */
struct msg_type {
	/* Message id this entry answers to. */
	u_int8_t		id;

	/* Handler, called with the message header and its payload (or NULL). */
	void			(*cb)(struct kore_msg *, const void *);

	/* Linkage on the msg_types list. */
	TAILQ_ENTRY(msg_type)	list;
};
34
/* Receive path: header first, then the payload / dispatch step. */
static int		msg_recv_packet(struct netbuf *);
static int		msg_recv_data(struct netbuf *);

/* Registry lookup and connection teardown hook. */
static struct msg_type	*msg_type_lookup(u_int8_t);
static void		msg_disconnected_worker(struct connection *);

/* Built-in message handlers. */
static void		msg_type_shutdown(struct kore_msg *, const void *);

#if !defined(KORE_NO_HTTP)
static void		msg_type_websocket(struct kore_msg *, const void *);
#endif
44
45 static TAILQ_HEAD(, msg_type) msg_types;
46 static size_t cacheidx = 0;
47 static struct connection **conncache = NULL;
48
49 void
50 kore_msg_init(void)
51 {
52 TAILQ_INIT(&msg_types);
53 }
54
55 void
56 kore_msg_parent_init(void)
57 {
58 u_int8_t idx;
59 struct kore_worker *kw;
60
61 for (idx = 0; idx < worker_count; idx++) {
62 kw = kore_worker_data(idx);
63 if (kw->ps != NULL)
64 kore_msg_parent_add(kw);
65 }
66
67 kore_msg_register(KORE_MSG_SHUTDOWN, msg_type_shutdown);
68 }
69
70 void
71 kore_msg_parent_add(struct kore_worker *kw)
72 {
73 kw->msg[0] = kore_connection_new(NULL);
74 kw->msg[0]->fd = kw->pipe[0];
75 kw->msg[0]->read = net_read;
76 kw->msg[0]->write = net_write;
77 kw->msg[0]->proto = CONN_PROTO_MSG;
78 kw->msg[0]->state = CONN_STATE_ESTABLISHED;
79 kw->msg[0]->hdlr_extra = &kw->id;
80 kw->msg[0]->disconnect = msg_disconnected_worker;
81 kw->msg[0]->handle = kore_connection_handle;
82
83 conncache = kore_realloc(conncache,
84 (cacheidx + 1) * sizeof(struct connection *));
85
86 conncache[cacheidx++] = kw->msg[0];
87
88 TAILQ_INSERT_TAIL(&connections, kw->msg[0], list);
89 kore_platform_event_all(kw->msg[0]->fd, kw->msg[0]);
90
91 net_recv_queue(kw->msg[0], sizeof(struct kore_msg), 0, msg_recv_packet);
92 }
93
94 void
95 kore_msg_parent_remove(struct kore_worker *kw)
96 {
97 kore_connection_disconnect(kw->msg[0]);
98 kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT);
99 (void)close(kw->pipe[1]);
100 }
101
102 void
103 kore_msg_worker_init(void)
104 {
105 #if !defined(KORE_NO_HTTP)
106 kore_msg_register(KORE_MSG_WEBSOCKET, msg_type_websocket);
107 #endif
108
109 worker->msg[1] = kore_connection_new(NULL);
110 worker->msg[1]->fd = worker->pipe[1];
111 worker->msg[1]->read = net_read;
112 worker->msg[1]->write = net_write;
113 worker->msg[1]->proto = CONN_PROTO_MSG;
114 worker->msg[1]->state = CONN_STATE_ESTABLISHED;
115 worker->msg[1]->handle = kore_connection_handle;
116 worker->msg[1]->evt.flags = KORE_EVENT_WRITE;
117
118 TAILQ_INSERT_TAIL(&connections, worker->msg[1], list);
119 kore_platform_event_all(worker->msg[1]->fd, worker->msg[1]);
120
121 net_recv_queue(worker->msg[1],
122 sizeof(struct kore_msg), 0, msg_recv_packet);
123 }
124
125 void
126 kore_msg_unregister(u_int8_t id)
127 {
128 struct msg_type *type;
129
130 if ((type = msg_type_lookup(id)) == NULL)
131 return;
132
133 TAILQ_REMOVE(&msg_types, type, list);
134 kore_free(type);
135 }
136
137 int
138 kore_msg_register(u_int8_t id, void (*cb)(struct kore_msg *, const void *))
139 {
140 struct msg_type *type;
141
142 if (msg_type_lookup(id) != NULL)
143 return (KORE_RESULT_ERROR);
144
145 type = kore_malloc(sizeof(*type));
146 type->id = id;
147 type->cb = cb;
148 TAILQ_INSERT_TAIL(&msg_types, type, list);
149
150 return (KORE_RESULT_OK);
151 }
152
153 void
154 kore_msg_send(u_int16_t dst, u_int8_t id, const void *data, size_t len)
155 {
156 struct kore_msg m;
157 struct connection *c;
158 struct kore_worker *kw;
159
160 m.id = id;
161 m.dst = dst;
162 m.length = len;
163
164 if (worker == NULL) {
165 m.src = KORE_MSG_PARENT;
166
167 if ((kw = kore_worker_data_byid(dst)) == NULL) {
168 kore_log(LOG_NOTICE, "no such worker by id %u", dst);
169 return;
170 }
171
172 c = kw->msg[0];
173 m.dst = kw->id;
174 } else {
175 m.src = worker->id;
176 c = worker->msg[1];
177 }
178
179 net_send_queue(c, &m, sizeof(m));
180 if (data != NULL && len > 0)
181 net_send_queue(c, data, len);
182
183 net_send_flush(c);
184 }
185
186 static int
187 msg_recv_packet(struct netbuf *nb)
188 {
189 struct kore_msg *msg = (struct kore_msg *)nb->buf;
190
191 if (msg->length > 0) {
192 net_recv_expand(nb->owner, msg->length, msg_recv_data);
193 return (KORE_RESULT_OK);
194 }
195
196 return (msg_recv_data(nb));
197 }
198
199 static int
200 msg_recv_data(struct netbuf *nb)
201 {
202 size_t i;
203 struct connection *c;
204 struct msg_type *type;
205 int deliver;
206 u_int16_t dst, destination;
207 struct kore_msg *msg = (struct kore_msg *)nb->buf;
208
209 if ((type = msg_type_lookup(msg->id)) != NULL) {
210 if (worker == NULL && msg->dst != KORE_MSG_PARENT)
211 fatal("received parent msg for non parent dst");
212 if (worker != NULL && msg->dst != worker->id)
213 fatal("received message for incorrect worker");
214
215 if (msg->length > 0)
216 type->cb(msg, nb->buf + sizeof(*msg));
217 else
218 type->cb(msg, NULL);
219 }
220
221 if (worker == NULL && type == NULL) {
222 destination = msg->dst;
223
224 for (i = 0; i < cacheidx; i++) {
225 c = conncache[i];
226 if (c->proto != CONN_PROTO_MSG)
227 fatal("connection not a msg connection");
228
229 /*
230 * If hdlr_extra is NULL it just means the worker
231 * never started, ignore it.
232 */
233 if (c->hdlr_extra == NULL)
234 continue;
235
236 deliver = 1;
237 dst = *(u_int16_t *)c->hdlr_extra;
238
239 if (destination == KORE_MSG_WORKER_ALL) {
240 if (kore_keymgr_active && dst == 0)
241 deliver = 0;
242 } else {
243 if (dst != destination)
244 deliver = 0;
245 }
246
247 if (deliver == 0)
248 continue;
249
250 /* This allows the worker to receive the correct id. */
251 msg->dst = *(u_int16_t *)c->hdlr_extra;
252
253 net_send_queue(c, nb->buf, nb->s_off);
254 net_send_flush(c);
255 }
256 }
257
258 net_recv_reset(nb->owner, sizeof(struct kore_msg), msg_recv_packet);
259 return (KORE_RESULT_OK);
260 }
261
262 static void
263 msg_disconnected_worker(struct connection *c)
264 {
265 c->hdlr_extra = NULL;
266 }
267
268 static void
269 msg_type_shutdown(struct kore_msg *msg, const void *data)
270 {
271 if (!kore_quiet) {
272 kore_log(LOG_NOTICE,
273 "shutdown requested by worker %u, going down", msg->src);
274 }
275
276 kore_quit = 1;
277 }
278
279 #if !defined(KORE_NO_HTTP)
280 static void
281 msg_type_websocket(struct kore_msg *msg, const void *data)
282 {
283 struct connection *c;
284
285 TAILQ_FOREACH(c, &connections, list) {
286 if (c->proto == CONN_PROTO_WEBSOCKET) {
287 net_send_queue(c, data, msg->length);
288 net_send_flush(c);
289 }
290 }
291 }
292 #endif
293
294 static struct msg_type *
295 msg_type_lookup(u_int8_t id)
296 {
297 struct msg_type *type;
298
299 TAILQ_FOREACH(type, &msg_types, list) {
300 if (type->id == id)
301 return (type);
302 }
303
304 return (NULL);
305 }