diff mbox series

[v4] Execute subprocess IPC in separate thread

Message ID 20201221115557.122133-1-sava.jakovljev@teufel.de
State Accepted
Headers show
Series [v4] Execute subprocess IPC in separate thread | expand

Commit Message

Sava Jakovljev Dec. 21, 2020, 11:55 a.m. UTC
Signed-off-by: Sava Jakovljev <sava.jakovljev@teufel.de>
---
Subprocesses may want to alter global SWUpdate state.
To make this possible, subprocess RPC must be handled while the network
thread is kept free, in order to avoid deadlocks. One such deadlock
currently exists in the Suricatta activation IPC.

v3 of this patch replaces printf calls with swupdate logging macros.
Secondly, the variable containing the number of messages in the subprocess
queue has been removed, since there is no need for it, and the predicate
for the corresponding condition variable has been modified to use the
SIMPLEQ_EMPTY helper.
Last but not least, a potential race during start-up has been resolved.

v4 of this patch declares the argument to send_subprocess_reply as const.

 core/network_thread.c | 218 ++++++++++++++++++++++++++++--------------
 1 file changed, 145 insertions(+), 73 deletions(-)

--
2.26.2

Comments

Stefano Babic Dec. 23, 2020, noon UTC | #1
On 21.12.20 12:55, Sava Jakovljev wrote:
> Signed-off-by: Sava Jakovljev <sava.jakovljev@teufel.de>
> ---
> Subprocesses may want to alter global SWUpdate state.
> In order to do this, subprocess RPC must be handled with network thread
> being kept free, in order to avoid deadlocks. One currently exists in
> Suricatta activation IPC.
> 
> v3 of this patch replaces printf calls with swupdate logging macros.
> Secondly, variable containing number of messages in the subprocess queue
> has been removed, since there is no need for it and the predicate for
> the corresponding condition variable has been modified to use SIMPLEQ_EMPTY
> helper.
> Last but not least, potential race during start-up has been resolved.
> 
> v4 of this patch declares argument to send_subprocess_reply a const.
> 
>  core/network_thread.c | 218 ++++++++++++++++++++++++++++--------------
>  1 file changed, 145 insertions(+), 73 deletions(-)
> 
> diff --git a/core/network_thread.c b/core/network_thread.c
> index 0e6080d..76a7ca1 100644
> --- a/core/network_thread.c
> +++ b/core/network_thread.c
> @@ -56,6 +56,19 @@ static unsigned long nrmsgs = 0;
> 
>  static pthread_mutex_t msglock = PTHREAD_MUTEX_INITIALIZER;
> 
> +struct subprocess_msg_elem {
> +	ipc_message message;
> +	int client;
> +	SIMPLEQ_ENTRY(subprocess_msg_elem) next;
> +};
> +
> +SIMPLEQ_HEAD(subprocess_msglist, subprocess_msg_elem);
> +static struct subprocess_msglist subprocess_messages;
> +
> +static pthread_t subprocess_ipc_handler_thread_id;
> +static pthread_mutex_t subprocess_msg_lock = PTHREAD_MUTEX_INITIALIZER;
> +static pthread_cond_t subprocess_wkup = PTHREAD_COND_INITIALIZER;
> +
>  static bool is_selection_allowed(const char *software_set, char *running_mode,
>  				 struct dict const *acceptedlist)
>  {
> @@ -239,6 +252,111 @@ static void unlink_socket(void)
>  	unlink(get_ctrl_socket());
>  }
> 
> +static void send_subprocess_reply(
> +		const struct subprocess_msg_elem *const subprocess_msg)
> +{
> +	if (write(subprocess_msg->client, &subprocess_msg->message,
> +			sizeof(subprocess_msg->message)) < 0)
> +		ERROR("Error write on socket ctrl");
> +}
> +
> +static void handle_subprocess_ipc(struct subprocess_msg_elem *subprocess_msg)
> +{
> +	ipc_message *msg = &subprocess_msg->message;
> +	int pipe = pctl_getfd_from_type(msg->data.procmsg.source);
> +	if (pipe < 0) {
> +		ERROR("Cannot find channel for requested process");
> +		msg->type = NACK;
> +
> +		return;
> +	}
> +
> +	TRACE("Received Message for %s",
> +		pctl_getname_from_type(msg->data.procmsg.source));
> +	if (fcntl(pipe, F_GETFL) < 0 && errno == EBADF) {
> +		ERROR("Pipe not available or closed: %d", pipe);
> +		msg->type = NACK;
> +
> +		return;
> +	}
> +
> +	/*
> +	 * Cleanup the queue to be sure there are not
> +	 * outstanding messages
> +	 */
> +	empty_pipe(pipe);
> +
> +	int ret = write(pipe, msg, sizeof(*msg));
> +	if (ret != sizeof(*msg)) {
> +		ERROR("Writing to pipe failed !");
> +		msg->type = NACK;
> +
> +		return;
> +	}
> +
> +	/*
> +	 * Do not block forever for an answer
> +	 * This would block the whole thread
> +	 * If a message requires more time,
> +	 * the destination process should sent an
> +	 * answer back explaining this in the payload
> +	 */
> +	fd_set pipefds;
> +	FD_ZERO(&pipefds);
> +	FD_SET(pipe, &pipefds);
> +
> +	struct timeval tv;
> +	tv.tv_usec = 0;
> +	if (!msg->data.procmsg.timeout)
> +		tv.tv_sec = DEFAULT_INTERNAL_TIMEOUT;
> +	else
> +		tv.tv_sec = msg->data.procmsg.timeout;
> +	ret = select(pipe + 1, &pipefds, NULL, NULL, &tv);
> +
> +	/*
> +	 * If there is an error or timeout,
> +	 * send a NACK back
> +	 */
> +	if (ret <= 0 || !FD_ISSET(pipe, &pipefds)) {
> +		msg->type = NACK;
> +
> +		return;
> +	}
> +
> +	ret = read(pipe, msg, sizeof(*msg));
> +	if (ret != sizeof(*msg)) {
> +		ERROR("Reading from pipe failed !");
> +		msg->type = NACK;
> +	}
> +}
> +
> +static void *subprocess_thread (void *data)
> +{
> +	(void)data;
> +	pthread_mutex_lock(&subprocess_msg_lock);
> +
> +	while(1) {
> +		while(!SIMPLEQ_EMPTY(&subprocess_messages)) {
> +			struct subprocess_msg_elem *subprocess_msg;
> +			subprocess_msg = SIMPLEQ_FIRST(&subprocess_messages);
> +			SIMPLEQ_REMOVE_HEAD(&subprocess_messages, next);
> +
> +			pthread_mutex_unlock(&subprocess_msg_lock);
> +
> +			handle_subprocess_ipc(subprocess_msg);
> +			send_subprocess_reply(subprocess_msg);
> +			close(subprocess_msg->client);
> +
> +			free(subprocess_msg);
> +			pthread_mutex_lock(&subprocess_msg_lock);
> +		}
> +
> +		pthread_cond_wait(&subprocess_wkup, &subprocess_msg_lock);
> +	}
> +
> +	return NULL;
> +}
> +
>  void *network_thread (void *data)
>  {
>  	struct installer *instp = (struct installer *)data;
> @@ -249,10 +367,8 @@ void *network_thread (void *data)
>  	int nread;
>  	struct msg_elem *notification;
>  	int ret;
> -	int pipe;
> -	fd_set pipefds;
> -	struct timeval tv;
>  	update_state_t value;
> +	struct subprocess_msg_elem *subprocess_msg;
> 
>  	if (!instp) {
>  		TRACE("Fatal error: Network thread aborting...");
> @@ -260,8 +376,11 @@ void *network_thread (void *data)
>  	}
> 
>  	SIMPLEQ_INIT(&notifymsgs);
> +	SIMPLEQ_INIT(&subprocess_messages);
>  	register_notifier(network_notifier);
> 
> +	subprocess_ipc_handler_thread_id = start_thread(subprocess_thread, NULL);
> +
>  	/* Initialize and bind to UDS */
>  	ctrllisten = listener_create(get_ctrl_socket(), SOCK_STREAM);
>  	if (ctrllisten < 0 ) {
> @@ -287,7 +406,8 @@ void *network_thread (void *data)
>  		nread = read(ctrlconnfd, (void *)&msg, sizeof(msg));
> 
>  		if (nread != sizeof(msg)) {
> -			TRACE("IPC message too short: fragmentation not supported");
> +			TRACE("IPC message too short: fragmentation not supported (read %d bytes)",
> +				nread);
>  			close(ctrlconnfd);
>  			continue;
>  		}
> @@ -309,72 +429,21 @@ void *network_thread (void *data)
>  				}
>  				break;
>  			case SWUPDATE_SUBPROCESS:
> -				/*
> -				 *  this request is not for the installer,
> -				 *  but for one of the subprocesses
> -				 *  forward the request without checking
> -				 *  the payload
> -				 */
> -
> -				pipe = pctl_getfd_from_type(msg.data.procmsg.source);
> -				if (pipe < 0) {
> -					ERROR("Cannot find channel for requested process");
> -					msg.type = NACK;
> -					break;
> -				}
> -				TRACE("Received Message for %s",
> -					pctl_getname_from_type(msg.data.procmsg.source));
> -				if (fcntl(pipe, F_GETFL) < 0 && errno == EBADF) {
> -					ERROR("Pipe not available or closed: %d", pipe);
> -					msg.type = NACK;
> -					break;
> -				}
> -
> -				/*
> -				 * Cleanup the queue to be sure there are not
> -				 * outstanding messages
> -				 */
> -				empty_pipe(pipe);
> -
> -				ret = write(pipe, &msg, sizeof(msg));
> -				if (ret != sizeof(msg)) {
> -					ERROR("Writing to pipe failed !");
> -					msg.type = NACK;
> -					break;
> -				}
> -
> -				/*
> -				 * Do not block forever for an answer
> -				 * This would block the whole thread
> -				 * If a message requires more time,
> -				 * the destination process should sent an
> -				 * answer back explaining this in the payload
> -				 */
> -				FD_ZERO(&pipefds);
> -				FD_SET(pipe, &pipefds);
> -				tv.tv_usec = 0;
> -				if (!msg.data.procmsg.timeout)
> -					tv.tv_sec = DEFAULT_INTERNAL_TIMEOUT;
> -				else
> -					tv.tv_sec = msg.data.procmsg.timeout;
> -				ret = select(pipe + 1, &pipefds, NULL, NULL, &tv);
> -
> -				/*
> -				 * If there is an error or timeout,
> -				 * send a NACK back
> -				 */
> -				if (ret <= 0 || !FD_ISSET(pipe, &pipefds)) {
> +				subprocess_msg = (struct subprocess_msg_elem*)malloc(
> +						sizeof(struct subprocess_msg_elem));
> +				if (subprocess_msg == NULL) {
> +					ERROR("Cannot handle subprocess IPC because of OOM.");
>  					msg.type = NACK;
>  					break;
>  				}
> 
> -				ret = read(pipe, &msg, sizeof(msg));
> -				if (ret != sizeof(msg)) {
> -					ERROR("Reading from pipe failed !");
> -					msg.type = NACK;
> -					break;
> -				}
> +				subprocess_msg->client = ctrlconnfd;
> +				subprocess_msg->message = msg;
> 
> +				pthread_mutex_lock(&subprocess_msg_lock);
> +				SIMPLEQ_INSERT_TAIL(&subprocess_messages, subprocess_msg, next);
> +				pthread_cond_signal(&subprocess_wkup);
> +				pthread_mutex_unlock(&subprocess_msg_lock);
>  				/*
>  				 * ACK/NACK was inserted by the called SUBPROCESS
>  				 * It should not be touched here
> @@ -423,7 +492,7 @@ void *network_thread (void *data)
>  					strncpy(msg.data.status.desc, notification->msg,
>  						sizeof(msg.data.status.desc) - 1);
>  #ifdef DEBUG_IPC
> -					printf("GET STATUS: %s\n", msg.data.status.desc);
> +					DEBUG("GET STATUS: %s\n", msg.data.status.desc);
>  #endif
>  					msg.data.status.current = notification->status;
>  					msg.data.status.error = notification->error;
> @@ -453,12 +522,15 @@ void *network_thread (void *data)
>  			msg.type = NACK;
>  			sprintf(msg.data.msg, "Wrong request: aborting");
>  		}
> -		ret = write(ctrlconnfd, &msg, sizeof(msg));
> -		if (ret < 0)
> -			printf("Error write on socket ctrl");
> 
> -		if (msg.type != ACK)
> -			close(ctrlconnfd);
> +		if (msg.type == ACK || msg.type == NACK || msg.type == GET_STATUS) {
> +			ret = write(ctrlconnfd, &msg, sizeof(msg));
> +			if (ret < 0)
> +				ERROR("Error write on socket ctrl");
> +
> +			if (msg.type != ACK)
> +				close(ctrlconnfd);
> +		}
>  		pthread_mutex_unlock(&stream_mutex);
>  	} while (1);
>  	return (void *)0;
> --
> 2.26.2
> 

Applied to -master, thanks !

Best regards,
Stefano Babic
diff mbox series

Patch

diff --git a/core/network_thread.c b/core/network_thread.c
index 0e6080d..76a7ca1 100644
--- a/core/network_thread.c
+++ b/core/network_thread.c
@@ -56,6 +56,19 @@  static unsigned long nrmsgs = 0;

 static pthread_mutex_t msglock = PTHREAD_MUTEX_INITIALIZER;

+struct subprocess_msg_elem {
+	ipc_message message;
+	int client;
+	SIMPLEQ_ENTRY(subprocess_msg_elem) next;
+};
+
+SIMPLEQ_HEAD(subprocess_msglist, subprocess_msg_elem);
+static struct subprocess_msglist subprocess_messages;
+
+static pthread_t subprocess_ipc_handler_thread_id;
+static pthread_mutex_t subprocess_msg_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t subprocess_wkup = PTHREAD_COND_INITIALIZER;
+
 static bool is_selection_allowed(const char *software_set, char *running_mode,
 				 struct dict const *acceptedlist)
 {
@@ -239,6 +252,111 @@  static void unlink_socket(void)
 	unlink(get_ctrl_socket());
 }

+static void send_subprocess_reply(
+		const struct subprocess_msg_elem *const subprocess_msg)
+{
+	if (write(subprocess_msg->client, &subprocess_msg->message,
+			sizeof(subprocess_msg->message)) < 0)
+		ERROR("Error write on socket ctrl");
+}
+
+static void handle_subprocess_ipc(struct subprocess_msg_elem *subprocess_msg)
+{
+	ipc_message *msg = &subprocess_msg->message;
+	int pipe = pctl_getfd_from_type(msg->data.procmsg.source);
+	if (pipe < 0) {
+		ERROR("Cannot find channel for requested process");
+		msg->type = NACK;
+
+		return;
+	}
+
+	TRACE("Received Message for %s",
+		pctl_getname_from_type(msg->data.procmsg.source));
+	if (fcntl(pipe, F_GETFL) < 0 && errno == EBADF) {
+		ERROR("Pipe not available or closed: %d", pipe);
+		msg->type = NACK;
+
+		return;
+	}
+
+	/*
+	 * Clean up the queue to be sure there are no
+	 * outstanding messages
+	 */
+	empty_pipe(pipe);
+
+	int ret = write(pipe, msg, sizeof(*msg));
+	if (ret != sizeof(*msg)) {
+		ERROR("Writing to pipe failed !");
+		msg->type = NACK;
+
+		return;
+	}
+
+	/*
+	 * Do not block forever waiting for an answer:
+	 * this would block the whole thread.
+	 * If a message requires more time,
+	 * the destination process should send an
+	 * answer back explaining this in the payload.
+	 */
+	fd_set pipefds;
+	FD_ZERO(&pipefds);
+	FD_SET(pipe, &pipefds);
+
+	struct timeval tv;
+	tv.tv_usec = 0;
+	if (!msg->data.procmsg.timeout)
+		tv.tv_sec = DEFAULT_INTERNAL_TIMEOUT;
+	else
+		tv.tv_sec = msg->data.procmsg.timeout;
+	ret = select(pipe + 1, &pipefds, NULL, NULL, &tv);
+
+	/*
+	 * If there is an error or timeout,
+	 * send a NACK back
+	 */
+	if (ret <= 0 || !FD_ISSET(pipe, &pipefds)) {
+		msg->type = NACK;
+
+		return;
+	}
+
+	ret = read(pipe, msg, sizeof(*msg));
+	if (ret != sizeof(*msg)) {
+		ERROR("Reading from pipe failed !");
+		msg->type = NACK;
+	}
+}
+
+static void *subprocess_thread (void *data)
+{
+	(void)data;
+	pthread_mutex_lock(&subprocess_msg_lock);
+
+	while(1) {
+		while(!SIMPLEQ_EMPTY(&subprocess_messages)) {
+			struct subprocess_msg_elem *subprocess_msg;
+			subprocess_msg = SIMPLEQ_FIRST(&subprocess_messages);
+			SIMPLEQ_REMOVE_HEAD(&subprocess_messages, next);
+
+			pthread_mutex_unlock(&subprocess_msg_lock);
+
+			handle_subprocess_ipc(subprocess_msg);
+			send_subprocess_reply(subprocess_msg);
+			close(subprocess_msg->client);
+
+			free(subprocess_msg);
+			pthread_mutex_lock(&subprocess_msg_lock);
+		}
+
+		pthread_cond_wait(&subprocess_wkup, &subprocess_msg_lock);
+	}
+
+	return NULL;
+}
+
 void *network_thread (void *data)
 {
 	struct installer *instp = (struct installer *)data;
@@ -249,10 +367,8 @@  void *network_thread (void *data)
 	int nread;
 	struct msg_elem *notification;
 	int ret;
-	int pipe;
-	fd_set pipefds;
-	struct timeval tv;
 	update_state_t value;
+	struct subprocess_msg_elem *subprocess_msg;

 	if (!instp) {
 		TRACE("Fatal error: Network thread aborting...");
@@ -260,8 +376,11 @@  void *network_thread (void *data)
 	}

 	SIMPLEQ_INIT(&notifymsgs);
+	SIMPLEQ_INIT(&subprocess_messages);
 	register_notifier(network_notifier);

+	subprocess_ipc_handler_thread_id = start_thread(subprocess_thread, NULL);
+
 	/* Initialize and bind to UDS */
 	ctrllisten = listener_create(get_ctrl_socket(), SOCK_STREAM);
 	if (ctrllisten < 0 ) {
@@ -287,7 +406,8 @@  void *network_thread (void *data)
 		nread = read(ctrlconnfd, (void *)&msg, sizeof(msg));

 		if (nread != sizeof(msg)) {
-			TRACE("IPC message too short: fragmentation not supported");
+			TRACE("IPC message too short: fragmentation not supported (read %d bytes)",
+				nread);
 			close(ctrlconnfd);
 			continue;
 		}
@@ -309,72 +429,21 @@  void *network_thread (void *data)
 				}
 				break;
 			case SWUPDATE_SUBPROCESS:
-				/*
-				 *  this request is not for the installer,
-				 *  but for one of the subprocesses
-				 *  forward the request without checking
-				 *  the payload
-				 */
-
-				pipe = pctl_getfd_from_type(msg.data.procmsg.source);
-				if (pipe < 0) {
-					ERROR("Cannot find channel for requested process");
-					msg.type = NACK;
-					break;
-				}
-				TRACE("Received Message for %s",
-					pctl_getname_from_type(msg.data.procmsg.source));
-				if (fcntl(pipe, F_GETFL) < 0 && errno == EBADF) {
-					ERROR("Pipe not available or closed: %d", pipe);
-					msg.type = NACK;
-					break;
-				}
-
-				/*
-				 * Cleanup the queue to be sure there are not
-				 * outstanding messages
-				 */
-				empty_pipe(pipe);
-
-				ret = write(pipe, &msg, sizeof(msg));
-				if (ret != sizeof(msg)) {
-					ERROR("Writing to pipe failed !");
-					msg.type = NACK;
-					break;
-				}
-
-				/*
-				 * Do not block forever for an answer
-				 * This would block the whole thread
-				 * If a message requires more time,
-				 * the destination process should sent an
-				 * answer back explaining this in the payload
-				 */
-				FD_ZERO(&pipefds);
-				FD_SET(pipe, &pipefds);
-				tv.tv_usec = 0;
-				if (!msg.data.procmsg.timeout)
-					tv.tv_sec = DEFAULT_INTERNAL_TIMEOUT;
-				else
-					tv.tv_sec = msg.data.procmsg.timeout;
-				ret = select(pipe + 1, &pipefds, NULL, NULL, &tv);
-
-				/*
-				 * If there is an error or timeout,
-				 * send a NACK back
-				 */
-				if (ret <= 0 || !FD_ISSET(pipe, &pipefds)) {
+				subprocess_msg = (struct subprocess_msg_elem*)malloc(
+						sizeof(struct subprocess_msg_elem));
+				if (subprocess_msg == NULL) {
+					ERROR("Cannot handle subprocess IPC because of OOM.");
 					msg.type = NACK;
 					break;
 				}

-				ret = read(pipe, &msg, sizeof(msg));
-				if (ret != sizeof(msg)) {
-					ERROR("Reading from pipe failed !");
-					msg.type = NACK;
-					break;
-				}
+				subprocess_msg->client = ctrlconnfd;
+				subprocess_msg->message = msg;

+				pthread_mutex_lock(&subprocess_msg_lock);
+				SIMPLEQ_INSERT_TAIL(&subprocess_messages, subprocess_msg, next);
+				pthread_cond_signal(&subprocess_wkup);
+				pthread_mutex_unlock(&subprocess_msg_lock);
 				/*
 				 * ACK/NACK was inserted by the called SUBPROCESS
 				 * It should not be touched here
@@ -423,7 +492,7 @@  void *network_thread (void *data)
 					strncpy(msg.data.status.desc, notification->msg,
 						sizeof(msg.data.status.desc) - 1);
 #ifdef DEBUG_IPC
-					printf("GET STATUS: %s\n", msg.data.status.desc);
+					DEBUG("GET STATUS: %s\n", msg.data.status.desc);
 #endif
 					msg.data.status.current = notification->status;
 					msg.data.status.error = notification->error;
@@ -453,12 +522,15 @@  void *network_thread (void *data)
 			msg.type = NACK;
 			sprintf(msg.data.msg, "Wrong request: aborting");
 		}
-		ret = write(ctrlconnfd, &msg, sizeof(msg));
-		if (ret < 0)
-			printf("Error write on socket ctrl");

-		if (msg.type != ACK)
-			close(ctrlconnfd);
+		if (msg.type == ACK || msg.type == NACK || msg.type == GET_STATUS) {
+			ret = write(ctrlconnfd, &msg, sizeof(msg));
+			if (ret < 0)
+				ERROR("Error write on socket ctrl");
+
+			if (msg.type != ACK)
+				close(ctrlconnfd);
+		}
 		pthread_mutex_unlock(&stream_mutex);
 	} while (1);
 	return (void *)0;