kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

connection.c (9694B)



      1 /*
      2  * Copyright (c) 2013-2022 Joris Vink <joris@coders.se>
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 #include <sys/param.h>
     18 #include <sys/socket.h>
     19 
     20 #include <netinet/tcp.h>
     21 
     22 #include <inttypes.h>
     23 #include <fcntl.h>
     24 
     25 #include "kore.h"
     26 #include "http.h"
     27 
     28 struct kore_pool		connection_pool;
     29 struct connection_list		connections;
     30 struct connection_list		disconnected;
     31 
     32 void
     33 kore_connection_init(void)
     34 {
     35 	u_int32_t	elm;
     36 
     37 	TAILQ_INIT(&connections);
     38 	TAILQ_INIT(&disconnected);
     39 
     40 	/* Add some overhead so we don't rollover for internal items. */
     41 	elm = worker_max_connections + 10;
     42 
     43 	kore_pool_init(&connection_pool, "connection_pool",
     44 	    sizeof(struct connection), elm);
     45 }
     46 
     47 void
     48 kore_connection_cleanup(void)
     49 {
     50 	/* Drop all connections */
     51 	kore_connection_prune(KORE_CONNECTION_PRUNE_ALL);
     52 	kore_pool_cleanup(&connection_pool);
     53 }
     54 
     55 struct connection *
     56 kore_connection_new(void *owner)
     57 {
     58 	struct connection	*c;
     59 
     60 	c = kore_pool_get(&connection_pool);
     61 
     62 	c->flags = 0;
     63 	c->rnb = NULL;
     64 	c->snb = NULL;
     65 	c->owner = owner;
     66 	c->handle = NULL;
     67 
     68 	c->tls = NULL;
     69 	c->tls_cert = NULL;
     70 	c->tls_reneg = 0;
     71 	c->tls_sni = NULL;
     72 
     73 	c->disconnect = NULL;
     74 	c->hdlr_extra = NULL;
     75 	c->proto = CONN_PROTO_UNKNOWN;
     76 	c->idle_timer.start = 0;
     77 	c->idle_timer.length = KORE_IDLE_TIMER_MAX;
     78 
     79 	c->evt.type = KORE_TYPE_CONNECTION;
     80 	c->evt.handle = kore_connection_event;
     81 
     82 #if !defined(KORE_NO_HTTP)
     83 	c->ws_connect = NULL;
     84 	c->ws_message = NULL;
     85 	c->ws_disconnect = NULL;
     86 	c->http_start = kore_time_ms();
     87 	c->http_timeout = http_header_timeout * 1000;
     88 	TAILQ_INIT(&(c->http_requests));
     89 #endif
     90 
     91 	TAILQ_INIT(&(c->send_queue));
     92 
     93 	return (c);
     94 }
     95 
     96 int
     97 kore_connection_accept(struct listener *listener, struct connection **out)
     98 {
     99 	struct connection	*c;
    100 	struct sockaddr		*s;
    101 	socklen_t		len;
    102 
    103 	*out = NULL;
    104 	c = kore_connection_new(listener);
    105 
    106 	c->family = listener->family;
    107 
    108 	switch (c->family) {
    109 	case AF_INET:
    110 		len = sizeof(struct sockaddr_in);
    111 		s = (struct sockaddr *)&(c->addr.ipv4);
    112 		break;
    113 	case AF_INET6:
    114 		len = sizeof(struct sockaddr_in6);
    115 		s = (struct sockaddr *)&(c->addr.ipv6);
    116 		break;
    117 	case AF_UNIX:
    118 		len = sizeof(struct sockaddr_un);
    119 		s = (struct sockaddr *)&(c->addr.sun);
    120 		break;
    121 	default:
    122 		fatal("unknown family type %d", c->family);
    123 	}
    124 
    125 	if ((c->fd = accept(listener->fd, s, &len)) == -1) {
    126 		kore_pool_put(&connection_pool, c);
    127 		return (KORE_RESULT_ERROR);
    128 	}
    129 
    130 	if (!kore_connection_nonblock(c->fd, listener->family != AF_UNIX)) {
    131 		close(c->fd);
    132 		kore_pool_put(&connection_pool, c);
    133 		return (KORE_RESULT_ERROR);
    134 	}
    135 
    136 	if (fcntl(c->fd, F_SETFD, FD_CLOEXEC) == -1) {
    137 		close(c->fd);
    138 		kore_pool_put(&connection_pool, c);
    139 		return (KORE_RESULT_ERROR);
    140 	}
    141 
    142 	c->handle = kore_connection_handle;
    143 	TAILQ_INSERT_TAIL(&connections, c, list);
    144 
    145 	if (listener->server->tls) {
    146 		c->state = CONN_STATE_TLS_SHAKE;
    147 		c->write = kore_tls_write;
    148 		c->read = kore_tls_read;
    149 	} else {
    150 		c->state = CONN_STATE_ESTABLISHED;
    151 		c->write = net_write;
    152 		c->read = net_read;
    153 
    154 		if (listener->connect != NULL) {
    155 			kore_runtime_connect(listener->connect, c);
    156 		} else {
    157 #if !defined(KORE_NO_HTTP)
    158 			c->proto = CONN_PROTO_HTTP;
    159 			if (http_keepalive_time != 0) {
    160 				c->idle_timer.length =
    161 				    http_keepalive_time * 1000;
    162 			}
    163 			net_recv_queue(c, http_header_max,
    164 			    NETBUF_CALL_CB_ALWAYS, http_header_recv);
    165 #endif
    166 		}
    167 	}
    168 
    169 	kore_connection_start_idletimer(c);
    170 	worker_active_connections++;
    171 
    172 	*out = c;
    173 	return (KORE_RESULT_OK);
    174 }
    175 
    176 void
    177 kore_connection_check_timeout(u_int64_t now)
    178 {
    179 	struct connection	*c, *next;
    180 
    181 	for (c = TAILQ_FIRST(&connections); c != NULL; c = next) {
    182 		next = TAILQ_NEXT(c, list);
    183 		if (c->proto == CONN_PROTO_MSG)
    184 			continue;
    185 #if !defined(KORE_NO_HTTP)
    186 		if (c->state == CONN_STATE_ESTABLISHED &&
    187 		    c->proto == CONN_PROTO_HTTP) {
    188 			if (!TAILQ_EMPTY(&c->send_queue))
    189 				continue;
    190 			if (!http_check_timeout(c, now))
    191 				continue;
    192 			if (!TAILQ_EMPTY(&c->http_requests))
    193 				continue;
    194 		}
    195 #endif
    196 		if (c->flags & CONN_IDLE_TIMER_ACT)
    197 			kore_connection_check_idletimer(now, c);
    198 	}
    199 }
    200 
    201 void
    202 kore_connection_prune(int all)
    203 {
    204 	struct connection	*c, *cnext;
    205 
    206 	if (all) {
    207 		for (c = TAILQ_FIRST(&connections); c != NULL; c = cnext) {
    208 			cnext = TAILQ_NEXT(c, list);
    209 			net_send_flush(c);
    210 			kore_connection_disconnect(c);
    211 		}
    212 	}
    213 
    214 	for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) {
    215 		cnext = TAILQ_NEXT(c, list);
    216 		TAILQ_REMOVE(&disconnected, c, list);
    217 		kore_connection_remove(c);
    218 	}
    219 }
    220 
    221 void
    222 kore_connection_disconnect(struct connection *c)
    223 {
    224 	if (c->state != CONN_STATE_DISCONNECTING) {
    225 		c->state = CONN_STATE_DISCONNECTING;
    226 		if (c->disconnect)
    227 			c->disconnect(c);
    228 
    229 		TAILQ_REMOVE(&connections, c, list);
    230 		TAILQ_INSERT_TAIL(&disconnected, c, list);
    231 	}
    232 }
    233 
    234 void
    235 kore_connection_event(void *arg, int error)
    236 {
    237 	struct connection	*c = arg;
    238 
    239 	if (error) {
    240 		kore_connection_disconnect(c);
    241 		return;
    242 	}
    243 
    244 	if (!c->handle(c))
    245 		kore_connection_disconnect(c);
    246 }
    247 
    248 int
    249 kore_connection_handle(struct connection *c)
    250 {
    251 	struct listener		*listener;
    252 
    253 	kore_connection_stop_idletimer(c);
    254 
    255 	switch (c->state) {
    256 	case CONN_STATE_TLS_SHAKE:
    257 		switch (kore_tls_connection_accept(c)) {
    258 		case KORE_RESULT_OK:
    259 			break;
    260 		case KORE_RESULT_RETRY:
    261 			return (KORE_RESULT_OK);
    262 		default:
    263 			return (KORE_RESULT_ERROR);
    264 		}
    265 
    266 		if (c->owner != NULL) {
    267 			listener = (struct listener *)c->owner;
    268 			if (listener->connect != NULL) {
    269 				kore_runtime_connect(listener->connect, c);
    270 				kore_connection_start_idletimer(c);
    271 				return (KORE_RESULT_OK);
    272 			}
    273 		}
    274 
    275 #if !defined(KORE_NO_HTTP)
    276 		c->proto = CONN_PROTO_HTTP;
    277 		if (http_keepalive_time != 0) {
    278 			c->idle_timer.length =
    279 			    http_keepalive_time * 1000;
    280 		}
    281 
    282 		net_recv_queue(c, http_header_max,
    283 		    NETBUF_CALL_CB_ALWAYS, http_header_recv);
    284 #endif
    285 
    286 		c->state = CONN_STATE_ESTABLISHED;
    287 		/* FALLTHROUGH */
    288 	case CONN_STATE_ESTABLISHED:
    289 		if (c->evt.flags & KORE_EVENT_READ) {
    290 			if (!net_recv_flush(c))
    291 				return (KORE_RESULT_ERROR);
    292 		}
    293 
    294 		if (c->evt.flags & KORE_EVENT_WRITE) {
    295 			if (!net_send_flush(c))
    296 				return (KORE_RESULT_ERROR);
    297 		}
    298 		break;
    299 	case CONN_STATE_DISCONNECTING:
    300 		break;
    301 	default:
    302 		break;
    303 	}
    304 
    305 	kore_connection_start_idletimer(c);
    306 
    307 	return (KORE_RESULT_OK);
    308 }
    309 
    310 void
    311 kore_connection_remove(struct connection *c)
    312 {
    313 	struct netbuf		*nb, *next;
    314 #if !defined(KORE_NO_HTTP)
    315 	struct http_request	*req, *rnext;
    316 #endif
    317 
    318 	kore_tls_connection_cleanup(c);
    319 
    320 	close(c->fd);
    321 
    322 	if (c->hdlr_extra != NULL)
    323 		kore_free(c->hdlr_extra);
    324 
    325 #if !defined(KORE_NO_HTTP)
    326 	for (req = TAILQ_FIRST(&(c->http_requests));
    327 	    req != NULL; req = rnext) {
    328 		rnext = TAILQ_NEXT(req, olist);
    329 		TAILQ_REMOVE(&(c->http_requests), req, olist);
    330 		req->owner = NULL;
    331 		req->flags |= HTTP_REQUEST_DELETE;
    332 		http_request_wakeup(req);
    333 	}
    334 
    335 	kore_free(c->ws_connect);
    336 	kore_free(c->ws_message);
    337 	kore_free(c->ws_disconnect);
    338 #endif
    339 
    340 	for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; nb = next) {
    341 		next = TAILQ_NEXT(nb, list);
    342 		nb->flags &= ~NETBUF_MUST_RESEND;
    343 		net_remove_netbuf(c, nb);
    344 	}
    345 
    346 	if (c->rnb != NULL) {
    347 		kore_free(c->rnb->buf);
    348 		kore_pool_put(&nb_pool, c->rnb);
    349 	}
    350 
    351 	kore_pool_put(&connection_pool, c);
    352 	worker_active_connections--;
    353 }
    354 
    355 void
    356 kore_connection_check_idletimer(u_int64_t now, struct connection *c)
    357 {
    358 	u_int64_t	d;
    359 
    360 	if (now > c->idle_timer.start)
    361 		d = now - c->idle_timer.start;
    362 	else
    363 		d = 0;
    364 
    365 	if (d >= c->idle_timer.length)
    366 		kore_connection_disconnect(c);
    367 }
    368 
    369 void
    370 kore_connection_start_idletimer(struct connection *c)
    371 {
    372 	c->flags |= CONN_IDLE_TIMER_ACT;
    373 	c->idle_timer.start = kore_time_ms();
    374 }
    375 
    376 void
    377 kore_connection_stop_idletimer(struct connection *c)
    378 {
    379 	c->flags &= ~CONN_IDLE_TIMER_ACT;
    380 	c->idle_timer.start = 0;
    381 }
    382 
    383 int
    384 kore_connection_nonblock(int fd, int nodelay)
    385 {
    386 	int		flags;
    387 
    388 	if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
    389 		return (KORE_RESULT_ERROR);
    390 
    391 	flags |= O_NONBLOCK;
    392 	if (fcntl(fd, F_SETFL, flags) == -1)
    393 		return (KORE_RESULT_ERROR);
    394 
    395 	if (nodelay) {
    396 		if (!kore_sockopt(fd, IPPROTO_TCP, TCP_NODELAY)) {
    397 			kore_log(LOG_NOTICE,
    398 			    "failed to set TCP_NODELAY on %d", fd);
    399 		}
    400 	}
    401 
    402 	return (KORE_RESULT_OK);
    403 }
    404 
    405 void
    406 kore_connection_log(struct connection *c, const char *fmt, ...)
    407 {
    408 	struct kore_buf		buf;
    409 	va_list			args;
    410 	char			*ptr;
    411 
    412 	kore_buf_init(&buf, 128);
    413 	kore_buf_appendf(&buf, "ip=[%s] msg=[", kore_connection_ip(c));
    414 
    415 	va_start(args, fmt);
    416 	kore_buf_appendv(&buf, fmt, args);
    417 	va_end(args);
    418 
    419 	kore_buf_appendf(&buf, "]");
    420 
    421 	ptr = kore_buf_stringify(&buf, NULL);
    422 	kore_log(LOG_NOTICE, "%s", ptr);
    423 	kore_free(ptr);
    424 }
    425 
    426 const char *
    427 kore_connection_ip(struct connection *c)
    428 {
    429 	static char	addr[INET6_ADDRSTRLEN];
    430 
    431 	memset(addr, 0, sizeof(addr));
    432 
    433 	switch (c->family) {
    434 	case AF_INET:
    435 		if (inet_ntop(c->family,
    436 		    &(c->addr.ipv4.sin_addr), addr, sizeof(addr)) == NULL)
    437 			fatal("inet_ntop: %s", errno_s);
    438 		break;
    439 	case AF_INET6:
    440 		if (inet_ntop(c->family,
    441 		    &(c->addr.ipv6.sin6_addr), addr, sizeof(addr)) == NULL)
    442 			fatal("inet_ntop: %s", errno_s);
    443 		break;
    444 	case AF_UNIX:
    445 		(void)kore_strlcpy(addr, "unix-socket", sizeof(addr));
    446 		break;
    447 	default:
    448 		fatal("unknown family %d", c->family);
    449 	}
    450 
    451 	return (addr);
    452 }