kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

net.c (8377B)



      1 /*
      2  * Copyright (c) 2013-2022 Joris Vink <joris@coders.se>
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 #include <sys/param.h>
     18 #include <sys/socket.h>
     19 #include <sys/types.h>
     20 
     21 #if defined(__linux__)
     22 #include <endian.h>
     23 #elif defined(__MACH__)
     24 #include <libkern/OSByteOrder.h>
     25 #define htobe64(x)	OSSwapHostToBigInt64(x)
     26 #define be64toh(x)	OSSwapBigToHostInt64(x)
     27 #else
     28 #include <sys/endian.h>
     29 #endif
     30 
     31 #include "kore.h"
     32 
     33 struct kore_pool		nb_pool;
     34 
     35 void
     36 net_init(void)
     37 {
     38 	u_int32_t	elm;
     39 
     40 	/* Add some overhead so we don't roll over for internal items. */
     41 	elm = worker_max_connections + 10;
     42 	kore_pool_init(&nb_pool, "nb_pool", sizeof(struct netbuf), elm);
     43 }
     44 
     45 void
     46 net_cleanup(void)
     47 {
     48 	kore_pool_cleanup(&nb_pool);
     49 }
     50 
     51 struct netbuf *
     52 net_netbuf_get(void)
     53 {
     54 	struct netbuf	*nb;
     55 
     56 	nb = kore_pool_get(&nb_pool);
     57 
     58 	nb->cb = NULL;
     59 	nb->buf = NULL;
     60 	nb->owner = NULL;
     61 	nb->extra = NULL;
     62 	nb->file_ref = NULL;
     63 
     64 	nb->type = 0;
     65 	nb->s_off = 0;
     66 	nb->b_len = 0;
     67 	nb->m_len = 0;
     68 	nb->flags = 0;
     69 
     70 #if defined(KORE_USE_PLATFORM_SENDFILE)
     71 	nb->fd_off = -1;
     72 	nb->fd_len = -1;
     73 #endif
     74 
     75 	return (nb);
     76 }
     77 
     78 void
     79 net_send_queue(struct connection *c, const void *data, size_t len)
     80 {
     81 	const u_int8_t		*d;
     82 	struct netbuf		*nb;
     83 	size_t			avail;
     84 
     85 	d = data;
     86 	nb = TAILQ_LAST(&(c->send_queue), netbuf_head);
     87 	if (nb != NULL && !(nb->flags & NETBUF_IS_STREAM) &&
     88 	    nb->b_len < nb->m_len) {
     89 		avail = nb->m_len - nb->b_len;
     90 		if (len < avail) {
     91 			memcpy(nb->buf + nb->b_len, d, len);
     92 			nb->b_len += len;
     93 			return;
     94 		} else {
     95 			memcpy(nb->buf + nb->b_len, d, avail);
     96 			nb->b_len += avail;
     97 
     98 			len -= avail;
     99 			d += avail;
    100 			if (len == 0)
    101 				return;
    102 		}
    103 	}
    104 
    105 	nb = net_netbuf_get();
    106 
    107 	nb->owner = c;
    108 	nb->b_len = len;
    109 	nb->type = NETBUF_SEND;
    110 
    111 	if (nb->b_len < NETBUF_SEND_PAYLOAD_MAX)
    112 		nb->m_len = NETBUF_SEND_PAYLOAD_MAX;
    113 	else
    114 		nb->m_len = nb->b_len;
    115 
    116 	nb->buf = kore_malloc(nb->m_len);
    117 	memcpy(nb->buf, d, nb->b_len);
    118 
    119 	TAILQ_INSERT_TAIL(&(c->send_queue), nb, list);
    120 }
    121 
    122 void
    123 net_send_stream(struct connection *c, void *data, size_t len,
    124     int (*cb)(struct netbuf *), struct netbuf **out)
    125 {
    126 	struct netbuf		*nb;
    127 
    128 	nb = net_netbuf_get();
    129 	nb->cb = cb;
    130 	nb->owner = c;
    131 	nb->buf = data;
    132 	nb->b_len = len;
    133 	nb->m_len = nb->b_len;
    134 	nb->type = NETBUF_SEND;
    135 	nb->flags  = NETBUF_IS_STREAM;
    136 
    137 	TAILQ_INSERT_TAIL(&(c->send_queue), nb, list);
    138 
    139 	if (out != NULL)
    140 		*out = nb;
    141 }
    142 
    143 void
    144 net_send_fileref(struct connection *c, struct kore_fileref *ref)
    145 {
    146 	struct netbuf		*nb;
    147 
    148 	nb = net_netbuf_get();
    149 	nb->owner = c;
    150 	nb->file_ref = ref;
    151 	nb->type = NETBUF_SEND;
    152 	nb->flags = NETBUF_IS_FILEREF;
    153 
    154 #if defined(KORE_USE_PLATFORM_SENDFILE)
    155 	if (c->owner->server->tls == 0) {
    156 		nb->fd_off = 0;
    157 		nb->fd_len = ref->size;
    158 	} else {
    159 		nb->buf = ref->base;
    160 		nb->b_len = ref->size;
    161 		nb->m_len = nb->b_len;
    162 		nb->flags |= NETBUF_IS_STREAM;
    163 	}
    164 #else
    165 	nb->buf = ref->base;
    166 	nb->b_len = ref->size;
    167 	nb->m_len = nb->b_len;
    168 	nb->flags |= NETBUF_IS_STREAM;
    169 #endif
    170 
    171 	TAILQ_INSERT_TAIL(&(c->send_queue), nb, list);
    172 }
    173 
    174 void
    175 net_recv_reset(struct connection *c, size_t len, int (*cb)(struct netbuf *))
    176 {
    177 	c->rnb->cb = cb;
    178 	c->rnb->s_off = 0;
    179 	c->rnb->b_len = len;
    180 
    181 	if (c->rnb->buf != NULL && c->rnb->b_len <= c->rnb->m_len &&
    182 	    c->rnb->m_len < (NETBUF_SEND_PAYLOAD_MAX / 2))
    183 		return;
    184 
    185 	kore_free(c->rnb->buf);
    186 	c->rnb->m_len = len;
    187 	c->rnb->buf = kore_malloc(c->rnb->m_len);
    188 }
    189 
    190 void
    191 net_recv_queue(struct connection *c, size_t len, int flags,
    192     int (*cb)(struct netbuf *))
    193 {
    194 	if (c->rnb != NULL)
    195 		fatal("net_recv_queue(): called incorrectly");
    196 
    197 	c->rnb = net_netbuf_get();
    198 	c->rnb->cb = cb;
    199 	c->rnb->owner = c;
    200 	c->rnb->b_len = len;
    201 	c->rnb->m_len = len;
    202 	c->rnb->flags = flags;
    203 	c->rnb->type = NETBUF_RECV;
    204 	c->rnb->buf = kore_malloc(c->rnb->b_len);
    205 }
    206 
    207 void
    208 net_recv_expand(struct connection *c, size_t len, int (*cb)(struct netbuf *))
    209 {
    210 	c->rnb->cb = cb;
    211 	c->rnb->b_len += len;
    212 	c->rnb->m_len = c->rnb->b_len;
    213 	c->rnb->buf = kore_realloc(c->rnb->buf, c->rnb->b_len);
    214 }
    215 
    216 int
    217 net_send(struct connection *c)
    218 {
    219 	size_t		r, len, smin;
    220 
    221 	c->snb = TAILQ_FIRST(&(c->send_queue));
    222 
    223 #if defined(KORE_USE_PLATFORM_SENDFILE)
    224 	if ((c->snb->flags & NETBUF_IS_FILEREF) &&
    225 	    !(c->snb->flags & NETBUF_IS_STREAM)) {
    226 		return (kore_platform_sendfile(c, c->snb));
    227 	}
    228 #endif
    229 
    230 	if (c->snb->b_len != 0) {
    231 		smin = c->snb->b_len - c->snb->s_off;
    232 		len = MIN(NETBUF_SEND_PAYLOAD_MAX, smin);
    233 
    234 		if (!c->write(c, len, &r))
    235 			return (KORE_RESULT_ERROR);
    236 		if (!(c->evt.flags & KORE_EVENT_WRITE))
    237 			return (KORE_RESULT_OK);
    238 
    239 		c->snb->s_off += r;
    240 		c->snb->flags &= ~NETBUF_MUST_RESEND;
    241 	}
    242 
    243 	if (c->snb->s_off == c->snb->b_len ||
    244 	    (c->snb->flags & NETBUF_FORCE_REMOVE)) {
    245 		net_remove_netbuf(c, c->snb);
    246 		c->snb = NULL;
    247 	}
    248 
    249 	return (KORE_RESULT_OK);
    250 }
    251 
    252 int
    253 net_send_flush(struct connection *c)
    254 {
    255 	while (!TAILQ_EMPTY(&(c->send_queue)) &&
    256 	    (c->evt.flags & KORE_EVENT_WRITE)) {
    257 		if (!net_send(c))
    258 			return (KORE_RESULT_ERROR);
    259 	}
    260 
    261 	if ((c->flags & CONN_CLOSE_EMPTY) && TAILQ_EMPTY(&(c->send_queue))) {
    262 		kore_connection_disconnect(c);
    263 	}
    264 
    265 	return (KORE_RESULT_OK);
    266 }
    267 
    268 int
    269 net_recv_flush(struct connection *c)
    270 {
    271 	size_t		r;
    272 
    273 	if (c->rnb == NULL)
    274 		return (KORE_RESULT_OK);
    275 
    276 	while (c->evt.flags & KORE_EVENT_READ) {
    277 		if (c->rnb->buf == NULL)
    278 			return (KORE_RESULT_OK);
    279 
    280 		if ((c->rnb->b_len - c->rnb->s_off) == 0)
    281 			return (KORE_RESULT_OK);
    282 
    283 		if (!c->read(c, &r))
    284 			return (KORE_RESULT_ERROR);
    285 		if (!(c->evt.flags & KORE_EVENT_READ))
    286 			break;
    287 
    288 		c->rnb->s_off += r;
    289 		if (c->rnb->s_off == c->rnb->b_len ||
    290 		    (c->rnb->flags & NETBUF_CALL_CB_ALWAYS)) {
    291 			r = c->rnb->cb(c->rnb);
    292 			if (r != KORE_RESULT_OK)
    293 				return (r);
    294 		}
    295 	}
    296 
    297 	return (KORE_RESULT_OK);
    298 }
    299 
    300 void
    301 net_remove_netbuf(struct connection *c, struct netbuf *nb)
    302 {
    303 	if (nb->type == NETBUF_RECV)
    304 		fatal("net_remove_netbuf(): cannot remove recv netbuf");
    305 
    306 	if (nb->flags & NETBUF_MUST_RESEND) {
    307 		nb->flags |= NETBUF_FORCE_REMOVE;
    308 		return;
    309 	}
    310 
    311 	if (!(nb->flags & NETBUF_IS_STREAM)) {
    312 		kore_free(nb->buf);
    313 	} else if (nb->cb != NULL) {
    314 		(void)nb->cb(nb);
    315 	}
    316 
    317 	if (nb->flags & NETBUF_IS_FILEREF)
    318 		kore_fileref_release(nb->file_ref);
    319 
    320 	TAILQ_REMOVE(&(c->send_queue), nb, list);
    321 
    322 	kore_pool_put(&nb_pool, nb);
    323 }
    324 
    325 int
    326 net_write(struct connection *c, size_t len, size_t *written)
    327 {
    328 	ssize_t		r;
    329 
    330 	r = send(c->fd, (c->snb->buf + c->snb->s_off), len, 0);
    331 	if (r == -1) {
    332 		switch (errno) {
    333 		case EINTR:
    334 			*written = 0;
    335 			return (KORE_RESULT_OK);
    336 		case EAGAIN:
    337 			c->evt.flags &= ~KORE_EVENT_WRITE;
    338 			return (KORE_RESULT_OK);
    339 		default:
    340 			return (KORE_RESULT_ERROR);
    341 		}
    342 	}
    343 
    344 	*written = (size_t)r;
    345 
    346 	return (KORE_RESULT_OK);
    347 }
    348 
    349 int
    350 net_read(struct connection *c, size_t *bytes)
    351 {
    352 	ssize_t		r;
    353 
    354 	r = recv(c->fd, (c->rnb->buf + c->rnb->s_off),
    355 	    (c->rnb->b_len - c->rnb->s_off), 0);
    356 	if (r == -1) {
    357 		switch (errno) {
    358 		case EINTR:
    359 			*bytes = 0;
    360 			return (KORE_RESULT_OK);
    361 		case EAGAIN:
    362 			c->evt.flags &= ~KORE_EVENT_READ;
    363 			return (KORE_RESULT_OK);
    364 		default:
    365 			return (KORE_RESULT_ERROR);
    366 		}
    367 	}
    368 
    369 	if (r == 0) {
    370 		kore_connection_disconnect(c);
    371 		c->evt.flags &= ~KORE_EVENT_READ;
    372 		return (KORE_RESULT_OK);
    373 	}
    374 
    375 	*bytes = (size_t)r;
    376 
    377 	return (KORE_RESULT_OK);
    378 }
    379 
    380 u_int16_t
    381 net_read16(u_int8_t *b)
    382 {
    383 	u_int16_t	r;
    384 
    385 	r = *(u_int16_t *)b;
    386 	return (ntohs(r));
    387 }
    388 
    389 u_int32_t
    390 net_read32(u_int8_t *b)
    391 {
    392 	u_int32_t	r;
    393 
    394 	r = *(u_int32_t *)b;
    395 	return (ntohl(r));
    396 }
    397 
    398 void
    399 net_write16(u_int8_t *p, u_int16_t n)
    400 {
    401 	u_int16_t	r;
    402 
    403 	r = htons(n);
    404 	memcpy(p, &r, sizeof(r));
    405 }
    406 
    407 void
    408 net_write32(u_int8_t *p, u_int32_t n)
    409 {
    410 	u_int32_t	r;
    411 
    412 	r = htonl(n);
    413 	memcpy(p, &r, sizeof(r));
    414 }
    415 
    416 u_int64_t
    417 net_read64(u_int8_t *b)
    418 {
    419 	u_int64_t	r;
    420 
    421 	r = *(u_int64_t *)b;
    422 	return (be64toh(r));
    423 }
    424 
    425 void
    426 net_write64(u_int8_t *p, u_int64_t n)
    427 {
    428 	u_int64_t	r;
    429 
    430 	r = htobe64(n);
    431 	memcpy(p, &r, sizeof(r));
    432 }