kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

msg.c (7108B)



      1 /*
      2  * Copyright (c) 2015-2022 Joris Vink <joris@coders.se>
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 #include <sys/types.h>
     18 #include <sys/socket.h>
     19 
     20 #include <signal.h>
     21 
     22 #include "kore.h"
     23 #include "http.h"
     24 
     25 #if defined(KORE_USE_ACME)
     26 #include "acme.h"
     27 #endif
     28 
     29 struct msg_type {	/* One registered message handler, linked into msg_types. */
     30 	u_int8_t		id;	/* Message id this handler is registered for. */
     31 	void			(*cb)(struct kore_msg *, const void *);	/* Called with the header and the payload (NULL if length is 0). */
     32 	TAILQ_ENTRY(msg_type)	list;	/* Linkage for the msg_types queue. */
     33 };
     34 
     35 static struct msg_type	*msg_type_lookup(u_int8_t);	/* Find a registered handler by message id. */
     36 static int		msg_recv_data(struct netbuf *);	/* Dispatch or forward a fully received message. */
     37 static int		msg_recv_packet(struct netbuf *);	/* Handle a freshly received message header. */
     38 static void		msg_disconnected_worker(struct connection *);	/* Disconnect hook for parent-side channels. */
     39 static void		msg_type_shutdown(struct kore_msg *, const void *);	/* Handler for KORE_MSG_FATALX and KORE_MSG_SHUTDOWN. */
     40 
     41 #if !defined(KORE_NO_HTTP)
     42 static void		msg_type_websocket(struct kore_msg *, const void *);	/* Handler for KORE_MSG_WEBSOCKET. */
     43 #endif
     44 
     45 static TAILQ_HEAD(, msg_type)	msg_types;	/* All registered message handlers. */
     46 static size_t			cacheidx = 0;	/* Number of entries stored in conncache. */
     47 static struct connection	**conncache = NULL;	/* Parent-side msg connections, appended by kore_msg_parent_add(). */
     48 
     49 void
     50 kore_msg_init(void)
     51 {
     52 	TAILQ_INIT(&msg_types);
     53 }
     54 
     55 void
     56 kore_msg_parent_init(void)
     57 {
     58 	u_int8_t		idx;
     59 	struct kore_worker	*kw;
     60 
     61 	for (idx = 0; idx < worker_count; idx++) {
     62 		kw = kore_worker_data(idx);
     63 		if (kw->ps != NULL)
     64 			kore_msg_parent_add(kw);
     65 	}
     66 
     67 	kore_msg_register(KORE_MSG_FATALX, msg_type_shutdown);
     68 	kore_msg_register(KORE_MSG_SHUTDOWN, msg_type_shutdown);
     69 }
     70 
     71 void
     72 kore_msg_parent_add(struct kore_worker *kw)
     73 {
     74 	kw->msg[0] = kore_connection_new(NULL);
     75 	kw->msg[0]->fd = kw->pipe[0];
     76 	kw->msg[0]->read = net_read;
     77 	kw->msg[0]->write = net_write;
     78 	kw->msg[0]->proto = CONN_PROTO_MSG;
     79 	kw->msg[0]->state = CONN_STATE_ESTABLISHED;
     80 	kw->msg[0]->hdlr_extra = &kw->id;
     81 	kw->msg[0]->disconnect = msg_disconnected_worker;
     82 	kw->msg[0]->handle = kore_connection_handle;
     83 
     84 	conncache = kore_realloc(conncache,
     85 	    (cacheidx + 1) * sizeof(struct connection *));
     86 
     87 	conncache[cacheidx++] = kw->msg[0];
     88 
     89 	TAILQ_INSERT_TAIL(&connections, kw->msg[0], list);
     90 	kore_platform_event_all(kw->msg[0]->fd, kw->msg[0]);
     91 
     92 	net_recv_queue(kw->msg[0], sizeof(struct kore_msg), 0, msg_recv_packet);
     93 }
     94 
     95 void
     96 kore_msg_parent_remove(struct kore_worker *kw)
     97 {
     98 	kore_connection_disconnect(kw->msg[0]);
     99 	kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT);
    100 	(void)close(kw->pipe[1]);
    101 }
    102 
    103 void
    104 kore_msg_worker_init(void)
    105 {
    106 #if !defined(KORE_NO_HTTP)
    107 	kore_msg_register(KORE_MSG_WEBSOCKET, msg_type_websocket);
    108 #endif
    109 
    110 	worker->msg[1] = kore_connection_new(NULL);
    111 	worker->msg[1]->fd = worker->pipe[1];
    112 	worker->msg[1]->read = net_read;
    113 	worker->msg[1]->write = net_write;
    114 	worker->msg[1]->proto = CONN_PROTO_MSG;
    115 	worker->msg[1]->state = CONN_STATE_ESTABLISHED;
    116 	worker->msg[1]->handle = kore_connection_handle;
    117 	worker->msg[1]->evt.flags = KORE_EVENT_WRITE;
    118 
    119 	TAILQ_INSERT_TAIL(&connections, worker->msg[1], list);
    120 	kore_platform_event_all(worker->msg[1]->fd, worker->msg[1]);
    121 
    122 	net_recv_queue(worker->msg[1],
    123 	    sizeof(struct kore_msg), 0, msg_recv_packet);
    124 }
    125 
    126 void
    127 kore_msg_unregister(u_int8_t id)
    128 {
    129 	struct msg_type		*type;
    130 
    131 	if ((type = msg_type_lookup(id)) == NULL)
    132 		return;
    133 
    134 	TAILQ_REMOVE(&msg_types, type, list);
    135 	kore_free(type);
    136 }
    137 
    138 int
    139 kore_msg_register(u_int8_t id, void (*cb)(struct kore_msg *, const void *))
    140 {
    141 	struct msg_type		*type;
    142 
    143 	if (msg_type_lookup(id) != NULL)
    144 		return (KORE_RESULT_ERROR);
    145 
    146 	type = kore_malloc(sizeof(*type));
    147 	type->id = id;
    148 	type->cb = cb;
    149 	TAILQ_INSERT_TAIL(&msg_types, type, list);
    150 
    151 	return (KORE_RESULT_OK);
    152 }
    153 
    154 void
    155 kore_msg_send(u_int16_t dst, u_int8_t id, const void *data, size_t len)
    156 {
    157 	struct kore_msg		m;
    158 	struct connection	*c;
    159 	struct kore_worker	*kw;
    160 
    161 	m.id = id;
    162 	m.dst = dst;
    163 	m.length = len;
    164 
    165 	if (worker == NULL) {
    166 		m.src = KORE_MSG_PARENT;
    167 
    168 		if ((kw = kore_worker_data_byid(dst)) == NULL) {
    169 			kore_log(LOG_NOTICE, "no such worker by id %u", dst);
    170 			return;
    171 		}
    172 
    173 		c = kw->msg[0];
    174 		m.dst = kw->id;
    175 	} else {
    176 		m.src = worker->id;
    177 		c = worker->msg[1];
    178 	}
    179 
    180 	net_send_queue(c, &m, sizeof(m));
    181 	if (data != NULL && len > 0)
    182 		net_send_queue(c, data, len);
    183 
    184 	net_send_flush(c);
    185 }
    186 
    187 static int
    188 msg_recv_packet(struct netbuf *nb)
    189 {
    190 	struct kore_msg		*msg = (struct kore_msg *)nb->buf;
    191 
    192 	if (msg->length > 0) {
    193 		net_recv_expand(nb->owner, msg->length, msg_recv_data);
    194 		return (KORE_RESULT_OK);
    195 	}
    196 
    197 	return (msg_recv_data(nb));
    198 }
    199 
    200 static int
    201 msg_recv_data(struct netbuf *nb)
    202 {
    203 	size_t			i;
    204 	struct connection	*c;
    205 	struct msg_type		*type;
    206 	int			deliver;
    207 	u_int16_t		dst, destination;
    208 	struct kore_msg		*msg = (struct kore_msg *)nb->buf;
    209 
    210 	if ((type = msg_type_lookup(msg->id)) != NULL) {
    211 		if (worker == NULL && msg->dst != KORE_MSG_PARENT)
    212 			fatal("received parent msg for non parent dst");
    213 		if (worker != NULL && msg->dst != worker->id)
    214 			fatal("received message for incorrect worker");
    215 
    216 		if (msg->length > 0)
    217 			type->cb(msg, nb->buf + sizeof(*msg));
    218 		else
    219 			type->cb(msg, NULL);
    220 	}
    221 
    222 	if (worker == NULL && type == NULL) {
    223 		destination = msg->dst;
    224 
    225 		for (i = 0; i < cacheidx; i++) {
    226 			c = conncache[i];
    227 			if (c->proto != CONN_PROTO_MSG)
    228 				fatal("connection not a msg connection");
    229 
    230 			/*
    231 			 * If hdlr_extra is NULL it just means the worker
    232 			 * never started, ignore it.
    233 			 */
    234 			if (c->hdlr_extra == NULL)
    235 				continue;
    236 
    237 			deliver = 1;
    238 			dst = *(u_int16_t *)c->hdlr_extra;
    239 
    240 			if (destination == KORE_MSG_WORKER_ALL) {
    241 				if (kore_keymgr_active && dst == 0)
    242 					deliver = 0;
    243 			} else {
    244 				if (dst != destination)
    245 					deliver = 0;
    246 			}
    247 
    248 			if (deliver == 0)
    249 				continue;
    250 
    251 			/* This allows the worker to receive the correct id. */
    252 			msg->dst = *(u_int16_t *)c->hdlr_extra;
    253 
    254 			net_send_queue(c, nb->buf, nb->s_off);
    255 			net_send_flush(c);
    256 		}
    257 	}
    258 
    259 	net_recv_reset(nb->owner, sizeof(struct kore_msg), msg_recv_packet);
    260 	return (KORE_RESULT_OK);
    261 }
    262 
    263 static void
    264 msg_disconnected_worker(struct connection *c)
    265 {
    266 	c->hdlr_extra = NULL;
    267 }
    268 
    269 static void
    270 msg_type_shutdown(struct kore_msg *msg, const void *data)
    271 {
    272 	if (!kore_quiet) {
    273 		kore_log(LOG_NOTICE,
    274 		    "shutdown requested by worker %u, going down", msg->src);
    275 	}
    276 
    277 	if (msg->id == KORE_MSG_FATALX)
    278 		kore_quit = KORE_QUIT_FATAL;
    279 	else
    280 		kore_quit = KORE_QUIT_NORMAL;
    281 }
    282 
    283 #if !defined(KORE_NO_HTTP)
    284 static void
    285 msg_type_websocket(struct kore_msg *msg, const void *data)
    286 {
    287 	struct connection	*c;
    288 
    289 	TAILQ_FOREACH(c, &connections, list) {
    290 		if (c->proto == CONN_PROTO_WEBSOCKET) {
    291 			net_send_queue(c, data, msg->length);
    292 			net_send_flush(c);
    293 		}
    294 	}
    295 }
    296 #endif
    297 
    298 static struct msg_type *
    299 msg_type_lookup(u_int8_t id)
    300 {
    301 	struct msg_type		*type;
    302 
    303 	TAILQ_FOREACH(type, &msg_types, list) {
    304 		if (type->id == id)
    305 			return (type);
    306 	}
    307 
    308 	return (NULL);
    309 }