msg.c (7108B)
1 /*
2 * Copyright (c) 2015-2022 Joris Vink <joris@coders.se>
3 *
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
7 *
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15 */
16
17 #include <sys/types.h>
18 #include <sys/socket.h>
19
20 #include <signal.h>
21
22 #include "kore.h"
23 #include "http.h"
24
25 #if defined(KORE_USE_ACME)
26 #include "acme.h"
27 #endif
28
29 struct msg_type {
30 u_int8_t id;
31 void (*cb)(struct kore_msg *, const void *);
32 TAILQ_ENTRY(msg_type) list;
33 };
34
35 static struct msg_type *msg_type_lookup(u_int8_t);
36 static int msg_recv_data(struct netbuf *);
37 static int msg_recv_packet(struct netbuf *);
38 static void msg_disconnected_worker(struct connection *);
39 static void msg_type_shutdown(struct kore_msg *, const void *);
40
41 #if !defined(KORE_NO_HTTP)
42 static void msg_type_websocket(struct kore_msg *, const void *);
43 #endif
44
45 static TAILQ_HEAD(, msg_type) msg_types;
46 static size_t cacheidx = 0;
47 static struct connection **conncache = NULL;
48
49 void
50 kore_msg_init(void)
51 {
52 TAILQ_INIT(&msg_types);
53 }
54
55 void
56 kore_msg_parent_init(void)
57 {
58 u_int8_t idx;
59 struct kore_worker *kw;
60
61 for (idx = 0; idx < worker_count; idx++) {
62 kw = kore_worker_data(idx);
63 if (kw->ps != NULL)
64 kore_msg_parent_add(kw);
65 }
66
67 kore_msg_register(KORE_MSG_FATALX, msg_type_shutdown);
68 kore_msg_register(KORE_MSG_SHUTDOWN, msg_type_shutdown);
69 }
70
71 void
72 kore_msg_parent_add(struct kore_worker *kw)
73 {
74 kw->msg[0] = kore_connection_new(NULL);
75 kw->msg[0]->fd = kw->pipe[0];
76 kw->msg[0]->read = net_read;
77 kw->msg[0]->write = net_write;
78 kw->msg[0]->proto = CONN_PROTO_MSG;
79 kw->msg[0]->state = CONN_STATE_ESTABLISHED;
80 kw->msg[0]->hdlr_extra = &kw->id;
81 kw->msg[0]->disconnect = msg_disconnected_worker;
82 kw->msg[0]->handle = kore_connection_handle;
83
84 conncache = kore_realloc(conncache,
85 (cacheidx + 1) * sizeof(struct connection *));
86
87 conncache[cacheidx++] = kw->msg[0];
88
89 TAILQ_INSERT_TAIL(&connections, kw->msg[0], list);
90 kore_platform_event_all(kw->msg[0]->fd, kw->msg[0]);
91
92 net_recv_queue(kw->msg[0], sizeof(struct kore_msg), 0, msg_recv_packet);
93 }
94
95 void
96 kore_msg_parent_remove(struct kore_worker *kw)
97 {
98 kore_connection_disconnect(kw->msg[0]);
99 kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT);
100 (void)close(kw->pipe[1]);
101 }
102
103 void
104 kore_msg_worker_init(void)
105 {
106 #if !defined(KORE_NO_HTTP)
107 kore_msg_register(KORE_MSG_WEBSOCKET, msg_type_websocket);
108 #endif
109
110 worker->msg[1] = kore_connection_new(NULL);
111 worker->msg[1]->fd = worker->pipe[1];
112 worker->msg[1]->read = net_read;
113 worker->msg[1]->write = net_write;
114 worker->msg[1]->proto = CONN_PROTO_MSG;
115 worker->msg[1]->state = CONN_STATE_ESTABLISHED;
116 worker->msg[1]->handle = kore_connection_handle;
117 worker->msg[1]->evt.flags = KORE_EVENT_WRITE;
118
119 TAILQ_INSERT_TAIL(&connections, worker->msg[1], list);
120 kore_platform_event_all(worker->msg[1]->fd, worker->msg[1]);
121
122 net_recv_queue(worker->msg[1],
123 sizeof(struct kore_msg), 0, msg_recv_packet);
124 }
125
126 void
127 kore_msg_unregister(u_int8_t id)
128 {
129 struct msg_type *type;
130
131 if ((type = msg_type_lookup(id)) == NULL)
132 return;
133
134 TAILQ_REMOVE(&msg_types, type, list);
135 kore_free(type);
136 }
137
138 int
139 kore_msg_register(u_int8_t id, void (*cb)(struct kore_msg *, const void *))
140 {
141 struct msg_type *type;
142
143 if (msg_type_lookup(id) != NULL)
144 return (KORE_RESULT_ERROR);
145
146 type = kore_malloc(sizeof(*type));
147 type->id = id;
148 type->cb = cb;
149 TAILQ_INSERT_TAIL(&msg_types, type, list);
150
151 return (KORE_RESULT_OK);
152 }
153
154 void
155 kore_msg_send(u_int16_t dst, u_int8_t id, const void *data, size_t len)
156 {
157 struct kore_msg m;
158 struct connection *c;
159 struct kore_worker *kw;
160
161 m.id = id;
162 m.dst = dst;
163 m.length = len;
164
165 if (worker == NULL) {
166 m.src = KORE_MSG_PARENT;
167
168 if ((kw = kore_worker_data_byid(dst)) == NULL) {
169 kore_log(LOG_NOTICE, "no such worker by id %u", dst);
170 return;
171 }
172
173 c = kw->msg[0];
174 m.dst = kw->id;
175 } else {
176 m.src = worker->id;
177 c = worker->msg[1];
178 }
179
180 net_send_queue(c, &m, sizeof(m));
181 if (data != NULL && len > 0)
182 net_send_queue(c, data, len);
183
184 net_send_flush(c);
185 }
186
187 static int
188 msg_recv_packet(struct netbuf *nb)
189 {
190 struct kore_msg *msg = (struct kore_msg *)nb->buf;
191
192 if (msg->length > 0) {
193 net_recv_expand(nb->owner, msg->length, msg_recv_data);
194 return (KORE_RESULT_OK);
195 }
196
197 return (msg_recv_data(nb));
198 }
199
200 static int
201 msg_recv_data(struct netbuf *nb)
202 {
203 size_t i;
204 struct connection *c;
205 struct msg_type *type;
206 int deliver;
207 u_int16_t dst, destination;
208 struct kore_msg *msg = (struct kore_msg *)nb->buf;
209
210 if ((type = msg_type_lookup(msg->id)) != NULL) {
211 if (worker == NULL && msg->dst != KORE_MSG_PARENT)
212 fatal("received parent msg for non parent dst");
213 if (worker != NULL && msg->dst != worker->id)
214 fatal("received message for incorrect worker");
215
216 if (msg->length > 0)
217 type->cb(msg, nb->buf + sizeof(*msg));
218 else
219 type->cb(msg, NULL);
220 }
221
222 if (worker == NULL && type == NULL) {
223 destination = msg->dst;
224
225 for (i = 0; i < cacheidx; i++) {
226 c = conncache[i];
227 if (c->proto != CONN_PROTO_MSG)
228 fatal("connection not a msg connection");
229
230 /*
231 * If hdlr_extra is NULL it just means the worker
232 * never started, ignore it.
233 */
234 if (c->hdlr_extra == NULL)
235 continue;
236
237 deliver = 1;
238 dst = *(u_int16_t *)c->hdlr_extra;
239
240 if (destination == KORE_MSG_WORKER_ALL) {
241 if (kore_keymgr_active && dst == 0)
242 deliver = 0;
243 } else {
244 if (dst != destination)
245 deliver = 0;
246 }
247
248 if (deliver == 0)
249 continue;
250
251 /* This allows the worker to receive the correct id. */
252 msg->dst = *(u_int16_t *)c->hdlr_extra;
253
254 net_send_queue(c, nb->buf, nb->s_off);
255 net_send_flush(c);
256 }
257 }
258
259 net_recv_reset(nb->owner, sizeof(struct kore_msg), msg_recv_packet);
260 return (KORE_RESULT_OK);
261 }
262
263 static void
264 msg_disconnected_worker(struct connection *c)
265 {
266 c->hdlr_extra = NULL;
267 }
268
269 static void
270 msg_type_shutdown(struct kore_msg *msg, const void *data)
271 {
272 if (!kore_quiet) {
273 kore_log(LOG_NOTICE,
274 "shutdown requested by worker %u, going down", msg->src);
275 }
276
277 if (msg->id == KORE_MSG_FATALX)
278 kore_quit = KORE_QUIT_FATAL;
279 else
280 kore_quit = KORE_QUIT_NORMAL;
281 }
282
283 #if !defined(KORE_NO_HTTP)
284 static void
285 msg_type_websocket(struct kore_msg *msg, const void *data)
286 {
287 struct connection *c;
288
289 TAILQ_FOREACH(c, &connections, list) {
290 if (c->proto == CONN_PROTO_WEBSOCKET) {
291 net_send_queue(c, data, msg->length);
292 net_send_flush(c);
293 }
294 }
295 }
296 #endif
297
298 static struct msg_type *
299 msg_type_lookup(u_int8_t id)
300 {
301 struct msg_type *type;
302
303 TAILQ_FOREACH(type, &msg_types, list) {
304 if (type->id == id)
305 return (type);
306 }
307
308 return (NULL);
309 }