kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

sse.c (4959B)



      1 /*
      2  * Copyright (c) 2015 Joris Vink <joris@coders.se>
      3  *
      4  * Permission to use, copy, modify, and distribute this software for any
      5  * purpose with or without fee is hereby granted, provided that the above
      6  * copyright notice and this permission notice appear in all copies.
      7  *
      8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
      9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     15  */
     16 
     17 /*
     18  * Simple example of how SSE (Server-Sent Events) could be used in Kore.
     19  *
     20  * Upon new arrivals, a join event is broadcast to all clients.
     21  * If a client goes away a leave event is broadcast.
     22  * Each connection gets its own 10 second ping timer which will emit
     23  * a ping event to the connection endpoint.
     24  */
     25 
     26 #include <kore/kore.h>
     27 #include <kore/http.h>
     28 
     29 #include "assets.h"
     30 
     31 void	sse_ping(void *, u_int64_t);
     32 int	page(struct http_request *);
     33 int	subscribe(struct http_request *);
     34 void	sse_disconnect(struct connection *);
     35 void	sse_send(struct connection *, void *, size_t);
     36 void	sse_broadcast(struct connection *, void *, size_t);
     37 int	check_header(struct http_request *, const char *, const char *);
     38 
     39 /*
     40  * Each client subscribed to our SSE gets a state attached
     41  * to their hdlr_extra pointer member.
     42  */
     43 struct sse_state {
     44 	struct kore_timer		*timer;
     45 };
     46 
     47 int
     48 page(struct http_request *req)
     49 {
     50 	if (req->method != HTTP_METHOD_GET) {
     51 		http_response_header(req, "allow", "get");
     52 		http_response(req, 405, NULL, 0);
     53 		return (KORE_RESULT_OK);
     54 	}
     55 
     56 	http_response_header(req, "content-type", "text/html");
     57 	http_response(req, 200, asset_index_html, asset_len_index_html);
     58 	return (KORE_RESULT_OK);
     59 }
     60 
     61 int
     62 subscribe(struct http_request *req)
     63 {
     64 	struct sse_state	*state;
     65 	char			*hello = "event:join\ndata: client\n\n";
     66 
     67 	/* Preventive paranoia. */
     68 	if (req->hdlr_extra != NULL) {
     69 		kore_log(LOG_ERR, "%p: already subscribed", req->owner);
     70 		http_response(req, 500, NULL, 0);
     71 		return (KORE_RESULT_OK);
     72 	}
     73 
     74 	/* Only allow GET methods. */
     75 	if (req->method != HTTP_METHOD_GET) {
     76 		http_response_header(req, "allow", "get");
     77 		http_response(req, 405, NULL, 0);
     78 		return (KORE_RESULT_OK);
     79 	}
     80 
     81 	/* Only do SSE if the client told us it wanted too. */
     82 	if (!check_header(req, "accept", "text/event-stream"))
     83 		return (KORE_RESULT_OK);
     84 
     85 	/* Do not include content-length in our response. */
     86 	req->flags |= HTTP_REQUEST_NO_CONTENT_LENGTH;
     87 
     88 	/* Notify existing clients of our new client now. */
     89 	sse_broadcast(req->owner, hello, strlen(hello));
     90 
     91 	/* Set a disconnection method so we know when this client goes away. */
     92 	req->owner->disconnect = sse_disconnect;
     93 
     94 	/* Allocate a state to be carried by our connection. */
     95 	state = kore_malloc(sizeof(*state));
     96 	req->owner->hdlr_extra = state;
     97 
     98 	/* Now start a timer to send a ping back every 10 second. */
     99 	state->timer = kore_timer_add(sse_ping, 10000, req->owner, 0);
    100 
    101 	/* Respond that the SSE channel is now open. */
    102 	kore_log(LOG_NOTICE, "%p: connected for SSE", req->owner);
    103 	http_response_header(req, "content-type", "text/event-stream");
    104 	http_response(req, 200, NULL, 0);
    105 
    106 	return (KORE_RESULT_OK);
    107 }
    108 
    109 void
    110 sse_broadcast(struct connection *src, void *data, size_t len)
    111 {
    112 	struct connection	*c;
    113 
    114 	/* Broadcast the message to all other clients. */
    115 	TAILQ_FOREACH(c, &connections, list) {
    116 		if (c == src)
    117 			continue;
    118 		sse_send(c, data, len);
    119 	}
    120 }
    121 
    122 void
    123 sse_send(struct connection *c, void *data, size_t len)
    124 {
    125 	struct sse_state	*state = c->hdlr_extra;
    126 
    127 	/* Do not send to clients that do not have a state. */
    128 	if (state == NULL)
    129 		return;
    130 
    131 	/* Queue outgoing data now. */
    132 	net_send_queue(c, data, len);
    133 	net_send_flush(c);
    134 }
    135 
    136 void
    137 sse_ping(void *arg, u_int64_t now)
    138 {
    139 	struct connection		*c = arg;
    140 	char				*ping = "event:ping\ndata:\n\n";
    141 
    142 	/* Send our ping to the client. */
    143 	sse_send(c, ping, strlen(ping));
    144 }
    145 
    146 void
    147 sse_disconnect(struct connection *c)
    148 {
    149 	struct sse_state	*state = c->hdlr_extra;
    150 	char			*leaving = "event: leave\ndata: client\n\n";
    151 
    152 	kore_log(LOG_NOTICE, "%p: disconnecting for SSE", c);
    153 
    154 	/* Tell others we are leaving. */
    155 	sse_broadcast(c, leaving, strlen(leaving));
    156 
    157 	/* Kill our timer and free/remove the state. */
    158 	kore_timer_remove(state->timer);
    159 	kore_free(state);
    160 
    161 	/* Prevent us to be called again. */
    162 	c->hdlr_extra = NULL;
    163 	c->disconnect = NULL;
    164 }
    165 
    166 int
    167 check_header(struct http_request *req, const char *name, const char *value)
    168 {
    169 	const char		*hdr;
    170 
    171 	if (!http_request_header(req, name, &hdr)) {
    172 		http_response(req, 400, NULL, 0);
    173 		return (KORE_RESULT_ERROR);
    174 	}
    175 
    176 	if (strcmp(hdr, value)) {
    177 		http_response(req, 400, NULL, 0);
    178 		return (KORE_RESULT_ERROR);
    179 	}
    180 
    181 	return (KORE_RESULT_OK);
    182 }