sse.c (5049B)
1 /*
2 * Copyright (c) 2015 Joris Vink <joris@coders.se>
3 *
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
7 *
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15 */
16
17 /*
18 * Simple example of how SSE (Server-Sent Events) could be used in Kore.
19 *
20 * Upon new arrivals, a join event is broadcast to all clients.
21 * If a client goes away, a leave event is broadcast.
22 * Each connection gets its own 10 second ping timer which will emit
23 * a ping event to the connection endpoint.
24 */
25
26 #include <kore/kore.h>
27 #include <kore/http.h>
28
29 #include "assets.h"
30
/* Request handlers, callbacks and helpers defined further down in this file. */
void	sse_ping(void *arg, u_int64_t now);
int	page(struct http_request *req);
int	subscribe(struct http_request *req);
void	sse_disconnect(struct connection *c);
void	sse_send(struct connection *c, void *data, size_t len);
void	sse_broadcast(struct connection *src, void *data, size_t len);
int	check_header(struct http_request *req, const char *name,
	    const char *value);
38
39 /*
40 * Each client subscribed to our SSE gets a state attached
41 * to their hdlr_extra pointer member.
42 */
struct sse_state {
	/* Handle returned by kore_timer_add() for this client's sse_ping timer. */
	struct kore_timer	*timer;
};
46
47 int
48 page(struct http_request *req)
49 {
50 if (req->method != HTTP_METHOD_GET) {
51 http_response_header(req, "allow", "get");
52 http_response(req, 405, NULL, 0);
53 return (KORE_RESULT_OK);
54 }
55
56 http_response_header(req, "content-type", "text/html");
57 http_response(req, 200, asset_index_html, asset_len_index_html);
58 return (KORE_RESULT_OK);
59 }
60
61 int
62 subscribe(struct http_request *req)
63 {
64 struct connection *c;
65 struct sse_state *state;
66 char *hello = "event:join\ndata: client\n\n";
67
68 /* Preventive paranoia. */
69 if (req->hdlr_extra != NULL) {
70 kore_log(LOG_ERR, "%p: already subscribed", req->owner);
71 http_response(req, 500, NULL, 0);
72 return (KORE_RESULT_OK);
73 }
74
75 /* Only allow GET methods. */
76 if (req->method != HTTP_METHOD_GET) {
77 http_response_header(req, "allow", "get");
78 http_response(req, 405, NULL, 0);
79 return (KORE_RESULT_OK);
80 }
81
82 /* Only do SSE if the client told us it wanted too. */
83 if (!check_header(req, "accept", "text/event-stream"))
84 return (KORE_RESULT_OK);
85
86 /* Do not include content-length in our response. */
87 req->flags |= HTTP_REQUEST_NO_CONTENT_LENGTH;
88
89 /* Notify existing clients of our new client now. */
90 sse_broadcast(req->owner, hello, strlen(hello));
91
92 /* Set a disconnection method so we know when this client goes away. */
93 req->owner->disconnect = sse_disconnect;
94
95 /* Allocate a state to be carried by our connection. */
96 state = kore_malloc(sizeof(*state));
97 req->owner->hdlr_extra = state;
98
99 /* Now start a timer to send a ping back every 10 second. */
100 state->timer = kore_timer_add(sse_ping, 10000, req->owner, 0);
101
102 /* Respond that the SSE channel is now open. */
103 kore_log(LOG_NOTICE, "%p: connected for SSE", req->owner);
104 http_response_header(req, "content-type", "text/event-stream");
105 http_response(req, 200, NULL, 0);
106
107 /* Kill HTTP timeouts. */
108 c = req->owner;
109 c->http_timeout = 0;
110
111 return (KORE_RESULT_OK);
112 }
113
114 void
115 sse_broadcast(struct connection *src, void *data, size_t len)
116 {
117 struct connection *c;
118
119 /* Broadcast the message to all other clients. */
120 TAILQ_FOREACH(c, &connections, list) {
121 if (c == src)
122 continue;
123 sse_send(c, data, len);
124 }
125 }
126
127 void
128 sse_send(struct connection *c, void *data, size_t len)
129 {
130 struct sse_state *state = c->hdlr_extra;
131
132 /* Do not send to clients that do not have a state. */
133 if (state == NULL)
134 return;
135
136 /* Queue outgoing data now. */
137 net_send_queue(c, data, len);
138 net_send_flush(c);
139 }
140
/*
 * Timer callback: emit a ping event to the connection passed as arg.
 * The now argument is not used.
 */
void
sse_ping(void *arg, u_int64_t now)
{
	struct connection	*conn = arg;
	char			*msg = "event:ping\ndata:\n\n";
	size_t			msglen = strlen(msg);

	(void)now;

	sse_send(conn, msg, msglen);
}
150
151 void
152 sse_disconnect(struct connection *c)
153 {
154 struct sse_state *state = c->hdlr_extra;
155 char *leaving = "event: leave\ndata: client\n\n";
156
157 kore_log(LOG_NOTICE, "%p: disconnecting for SSE", c);
158
159 /* Tell others we are leaving. */
160 sse_broadcast(c, leaving, strlen(leaving));
161
162 /* Kill our timer and free/remove the state. */
163 kore_timer_remove(state->timer);
164 kore_free(state);
165
166 /* Prevent us to be called again. */
167 c->hdlr_extra = NULL;
168 c->disconnect = NULL;
169 }
170
171 int
172 check_header(struct http_request *req, const char *name, const char *value)
173 {
174 const char *hdr;
175
176 if (!http_request_header(req, name, &hdr)) {
177 http_response(req, 400, NULL, 0);
178 return (KORE_RESULT_ERROR);
179 }
180
181 if (strcmp(hdr, value)) {
182 http_response(req, 400, NULL, 0);
183 return (KORE_RESULT_ERROR);
184 }
185
186 return (KORE_RESULT_OK);
187 }