kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

connection.c (9643B)



      1 /*
      2  * Copyright (c) 2013-2022 Joris Vink <joris@coders.se>
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 #include <sys/param.h>
     18 #include <sys/socket.h>
     19 
     20 #include <netinet/tcp.h>
     21 
     22 #include <inttypes.h>
     23 #include <fcntl.h>
     24 
     25 #include "kore.h"
     26 #include "http.h"
     27 
/* Pool from which every struct connection in this file is allocated. */
struct kore_pool		connection_pool;
/* Accepted connections that have not been disconnected yet. */
struct connection_list		connections;
/* Disconnected connections waiting for kore_connection_prune(). */
struct connection_list		disconnected;
     31 
     32 void
     33 kore_connection_init(void)
     34 {
     35 	u_int32_t	elm;
     36 
     37 	TAILQ_INIT(&connections);
     38 	TAILQ_INIT(&disconnected);
     39 
     40 	/* Add some overhead so we don't rollover for internal items. */
     41 	elm = worker_max_connections + 10;
     42 
     43 	kore_pool_init(&connection_pool, "connection_pool",
     44 	    sizeof(struct connection), elm);
     45 }
     46 
     47 void
     48 kore_connection_cleanup(void)
     49 {
     50 	/* Drop all connections */
     51 	kore_connection_prune(KORE_CONNECTION_PRUNE_ALL);
     52 	kore_pool_cleanup(&connection_pool);
     53 }
     54 
     55 struct connection *
     56 kore_connection_new(void *owner)
     57 {
     58 	struct connection	*c;
     59 
     60 	c = kore_pool_get(&connection_pool);
     61 
     62 	c->flags = 0;
     63 	c->rnb = NULL;
     64 	c->snb = NULL;
     65 	c->owner = owner;
     66 	c->handle = NULL;
     67 
     68 	c->tls = NULL;
     69 	c->tls_cert = NULL;
     70 	c->tls_reneg = 0;
     71 	c->tls_sni = NULL;
     72 
     73 	c->disconnect = NULL;
     74 	c->hdlr_extra = NULL;
     75 	c->proto = CONN_PROTO_UNKNOWN;
     76 	c->idle_timer.start = 0;
     77 	c->idle_timer.length = KORE_IDLE_TIMER_MAX;
     78 
     79 	c->evt.type = KORE_TYPE_CONNECTION;
     80 	c->evt.handle = kore_connection_event;
     81 
     82 #if !defined(KORE_NO_HTTP)
     83 	c->ws_connect = NULL;
     84 	c->ws_message = NULL;
     85 	c->ws_disconnect = NULL;
     86 	c->http_start = kore_time_ms();
     87 	c->http_timeout = http_header_timeout * 1000;
     88 	TAILQ_INIT(&(c->http_requests));
     89 #endif
     90 
     91 	TAILQ_INIT(&(c->send_queue));
     92 
     93 	return (c);
     94 }
     95 
     96 int
     97 kore_connection_accept(struct listener *listener, struct connection **out)
     98 {
     99 	struct connection	*c;
    100 	struct sockaddr		*s;
    101 	socklen_t		len;
    102 
    103 	*out = NULL;
    104 	c = kore_connection_new(listener);
    105 
    106 	c->family = listener->family;
    107 
    108 	switch (c->family) {
    109 	case AF_INET:
    110 		len = sizeof(struct sockaddr_in);
    111 		s = (struct sockaddr *)&(c->addr.ipv4);
    112 		break;
    113 	case AF_INET6:
    114 		len = sizeof(struct sockaddr_in6);
    115 		s = (struct sockaddr *)&(c->addr.ipv6);
    116 		break;
    117 	case AF_UNIX:
    118 		len = sizeof(struct sockaddr_un);
    119 		s = (struct sockaddr *)&(c->addr.sun);
    120 		break;
    121 	default:
    122 		fatal("unknown family type %d", c->family);
    123 	}
    124 
    125 	if ((c->fd = accept(listener->fd, s, &len)) == -1) {
    126 		kore_pool_put(&connection_pool, c);
    127 		return (KORE_RESULT_ERROR);
    128 	}
    129 
    130 	if (!kore_connection_nonblock(c->fd, listener->family != AF_UNIX)) {
    131 		close(c->fd);
    132 		kore_pool_put(&connection_pool, c);
    133 		return (KORE_RESULT_ERROR);
    134 	}
    135 
    136 	if (fcntl(c->fd, F_SETFD, FD_CLOEXEC) == -1) {
    137 		close(c->fd);
    138 		kore_pool_put(&connection_pool, c);
    139 		return (KORE_RESULT_ERROR);
    140 	}
    141 
    142 	c->handle = kore_connection_handle;
    143 	TAILQ_INSERT_TAIL(&connections, c, list);
    144 
    145 	if (listener->server->tls) {
    146 		c->state = CONN_STATE_TLS_SHAKE;
    147 		c->write = kore_tls_write;
    148 		c->read = kore_tls_read;
    149 	} else {
    150 		c->state = CONN_STATE_ESTABLISHED;
    151 		c->write = net_write;
    152 		c->read = net_read;
    153 
    154 		if (listener->connect != NULL) {
    155 			kore_runtime_connect(listener->connect, c);
    156 		} else {
    157 #if !defined(KORE_NO_HTTP)
    158 			c->proto = CONN_PROTO_HTTP;
    159 			if (http_keepalive_time != 0) {
    160 				c->idle_timer.length =
    161 				    http_keepalive_time * 1000;
    162 			}
    163 			net_recv_queue(c, http_header_max,
    164 			    NETBUF_CALL_CB_ALWAYS, http_header_recv);
    165 #endif
    166 		}
    167 	}
    168 
    169 	kore_connection_start_idletimer(c);
    170 	worker_active_connections++;
    171 
    172 	*out = c;
    173 	return (KORE_RESULT_OK);
    174 }
    175 
    176 void
    177 kore_connection_check_timeout(u_int64_t now)
    178 {
    179 	struct connection	*c, *next;
    180 
    181 	for (c = TAILQ_FIRST(&connections); c != NULL; c = next) {
    182 		next = TAILQ_NEXT(c, list);
    183 		if (c->proto == CONN_PROTO_MSG)
    184 			continue;
    185 #if !defined(KORE_NO_HTTP)
    186 		if (c->state == CONN_STATE_ESTABLISHED &&
    187 		    c->proto == CONN_PROTO_HTTP) {
    188 			if (!http_check_timeout(c, now))
    189 				continue;
    190 			if (!TAILQ_EMPTY(&c->http_requests))
    191 				continue;
    192 		}
    193 #endif
    194 		if (c->flags & CONN_IDLE_TIMER_ACT)
    195 			kore_connection_check_idletimer(now, c);
    196 	}
    197 }
    198 
    199 void
    200 kore_connection_prune(int all)
    201 {
    202 	struct connection	*c, *cnext;
    203 
    204 	if (all) {
    205 		for (c = TAILQ_FIRST(&connections); c != NULL; c = cnext) {
    206 			cnext = TAILQ_NEXT(c, list);
    207 			net_send_flush(c);
    208 			kore_connection_disconnect(c);
    209 		}
    210 	}
    211 
    212 	for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) {
    213 		cnext = TAILQ_NEXT(c, list);
    214 		TAILQ_REMOVE(&disconnected, c, list);
    215 		kore_connection_remove(c);
    216 	}
    217 }
    218 
    219 void
    220 kore_connection_disconnect(struct connection *c)
    221 {
    222 	if (c->state != CONN_STATE_DISCONNECTING) {
    223 		c->state = CONN_STATE_DISCONNECTING;
    224 		if (c->disconnect)
    225 			c->disconnect(c);
    226 
    227 		TAILQ_REMOVE(&connections, c, list);
    228 		TAILQ_INSERT_TAIL(&disconnected, c, list);
    229 	}
    230 }
    231 
    232 void
    233 kore_connection_event(void *arg, int error)
    234 {
    235 	struct connection	*c = arg;
    236 
    237 	if (error) {
    238 		kore_connection_disconnect(c);
    239 		return;
    240 	}
    241 
    242 	if (!c->handle(c))
    243 		kore_connection_disconnect(c);
    244 }
    245 
    246 int
    247 kore_connection_handle(struct connection *c)
    248 {
    249 	struct listener		*listener;
    250 
    251 	kore_connection_stop_idletimer(c);
    252 
    253 	switch (c->state) {
    254 	case CONN_STATE_TLS_SHAKE:
    255 		switch (kore_tls_connection_accept(c)) {
    256 		case KORE_RESULT_OK:
    257 			break;
    258 		case KORE_RESULT_RETRY:
    259 			return (KORE_RESULT_OK);
    260 		default:
    261 			return (KORE_RESULT_ERROR);
    262 		}
    263 
    264 		if (c->owner != NULL) {
    265 			listener = (struct listener *)c->owner;
    266 			if (listener->connect != NULL) {
    267 				kore_runtime_connect(listener->connect, c);
    268 				kore_connection_start_idletimer(c);
    269 				return (KORE_RESULT_OK);
    270 			}
    271 		}
    272 
    273 #if !defined(KORE_NO_HTTP)
    274 		c->proto = CONN_PROTO_HTTP;
    275 		if (http_keepalive_time != 0) {
    276 			c->idle_timer.length =
    277 			    http_keepalive_time * 1000;
    278 		}
    279 
    280 		net_recv_queue(c, http_header_max,
    281 		    NETBUF_CALL_CB_ALWAYS, http_header_recv);
    282 #endif
    283 
    284 		c->state = CONN_STATE_ESTABLISHED;
    285 		/* FALLTHROUGH */
    286 	case CONN_STATE_ESTABLISHED:
    287 		if (c->evt.flags & KORE_EVENT_READ) {
    288 			if (!net_recv_flush(c))
    289 				return (KORE_RESULT_ERROR);
    290 		}
    291 
    292 		if (c->evt.flags & KORE_EVENT_WRITE) {
    293 			if (!net_send_flush(c))
    294 				return (KORE_RESULT_ERROR);
    295 		}
    296 		break;
    297 	case CONN_STATE_DISCONNECTING:
    298 		break;
    299 	default:
    300 		break;
    301 	}
    302 
    303 	kore_connection_start_idletimer(c);
    304 
    305 	return (KORE_RESULT_OK);
    306 }
    307 
    308 void
    309 kore_connection_remove(struct connection *c)
    310 {
    311 	struct netbuf		*nb, *next;
    312 #if !defined(KORE_NO_HTTP)
    313 	struct http_request	*req, *rnext;
    314 #endif
    315 
    316 	kore_tls_connection_cleanup(c);
    317 
    318 	close(c->fd);
    319 
    320 	if (c->hdlr_extra != NULL)
    321 		kore_free(c->hdlr_extra);
    322 
    323 #if !defined(KORE_NO_HTTP)
    324 	for (req = TAILQ_FIRST(&(c->http_requests));
    325 	    req != NULL; req = rnext) {
    326 		rnext = TAILQ_NEXT(req, olist);
    327 		TAILQ_REMOVE(&(c->http_requests), req, olist);
    328 		req->owner = NULL;
    329 		req->flags |= HTTP_REQUEST_DELETE;
    330 		http_request_wakeup(req);
    331 	}
    332 
    333 	kore_free(c->ws_connect);
    334 	kore_free(c->ws_message);
    335 	kore_free(c->ws_disconnect);
    336 #endif
    337 
    338 	for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; nb = next) {
    339 		next = TAILQ_NEXT(nb, list);
    340 		nb->flags &= ~NETBUF_MUST_RESEND;
    341 		net_remove_netbuf(c, nb);
    342 	}
    343 
    344 	if (c->rnb != NULL) {
    345 		kore_free(c->rnb->buf);
    346 		kore_pool_put(&nb_pool, c->rnb);
    347 	}
    348 
    349 	kore_pool_put(&connection_pool, c);
    350 	worker_active_connections--;
    351 }
    352 
    353 void
    354 kore_connection_check_idletimer(u_int64_t now, struct connection *c)
    355 {
    356 	u_int64_t	d;
    357 
    358 	if (now > c->idle_timer.start)
    359 		d = now - c->idle_timer.start;
    360 	else
    361 		d = 0;
    362 
    363 	if (d >= c->idle_timer.length)
    364 		kore_connection_disconnect(c);
    365 }
    366 
    367 void
    368 kore_connection_start_idletimer(struct connection *c)
    369 {
    370 	c->flags |= CONN_IDLE_TIMER_ACT;
    371 	c->idle_timer.start = kore_time_ms();
    372 }
    373 
    374 void
    375 kore_connection_stop_idletimer(struct connection *c)
    376 {
    377 	c->flags &= ~CONN_IDLE_TIMER_ACT;
    378 	c->idle_timer.start = 0;
    379 }
    380 
    381 int
    382 kore_connection_nonblock(int fd, int nodelay)
    383 {
    384 	int		flags;
    385 
    386 	if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
    387 		return (KORE_RESULT_ERROR);
    388 
    389 	flags |= O_NONBLOCK;
    390 	if (fcntl(fd, F_SETFL, flags) == -1)
    391 		return (KORE_RESULT_ERROR);
    392 
    393 	if (nodelay) {
    394 		if (!kore_sockopt(fd, IPPROTO_TCP, TCP_NODELAY)) {
    395 			kore_log(LOG_NOTICE,
    396 			    "failed to set TCP_NODELAY on %d", fd);
    397 		}
    398 	}
    399 
    400 	return (KORE_RESULT_OK);
    401 }
    402 
    403 void
    404 kore_connection_log(struct connection *c, const char *fmt, ...)
    405 {
    406 	struct kore_buf		buf;
    407 	va_list			args;
    408 	char			*ptr;
    409 
    410 	kore_buf_init(&buf, 128);
    411 	kore_buf_appendf(&buf, "ip=[%s] msg=[", kore_connection_ip(c));
    412 
    413 	va_start(args, fmt);
    414 	kore_buf_appendv(&buf, fmt, args);
    415 	va_end(args);
    416 
    417 	kore_buf_appendf(&buf, "]");
    418 
    419 	ptr = kore_buf_stringify(&buf, NULL);
    420 	kore_log(LOG_NOTICE, "%s", ptr);
    421 	kore_free(ptr);
    422 }
    423 
    424 const char *
    425 kore_connection_ip(struct connection *c)
    426 {
    427 	static char	addr[INET6_ADDRSTRLEN];
    428 
    429 	memset(addr, 0, sizeof(addr));
    430 
    431 	switch (c->family) {
    432 	case AF_INET:
    433 		if (inet_ntop(c->family,
    434 		    &(c->addr.ipv4.sin_addr), addr, sizeof(addr)) == NULL)
    435 			fatal("inet_ntop: %s", errno_s);
    436 		break;
    437 	case AF_INET6:
    438 		if (inet_ntop(c->family,
    439 		    &(c->addr.ipv6.sin6_addr), addr, sizeof(addr)) == NULL)
    440 			fatal("inet_ntop: %s", errno_s);
    441 		break;
    442 	case AF_UNIX:
    443 		(void)kore_strlcpy(addr, "unix-socket", sizeof(addr));
    444 		break;
    445 	default:
    446 		fatal("unknown family %d", c->family);
    447 	}
    448 
    449 	return (addr);
    450 }