sse.c (4959B)
1 /*
2 * Copyright (c) 2015 Joris Vink <joris@coders.se>
3 *
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
7 *
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15 */
16
17 /*
18 * Simple example of how SSE (Server Side Events) could be used in Kore.
19 *
20 * Upon new arrivals, a join event is broadcast to all clients.
21 * If a client goes away a leave event is broadcasted.
22 * Each connection gets its own 10 second ping timer which will emit
23 * a ping event to the connection endpoint.
24 */
25
26 #include <kore/kore.h>
27 #include <kore/http.h>
28
29 #include "assets.h"
30
31 void sse_ping(void *, u_int64_t);
32 int page(struct http_request *);
33 int subscribe(struct http_request *);
34 void sse_disconnect(struct connection *);
35 void sse_send(struct connection *, void *, size_t);
36 void sse_broadcast(struct connection *, void *, size_t);
37 int check_header(struct http_request *, const char *, const char *);
38
39 /*
40 * Each client subscribed to our SSE gets a state attached
41 * to their hdlr_extra pointer member.
42 */
43 struct sse_state {
44 struct kore_timer *timer;
45 };
46
47 int
48 page(struct http_request *req)
49 {
50 if (req->method != HTTP_METHOD_GET) {
51 http_response_header(req, "allow", "get");
52 http_response(req, 405, NULL, 0);
53 return (KORE_RESULT_OK);
54 }
55
56 http_response_header(req, "content-type", "text/html");
57 http_response(req, 200, asset_index_html, asset_len_index_html);
58 return (KORE_RESULT_OK);
59 }
60
61 int
62 subscribe(struct http_request *req)
63 {
64 struct sse_state *state;
65 char *hello = "event:join\ndata: client\n\n";
66
67 /* Preventive paranoia. */
68 if (req->hdlr_extra != NULL) {
69 kore_log(LOG_ERR, "%p: already subscribed", req->owner);
70 http_response(req, 500, NULL, 0);
71 return (KORE_RESULT_OK);
72 }
73
74 /* Only allow GET methods. */
75 if (req->method != HTTP_METHOD_GET) {
76 http_response_header(req, "allow", "get");
77 http_response(req, 405, NULL, 0);
78 return (KORE_RESULT_OK);
79 }
80
81 /* Only do SSE if the client told us it wanted too. */
82 if (!check_header(req, "accept", "text/event-stream"))
83 return (KORE_RESULT_OK);
84
85 /* Do not include content-length in our response. */
86 req->flags |= HTTP_REQUEST_NO_CONTENT_LENGTH;
87
88 /* Notify existing clients of our new client now. */
89 sse_broadcast(req->owner, hello, strlen(hello));
90
91 /* Set a disconnection method so we know when this client goes away. */
92 req->owner->disconnect = sse_disconnect;
93
94 /* Allocate a state to be carried by our connection. */
95 state = kore_malloc(sizeof(*state));
96 req->owner->hdlr_extra = state;
97
98 /* Now start a timer to send a ping back every 10 second. */
99 state->timer = kore_timer_add(sse_ping, 10000, req->owner, 0);
100
101 /* Respond that the SSE channel is now open. */
102 kore_log(LOG_NOTICE, "%p: connected for SSE", req->owner);
103 http_response_header(req, "content-type", "text/event-stream");
104 http_response(req, 200, NULL, 0);
105
106 return (KORE_RESULT_OK);
107 }
108
109 void
110 sse_broadcast(struct connection *src, void *data, size_t len)
111 {
112 struct connection *c;
113
114 /* Broadcast the message to all other clients. */
115 TAILQ_FOREACH(c, &connections, list) {
116 if (c == src)
117 continue;
118 sse_send(c, data, len);
119 }
120 }
121
122 void
123 sse_send(struct connection *c, void *data, size_t len)
124 {
125 struct sse_state *state = c->hdlr_extra;
126
127 /* Do not send to clients that do not have a state. */
128 if (state == NULL)
129 return;
130
131 /* Queue outgoing data now. */
132 net_send_queue(c, data, len);
133 net_send_flush(c);
134 }
135
136 void
137 sse_ping(void *arg, u_int64_t now)
138 {
139 struct connection *c = arg;
140 char *ping = "event:ping\ndata:\n\n";
141
142 /* Send our ping to the client. */
143 sse_send(c, ping, strlen(ping));
144 }
145
146 void
147 sse_disconnect(struct connection *c)
148 {
149 struct sse_state *state = c->hdlr_extra;
150 char *leaving = "event: leave\ndata: client\n\n";
151
152 kore_log(LOG_NOTICE, "%p: disconnecting for SSE", c);
153
154 /* Tell others we are leaving. */
155 sse_broadcast(c, leaving, strlen(leaving));
156
157 /* Kill our timer and free/remove the state. */
158 kore_timer_remove(state->timer);
159 kore_free(state);
160
161 /* Prevent us to be called again. */
162 c->hdlr_extra = NULL;
163 c->disconnect = NULL;
164 }
165
166 int
167 check_header(struct http_request *req, const char *name, const char *value)
168 {
169 const char *hdr;
170
171 if (!http_request_header(req, name, &hdr)) {
172 http_response(req, 400, NULL, 0);
173 return (KORE_RESULT_ERROR);
174 }
175
176 if (strcmp(hdr, value)) {
177 http_response(req, 400, NULL, 0);
178 return (KORE_RESULT_ERROR);
179 }
180
181 return (KORE_RESULT_OK);
182 }