diff mbox series

Execute subprocess IPC in separate thread

Message ID 20201211135556.96421-1-sava.jakovljev@teufel.de
State Changes Requested
Headers show
Series Execute subprocess IPC in separate thread | expand

Commit Message

Sava Jakovljev Dec. 11, 2020, 1:55 p.m. UTC
Signed-off-by: Sava Jakovljev <sava.jakovljev@teufel.de>
---
Subprocesses may want to alter global SWUpdate state.
In order to do this, subprocess RPC must be handled with network thread 
being kept free, in order to avoid deadlocks. One currently exists in 
Suricatta activation IPC. 

 core/network_thread.c | 216 ++++++++++++++++++++++++++++--------------
 1 file changed, 143 insertions(+), 73 deletions(-)
diff mbox series

Patch

diff --git a/core/network_thread.c b/core/network_thread.c
index 0e6080d..f45589f 100644
--- a/core/network_thread.c
+++ b/core/network_thread.c
@@ -56,6 +56,20 @@  static unsigned long nrmsgs = 0;
 
 static pthread_mutex_t msglock = PTHREAD_MUTEX_INITIALIZER;
 
+struct subprocess_msg_elem {
+	ipc_message message;
+	int client;
+	SIMPLEQ_ENTRY(subprocess_msg_elem) next;
+};
+
+SIMPLEQ_HEAD(subprocess_msglist, subprocess_msg_elem);
+static struct subprocess_msglist subprocess_messages;
+
+static pthread_t subprocess_ipc_handler_thread_id;
+static pthread_mutex_t subprocess_msg_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t subprocess_wkup = PTHREAD_COND_INITIALIZER;
+static unsigned long subprocess_messages_count = 0;
+
 static bool is_selection_allowed(const char *software_set, char *running_mode,
 				 struct dict const *acceptedlist)
 {
@@ -87,9 +101,9 @@  static bool is_selection_allowed(const char *software_set, char *running_mode,
 
 	if (allowed) {
 		INFO("Accepted selection %s,%s", software_set, running_mode);
-	}else 
+	}else
 		ERROR("Selection %s,%s is not allowed, rejected !",
-		      software_set, running_mode); 
+		      software_set, running_mode);
 	return allowed;
 }
 
@@ -239,6 +253,109 @@  static void unlink_socket(void)
 	unlink(get_ctrl_socket());
 }
 
+void send_subprocess_reply(struct subprocess_msg_elem *subprocess_msg)
+{
+	if (write(subprocess_msg->client, &subprocess_msg->message,
+			sizeof(subprocess_msg->message)) < 0)
+		printf("Error write on socket ctrl");
+}
+
+void handle_subprocess_ipc(struct subprocess_msg_elem *subprocess_msg)
+{
+	ipc_message *msg = &subprocess_msg->message;
+	int pipe = pctl_getfd_from_type(msg->data.procmsg.source);
+	if (pipe < 0) {
+		ERROR("Cannot find channel for requested process");
+		msg->type = NACK;
+
+		return;
+	}
+
+	TRACE("Received Message for %s",
+		pctl_getname_from_type(msg->data.procmsg.source));
+	if (fcntl(pipe, F_GETFL) < 0 && errno == EBADF) {
+		ERROR("Pipe not available or closed: %d", pipe);
+		msg->type = NACK;
+
+		return;
+	}
+
+	/*
+	 * Cleanup the queue to be sure there are not
+	 * outstanding messages
+	 */
+	empty_pipe(pipe);
+
+	int ret = write(pipe, msg, sizeof(*msg));
+	if (ret != sizeof(*msg)) {
+		ERROR("Writing to pipe failed !");
+		msg->type = NACK;
+
+		return;
+	}
+
+	/*
+	 * Do not block forever for an answer
+	 * This would block the whole thread
+	 * If a message requires more time,
+	 * the destination process should sent an
+	 * answer back explaining this in the payload
+	 */
+	fd_set pipefds;
+	FD_ZERO(&pipefds);
+	FD_SET(pipe, &pipefds);
+
+	struct timeval tv;
+	tv.tv_usec = 0;
+	if (!msg->data.procmsg.timeout)
+		tv.tv_sec = DEFAULT_INTERNAL_TIMEOUT;
+	else
+		tv.tv_sec = msg->data.procmsg.timeout;
+	ret = select(pipe + 1, &pipefds, NULL, NULL, &tv);
+
+	/*
+	 * If there is an error or timeout,
+	 * send a NACK back
+	 */
+	if (ret <= 0 || !FD_ISSET(pipe, &pipefds)) {
+		msg->type = NACK;
+
+		return;
+	}
+
+	ret = read(pipe, msg, sizeof(*msg));
+	if (ret != sizeof(*msg)) {
+		ERROR("Reading from pipe failed !");
+		msg->type = NACK;
+	}
+}
+
+void *subprocess_thread (void *data)
+{
+	pthread_mutex_lock(&subprocess_msg_lock);
+
+	while(1) {
+		pthread_cond_wait(&subprocess_wkup, &subprocess_msg_lock);
+
+		while(subprocess_messages_count) {
+			struct subprocess_msg_elem *subprocess_msg;
+			subprocess_msg = SIMPLEQ_FIRST(&subprocess_messages);
+			SIMPLEQ_REMOVE_HEAD(&subprocess_messages, next);
+			--subprocess_messages_count;
+			pthread_mutex_unlock(&subprocess_msg_lock);
+
+			handle_subprocess_ipc(subprocess_msg);
+			send_subprocess_reply(subprocess_msg);
+			close(subprocess_msg->client);
+
+			free(subprocess_msg);
+			pthread_mutex_lock(&subprocess_msg_lock);
+		}
+	}
+
+	return NULL;
+}
+
 void *network_thread (void *data)
 {
 	struct installer *instp = (struct installer *)data;
@@ -249,9 +366,6 @@  void *network_thread (void *data)
 	int nread;
 	struct msg_elem *notification;
 	int ret;
-	int pipe;
-	fd_set pipefds;
-	struct timeval tv;
 	update_state_t value;
 
 	if (!instp) {
@@ -259,7 +373,10 @@  void *network_thread (void *data)
 		return (void *)0;
 	}
 
+	subprocess_ipc_handler_thread_id = start_thread(subprocess_thread, NULL);
+
 	SIMPLEQ_INIT(&notifymsgs);
+	SIMPLEQ_INIT(&subprocess_messages);
 	register_notifier(network_notifier);
 
 	/* Initialize and bind to UDS */
@@ -287,7 +404,8 @@  void *network_thread (void *data)
 		nread = read(ctrlconnfd, (void *)&msg, sizeof(msg));
 
 		if (nread != sizeof(msg)) {
-			TRACE("IPC message too short: fragmentation not supported");
+			TRACE("IPC message too short: fragmentation not supported (read %d bytes)",
+				nread);
 			close(ctrlconnfd);
 			continue;
 		}
@@ -309,72 +427,21 @@  void *network_thread (void *data)
 				}
 				break;
 			case SWUPDATE_SUBPROCESS:
