commit 7e74cd6e62b1b7b3734be3659ab5ca7c8d5f167a
parent c063dd52f63ea556c128c69f501da2f53499bb5c
Author: Joris Vink <joris@coders.se>
Date: Thu, 21 Aug 2014 16:36:12 +0200
Introduce a wait queue for our pgsql code.
Instead of letting http_requests spin, if we cannot allocate
a connection for the request we will queue them up put them to sleep.
When a connection becomes available, we'll wake up a request that
was waiting for a connection and let it continue.
This completely avoids consuming massive amounts of cpu time
when dealing with thousands of requests waiting for a pgsql
worker to become ready.
Diffstat:
4 files changed, 83 insertions(+), 10 deletions(-)
diff --git a/includes/http.h b/includes/http.h
@@ -156,6 +156,7 @@ struct http_file {
#define HTTP_REQUEST_COMPLETE 0x01
#define HTTP_REQUEST_DELETE 0x02
#define HTTP_REQUEST_SLEEPING 0x04
+#define HTTP_REQUEST_PGSQL_QUEUE 0x10
struct kore_task;
diff --git a/includes/pgsql.h b/includes/pgsql.h
@@ -50,6 +50,7 @@ int kore_pgsql_async(struct kore_pgsql *,
int kore_pgsql_ntuples(struct kore_pgsql *);
void kore_pgsql_logerror(struct kore_pgsql *);
+void kore_pgsql_queue_remove(struct http_request *);
char *kore_pgsql_getvalue(struct kore_pgsql *, int, int);
#define KORE_PGSQL_STATE_INIT 1
diff --git a/src/http.c b/src/http.c
@@ -336,6 +336,9 @@ http_request_free(struct http_request *req)
pgsql = LIST_FIRST(&(req->pgsqls));
kore_pgsql_cleanup(pgsql);
}
+
+ if (req->flags & HTTP_REQUEST_PGSQL_QUEUE)
+ kore_pgsql_queue_remove(req);
#endif
kore_debug("http_request_free: %p->%p", req->owner, req);
diff --git a/src/pgsql.c b/src/pgsql.c
@@ -34,6 +34,11 @@ struct pgsql_job {
TAILQ_ENTRY(pgsql_job) list;
};
+struct pgsql_wait {
+ struct http_request *req;
+ TAILQ_ENTRY(pgsql_wait) list;
+};
+
#define PGSQL_IS_BLOCKING 0
#define PGSQL_IS_ASYNC 1
@@ -44,8 +49,13 @@ static void pgsql_conn_release(struct kore_pgsql *);
static void pgsql_conn_cleanup(struct pgsql_conn *);
static int pgsql_conn_create(struct kore_pgsql *);
static void pgsql_read_result(struct kore_pgsql *, int);
+static void pgsql_queue_add(struct http_request *);
+static void pgsql_queue_wakeup(void);
+static struct kore_pool pgsql_job_pool;
+static struct kore_pool pgsql_wait_pool;
static TAILQ_HEAD(, pgsql_conn) pgsql_conn_free;
+static TAILQ_HEAD(, pgsql_wait) pgsql_wait_queue;
static u_int16_t pgsql_conn_count;
char *pgsql_conn_string = NULL;
u_int16_t pgsql_conn_max = PGSQL_CONN_MAX;
@@ -55,6 +65,12 @@ kore_pgsql_init(void)
{
pgsql_conn_count = 0;
TAILQ_INIT(&pgsql_conn_free);
+ TAILQ_INIT(&pgsql_wait_queue);
+
+ kore_pool_init(&pgsql_job_pool, "pgsql_job_pool",
+ sizeof(struct pgsql_job), 100);
+ kore_pool_init(&pgsql_wait_pool, "pgsql_wait_pool",
+ sizeof(struct pgsql_wait), 100);
}
int
@@ -70,8 +86,12 @@ kore_pgsql_async(struct kore_pgsql *pgsql, struct http_request *req,
pgsql->conn = NULL;
if (TAILQ_EMPTY(&pgsql_conn_free)) {
- if ((pgsql_conn_count >= pgsql_conn_max) ||
- !pgsql_conn_create(pgsql))
+ if (pgsql_conn_count >= pgsql_conn_max) {
+ pgsql_queue_add(req);
+ return (KORE_RESULT_ERROR);
+ }
+
+ if (!pgsql_conn_create(pgsql))
return (KORE_RESULT_ERROR);
}
@@ -84,7 +104,7 @@ kore_pgsql_async(struct kore_pgsql *pgsql, struct http_request *req,
TAILQ_REMOVE(&pgsql_conn_free, conn, list);
pgsql->conn = conn;
- conn->job = kore_malloc(sizeof(struct pgsql_job));
+ conn->job = kore_pool_get(&pgsql_job_pool);
conn->job->query = kore_strdup(query);
conn->job->start = kore_time_ms();
conn->job->pgsql = pgsql;
@@ -182,11 +202,8 @@ kore_pgsql_cleanup(struct kore_pgsql *pgsql)
if (pgsql->error != NULL)
kore_mem_free(pgsql->error);
- if (pgsql->conn != NULL) {
- while (PQgetResult(pgsql->conn->db) != NULL)
- ;
+ if (pgsql->conn != NULL)
pgsql_conn_release(pgsql);
- }
pgsql->result = NULL;
pgsql->error = NULL;
@@ -214,6 +231,55 @@ kore_pgsql_getvalue(struct kore_pgsql *pgsql, int row, int col)
return (PQgetvalue(pgsql->result, row, col));
}
+void
+kore_pgsql_queue_remove(struct http_request *req)
+{
+ struct pgsql_wait *pgw, *next;
+
+ for (pgw = TAILQ_FIRST(&pgsql_wait_queue); pgw != NULL; pgw = next) {
+ next = TAILQ_NEXT(pgw, list);
+ if (pgw->req != req)
+ continue;
+
+ TAILQ_REMOVE(&pgsql_wait_queue, pgw, list);
+ kore_pool_put(&pgsql_wait_pool, pgw);
+ return;
+ }
+}
+
+static void
+pgsql_queue_add(struct http_request *req)
+{
+ struct pgsql_wait *pgw;
+
+ http_request_sleep(req);
+
+ pgw = kore_pool_get(&pgsql_wait_pool);
+ pgw->req = req;
+ pgw->req->flags |= HTTP_REQUEST_PGSQL_QUEUE;
+
+ TAILQ_INSERT_TAIL(&pgsql_wait_queue, pgw, list);
+}
+
+static void
+pgsql_queue_wakeup(void)
+{
+ struct pgsql_wait *pgw, *next;
+
+ for (pgw = TAILQ_FIRST(&pgsql_wait_queue); pgw != NULL; pgw = next) {
+ next = TAILQ_NEXT(pgw, list);
+ if (pgw->req->flags & HTTP_REQUEST_DELETE)
+ continue;
+
+ http_request_wakeup(pgw->req);
+ pgw->req->flags &= ~HTTP_REQUEST_PGSQL_QUEUE;
+
+ TAILQ_REMOVE(&pgsql_wait_queue, pgw, list);
+ kore_pool_put(&pgsql_wait_pool, pgw);
+ return;
+ }
+}
+
static int
pgsql_conn_create(struct kore_pgsql *pgsql)
{
@@ -252,7 +318,7 @@ pgsql_conn_release(struct kore_pgsql *pgsql)
return;
kore_mem_free(pgsql->conn->job->query);
- kore_mem_free(pgsql->conn->job);
+ kore_pool_put(&pgsql_job_pool, pgsql->conn->job);
/* Drain just in case. */
while (PQgetResult(pgsql->conn->db) != NULL)
@@ -264,9 +330,11 @@ pgsql_conn_release(struct kore_pgsql *pgsql)
fd = PQsocket(pgsql->conn->db);
kore_platform_disable_read(fd);
- pgsql->state = KORE_PGSQL_STATE_COMPLETE;
pgsql->conn = NULL;
+ pgsql->state = KORE_PGSQL_STATE_COMPLETE;
+
+ pgsql_queue_wakeup();
}
static void
@@ -290,7 +358,7 @@ pgsql_conn_cleanup(struct pgsql_conn *conn)
pgsql->error = kore_strdup(PQerrorMessage(conn->db));
kore_mem_free(conn->job->query);
- kore_mem_free(conn->job);
+ kore_pool_put(&pgsql_job_pool, conn->job);
conn->job = NULL;
}