kore

An easy to use, scalable and secure web application framework for writing web APIs in C.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

msg.c (6966B)



      1 /*
      2  * Copyright (c) 2015-2022 Joris Vink <joris@coders.se>
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 #include <sys/types.h>
     18 #include <sys/socket.h>
     19 
     20 #include <signal.h>
     21 
     22 #include "kore.h"
     23 #include "http.h"
     24 
     25 #if defined(KORE_USE_ACME)
     26 #include "acme.h"
     27 #endif
     28 
     29 struct msg_type {
     30 	u_int8_t		id;
     31 	void			(*cb)(struct kore_msg *, const void *);
     32 	TAILQ_ENTRY(msg_type)	list;
     33 };
     34 
     35 static struct msg_type	*msg_type_lookup(u_int8_t);
     36 static int		msg_recv_data(struct netbuf *);
     37 static int		msg_recv_packet(struct netbuf *);
     38 static void		msg_disconnected_worker(struct connection *);
     39 static void		msg_type_shutdown(struct kore_msg *, const void *);
     40 
     41 #if !defined(KORE_NO_HTTP)
     42 static void		msg_type_websocket(struct kore_msg *, const void *);
     43 #endif
     44 
     45 static TAILQ_HEAD(, msg_type)	msg_types;
     46 static size_t			cacheidx = 0;
     47 static struct connection	**conncache = NULL;
     48 
     49 void
     50 kore_msg_init(void)
     51 {
     52 	TAILQ_INIT(&msg_types);
     53 }
     54 
     55 void
     56 kore_msg_parent_init(void)
     57 {
     58 	u_int8_t		idx;
     59 	struct kore_worker	*kw;
     60 
     61 	for (idx = 0; idx < worker_count; idx++) {
     62 		kw = kore_worker_data(idx);
     63 		if (kw->ps != NULL)
     64 			kore_msg_parent_add(kw);
     65 	}
     66 
     67 	kore_msg_register(KORE_MSG_SHUTDOWN, msg_type_shutdown);
     68 }
     69 
     70 void
     71 kore_msg_parent_add(struct kore_worker *kw)
     72 {
     73 	kw->msg[0] = kore_connection_new(NULL);
     74 	kw->msg[0]->fd = kw->pipe[0];
     75 	kw->msg[0]->read = net_read;
     76 	kw->msg[0]->write = net_write;
     77 	kw->msg[0]->proto = CONN_PROTO_MSG;
     78 	kw->msg[0]->state = CONN_STATE_ESTABLISHED;
     79 	kw->msg[0]->hdlr_extra = &kw->id;
     80 	kw->msg[0]->disconnect = msg_disconnected_worker;
     81 	kw->msg[0]->handle = kore_connection_handle;
     82 
     83 	conncache = kore_realloc(conncache,
     84 	    (cacheidx + 1) * sizeof(struct connection *));
     85 
     86 	conncache[cacheidx++] = kw->msg[0];
     87 
     88 	TAILQ_INSERT_TAIL(&connections, kw->msg[0], list);
     89 	kore_platform_event_all(kw->msg[0]->fd, kw->msg[0]);
     90 
     91 	net_recv_queue(kw->msg[0], sizeof(struct kore_msg), 0, msg_recv_packet);
     92 }
     93 
     94 void
     95 kore_msg_parent_remove(struct kore_worker *kw)
     96 {
     97 	kore_connection_disconnect(kw->msg[0]);
     98 	kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT);
     99 	(void)close(kw->pipe[1]);
    100 }
    101 
    102 void
    103 kore_msg_worker_init(void)
    104 {
    105 #if !defined(KORE_NO_HTTP)
    106 	kore_msg_register(KORE_MSG_WEBSOCKET, msg_type_websocket);
    107 #endif
    108 
    109 	worker->msg[1] = kore_connection_new(NULL);
    110 	worker->msg[1]->fd = worker->pipe[1];
    111 	worker->msg[1]->read = net_read;
    112 	worker->msg[1]->write = net_write;
    113 	worker->msg[1]->proto = CONN_PROTO_MSG;
    114 	worker->msg[1]->state = CONN_STATE_ESTABLISHED;
    115 	worker->msg[1]->handle = kore_connection_handle;
    116 	worker->msg[1]->evt.flags = KORE_EVENT_WRITE;
    117 
    118 	TAILQ_INSERT_TAIL(&connections, worker->msg[1], list);
    119 	kore_platform_event_all(worker->msg[1]->fd, worker->msg[1]);
    120 
    121 	net_recv_queue(worker->msg[1],
    122 	    sizeof(struct kore_msg), 0, msg_recv_packet);
    123 }
    124 
    125 void
    126 kore_msg_unregister(u_int8_t id)
    127 {
    128 	struct msg_type		*type;
    129 
    130 	if ((type = msg_type_lookup(id)) == NULL)
    131 		return;
    132 
    133 	TAILQ_REMOVE(&msg_types, type, list);
    134 	kore_free(type);
    135 }
    136 
    137 int
    138 kore_msg_register(u_int8_t id, void (*cb)(struct kore_msg *, const void *))
    139 {
    140 	struct msg_type		*type;
    141 
    142 	if (msg_type_lookup(id) != NULL)
    143 		return (KORE_RESULT_ERROR);
    144 
    145 	type = kore_malloc(sizeof(*type));
    146 	type->id = id;
    147 	type->cb = cb;
    148 	TAILQ_INSERT_TAIL(&msg_types, type, list);
    149 
    150 	return (KORE_RESULT_OK);
    151 }
    152 
    153 void
    154 kore_msg_send(u_int16_t dst, u_int8_t id, const void *data, size_t len)
    155 {
    156 	struct kore_msg		m;
    157 	struct connection	*c;
    158 	struct kore_worker	*kw;
    159 
    160 	m.id = id;
    161 	m.dst = dst;
    162 	m.length = len;
    163 
    164 	if (worker == NULL) {
    165 		m.src = KORE_MSG_PARENT;
    166 
    167 		if ((kw = kore_worker_data_byid(dst)) == NULL) {
    168 			kore_log(LOG_NOTICE, "no such worker by id %u", dst);
    169 			return;
    170 		}
    171 
    172 		c = kw->msg[0];
    173 		m.dst = kw->id;
    174 	} else {
    175 		m.src = worker->id;
    176 		c = worker->msg[1];
    177 	}
    178 
    179 	net_send_queue(c, &m, sizeof(m));
    180 	if (data != NULL && len > 0)
    181 		net_send_queue(c, data, len);
    182 
    183 	net_send_flush(c);
    184 }
    185 
    186 static int
    187 msg_recv_packet(struct netbuf *nb)
    188 {
    189 	struct kore_msg		*msg = (struct kore_msg *)nb->buf;
    190 
    191 	if (msg->length > 0) {
    192 		net_recv_expand(nb->owner, msg->length, msg_recv_data);
    193 		return (KORE_RESULT_OK);
    194 	}
    195 
    196 	return (msg_recv_data(nb));
    197 }
    198 
    199 static int
    200 msg_recv_data(struct netbuf *nb)
    201 {
    202 	size_t			i;
    203 	struct connection	*c;
    204 	struct msg_type		*type;
    205 	int			deliver;
    206 	u_int16_t		dst, destination;
    207 	struct kore_msg		*msg = (struct kore_msg *)nb->buf;
    208 
    209 	if ((type = msg_type_lookup(msg->id)) != NULL) {
    210 		if (worker == NULL && msg->dst != KORE_MSG_PARENT)
    211 			fatal("received parent msg for non parent dst");
    212 		if (worker != NULL && msg->dst != worker->id)
    213 			fatal("received message for incorrect worker");
    214 
    215 		if (msg->length > 0)
    216 			type->cb(msg, nb->buf + sizeof(*msg));
    217 		else
    218 			type->cb(msg, NULL);
    219 	}
    220 
    221 	if (worker == NULL && type == NULL) {
    222 		destination = msg->dst;
    223 
    224 		for (i = 0; i < cacheidx; i++) {
    225 			c = conncache[i];
    226 			if (c->proto != CONN_PROTO_MSG)
    227 				fatal("connection not a msg connection");
    228 
    229 			/*
    230 			 * If hdlr_extra is NULL it just means the worker
    231 			 * never started, ignore it.
    232 			 */
    233 			if (c->hdlr_extra == NULL)
    234 				continue;
    235 
    236 			deliver = 1;
    237 			dst = *(u_int16_t *)c->hdlr_extra;
    238 
    239 			if (destination == KORE_MSG_WORKER_ALL) {
    240 				if (kore_keymgr_active && dst == 0)
    241 					deliver = 0;
    242 			} else {
    243 				if (dst != destination)
    244 					deliver = 0;
    245 			}
    246 
    247 			if (deliver == 0)
    248 				continue;
    249 
    250 			/* This allows the worker to receive the correct id. */
    251 			msg->dst = *(u_int16_t *)c->hdlr_extra;
    252 
    253 			net_send_queue(c, nb->buf, nb->s_off);
    254 			net_send_flush(c);
    255 		}
    256 	}
    257 
    258 	net_recv_reset(nb->owner, sizeof(struct kore_msg), msg_recv_packet);
    259 	return (KORE_RESULT_OK);
    260 }
    261 
    262 static void
    263 msg_disconnected_worker(struct connection *c)
    264 {
    265 	c->hdlr_extra = NULL;
    266 }
    267 
    268 static void
    269 msg_type_shutdown(struct kore_msg *msg, const void *data)
    270 {
    271 	if (!kore_quiet) {
    272 		kore_log(LOG_NOTICE,
    273 		    "shutdown requested by worker %u, going down", msg->src);
    274 	}
    275 
    276 	kore_quit = 1;
    277 }
    278 
    279 #if !defined(KORE_NO_HTTP)
    280 static void
    281 msg_type_websocket(struct kore_msg *msg, const void *data)
    282 {
    283 	struct connection	*c;
    284 
    285 	TAILQ_FOREACH(c, &connections, list) {
    286 		if (c->proto == CONN_PROTO_WEBSOCKET) {
    287 			net_send_queue(c, data, msg->length);
    288 			net_send_flush(c);
    289 		}
    290 	}
    291 }
    292 #endif
    293 
    294 static struct msg_type *
    295 msg_type_lookup(u_int8_t id)
    296 {
    297 	struct msg_type		*type;
    298 
    299 	TAILQ_FOREACH(type, &msg_types, list) {
    300 		if (type->id == id)
    301 			return (type);
    302 	}
    303 
    304 	return (NULL);
    305 }