-				/*
-				 *  this request is not for the installer,
-				 *  but for one of the subprocesses
-				 *  forward the request without checking
-				 *  the payload
-				 */
-
-				pipe = pctl_getfd_from_type(msg.data.procmsg.source);
-				if (pipe < 0) {
-					ERROR("Cannot find channel for requested process");
-					msg.type = NACK;
-					break;
-				}
-				TRACE("Received Message for %s",
-					pctl_getname_from_type(msg.data.procmsg.source));
-				if (fcntl(pipe, F_GETFL) < 0 && errno == EBADF) {
-					ERROR("Pipe not available or closed: %d", pipe);
-					msg.type = NACK;
-					break;
-				}
-
-				/*
-				 * Cleanup the queue to be sure there are not
-				 * outstanding messages
-				 */
-				empty_pipe(pipe);
-
-				ret = write(pipe, &msg, sizeof(msg));
-				if (ret != sizeof(msg)) {
-					ERROR("Writing to pipe failed !");
-					msg.type = NACK;
-					break;
-				}
-
-				/*
-				 * Do not block forever for an answer
-				 * This would block the whole thread
-				 * If a message requires more time,
-				 * the destination process should sent an
-				 * answer back explaining this in the payload
-				 */
-				FD_ZERO(&pipefds);
-				FD_SET(pipe, &pipefds);
-				tv.tv_usec = 0;
-				if (!msg.data.procmsg.timeout)
-					tv.tv_sec = DEFAULT_INTERNAL_TIMEOUT;
-				else
-					tv.tv_sec = msg.data.procmsg.timeout;
-				ret = select(pipe + 1, &pipefds, NULL, NULL, &tv);
-
-				/*
-				 * If there is an error or timeout,
-				 * send a NACK back
-				 */
-				if (ret <= 0 || !FD_ISSET(pipe, &pipefds)) {
-					msg.type = NACK;
+				struct subprocess_msg_elem *subprocess_msg =
+					(struct subprocess_msg_elem*)malloc(sizeof(struct subprocess_msg_elem));
+				if (subprocess_msg == NULL) {
+					ERROR("Cannot handle subprocess IPC because of OOM.");
 					break;
 				}
 
-				ret = read(pipe, &msg, sizeof(msg));
-				if (ret != sizeof(msg)) {
-					ERROR("Reading from pipe failed !");
-					msg.type = NACK;
-					break;
-				}
+				subprocess_msg->client = ctrlconnfd;
+				subprocess_msg->message = msg;
 
+				pthread_mutex_lock(&subprocess_msg_lock);
+				++subprocess_messages_count;
+				SIMPLEQ_INSERT_TAIL(&subprocess_messages, subprocess_msg, next);
+				pthread_cond_signal(&subprocess_wkup);
+				pthread_mutex_unlock(&subprocess_msg_lock);
 				/*
 				 * ACK/NACK was inserted by the called SUBPROCESS
 				 * It should not be touched here
@@ -453,12 +520,15 @@  void *network_thread (void *data)
 			msg.type = NACK;
 			sprintf(msg.data.msg, "Wrong request: aborting");
 		}
-		ret = write(ctrlconnfd, &msg, sizeof(msg));
-		if (ret < 0)
-			printf("Error write on socket ctrl");
 
-		if (msg.type != ACK)
-			close(ctrlconnfd);
+		if (msg.type == ACK || msg.type == NACK || msg.type == GET_STATUS) {
+			ret = write(ctrlconnfd, &msg, sizeof(msg));
+			if (ret < 0)
+				printf("Error write on socket ctrl");
+
+			if (msg.type != ACK)
+				close(ctrlconnfd);
+		}
 		pthread_mutex_unlock(&stream_mutex);
 	} while (1);
 	return (void *)0;