diff mbox series

[ulogd2,v2,v2,25/34] db: synchronize access to ring-buffer

Message ID 20221129214749.247878-26-jeremy@azazel.net
State Changes Requested
Delegated to: Pablo Neira
Headers show
Series Refactor of the DB output plug-ins | expand

Commit Message

Jeremy Sowden Nov. 29, 2022, 9:47 p.m. UTC
There are a mutex and condition-variable associated with the
ring-buffer.  However, they are not used to synchronize access to the
buffer, but only to wake the thread that processes the buffer when new
statements are added to it.  Thus there is nothing to prevent concurrent
modifications of the buffer.

Instead, acquire the mutex before adding to the buffer, and, in the
processing thread, copy the statement we're about to execute out of the
buffer and release the mutex while processing it.

Signed-off-by: Jeremy Sowden <jeremy@azazel.net>
---
 include/ulogd/db.h |  2 ++
 util/db.c          | 63 +++++++++++++++++++++++++++++++++++++---------
 2 files changed, 53 insertions(+), 12 deletions(-)
diff mbox series

Patch

diff --git a/include/ulogd/db.h b/include/ulogd/db.h
index 6c2e3d07f463..bf4a19dea150 100644
--- a/include/ulogd/db.h
+++ b/include/ulogd/db.h
@@ -34,6 +34,8 @@  struct db_stmt_ring {
 	struct db_stmt *elems; /* Buffer containing `size` statements of
 				* `length` bytes */
 
+	struct db_stmt *stmt; /* Currently executing statement */
+
 	uint32_t size; /* No. of elements in ring buffer */
 	uint32_t used; /* No. of elements in ring buffer in use */
 	uint32_t length; /* Length of one element in ring buffer */
diff --git a/util/db.c b/util/db.c
index 487eaed26153..6cfbcbc16791 100644
--- a/util/db.c
+++ b/util/db.c
@@ -372,10 +372,15 @@  _stop_db(struct ulogd_pluginstance *upi)
 	}
 	if (di->ring.size > 0) {
 		pthread_cancel(di->ring.thread_id);
+
 		pthread_cond_destroy(&di->ring.cond);
 		pthread_mutex_destroy(&di->ring.mutex);
+
 		free(di->ring.elems);
+		free(di->ring.stmt);
+
 		di->ring.elems = NULL;
+		di->ring.stmt = NULL;
 	}
 }
 
@@ -737,14 +742,22 @@  _start_ring(struct ulogd_pluginstance *upi)
 		return 0;
 
 	/* allocate */
+
 	stmt_size = sizeof(*di->stmt) + di->stmt->size;
 	stmt_align = __alignof__(*di->stmt);
 	di->ring.length = stmt_size % stmt_align != 0
 		? (1 + stmt_size / stmt_align) * stmt_align
 		: stmt_size;
+
+	di->ring.stmt = malloc(di->ring.length);
+	if (di->ring.stmt == NULL)
+		return -1;
+
 	di->ring.elems = calloc(di->ring.size, di->ring.length);
-	if (di->ring.elems == NULL)
+	if (di->ring.elems == NULL) {
+		free(di->ring.stmt);
 		return -1;
+	}
 	di->ring.wr_idx = di->ring.rd_idx = di->ring.used = 0;
 	ulogd_log(ULOGD_NOTICE,
 		  "Allocating %" PRIu32 " elements of size %" PRIu32 " for ring\n",
@@ -775,6 +788,7 @@  cond_error:
 	pthread_cond_destroy(&di->ring.cond);
 alloc_error:
 	free(di->ring.elems);
+	free(di->ring.stmt);
 
 	return -1;
 }
@@ -784,12 +798,14 @@  _add_to_ring(struct ulogd_pluginstance *upi)
 {
 	struct db_instance *di = (struct db_instance *) &upi->private;
 
+	pthread_mutex_lock(&di->ring.mutex);
+
 	if (di->ring.used == di->ring.size) {
 		if (!di->ring.full) {
 			ulogd_log(ULOGD_ERROR, "No place left in ring\n");
 			di->ring.full = 1;
 		}
-		return ULOGD_IRET_OK;
+		goto unlock_mutex;
 	}
 
 	if (di->ring.full) {
@@ -801,6 +817,9 @@  _add_to_ring(struct ulogd_pluginstance *upi)
 	_incr_ring_used(&di->ring, 1);
 
 	pthread_cond_signal(&di->ring.cond);
+unlock_mutex:
+	pthread_mutex_unlock(&di->ring.mutex);
+
 	return ULOGD_IRET_OK;
 }
 
@@ -809,27 +828,47 @@  _process_ring(void *arg)
 {
 	struct ulogd_pluginstance *upi = arg;
 	struct db_instance *di = (struct db_instance *) &upi->private;
+	struct db_stmt *stmt = di->ring.stmt;
 
 	pthread_mutex_lock(&di->ring.mutex);
+
 	while(1) {
-		/* wait cond */
+
 		pthread_cond_wait(&di->ring.cond, &di->ring.mutex);
+
 		while (di->ring.used > 0) {
-			struct db_stmt *stmt = _get_ring_elem(&di->ring,
-							      di->ring.rd_idx);
-
-			if (di->driver->execute(upi, stmt) < 0) {
-				di->driver->close_db(upi);
-				while (di->driver->open_db(upi) < 0)
-					sleep(1);
-				/* try to re-run statement */
+
+			memcpy(stmt, _get_ring_elem(&di->ring, di->ring.rd_idx),
+			       di->ring.length);
+
+			pthread_mutex_unlock(&di->ring.mutex);
+
+exec_stmt:
+			if (di->driver->execute(upi, stmt) == 0) {
+
+				pthread_mutex_lock(&di->ring.mutex);
+
+				_incr_ring_used(&di->ring, -1);
+
 				continue;
+
 			}
 
-			_incr_ring_used(&di->ring, -1);
+			/* If the exec fails, close the DB connexion and try to
+			 * open it again.  Once the connexion is made, retry the
+			 * statement.
+			 */
+			di->driver->close_db(upi);
+			while (di->driver->open_db(upi) < 0)
+				sleep(1);
+			goto exec_stmt;
+
 		}
+
 	}
 
+	pthread_mutex_unlock(&di->ring.mutex);
+
 	return NULL;
 }