kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit cdc3347120e536fdaf98923f72b3a882e4e876ce
parent ffb9fd2fc5c5c89de4725c67fc3cbe6cc0b64b32
Author: Joris Vink <joris@coders.se>
Date:   Wed, 16 Oct 2019 12:05:27 +0200

Add kore.sendmsg(object, worker=None) to the python api.

This allows you to send Python objects that can be run through pickle
to other worker processes.

If your application implements koreapp.onmsg() you will be able to receive
these objects.

Diffstat:
include/kore/kore.h | 4++++
include/kore/python_methods.h | 4+++-
src/config.c | 2+-
src/keymgr.c | 4+++-
src/kore.c | 6+++++-
src/msg.c | 48+++++++++++++++++++++++++++++++++++++++++++-----
src/python.c | 95++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
7 files changed, 153 insertions(+), 10 deletions(-)

diff --git a/include/kore/kore.h b/include/kore/kore.h @@ -418,6 +418,8 @@ struct kore_alog_header { u_int16_t loglen; } __attribute__((packed)); +#define KORE_WORKER_MAX UCHAR_MAX + struct kore_worker { u_int8_t id; u_int8_t cpu; @@ -512,6 +514,7 @@ struct kore_timer { #define KORE_MSG_CERTIFICATE_REQ 8 #define KORE_MSG_CRL 9 #define KORE_MSG_ACCEPT_AVAILABLE 10 +#define KORE_PYTHON_SEND_OBJ 11 /* Predefined message targets. */ #define KORE_MSG_PARENT 1000 @@ -743,6 +746,7 @@ void kore_websocket_broadcast(struct connection *, void kore_msg_init(void); void kore_msg_worker_init(void); void kore_msg_parent_init(void); +void kore_msg_unregister(u_int8_t); void kore_msg_parent_add(struct kore_worker *); void kore_msg_parent_remove(struct kore_worker *); void kore_msg_send(u_int16_t, u_int8_t, const void *, size_t); diff --git a/include/kore/python_methods.h b/include/kore/python_methods.h @@ -55,7 +55,8 @@ static PyObject *python_kore_task_create(PyObject *, PyObject *); static PyObject *python_kore_socket_wrap(PyObject *, PyObject *); static PyObject *python_kore_domain(PyObject *, PyObject *, PyObject *); static PyObject *python_kore_gather(PyObject *, PyObject *, PyObject *); - +static PyObject *python_kore_sendobj(PyObject *, PyObject *, + PyObject *); static PyObject *python_kore_server(PyObject *, PyObject *, PyObject *); @@ -100,6 +101,7 @@ static struct PyMethodDef pykore_methods[] = { METHOD("server", python_kore_server, METH_VARARGS | METH_KEYWORDS), METHOD("gather", python_kore_gather, METH_VARARGS | METH_KEYWORDS), METHOD("domain", python_kore_domain, METH_VARARGS | METH_KEYWORDS), + METHOD("sendobj", python_kore_sendobj, METH_VARARGS | METH_KEYWORDS), METHOD("websocket_broadcast", python_websocket_broadcast, METH_VARARGS), #if defined(KORE_USE_PGSQL) METHOD("dbsetup", python_kore_pgsql_register, METH_VARARGS), diff --git a/src/config.c b/src/config.c @@ -1488,7 +1488,7 @@ configure_workers(char *option) { int err; - worker_count = kore_strtonum(option, 10, 1, 255, &err); + worker_count = kore_strtonum(option, 10, 1, KORE_WORKER_MAX, &err); if (err != KORE_RESULT_OK) { printf("%s is not a valid worker number\n", option); return (KORE_RESULT_ERROR); diff --git a/src/keymgr.c b/src/keymgr.c @@ -155,7 +155,9 @@ kore_keymgr_run(void) kore_seccomp_filter("keymgr", filter_keymgr, KORE_FILTER_LEN(filter_keymgr)); #endif - +#if defined(KORE_USE_PYTHON) + kore_msg_unregister(KORE_PYTHON_SEND_OBJ); +#endif kore_worker_privdrop(keymgr_runas_user, keymgr_root_path); if (rand_file != NULL) { diff --git a/src/kore.c b/src/kore.c @@ -243,6 +243,7 @@ main(int argc, char *argv[]) kore_platform_init(); kore_log_init(); + kore_msg_init(); #if !defined(KORE_NO_HTTP) http_parent_init(); #if defined(KORE_USE_CURL) @@ -865,7 +866,6 @@ kore_server_start(int argc, char *argv[]) } kore_platform_proctitle("[parent]"); - kore_msg_init(); kore_worker_init(); /* Set worker_max_connections for kore_connection_init(). */ @@ -885,6 +885,10 @@ kore_server_start(int argc, char *argv[]) kore_timer_add(kore_accesslog_run, 100, NULL, 0); #endif +#if defined(KORE_USE_PYTHON) + kore_msg_unregister(KORE_PYTHON_SEND_OBJ); +#endif + while (quit != 1) { if (sig_recv != 0) { last_sig = sig_recv; diff --git a/src/msg.c b/src/msg.c @@ -28,8 +28,6 @@ struct msg_type { TAILQ_ENTRY(msg_type) list; }; -TAILQ_HEAD(, msg_type) msg_types; - static struct msg_type *msg_type_lookup(u_int8_t); static int msg_recv_packet(struct netbuf *); static int msg_recv_data(struct netbuf *); @@ -41,9 +39,18 @@ static void msg_type_shutdown(struct kore_msg *, const void *); static void msg_type_websocket(struct kore_msg *, const void *); #endif +static TAILQ_HEAD(, msg_type) msg_types; +static int cacheidx = 0; +static struct connection *conncache[KORE_WORKER_MAX]; + void kore_msg_init(void) { + int i; + + for (i = 0; i < KORE_WORKER_MAX; i++) + conncache[i] = NULL; + TAILQ_INIT(&msg_types); } @@ -76,6 +83,11 @@ kore_msg_parent_add(struct kore_worker *kw) kw->msg[0]->disconnect = msg_disconnected_worker; kw->msg[0]->handle = kore_connection_handle; + if (cacheidx >= KORE_WORKER_MAX) + fatal("%s: too many workers", __func__); + + conncache[cacheidx++] = kw->msg[0]; + TAILQ_INSERT_TAIL(&connections, kw->msg[0], list); kore_platform_event_all(kw->msg[0]->fd, kw->msg[0]); @@ -114,6 +126,18 @@ kore_msg_worker_init(void) sizeof(struct kore_msg), 0, msg_recv_packet); } +void +kore_msg_unregister(u_int8_t id) +{ + struct msg_type *type; + + if ((type = msg_type_lookup(id)) == NULL) + return; + + TAILQ_REMOVE(&msg_types, type, list); + kore_free(type); +} + int kore_msg_register(u_int8_t id, void (*cb)(struct kore_msg *, const void *)) { @@ -164,7 +188,9 @@ static int msg_recv_data(struct netbuf *nb) { struct connection *c; + u_int8_t dst; struct msg_type *type; + int deliver, i; u_int16_t destination; struct kore_msg *msg = (struct kore_msg *)nb->buf; @@ -182,12 +208,24 @@ msg_recv_data(struct netbuf *nb) if (worker == NULL && type == NULL) { destination = msg->dst; - TAILQ_FOREACH(c, &connections, list) { + + for (i = 0; conncache[i] != NULL; i++) { + c = conncache[i]; if (c->proto != CONN_PROTO_MSG || c->hdlr_extra == NULL) continue; - if (destination != KORE_MSG_WORKER_ALL && - *(u_int8_t *)c->hdlr_extra != destination) + deliver = 1; + dst = *(u_int8_t *)c->hdlr_extra; + + if (destination == KORE_MSG_WORKER_ALL) { + if (keymgr_active && dst == 0) + deliver = 0; + } else { + if (dst != destination) + deliver = 0; + } + + if (deliver == 0) continue; /* This allows the worker to receive the correct id. */ diff --git a/src/python.c b/src/python.c @@ -56,6 +56,7 @@ static PyObject *python_import(const char *); static PyObject *pyconnection_alloc(struct connection *); static PyObject *python_callable(PyObject *, const char *); static void python_split_arguments(char *, char **, size_t); +static void python_kore_recvobj(struct kore_msg *, const void *); static const char *python_string_from_dict(PyObject *, const char *); static int python_bool_from_dict(PyObject *, const char *, int *); @@ -255,9 +256,13 @@ static struct coro_list coro_suspended; extern const char *__progname; +static PyObject *pickle = NULL; +static PyObject *pickle_dumps = NULL; +static PyObject *pickle_loads = NULL; +static PyObject *python_tracer = NULL; + /* XXX */ static struct python_coro *coro_running = NULL; -static PyObject *python_tracer = NULL; #if !defined(KORE_SINGLE_BINARY) const char *kore_pymodule = NULL; @@ -296,6 +301,8 @@ kore_python_init(void) PyMem_SetAllocator(PYMEM_DOMAIN_RAW, &allocator); PyMem_SetupDebugHooks(); + kore_msg_register(KORE_PYTHON_SEND_OBJ, python_kore_recvobj); + if (PyImport_AppendInittab("kore", &python_module_init) == -1) fatal("kore_python_init: failed to add new module"); @@ -307,6 +314,15 @@ kore_python_init(void) Py_InitializeEx(0); + if ((pickle = PyImport_ImportModule("pickle")) == NULL) + fatal("failed to import pickle module"); + + if ((pickle_dumps = PyObject_GetAttrString(pickle, "dumps")) == NULL) + fatal("pickle module has no dumps method"); + + if ((pickle_loads = PyObject_GetAttrString(pickle, "loads")) == NULL) + fatal("pickle module has no loads method"); + #if defined(__linux__) kore_seccomp_filter("python", filter_python, KORE_FILTER_LEN(filter_python)); @@ -2056,6 +2072,83 @@ python_kore_setname(PyObject *self, PyObject *args) } static PyObject * +python_kore_sendobj(PyObject *self, PyObject *args, PyObject *kwargs) +{ + long val; + u_int16_t dst; + char *ptr; + Py_ssize_t length; + PyObject *object, *bytes; + + if (!PyArg_ParseTuple(args, "O", &object)) + return (NULL); + + bytes = PyObject_CallFunctionObjArgs(pickle_dumps, object, NULL); + if (bytes == NULL) + return (NULL); + + if (PyBytes_AsStringAndSize(bytes, &ptr, &length) == -1) { + Py_DECREF(bytes); + return (NULL); + } + + dst = KORE_MSG_WORKER_ALL; + + if (kwargs != NULL) { + if (python_long_from_dict(kwargs, "worker", &val)) { + if (val <= 0 || val > worker_count || + val >= KORE_WORKER_MAX) { + PyErr_Format(PyExc_RuntimeError, + "worker %ld invalid", val); + Py_DECREF(bytes); + return (NULL); + } + + dst = val; + } + } + + kore_msg_send(dst, KORE_PYTHON_SEND_OBJ, ptr, length); + Py_DECREF(bytes); + + Py_RETURN_NONE; +} + +static void +python_kore_recvobj(struct kore_msg *msg, const void *data) +{ + struct kore_runtime *rt; + PyObject *onmsg, *ret, *bytes, *obj; + + if ((onmsg = kore_module_getsym("koreapp.onmsg", &rt)) == NULL) + return; + + if (rt->type != KORE_RUNTIME_PYTHON) + return; + + if ((bytes = PyBytes_FromStringAndSize(data, msg->length)) == NULL) { + Py_DECREF(onmsg); + kore_python_log_error("kore.recvobj"); + return; + } + + obj = PyObject_CallFunctionObjArgs(pickle_loads, bytes, NULL); + Py_DECREF(bytes); + + if (obj == NULL) { + Py_DECREF(onmsg); + kore_python_log_error("kore.recvobj"); + return; + } + + ret = PyObject_CallFunctionObjArgs(onmsg, obj, NULL); + + Py_DECREF(obj); + Py_DECREF(onmsg); + Py_XDECREF(ret); +} + +static PyObject * python_kore_suspend(PyObject *self, PyObject *args) { struct pysuspend_op *op;