kore

Kore is a web application platform for writing scalable, concurrent web based processes in C or Python.
Commits | Files | Refs | README | LICENSE | git clone https://git.kore.io/kore.git

commit 61b6f823c52c959d202f79f736e37f1a323b38e0
parent 2f044cc7eb0b968339cb3243885f368dcd7178b4
Author: Joris Vink <joris@coders.se>
Date:   Mon, 31 Mar 2014 00:57:00 +0200

Improvements to pgsql contrib code.

Including but not limited to:
- Correctly use PQerrorMessage() in case we cleanup with PQfinish
- If we get a network error, cleanup the connection
- No longer call the page handler from inside kore_pgsql_handle()
  but instead just put it to sleep in case we don't need it.
  This does grow the http_requests list quite a bit with sleeping
  connections and can perhaps be improved later on.
- Allow us to on error return OK from a page handler from inside
  the completetion block for KORE_PGSQL().
- Count the cummulative time for a request to finish instead
  of the latest run time for the handler.

Diffstat:
contrib/postgres/kore_pgsql.c | 52+++++++++++++++++++++++++++++++++-------------------
includes/contrib/postgres/kore_pgsql.h | 17++++++++++++++++-
includes/http.h | 1+
modules/example/src/example.c | 2++
src/accesslog.c | 2+-
src/http.c | 4++--
6 files changed, 55 insertions(+), 23 deletions(-)

