@@ -27,22 +27,22 @@ struct db_driver {
};
-enum {
- RING_NO_QUERY,
- RING_QUERY_READY,
-};
-
struct db_stmt_ring {
- /* Ring buffer: 1 status byte + string */
- char *ring; /* pointer to the ring */
- uint32_t size; /* size of ring buffer in element */
- int length; /* length of one ring buffer element */
- uint32_t wr_item; /* write item in ring buffer */
- uint32_t rd_item; /* read item in ring buffer */
- char *wr_place;
+
+ char *elems; /* Buffer containing `size` strings of `length` bytes */
+
+ uint32_t size; /* No. of elements in ring buffer */
+ uint32_t used; /* No. of elements in ring buffer in use */
+ uint32_t length; /* Length of one element in ring buffer */
+ uint32_t wr_idx; /* Index of next element to write in ring buffer */
+ uint32_t rd_idx; /* Index of next element to read in ring buffer */
+
+ pthread_t thread_id;
pthread_cond_t cond;
pthread_mutex_t mutex;
+
int full;
+
};
struct db_stmt {
@@ -60,7 +60,6 @@ struct db_instance {
struct db_driver *driver;
/* DB ring buffer */
struct db_stmt_ring ring;
- pthread_t db_thread_id;
/* Backlog system */
unsigned int backlog_memcap;
unsigned int backlog_memusage;
@@ -63,8 +63,10 @@ static int _process_backlog(struct ulogd_pluginstance *upi);
static int _configure_ring(struct ulogd_pluginstance *upi);
static int _start_ring(struct ulogd_pluginstance *upi);
-static int _add_to_ring(struct ulogd_pluginstance *upi, struct db_instance *di);
+static int _add_to_ring(struct ulogd_pluginstance *upi);
static void *_process_ring(void *arg);
+static char *_get_ring_elem(struct db_stmt_ring *r, uint32_t i);
+static void _incr_ring_used(struct db_stmt_ring *r, int incr);
int
ulogd_db_configure(struct ulogd_pluginstance *upi,
@@ -182,17 +184,16 @@ ulogd_db_signal(struct ulogd_pluginstance *upi, int signal)
case SIGTERM:
case SIGINT:
if (di->ring.size) {
- int s = pthread_cancel(di->db_thread_id);
+ int s = pthread_cancel(di->ring.thread_id);
if (s != 0) {
ulogd_log(ULOGD_ERROR,
- "Can't cancel injection thread\n");
+ "Can't cancel ring-processing thread\n");
break;
}
- s = pthread_join(di->db_thread_id, NULL);
+ s = pthread_join(di->ring.thread_id, NULL);
if (s != 0) {
ulogd_log(ULOGD_ERROR,
- "Error waiting for injection thread"
- "cancelation\n");
+ "Error waiting for ring-processing thread cancellation\n");
}
}
break;
@@ -293,7 +294,7 @@ _interp_db_main(struct ulogd_pluginstance *upi)
struct db_instance *di = (struct db_instance *) &upi->private;
if (di->ring.size) {
- if (_add_to_ring(upi, di) < 0)
+ if (_add_to_ring(upi) < 0)
return ULOGD_IRET_ERR;
return ULOGD_IRET_OK;
}
@@ -372,11 +373,11 @@ _stop_db(struct ulogd_pluginstance *upi)
di->stmt = NULL;
}
if (di->ring.size > 0) {
- pthread_cancel(di->db_thread_id);
- free(di->ring.ring);
+ pthread_cancel(di->ring.thread_id);
pthread_cond_destroy(&di->ring.cond);
pthread_mutex_destroy(&di->ring.mutex);
- di->ring.ring = NULL;
+ free(di->ring.elems);
+ di->ring.elems = NULL;
}
}
@@ -743,18 +744,17 @@ _start_ring(struct ulogd_pluginstance *upi)
return 0;
/* allocate */
- di->ring.ring = calloc(di->ring.size, sizeof(char) * di->ring.length);
- if (di->ring.ring == NULL)
+ di->ring.elems = calloc(di->ring.size, di->ring.length);
+ if (di->ring.elems == NULL)
return -1;
- di->ring.wr_place = di->ring.ring;
+ di->ring.wr_idx = di->ring.rd_idx = di->ring.used = 0;
ulogd_log(ULOGD_NOTICE,
- "Allocating %d elements of size %d for ring\n",
+ "Allocating %" PRIu32 " elements of size %" PRIu32 " for ring\n",
di->ring.size, di->ring.length);
/* init start of query for each element */
for(i = 0; i < di->ring.size; i++)
- strcpy(di->ring.ring + di->ring.length * i + 1,
- di->stmt);
+ strcpy(_get_ring_elem(&di->ring, i), di->stmt);
/* init cond & mutex */
ret = pthread_cond_init(&di->ring.cond, NULL);
@@ -765,7 +765,7 @@ _start_ring(struct ulogd_pluginstance *upi)
goto cond_error;
/* create thread */
- ret = pthread_create(&di->db_thread_id, NULL, _process_ring, upi);
+ ret = pthread_create(&di->ring.thread_id, NULL, _process_ring, upi);
if (ret != 0)
goto mutex_error;
@@ -776,66 +776,78 @@ mutex_error:
cond_error:
pthread_cond_destroy(&di->ring.cond);
alloc_error:
- free(di->ring.ring);
+ free(di->ring.elems);
return -1;
}
static int
-_add_to_ring(struct ulogd_pluginstance *upi, struct db_instance *di)
+_add_to_ring(struct ulogd_pluginstance *upi)
{
- if (*di->ring.wr_place == RING_QUERY_READY) {
- if (di->ring.full == 0) {
+ struct db_instance *di = (struct db_instance *) &upi->private;
+
+ if (di->ring.used == di->ring.size) {
+ if (!di->ring.full) {
ulogd_log(ULOGD_ERROR, "No place left in ring\n");
di->ring.full = 1;
}
return ULOGD_IRET_OK;
- } else if (di->ring.full) {
+ }
+
+ if (di->ring.full) {
ulogd_log(ULOGD_NOTICE, "Recovered some place in ring\n");
di->ring.full = 0;
}
- _bind_sql_stmt(upi, di->ring.wr_place + 1);
- *di->ring.wr_place = RING_QUERY_READY;
+
+ _bind_sql_stmt(upi, _get_ring_elem(&di->ring, di->ring.wr_idx));
+ _incr_ring_used(&di->ring, 1);
+
pthread_cond_signal(&di->ring.cond);
- di->ring.wr_item ++;
- di->ring.wr_place += di->ring.length;
- if (di->ring.wr_item == di->ring.size) {
- di->ring.wr_item = 0;
- di->ring.wr_place = di->ring.ring;
- }
return ULOGD_IRET_OK;
}
static void *
-_process_ring(void *gdi)
+_process_ring(void *arg)
{
- struct ulogd_pluginstance *upi = gdi;
+ struct ulogd_pluginstance *upi = arg;
struct db_instance *di = (struct db_instance *) &upi->private;
- char *wr_place;
- wr_place = di->ring.ring;
pthread_mutex_lock(&di->ring.mutex);
while(1) {
/* wait cond */
pthread_cond_wait(&di->ring.cond, &di->ring.mutex);
- while (*wr_place == RING_QUERY_READY) {
- if (di->driver->execute(upi, wr_place + 1,
- strlen(wr_place + 1)) < 0) {
+ while (di->ring.used > 0) {
+ char *stmt = _get_ring_elem(&di->ring, di->ring.rd_idx);
+
+ if (di->driver->execute(upi, stmt,
+ strlen(stmt)) < 0) {
+
di->driver->close_db(upi);
while (di->driver->open_db(upi) < 0)
sleep(1);
/* try to re run query */
continue;
}
- *wr_place = RING_NO_QUERY;
- di->ring.rd_item++;
- if (di->ring.rd_item == di->ring.size) {
- di->ring.rd_item = 0;
- wr_place = di->ring.ring;
- } else
- wr_place += di->ring.length;
+
+ _incr_ring_used(&di->ring, -1);
}
}
return NULL;
}
+
+static char *
+_get_ring_elem(struct db_stmt_ring *r, uint32_t i)
+{
+ return &r->elems[i * r->length];
+}
+
+static void
+_incr_ring_used(struct db_stmt_ring *r, int incr)
+{
+ uint32_t *idx = incr > 0 ? &r->wr_idx : &r->rd_idx;
+
+ *idx = (*idx + 1) % r->size;
+
+ r->used += incr;
+}
* Rename some fields. * Use `uint32_t` consistently for all sizes and indices. * Move thread ID into the ring structure. * Replace status flag with a count of in-use elements. Signed-off-by: Jeremy Sowden <jeremy@azazel.net> --- include/ulogd/db.h | 25 ++++++------ util/db.c | 100 +++++++++++++++++++++++++-------------------- 2 files changed, 68 insertions(+), 57 deletions(-)