pipe_task.c (4855B)
1 /*
2 * Copyright (c) 2015 Joris Vink <joris@coders.se>
3 *
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
7 *
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15 */
16
/*
 * This example demonstrates Kore's task and websocket capabilities.
 *
 * It spawns a task that reads from a named pipe; everything it reads
 * is broadcast to all connected websocket clients.
 */
23
24 #include <sys/param.h>
25 #include <sys/socket.h>
26 #include <sys/un.h>
27
28 #include <kore/kore.h>
29 #include <kore/http.h>
30 #include <kore/tasks.h>
31 #include <kore/hooks.h>
32
33 #include <fcntl.h>
34 #include <unistd.h>
35
36 #include "assets.h"
37
38 int init(int);
39 int page(struct http_request *);
40 int page_ws_connect(struct http_request *);
41
42 void websocket_connect(struct connection *);
43 void websocket_disconnect(struct connection *);
44 void websocket_message(struct connection *,
45 u_int8_t, void *, size_t);
46
47 int pipe_reader(struct kore_task *);
48 void pipe_data_available(struct kore_task *);
49
50 /* Our pipe reader. */
51 struct kore_task pipe_task;
52
53 void
54 kore_worker_configure(void)
55 {
56 /* Only do this on a dedicated worker. */
57 if (worker->id != 1)
58 return;
59
60 /* Create our task. */
61 kore_task_create(&pipe_task, pipe_reader);
62
63 /* Bind a callback whenever data is available from task. */
64 kore_task_bind_callback(&pipe_task, pipe_data_available);
65
66 /* Start the task. */
67 kore_task_run(&pipe_task);
68 }
69
70 /* Called whenever we get a new websocket connection. */
71 void
72 websocket_connect(struct connection *c)
73 {
74 kore_log(LOG_NOTICE, "%p: connected", c);
75 }
76
77 /* Called whenever we receive a websocket message from a client. */
78 void
79 websocket_message(struct connection *c, u_int8_t op, void *data, size_t len)
80 {
81 /* Not doing anything with this. */
82 }
83
84 /* Called whenever a websocket goes away. */
85 void
86 websocket_disconnect(struct connection *c)
87 {
88 kore_log(LOG_NOTICE, "%p: disconnecting", c);
89 }
90
91 /* The / page. */
92 int
93 page(struct http_request *req)
94 {
95 http_response_header(req, "content-type", "text/html");
96 http_response(req, 200, asset_frontend_html, asset_len_frontend_html);
97
98 return (KORE_RESULT_OK);
99 }
100
101 /* The /connect page. */
102 int
103 page_ws_connect(struct http_request *req)
104 {
105 kore_websocket_handshake(req, "websocket_connect",
106 "websocket_message", "websocket_disconnect");
107 return (KORE_RESULT_OK);
108 }
109
110 /*
111 * The pipe reader task. This task simply waits for a writer end
112 * on a named pipe and reads from it. The bytes read are written
113 * on the task channel because the task does not own any connection
114 * data structures and shouldn't reference them directly.
115 */
116 int
117 pipe_reader(struct kore_task *t)
118 {
119 int fd;
120 ssize_t ret;
121 u_int8_t buf[BUFSIZ];
122
123 fd = -1;
124
125 kore_log(LOG_INFO, "pipe_reader starting");
126
127 /* Just run forever. */
128 for (;;) {
129 /* Attempt to open the pipe if needed. */
130 if (fd == -1) {
131 kore_log(LOG_NOTICE, "waiting for writer");
132
133 if ((fd = open("/tmp/pipe", O_RDONLY)) == -1) {
134 kore_log(LOG_NOTICE, "failed to open pipe");
135 sleep(1);
136 continue;
137 }
138
139 kore_log(LOG_NOTICE, "writer connected");
140 }
141
142 /* Got a writer on the other end so start reading. */
143 ret = read(fd, buf, sizeof(buf));
144 if (ret == -1) {
145 kore_log(LOG_ERR, "read error on pipe");
146 (void)close(fd);
147 fd = -1;
148 continue;
149 }
150
151 if (ret == 0) {
152 kore_log(LOG_NOTICE, "writer disconnected");
153 (void)close(fd);
154 fd = -1;
155 continue;
156 }
157
158 kore_log(LOG_NOTICE, "got %ld bytes from pipe", ret);
159
160 /*
161 * Write data on the task channel so our main event loop
162 * will call the registered callback.
163 */
164 kore_task_channel_write(t, buf, ret);
165 }
166
167 return (KORE_RESULT_OK);
168 }
169
170 /* Called on the main event loop whenever a task event fires. */
171 void
172 pipe_data_available(struct kore_task *t)
173 {
174 size_t len;
175 u_int8_t buf[BUFSIZ];
176
177 /* Deal with the task finishing, we could restart it from here. */
178 if (kore_task_finished(t)) {
179 kore_log(LOG_WARNING, "task finished");
180 return;
181 }
182
183 /* Read data from the task channel. */
184 len = kore_task_channel_read(t, buf, sizeof(buf));
185 if (len > sizeof(buf))
186 kore_log(LOG_WARNING, "truncated data from task");
187
188 /* Broadcast it to all connected websocket clients. */
189 kore_log(LOG_NOTICE, "got %zu bytes from task", len);
190
191 kore_websocket_broadcast(NULL, WEBSOCKET_OP_TEXT,
192 buf, len, WEBSOCKET_BROADCAST_GLOBAL);
193 }