diff --git a/contrib/postgres/kore_pgsql.c b/contrib/postgres/kore_pgsql.c @@ -91,6 +91,7 @@ kore_pgsql_query(struct http_request *req, char *query, int idx) conn->flags &= ~PGSQL_CONN_FREE; TAILQ_REMOVE(&pgsql_conn_free, conn, list); + req->pgsql[idx]->conn = conn; conn->job = kore_malloc(sizeof(struct pgsql_job)); conn->job->query = kore_strdup(query); conn->job->start = kore_time_ms(); @@ -107,18 +108,23 @@ kore_pgsql_query(struct http_request *req, char *query, int idx) fatal("PQsocket returned < 0 fd on open connection"); kore_platform_schedule_read(fd, conn); + req->pgsql[idx]->state = KORE_PGSQL_STATE_WAIT; kore_debug("query '%s' for %p sent on %p", query, req, conn); - req->pgsql[idx]->state = KORE_PGSQL_STATE_WAIT; return (KORE_RESULT_OK); } void kore_pgsql_handle(void *c, int err) { + int i; struct http_request *req; struct pgsql_conn *conn = (struct pgsql_conn *)c; - int fd, i, (*cb)(struct http_request *); + + if (err) { + pgsql_conn_cleanup(conn); + return; + } i = conn->job->idx; req = conn->job->req; @@ -126,7 +132,7 @@ kore_pgsql_handle(void *c, int err) if (!PQconsumeInput(conn->db)) { req->pgsql[i]->state = KORE_PGSQL_STATE_ERROR; - req->pgsql[i]->error = PQerrorMessage(conn->db); + req->pgsql[i]->error = kore_strdup(PQerrorMessage(conn->db)); } else { if (PQisBusy(conn->db)) { req->pgsql[i]->state = KORE_PGSQL_STATE_WAIT; @@ -152,23 +158,34 @@ kore_pgsql_handle(void *c, int err) req->pgsql[i]->state = KORE_PGSQL_STATE_ERROR; req->pgsql[i]->error = - PQresultErrorMessage(req->pgsql[i]->result); + kore_strdup(PQresultErrorMessage(req->pgsql[i]->result)); break; } } } } - if (req->pgsql[i]->state == KORE_PGSQL_STATE_ERROR || - req->pgsql[i]->state == KORE_PGSQL_STATE_RESULT) { - cb = req->hdlr->addr; - cb(req); + if (req->pgsql[i]->state == KORE_PGSQL_STATE_WAIT) + req->flags |= HTTP_REQUEST_SLEEPING; + else + req->flags &= ~HTTP_REQUEST_SLEEPING; +} + +void +kore_pgsql_continue(struct http_request *req, int i) +{ + int fd; + struct pgsql_conn *conn; + + if (req->pgsql[i]->error) { + kore_mem_free(req->pgsql[i]->error); + req->pgsql[i]->error = NULL; } - req->pgsql[i]->error = NULL; if (req->pgsql[i]->result) PQclear(req->pgsql[i]->result); + conn = req->pgsql[i]->conn; switch (req->pgsql[i]->state) { case KORE_PGSQL_STATE_INIT: case KORE_PGSQL_STATE_WAIT: @@ -205,13 +222,14 @@ kore_pgsql_cleanup(struct http_request *req) if (req->pgsql[i] == NULL) continue; - kore_debug("cleaning up pgsql result %d for %p", i, req); - if (req->pgsql[i]->result != NULL) { kore_log(LOG_NOTICE, "cleaning up leaked pgsql result"); PQclear(req->pgsql[i]->result); } + if (req->pgsql[i]->error != NULL) + kore_mem_free(req->pgsql[i]->error); + kore_mem_free(req->pgsql[i]); req->pgsql[i] = NULL; } @@ -235,12 +253,13 @@ pgsql_conn_create(struct http_request *req, int idx) conn->db = PQconnectdb("host=/tmp/ user=joris"); if (conn->db == NULL || (PQstatus(conn->db) != CONNECTION_OK)) { + req->pgsql[idx]->state = KORE_PGSQL_STATE_ERROR; + req->pgsql[idx]->error = kore_strdup(PQerrorMessage(conn->db)); pgsql_conn_cleanup(conn); return (KORE_RESULT_ERROR); } conn->job = NULL; - pgsql_conn_count++; conn->flags = PGSQL_CONN_FREE; conn->type = KORE_TYPE_PGSQL_CONN; TAILQ_INSERT_TAIL(&pgsql_conn_free, conn, list); @@ -251,8 +270,8 @@ pgsql_conn_create(struct http_request *req, int idx) static void pgsql_conn_cleanup(struct pgsql_conn *conn) { + int i; struct http_request *req; - int i, (*cb)(struct http_request *); kore_debug("pgsql_conn_cleanup(): %p", conn); @@ -264,12 +283,7 @@ pgsql_conn_cleanup(struct pgsql_conn *conn) req = conn->job->req; req->pgsql[i]->state = KORE_PGSQL_STATE_ERROR; - req->pgsql[i]->error = PQerrorMessage(conn->db); - - cb = req->hdlr->addr; - cb(req); - - req->pgsql[i]->state = KORE_PGSQL_STATE_COMPLETE; + req->pgsql[i]->error = kore_strdup(PQerrorMessage(conn->db)); req->flags &= ~HTTP_REQUEST_SLEEPING; kore_mem_free(conn->job->query); diff --git a/includes/contrib/postgres/kore_pgsql.h b/includes/contrib/postgres/kore_pgsql.h @@ -22,6 +22,7 @@ void kore_pgsql_init(void); void kore_pgsql_handle(void *, int); void kore_pgsql_cleanup(struct http_request *); +void kore_pgsql_continue(struct http_request *, int); int kore_pgsql_query(struct http_request *, char *, int); int kore_pgsql_ntuples(struct http_request *, int); @@ -30,6 +31,7 @@ struct kore_pgsql { u_int8_t state; char *error; PGresult *result; + void *conn; }; #define KORE_PGSQL_STATE_INIT 1 @@ -49,12 +51,25 @@ struct kore_pgsql { case KORE_PGSQL_STATE_ERROR: \ case KORE_PGSQL_STATE_RESULT: \ s; \ - return (KORE_RESULT_RETRY); \ case KORE_PGSQL_STATE_COMPLETE: \ break; \ default: \ + kore_pgsql_continue(r, i); \ + return (KORE_RESULT_RETRY); \ + } \ + if (r->pgsql[i]->state == KORE_PGSQL_STATE_ERROR || \ + r->pgsql[i]->state == KORE_PGSQL_STATE_RESULT) { \ + kore_pgsql_continue(r, i); \ return (KORE_RESULT_RETRY); \ } \ } while (0); +#define KORE_PGSQL_EXEC(r, q, i) \ + do { \ + if (r->pgsql[i] == NULL) \ + kore_pgsql_query(r, q, i); \ + if (r->pgsql[i] == NULL) \ + return (KORE_RESULT_RETRY); \ + } while (0); + #endif diff --git a/includes/http.h b/includes/http.h @@ -129,6 +129,7 @@ struct http_request { int status; u_int64_t start; u_int64_t end; + u_int64_t total; char host[KORE_DOMAINNAME_LEN]; char path[HTTP_URI_LEN]; char *agent; diff --git a/modules/example/src/example.c b/modules/example/src/example.c @@ -341,6 +341,8 @@ serve_pgsql_test(struct http_request *req) kore_log(LOG_NOTICE, "pgsql: %s", (req->pgsql[0]->error) ? req->pgsql[0]->error : "unknown"); + http_response(req, 500, "fail", 4); + return (KORE_RESULT_OK); } else { //r = kore_pgsql_ntuples(req, 0); } diff --git a/src/accesslog.c b/src/accesslog.c @@ -162,7 +162,7 @@ kore_accesslog(struct http_request *req) logpacket.method = req->method; logpacket.worker_id = worker->id; logpacket.worker_cpu = worker->cpu; - logpacket.time_req = req->end - req->start; + logpacket.time_req = req->total; kore_strlcpy(logpacket.host, req->host, sizeof(logpacket.host)); kore_strlcpy(logpacket.path, req->path, sizeof(logpacket.path)); diff --git a/src/http.c b/src/http.c @@ -86,12 +86,10 @@ http_request_new(struct connection *c, struct spdy_stream *s, char *host, return (KORE_RESULT_ERROR); } -#if 0 if (strcasecmp(version, "http/1.1")) { http_error_response(c, 505); return (KORE_RESULT_ERROR); } -#endif if (!strcasecmp(method, "get")) { m = HTTP_METHOD_GET; @@ -106,6 +104,7 @@ http_request_new(struct connection *c, struct spdy_stream *s, char *host, req = kore_pool_get(&http_request_pool); req->end = 0; + req->total = 0; req->start = 0; req->owner = c; req->status = 0; @@ -200,6 +199,7 @@ http_process(void) } } req->end = kore_time_ms(); + req->total += req->end - req->start; switch (r) { case KORE_RESULT_OK: