kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 75108349687ff565e44875dcc0347764e92942ec
parent 1ecb777d411ee2091d2bf8079ba9901bfef3c801
Author: Joris Vink <joris@coders.se>
Date:   Sun, 29 Jan 2017 22:57:34 +0100

initial fudging to add websockets to python

Diffstat:
examples/python/conf/build.conf | 2+-
examples/python/conf/python.conf | 6++++++
examples/websocket/src/websocket.c | 10++--------
includes/kore.h | 22+++++++++++++---------
includes/python_methods.h | 5+++++
src/connection.c | 8+++++++-
src/python.c | 110+++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------
src/runtime.c | 36++++++++++++++++++++++++++++++++++++
src/websocket.c | 46++++++++++++++++++++++++++++++++++------------
9 files changed, 181 insertions(+), 64 deletions(-)

diff --git a/examples/python/conf/build.conf b/examples/python/conf/build.conf @@ -20,7 +20,7 @@ cxxflags=-Wpointer-arith -Wcast-qual -Wsign-compare # Mime types for assets served via the builtin asset_serve_* #mime_add=txt:text/plain; charset=utf-8 #mime_add=png:image/png -#mime_add=html:text/html; charset=utf-8 +mime_add=html:text/html; charset=utf-8 dev { # These flags are added to the shared ones when diff --git a/examples/python/conf/python.conf b/examples/python/conf/python.conf @@ -2,6 +2,7 @@ load ./python.so onload python_import src/index.py onload +python_import src/websockets.py #bind 127.0.0.1 8888 c_on_connect bind 127.0.0.1 8888 @@ -12,6 +13,9 @@ validator v_id function c_validator validator v_p_id function python_validator validator v_auth function python_auth +websocket_maxframe 65536 +websocket_timeout 20 + authentication auth { authentication_type request authentication_validator v_auth @@ -26,6 +30,8 @@ domain * { static /b minimal static /json json_parse static /state state_test + static /ws ws_connect + static /wspage asset_serve_frontend_html static /auth page auth diff --git a/examples/websocket/src/websocket.c b/examples/websocket/src/websocket.c @@ -27,13 +27,6 @@ void websocket_disconnect(struct connection *); void websocket_message(struct connection *, u_int8_t, void *, size_t); -/* Websocket callbacks. */ -struct kore_wscbs wscbs = { - websocket_connect, - websocket_message, - websocket_disconnect -}; - /* Called whenever we get a new websocket connection. */ void websocket_connect(struct connection *c) @@ -66,7 +59,8 @@ int page_ws_connect(struct http_request *req) { /* Perform the websocket handshake, passing our callbacks. */ - kore_websocket_handshake(req, &wscbs); + kore_websocket_handshake(req, "websocket_connect", + "websocket_message", "websocket_disconnect"); return (KORE_RESULT_OK); } diff --git a/includes/kore.h b/includes/kore.h @@ -196,7 +196,9 @@ struct connection { struct netbuf *rnb; #if !defined(KORE_NO_HTTP) - void *wscbs; + struct kore_runtime_call *ws_connect; + struct kore_runtime_call *ws_message; + struct kore_runtime_call *ws_disconnect; TAILQ_HEAD(, http_request) http_requests; #endif @@ -215,6 +217,10 @@ struct kore_runtime { #if !defined(KORE_NO_HTTP) int (*http_request)(void *, struct http_request *); int (*validator)(void *, struct http_request *, void *); + void (*wsconnect)(void *, struct connection *); + void (*wsdisconnect)(void *, struct connection *); + void (*wsmessage)(void *, struct connection *, + u_int8_t, const void *, size_t); #endif void (*execute)(void *); int (*onload)(void *, int); @@ -393,13 +399,6 @@ struct kore_pool { LIST_HEAD(, kore_pool_entry) freelist; }; -struct kore_wscbs { - void (*connect)(struct connection *); - void (*message)(struct connection *, u_int8_t, - void *, size_t); - void (*disconnect)(struct connection *); -}; - struct kore_timer { u_int64_t nextrun; u_int64_t interval; @@ -580,7 +579,7 @@ char *kore_read_line(FILE *, char *, size_t); #if !defined(KORE_NO_HTTP) void kore_websocket_handshake(struct http_request *, - struct kore_wscbs *); + const char *, const char *, const char *); void kore_websocket_send(struct connection *, u_int8_t, const void *, size_t); void kore_websocket_broadcast(struct connection *, @@ -626,6 +625,11 @@ int kore_runtime_http_request(struct kore_runtime_call *, struct http_request *); int kore_runtime_validator(struct kore_runtime_call *, struct http_request *, void *); +void kore_runtime_wsconnect(struct kore_runtime_call *, struct connection *); +void kore_runtime_wsdisconnect(struct kore_runtime_call *, + struct connection *); +void kore_runtime_wsmessage(struct kore_runtime_call *, + struct connection *, u_int8_t, const void *, size_t); #endif struct kore_domain *kore_domain_lookup(const char *); diff --git a/includes/python_methods.h b/includes/python_methods.h @@ -15,6 +15,7 @@ */ static PyObject *python_exported_log(PyObject *, PyObject *); +static PyObject *python_websocket_broadcast(PyObject *, PyObject *); #define METHOD(n, c, a) { n, (PyCFunction)c, a, NULL } #define GETTER(n, g) { n, (getter)g, NULL, NULL, NULL } @@ -23,6 +24,7 @@ static PyObject *python_exported_log(PyObject *, PyObject *); static struct PyMethodDef pykore_methods[] = { METHOD("log", python_exported_log, METH_VARARGS), + METHOD("websocket_broadcast", python_websocket_broadcast, METH_VARARGS), { NULL, NULL, 0, NULL } }; @@ -71,6 +73,8 @@ static PyObject *pyhttp_populate_get(struct pyhttp_request *, PyObject *); static PyObject *pyhttp_populate_post(struct pyhttp_request *, PyObject *); static PyObject *pyhttp_request_header(struct pyhttp_request *, PyObject *); static PyObject *pyhttp_response_header(struct pyhttp_request *, PyObject *); +static PyObject *pyhttp_websocket_handshake(struct pyhttp_request *, + PyObject *); static PyMethodDef pyhttp_request_methods[] = { METHOD("response", pyhttp_response, METH_VARARGS), @@ -80,6 +84,7 @@ static PyMethodDef pyhttp_request_methods[] = { METHOD("populate_post", pyhttp_populate_post, METH_NOARGS), METHOD("request_header", pyhttp_request_header, METH_VARARGS), METHOD("response_header", pyhttp_response_header, METH_VARARGS), + METHOD("websocket_handshake", pyhttp_websocket_handshake, METH_VARARGS), METHOD(NULL, NULL, -1) }; diff --git a/src/connection.c b/src/connection.c @@ -73,7 +73,9 @@ kore_connection_new(void *owner) c->idle_timer.length = KORE_IDLE_TIMER_MAX; #if !defined(KORE_NO_HTTP) - c->wscbs = NULL; + c->ws_connect = NULL; + c->ws_message = NULL; + c->ws_disconnect = NULL; TAILQ_INIT(&(c->http_requests)); #endif @@ -341,6 +343,10 @@ kore_connection_remove(struct connection *c) req->flags |= HTTP_REQUEST_DELETE; http_request_wakeup(req); } + + kore_free(c->ws_connect); + kore_free(c->ws_message); + kore_free(c->ws_disconnect); #endif for (nb = TAILQ_FIRST(&(c->send_queue)); nb != NULL; nb = next) { diff --git a/src/python.c b/src/python.c @@ -45,6 +45,8 @@ static void python_push_type(const char *, PyObject *, PyTypeObject *); #if !defined(KORE_NO_HTTP) static int python_runtime_http_request(void *, struct http_request *); static int python_runtime_validator(void *, struct http_request *, void *); +static void python_runtime_wsmessage(void *, struct connection *, + u_int8_t, const void *, size_t); #endif static void python_runtime_execute(void *); static int python_runtime_onload(void *, int); @@ -72,6 +74,9 @@ struct kore_runtime kore_python_runtime = { #if !defined(KORE_NO_HTTP) .http_request = python_runtime_http_request, .validator = python_runtime_validator, + .wsconnect = python_runtime_connect, + .wsmessage = python_runtime_wsmessage, + .wsdisconnect = python_runtime_connect, #endif .onload = python_runtime_onload, .connect = python_runtime_connect, @@ -248,12 +253,8 @@ python_runtime_http_request(void *addr, struct http_request *req) callable = (PyObject *)addr; - pyreq = pyhttp_request_alloc(req); - if (pyreq == NULL) { - kore_log(LOG_ERR, "cannot create new pyhttp_request"); - http_response(req, HTTP_STATUS_INTERNAL_ERROR, NULL, 0); - return (KORE_RESULT_OK); - } + if ((pyreq = pyhttp_request_alloc(req)) == NULL) + fatal("python_runtime_http_request: pyreq alloc failed"); if ((args = PyTuple_New(1)) == NULL) fatal("python_runtime_http_request: PyTuple_New failed"); @@ -266,10 +267,8 @@ python_runtime_http_request(void *addr, struct http_request *req) Py_DECREF(args); if (pyret == NULL) { - Py_XDECREF(req->py_object); python_log_error("python_runtime_http_request"); - http_response(req, HTTP_STATUS_INTERNAL_ERROR, NULL, 0); - return (KORE_RESULT_OK); + fatal("failed to execute python call"); } if (!PyLong_Check(pyret)) @@ -292,26 +291,15 @@ python_runtime_validator(void *addr, struct http_request *req, void *data) callable = (PyObject *)addr; - if ((pyreq = pyhttp_request_alloc(req)) == NULL) { - kore_log(LOG_ERR, "cannot create new pyhttp_request"); - http_response(req, HTTP_STATUS_INTERNAL_ERROR, NULL, 0); - return (KORE_RESULT_OK); - } + if ((pyreq = pyhttp_request_alloc(req)) == NULL) + fatal("python_runtime_validator: pyreq alloc failed"); if (req->flags & HTTP_VALIDATOR_IS_REQUEST) { - if ((arg = pyhttp_request_alloc(data)) == NULL) { - Py_DECREF(pyreq); - kore_log(LOG_ERR, "cannot create new pyhttp_request"); - http_response(req, HTTP_STATUS_INTERNAL_ERROR, NULL, 0); - return (KORE_RESULT_OK); - } + if ((arg = pyhttp_request_alloc(data)) == NULL) + fatal("python_runtime_validator: pyreq failed"); } else { - if ((arg = PyUnicode_FromString(data)) == NULL) { - Py_DECREF(pyreq); - kore_log(LOG_ERR, "cannot create new pyhttp_request"); - http_response(req, HTTP_STATUS_INTERNAL_ERROR, NULL, 0); - return (KORE_RESULT_OK); - } + if ((arg = PyUnicode_FromString(data)) == NULL) + fatal("python_runtime_validator: PyUnicode failed"); } if ((args = PyTuple_New(2)) == NULL) @@ -327,8 +315,7 @@ python_runtime_validator(void *addr, struct http_request *req, void *data) if (pyret == NULL) { python_log_error("python_runtime_validator"); - http_response(req, HTTP_STATUS_INTERNAL_ERROR, NULL, 0); - return (KORE_RESULT_OK); + fatal("failed to execute python call"); } if (!PyLong_Check(pyret)) @@ -339,6 +326,43 @@ python_runtime_validator(void *addr, struct http_request *req, void *data) return (ret); } + +static void +python_runtime_wsmessage(void *addr, struct connection *c, u_int8_t op, + const void *data, size_t len) +{ + PyObject *callable, *args, *pyret, *pyc, *pyop, *pydata; + + callable = (PyObject *)addr; + + if ((pyc = pyconnection_alloc(c)) == NULL) + fatal("python_runtime_wsmessage: pyc alloc failed"); + + if ((pyop = PyLong_FromLong((long)op)) == NULL) + fatal("python_runtime_wsmessage: PyLong_FromLong failed"); + + if ((pydata = PyBytes_FromStringAndSize(data, len)) == NULL) + fatal("python_runtime_wsmessage: PyBytes_FromString failed"); + + if ((args = PyTuple_New(3)) == NULL) + fatal("python_runtime_wsmessage: PyTuple_New failed"); + + if (PyTuple_SetItem(args, 0, pyc) != 0 || + PyTuple_SetItem(args, 1, pyop) != 0 || + PyTuple_SetItem(args, 2, pydata) != 0) + fatal("python_runtime_wsmessage: PyTuple_SetItem failed"); + + PyErr_Clear(); + pyret = PyObject_Call(callable, args, NULL); + Py_DECREF(args); + + if (pyret == NULL) { + python_log_error("python_runtime_wsconnect"); + fatal("failed to execute python call"); + } + + Py_DECREF(pyret); +} #endif static void @@ -405,11 +429,8 @@ python_runtime_connect(void *addr, struct connection *c) callable = (PyObject *)addr; - if ((pyc = pyconnection_alloc(c)) == NULL) { - kore_log(LOG_ERR, "cannot create new pyconnection"); - kore_connection_disconnect(c); - return; - } + if ((pyc = pyconnection_alloc(c)) == NULL) + fatal("python_runtime_connect: pyc alloc failed"); if ((args = PyTuple_New(1)) == NULL) fatal("python_runtime_connect: PyTuple_New failed"); @@ -704,6 +725,29 @@ pyhttp_argument(struct pyhttp_request *pyreq, PyObject *args) } static PyObject * +pyhttp_websocket_handshake(struct pyhttp_request *pyreq, PyObject *args) +{ + const char *onconnect, *onmsg, *ondisconnect; + + if (!PyArg_ParseTuple(args, "sss", &onconnect, &onmsg, &ondisconnect)) { + PyErr_SetString(PyExc_TypeError, "invalid parameters"); + return (NULL); + } + + kore_websocket_handshake(pyreq->req, onconnect, onmsg, ondisconnect); + + Py_RETURN_TRUE; +} + +static PyObject * +python_websocket_broadcast(PyObject *self, PyObject *args) +{ + printf("websocket_broadcast\n"); + + Py_RETURN_TRUE; +} + +static PyObject * pyhttp_get_host(struct pyhttp_request *pyreq, void *closure) { PyObject *host; diff --git a/src/runtime.c b/src/runtime.c @@ -32,6 +32,9 @@ static void native_runtime_connect(void *, struct connection *); #if !defined(KORE_NO_HTTP) static int native_runtime_http_request(void *, struct http_request *); static int native_runtime_validator(void *, struct http_request *, void *); + +static void native_runtime_wsmessage(void *, struct connection *, u_int8_t, + const void *, size_t); #endif struct kore_runtime kore_native_runtime = { @@ -39,6 +42,9 @@ struct kore_runtime kore_native_runtime = { #if !defined(KORE_NO_HTTP) .http_request = native_runtime_http_request, .validator = native_runtime_validator, + .wsconnect = native_runtime_connect, + .wsmessage = native_runtime_wsmessage, + .wsdisconnect = native_runtime_connect, #endif .onload = native_runtime_onload, .connect = native_runtime_connect, @@ -95,6 +101,25 @@ kore_runtime_validator(struct kore_runtime_call *rcall, { return (rcall->runtime->validator(rcall->addr, req, data)); } + +void +kore_runtime_wsconnect(struct kore_runtime_call *rcall, struct connection *c) +{ + rcall->runtime->wsconnect(rcall->addr, c); +} + +void +kore_runtime_wsmessage(struct kore_runtime_call *rcall, struct connection *c, + u_int8_t op, const void *data, size_t len) +{ + rcall->runtime->wsmessage(rcall->addr, c, op, data, len); +} + +void +kore_runtime_wsdisconnect(struct kore_runtime_call *rcall, struct connection *c) +{ + rcall->runtime->wsdisconnect(rcall->addr, c); +} #endif static void @@ -142,4 +167,15 @@ native_runtime_validator(void *addr, struct http_request *req, void *data) *(void **)&(cb) = addr; return (cb(req, data)); } + +static void +native_runtime_wsmessage(void *addr, struct connection *c, u_int8_t op, + const void *data, size_t len) +{ + void (*cb)(struct connection *, u_int8_t, const void *, size_t); + + *(void **)&(cb) = addr; + cb(c, op, data, len); + +} #endif diff --git a/src/websocket.c b/src/websocket.c @@ -49,7 +49,8 @@ static void websocket_frame_build(struct kore_buf *, u_int8_t, const void *, size_t); void -kore_websocket_handshake(struct http_request *req, struct kore_wscbs *wscbs) +kore_websocket_handshake(struct http_request *req, const char *onconnect, + const char *onmessage, const char *ondisconnect) { SHA_CTX sctx; struct kore_buf *buf; @@ -102,12 +103,35 @@ kore_websocket_handshake(struct http_request *req, struct kore_wscbs *wscbs) req->owner->disconnect = websocket_disconnect; req->owner->rnb->flags &= ~NETBUF_CALL_CB_ALWAYS; - req->owner->wscbs = wscbs; req->owner->idle_timer.start = kore_time_ms(); req->owner->idle_timer.length = kore_websocket_timeout; - if (wscbs->connect != NULL) - wscbs->connect(req->owner); + if (onconnect != NULL) { + req->owner->ws_connect = kore_runtime_getcall(onconnect); + if (req->owner->ws_connect == NULL) + fatal("no symbol '%s' for ws_connect", onconnect); + } else { + req->owner->ws_connect = NULL; + } + + if (onmessage != NULL) { + req->owner->ws_message = kore_runtime_getcall(onmessage); + if (req->owner->ws_message == NULL) + fatal("no symbol '%s' for ws_message", onmessage); + } else { + req->owner->ws_message = NULL; + } + + if (ondisconnect != NULL) { + req->owner->ws_disconnect = kore_runtime_getcall(ondisconnect); + if (req->owner->ws_disconnect == NULL) + fatal("no symbol '%s' for ws_disconnect", ondisconnect); + } else { + req->owner->ws_disconnect = NULL; + } + + if (req->owner->ws_connect != NULL) + kore_runtime_wsconnect(req->owner->ws_connect, req->owner); } void @@ -245,12 +269,10 @@ websocket_recv_frame(struct netbuf *nb) { struct connection *c; int ret; - struct kore_wscbs *wscbs; u_int64_t len, i, total; u_int8_t op, moff, extra; c = nb->owner; - wscbs = c->wscbs; op = nb->buf[0] & WEBSOCKET_OPCODE_MASK; len = WEBSOCKET_FRAME_LENGTH(nb->buf[1]); @@ -300,8 +322,10 @@ websocket_recv_frame(struct netbuf *nb) break; case WEBSOCKET_OP_TEXT: case WEBSOCKET_OP_BINARY: - if (wscbs->message != NULL) - wscbs->message(c, op, &nb->buf[moff + 4], len); + if (c->ws_message != NULL) { + kore_runtime_wsmessage(c->ws_message, + c, op, &nb->buf[moff + 4], len); + } break; case WEBSOCKET_OP_CLOSE: kore_connection_disconnect(c); @@ -322,8 +346,6 @@ websocket_recv_frame(struct netbuf *nb) static void websocket_disconnect(struct connection *c) { - struct kore_wscbs *wscbs = c->wscbs; - - if (wscbs->disconnect != NULL) - wscbs->disconnect(c); + if (c->ws_disconnect != NULL) + kore_runtime_wsdisconnect(c->ws_disconnect, c); }