kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

pipe_task.c (4855B)



      1 /*
      2  * Copyright (c) 2015 Joris Vink <joris@coders.se>
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 /*
     18  * This example demos Kore's task and websocket capabilities.
     19  *
     20  * It will spawn a task which connects to a named pipe and writes
     21  * responses to all connected websocket clients.
     22  */
     23 
     24 #include <sys/param.h>
     25 #include <sys/socket.h>
     26 #include <sys/un.h>
     27 
     28 #include <kore/kore.h>
     29 #include <kore/http.h>
     30 #include <kore/tasks.h>
     31 #include <kore/hooks.h>
     32 
     33 #include <fcntl.h>
     34 #include <unistd.h>
     35 
     36 #include "assets.h"
     37 
/*
 * HTTP request handlers.
 * NOTE(review): init() is declared here but no definition is visible in
 * this file -- confirm it is still needed.
 */
int		init(int);
int		page(struct http_request *);
int		page_ws_connect(struct http_request *);

/*
 * Websocket callbacks. page_ws_connect() hands their names, as strings,
 * to kore_websocket_handshake().
 */
void		websocket_connect(struct connection *);
void		websocket_disconnect(struct connection *);
void		websocket_message(struct connection *,
		    u_int8_t, void *, size_t);

/* Task entry point and the callback bound to that task. */
int		pipe_reader(struct kore_task *);
void		pipe_data_available(struct kore_task *);

/*
 * Our pipe reader. Created, bound to pipe_data_available() and started
 * in kore_worker_configure().
 */
struct kore_task	pipe_task;
     52 
     53 void
     54 kore_worker_configure(void)
     55 {
     56 	/* Only do this on a dedicated worker. */
     57 	if (worker->id != 1)
     58 		return;
     59 
     60 	/* Create our task. */
     61 	kore_task_create(&pipe_task, pipe_reader);
     62 
     63 	/* Bind a callback whenever data is available from task. */
     64 	kore_task_bind_callback(&pipe_task, pipe_data_available);
     65 
     66 	/* Start the task. */
     67 	kore_task_run(&pipe_task);
     68 }
     69 
     70 /* Called whenever we get a new websocket connection. */
     71 void
     72 websocket_connect(struct connection *c)
     73 {
     74 	kore_log(LOG_NOTICE, "%p: connected", c);
     75 }
     76 
     77 /* Called whenever we receive a websocket message from a client. */
     78 void
     79 websocket_message(struct connection *c, u_int8_t op, void *data, size_t len)
     80 {
     81 	/* Not doing anything with this. */
     82 }
     83 
     84 /* Called whenever a websocket goes away. */
     85 void
     86 websocket_disconnect(struct connection *c)
     87 {
     88 	kore_log(LOG_NOTICE, "%p: disconnecting", c);
     89 }
     90 
     91 /* The / page. */
     92 int
     93 page(struct http_request *req)
     94 {
     95 	http_response_header(req, "content-type", "text/html");
     96 	http_response(req, 200, asset_frontend_html, asset_len_frontend_html);
     97 
     98 	return (KORE_RESULT_OK);
     99 }
    100 
    101 /* The /connect page. */
    102 int
    103 page_ws_connect(struct http_request *req)
    104 {
    105 	kore_websocket_handshake(req, "websocket_connect",
    106 	    "websocket_message", "websocket_disconnect");
    107 	return (KORE_RESULT_OK);
    108 }
    109 
    110 /*
    111  * The pipe reader task. This task simply waits for a writer end
    112  * on a named pipe and reads from it. The bytes read are written
    113  * on the task channel because the task does not own any connection
    114  * data structures and shouldn't reference them directly.
    115  */
    116 int
    117 pipe_reader(struct kore_task *t)
    118 {
    119 	int		fd;
    120 	ssize_t		ret;
    121 	u_int8_t	buf[BUFSIZ];
    122 
    123 	fd = -1;
    124 
    125 	kore_log(LOG_INFO, "pipe_reader starting");
    126 
    127 	/* Just run forever. */
    128 	for (;;) {
    129 		/* Attempt to open the pipe if needed. */
    130 		if (fd == -1) {
    131 			kore_log(LOG_NOTICE, "waiting for writer");
    132 
    133 			if ((fd = open("/tmp/pipe", O_RDONLY)) == -1) {
    134 				kore_log(LOG_NOTICE, "failed to open pipe");
    135 				sleep(1);
    136 				continue;
    137 			}
    138 
    139 			kore_log(LOG_NOTICE, "writer connected");
    140 		}
    141 
    142 		/* Got a writer on the other end so start reading. */
    143 		ret = read(fd, buf, sizeof(buf));
    144 		if (ret == -1) {
    145 			kore_log(LOG_ERR, "read error on pipe");
    146 			(void)close(fd);
    147 			fd = -1;
    148 			continue;
    149 		}
    150 
    151 		if (ret == 0) {
    152 			kore_log(LOG_NOTICE, "writer disconnected");
    153 			(void)close(fd);
    154 			fd = -1;
    155 			continue;
    156 		}
    157 
    158 		kore_log(LOG_NOTICE, "got %ld bytes from pipe", ret);
    159 
    160 		/*
    161 		 * Write data on the task channel so our main event loop
    162 		 * will call the registered callback.
    163 		 */
    164 		kore_task_channel_write(t, buf, ret);
    165 	}
    166 
    167 	return (KORE_RESULT_OK);
    168 }
    169 
    170 /* Called on the main event loop whenever a task event fires. */
    171 void
    172 pipe_data_available(struct kore_task *t)
    173 {
    174 	size_t		len;
    175 	u_int8_t	buf[BUFSIZ];
    176 
    177 	/* Deal with the task finishing, we could restart it from here. */
    178 	if (kore_task_finished(t)) {
    179 		kore_log(LOG_WARNING, "task finished");
    180 		return;
    181 	}
    182 
    183 	/* Read data from the task channel. */
    184 	len = kore_task_channel_read(t, buf, sizeof(buf));
    185 	if (len > sizeof(buf))
    186 		kore_log(LOG_WARNING, "truncated data from task");
    187 
    188 	/* Broadcast it to all connected websocket clients. */
    189 	kore_log(LOG_NOTICE, "got %zu bytes from task", len);
    190 
    191 	kore_websocket_broadcast(NULL, WEBSOCKET_OP_TEXT,
    192 	    buf, len, WEBSOCKET_BROADCAST_GLOBAL);
    193 }